Compare commits

...

23 Commits

Author SHA1 Message Date
8b94a0d256 修改es账号密码 2022-10-31 15:41:38 +08:00
38247cf3c9 修复es type错误问题 2022-10-28 17:18:50 +08:00
36a085eccb 修改es服务端地址 2022-10-28 16:03:47 +08:00
liangzy
e0682ed122 v1.3.5 Fix 修复goroutine并发调用初始化 nil pointer的问题 2020-05-16 14:45:18 +08:00
liangzy
cd85fd66ff 并发读nil错针的问题 2020-05-15 18:33:47 +08:00
liangzy
3c03b5c497 v1.3.3 暴露两个配置方法 2020-05-07 10:59:16 +08:00
liangzy
cd94900f19 v1.3.3 暴露两个配置方法 2020-05-06 15:34:47 +08:00
liangzy
ce9ab060d9 v1.3.3 暴露两个配置方法 2020-05-06 15:30:24 +08:00
liangzy
c653a48667 v1.3.2 增加 glogs.Debug 等直接方法 2020-05-06 14:34:31 +08:00
liangzy
8c78c2a6e2 v1.3.2 增加 glogs.Debug 等直接方法 2020-05-06 14:33:34 +08:00
liangzy
3a09faf4b8 v1.3.2 增加 glogs.Debug 等直接方法 2020-05-06 14:33:04 +08:00
liangzy
8998a4938d v1.3.2 增加 glogs.Debug 等直接方法 2020-05-06 14:32:26 +08:00
liangzy
01c35869b8 接入阿里sls, 增加关闭日志通道 2020-05-06 11:22:19 +08:00
liangzy
1d9b78eb37 接入阿里sls, 增加关闭日志通道 2020-05-06 11:10:39 +08:00
liangzy
b3a1b78006 接入阿里sls 2020-05-06 10:19:07 +08:00
liangzy
c014aca84b 接入阿里sls 2020-05-05 17:04:08 +08:00
liangzy
4843033fe0 接入阿里sls 2020-05-05 16:14:19 +08:00
liangzy
5e9f6ee73f 接入阿里sls 2020-05-05 16:10:42 +08:00
liangzy
27ff5fb3db 接入阿里sls 2020-05-04 21:10:20 +08:00
liangzy
eebd6500f3 fix hardward addr bug 2020-05-04 11:11:11 +08:00
liangzy
d6024cf64c 更新read me 2020-04-29 11:28:18 +08:00
liangzy
1e57935100 更新read me 2020-04-28 19:56:24 +08:00
liangzy
e6655504e9 更新read me 2020-04-28 18:47:31 +08:00
21 changed files with 582 additions and 2657 deletions

View File

@ -1,21 +1,21 @@
# grlogs # grlogs
本库为争游内部日志公共库 本库为争游内部日志公共库, 该库基于 beegoLogger 基础上完善, 目前支持的引擎有 file、console、net、smtp、es、alisls
## 代码示例 ## 代码示例
1. 引入 1. 引入
```go ```
import "golib.gaore.com/GaoreGo/grlogs" import "golib.gaore.com/GaoreGo/grlogs"
``` ```
2. 简单用法 2. 简单用法
```go ```
grlogs.Get("test", 128).Info("hello word") grlogs.Get("test", 128).Info("hello word")
grlogs.Get("test").Warning("hello word") grlogs.Get("test").Warning("hello word")
``` ```
`Get` 方法中 `lable` 参数为标签为识别分类所用在Grlogs里一个分类使用一个管道进行日志 `Get` 方法中 `lable` 参数为标签为识别分类所用在Grlogs里一个分类使用一个管道进行日志
3. 进阶用法 3. 进阶用法
```go ```
logger := grlogs.GetEs("wifi") logger := grlogs.GetEs("wifi")
logger.SetAdapter(LevelAll, AdapterElasticSearch) logger.SetAdapter(LevelAll, AdapterElasticSearch)
logger.SetAdapter(LevelInfo, AdapterFile) logger.SetAdapter(LevelInfo, AdapterFile)
@ -23,9 +23,70 @@ logger.Critical("出错了")
logger.Info("出错了") logger.Info("出错了")
``` ```
4. 如果需要写入es 必须设置环境变量 `GRLOG_APP_NAME`, 不能有反斜杠, 如 4. 如果需要写入es 或 alils 必须设置环境变量 `GRLOG_APP_NAME`, 不能有反斜杠, 如
```shell script ```shell script
export GRLOG_APP_NAME=mkt.gaore.com; export GRLOG_APP_NAME=mkt.gaore.com;
``` ```
还需要额外引入es库完成初始化动作
```go
import _ "golib.gaore.com/GaoreGo/grlogs/logs/es"
```
5. 文件日志会写入到 `./runtime/logs/` 文件夹 请务必在项目构建阶段创建该目录 ```go
import _ "golib.gaore.com/GaoreGo/grlogs/logs/alils"
```
5. 文件日志会写入到 `./runtime/logs/` 文件夹 **请务必在项目构建阶段创建该目录**
6. `AliLS` 日志接入的是阿里SLS GOSDK , 下面是关于 [https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer] 描述
producer提供了两种关闭模式分为有限关闭和安全关闭安全关闭会等待producer中缓存的所有的数据全部发送完成以后在关闭producer有限关闭会接收用户传递的一个参数值时间单位为秒当开始关闭producer的时候开始计时超过传递的设定值还未能完全关闭producer的话会强制退出producer此时可能会有部分数据未被成功发送而丢失。
所以用了aliLS 的 adapter 时,最好调用`grlogs.Close()` 或 `grlogs.CloseAll()` 方法安全关闭通道,以刷新缓冲区
7. 完整示例
```go
package grlogs
import (
"fmt"
_ "golib.gaore.com/GaoreGo/grlogs/logs/es"
"testing"
"time"
)
func TestGetLogger(t *testing.T) {
// 新建 channel 大小为128 标识为nds 日志通道 , Get 的方法 默认带 console 和 file 输出
l := grlogs.Get("nds", 128).SetAdapter(LevelAll, AdapterElasticSearch)
l.Debug("我正在调试")
l.Critical("出错了")
// 复用 nds 的日志通道
grlogs.Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
grlogs.Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
// 新建 channel 大小为默认 标识为wifi 日志通道 , GetEs 的方法 默认带 console 和 file 和 elatisearch 输出
grlogs.GetEs("wifi")
for i := 0; i < 10; i++ {
grlogs.Get("wifi").Warning("Warning")
grlogs.Get("wifi").Warn("Warn")
grlogs.Get("wifi").Debug("Debug")
grlogs.Get("wifi").Error("Error")
grlogs.Get("wifi").Notice("Notice")
grlogs.Get("wifi").Info("Info")
grlogs.Get("wifi").Alert("Alert")
}
Get("wifi").Critical("neoweiwoewe")
}
func TestDropAdapter(t *testing.T) {
grlogs.SetAdapter(LevelAll, AdapterAliLs)
grlogs.DropAdapter(AdapterAliLs)
grlogs.Informational(errors.New("he hello"))
grlogs.SetAdapter(LevelAll, AdapterAliLs)
grlogs.Debug(errors.New("he hello"))
grlogs.CloseAll()
}
```

View File

@ -54,3 +54,20 @@ func (c *EsConfig) String() string {
b, _ := json.Marshal(c) b, _ := json.Marshal(c)
return bytes.NewBuffer(b).String() return bytes.NewBuffer(b).String()
} }
type AliLSConfig struct {
Project string `json:"project"`
Endpoint string `json:"endpoint"`
KeyID string `json:"key_id"`
KeySecret string `json:"key_secret"`
LogStore string `json:"log_store"`
Topics []string `json:"topics"`
Source string `json:"source"`
Level Level `json:"level"`
FlushWhen int `json:"flush_when"`
}
func (c *AliLSConfig) String() string {
b, _ := json.Marshal(c)
return bytes.NewBuffer(b).String()
}

View File

@ -5,9 +5,21 @@ import (
"github.com/astaxie/beego/logs" "github.com/astaxie/beego/logs"
"os" "os"
"path" "path"
"strings"
) )
var adatperMapper = map[Adapter]func(l *Logger, level Level) error{ var adatperSetMapper = map[Adapter]func(l *Logger, level Level) error{
AdapterSocket: func(l *Logger, level Level) error {
c := ConnLogConfig{
ReconnectOnMsg: false,
Reconnect: true,
Net: "",
Addr: "127.0.0.1:9888",
Level: level,
}
return l.SetLogger(logs.AdapterConn, c.String())
},
AdapterFile: func(l *Logger, level Level) error { AdapterFile: func(l *Logger, level Level) error {
if wd, err := os.Getwd(); err == nil { if wd, err := os.Getwd(); err == nil {
@ -33,20 +45,95 @@ var adatperMapper = map[Adapter]func(l *Logger, level Level) error{
}, },
AdapterElasticSearch: func(l *Logger, level Level) error { AdapterElasticSearch: func(l *Logger, level Level) error {
c := GenEsConfig(level)
dsn := "http://es-cn-0pp1mm3hq000dnbh4.public.elasticsearch.aliyuncs.com:9200/"
if envkey == "prod" || envkey == "" || envkey == "gray" {
dsn = "http://es-cn-0pp1mm3hq000dnbh4.elasticsearch.aliyuncs.com:9200/"
}
c := EsConfig{
Username: "elastic",
Password: "Hellogaore@",
Dsn: dsn,
Level: level,
Index: os.Getenv("GRLOG_APP_NAME"),
}
return l.SetLogger(logs.AdapterEs, c.String()) return l.SetLogger(logs.AdapterEs, c.String())
}, },
AdapterAliLs: func(l *Logger, level Level) error {
c := GenAliConfig(level)
return l.SetLogger(logs.AdapterAliLS, c.String())
},
}
func GenAliConfig(level Level) *AliLSConfig {
var project string = "gaore-app-logstore"
var endpoint string
if os.Getenv(envkey) == "prod" || os.Getenv(envkey) == "" || os.Getenv(envkey) == "gray" {
endpoint = project + ".cn-shenzhen-intranet.log.aliyuncs.com"
} else if os.Getenv(envkey) == "dev" {
endpoint = project + ".cn-shenzhen.log.aliyuncs.com"
}
c := &AliLSConfig{
Project: project,
Endpoint: endpoint,
KeyID: "LTAI4GCHwcqtrFD4DHRHxR4k",
KeySecret: "Ln19xfVYy6OMlJeF9aBvFl4fhRUKBl",
LogStore: "gaore-app-logstore",
Topics: []string{os.Getenv("GRLOG_APP_NAME")},
Source: "",
Level: level,
FlushWhen: 0,
}
return c
}
func GenEsConfig(level Level) *EsConfig {
dsn := "http://es-cn-tl32xlfmu00015h34.public.elasticsearch.aliyuncs.com:9200/"
if os.Getenv(envkey) == "prod" || os.Getenv(envkey) == "" || os.Getenv(envkey) == "gray" {
dsn = "http://es-cn-tl32xlfmu00015h34.elasticsearch.aliyuncs.com:9200/"
}
c := &EsConfig{
Username: "kaifa_api",
Password: "2quYX3bTeahO",
Dsn: dsn,
Level: level,
Index: os.Getenv("GRLOG_APP_NAME"),
}
return c
}
var adatperDropMapper = map[Adapter]func(l *Logger) error{
AdapterAliLs: func(l *Logger) error {
return l.BeeLogger.DelLogger(logs.AdapterAliLS)
},
AdapterFile: func(l *Logger) error {
return l.BeeLogger.DelLogger(logs.AdapterFile)
},
AdapterConsole: func(l *Logger) error {
return l.BeeLogger.DelLogger(logs.AdapterConsole)
},
AdapterElasticSearch: func(l *Logger) error {
return l.BeeLogger.DelLogger(logs.AdapterEs)
},
}
func formatLog(f interface{}, v ...interface{}) string {
var msg string
switch f.(type) {
case string:
msg = f.(string)
if len(v) == 0 {
return msg
}
if strings.Contains(msg, "%") && !strings.Contains(msg, "%%") {
//format string
} else {
//do not contain format char
msg += strings.Repeat(" %v", len(v))
}
default:
msg = fmt.Sprint(f)
if len(v) == 0 {
return msg
}
msg += strings.Repeat(" %v", len(v))
}
return fmt.Sprintf(msg, v...)
} }

59
grlogs.go Normal file
View File

@ -0,0 +1,59 @@
package grlogs
import "strings"
func Info(v ...interface{}) {
Get("grlogs").Info(generateFmtStr(len(v)), v...)
}
func Informational(v ...interface{}) {
Get("grlogs").Informational(generateFmtStr(len(v)), v...)
}
func Warning(v ...interface{}) {
Get("grlogs").Warning(generateFmtStr(len(v)), v...)
}
func Warn(v ...interface{}) {
Get("grlogs").Warn(generateFmtStr(len(v)), v...)
}
func Notice(v ...interface{}) {
Get("grlogs").Notice(generateFmtStr(len(v)), v...)
}
func Error(v ...interface{}) {
Get("grlogs").Error(generateFmtStr(len(v)), v...)
}
func Critical(v ...interface{}) {
Get("grlogs").Critical(generateFmtStr(len(v)), v...)
}
func Alert(v ...interface{}) {
Get("grlogs").Alert(generateFmtStr(len(v)), v...)
}
func Emergency(v ...interface{}) {
Get("grlogs").Emergency(generateFmtStr(len(v)), v...)
}
func Trace(v ...interface{}) {
Get("grlogs").Trace(generateFmtStr(len(v)), v...)
}
func Debug(v ...interface{}) {
Get("grlogs").Debug(generateFmtStr(len(v)), v...)
}
func generateFmtStr(n int) string {
return strings.Repeat("%v ", n)
}
func SetAdapter(level Level, adapter Adapter) {
Get("grlogs").SetAdapter(level, adapter)
}
func DropAdapter(adapter Adapter) {
Get("grlogs").DropAdapter(adapter)
}

154
log.go
View File

@ -11,13 +11,21 @@ var loggers = sync.Map{}
var envkey = "CENTER_RUNMODE" var envkey = "CENTER_RUNMODE"
var defaultModeMapping = map[string][]AdapterTupple{
"es": []AdapterTupple{{LevelAll, AdapterConsole}, {LevelAll, AdapterElasticSearch}},
"ali": []AdapterTupple{{LevelAll, AdapterConsole}, {LevelAll, AdapterAliLs}},
"": []AdapterTupple{{LevelAll, AdapterConsole}, {LevelAll, AdapterFile}},
}
type Logger struct { type Logger struct {
Lable string Lable string
*logs.BeeLogger *logs.BeeLogger
} }
type Getter func() *Logger
func (self *Logger) SetAdapter(level Level, adapter Adapter) *Logger { func (self *Logger) SetAdapter(level Level, adapter Adapter) *Logger {
if call, ok := adatperMapper[adapter]; ok { if call, ok := adatperSetMapper[adapter]; ok {
if err := call(self, level); err != nil { if err := call(self, level); err != nil {
fmt.Println(err) fmt.Println(err)
} }
@ -25,51 +33,113 @@ func (self *Logger) SetAdapter(level Level, adapter Adapter) *Logger {
return self return self
} }
func New(label string, channelLens ...int64) (l *Logger, loaded bool) { func (self *Logger) DropAdapter(adapter Adapter) *Logger {
var channellens int64 if call, ok := adatperDropMapper[adapter]; ok {
var tmp interface{} if err := call(self); err != nil {
fmt.Println(err)
tmp, loaded = loggers.LoadOrStore(label, new(Logger)) }
l = tmp.(*Logger)
if len(channelLens) > 0 {
channellens = channelLens[0]
} }
return self
if !loaded {
l.Lable = label
l.BeeLogger = logs.NewLogger(channellens)
l.BeeLogger.Lable = label
l.Env = os.Getenv(envkey)
l.SetPrefix(fmt.Sprintf("[env:%s logger:%s]", os.Getenv(envkey), label))
l.EnableFuncCallDepth(true)
l.SetLogFuncCallDepth(2)
}
return
} }
func Get(label string, channelLens ...int64) (l *Logger) { func newLoggerFromMap(label string, defaultmode string, channelLens ...int64) Getter {
var filelevel Level = LevelInfo
var loaded bool if tmp, ok := loggers.Load(label); ok {
return tmp.(Getter)
}
var l *Logger
var once sync.Once
wapperGetter := Getter(func() *Logger {
once.Do(func() {
var channelLensNum int64 = 100
if len(channelLens) > 0 && channelLens[0] > 0 {
channelLensNum = channelLens[0]
}
l = &Logger{BeeLogger: logs.NewLogger(channelLensNum)}
l.Lable = label
l.BeeLogger = logs.NewLogger(channelLensNum)
l.BeeLogger.Lable = label
l.Env = os.Getenv(envkey)
l.SetPrefix(fmt.Sprintf("[env:%s logger:%s]", os.Getenv(envkey), label))
l.EnableFuncCallDepth(true)
l.SetLogFuncCallDepth(2)
if mode, ok := defaultModeMapping[defaultmode]; ok {
for _, v := range mode {
l.SetAdapter(v.Level, v.Adapter)
}
}
})
return l
})
tmp, loaded := loggers.LoadOrStore(label, wapperGetter)
if loaded {
return tmp.(Getter)
}
return wapperGetter
}
func Get(label string, channelLens ...int64) *Logger {
getter := newLoggerFromMap(label, "", channelLens...)
return getter()
}
func GetEs(label string, channelLens ...int64) *Logger {
getter := newLoggerFromMap(label, "es", channelLens...)
return getter()
}
func GetAli(label string, channelLens ...int64) *Logger {
getter := newLoggerFromMap(label, "ali", channelLens...)
return getter()
}
func Close(lables ...string) {
wg := &sync.WaitGroup{}
for _, lable := range lables {
wg.Add(1)
go func() {
if v, ok := loggers.Load(lable); ok {
if tmp, ok := v.(*Logger); ok {
loggers.Delete(lable)
tmp.BeeLogger.Close()
}
}
wg.Done()
}()
}
wg.Wait()
}
func CloseAll() {
wg := &sync.WaitGroup{}
loggers.Range(func(key, value interface{}) bool {
wg.Add(1)
go func() {
if tmp, ok := value.(*Logger); ok {
tmp.BeeLogger.Close()
loggers.Delete(key)
}
wg.Done()
}()
return true
})
wg.Wait()
}
func init() {
var level Level = LevelInfo
if os.Getenv(envkey) == "dev" { if os.Getenv(envkey) == "dev" {
filelevel = LevelAll level = LevelAll
} }
if l, loaded = New(label, channelLens...); !loaded { defaultModeMapping["es"] = []AdapterTupple{{level, AdapterConsole}, {level, AdapterElasticSearch}}
l.SetAdapter(filelevel, AdapterFile).SetAdapter(LevelAll, AdapterConsole) defaultModeMapping["ali"] = []AdapterTupple{{level, AdapterConsole}, {level, AdapterAliLs}}
} defaultModeMapping[""] = []AdapterTupple{{level, AdapterConsole}, {level, AdapterFile}}
return
}
func GetEs(label string, channelLens ...int64) (l *Logger) {
var filelevel Level = LevelInfo
var loaded bool
if os.Getenv(envkey) == "dev" {
filelevel = LevelAll
}
if l, loaded = New(label, channelLens...); !loaded {
l.SetAdapter(filelevel, AdapterFile).SetAdapter(LevelAll, AdapterConsole).SetAdapter(filelevel, AdapterElasticSearch)
}
return
} }

View File

@ -1,23 +1,98 @@
package grlogs package grlogs
import ( import (
"errors"
_ "golib.gaore.com/GaoreGo/grlogs/logs/alils"
_ "golib.gaore.com/GaoreGo/grlogs/logs/es" _ "golib.gaore.com/GaoreGo/grlogs/logs/es"
"sync"
"testing" "testing"
"time" "time"
) )
func TestGetLogger(t *testing.T) { func TestGetLogger(t *testing.T) {
// 新建 channel 大小为128 标识为nds 日志通道 , Get 的方法 默认带 console 和 file 输出
l := Get("nds", 128).SetAdapter(LevelAll, AdapterElasticSearch) l := Get("nds", 128).SetAdapter(LevelAll, AdapterElasticSearch)
l.Debug("我正在调试") l.Debug("我正在调试")
l.Critical("出错了") l.Critical("出错了")
// 复用 nds 的日志通道
Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123)) Get("nds").Warning("hadoee %s", time.Now().Format(time.RFC1123))
// 新建 channel 大小为默认 标识为wifi 日志通道 , GetEs 的方法 默认带 console 和 file 和 elatisearch 输出
GetEs("wifi") GetEs("wifi")
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
Get("wifi").Error("neoweiwoewe") Get("wifi").Warning("Warning")
Get("wifi").Warn("Warn")
Get("wifi").Debug("Debug")
Get("wifi").Error("Error")
Get("wifi").Notice("Notice")
Get("wifi").Info("Info")
Get("wifi").Alert("Alert")
} }
Get("wifi").Critical("neoweiwoewe")
}
func TestGetAliLs(t *testing.T) {
lable := "test_alils"
l := GetAli(lable).Async(128)
for i := 0; i < 2; i++ {
l.Info("Info")
l.Debug("Debug")
l.Warn("Warn")
l.Warning("Warning")
l.Error("Error")
l.Error("Error\n\n 测试换行")
l.Warn("Warn")
}
time.Sleep(time.Hour * 1)
}
func TestDropAdapter(t *testing.T) {
SetAdapter(LevelAll, AdapterAliLs)
DropAdapter(AdapterAliLs)
Informational(errors.New("he hello"))
SetAdapter(LevelAll, AdapterAliLs)
Debug(errors.New("he hello"))
CloseAll()
}
func TestNew(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
GetAli("ok").Debug("%d", i)
time.Sleep(time.Second * 10)
wg.Done()
}(i)
}
for i := 0; i < 10; i++ {
GetAli("ok").Debug("aaaaaa%d", i)
}
wg.Wait()
} }
func TestGetEs(t *testing.T) { func TestGetEs(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
GetEs("ok").Debug("%d", i)
time.Sleep(time.Second * 10)
wg.Done()
}(i)
}
for i := 0; i < 10; i++ {
GetEs("ok").Debug("aaaaaa%d", i)
}
wg.Wait()
} }

View File

@ -2,23 +2,24 @@ package alils
import ( import (
"encoding/json" "encoding/json"
"strings" "fmt"
"sync" "github.com/aliyun/aliyun-log-go-sdk/producer"
"time"
"github.com/gogo/protobuf/proto"
"golib.gaore.com/GaoreGo/grlogs/logs" "golib.gaore.com/GaoreGo/grlogs/logs"
"os"
"runtime"
"time"
) )
const ( func NewAliLS() logs.Logger {
// CacheSize set the flush size cw := &alilsLogger{
CacheSize int = 64 Level: logs.LevelDebug,
// Delimiter define the topic delimiter }
Delimiter string = "##" return cw
) }
// Config is the Config for Ali Log type alilsLogger struct {
type Config struct { producer *producer.Producer
callback *Callback
Project string `json:"project"` Project string `json:"project"`
Endpoint string `json:"endpoint"` Endpoint string `json:"endpoint"`
KeyID string `json:"key_id"` KeyID string `json:"key_id"`
@ -28,157 +29,68 @@ type Config struct {
Source string `json:"source"` Source string `json:"source"`
Level int `json:"level"` Level int `json:"level"`
FlushWhen int `json:"flush_when"` FlushWhen int `json:"flush_when"`
Debug bool
} }
// aliLSWriter implements LoggerInterface. func (a *alilsLogger) Init(jsonconfig string) error {
// it writes messages in keep-live tcp connection.
type aliLSWriter struct {
store *LogStore
group []*LogGroup
withMap bool
groupMap map[string]*LogGroup
lock *sync.Mutex
Config
}
// NewAliLS create a new Logger err := json.Unmarshal([]byte(jsonconfig), a)
func NewAliLS() logs.Logger {
alils := new(aliLSWriter)
alils.Level = logs.LevelTrace
return alils
}
// Init parse config and init struct
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
json.Unmarshal([]byte(jsonConfig), c)
if c.FlushWhen > CacheSize {
c.FlushWhen = CacheSize
}
prj := &LogProject{
Name: c.Project,
Endpoint: c.Endpoint,
AccessKeyID: c.KeyID,
AccessKeySecret: c.KeySecret,
}
c.store, err = prj.GetLogStore(c.LogStore)
if err != nil { if err != nil {
return err return err
} }
producerConfig := producer.GetDefaultProducerConfig()
// Create default Log Group producerConfig.Endpoint = a.Endpoint
c.group = append(c.group, &LogGroup{ producerConfig.AccessKeyID = a.KeyID
Topic: proto.String(""), producerConfig.AccessKeySecret = a.KeySecret
Source: proto.String(c.Source), producerConfig.LingerMs = 100
Logs: make([]*Log, 0, c.FlushWhen), producerConfig.NoRetryStatusCodeList = []int{-1}
}) producerConfig.Retries = 2
producerConfig.AllowLogLevel = "error"
// Create other Log Group producerConfig.MaxIoWorkerCount = int64(runtime.NumCPU())
c.groupMap = make(map[string]*LogGroup) a.producer = producer.InitProducer(producerConfig)
for _, topic := range c.Topics { a.callback = &Callback{}
a.producer.Start()
lg := &LogGroup{ a.Debug = os.Getenv("GRLOG_ALILS_DEBUG") == "on"
Topic: proto.String(topic),
Source: proto.String(c.Source),
Logs: make([]*Log, 0, c.FlushWhen),
}
c.group = append(c.group, lg)
c.groupMap[topic] = lg
}
if len(c.group) == 1 {
c.withMap = false
} else {
c.withMap = true
}
c.lock = &sync.Mutex{}
return nil return nil
} }
// WriteMsg write message in connection. func (a *alilsLogger) WriteMsg(when time.Time, msg string, level int, lable string, env string) error {
// if connection is down, try to re-connect.
func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int, lable string, env string) (err error) {
if level > c.Level { vals := map[string]string{"msg": msg, "level": fmt.Sprintf("%d", level)}
return nil vals["level_string"] = logs.GetLevelString(level)
} vals["env"] = env
vals["lable"] = lable
vals["hostname"] = GetHostname()
vals["working_idr"] = Getwd()
vals["home_dir"] = GetUserHomename()
vals["hardware_addr"] = GetCurrentInterfaceHardwareAddr()
vals["client_addrs"] = GetCurrentInterfaceAddrs()
var topic string log := producer.GenerateLog(uint32(when.Unix()), vals)
var content string
var lg *LogGroup
if c.withMap {
// TopicLogGroup if a.Debug {
strs := strings.SplitN(msg, Delimiter, 2) for _, topic := range a.Topics {
if len(strs) == 2 { if err := a.producer.SendLogWithCallBack(a.Project, a.LogStore, topic, a.Source, log, a.callback); err != nil {
pos := strings.LastIndex(strs[0], " ") return err
topic = strs[0][pos+1 : len(strs[0])] }
content = strs[0][0:pos] + strs[1]
lg = c.groupMap[topic]
}
// send to empty Topic
if lg == nil {
content = msg
lg = c.group[0]
} }
} else { } else {
content = msg for _, topic := range a.Topics {
lg = c.group[0] if err := a.producer.SendLog(a.Project, a.LogStore, topic, a.Source, log); err != nil {
return err
}
}
} }
c1 := &LogContent{
Key: proto.String("msg"),
Value: proto.String(content),
}
l := &Log{
Time: proto.Uint32(uint32(when.Unix())),
Contents: []*LogContent{
c1,
},
}
c.lock.Lock()
lg.Logs = append(lg.Logs, l)
c.lock.Unlock()
if len(lg.Logs) >= c.FlushWhen {
c.flush(lg)
}
return nil return nil
} }
// Flush implementing method. empty. func (a *alilsLogger) Destroy() {
func (c *aliLSWriter) Flush() { a.producer.SafeClose()
a.producer.Close(300)
// flush all group
for _, lg := range c.group {
c.flush(lg)
}
} }
// Destroy destroy connection writer and close tcp listener. func (a *alilsLogger) Flush() {
func (c *aliLSWriter) Destroy() {
}
func (c *aliLSWriter) flush(lg *LogGroup) {
c.lock.Lock()
defer c.lock.Unlock()
err := c.store.PutLogs(lg)
if err != nil {
return
}
lg.Logs = make([]*Log, 0, c.FlushWhen)
} }
func init() { func init() {

25
logs/alils/callback.go Normal file
View File

@ -0,0 +1,25 @@
package alils
import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk/producer"
)
type Callback struct {
}
func (callback *Callback) Success(result *producer.Result) {
attemptList := result.GetReservedAttempts()
for _, attempt := range attemptList {
fmt.Printf("alilog %+v \n", attempt)
}
}
func (callback *Callback) Fail(result *producer.Result) {
fmt.Println("IsSuccessful", result.IsSuccessful())
fmt.Println("GetErrorCode", result.GetErrorCode())
fmt.Println("GetErrorMessage", result.GetErrorMessage())
fmt.Println("GetReservedAttempts", result.GetReservedAttempts())
fmt.Println("GetRequestId", result.GetRequestId())
fmt.Println("GetTimeStampMs", result.GetTimeStampMs())
}

View File

@ -1,13 +0,0 @@
package alils
const (
version = "0.5.0" // SDK version
signatureMethod = "hmac-sha1" // Signature method
// OffsetNewest stands for the log head offset, i.e. the offset that will be
// assigned to the next message that will be produced to the shard.
OffsetNewest = "end"
// OffsetOldest stands for the oldest offset available on the logstore for a
// shard.
OffsetOldest = "begin"
)

File diff suppressed because it is too large Load Diff

View File

@ -1,42 +0,0 @@
package alils
// InputDetail define log detail
type InputDetail struct {
LogType string `json:"logType"`
LogPath string `json:"logPath"`
FilePattern string `json:"filePattern"`
LocalStorage bool `json:"localStorage"`
TimeFormat string `json:"timeFormat"`
LogBeginRegex string `json:"logBeginRegex"`
Regex string `json:"regex"`
Keys []string `json:"key"`
FilterKeys []string `json:"filterKey"`
FilterRegex []string `json:"filterRegex"`
TopicFormat string `json:"topicFormat"`
}
// OutputDetail define the output detail
type OutputDetail struct {
Endpoint string `json:"endpoint"`
LogStoreName string `json:"logstoreName"`
}
// LogConfig define Log Config
type LogConfig struct {
Name string `json:"configName"`
InputType string `json:"inputType"`
InputDetail InputDetail `json:"inputDetail"`
OutputType string `json:"outputType"`
OutputDetail OutputDetail `json:"outputDetail"`
CreateTime uint32
LastModifyTime uint32
project *LogProject
}
// GetAppliedMachineGroup returns applied machine group of this config.
func (c *LogConfig) GetAppliedMachineGroup(confName string) (groupNames []string, err error) {
groupNames, err = c.project.GetAppliedMachineGroups(c.Name)
return
}

View File

@ -1,819 +0,0 @@
/*
Package alils implements the SDK(v0.5.0) of Simple Log Service(abbr. SLS).
For more description about SLS, please read this article:
http://gitlab.alibaba-inc.com/sls/doc.
*/
package alils
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
)
// Error message in SLS HTTP response.
type errorMessage struct {
Code string `json:"errorCode"`
Message string `json:"errorMessage"`
}
// LogProject Define the Ali Project detail
type LogProject struct {
Name string // Project name
Endpoint string // IP or hostname of SLS endpoint
AccessKeyID string
AccessKeySecret string
}
// NewLogProject creates a new SLS project.
func NewLogProject(name, endpoint, AccessKeyID, accessKeySecret string) (p *LogProject, err error) {
p = &LogProject{
Name: name,
Endpoint: endpoint,
AccessKeyID: AccessKeyID,
AccessKeySecret: accessKeySecret,
}
return p, nil
}
// ListLogStore returns all logstore names of project p.
func (p *LogProject) ListLogStore() (storeNames []string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores")
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to list logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type Body struct {
Count int
LogStores []string
}
body := &Body{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
storeNames = body.LogStores
return
}
// GetLogStore returns logstore according by logstore name.
func (p *LogProject) GetLogStore(name string) (s *LogStore, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
r, err := request(p, "GET", "/logstores/"+name, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
s = &LogStore{}
err = json.Unmarshal(buf, s)
if err != nil {
return
}
s.project = p
return
}
// CreateLogStore creates a new logstore in SLS,
// where name is logstore name,
// and ttl is time-to-live(in day) of logs,
// and shardCnt is the number of shards.
func (p *LogProject) CreateLogStore(name string, ttl, shardCnt int) (err error) {
type Body struct {
Name string `json:"logstoreName"`
TTL int `json:"ttl"`
ShardCount int `json:"shardCount"`
}
store := &Body{
Name: name,
TTL: ttl,
ShardCount: shardCnt,
}
body, err := json.Marshal(store)
if err != nil {
return
}
h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
r, err := request(p, "POST", "/logstores", h, body)
if err != nil {
return
}
body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to create logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// DeleteLogStore deletes a logstore according by logstore name.
func (p *LogProject) DeleteLogStore(name string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
r, err := request(p, "DELETE", "/logstores/"+name, h, nil)
if err != nil {
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// UpdateLogStore updates a logstore according by logstore name,
// obviously we can't modify the logstore name itself.
func (p *LogProject) UpdateLogStore(name string, ttl, shardCnt int) (err error) {
type Body struct {
Name string `json:"logstoreName"`
TTL int `json:"ttl"`
ShardCount int `json:"shardCount"`
}
store := &Body{
Name: name,
TTL: ttl,
ShardCount: shardCnt,
}
body, err := json.Marshal(store)
if err != nil {
return
}
h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
r, err := request(p, "PUT", "/logstores", h, body)
if err != nil {
return
}
body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// ListMachineGroup returns machine group name list and the total number of machine groups.
// The offset starts from 0 and the size is the max number of machine groups could be returned.
func (p *LogProject) ListMachineGroup(offset, size int) (m []string, total int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
if size <= 0 {
size = 500
}
uri := fmt.Sprintf("/machinegroups?offset=%v&size=%v", offset, size)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to list machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type Body struct {
MachineGroups []string
Count int
Total int
}
body := &Body{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
m = body.MachineGroups
total = body.Total
return
}
// GetMachineGroup retruns machine group according by machine group name.
func (p *LogProject) GetMachineGroup(name string) (m *MachineGroup, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
r, err := request(p, "GET", "/machinegroups/"+name, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get machine group:%v", name)
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
m = &MachineGroup{}
err = json.Unmarshal(buf, m)
if err != nil {
return
}
m.project = p
return
}
// CreateMachineGroup creates a new machine group in SLS.
func (p *LogProject) CreateMachineGroup(m *MachineGroup) (err error) {
body, err := json.Marshal(m)
if err != nil {
return
}
h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
r, err := request(p, "POST", "/machinegroups", h, body)
if err != nil {
return
}
body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to create machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// UpdateMachineGroup updates a machine group.
func (p *LogProject) UpdateMachineGroup(m *MachineGroup) (err error) {
body, err := json.Marshal(m)
if err != nil {
return
}
h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
r, err := request(p, "PUT", "/machinegroups/"+m.Name, h, body)
if err != nil {
return
}
body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// DeleteMachineGroup deletes machine group according machine group name.
func (p *LogProject) DeleteMachineGroup(name string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
r, err := request(p, "DELETE", "/machinegroups/"+name, h, nil)
if err != nil {
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// ListConfig returns config names list and the total number of configs.
// The offset starts from 0 and the size is the max number of configs could be returned.
func (p *LogProject) ListConfig(offset, size int) (cfgNames []string, total int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
if size <= 0 {
size = 100
}
uri := fmt.Sprintf("/configs?offset=%v&size=%v", offset, size)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type Body struct {
Total int
Configs []string
}
body := &Body{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
cfgNames = body.Configs
total = body.Total
return
}
// GetConfig returns config according by config name.
func (p *LogProject) GetConfig(name string) (c *LogConfig, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
r, err := request(p, "GET", "/configs/"+name, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
c = &LogConfig{}
err = json.Unmarshal(buf, c)
if err != nil {
return
}
c.project = p
return
}
// UpdateConfig updates a config.
func (p *LogProject) UpdateConfig(c *LogConfig) (err error) {
body, err := json.Marshal(c)
if err != nil {
return
}
h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
r, err := request(p, "PUT", "/configs/"+c.Name, h, body)
if err != nil {
return
}
body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// CreateConfig creates a new config in SLS.
func (p *LogProject) CreateConfig(c *LogConfig) (err error) {
body, err := json.Marshal(c)
if err != nil {
return
}
h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}
r, err := request(p, "POST", "/configs", h, body)
if err != nil {
return
}
body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// DeleteConfig deletes a config according by config name.
func (p *LogProject) DeleteConfig(name string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
r, err := request(p, "DELETE", "/configs/"+name, h, nil)
if err != nil {
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// GetAppliedMachineGroups returns applied machine group names list according config name.
func (p *LogProject) GetAppliedMachineGroups(confName string) (groupNames []string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/configs/%v/machinegroups", confName)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get applied machine groups")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type Body struct {
Count int
Machinegroups []string
}
body := &Body{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
groupNames = body.Machinegroups
return
}
// GetAppliedConfigs returns applied config names list according machine group name groupName.
func (p *LogProject) GetAppliedConfigs(groupName string) (confNames []string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/machinegroups/%v/configs", groupName)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to applied configs")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type Cfg struct {
Count int `json:"count"`
Configs []string `json:"configs"`
}
body := &Cfg{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
confNames = body.Configs
return
}
// ApplyConfigToMachineGroup applies config to machine group.
func (p *LogProject) ApplyConfigToMachineGroup(confName, groupName string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/machinegroups/%v/configs/%v", groupName, confName)
r, err := request(p, "PUT", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to apply config to machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// RemoveConfigFromMachineGroup removes config from machine group.
func (p *LogProject) RemoveConfigFromMachineGroup(confName, groupName string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/machinegroups/%v/configs/%v", groupName, confName)
r, err := request(p, "DELETE", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to remove config from machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}

View File

@ -1,271 +0,0 @@
package alils
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"strconv"
lz4 "github.com/cloudflare/golz4"
"github.com/gogo/protobuf/proto"
)
// LogStore Store the logs
type LogStore struct {
Name string `json:"logstoreName"`
TTL int
ShardCount int
CreateTime uint32
LastModifyTime uint32
project *LogProject
}
// Shard define the Log Shard
type Shard struct {
ShardID int `json:"shardID"`
}
// ListShards returns shard id list of this logstore.
func (s *LogStore) ListShards() (shardIDs []int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to list logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
var shards []*Shard
err = json.Unmarshal(buf, &shards)
if err != nil {
return
}
for _, v := range shards {
shardIDs = append(shardIDs, v.ShardID)
}
return
}
// PutLogs put logs into logstore.
// The callers should transform user logs into LogGroup.
func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
body, err := proto.Marshal(lg)
if err != nil {
return
}
// Compresse body with lz4
out := make([]byte, lz4.CompressBound(body))
n, err := lz4.Compress(body, out)
if err != nil {
return
}
h := map[string]string{
"x-sls-compresstype": "lz4",
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/x-protobuf",
}
uri := fmt.Sprintf("/logstores/%v", s.Name)
r, err := request(s.project, "POST", uri, h, out[:n])
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to put logs")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}
// GetCursor gets log cursor of one shard specified by shardID.
// The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
// For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore
func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
s.Name, shardID, from)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get cursor")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
type Body struct {
Cursor string
}
body := &Body{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
cursor = body.Cursor
return
}
// GetLogsBytes gets logs binary data from shard specified by shardID according cursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogsBytes(shardID int, cursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
"Accept": "application/x-protobuf",
"Accept-Encoding": "lz4",
}
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
s.Name, shardID, cursor, logGroupMaxCount)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get cursor")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
v, ok := r.Header["X-Sls-Compresstype"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-sls-compresstype' header")
return
}
if v[0] != "lz4" {
err = fmt.Errorf("unexpected compress type:%v", v[0])
return
}
v, ok = r.Header["X-Sls-Cursor"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-sls-cursor' header")
return
}
nextCursor = v[0]
v, ok = r.Header["X-Sls-Bodyrawsize"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-sls-bodyrawsize' header")
return
}
bodyRawSize, err := strconv.Atoi(v[0])
if err != nil {
return
}
out = make([]byte, bodyRawSize)
err = lz4.Uncompress(buf, out)
if err != nil {
return
}
return
}
// LogsBytesDecode decodes logs binary data retruned by GetLogsBytes API
func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
gl = &LogGroupList{}
err = proto.Unmarshal(data, gl)
if err != nil {
return
}
return
}
// GetLogs gets logs from shard specified by shardID according cursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogs(shardID int, cursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount)
if err != nil {
return
}
gl, err = LogsBytesDecode(out)
if err != nil {
return
}
return
}

View File

@ -1,91 +0,0 @@
package alils
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
)
// MachineGroupAttribute define the Attribute
type MachineGroupAttribute struct {
ExternalName string `json:"externalName"`
TopicName string `json:"groupTopic"`
}
// MachineGroup define the machine Group
type MachineGroup struct {
Name string `json:"groupName"`
Type string `json:"groupType"`
MachineIDType string `json:"machineIdentifyType"`
MachineIDList []string `json:"machineList"`
Attribute MachineGroupAttribute `json:"groupAttribute"`
CreateTime uint32
LastModifyTime uint32
project *LogProject
}
// Machine define the Machine
type Machine struct {
IP string
UniqueID string `json:"machine-uniqueid"`
UserdefinedID string `json:"userdefined-id"`
}
// MachineList define the Machine List
type MachineList struct {
Total int
Machines []*Machine
}
// ListMachines returns machine list of this machine group.
func (m *MachineGroup) ListMachines() (ms []*Machine, total int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}
uri := fmt.Sprintf("/machinegroups/%v/machines", m.Name)
r, err := request(m.project, "GET", uri, h, nil)
if err != nil {
return
}
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}
if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to remove config from machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
body := &MachineList{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}
ms = body.Machines
total = body.Total
return
}
// GetAppliedConfigs returns applied configs of this machine group.
func (m *MachineGroup) GetAppliedConfigs() (confNames []string, err error) {
confNames, err = m.project.GetAppliedConfigs(m.Name)
return
}

View File

@ -1,62 +0,0 @@
package alils
import (
"bytes"
"crypto/md5"
"fmt"
"net/http"
)
// request sends a request to SLS.
func request(project *LogProject, method, uri string, headers map[string]string,
body []byte) (resp *http.Response, err error) {
// The caller should provide 'x-sls-bodyrawsize' header
if _, ok := headers["x-sls-bodyrawsize"]; !ok {
err = fmt.Errorf("Can't find 'x-sls-bodyrawsize' header")
return
}
// SLS public request headers
headers["Host"] = project.Name + "." + project.Endpoint
headers["Date"] = nowRFC1123()
headers["x-sls-apiversion"] = version
headers["x-sls-signaturemethod"] = signatureMethod
if body != nil {
bodyMD5 := fmt.Sprintf("%X", md5.Sum(body))
headers["Content-MD5"] = bodyMD5
if _, ok := headers["Content-Type"]; !ok {
err = fmt.Errorf("Can't find 'Content-Type' header")
return
}
}
// Calc Authorization
// Authorization = "SLS <AccessKeyID>:<Signature>"
digest, err := signature(project, method, uri, headers)
if err != nil {
return
}
auth := fmt.Sprintf("SLS %v:%v", project.AccessKeyID, digest)
headers["Authorization"] = auth
// Initialize http request
reader := bytes.NewReader(body)
urlStr := fmt.Sprintf("http://%v.%v%v", project.Name, project.Endpoint, uri)
req, err := http.NewRequest(method, urlStr, reader)
if err != nil {
return
}
for k, v := range headers {
req.Header.Add(k, v)
}
// Get ready to do request
resp, err = http.DefaultClient.Do(req)
if err != nil {
return
}
return
}

View File

@ -1,111 +0,0 @@
package alils
import (
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"net/url"
"sort"
"strings"
"time"
)
// GMT location
var gmtLoc = time.FixedZone("GMT", 0)
// NowRFC1123 returns now time in RFC1123 format with GMT timezone,
// eg. "Mon, 02 Jan 2006 15:04:05 GMT".
func nowRFC1123() string {
return time.Now().In(gmtLoc).Format(time.RFC1123)
}
// signature calculates a request's signature digest.
func signature(project *LogProject, method, uri string,
headers map[string]string) (digest string, err error) {
var contentMD5, contentType, date, canoHeaders, canoResource string
var slsHeaderKeys sort.StringSlice
// SignString = VERB + "\n"
// + CONTENT-MD5 + "\n"
// + CONTENT-TYPE + "\n"
// + DATE + "\n"
// + CanonicalizedSLSHeaders + "\n"
// + CanonicalizedResource
if val, ok := headers["Content-MD5"]; ok {
contentMD5 = val
}
if val, ok := headers["Content-Type"]; ok {
contentType = val
}
date, ok := headers["Date"]
if !ok {
err = fmt.Errorf("Can't find 'Date' header")
return
}
// Calc CanonicalizedSLSHeaders
slsHeaders := make(map[string]string, len(headers))
for k, v := range headers {
l := strings.TrimSpace(strings.ToLower(k))
if strings.HasPrefix(l, "x-sls-") {
slsHeaders[l] = strings.TrimSpace(v)
slsHeaderKeys = append(slsHeaderKeys, l)
}
}
sort.Sort(slsHeaderKeys)
for i, k := range slsHeaderKeys {
canoHeaders += k + ":" + slsHeaders[k]
if i+1 < len(slsHeaderKeys) {
canoHeaders += "\n"
}
}
// Calc CanonicalizedResource
u, err := url.Parse(uri)
if err != nil {
return
}
canoResource += url.QueryEscape(u.Path)
if u.RawQuery != "" {
var keys sort.StringSlice
vals := u.Query()
for k := range vals {
keys = append(keys, k)
}
sort.Sort(keys)
canoResource += "?"
for i, k := range keys {
if i > 0 {
canoResource += "&"
}
for _, v := range vals[k] {
canoResource += k + "=" + v
}
}
}
signStr := method + "\n" +
contentMD5 + "\n" +
contentType + "\n" +
date + "\n" +
canoHeaders + "\n" +
canoResource
// Signature = base64(hmac-sha1(UTF8-Encoding-Of(SignString)AccessKeySecret))
mac := hmac.New(sha1.New, []byte(project.AccessKeySecret))
_, err = mac.Write([]byte(signStr))
if err != nil {
return
}
digest = base64.StdEncoding.EncodeToString(mac.Sum(nil))
return
}

47
logs/alils/sys.go Normal file
View File

@ -0,0 +1,47 @@
package alils
import (
"fmt"
"net"
"os"
)
func GetCurrentInterface() *net.Interface {
if inter, err := net.InterfaceByName("eth0"); err == nil {
return inter
} else if inter, err := net.InterfaceByName("en0"); err == nil {
return inter
}
return nil
}
func GetCurrentInterfaceHardwareAddr() string {
if inter := GetCurrentInterface(); inter != nil {
return fmt.Sprintf("%s", inter.HardwareAddr)
}
return ""
}
func GetCurrentInterfaceAddrs() string {
if inter := GetCurrentInterface(); inter != nil {
if addrs, err := inter.Addrs(); err == nil {
return fmt.Sprintf("%s", addrs)
}
}
return ""
}
func GetHostname() string {
hostname, _ := os.Hostname()
return hostname
}
func GetUserHomename() string {
homedir, _ := os.UserHomeDir()
return homedir
}
func Getwd() string {
wd, _ := os.Getwd()
return wd
}

View File

@ -80,7 +80,7 @@ func (el *esLogger) WriteMsg(when time.Time, msg string, level int, lable string
vals["hostname"] = GetHostname() vals["hostname"] = GetHostname()
vals["working_idr"] = Getwd() vals["working_idr"] = Getwd()
vals["home_dir"] = GetUserHomename() vals["home_dir"] = GetUserHomename()
vals["hardware_addr"] = GetCurrentInterface().HardwareAddr vals["hardware_addr"] = GetCurrentInterfaceHardwareAddr()
vals["client_addrs"] = GetCurrentInterfaceAddrs() vals["client_addrs"] = GetCurrentInterfaceAddrs()
if el.IndexName == "" { if el.IndexName == "" {
@ -89,7 +89,7 @@ func (el *esLogger) WriteMsg(when time.Time, msg string, level int, lable string
d := goes.Document{ d := goes.Document{
Index: fmt.Sprintf("%s-%04d.%02d.%02d", el.IndexName, when.Year(), when.Month(), when.Day()), Index: fmt.Sprintf("%s-%04d.%02d.%02d", el.IndexName, when.Year(), when.Month(), when.Day()),
Type: "logs", Type: "_doc",
Fields: vals, Fields: vals,
} }
_, err := el.Index(d, nil) _, err := el.Index(d, nil)

View File

@ -15,6 +15,13 @@ func GetCurrentInterface() *net.Interface {
return nil return nil
} }
func GetCurrentInterfaceHardwareAddr() string {
if inter := GetCurrentInterface(); inter != nil {
return fmt.Sprintf("%s", inter.HardwareAddr)
}
return ""
}
func GetCurrentInterfaceAddrs() string { func GetCurrentInterfaceAddrs() string {
if inter := GetCurrentInterface(); inter != nil { if inter := GetCurrentInterface(); inter != nil {
if addrs, err := inter.Addrs(); err == nil { if addrs, err := inter.Addrs(); err == nil {

View File

@ -281,7 +281,7 @@ func testFileRotate(t *testing.T, fn1, fn2 string, daily, hourly bool) {
fw.hourlyOpenDate = fw.hourlyOpenTime.Day() fw.hourlyOpenDate = fw.hourlyOpenTime.Day()
} }
fw.WriteMsg(time.Now(), "this is a msg for test", LevelDebug) fw.WriteMsg(time.Now(), "this is a msg for test", LevelDebug, "", "dev")
for _, file := range []string{fn1, fn2} { for _, file := range []string{fn1, fn2} {
_, err := os.Stat(file) _, err := os.Stat(file)

View File

@ -4,6 +4,11 @@ type Level int
type Adapter string type Adapter string
type AdapterTupple struct {
Level Level
Adapter Adapter
}
const ( const (
_ Level = iota _ Level = iota
LevelNone Level = iota LevelNone Level = iota
@ -21,4 +26,11 @@ const (
AdapterConsole Adapter = "console" AdapterConsole Adapter = "console"
AdapterSocket Adapter = "socket" AdapterSocket Adapter = "socket"
AdapterElasticSearch Adapter = "es" AdapterElasticSearch Adapter = "es"
AdapterAliLs Adapter = "alils"
)
const (
DEV = "dev"
GRAY = "gray"
PROD = "prod"
) )