高热公共日志库
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

272 lines
5.8 KiB

  1. package alils
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/http/httputil"
  8. "strconv"
  9. lz4 "github.com/cloudflare/golz4"
  10. "github.com/gogo/protobuf/proto"
  11. )
  12. // LogStore Store the logs
  13. type LogStore struct {
  14. Name string `json:"logstoreName"`
  15. TTL int
  16. ShardCount int
  17. CreateTime uint32
  18. LastModifyTime uint32
  19. project *LogProject
  20. }
  // Shard is one element of the JSON array decoded by ListShards; only the
  // shard identifier is extracted from each element.
  type Shard struct {
  	// ShardID is read from / written to the "shardID" JSON key.
  	ShardID int `json:"shardID"`
  }
  25. // ListShards returns shard id list of this logstore.
  26. func (s *LogStore) ListShards() (shardIDs []int, err error) {
  27. h := map[string]string{
  28. "x-sls-bodyrawsize": "0",
  29. }
  30. uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
  31. r, err := request(s.project, "GET", uri, h, nil)
  32. if err != nil {
  33. return
  34. }
  35. buf, err := ioutil.ReadAll(r.Body)
  36. if err != nil {
  37. return
  38. }
  39. if r.StatusCode != http.StatusOK {
  40. errMsg := &errorMessage{}
  41. err = json.Unmarshal(buf, errMsg)
  42. if err != nil {
  43. err = fmt.Errorf("failed to list logstore")
  44. dump, _ := httputil.DumpResponse(r, true)
  45. fmt.Println(dump)
  46. return
  47. }
  48. err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
  49. return
  50. }
  51. var shards []*Shard
  52. err = json.Unmarshal(buf, &shards)
  53. if err != nil {
  54. return
  55. }
  56. for _, v := range shards {
  57. shardIDs = append(shardIDs, v.ShardID)
  58. }
  59. return
  60. }
  61. // PutLogs put logs into logstore.
  62. // The callers should transform user logs into LogGroup.
  63. func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
  64. body, err := proto.Marshal(lg)
  65. if err != nil {
  66. return
  67. }
  68. // Compresse body with lz4
  69. out := make([]byte, lz4.CompressBound(body))
  70. n, err := lz4.Compress(body, out)
  71. if err != nil {
  72. return
  73. }
  74. h := map[string]string{
  75. "x-sls-compresstype": "lz4",
  76. "x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
  77. "Content-Type": "application/x-protobuf",
  78. }
  79. uri := fmt.Sprintf("/logstores/%v", s.Name)
  80. r, err := request(s.project, "POST", uri, h, out[:n])
  81. if err != nil {
  82. return
  83. }
  84. buf, err := ioutil.ReadAll(r.Body)
  85. if err != nil {
  86. return
  87. }
  88. if r.StatusCode != http.StatusOK {
  89. errMsg := &errorMessage{}
  90. err = json.Unmarshal(buf, errMsg)
  91. if err != nil {
  92. err = fmt.Errorf("failed to put logs")
  93. dump, _ := httputil.DumpResponse(r, true)
  94. fmt.Println(dump)
  95. return
  96. }
  97. err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
  98. return
  99. }
  100. return
  101. }
  102. // GetCursor gets log cursor of one shard specified by shardID.
  103. // The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
  104. // For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore
  105. func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
  106. h := map[string]string{
  107. "x-sls-bodyrawsize": "0",
  108. }
  109. uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
  110. s.Name, shardID, from)
  111. r, err := request(s.project, "GET", uri, h, nil)
  112. if err != nil {
  113. return
  114. }
  115. buf, err := ioutil.ReadAll(r.Body)
  116. if err != nil {
  117. return
  118. }
  119. if r.StatusCode != http.StatusOK {
  120. errMsg := &errorMessage{}
  121. err = json.Unmarshal(buf, errMsg)
  122. if err != nil {
  123. err = fmt.Errorf("failed to get cursor")
  124. dump, _ := httputil.DumpResponse(r, true)
  125. fmt.Println(dump)
  126. return
  127. }
  128. err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
  129. return
  130. }
  131. type Body struct {
  132. Cursor string
  133. }
  134. body := &Body{}
  135. err = json.Unmarshal(buf, body)
  136. if err != nil {
  137. return
  138. }
  139. cursor = body.Cursor
  140. return
  141. }
  142. // GetLogsBytes gets logs binary data from shard specified by shardID according cursor.
  143. // The logGroupMaxCount is the max number of logGroup could be returned.
  144. // The nextCursor is the next curosr can be used to read logs at next time.
  145. func (s *LogStore) GetLogsBytes(shardID int, cursor string,
  146. logGroupMaxCount int) (out []byte, nextCursor string, err error) {
  147. h := map[string]string{
  148. "x-sls-bodyrawsize": "0",
  149. "Accept": "application/x-protobuf",
  150. "Accept-Encoding": "lz4",
  151. }
  152. uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
  153. s.Name, shardID, cursor, logGroupMaxCount)
  154. r, err := request(s.project, "GET", uri, h, nil)
  155. if err != nil {
  156. return
  157. }
  158. buf, err := ioutil.ReadAll(r.Body)
  159. if err != nil {
  160. return
  161. }
  162. if r.StatusCode != http.StatusOK {
  163. errMsg := &errorMessage{}
  164. err = json.Unmarshal(buf, errMsg)
  165. if err != nil {
  166. err = fmt.Errorf("failed to get cursor")
  167. dump, _ := httputil.DumpResponse(r, true)
  168. fmt.Println(dump)
  169. return
  170. }
  171. err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
  172. return
  173. }
  174. v, ok := r.Header["X-Sls-Compresstype"]
  175. if !ok || len(v) == 0 {
  176. err = fmt.Errorf("can't find 'x-sls-compresstype' header")
  177. return
  178. }
  179. if v[0] != "lz4" {
  180. err = fmt.Errorf("unexpected compress type:%v", v[0])
  181. return
  182. }
  183. v, ok = r.Header["X-Sls-Cursor"]
  184. if !ok || len(v) == 0 {
  185. err = fmt.Errorf("can't find 'x-sls-cursor' header")
  186. return
  187. }
  188. nextCursor = v[0]
  189. v, ok = r.Header["X-Sls-Bodyrawsize"]
  190. if !ok || len(v) == 0 {
  191. err = fmt.Errorf("can't find 'x-sls-bodyrawsize' header")
  192. return
  193. }
  194. bodyRawSize, err := strconv.Atoi(v[0])
  195. if err != nil {
  196. return
  197. }
  198. out = make([]byte, bodyRawSize)
  199. err = lz4.Uncompress(buf, out)
  200. if err != nil {
  201. return
  202. }
  203. return
  204. }
  205. // LogsBytesDecode decodes logs binary data retruned by GetLogsBytes API
  206. func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
  207. gl = &LogGroupList{}
  208. err = proto.Unmarshal(data, gl)
  209. if err != nil {
  210. return
  211. }
  212. return
  213. }
  214. // GetLogs gets logs from shard specified by shardID according cursor.
  215. // The logGroupMaxCount is the max number of logGroup could be returned.
  216. // The nextCursor is the next curosr can be used to read logs at next time.
  217. func (s *LogStore) GetLogs(shardID int, cursor string,
  218. logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
  219. out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount)
  220. if err != nil {
  221. return
  222. }
  223. gl, err = LogsBytesDecode(out)
  224. if err != nil {
  225. return
  226. }
  227. return
  228. }