Browse Source

接入阿里sls

tags/v1.3.0
liangzy 4 years ago
parent
commit
27ff5fb3db
15 changed files with 156 additions and 2598 deletions
  1. +17
    -0
      config.go
  2. +17
    -0
      funtion.go
  3. +8
    -3
      log_test.go
  4. +41
    -148
      logs/alils/alils.go
  5. +25
    -0
      logs/alils/callback.go
  6. +0
    -13
      logs/alils/config.go
  7. +0
    -1038
      logs/alils/log.pb.go
  8. +0
    -42
      logs/alils/log_config.go
  9. +0
    -819
      logs/alils/log_project.go
  10. +0
    -271
      logs/alils/log_store.go
  11. +0
    -91
      logs/alils/machine_group.go
  12. +0
    -62
      logs/alils/request.go
  13. +0
    -111
      logs/alils/signature.go
  14. +47
    -0
      logs/alils/sys.go
  15. +1
    -0
      options.go

+ 17
- 0
config.go View File

@@ -54,3 +54,20 @@ func (c *EsConfig) String() string {
b, _ := json.Marshal(c)
return bytes.NewBuffer(b).String()
}

type AliLSConfig struct {
Project string `json:"project"`
Endpoint string `json:"endpoint"`
KeyID string `json:"key_id"`
KeySecret string `json:"key_secret"`
LogStore string `json:"log_store"`
Topics []string `json:"topics"`
Source string `json:"source"`
Level Level `json:"level"`
FlushWhen int `json:"flush_when"`
}

func (c *AliLSConfig) String() string {
b, _ := json.Marshal(c)
return bytes.NewBuffer(b).String()
}

+ 17
- 0
funtion.go View File

@@ -49,4 +49,21 @@ var adatperMapper = map[Adapter]func(l *Logger, level Level) error{

return l.SetLogger(logs.AdapterEs, c.String())
},

AdapterAliLs: func(l *Logger, level Level) error {

c := AliLSConfig{
Project: "gaore-app-logstore",
Endpoint: "gaore-app-logstore.cn-shenzhen.log.aliyuncs.com",
KeyID: "LTAI4GCHwcqtrFD4DHRHxR4k",
KeySecret: "Ln19xfVYy6OMlJeF9aBvFl4fhRUKBl",
LogStore: "gaore-app-logstore",
Topics: nil,
Source: "",
Level: level,
FlushWhen: 0,
}

return l.SetLogger(logs.AdapterAliLS, c.String())
},
}

+ 8
- 3
log_test.go View File

@@ -1,7 +1,7 @@
package grlogs

import (
"fmt"
_ "golib.gaore.com/GaoreGo/grlogs/logs/alils"
_ "golib.gaore.com/GaoreGo/grlogs/logs/es"
"testing"
"time"
@@ -32,6 +32,11 @@ func TestGetLogger(t *testing.T) {
Get("wifi").Critical("neoweiwoewe")
}

func TestGetEs(t *testing.T) {
fmt.Println("hello world")
func TestGetAliLs(t *testing.T) {
l := Get("es").SetAdapter(LevelAll, AdapterAliLs)
l.Info("endport")
l.Info("endport")
l.Info("endport")
l.Info("endport")
time.Sleep(time.Millisecond * 500)
}

+ 41
- 148
logs/alils/alils.go View File

@@ -2,23 +2,22 @@ package alils

import (
"encoding/json"
"strings"
"sync"
"time"

"github.com/gogo/protobuf/proto"
"fmt"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"golib.gaore.com/GaoreGo/grlogs/logs"
"time"
)

const (
// CacheSize set the flush size
CacheSize int = 64
// Delimiter define the topic delimiter
Delimiter string = "##"
)
func NewAliLS() logs.Logger {
cw := &alilsLogger{
Level: logs.LevelDebug,
}
return cw
}

// Config is the Config for Ali Log
type Config struct {
type alilsLogger struct {
producer *producer.Producer
callback *Callback
Project string `json:"project"`
Endpoint string `json:"endpoint"`
KeyID string `json:"key_id"`
@@ -30,155 +29,49 @@ type Config struct {
FlushWhen int `json:"flush_when"`
}

// aliLSWriter implements LoggerInterface.
// it writes messages in keep-live tcp connection.
type aliLSWriter struct {
store *LogStore
group []*LogGroup
withMap bool
groupMap map[string]*LogGroup
lock *sync.Mutex
Config
}

// NewAliLS create a new Logger
func NewAliLS() logs.Logger {
alils := new(aliLSWriter)
alils.Level = logs.LevelTrace
return alils
}

// Init parse config and init struct
func (c *aliLSWriter) Init(jsonConfig string) (err error) {
func (a *alilsLogger) Init(jsonconfig string) error {

json.Unmarshal([]byte(jsonConfig), c)

if c.FlushWhen > CacheSize {
c.FlushWhen = CacheSize
}

prj := &LogProject{
Name: c.Project,
Endpoint: c.Endpoint,
AccessKeyID: c.KeyID,
AccessKeySecret: c.KeySecret,
}

c.store, err = prj.GetLogStore(c.LogStore)
err := json.Unmarshal([]byte(jsonconfig), a)
if err != nil {
return err
}

// Create default Log Group
c.group = append(c.group, &LogGroup{
Topic: proto.String(""),
Source: proto.String(c.Source),
Logs: make([]*Log, 0, c.FlushWhen),
})

// Create other Log Group
c.groupMap = make(map[string]*LogGroup)
for _, topic := range c.Topics {

lg := &LogGroup{
Topic: proto.String(topic),
Source: proto.String(c.Source),
Logs: make([]*Log, 0, c.FlushWhen),
}

c.group = append(c.group, lg)
c.groupMap[topic] = lg
}

if len(c.group) == 1 {
c.withMap = false
} else {
c.withMap = true
}

c.lock = &sync.Mutex{}
producerConfig := producer.GetDefaultProducerConfig()
producerConfig.Endpoint = a.Endpoint
producerConfig.AccessKeyID = a.KeyID
producerConfig.AccessKeySecret = a.KeySecret
producerConfig.LingerMs = 100
a.producer = producer.InitProducer(producerConfig)
a.callback = &Callback{}
a.producer.Start()

return nil
}

// WriteMsg write message in connection.
// if connection is down, try to re-connect.
func (c *aliLSWriter) WriteMsg(when time.Time, msg string, level int, lable string, env string) (err error) {

if level > c.Level {
return nil
}

var topic string
var content string
var lg *LogGroup
if c.withMap {

// Topic,LogGroup
strs := strings.SplitN(msg, Delimiter, 2)
if len(strs) == 2 {
pos := strings.LastIndex(strs[0], " ")
topic = strs[0][pos+1 : len(strs[0])]
content = strs[0][0:pos] + strs[1]
lg = c.groupMap[topic]
}

// send to empty Topic
if lg == nil {
content = msg
lg = c.group[0]
}
} else {
content = msg
lg = c.group[0]
}

c1 := &LogContent{
Key: proto.String("msg"),
Value: proto.String(content),
}

l := &Log{
Time: proto.Uint32(uint32(when.Unix())),
Contents: []*LogContent{
c1,
},
}

c.lock.Lock()
lg.Logs = append(lg.Logs, l)
c.lock.Unlock()

if len(lg.Logs) >= c.FlushWhen {
c.flush(lg)
}

return nil
func (a *alilsLogger) WriteMsg(when time.Time, msg string, level int, lable string, env string) error {

vals := map[string]string{"msg": msg, "level": fmt.Sprintf("%d", level)}
vals["level_string"] = logs.GetLevelString(level)
vals["env"] = env
vals["lable"] = lable
vals["hostname"] = GetHostname()
vals["working_idr"] = Getwd()
vals["home_dir"] = GetUserHomename()
vals["hardware_addr"] = GetCurrentInterfaceHardwareAddr()
vals["client_addrs"] = GetCurrentInterfaceAddrs()

log := producer.GenerateLog(uint32(when.Unix()), vals)
err := a.producer.SendLog(a.Project, a.LogStore, "topic", "127.0.0.1", log)
return err
}

// Flush implementing method. empty.
func (c *aliLSWriter) Flush() {

// flush all group
for _, lg := range c.group {
c.flush(lg)
}
func (a *alilsLogger) Destroy() {
a.producer.Close(60)
a.producer.SafeClose()
}

// Destroy destroy connection writer and close tcp listener.
func (c *aliLSWriter) Destroy() {
}

func (c *aliLSWriter) flush(lg *LogGroup) {

c.lock.Lock()
defer c.lock.Unlock()
err := c.store.PutLogs(lg)
if err != nil {
return
}
func (a *alilsLogger) Flush() {

lg.Logs = make([]*Log, 0, c.FlushWhen)
}

func init() {


+ 25
- 0
logs/alils/callback.go View File

@@ -0,0 +1,25 @@
package alils

import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk/producer"
)

type Callback struct {
}

func (callback *Callback) Success(result *producer.Result) {
attemptList := result.GetReservedAttempts()
for _, attempt := range attemptList {
fmt.Printf("%+v \n", attempt)
}
}

func (callback *Callback) Fail(result *producer.Result) {
fmt.Println(result.IsSuccessful())
fmt.Println(result.GetErrorCode())
fmt.Println(result.GetErrorMessage())
fmt.Println(result.GetReservedAttempts())
fmt.Println(result.GetRequestId())
fmt.Println(result.GetTimeStampMs())
}

+ 0
- 13
logs/alils/config.go View File

@@ -1,13 +0,0 @@
package alils

const (
version = "0.5.0" // SDK version
signatureMethod = "hmac-sha1" // Signature method

// OffsetNewest stands for the log head offset, i.e. the offset that will be
// assigned to the next message that will be produced to the shard.
OffsetNewest = "end"
// OffsetOldest stands for the oldest offset available on the logstore for a
// shard.
OffsetOldest = "begin"
)

+ 0
- 1038
logs/alils/log.pb.go
File diff suppressed because it is too large
View File


+ 0
- 42
logs/alils/log_config.go View File

@@ -1,42 +0,0 @@
package alils

// InputDetail define log detail
type InputDetail struct {
LogType string `json:"logType"`
LogPath string `json:"logPath"`
FilePattern string `json:"filePattern"`
LocalStorage bool `json:"localStorage"`
TimeFormat string `json:"timeFormat"`
LogBeginRegex string `json:"logBeginRegex"`
Regex string `json:"regex"`
Keys []string `json:"key"`
FilterKeys []string `json:"filterKey"`
FilterRegex []string `json:"filterRegex"`
TopicFormat string `json:"topicFormat"`
}

// OutputDetail define the output detail
type OutputDetail struct {
Endpoint string `json:"endpoint"`
LogStoreName string `json:"logstoreName"`
}

// LogConfig define Log Config
type LogConfig struct {
Name string `json:"configName"`
InputType string `json:"inputType"`
InputDetail InputDetail `json:"inputDetail"`
OutputType string `json:"outputType"`
OutputDetail OutputDetail `json:"outputDetail"`

CreateTime uint32
LastModifyTime uint32

project *LogProject
}

// GetAppliedMachineGroup returns applied machine group of this config.
func (c *LogConfig) GetAppliedMachineGroup(confName string) (groupNames []string, err error) {
groupNames, err = c.project.GetAppliedMachineGroups(c.Name)
return
}

+ 0
- 819
logs/alils/log_project.go View File

@@ -1,819 +0,0 @@
/*
Package alils implements the SDK(v0.5.0) of Simple Log Service(abbr. SLS).

For more description about SLS, please read this article:
http://gitlab.alibaba-inc.com/sls/doc.
*/
package alils

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
)

// Error message in SLS HTTP response.
type errorMessage struct {
Code string `json:"errorCode"`
Message string `json:"errorMessage"`
}

// LogProject Define the Ali Project detail
type LogProject struct {
Name string // Project name
Endpoint string // IP or hostname of SLS endpoint
AccessKeyID string
AccessKeySecret string
}

// NewLogProject creates a new SLS project.
func NewLogProject(name, endpoint, AccessKeyID, accessKeySecret string) (p *LogProject, err error) {
p = &LogProject{
Name: name,
Endpoint: endpoint,
AccessKeyID: AccessKeyID,
AccessKeySecret: accessKeySecret,
}
return p, nil
}

// ListLogStore returns all logstore names of project p.
func (p *LogProject) ListLogStore() (storeNames []string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/logstores")
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to list logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

type Body struct {
Count int
LogStores []string
}
body := &Body{}

err = json.Unmarshal(buf, body)
if err != nil {
return
}

storeNames = body.LogStores

return
}

// GetLogStore returns logstore according by logstore name.
func (p *LogProject) GetLogStore(name string) (s *LogStore, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

r, err := request(p, "GET", "/logstores/"+name, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

s = &LogStore{}
err = json.Unmarshal(buf, s)
if err != nil {
return
}
s.project = p
return
}

// CreateLogStore creates a new logstore in SLS,
// where name is logstore name,
// and ttl is time-to-live(in day) of logs,
// and shardCnt is the number of shards.
func (p *LogProject) CreateLogStore(name string, ttl, shardCnt int) (err error) {

type Body struct {
Name string `json:"logstoreName"`
TTL int `json:"ttl"`
ShardCount int `json:"shardCount"`
}

store := &Body{
Name: name,
TTL: ttl,
ShardCount: shardCnt,
}

body, err := json.Marshal(store)
if err != nil {
return
}

h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}

r, err := request(p, "POST", "/logstores", h, body)
if err != nil {
return
}

body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to create logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

return
}

// DeleteLogStore deletes a logstore according by logstore name.
func (p *LogProject) DeleteLogStore(name string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

r, err := request(p, "DELETE", "/logstores/"+name, h, nil)
if err != nil {
return
}

body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}

// UpdateLogStore updates a logstore according by logstore name,
// obviously we can't modify the logstore name itself.
func (p *LogProject) UpdateLogStore(name string, ttl, shardCnt int) (err error) {

type Body struct {
Name string `json:"logstoreName"`
TTL int `json:"ttl"`
ShardCount int `json:"shardCount"`
}

store := &Body{
Name: name,
TTL: ttl,
ShardCount: shardCnt,
}

body, err := json.Marshal(store)
if err != nil {
return
}

h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}

r, err := request(p, "PUT", "/logstores", h, body)
if err != nil {
return
}

body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

return
}

// ListMachineGroup returns machine group name list and the total number of machine groups.
// The offset starts from 0 and the size is the max number of machine groups could be returned.
func (p *LogProject) ListMachineGroup(offset, size int) (m []string, total int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

if size <= 0 {
size = 500
}

uri := fmt.Sprintf("/machinegroups?offset=%v&size=%v", offset, size)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to list machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

type Body struct {
MachineGroups []string
Count int
Total int
}
body := &Body{}

err = json.Unmarshal(buf, body)
if err != nil {
return
}

m = body.MachineGroups
total = body.Total

return
}

// GetMachineGroup retruns machine group according by machine group name.
func (p *LogProject) GetMachineGroup(name string) (m *MachineGroup, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

r, err := request(p, "GET", "/machinegroups/"+name, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get machine group:%v", name)
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

m = &MachineGroup{}
err = json.Unmarshal(buf, m)
if err != nil {
return
}
m.project = p
return
}

// CreateMachineGroup creates a new machine group in SLS.
func (p *LogProject) CreateMachineGroup(m *MachineGroup) (err error) {

body, err := json.Marshal(m)
if err != nil {
return
}

h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}

r, err := request(p, "POST", "/machinegroups", h, body)
if err != nil {
return
}

body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to create machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

return
}

// UpdateMachineGroup updates a machine group.
func (p *LogProject) UpdateMachineGroup(m *MachineGroup) (err error) {

body, err := json.Marshal(m)
if err != nil {
return
}

h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}

r, err := request(p, "PUT", "/machinegroups/"+m.Name, h, body)
if err != nil {
return
}

body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

return
}

// DeleteMachineGroup deletes machine group according machine group name.
func (p *LogProject) DeleteMachineGroup(name string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

r, err := request(p, "DELETE", "/machinegroups/"+name, h, nil)
if err != nil {
return
}

body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}

// ListConfig returns config names list and the total number of configs.
// The offset starts from 0 and the size is the max number of configs could be returned.
func (p *LogProject) ListConfig(offset, size int) (cfgNames []string, total int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

if size <= 0 {
size = 100
}

uri := fmt.Sprintf("/configs?offset=%v&size=%v", offset, size)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

type Body struct {
Total int
Configs []string
}
body := &Body{}

err = json.Unmarshal(buf, body)
if err != nil {
return
}

cfgNames = body.Configs
total = body.Total
return
}

// GetConfig returns config according by config name.
func (p *LogProject) GetConfig(name string) (c *LogConfig, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

r, err := request(p, "GET", "/configs/"+name, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

c = &LogConfig{}
err = json.Unmarshal(buf, c)
if err != nil {
return
}
c.project = p
return
}

// UpdateConfig updates a config.
func (p *LogProject) UpdateConfig(c *LogConfig) (err error) {

body, err := json.Marshal(c)
if err != nil {
return
}

h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}

r, err := request(p, "PUT", "/configs/"+c.Name, h, body)
if err != nil {
return
}

body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

return
}

// CreateConfig creates a new config in SLS.
func (p *LogProject) CreateConfig(c *LogConfig) (err error) {

body, err := json.Marshal(c)
if err != nil {
return
}

h := map[string]string{
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}

r, err := request(p, "POST", "/configs", h, body)
if err != nil {
return
}

body, err = ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to update config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

return
}

// DeleteConfig deletes a config according by config name.
func (p *LogProject) DeleteConfig(name string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

r, err := request(p, "DELETE", "/configs/"+name, h, nil)
if err != nil {
return
}

body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(body, errMsg)
if err != nil {
err = fmt.Errorf("failed to delete config")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}

// GetAppliedMachineGroups returns applied machine group names list according config name.
func (p *LogProject) GetAppliedMachineGroups(confName string) (groupNames []string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/configs/%v/machinegroups", confName)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get applied machine groups")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

type Body struct {
Count int
Machinegroups []string
}

body := &Body{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}

groupNames = body.Machinegroups
return
}

// GetAppliedConfigs returns applied config names list according machine group name groupName.
func (p *LogProject) GetAppliedConfigs(groupName string) (confNames []string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/machinegroups/%v/configs", groupName)
r, err := request(p, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to applied configs")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

type Cfg struct {
Count int `json:"count"`
Configs []string `json:"configs"`
}

body := &Cfg{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}

confNames = body.Configs
return
}

// ApplyConfigToMachineGroup applies config to machine group.
func (p *LogProject) ApplyConfigToMachineGroup(confName, groupName string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/machinegroups/%v/configs/%v", groupName, confName)
r, err := request(p, "PUT", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to apply config to machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}

// RemoveConfigFromMachineGroup removes config from machine group.
func (p *LogProject) RemoveConfigFromMachineGroup(confName, groupName string) (err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/machinegroups/%v/configs/%v", groupName, confName)
r, err := request(p, "DELETE", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to remove config from machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Printf("%s\n", dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}

+ 0
- 271
logs/alils/log_store.go View File

@@ -1,271 +0,0 @@
package alils

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"strconv"

lz4 "github.com/cloudflare/golz4"
"github.com/gogo/protobuf/proto"
)

// LogStore Store the logs
type LogStore struct {
Name string `json:"logstoreName"`
TTL int
ShardCount int

CreateTime uint32
LastModifyTime uint32

project *LogProject
}

// Shard define the Log Shard
type Shard struct {
ShardID int `json:"shardID"`
}

// ListShards returns shard id list of this logstore.
func (s *LogStore) ListShards() (shardIDs []int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/logstores/%v/shards", s.Name)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to list logstore")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

var shards []*Shard
err = json.Unmarshal(buf, &shards)
if err != nil {
return
}

for _, v := range shards {
shardIDs = append(shardIDs, v.ShardID)
}
return
}

// PutLogs put logs into logstore.
// The callers should transform user logs into LogGroup.
func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
body, err := proto.Marshal(lg)
if err != nil {
return
}

// Compresse body with lz4
out := make([]byte, lz4.CompressBound(body))
n, err := lz4.Compress(body, out)
if err != nil {
return
}

h := map[string]string{
"x-sls-compresstype": "lz4",
"x-sls-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/x-protobuf",
}

uri := fmt.Sprintf("/logstores/%v", s.Name)
r, err := request(s.project, "POST", uri, h, out[:n])
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to put logs")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}
return
}

// GetCursor gets log cursor of one shard specified by shardID.
// The from can be in three form: a) unix timestamp in seccond, b) "begin", c) "end".
// For more detail please read: http://gitlab.alibaba-inc.com/sls/doc/blob/master/api/shard.md#logstore
func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/logstores/%v/shards/%v?type=cursor&from=%v",
s.Name, shardID, from)

r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get cursor")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

type Body struct {
Cursor string
}
body := &Body{}

err = json.Unmarshal(buf, body)
if err != nil {
return
}
cursor = body.Cursor
return
}

// GetLogsBytes gets logs binary data from shard specified by shardID according cursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogsBytes(shardID int, cursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {

h := map[string]string{
"x-sls-bodyrawsize": "0",
"Accept": "application/x-protobuf",
"Accept-Encoding": "lz4",
}

uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
s.Name, shardID, cursor, logGroupMaxCount)

r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to get cursor")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

v, ok := r.Header["X-Sls-Compresstype"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-sls-compresstype' header")
return
}
if v[0] != "lz4" {
err = fmt.Errorf("unexpected compress type:%v", v[0])
return
}

v, ok = r.Header["X-Sls-Cursor"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-sls-cursor' header")
return
}
nextCursor = v[0]

v, ok = r.Header["X-Sls-Bodyrawsize"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-sls-bodyrawsize' header")
return
}
bodyRawSize, err := strconv.Atoi(v[0])
if err != nil {
return
}

out = make([]byte, bodyRawSize)
err = lz4.Uncompress(buf, out)
if err != nil {
return
}

return
}

// LogsBytesDecode decodes logs binary data retruned by GetLogsBytes API
func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {

gl = &LogGroupList{}
err = proto.Unmarshal(data, gl)
if err != nil {
return
}

return
}

// GetLogs gets logs from shard specified by shardID according cursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogs(shardID int, cursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {

out, nextCursor, err := s.GetLogsBytes(shardID, cursor, logGroupMaxCount)
if err != nil {
return
}

gl, err = LogsBytesDecode(out)
if err != nil {
return
}

return
}

+ 0
- 91
logs/alils/machine_group.go View File

@@ -1,91 +0,0 @@
package alils

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
)

// MachineGroupAttribute define the Attribute
type MachineGroupAttribute struct {
ExternalName string `json:"externalName"`
TopicName string `json:"groupTopic"`
}

// MachineGroup define the machine Group
type MachineGroup struct {
Name string `json:"groupName"`
Type string `json:"groupType"`
MachineIDType string `json:"machineIdentifyType"`
MachineIDList []string `json:"machineList"`

Attribute MachineGroupAttribute `json:"groupAttribute"`

CreateTime uint32
LastModifyTime uint32

project *LogProject
}

// Machine define the Machine
type Machine struct {
IP string
UniqueID string `json:"machine-uniqueid"`
UserdefinedID string `json:"userdefined-id"`
}

// MachineList define the Machine List
type MachineList struct {
Total int
Machines []*Machine
}

// ListMachines returns machine list of this machine group.
func (m *MachineGroup) ListMachines() (ms []*Machine, total int, err error) {
h := map[string]string{
"x-sls-bodyrawsize": "0",
}

uri := fmt.Sprintf("/machinegroups/%v/machines", m.Name)
r, err := request(m.project, "GET", uri, h, nil)
if err != nil {
return
}

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return
}

if r.StatusCode != http.StatusOK {
errMsg := &errorMessage{}
err = json.Unmarshal(buf, errMsg)
if err != nil {
err = fmt.Errorf("failed to remove config from machine group")
dump, _ := httputil.DumpResponse(r, true)
fmt.Println(dump)
return
}
err = fmt.Errorf("%v:%v", errMsg.Code, errMsg.Message)
return
}

body := &MachineList{}
err = json.Unmarshal(buf, body)
if err != nil {
return
}

ms = body.Machines
total = body.Total

return
}

// GetAppliedConfigs returns applied configs of this machine group.
func (m *MachineGroup) GetAppliedConfigs() (confNames []string, err error) {
confNames, err = m.project.GetAppliedConfigs(m.Name)
return
}

+ 0
- 62
logs/alils/request.go View File

@@ -1,62 +0,0 @@
package alils

import (
"bytes"
"crypto/md5"
"fmt"
"net/http"
)

// request sends a request to SLS.
func request(project *LogProject, method, uri string, headers map[string]string,
body []byte) (resp *http.Response, err error) {

// The caller should provide 'x-sls-bodyrawsize' header
if _, ok := headers["x-sls-bodyrawsize"]; !ok {
err = fmt.Errorf("Can't find 'x-sls-bodyrawsize' header")
return
}

// SLS public request headers
headers["Host"] = project.Name + "." + project.Endpoint
headers["Date"] = nowRFC1123()
headers["x-sls-apiversion"] = version
headers["x-sls-signaturemethod"] = signatureMethod
if body != nil {
bodyMD5 := fmt.Sprintf("%X", md5.Sum(body))
headers["Content-MD5"] = bodyMD5

if _, ok := headers["Content-Type"]; !ok {
err = fmt.Errorf("Can't find 'Content-Type' header")
return
}
}

// Calc Authorization
// Authorization = "SLS <AccessKeyID>:<Signature>"
digest, err := signature(project, method, uri, headers)
if err != nil {
return
}
auth := fmt.Sprintf("SLS %v:%v", project.AccessKeyID, digest)
headers["Authorization"] = auth

// Initialize http request
reader := bytes.NewReader(body)
urlStr := fmt.Sprintf("http://%v.%v%v", project.Name, project.Endpoint, uri)
req, err := http.NewRequest(method, urlStr, reader)
if err != nil {
return
}
for k, v := range headers {
req.Header.Add(k, v)
}

// Get ready to do request
resp, err = http.DefaultClient.Do(req)
if err != nil {
return
}

return
}

+ 0
- 111
logs/alils/signature.go View File

@@ -1,111 +0,0 @@
package alils

import (
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"net/url"
"sort"
"strings"
"time"
)

// GMT location
var gmtLoc = time.FixedZone("GMT", 0)

// NowRFC1123 returns now time in RFC1123 format with GMT timezone,
// eg. "Mon, 02 Jan 2006 15:04:05 GMT".
func nowRFC1123() string {
return time.Now().In(gmtLoc).Format(time.RFC1123)
}

// signature calculates a request's signature digest.
func signature(project *LogProject, method, uri string,
headers map[string]string) (digest string, err error) {
var contentMD5, contentType, date, canoHeaders, canoResource string
var slsHeaderKeys sort.StringSlice

// SignString = VERB + "\n"
// + CONTENT-MD5 + "\n"
// + CONTENT-TYPE + "\n"
// + DATE + "\n"
// + CanonicalizedSLSHeaders + "\n"
// + CanonicalizedResource

if val, ok := headers["Content-MD5"]; ok {
contentMD5 = val
}

if val, ok := headers["Content-Type"]; ok {
contentType = val
}

date, ok := headers["Date"]
if !ok {
err = fmt.Errorf("Can't find 'Date' header")
return
}

// Calc CanonicalizedSLSHeaders
slsHeaders := make(map[string]string, len(headers))
for k, v := range headers {
l := strings.TrimSpace(strings.ToLower(k))
if strings.HasPrefix(l, "x-sls-") {
slsHeaders[l] = strings.TrimSpace(v)
slsHeaderKeys = append(slsHeaderKeys, l)
}
}

sort.Sort(slsHeaderKeys)
for i, k := range slsHeaderKeys {
canoHeaders += k + ":" + slsHeaders[k]
if i+1 < len(slsHeaderKeys) {
canoHeaders += "\n"
}
}

// Calc CanonicalizedResource
u, err := url.Parse(uri)
if err != nil {
return
}

canoResource += url.QueryEscape(u.Path)
if u.RawQuery != "" {
var keys sort.StringSlice

vals := u.Query()
for k := range vals {
keys = append(keys, k)
}

sort.Sort(keys)
canoResource += "?"
for i, k := range keys {
if i > 0 {
canoResource += "&"
}

for _, v := range vals[k] {
canoResource += k + "=" + v
}
}
}

signStr := method + "\n" +
contentMD5 + "\n" +
contentType + "\n" +
date + "\n" +
canoHeaders + "\n" +
canoResource

// Signature = base64(hmac-sha1(UTF8-Encoding-Of(SignString),AccessKeySecret))
mac := hmac.New(sha1.New, []byte(project.AccessKeySecret))
_, err = mac.Write([]byte(signStr))
if err != nil {
return
}
digest = base64.StdEncoding.EncodeToString(mac.Sum(nil))
return
}

+ 47
- 0
logs/alils/sys.go View File

@@ -0,0 +1,47 @@
package alils

import (
"fmt"
"net"
"os"
)

func GetCurrentInterface() *net.Interface {
if inter, err := net.InterfaceByName("eth0"); err == nil {
return inter
} else if inter, err := net.InterfaceByName("en0"); err == nil {
return inter
}
return nil
}

func GetCurrentInterfaceHardwareAddr() string {
if inter := GetCurrentInterface(); inter != nil {
return fmt.Sprintf("%s", inter.HardwareAddr)
}
return ""
}

func GetCurrentInterfaceAddrs() string {
if inter := GetCurrentInterface(); inter != nil {
if addrs, err := inter.Addrs(); err == nil {
return fmt.Sprintf("%s", addrs)
}
}
return ""
}

func GetHostname() string {
hostname, _ := os.Hostname()
return hostname
}

func GetUserHomename() string {
homedir, _ := os.UserHomeDir()
return homedir
}

func Getwd() string {
wd, _ := os.Getwd()
return wd
}

+ 1
- 0
options.go View File

@@ -21,4 +21,5 @@ const (
AdapterConsole Adapter = "console"
AdapterSocket Adapter = "socket"
AdapterElasticSearch Adapter = "es"
AdapterAliLs Adapter = "alils"
)

Loading…
Cancel
Save