高热共公日志库
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

187 lines
3.4 KiB

  1. package alils
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/astaxie/beego/logs"
  8. "github.com/gogo/protobuf/proto"
  9. )
  // CacheSize is the largest value Init allows for Config.FlushWhen.
  const CacheSize int = 64

  // Delimiter separates the topic name from the message body in WriteMsg.
  const Delimiter string = "##"
  // Config holds the JSON-decoded settings of the Ali Log adapter.
  type Config struct {
  	// Connection settings handed to LogProject in Init.
  	Project   string `json:"project"`
  	Endpoint  string `json:"endpoint"`
  	KeyID     string `json:"key_id"`
  	KeySecret string `json:"key_secret"`

  	// LogStore is the name of the store looked up in Init.
  	LogStore string `json:"log_store"`

  	// Topics lists the extra topics; Init creates one log group per entry.
  	Topics []string `json:"topics"`

  	// Source is copied into every log group created by Init.
  	Source string `json:"source"`

  	// Level is the threshold: WriteMsg drops messages whose level is greater.
  	Level int `json:"level"`

  	// FlushWhen is the number of buffered entries that triggers a flush.
  	FlushWhen int `json:"flush_when"`
  }
  28. // aliLSWriter implements LoggerInterface.
  29. // it writes messages in keep-live tcp connection.
  30. type aliLSWriter struct {
  31. store *LogStore
  32. group []*LogGroup
  33. withMap bool
  34. groupMap map[string]*LogGroup
  35. lock *sync.Mutex
  36. Config
  37. }
  38. // NewAliLS create a new Logger
  39. func NewAliLS() logs.Logger {
  40. alils := new(aliLSWriter)
  41. alils.Level = logs.LevelTrace
  42. return alils
  43. }
  44. // Init parse config and init struct
  45. func (c *aliLSWriter) Init(jsonConfig string) (err error) {
  46. json.Unmarshal([]byte(jsonConfig), c)
  47. if c.FlushWhen > CacheSize {
  48. c.FlushWhen = CacheSize
  49. }
  50. prj := &LogProject{
  51. Name: c.Project,
  52. Endpoint: c.Endpoint,
  53. AccessKeyID: c.KeyID,
  54. AccessKeySecret: c.KeySecret,
  55. }
  56. c.store, err = prj.GetLogStore(c.LogStore)
  57. if err != nil {
  58. return err
  59. }
  60. // Create default Log Group
  61. c.group = append(c.group, &LogGroup{
  62. Topic: proto.String(""),
  63. Source: proto.String(c.Source),
  64. Logs: make([]*Log, 0, c.FlushWhen),
  65. })
  66. // Create other Log Group
  67. c.groupMap = make(map[string]*LogGroup)
  68. for _, topic := range c.Topics {
  69. lg := &LogGroup{
  70. Topic: proto.String(topic),
  71. Source: proto.String(c.Source),
  72. Logs: make([]*Log, 0, c.FlushWhen),
  73. }
  74. c.group = append(c.group, lg)
  75. c.groupMap[topic] = lg
  76. }
  77. if len(c.group) == 1 {
  78. c.withMap = false
  79. } else {
  80. c.withMap = true
  81. }
  82. c.lock = &sync.Mutex{}
  83. return nil
  84. }
  85. // WriteMsg write message in connection.
  86. // if connection is down, try to re-connect.
  87. func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int) (err error) {
  88. if level > c.Level {
  89. return nil
  90. }
  91. var topic string
  92. var content string
  93. var lg *LogGroup
  94. if c.withMap {
  95. // Topic,LogGroup
  96. strs := strings.SplitN(msg, Delimiter, 2)
  97. if len(strs) == 2 {
  98. pos := strings.LastIndex(strs[0], " ")
  99. topic = strs[0][pos+1 : len(strs[0])]
  100. content = strs[0][0:pos] + strs[1]
  101. lg = c.groupMap[topic]
  102. }
  103. // send to empty Topic
  104. if lg == nil {
  105. content = msg
  106. lg = c.group[0]
  107. }
  108. } else {
  109. content = msg
  110. lg = c.group[0]
  111. }
  112. c1 := &LogContent{
  113. Key: proto.String("msg"),
  114. Value: proto.String(content),
  115. }
  116. l := &Log{
  117. Time: proto.Uint32(uint32(when.Unix())),
  118. Contents: []*LogContent{
  119. c1,
  120. },
  121. }
  122. c.lock.Lock()
  123. lg.Logs = append(lg.Logs, l)
  124. c.lock.Unlock()
  125. if len(lg.Logs) >= c.FlushWhen {
  126. c.flush(lg)
  127. }
  128. return nil
  129. }
  130. // Flush implementing method. empty.
  131. func (c *aliLSWriter) Flush() {
  132. // flush all group
  133. for _, lg := range c.group {
  134. c.flush(lg)
  135. }
  136. }
  137. // Destroy destroy connection writer and close tcp listener.
  138. func (c *aliLSWriter) Destroy() {
  139. }
  140. func (c *aliLSWriter) flush(lg *LogGroup) {
  141. c.lock.Lock()
  142. defer c.lock.Unlock()
  143. err := c.store.PutLogs(lg)
  144. if err != nil {
  145. return
  146. }
  147. lg.Logs = make([]*Log, 0, c.FlushWhen)
  148. }
  149. func init() {
  150. logs.Register(logs.AdapterAliLS, NewAliLS)
  151. }