Compare commits
No commits in common. "master" and "v1.2.2" have entirely different histories.
73
README.md
73
README.md
@ -1,21 +1,21 @@
|
||||
# grlogs
|
||||
本库为争游内部日志公共库, 该库基于 beegoLogger 基础上完善, 目前支持的引擎有 file、console、net、smtp、es、alisls
|
||||
本库为争游内部日志公共库
|
||||
|
||||
## 代码示例
|
||||
|
||||
1. 引入
|
||||
```
|
||||
```go
|
||||
import "golib.gaore.com/GaoreGo/grlogs"
|
||||
```
|
||||
2. 简单用法
|
||||
```
|
||||
```go
|
||||
grlogs.Get("test", 128).Info("hello word")
|
||||
grlogs.Get("test").Warning("hello word")
|
||||
```
|
||||
`Get` 方法中 `lable` 参数为标签,为识别分类所用,在Grlogs里一个分类使用一个管道进行日志
|
||||
|
||||
3. 进阶用法
|
||||
```
|
||||
```go
|
||||
logger := grlogs.GetEs("wifi")
|
||||
logger.SetAdapter(LevelAll, AdapterElasticSearch)
|
||||
logger.SetAdapter(LevelInfo, AdapterFile)
|
||||
@ -23,70 +23,9 @@ logger.Critical("出错了")
|
||||
logger.Info("出错了")
|
||||
```
|
||||
|
||||
4. 如果需要写入es 或 alils, 必须设置环境变量 `GRLOG_APP_NAME`, 不能有反斜杠, 如
|
||||
4. 如果需要写入es, 必须设置环境变量 `GRLOG_APP_NAME`, 不能有反斜杠, 如
|
||||
```shell script
|
||||
export GRLOG_APP_NAME=mkt.gaore.com;
|
||||
```
|
||||
还需要额外引入es库,完成初始化动作
|
||||
```go
|
||||
import _ "golib.gaore.com/GaoreGo/grlogs/logs/es"
|
||||
```
|
||||
|
||||
```go
|
||||
import _ "golib.gaore.com/GaoreGo/grlogs/logs/alils"
|
||||
```
|
||||
|
||||
5. 文件日志会写入到 `./runtime/logs/` 文件夹 **请务必在项目构建阶段创建该目录**
|
||||
|
||||
|
||||
6. `AliLS` 日志接入的是阿里SLS GOSDK , 下面是关于 [https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer] 描述
|
||||
|
||||
producer提供了两种关闭模式,分为有限关闭和安全关闭,安全关闭会等待producer中缓存的所有的数据全部发送完成以后在关闭producer,有限关闭会接收用户传递的一个参数值,时间单位为秒,当开始关闭producer的时候开始计时,超过传递的设定值还未能完全关闭producer的话会强制退出producer,此时可能会有部分数据未被成功发送而丢失。
|
||||
|
||||
所以用了aliLS 的 adapter 时,最好调用`grlogs.Close()` 或 `grlogs.CloseAll()` 方法安全关闭通道,以刷新缓冲区
|
||||
|
||||
7. 完整示例
|
||||
```go
|
||||
package grlogs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
_ "golib.gaore.com/GaoreGo/grlogs/logs/es"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestGetLogger(t *testing.T) {
|
||||
// 新建 channel 大小为128 标识为nds 日志通道 , Get 的方法 默认带 console 和 file 输出
|
||||
l := grlogs.Get("nds", 128).SetAdapter(LevelAll, AdapterElasticSearch)
|
||||
l.Debug("我正在调试")
|
||||
l.Critical("出错了")
|
||||
|
||||
// 复用 nds 的日志通道
|
||||
grlogs.Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
|
||||
grlogs.Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
|
||||
|
||||
// 新建 channel 大小为默认 标识为wifi 日志通道 , GetEs 的方法 默认带 console 和 file 和 elatisearch 输出
|
||||
grlogs.GetEs("wifi")
|
||||
for i := 0; i < 10; i++ {
|
||||
grlogs.Get("wifi").Warning("Warning")
|
||||
grlogs.Get("wifi").Warn("Warn")
|
||||
grlogs.Get("wifi").Debug("Debug")
|
||||
grlogs.Get("wifi").Error("Error")
|
||||
grlogs.Get("wifi").Notice("Notice")
|
||||
grlogs.Get("wifi").Info("Info")
|
||||
grlogs.Get("wifi").Alert("Alert")
|
||||
}
|
||||
|
||||
Get("wifi").Critical("neoweiwoewe")
|
||||
}
|
||||
|
||||
func TestDropAdapter(t *testing.T) {
|
||||
grlogs.SetAdapter(LevelAll, AdapterAliLs)
|
||||
grlogs.DropAdapter(AdapterAliLs)
|
||||
grlogs.Informational(errors.New("he hello"))
|
||||
grlogs.SetAdapter(LevelAll, AdapterAliLs)
|
||||
grlogs.Debug(errors.New("he hello"))
|
||||
grlogs.CloseAll()
|
||||
}
|
||||
```
|
||||
5. 文件日志会写入到 `./runtime/logs/` 文件夹 请务必在项目构建阶段创建该目录
|
17
config.go
17
config.go
@ -54,20 +54,3 @@ func (c *EsConfig) String() string {
|
||||
b, _ := json.Marshal(c)
|
||||
return bytes.NewBuffer(b).String()
|
||||
}
|
||||
|
||||
type AliLSConfig struct {
|
||||
Project string `json:"project"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
KeyID string `json:"key_id"`
|
||||
KeySecret string `json:"key_secret"`
|
||||
LogStore string `json:"log_store"`
|
||||
Topics []string `json:"topics"`
|
||||
Source string `json:"source"`
|
||||
Level Level `json:"level"`
|
||||
FlushWhen int `json:"flush_when"`
|
||||
}
|
||||
|
||||
func (c *AliLSConfig) String() string {
|
||||
b, _ := json.Marshal(c)
|
||||
return bytes.NewBuffer(b).String()
|
||||
}
|
||||
|
103
funtion.go
103
funtion.go
@ -5,21 +5,9 @@ import (
|
||||
"github.com/astaxie/beego/logs"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var adatperSetMapper = map[Adapter]func(l *Logger, level Level) error{
|
||||
|
||||
AdapterSocket: func(l *Logger, level Level) error {
|
||||
c := ConnLogConfig{
|
||||
ReconnectOnMsg: false,
|
||||
Reconnect: true,
|
||||
Net: "",
|
||||
Addr: "127.0.0.1:9888",
|
||||
Level: level,
|
||||
}
|
||||
return l.SetLogger(logs.AdapterConn, c.String())
|
||||
},
|
||||
var adatperMapper = map[Adapter]func(l *Logger, level Level) error{
|
||||
|
||||
AdapterFile: func(l *Logger, level Level) error {
|
||||
if wd, err := os.Getwd(); err == nil {
|
||||
@ -45,95 +33,20 @@ var adatperSetMapper = map[Adapter]func(l *Logger, level Level) error{
|
||||
},
|
||||
|
||||
AdapterElasticSearch: func(l *Logger, level Level) error {
|
||||
c := GenEsConfig(level)
|
||||
return l.SetLogger(logs.AdapterEs, c.String())
|
||||
},
|
||||
|
||||
AdapterAliLs: func(l *Logger, level Level) error {
|
||||
c := GenAliConfig(level)
|
||||
return l.SetLogger(logs.AdapterAliLS, c.String())
|
||||
},
|
||||
dsn := "http://es-cn-0pp1mm3hq000dnbh4.public.elasticsearch.aliyuncs.com:9200/"
|
||||
if envkey == "prod" || envkey == "" || envkey == "gray" {
|
||||
dsn = "http://es-cn-0pp1mm3hq000dnbh4.elasticsearch.aliyuncs.com:9200/"
|
||||
}
|
||||
|
||||
func GenAliConfig(level Level) *AliLSConfig {
|
||||
var project string = "gaore-app-logstore"
|
||||
var endpoint string
|
||||
|
||||
if os.Getenv(envkey) == "prod" || os.Getenv(envkey) == "" || os.Getenv(envkey) == "gray" {
|
||||
endpoint = project + ".cn-shenzhen-intranet.log.aliyuncs.com"
|
||||
} else if os.Getenv(envkey) == "dev" {
|
||||
endpoint = project + ".cn-shenzhen.log.aliyuncs.com"
|
||||
}
|
||||
|
||||
c := &AliLSConfig{
|
||||
Project: project,
|
||||
Endpoint: endpoint,
|
||||
KeyID: "LTAI4GCHwcqtrFD4DHRHxR4k",
|
||||
KeySecret: "Ln19xfVYy6OMlJeF9aBvFl4fhRUKBl",
|
||||
LogStore: "gaore-app-logstore",
|
||||
Topics: []string{os.Getenv("GRLOG_APP_NAME")},
|
||||
Source: "",
|
||||
Level: level,
|
||||
FlushWhen: 0,
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func GenEsConfig(level Level) *EsConfig {
|
||||
dsn := "http://es-cn-tl32xlfmu00015h34.public.elasticsearch.aliyuncs.com:9200/"
|
||||
if os.Getenv(envkey) == "prod" || os.Getenv(envkey) == "" || os.Getenv(envkey) == "gray" {
|
||||
dsn = "http://es-cn-tl32xlfmu00015h34.elasticsearch.aliyuncs.com:9200/"
|
||||
}
|
||||
|
||||
c := &EsConfig{
|
||||
Username: "kaifa_api",
|
||||
Password: "2quYX3bTeahO",
|
||||
c := EsConfig{
|
||||
Username: "elastic",
|
||||
Password: "Hellogaore@",
|
||||
Dsn: dsn,
|
||||
Level: level,
|
||||
Index: os.Getenv("GRLOG_APP_NAME"),
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
var adatperDropMapper = map[Adapter]func(l *Logger) error{
|
||||
|
||||
AdapterAliLs: func(l *Logger) error {
|
||||
return l.BeeLogger.DelLogger(logs.AdapterAliLS)
|
||||
},
|
||||
|
||||
AdapterFile: func(l *Logger) error {
|
||||
return l.BeeLogger.DelLogger(logs.AdapterFile)
|
||||
},
|
||||
|
||||
AdapterConsole: func(l *Logger) error {
|
||||
return l.BeeLogger.DelLogger(logs.AdapterConsole)
|
||||
},
|
||||
|
||||
AdapterElasticSearch: func(l *Logger) error {
|
||||
return l.BeeLogger.DelLogger(logs.AdapterEs)
|
||||
return l.SetLogger(logs.AdapterEs, c.String())
|
||||
},
|
||||
}
|
||||
|
||||
func formatLog(f interface{}, v ...interface{}) string {
|
||||
var msg string
|
||||
switch f.(type) {
|
||||
case string:
|
||||
msg = f.(string)
|
||||
if len(v) == 0 {
|
||||
return msg
|
||||
}
|
||||
if strings.Contains(msg, "%") && !strings.Contains(msg, "%%") {
|
||||
//format string
|
||||
} else {
|
||||
//do not contain format char
|
||||
msg += strings.Repeat(" %v", len(v))
|
||||
}
|
||||
default:
|
||||
msg = fmt.Sprint(f)
|
||||
if len(v) == 0 {
|
||||
return msg
|
||||
}
|
||||
msg += strings.Repeat(" %v", len(v))
|
||||
}
|
||||
return fmt.Sprintf(msg, v...)
|
||||
}
|
||||
|
59
grlogs.go
59
grlogs.go
@ -1,59 +0,0 @@
|
||||
package grlogs
|
||||
|
||||
import "strings"
|
||||
|
||||
func Info(v ...interface{}) {
|
||||
Get("grlogs").Info(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Informational(v ...interface{}) {
|
||||
Get("grlogs").Informational(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Warning(v ...interface{}) {
|
||||
Get("grlogs").Warning(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Warn(v ...interface{}) {
|
||||
Get("grlogs").Warn(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Notice(v ...interface{}) {
|
||||
Get("grlogs").Notice(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Error(v ...interface{}) {
|
||||
Get("grlogs").Error(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Critical(v ...interface{}) {
|
||||
Get("grlogs").Critical(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Alert(v ...interface{}) {
|
||||
Get("grlogs").Alert(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Emergency(v ...interface{}) {
|
||||
Get("grlogs").Emergency(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Trace(v ...interface{}) {
|
||||
Get("grlogs").Trace(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func Debug(v ...interface{}) {
|
||||
Get("grlogs").Debug(generateFmtStr(len(v)), v...)
|
||||
}
|
||||
|
||||
func generateFmtStr(n int) string {
|
||||
return strings.Repeat("%v ", n)
|
||||
}
|
||||
|
||||
func SetAdapter(level Level, adapter Adapter) {
|
||||
Get("grlogs").SetAdapter(level, adapter)
|
||||
}
|
||||
|
||||
func DropAdapter(adapter Adapter) {
|
||||
Get("grlogs").DropAdapter(adapter)
|
||||
}
|
138
log.go
138
log.go
@ -11,21 +11,13 @@ var loggers = sync.Map{}
|
||||
|
||||
var envkey = "CENTER_RUNMODE"
|
||||
|
||||
var defaultModeMapping = map[string][]AdapterTupple{
|
||||
"es": []AdapterTupple{{LevelAll, AdapterConsole}, {LevelAll, AdapterElasticSearch}},
|
||||
"ali": []AdapterTupple{{LevelAll, AdapterConsole}, {LevelAll, AdapterAliLs}},
|
||||
"": []AdapterTupple{{LevelAll, AdapterConsole}, {LevelAll, AdapterFile}},
|
||||
}
|
||||
|
||||
type Logger struct {
|
||||
Lable string
|
||||
*logs.BeeLogger
|
||||
}
|
||||
|
||||
type Getter func() *Logger
|
||||
|
||||
func (self *Logger) SetAdapter(level Level, adapter Adapter) *Logger {
|
||||
if call, ok := adatperSetMapper[adapter]; ok {
|
||||
if call, ok := adatperMapper[adapter]; ok {
|
||||
if err := call(self, level); err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
@ -33,113 +25,51 @@ func (self *Logger) SetAdapter(level Level, adapter Adapter) *Logger {
|
||||
return self
|
||||
}
|
||||
|
||||
func (self *Logger) DropAdapter(adapter Adapter) *Logger {
|
||||
if call, ok := adatperDropMapper[adapter]; ok {
|
||||
if err := call(self); err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
return self
|
||||
func New(label string, channelLens ...int64) (l *Logger, loaded bool) {
|
||||
var channellens int64
|
||||
var tmp interface{}
|
||||
|
||||
tmp, loaded = loggers.LoadOrStore(label, new(Logger))
|
||||
l = tmp.(*Logger)
|
||||
if len(channelLens) > 0 {
|
||||
channellens = channelLens[0]
|
||||
}
|
||||
|
||||
func newLoggerFromMap(label string, defaultmode string, channelLens ...int64) Getter {
|
||||
|
||||
if tmp, ok := loggers.Load(label); ok {
|
||||
return tmp.(Getter)
|
||||
}
|
||||
|
||||
var l *Logger
|
||||
var once sync.Once
|
||||
wapperGetter := Getter(func() *Logger {
|
||||
once.Do(func() {
|
||||
|
||||
var channelLensNum int64 = 100
|
||||
if len(channelLens) > 0 && channelLens[0] > 0 {
|
||||
channelLensNum = channelLens[0]
|
||||
}
|
||||
|
||||
l = &Logger{BeeLogger: logs.NewLogger(channelLensNum)}
|
||||
if !loaded {
|
||||
l.Lable = label
|
||||
l.BeeLogger = logs.NewLogger(channelLensNum)
|
||||
l.BeeLogger = logs.NewLogger(channellens)
|
||||
l.BeeLogger.Lable = label
|
||||
l.Env = os.Getenv(envkey)
|
||||
l.SetPrefix(fmt.Sprintf("[env:%s logger:%s]", os.Getenv(envkey), label))
|
||||
l.EnableFuncCallDepth(true)
|
||||
l.SetLogFuncCallDepth(2)
|
||||
if mode, ok := defaultModeMapping[defaultmode]; ok {
|
||||
for _, v := range mode {
|
||||
l.SetAdapter(v.Level, v.Adapter)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return l
|
||||
})
|
||||
|
||||
tmp, loaded := loggers.LoadOrStore(label, wapperGetter)
|
||||
if loaded {
|
||||
return tmp.(Getter)
|
||||
return
|
||||
}
|
||||
|
||||
return wapperGetter
|
||||
}
|
||||
|
||||
func Get(label string, channelLens ...int64) *Logger {
|
||||
getter := newLoggerFromMap(label, "", channelLens...)
|
||||
return getter()
|
||||
}
|
||||
|
||||
func GetEs(label string, channelLens ...int64) *Logger {
|
||||
getter := newLoggerFromMap(label, "es", channelLens...)
|
||||
return getter()
|
||||
}
|
||||
|
||||
func GetAli(label string, channelLens ...int64) *Logger {
|
||||
getter := newLoggerFromMap(label, "ali", channelLens...)
|
||||
return getter()
|
||||
}
|
||||
|
||||
func Close(lables ...string) {
|
||||
wg := &sync.WaitGroup{}
|
||||
for _, lable := range lables {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if v, ok := loggers.Load(lable); ok {
|
||||
if tmp, ok := v.(*Logger); ok {
|
||||
loggers.Delete(lable)
|
||||
tmp.BeeLogger.Close()
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
func CloseAll() {
|
||||
wg := &sync.WaitGroup{}
|
||||
loggers.Range(func(key, value interface{}) bool {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if tmp, ok := value.(*Logger); ok {
|
||||
tmp.BeeLogger.Close()
|
||||
loggers.Delete(key)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
return true
|
||||
})
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func init() {
|
||||
var level Level = LevelInfo
|
||||
func Get(label string, channelLens ...int64) (l *Logger) {
|
||||
var filelevel Level = LevelInfo
|
||||
var loaded bool
|
||||
if os.Getenv(envkey) == "dev" {
|
||||
level = LevelAll
|
||||
filelevel = LevelAll
|
||||
}
|
||||
|
||||
defaultModeMapping["es"] = []AdapterTupple{{level, AdapterConsole}, {level, AdapterElasticSearch}}
|
||||
defaultModeMapping["ali"] = []AdapterTupple{{level, AdapterConsole}, {level, AdapterAliLs}}
|
||||
defaultModeMapping[""] = []AdapterTupple{{level, AdapterConsole}, {level, AdapterFile}}
|
||||
if l, loaded = New(label, channelLens...); !loaded {
|
||||
l.SetAdapter(filelevel, AdapterFile).SetAdapter(LevelAll, AdapterConsole)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func GetEs(label string, channelLens ...int64) (l *Logger) {
|
||||
var filelevel Level = LevelInfo
|
||||
var loaded bool
|
||||
if os.Getenv(envkey) == "dev" {
|
||||
filelevel = LevelAll
|
||||
}
|
||||
|
||||
if l, loaded = New(label, channelLens...); !loaded {
|
||||
l.SetAdapter(filelevel, AdapterFile).SetAdapter(LevelAll, AdapterConsole).SetAdapter(filelevel, AdapterElasticSearch)
|
||||
}
|
||||
return
|
||||
|
||||
}
|
||||
|
77
log_test.go
77
log_test.go
@ -1,98 +1,23 @@
|
||||
package grlogs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
_ "golib.gaore.com/GaoreGo/grlogs/logs/alils"
|
||||
_ "golib.gaore.com/GaoreGo/grlogs/logs/es"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestGetLogger(t *testing.T) {
|
||||
// 新建 channel 大小为128 标识为nds 日志通道 , Get 的方法 默认带 console 和 file 输出
|
||||
l := Get("nds", 128).SetAdapter(LevelAll, AdapterElasticSearch)
|
||||
l.Debug("我正在调试")
|
||||
l.Critical("出错了")
|
||||
|
||||
// 复用 nds 的日志通道
|
||||
Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
|
||||
Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
|
||||
|
||||
// 新建 channel 大小为默认 标识为wifi 日志通道 , GetEs 的方法 默认带 console 和 file 和 elatisearch 输出
|
||||
GetEs("wifi")
|
||||
for i := 0; i < 10; i++ {
|
||||
Get("wifi").Warning("Warning")
|
||||
Get("wifi").Warn("Warn")
|
||||
Get("wifi").Debug("Debug")
|
||||
Get("wifi").Error("Error")
|
||||
Get("wifi").Notice("Notice")
|
||||
Get("wifi").Info("Info")
|
||||
Get("wifi").Alert("Alert")
|
||||
Get("wifi").Error("neoweiwoewe")
|
||||
}
|
||||
|
||||
Get("wifi").Critical("neoweiwoewe")
|
||||
}
|
||||
|
||||
func TestGetAliLs(t *testing.T) {
|
||||
|
||||
lable := "test_alils"
|
||||
l := GetAli(lable).Async(128)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
l.Info("Info")
|
||||
l.Debug("Debug")
|
||||
l.Warn("Warn")
|
||||
l.Warning("Warning")
|
||||
l.Error("Error")
|
||||
l.Error("Error\n\n 测试换行")
|
||||
l.Warn("Warn")
|
||||
}
|
||||
|
||||
time.Sleep(time.Hour * 1)
|
||||
}
|
||||
|
||||
func TestDropAdapter(t *testing.T) {
|
||||
SetAdapter(LevelAll, AdapterAliLs)
|
||||
DropAdapter(AdapterAliLs)
|
||||
Informational(errors.New("he hello"))
|
||||
SetAdapter(LevelAll, AdapterAliLs)
|
||||
Debug(errors.New("he hello"))
|
||||
CloseAll()
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
GetAli("ok").Debug("%d", i)
|
||||
time.Sleep(time.Second * 10)
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
GetAli("ok").Debug("aaaaaa%d", i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestGetEs(t *testing.T) {
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
GetEs("ok").Debug("%d", i)
|
||||
time.Sleep(time.Second * 10)
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
GetEs("ok").Debug("aaaaaa%d", i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
@ -2,24 +2,23 @@ package alils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/aliyun/aliyun-log-go-sdk/producer"
|
||||
"golib.gaore.com/GaoreGo/grlogs/logs"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"golib.gaore.com/GaoreGo/grlogs/logs"
|
||||
)
|
||||
|
||||
func NewAliLS() logs.Logger {
|
||||
cw := &alilsLogger{
|
||||
Level: logs.LevelDebug,
|
||||
}
|
||||
return cw
|
||||
}
|
||||
const (
|
||||
// CacheSize set the flush size
|
||||
CacheSize int = 64
|
||||
// Delimiter define the topic delimiter
|
||||
Delimiter string = "##"
|
||||
)
|
||||
|
||||
type alilsLogger struct {
|
||||
producer *producer.Producer
|
||||
callback *Callback
|
||||
// Config is the Config for Ali Log
|
||||
type Config struct {
|
||||
Project string `json:"project"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
KeyID string `json:"key_id"`
|
||||
@ -29,68 +28,157 @@ type alilsLogger struct {
|
||||
Source string `json:"source"`
|
||||
Level int `json:"level"`
|
||||
FlushWhen int `json:"flush_when"`
|
||||
Debug bool
|
||||
}
|
||||
|
||||
func (a *alilsLogger) Init(jsonconfig string) error {
|
||||
// aliLSWriter implements LoggerInterface.
|
||||
// it writes messages in keep-live tcp connection.
|
||||
type aliLSWriter struct {
|
||||
store *LogStore
|
||||
group []*LogGroup
|
||||
withMap bool
|
||||
groupMap map[string]*LogGroup
|
||||
lock *sync.Mutex
|
||||
Config
|
||||
}
|
||||
|
||||
err := json.Unmarshal([]byte(jsonconfig), a)
|
||||
// NewAliLS create a new Logger
|
||||
func NewAliLS() logs.Logger {
|
||||
alils := new(aliLSWriter)
|
||||
alils.Level = logs.LevelTrace
|
||||
return alils
|
||||
}
|
||||
|
||||
// Init parse config and init struct
|
||||
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
|
||||
|
||||
json.Unmarshal([]byte(jsonConfig), c)
|
||||
|
||||
if c.FlushWhen > CacheSize {
|
||||
c.FlushWhen = CacheSize
|
||||
}
|
||||
|
||||
prj := &LogProject{
|
||||
Name: c.Project,
|
||||
Endpoint: c.Endpoint,
|
||||
AccessKeyID: c.KeyID,
|
||||
AccessKeySecret: c.KeySecret,
|
||||
}
|
||||
|
||||
c.store, err = prj.GetLogStore(c.LogStore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
producerConfig := producer.GetDefaultProducerConfig()
|
||||
producerConfig.Endpoint = a.Endpoint
|
||||
producerConfig.AccessKeyID = a.KeyID
|
||||
producerConfig.AccessKeySecret = a.KeySecret
|
||||
producerConfig.LingerMs = 100
|
||||
producerConfig.NoRetryStatusCodeList = []int{-1}
|
||||
producerConfig.Retries = 2
|
||||
producerConfig.AllowLogLevel = "error"
|
||||
producerConfig.MaxIoWorkerCount = int64(runtime.NumCPU())
|
||||
a.producer = producer.InitProducer(producerConfig)
|
||||
a.callback = &Callback{}
|
||||
a.producer.Start()
|
||||
a.Debug = os.Getenv("GRLOG_ALILS_DEBUG") == "on"
|
||||
|
||||
// Create default Log Group
|
||||
c.group = append(c.group, &LogGroup{
|
||||
Topic: proto.String(""),
|
||||
Source: proto.String(c.Source),
|
||||
Logs: make([]*Log, 0, c.FlushWhen),
|
||||
})
|
||||
|
||||
// Create other Log Group
|
||||
c.groupMap = make(map[string]*LogGroup)
|
||||
for _, topic := range c.Topics {
|
||||
|
||||
lg := &LogGroup{
|
||||
Topic: proto.String(topic),
|
||||
Source: proto.String(c.Source),
|
||||
Logs: make([]*Log, 0, c.FlushWhen),
|
||||
}
|
||||
|
||||
c.group = append(c.group, lg)
|
||||
c.groupMap[topic] = lg
|
||||
}
|
||||
|
||||
if len(c.group) == 1 {
|
||||
c.withMap = false
|
||||
} else {
|
||||
c.withMap = true
|
||||
}
|
||||
|
||||
c.lock = &sync.Mutex{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *alilsLogger) WriteMsg(when time.Time, msg string, level int, lable string, env string) error {
|
||||
// WriteMsg write message in connection.
|
||||
// if connection is down, try to re-connect.
|
||||
func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int, lable string, env string) (err error) {
|
||||
|
||||
vals := map[string]string{"msg": msg, "level": fmt.Sprintf("%d", level)}
|
||||
vals["level_string"] = logs.GetLevelString(level)
|
||||
vals["env"] = env
|
||||
vals["lable"] = lable
|
||||
vals["hostname"] = GetHostname()
|
||||
vals["working_idr"] = Getwd()
|
||||
vals["home_dir"] = GetUserHomename()
|
||||
vals["hardware_addr"] = GetCurrentInterfaceHardwareAddr()
|
||||
vals["client_addrs"] = GetCurrentInterfaceAddrs()
|
||||
|
||||
log := producer.GenerateLog(uint32(when.Unix()), vals)
|
||||
|
||||
if a.Debug {
|
||||
for _, topic := range a.Topics {
|
||||
if err := a.producer.SendLogWithCallBack(a.Project, a.LogStore, topic, a.Source, log, a.callback); err != nil {
|
||||
return err
|
||||
if level > c.Level {
|
||||
return nil
|
||||
}
|
||||
|
||||
var topic string
|
||||
var content string
|
||||
var lg *LogGroup
|
||||
if c.withMap {
|
||||
|
||||
// Topic,LogGroup
|
||||
strs := strings.SplitN(msg, Delimiter, 2)
|
||||
if len(strs) == 2 {
|
||||
pos := strings.LastIndex(strs[0], " ")
|
||||
topic = strs[0][pos+1 : len(strs[0])]
|
||||
content = strs[0][0:pos] + strs[1]
|
||||
lg = c.groupMap[topic]
|
||||
}
|
||||
|
||||
// send to empty Topic
|
||||
if lg == nil {
|
||||
content = msg
|
||||
lg = c.group[0]
|
||||
}
|
||||
} else {
|
||||
for _, topic := range a.Topics {
|
||||
if err := a.producer.SendLog(a.Project, a.LogStore, topic, a.Source, log); err != nil {
|
||||
return err
|
||||
content = msg
|
||||
lg = c.group[0]
|
||||
}
|
||||
|
||||
c1 := &LogContent{
|
||||
Key: proto.String("msg"),
|
||||
Value: proto.String(content),
|
||||
}
|
||||
|
||||
l := &Log{
|
||||
Time: proto.Uint32(uint32(when.Unix())),
|
||||
Contents: []*LogContent{
|
||||
c1,
|
||||
},
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
lg.Logs = append(lg.Logs, l)
|
||||
c.lock.Unlock()
|
||||
|
||||
if len(lg.Logs) >= c.FlushWhen {
|
||||
c.flush(lg)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *alilsLogger) Destroy() {
|
||||
a.producer.SafeClose()
|
||||
a.producer.Close(300)
|
||||
// Flush implementing method. empty.
|
||||
func (c *aliLSWriter) Flush() {
|
||||
|
||||
// flush all group
|
||||
for _, lg := range c.group {
|
||||
c.flush(lg)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *alilsLogger) Flush() {
|
||||
// Destroy destroy connection writer and close tcp listener.
|
||||
func (c *aliLSWriter) Destroy() {
|
||||
}
|
||||
|
||||
func (c *aliLSWriter) flush(lg *LogGroup) {
|
||||
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
err := c.store.PutLogs(lg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
lg.Logs = make([]*Log, 0, c.FlushWhen)
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -1,25 +0,0 @@
|
||||
package alils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/aliyun/aliyun-log-go-sdk/producer"
|
||||
)
|
||||
|
||||
type Callback struct {
|
||||
}
|
||||
|
||||
func (callback *Callback) Success(result *producer.Result) {
|
||||
attemptList := result.GetReservedAttempts()
|
||||
for _, attempt := range attemptList {
|
||||
fmt.Printf("alilog %+v \n", attempt)
|
||||
}
|
||||
}
|
||||
|
||||
func (callback *Callback) Fail(result *producer.Result) {
|
||||
fmt.Println("IsSuccessful", result.IsSuccessful())
|
||||
fmt.Println("GetErrorCode", result.GetErrorCode())
|
||||
fmt.Println("GetErrorMessage", result.GetErrorMessage())
|
||||
fmt.Println("GetReservedAttempts", result.GetReservedAttempts())
|
||||
fmt.Println("GetRequestId", result.GetRequestId())
|
||||
fmt.Println("GetTimeStampMs", result.GetTimeStampMs())
|
||||
}
|
13
logs/alils/config.go
Executable file
13
logs/alils/config.go
Executable file
@ -0,0 +1,13 @@
|
||||
package alils
|
||||
|
||||
const (
|
||||
version = "0.5.0" // SDK version
|
||||
signatureMethod = "hmac-sha1" // Signature method
|
||||
|
||||
// OffsetNewest stands for the log head offset, i.e. the offset that will be
|
||||
// assigned to the next message that will be produced to the shard.
|
||||
OffsetNewest = "end"
|
||||
// OffsetOldest stands for the oldest offset available on the logstore for a
|
||||
// shard.
|
||||
OffsetOldest = "begin"
|
||||
)
|
1038
logs/alils/log.pb.go
Executable file
1038
logs/alils/log.pb.go
Executable file
File diff suppressed because it is too large
Load Diff
42
logs/alils/log_config.go
Executable file
42
logs/alils/log_config.go
Executable file
@ -0,0 +1,42 @@
|
||||
package alils
|
||||
|
||||
// InputDetail define log detail
|
||||
type InputDetail struct {
|
||||
LogType string `json:"logType"`
|
||||
LogPath string `json:"logPath"`
|
||||
FilePattern string `json:"filePattern"`
|
||||
LocalStorage bool `json:"localStorage"`
|
||||
TimeFormat string `json:"timeFormat"`
|
||||
LogBeginRegex string `json:"logBeginRegex"`
|
||||
Regex string `json:"regex"`
|
||||
Keys []string `json:"key"`
|
||||
FilterKeys []string `json:"filterKey"`
|
||||
FilterRegex []string `json:"filterRegex"`
|
||||
TopicFormat string `json:"topicFormat"`
|
||||
}
|
||||
|
||||
// OutputDetail define the output detail
|
||||
type OutputDetail struct {
|
||||
Endpoint string `json:"endpoint"`
|
||||
LogStoreName string `json:"logstoreName"`
|
||||
}
|
||||
|
||||
// LogConfig define Log Config
|
||||
type LogConfig struct {
|
||||
Name string `json:"configName"`
|
||||
InputType string `json:"inputType"`
|
||||
InputDetail InputDetail `json:"inputDetail"`
|
||||
OutputType string `json:"outputType"`
|
||||
OutputDetail OutputDetail `json:"outputDetail"`
|
||||
|
||||
CreateTime uint32
|
||||
LastModifyTime uint32
|
||||
|
||||
project *LogProject
|
||||
}
|
||||
|
||||
// GetAppliedMachineGroup returns applied machine group of this config.
|
||||
func (c *LogConfig) GetAppliedMachineGroup(confName string) (groupNames []string, err error) {
|
||||
groupNames, err = c.project.GetAppliedMachineGroups(c.Name)
|
||||
return
|
||||
}
|
819
logs/alils/log_project.go
Executable file
819
logs/alils/log_project.go
Executable file
@ -0,0 +1,819 @@
|
||||
/*
|
||||
Package alils implements the SDK(v0.5.0) of Simple Log Service(abbr. SLS).
|
||||
|
||||
For more description about SLS, please read this article:
|
||||
http://gitlab.alibaba-inc.com/sls/doc.
|
||||
*/
|
||||
package alils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
)
|
||||
|
||||
// Error message in SLS HTTP response.
|
||||
type errorMessage struct {
|
||||
Code string `json:"errorCode"`
|
||||
Message string `json:"errorMessage"`
|
||||
}
|
||||
|
||||
// LogProject Define the Ali Project detail
|
||||
type LogProject struct {
|
||||
Name string // Project name
|
||||
Endpoint string // IP or hostname of SLS endpoint
|
||||
AccessKeyID string
|
||||
AccessKeySecret string
|
||||
}
|
||||
|
||||
// NewLogProject creates a new SLS project.
|
||||
func NewLogProject(name, endpoint, AccessKeyID, accessKeySecret string) (p *LogProject, err error) {
|
||||
p = &LogProject{
|
||||
Name: name,
|
||||
Endpoint: endpoint,
|
||||
AccessKeyID: AccessKeyID,
|
||||
AccessKeySecret: accessKeySecret,
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// ListLogStore returns all logstore names of project p.
|
||||
func (p *LogProject) ListLogStore() (storeNames []string, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/logstores")
|
||||
r, err := request(p, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to list logstore")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
type Body struct {
|
||||
Count int
|
||||
LogStores []string
|
||||
}
|
||||
body := &Body{}
|
||||
|
||||
err = json.Unmarshal(buf, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
storeNames = body.LogStores
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetLogStore returns logstore according by logstore name.
|
||||
func (p *LogProject) GetLogStore(name string) (s *LogStore, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
r, err := request(p, "GET", "/logstores/"+name, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to get logstore")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
s = &LogStore{}
|
||||
err = json.Unmarshal(buf, s)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.project = p
|
||||
return
|
||||
}
|
||||
|
||||
// CreateLogStore creates a new logstore in SLS,
|
||||
// where name is logstore name,
|
||||
// and ttl is time-to-live(in day) of logs,
|
||||
// and shardCnt is the number of shards.
|
||||
func (p *LogProject) CreateLogStore(name string, ttl, shardCnt int) (err error) {
|
||||
|
||||
type Body struct {
|
||||
Name string `json:"logstoreName"`
|
||||
TTL int `json:"ttl"`
|
||||
ShardCount int `json:"shardCount"`
|
||||
}
|
||||
|
||||
store := &Body{
|
||||
Name: name,
|
||||
TTL: ttl,
|
||||
ShardCount: shardCnt,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(store)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
||||
"Content-Type": "application/json",
|
||||
"Accept-Encoding": "deflate", // TODO: support lz4
|
||||
}
|
||||
|
||||
r, err := request(p, "POST", "/logstores", h, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err = ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to create logstore")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteLogStore deletes a logstore according by logstore name.
|
||||
func (p *LogProject) DeleteLogStore(name string) (err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
r, err := request(p, "DELETE", "/logstores/"+name, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to delete logstore")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateLogStore updates a logstore according by logstore name,
|
||||
// obviously we can't modify the logstore name itself.
|
||||
func (p *LogProject) UpdateLogStore(name string, ttl, shardCnt int) (err error) {
|
||||
|
||||
type Body struct {
|
||||
Name string `json:"logstoreName"`
|
||||
TTL int `json:"ttl"`
|
||||
ShardCount int `json:"shardCount"`
|
||||
}
|
||||
|
||||
store := &Body{
|
||||
Name: name,
|
||||
TTL: ttl,
|
||||
ShardCount: shardCnt,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(store)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
||||
"Content-Type": "application/json",
|
||||
"Accept-Encoding": "deflate", // TODO: support lz4
|
||||
}
|
||||
|
||||
r, err := request(p, "PUT", "/logstores", h, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err = ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to update logstore")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// ListMachineGroup returns machine group name list and the total number of machine groups.
|
||||
// The offset starts from 0 and the size is the max number of machine groups could be returned.
|
||||
func (p *LogProject) ListMachineGroup(offset, size int) (m []string, total int, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
if size <= 0 {
|
||||
size = 500
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/machinegroups?offset=%v&size=%v", offset, size)
|
||||
r, err := request(p, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to list machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
type Body struct {
|
||||
MachineGroups []string
|
||||
Count int
|
||||
Total int
|
||||
}
|
||||
body := &Body{}
|
||||
|
||||
err = json.Unmarshal(buf, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
m = body.MachineGroups
|
||||
total = body.Total
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetMachineGroup retruns machine group according by machine group name.
|
||||
func (p *LogProject) GetMachineGroup(name string) (m *MachineGroup, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
r, err := request(p, "GET", "/machinegroups/"+name, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to get machine group:%v", name)
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
m = &MachineGroup{}
|
||||
err = json.Unmarshal(buf, m)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
m.project = p
|
||||
return
|
||||
}
|
||||
|
||||
// CreateMachineGroup creates a new machine group in SLS.
|
||||
func (p *LogProject) CreateMachineGroup(m *MachineGroup) (err error) {
|
||||
|
||||
body, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
||||
"Content-Type": "application/json",
|
||||
"Accept-Encoding": "deflate", // TODO: support lz4
|
||||
}
|
||||
|
||||
r, err := request(p, "POST", "/machinegroups", h, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err = ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to create machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateMachineGroup updates a machine group.
|
||||
func (p *LogProject) UpdateMachineGroup(m *MachineGroup) (err error) {
|
||||
|
||||
body, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
||||
"Content-Type": "application/json",
|
||||
"Accept-Encoding": "deflate", // TODO: support lz4
|
||||
}
|
||||
|
||||
r, err := request(p, "PUT", "/machinegroups/"+m.Name, h, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err = ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to update machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteMachineGroup deletes machine group according machine group name.
|
||||
func (p *LogProject) DeleteMachineGroup(name string) (err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
r, err := request(p, "DELETE", "/machinegroups/"+name, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to delete machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ListConfig returns config names list and the total number of configs.
|
||||
// The offset starts from 0 and the size is the max number of configs could be returned.
|
||||
func (p *LogProject) ListConfig(offset, size int) (cfgNames []string, total int, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
if size <= 0 {
|
||||
size = 100
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/configs?offset=%v&size=%v", offset, size)
|
||||
r, err := request(p, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to delete machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
type Body struct {
|
||||
Total int
|
||||
Configs []string
|
||||
}
|
||||
body := &Body{}
|
||||
|
||||
err = json.Unmarshal(buf, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
cfgNames = body.Configs
|
||||
total = body.Total
|
||||
return
|
||||
}
|
||||
|
||||
// GetConfig returns config according by config name.
|
||||
func (p *LogProject) GetConfig(name string) (c *LogConfig, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
r, err := request(p, "GET", "/configs/"+name, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to delete config")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
c = &LogConfig{}
|
||||
err = json.Unmarshal(buf, c)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c.project = p
|
||||
return
|
||||
}
|
||||
|
||||
// UpdateConfig updates a config.
|
||||
func (p *LogProject) UpdateConfig(c *LogConfig) (err error) {
|
||||
|
||||
body, err := json.Marshal(c)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
||||
"Content-Type": "application/json",
|
||||
"Accept-Encoding": "deflate", // TODO: support lz4
|
||||
}
|
||||
|
||||
r, err := request(p, "PUT", "/configs/"+c.Name, h, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err = ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to update config")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// CreateConfig creates a new config in SLS.
|
||||
func (p *LogProject) CreateConfig(c *LogConfig) (err error) {
|
||||
|
||||
body, err := json.Marshal(c)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
||||
"Content-Type": "application/json",
|
||||
"Accept-Encoding": "deflate", // TODO: support lz4
|
||||
}
|
||||
|
||||
r, err := request(p, "POST", "/configs", h, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err = ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to update config")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteConfig deletes a config according by config name.
|
||||
func (p *LogProject) DeleteConfig(name string) (err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
r, err := request(p, "DELETE", "/configs/"+name, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(body, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to delete config")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetAppliedMachineGroups returns applied machine group names list according config name.
|
||||
func (p *LogProject) GetAppliedMachineGroups(confName string) (groupNames []string, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/configs/%v/machinegroups", confName)
|
||||
r, err := request(p, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to get applied machine groups")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
type Body struct {
|
||||
Count int
|
||||
Machinegroups []string
|
||||
}
|
||||
|
||||
body := &Body{}
|
||||
err = json.Unmarshal(buf, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
groupNames = body.Machinegroups
|
||||
return
|
||||
}
|
||||
|
||||
// GetAppliedConfigs returns applied config names list according machine group name groupName.
|
||||
func (p *LogProject) GetAppliedConfigs(groupName string) (confNames []string, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/machinegroups/%v/configs", groupName)
|
||||
r, err := request(p, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to applied configs")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
type Cfg struct {
|
||||
Count int `json:"count"`
|
||||
Configs []string `json:"configs"`
|
||||
}
|
||||
|
||||
body := &Cfg{}
|
||||
err = json.Unmarshal(buf, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
confNames = body.Configs
|
||||
return
|
||||
}
|
||||
|
||||
// ApplyConfigToMachineGroup applies config to machine group.
|
||||
func (p *LogProject) ApplyConfigToMachineGroup(confName, groupName string) (err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/machinegroups/%v/configs/%v", groupName, confName)
|
||||
r, err := request(p, "PUT", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to apply config to machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// RemoveConfigFromMachineGroup removes config from machine group.
|
||||
func (p *LogProject) RemoveConfigFromMachineGroup(confName, groupName string) (err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/machinegroups/%v/configs/%v", groupName, confName)
|
||||
r, err := request(p, "DELETE", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to remove config from machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Printf("%s\n", dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
271
logs/alils/log_store.go
Executable file
271
logs/alils/log_store.go
Executable file
@ -0,0 +1,271 @@
|
||||
package alils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"strconv"
|
||||
|
||||
lz4 "github.com/cloudflare/golz4"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
// LogStore Store the logs
|
||||
type LogStore struct {
|
||||
Name string `json:"logstoreName"`
|
||||
TTL int
|
||||
ShardCount int
|
||||
|
||||
CreateTime uint32
|
||||
LastModifyTime uint32
|
||||
|
||||
project *LogProject
|
||||
}
|
||||
|
||||
// Shard define the Log Shard
|
||||
type Shard struct {
|
||||
ShardID int `json:"shardID"`
|
||||
}
|
||||
|
||||
// ListShards returns shard id list of this logstore.
|
||||
func (s *LogStore) ListShards() (shardIDs []int, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
|
||||
r, err := request(s.project, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to list logstore")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Println(dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
var shards []*Shard
|
||||
err = json.Unmarshal(buf, &shards)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, v := range shards {
|
||||
shardIDs = append(shardIDs, v.ShardID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// PutLogs put logs into logstore.
|
||||
// The callers should transform user logs into LogGroup.
|
||||
func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
|
||||
body, err := proto.Marshal(lg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Compresse body with lz4
|
||||
out := make([]byte, lz4.CompressBound(body))
|
||||
n, err := lz4.Compress(body, out)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-compresstype": "lz4",
|
||||
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
|
||||
"Content-Type": "application/x-protobuf",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/logstores/%v", s.Name)
|
||||
r, err := request(s.project, "POST", uri, h, out[:n])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to put logs")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Println(dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// GetCursor gets log cursor of one shard specified by shardID.
|
||||
// The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
|
||||
// For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore
|
||||
func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
|
||||
s.Name, shardID, from)
|
||||
|
||||
r, err := request(s.project, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to get cursor")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Println(dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
type Body struct {
|
||||
Cursor string
|
||||
}
|
||||
body := &Body{}
|
||||
|
||||
err = json.Unmarshal(buf, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
cursor = body.Cursor
|
||||
return
|
||||
}
|
||||
|
||||
// GetLogsBytes gets logs binary data from shard specified by shardID according cursor.
|
||||
// The logGroupMaxCount is the max number of logGroup could be returned.
|
||||
// The nextCursor is the next curosr can be used to read logs at next time.
|
||||
func (s *LogStore) GetLogsBytes(shardID int, cursor string,
|
||||
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
|
||||
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
"Accept": "application/x-protobuf",
|
||||
"Accept-Encoding": "lz4",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
|
||||
s.Name, shardID, cursor, logGroupMaxCount)
|
||||
|
||||
r, err := request(s.project, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to get cursor")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Println(dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
v, ok := r.Header["X-Sls-Compresstype"]
|
||||
if !ok || len(v) == 0 {
|
||||
err = fmt.Errorf("can't find 'x-sls-compresstype' header")
|
||||
return
|
||||
}
|
||||
if v[0] != "lz4" {
|
||||
err = fmt.Errorf("unexpected compress type:%v", v[0])
|
||||
return
|
||||
}
|
||||
|
||||
v, ok = r.Header["X-Sls-Cursor"]
|
||||
if !ok || len(v) == 0 {
|
||||
err = fmt.Errorf("can't find 'x-sls-cursor' header")
|
||||
return
|
||||
}
|
||||
nextCursor = v[0]
|
||||
|
||||
v, ok = r.Header["X-Sls-Bodyrawsize"]
|
||||
if !ok || len(v) == 0 {
|
||||
err = fmt.Errorf("can't find 'x-sls-bodyrawsize' header")
|
||||
return
|
||||
}
|
||||
bodyRawSize, err := strconv.Atoi(v[0])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
out = make([]byte, bodyRawSize)
|
||||
err = lz4.Uncompress(buf, out)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// LogsBytesDecode decodes logs binary data retruned by GetLogsBytes API
|
||||
func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
|
||||
|
||||
gl = &LogGroupList{}
|
||||
err = proto.Unmarshal(data, gl)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetLogs gets logs from shard specified by shardID according cursor.
|
||||
// The logGroupMaxCount is the max number of logGroup could be returned.
|
||||
// The nextCursor is the next curosr can be used to read logs at next time.
|
||||
func (s *LogStore) GetLogs(shardID int, cursor string,
|
||||
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
|
||||
|
||||
out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
gl, err = LogsBytesDecode(out)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
91
logs/alils/machine_group.go
Executable file
91
logs/alils/machine_group.go
Executable file
@ -0,0 +1,91 @@
|
||||
package alils
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
)
|
||||
|
||||
// MachineGroupAttribute define the Attribute
|
||||
type MachineGroupAttribute struct {
|
||||
ExternalName string `json:"externalName"`
|
||||
TopicName string `json:"groupTopic"`
|
||||
}
|
||||
|
||||
// MachineGroup define the machine Group
|
||||
type MachineGroup struct {
|
||||
Name string `json:"groupName"`
|
||||
Type string `json:"groupType"`
|
||||
MachineIDType string `json:"machineIdentifyType"`
|
||||
MachineIDList []string `json:"machineList"`
|
||||
|
||||
Attribute MachineGroupAttribute `json:"groupAttribute"`
|
||||
|
||||
CreateTime uint32
|
||||
LastModifyTime uint32
|
||||
|
||||
project *LogProject
|
||||
}
|
||||
|
||||
// Machine define the Machine
|
||||
type Machine struct {
|
||||
IP string
|
||||
UniqueID string `json:"machine-uniqueid"`
|
||||
UserdefinedID string `json:"userdefined-id"`
|
||||
}
|
||||
|
||||
// MachineList define the Machine List
|
||||
type MachineList struct {
|
||||
Total int
|
||||
Machines []*Machine
|
||||
}
|
||||
|
||||
// ListMachines returns machine list of this machine group.
|
||||
func (m *MachineGroup) ListMachines() (ms []*Machine, total int, err error) {
|
||||
h := map[string]string{
|
||||
"x-sls-bodyrawsize": "0",
|
||||
}
|
||||
|
||||
uri := fmt.Sprintf("/machinegroups/%v/machines", m.Name)
|
||||
r, err := request(m.project, "GET", uri, h, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.StatusCode != http.StatusOK {
|
||||
errMsg := &errorMessage{}
|
||||
err = json.Unmarshal(buf, errMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to remove config from machine group")
|
||||
dump, _ := httputil.DumpResponse(r, true)
|
||||
fmt.Println(dump)
|
||||
return
|
||||
}
|
||||
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
|
||||
return
|
||||
}
|
||||
|
||||
body := &MachineList{}
|
||||
err = json.Unmarshal(buf, body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ms = body.Machines
|
||||
total = body.Total
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetAppliedConfigs returns applied configs of this machine group.
|
||||
func (m *MachineGroup) GetAppliedConfigs() (confNames []string, err error) {
|
||||
confNames, err = m.project.GetAppliedConfigs(m.Name)
|
||||
return
|
||||
}
|
62
logs/alils/request.go
Executable file
62
logs/alils/request.go
Executable file
@ -0,0 +1,62 @@
|
||||
package alils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// request sends a request to SLS.
|
||||
func request(project *LogProject, method, uri string, headers map[string]string,
|
||||
body []byte) (resp *http.Response, err error) {
|
||||
|
||||
// The caller should provide 'x-sls-bodyrawsize' header
|
||||
if _, ok := headers["x-sls-bodyrawsize"]; !ok {
|
||||
err = fmt.Errorf("Can't find 'x-sls-bodyrawsize' header")
|
||||
return
|
||||
}
|
||||
|
||||
// SLS public request headers
|
||||
headers["Host"] = project.Name + "." + project.Endpoint
|
||||
headers["Date"] = nowRFC1123()
|
||||
headers["x-sls-apiversion"] = version
|
||||
headers["x-sls-signaturemethod"] = signatureMethod
|
||||
if body != nil {
|
||||
bodyMD5 := fmt.Sprintf("%X", md5.Sum(body))
|
||||
headers["Content-MD5"] = bodyMD5
|
||||
|
||||
if _, ok := headers["Content-Type"]; !ok {
|
||||
err = fmt.Errorf("Can't find 'Content-Type' header")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Calc Authorization
|
||||
// Authorization = "SLS <AccessKeyID>:<Signature>"
|
||||
digest, err := signature(project, method, uri, headers)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
auth := fmt.Sprintf("SLS %v:%v", project.AccessKeyID, digest)
|
||||
headers["Authorization"] = auth
|
||||
|
||||
// Initialize http request
|
||||
reader := bytes.NewReader(body)
|
||||
urlStr := fmt.Sprintf("http://%v.%v%v", project.Name, project.Endpoint, uri)
|
||||
req, err := http.NewRequest(method, urlStr, reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for k, v := range headers {
|
||||
req.Header.Add(k, v)
|
||||
}
|
||||
|
||||
// Get ready to do request
|
||||
resp, err = http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
111
logs/alils/signature.go
Executable file
111
logs/alils/signature.go
Executable file
@ -0,0 +1,111 @@
|
||||
package alils
|
||||
|
||||
import (
|
||||
"crypto/hmac"
|
||||
"crypto/sha1"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GMT location
|
||||
var gmtLoc = time.FixedZone("GMT", 0)
|
||||
|
||||
// NowRFC1123 returns now time in RFC1123 format with GMT timezone,
|
||||
// eg. "Mon, 02 Jan 2006 15:04:05 GMT".
|
||||
func nowRFC1123() string {
|
||||
return time.Now().In(gmtLoc).Format(time.RFC1123)
|
||||
}
|
||||
|
||||
// signature calculates a request's signature digest.
|
||||
func signature(project *LogProject, method, uri string,
|
||||
headers map[string]string) (digest string, err error) {
|
||||
var contentMD5, contentType, date, canoHeaders, canoResource string
|
||||
var slsHeaderKeys sort.StringSlice
|
||||
|
||||
// SignString = VERB + "\n"
|
||||
// + CONTENT-MD5 + "\n"
|
||||
// + CONTENT-TYPE + "\n"
|
||||
// + DATE + "\n"
|
||||
// + CanonicalizedSLSHeaders + "\n"
|
||||
// + CanonicalizedResource
|
||||
|
||||
if val, ok := headers["Content-MD5"]; ok {
|
||||
contentMD5 = val
|
||||
}
|
||||
|
||||
if val, ok := headers["Content-Type"]; ok {
|
||||
contentType = val
|
||||
}
|
||||
|
||||
date, ok := headers["Date"]
|
||||
if !ok {
|
||||
err = fmt.Errorf("Can't find 'Date' header")
|
||||
return
|
||||
}
|
||||
|
||||
// Calc CanonicalizedSLSHeaders
|
||||
slsHeaders := make(map[string]string, len(headers))
|
||||
for k, v := range headers {
|
||||
l := strings.TrimSpace(strings.ToLower(k))
|
||||
if strings.HasPrefix(l, "x-sls-") {
|
||||
slsHeaders[l] = strings.TrimSpace(v)
|
||||
slsHeaderKeys = append(slsHeaderKeys, l)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(slsHeaderKeys)
|
||||
for i, k := range slsHeaderKeys {
|
||||
canoHeaders += k + ":" + slsHeaders[k]
|
||||
if i+1 < len(slsHeaderKeys) {
|
||||
canoHeaders += "\n"
|
||||
}
|
||||
}
|
||||
|
||||
// Calc CanonicalizedResource
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
canoResource += url.QueryEscape(u.Path)
|
||||
if u.RawQuery != "" {
|
||||
var keys sort.StringSlice
|
||||
|
||||
vals := u.Query()
|
||||
for k := range vals {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
||||
sort.Sort(keys)
|
||||
canoResource += "?"
|
||||
for i, k := range keys {
|
||||
if i > 0 {
|
||||
canoResource += "&"
|
||||
}
|
||||
|
||||
for _, v := range vals[k] {
|
||||
canoResource += k + "=" + v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
signStr := method + "\n" +
|
||||
contentMD5 + "\n" +
|
||||
contentType + "\n" +
|
||||
date + "\n" +
|
||||
canoHeaders + "\n" +
|
||||
canoResource
|
||||
|
||||
// Signature = base64(hmac-sha1(UTF8-Encoding-Of(SignString),AccessKeySecret))
|
||||
mac := hmac.New(sha1.New, []byte(project.AccessKeySecret))
|
||||
_, err = mac.Write([]byte(signStr))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
digest = base64.StdEncoding.EncodeToString(mac.Sum(nil))
|
||||
return
|
||||
}
|
@ -1,47 +0,0 @@
|
||||
package alils
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
)
|
||||
|
||||
func GetCurrentInterface() *net.Interface {
|
||||
if inter, err := net.InterfaceByName("eth0"); err == nil {
|
||||
return inter
|
||||
} else if inter, err := net.InterfaceByName("en0"); err == nil {
|
||||
return inter
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetCurrentInterfaceHardwareAddr() string {
|
||||
if inter := GetCurrentInterface(); inter != nil {
|
||||
return fmt.Sprintf("%s", inter.HardwareAddr)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func GetCurrentInterfaceAddrs() string {
|
||||
if inter := GetCurrentInterface(); inter != nil {
|
||||
if addrs, err := inter.Addrs(); err == nil {
|
||||
return fmt.Sprintf("%s", addrs)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func GetHostname() string {
|
||||
hostname, _ := os.Hostname()
|
||||
return hostname
|
||||
}
|
||||
|
||||
func GetUserHomename() string {
|
||||
homedir, _ := os.UserHomeDir()
|
||||
return homedir
|
||||
}
|
||||
|
||||
func Getwd() string {
|
||||
wd, _ := os.Getwd()
|
||||
return wd
|
||||
}
|
@ -80,7 +80,7 @@ func (el *esLogger) WriteMsg(when time.Time, msg string, level int, lable string
|
||||
vals["hostname"] = GetHostname()
|
||||
vals["working_idr"] = Getwd()
|
||||
vals["home_dir"] = GetUserHomename()
|
||||
vals["hardware_addr"] = GetCurrentInterfaceHardwareAddr()
|
||||
vals["hardware_addr"] = GetCurrentInterface().HardwareAddr
|
||||
vals["client_addrs"] = GetCurrentInterfaceAddrs()
|
||||
|
||||
if el.IndexName == "" {
|
||||
@ -89,7 +89,7 @@ func (el *esLogger) WriteMsg(when time.Time, msg string, level int, lable string
|
||||
|
||||
d := goes.Document{
|
||||
Index: fmt.Sprintf("%s-%04d.%02d.%02d", el.IndexName, when.Year(), when.Month(), when.Day()),
|
||||
Type: "_doc",
|
||||
Type: "logs",
|
||||
Fields: vals,
|
||||
}
|
||||
_, err := el.Index(d, nil)
|
||||
|
@ -15,13 +15,6 @@ func GetCurrentInterface() *net.Interface {
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetCurrentInterfaceHardwareAddr() string {
|
||||
if inter := GetCurrentInterface(); inter != nil {
|
||||
return fmt.Sprintf("%s", inter.HardwareAddr)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func GetCurrentInterfaceAddrs() string {
|
||||
if inter := GetCurrentInterface(); inter != nil {
|
||||
if addrs, err := inter.Addrs(); err == nil {
|
||||
|
@ -281,7 +281,7 @@ func testFileRotate(t *testing.T, fn1, fn2 string, daily, hourly bool) {
|
||||
fw.hourlyOpenDate = fw.hourlyOpenTime.Day()
|
||||
}
|
||||
|
||||
fw.WriteMsg(time.Now(), "this is a msg for test", LevelDebug, "", "dev")
|
||||
fw.WriteMsg(time.Now(), "this is a msg for test", LevelDebug)
|
||||
|
||||
for _, file := range []string{fn1, fn2} {
|
||||
_, err := os.Stat(file)
|
||||
|
12
options.go
12
options.go
@ -4,11 +4,6 @@ type Level int
|
||||
|
||||
type Adapter string
|
||||
|
||||
type AdapterTupple struct {
|
||||
Level Level
|
||||
Adapter Adapter
|
||||
}
|
||||
|
||||
const (
|
||||
_ Level = iota
|
||||
LevelNone Level = iota
|
||||
@ -26,11 +21,4 @@ const (
|
||||
AdapterConsole Adapter = "console"
|
||||
AdapterSocket Adapter = "socket"
|
||||
AdapterElasticSearch Adapter = "es"
|
||||
AdapterAliLs Adapter = "alils"
|
||||
)
|
||||
|
||||
const (
|
||||
DEV = "dev"
|
||||
GRAY = "gray"
|
||||
PROD = "prod"
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user