272 lines
5.8 KiB
Go
Executable File
272 lines
5.8 KiB
Go
Executable File
package alils
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/http/httputil"
|
|
"strconv"
|
|
|
|
lz4 "github.com/cloudflare/golz4"
|
|
"github.com/gogo/protobuf/proto"
|
|
)
|
|
|
|
// LogStore Store the logs
|
|
type LogStore struct {
|
|
Name string `json:"logstoreName"`
|
|
TTL int
|
|
ShardCount int
|
|
|
|
CreateTime uint32
|
|
LastModifyTime uint32
|
|
|
|
project *LogProject
|
|
}
|
|
|
|
// Shard define the Log Shard
|
|
type Shard struct {
|
|
ShardID int `json:"shardID"`
|
|
}
|
|
|
|
// ListShards returns shard id list of this logstore.
|
|
func (s *LogStore) ListShards() (shardIDs []int, err error) {
|
|
h := map[string]string{
|
|
"x-sls-bodyrawsize": "0",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
|
|
r, err := request(s.project, "GET", uri, h, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to list logstore")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
|
|
var shards []*Shard
|
|
err = json.Unmarshal(buf, &shards)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for _, v := range shards {
|
|
shardIDs = append(shardIDs, v.ShardID)
|
|
}
|
|
return
|
|
}
|
|
|
|
// PutLogs put logs into logstore.
|
|
// The callers should transform user logs into LogGroup.
|
|
func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
|
|
body, err := proto.Marshal(lg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Compresse body with lz4
|
|
out := make([]byte, lz4.CompressBound(body))
|
|
n, err := lz4.Compress(body, out)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
h := map[string]string{
|
|
"x-sls-compresstype": "lz4",
|
|
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
|
"Content-Type": "application/x-protobuf",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v", s.Name)
|
|
r, err := request(s.project, "POST", uri, h, out[:n])
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to put logs")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// GetCursor gets log cursor of one shard specified by shardID.
|
|
// The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
|
|
// For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore
|
|
func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
|
|
h := map[string]string{
|
|
"x-sls-bodyrawsize": "0",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
|
|
s.Name, shardID, from)
|
|
|
|
r, err := request(s.project, "GET", uri, h, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to get cursor")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
|
|
type Body struct {
|
|
Cursor string
|
|
}
|
|
body := &Body{}
|
|
|
|
err = json.Unmarshal(buf, body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
cursor = body.Cursor
|
|
return
|
|
}
|
|
|
|
// GetLogsBytes gets logs binary data from shard specified by shardID according cursor.
|
|
// The logGroupMaxCount is the max number of logGroup could be returned.
|
|
// The nextCursor is the next curosr can be used to read logs at next time.
|
|
func (s *LogStore) GetLogsBytes(shardID int, cursor string,
|
|
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
|
|
|
|
h := map[string]string{
|
|
"x-sls-bodyrawsize": "0",
|
|
"Accept": "application/x-protobuf",
|
|
"Accept-Encoding": "lz4",
|
|
}
|
|
|
|
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
|
|
s.Name, shardID, cursor, logGroupMaxCount)
|
|
|
|
r, err := request(s.project, "GET", uri, h, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
buf, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if r.StatusCode != http.StatusOK {
|
|
errMsg := &errorMessage{}
|
|
err = json.Unmarshal(buf, errMsg)
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to get cursor")
|
|
dump, _ := httputil.DumpResponse(r, true)
|
|
fmt.Println(dump)
|
|
return
|
|
}
|
|
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
|
return
|
|
}
|
|
|
|
v, ok := r.Header["X-Sls-Compresstype"]
|
|
if !ok || len(v) == 0 {
|
|
err = fmt.Errorf("can't find 'x-sls-compresstype' header")
|
|
return
|
|
}
|
|
if v[0] != "lz4" {
|
|
err = fmt.Errorf("unexpected compress type:%v", v[0])
|
|
return
|
|
}
|
|
|
|
v, ok = r.Header["X-Sls-Cursor"]
|
|
if !ok || len(v) == 0 {
|
|
err = fmt.Errorf("can't find 'x-sls-cursor' header")
|
|
return
|
|
}
|
|
nextCursor = v[0]
|
|
|
|
v, ok = r.Header["X-Sls-Bodyrawsize"]
|
|
if !ok || len(v) == 0 {
|
|
err = fmt.Errorf("can't find 'x-sls-bodyrawsize' header")
|
|
return
|
|
}
|
|
bodyRawSize, err := strconv.Atoi(v[0])
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
out = make([]byte, bodyRawSize)
|
|
err = lz4.Uncompress(buf, out)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// LogsBytesDecode decodes logs binary data retruned by GetLogsBytes API
|
|
func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
|
|
|
|
gl = &LogGroupList{}
|
|
err = proto.Unmarshal(data, gl)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// GetLogs gets logs from shard specified by shardID according cursor.
|
|
// The logGroupMaxCount is the max number of logGroup could be returned.
|
|
// The nextCursor is the next curosr can be used to read logs at next time.
|
|
func (s *LogStore) GetLogs(shardID int, cursor string,
|
|
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
|
|
|
|
out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
gl, err = LogsBytesDecode(out)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|