Просмотр исходного кода

a better documents for the client package

tags/0.2
Xing Xing 10 лет назад
Родитель
Сommit
bf25cc1728
14 измененных файлов: 177 добавлений и 206 удалений
  1. +87
    -117
      client/client.go
  2. +4
    -4
      client/client_test.go
  3. +43
    -42
      client/common.go
  4. +3
    -11
      client/error.go
  5. +5
    -0
      client/id.go
  6. +7
    -5
      client/pool.go
  7. +3
    -3
      client/pool_test.go
  8. +4
    -4
      client/request.go
  9. +13
    -13
      client/response.go
  10. Двоичные данные
      example/client/client
  11. +4
    -4
      example/client/client.go
  12. +1
    -1
      example/worker/worker.go
  13. +2
    -1
      worker/agent.go
  14. +1
    -1
      worker/worker_test.go

+ 87
- 117
client/client.go Просмотреть файл

@@ -1,66 +1,47 @@
// The client package helps developers connect to Gearmand, send
// jobs and fetch result.
package client

import (
"io"
"net"
"sync"
// "fmt"
)

/*
The client side api for gearman

usage:
c := client.New("tcp4", "127.0.0.1:4730")
handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)

*/
// One client connect to one server.
// Use Pool for multi-connections.
type Client struct {
sync.Mutex

net, addr, lastcall string
respHandler map[string]ResponseHandler
innerHandler map[string]ResponseHandler
in chan []byte
in chan *Response
isConn bool
conn net.Conn

ErrorHandler ErrorHandler
}

// Create a new client.
// Connect to "addr" through "network"
// Eg.
// client, err := client.New("127.0.0.1:4730")
func New(net, addr string) (client *Client, err error) {
// Return a client.
func New(network, addr string) (client *Client, err error) {
client = &Client{
net: net,
net: network,
addr: addr,
respHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
innerHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
in: make(chan []byte, QUEUE_SIZE),
respHandler: make(map[string]ResponseHandler, queueSize),
innerHandler: make(map[string]ResponseHandler, queueSize),
in: make(chan *Response, queueSize),
}
if err = client.connect(); err != nil {
return
}
go client.readLoop()
go client.processLoop()
return
}

// {{{ private functions

//
func (client *Client) connect() (err error) {
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
return
}
client.isConn = true
go client.readLoop()
go client.processLoop()
return
}

// Internal write
func (client *Client) write(req *request) (err error) {
var n int
buf := req.Encode()
@@ -73,61 +54,52 @@ func (client *Client) write(req *request) (err error) {
return
}

// read length bytes from the socket
func (client *Client) read(length int) (data []byte, err error) {
n := 0
buf := getBuffer(BUFFER_SIZE)
buf := getBuffer(bufferSize)
// read until data can be unpacked
for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n {
for i := length; i > 0 || len(data) < minPacketLength; i -= n {
if n, err = client.conn.Read(buf); err != nil {
if !client.isConn {
err = ErrConnClosed
return
}
if err == io.EOF && n == 0 {
if data == nil {
err = ErrConnection
}
if err == io.EOF {
err = ErrLostConn
}
return
}
data = append(data, buf[0:n]...)
if n < BUFFER_SIZE {
if n < bufferSize {
break
}
}
return
}

// read data from socket
func (client *Client) readLoop() {
var data []byte
defer close(client.in)
var data, leftdata []byte
var err error
for client.isConn {
if data, err = client.read(BUFFER_SIZE); err != nil {
if err == ErrConnClosed {
var resp *Response
for {
if data, err = client.read(bufferSize); err != nil {
client.err(err)
if err == ErrLostConn {
break
}
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the client should close the conection
// and reconnect to job server.
client.Close()
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
client.err(err)
break
}
client.err(err)
continue
}
client.in <- data
}
close(client.in)
}

// decode data & process it
func (client *Client) processLoop() {
var resp *Response
var l int
var err error
var data, leftdata []byte
for data = range client.in {
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
l = len(data)
if l < MIN_PACKET_LEN { // not enough data
l := len(data)
if l < minPacketLength { // not enough data
leftdata = data
continue
}
@@ -135,41 +107,43 @@ func (client *Client) processLoop() {
client.err(err)
continue
}
client.in <- resp
leftdata = nil
for resp != nil {
switch resp.DataType {
case ERROR:
if client.lastcall != "" {
resp = client.handleInner(client.lastcall, resp)
client.lastcall = ""
} else {
client.err(GetError(resp.Data))
}
case STATUS_RES:
resp = client.handleInner("s"+resp.Handle, resp)
case JOB_CREATED:
resp = client.handleInner("c", resp)
case ECHO_RES:
resp = client.handleInner("e", resp)
case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE,
WORK_FAIL, WORK_EXCEPTION:
resp = client.handleResponse(resp.Handle, resp)
}
}
if len(data) > l {
leftdata = data[l:]
}
}
}

// error handler
func (client *Client) processLoop() {
for resp := range client.in {
switch resp.DataType {
case dtError:
if client.lastcall != "" {
resp = client.handleInner(client.lastcall, resp)
client.lastcall = ""
} else {
client.err(getError(resp.Data))
}
case dtStatusRes:
resp = client.handleInner("s"+resp.Handle, resp)
case dtJobCreated:
resp = client.handleInner("c", resp)
case dtEchoRes:
resp = client.handleInner("e", resp)
case dtWorkData, dtWorkWarning, dtWorkStatus, dtWorkComplete,
dtWorkFail, dtWorkException:
resp = client.handleResponse(resp.Handle, resp)
}
}
}

func (client *Client) err(e error) {
if client.ErrorHandler != nil {
client.ErrorHandler(e)
}
}

// job handler
func (client *Client) handleResponse(key string, resp *Response) *Response {
if h, ok := client.respHandler[key]; ok {
h(resp)
@@ -179,7 +153,6 @@ func (client *Client) handleResponse(key string, resp *Response) *Response {
return resp
}

// job handler
func (client *Client) handleInner(key string, resp *Response) *Response {
if h, ok := client.innerHandler[key]; ok {
h(resp)
@@ -189,15 +162,14 @@ func (client *Client) handleInner(key string, resp *Response) *Response {
return resp
}

// Internal do
func (client *Client) do(funcname string, data []byte,
flag uint32) (handle string, err error) {
var mutex sync.Mutex
mutex.Lock()
client.lastcall = "c"
client.innerHandler["c"] = func(resp *Response) {
if resp.DataType == ERROR {
err = GetError(resp.Data)
if resp.DataType == dtError {
err = getError(resp.Data)
return
}
handle = resp.Handle
@@ -211,22 +183,18 @@ func (client *Client) do(funcname string, data []byte,
return
}

// }}}

// Do the function.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) {
var datatype uint32
switch flag {
case JOB_LOW:
datatype = SUBMIT_JOB_LOW
case JOB_HIGH:
datatype = SUBMIT_JOB_HIGH
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = SUBMIT_JOB
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype)
if h != nil {
@@ -235,27 +203,24 @@ func (client *Client) Do(funcname string, data []byte,
return
}

// Do the function at background.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) {
var datatype uint32
switch flag {
case JOB_LOW:
datatype = SUBMIT_JOB_LOW_BG
case JOB_HIGH:
datatype = SUBMIT_JOB_HIGH_BG
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = SUBMIT_JOB_BG
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype)
return
}

// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string) (status *Status, err error) {
var mutex sync.Mutex
mutex.Lock()
@@ -269,14 +234,14 @@ func (client *Client) Status(handle string) (status *Status, err error) {
mutex.Unlock()
}
req := getRequest()
req.DataType = GET_STATUS
req.DataType = dtGetStatus
req.Data = []byte(handle)
client.write(req)
mutex.Lock()
return
}

// Send a something out, get the samething back.
// Echo.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
var mutex sync.Mutex
mutex.Lock()
@@ -285,7 +250,7 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) {
mutex.Unlock()
}
req := getRequest()
req.DataType = ECHO_REQ
req.DataType = dtEchoReq
req.Data = data
client.lastcall = "e"
client.write(req)
@@ -293,8 +258,13 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) {
return
}

// Close
// Close connection
func (client *Client) Close() (err error) {
client.isConn = false
return client.conn.Close()
client.Lock()
defer client.Unlock()
if client.conn != nil {
err = client.conn.Close()
client.conn = nil
}
return
}

+ 4
- 4
client/client_test.go Просмотреть файл

@@ -13,7 +13,7 @@ var client *Client
func TestClientAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730")
var err error
if client, err = New("tcp4", "127.0.0.1:4730"); err != nil {
if client, err = New(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
client.ErrorHandler = func(e error) {
@@ -34,7 +34,7 @@ func TestClientEcho(t *testing.T) {
}

func TestClientDoBg(t *testing.T) {
handle, err := client.DoBg("ToUpper", []byte("abcdef"), JOB_LOW)
handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
@@ -57,7 +57,7 @@ func TestClientDo(t *testing.T) {
return
}
handle, err := client.Do("ToUpper", []byte("abcdef"),
JOB_LOW, jobHandler)
JobLow, jobHandler)
if err != nil {
t.Error(err)
return
@@ -84,7 +84,7 @@ func TestClientStatus(t *testing.T) {
return
}

handle, err := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil)
handle, err := client.Do("Delay5sec", []byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return


+ 43
- 42
client/common.go Просмотреть файл

@@ -1,66 +1,67 @@
package client

const (
NETWORK = "tcp"
Network = "tcp"
// queue size
QUEUE_SIZE = 8
queueSize = 8
// read buffer size
BUFFER_SIZE = 1024
bufferSize = 1024
// min packet length
MIN_PACKET_LEN = 12
minPacketLength = 12

// \x00REQ
REQ = 5391697
REQ_STR = "\x00REQ"
req = 5391697
reqStr = "\x00REQ"
// \x00RES
RES = 5391699
RES_STR = "\x00RES"
res = 5391699
resStr = "\x00RES"

// package data type
CAN_DO = 0x1
CANT_DO = 0x2
RESET_ABILITIES = 0x3
PRE_SLEEP = 0x4
NOOP = 0x6
JOB_CREATED = 0x8
GRAB_JOB = 0x9
NO_JOB = 0xa
JOB_ASSIGN = 0xb
WORK_STATUS = 0xc
WORK_COMPLETE = 0xd
WORK_FAIL = 0xe
GET_STATUS = 0xf
ECHO_REQ = 0x10
ECHO_RES = 0x11
ERROR = 0x13
STATUS_RES = 0x14
SET_CLIENT_ID = 0x16
CAN_DO_TIMEOUT = 0x17
WORK_EXCEPTION = 0x19
WORK_DATA = 0x1c
WORK_WARNING = 0x1d
GRAB_JOB_UNIQ = 0x1e
JOB_ASSIGN_UNIQ = 0x1f
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31

SUBMIT_JOB = 7
SUBMIT_JOB_BG = 18
SUBMIT_JOB_HIGH = 21
SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34
dtSubmitJob = 7
dtSubmitJobBg = 18
dtSubmitJobHigh = 21
dtSubmitJobHighBg = 32
dtSubmitJobLow = 33
dtSubmitJobLowBg = 34
)

const (
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
JobNormal = 0
// background job
JOB_BG = 1
JobBg = 1
// low level
JOB_LOW = 2
JobLow = 2
// high level
JOB_HIGH = 4
JobHigh = 4
)

func getBuffer(l int) (buf []byte) {


+ 3
- 11
client/error.go Просмотреть файл

@@ -7,24 +7,16 @@ import (
)

var (
ErrJobTimeOut = errors.New("Do a job time out")
ErrInvalidData = errors.New("Invalid data")
ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")
ErrOutOfCap = errors.New("Out of the capability")
ErrNotConn = errors.New("Did not connect to job server")
ErrFuncNotFound = errors.New("The function was not found")
ErrConnection = errors.New("Connection error")
ErrNoActiveAgent = errors.New("No active agent")
ErrTimeOut = errors.New("Executing time out")
ErrUnknown = errors.New("Unknown error")
ErrConnClosed = errors.New("Connection closed")
ErrLostConn = errors.New("Lost connection with Gearmand")
)

// Extract the error message
func GetError(data []byte) (err error) {
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %V", data)


+ 5
- 0
client/id.go Просмотреть файл

@@ -7,6 +7,8 @@ import (
)

var (
// Global ID generator
// Default is an autoincrement ID generator
IdGen IdGenerator
)

@@ -14,6 +16,8 @@ func init() {
IdGen = NewAutoIncId()
}

// ID generator interface. Users can implament this for
// their own generator.
type IdGenerator interface {
Id() string
}
@@ -28,6 +32,7 @@ func (ai *autoincId) Id() string {
return strconv.FormatInt(next, 10)
}

// Return an autoincrement ID generator
func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique
// and count up from there.


+ 7
- 5
client/pool.go Просмотреть файл

@@ -7,11 +7,13 @@ import (
)

const (
PoolSize = 10
poolSize = 10
)

var (
ErrNotFound = errors.New("Server Not Found")
SelectWithRate = selectWithRate
SelectRandom = selectRandom
)

type poolClient struct {
@@ -21,7 +23,7 @@ type poolClient struct {

type SelectionHandler func(map[string]*poolClient, string) string

func SelectWithRate(pool map[string]*poolClient,
func selectWithRate(pool map[string]*poolClient,
last string) (addr string) {
total := 0
for _, item := range pool {
@@ -33,7 +35,7 @@ func SelectWithRate(pool map[string]*poolClient,
return last
}

func SelectRandom(pool map[string]*poolClient,
func selectRandom(pool map[string]*poolClient,
last string) (addr string) {
r := rand.Intn(len(pool))
i := 0
@@ -56,10 +58,10 @@ type Pool struct {
mutex sync.Mutex
}

// Create a new pool.
// Return a new pool.
func NewPool() (pool *Pool) {
return &Pool{
clients: make(map[string]*poolClient, PoolSize),
clients: make(map[string]*poolClient, poolSize),
SelectionHandler: SelectWithRate,
}
}


+ 3
- 3
client/pool_test.go Просмотреть файл

@@ -42,7 +42,7 @@ func TestPoolEcho(t *testing.T) {

func TestPoolDoBg(t *testing.T) {
addr, handle, err := pool.DoBg("ToUpper",
[]byte("abcdef"), JOB_LOW)
[]byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
@@ -65,7 +65,7 @@ func TestPoolDo(t *testing.T) {
return
}
addr, handle, err := pool.Do("ToUpper",
[]byte("abcdef"), JOB_LOW, jobHandler)
[]byte("abcdef"), JobLow, jobHandler)
if err != nil {
t.Error(err)
}
@@ -89,7 +89,7 @@ func TestPoolStatus(t *testing.T) {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
}
addr, handle, err := pool.Do("Delay5sec",
[]byte("abcdef"), JOB_LOW, nil)
[]byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return


+ 4
- 4
client/request.go Просмотреть файл

@@ -12,13 +12,13 @@ type request struct {

// Encode a Request to byte slice
func (req *request) Encode() (data []byte) {
l := len(req.Data) // length of data
tl := l + MIN_PACKET_LEN // add 12 bytes head
l := len(req.Data) // length of data
tl := l + minPacketLength // add 12 bytes head
data = getBuffer(tl)
copy(data[:4], REQ_STR)
copy(data[:4], reqStr)
binary.BigEndian.PutUint32(data[4:8], req.DataType)
binary.BigEndian.PutUint32(data[8:12], uint32(l))
copy(data[MIN_PACKET_LEN:], req.Data)
copy(data[minPacketLength:], req.Data)
return
}



+ 13
- 13
client/response.go Просмотреть файл

@@ -24,14 +24,14 @@ type Response struct {
// after calling this method, the Response.Handle will be filled
func (resp *Response) Result() (data []byte, err error) {
switch resp.DataType {
case WORK_FAIL:
case dtWorkFail:
resp.Handle = string(resp.Data)
err = ErrWorkFail
return
case WORK_EXCEPTION:
case dtWorkException:
err = ErrWorkException
fallthrough
case WORK_COMPLETE:
case dtWorkComplete:
s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = fmt.Errorf("Invalid data: %V", resp.Data)
@@ -47,8 +47,8 @@ func (resp *Response) Result() (data []byte, err error) {

// Extract the job's update
func (resp *Response) Update() (data []byte, err error) {
if resp.DataType != WORK_DATA &&
resp.DataType != WORK_WARNING {
if resp.DataType != dtWorkData &&
resp.DataType != dtWorkWarning {
err = ErrDataType
return
}
@@ -57,7 +57,7 @@ func (resp *Response) Update() (data []byte, err error) {
err = ErrInvalidData
return
}
if resp.DataType == WORK_WARNING {
if resp.DataType == dtWorkWarning {
err = ErrWorkWarning
}
resp.Handle = string(s[0])
@@ -67,12 +67,12 @@ func (resp *Response) Update() (data []byte, err error) {

// Decode a job from byte slice
func decodeResponse(data []byte) (resp *Response, l int, err error) {
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
if len(data) < minPacketLength { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %V", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN]
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %V", data)
return
@@ -80,10 +80,10 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) {
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
case JOB_CREATED:
case dtJobCreated:
resp.Handle = string(dt)
case STATUS_RES, WORK_DATA, WORK_WARNING, WORK_STATUS,
WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION:
case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
dtWorkComplete, dtWorkFail, dtWorkException:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 2 {
resp.Handle = string(s[0])
@@ -92,12 +92,12 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) {
err = fmt.Errorf("Invalid data: %V", data)
return
}
case ECHO_RES:
case dtEchoRes:
fallthrough
default:
resp.Data = dt
}
l = dl + MIN_PACKET_LEN
l = dl + minPacketLength
return
}



Двоичные данные
example/client/client Просмотреть файл


+ 4
- 4
example/client/client.go Просмотреть файл

@@ -13,7 +13,7 @@ func main() {
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()

c, err := client.New("tcp4", "127.0.0.1:4730")
c, err := client.New(client.Network, "127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
@@ -29,11 +29,11 @@ func main() {
}
log.Println(string(echomsg))
wg.Done()
jobHandler := func(job *client.Job) {
log.Printf("%s", job.Data)
jobHandler := func(resp *client.Response) {
log.Printf("%s", resp.Data)
wg.Done()
}
handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler)
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
if err != nil {
log.Fatalln(err)
}


+ 1
- 1
example/worker/worker.go Просмотреть файл

@@ -29,7 +29,7 @@ func main() {
defer w.Close()
w.ErrorHandler = func(e error) {
log.Println(e)
if e == worker.ErrConnection {
if e == worker.ErrLostConn {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)


+ 2
- 1
worker/agent.go Просмотреть файл

@@ -49,8 +49,9 @@ func (a *agent) work() {
break
}
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the agent should colse the conection
// closed by Gearmand, the agent should close the conection
// and reconnect to job server.
a.Close()
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
a.worker.err(err)


+ 1
- 1
worker/worker_test.go Просмотреть файл

@@ -20,7 +20,7 @@ func TestWorkerErrNoneAgents(t *testing.T) {

func TestWorkerAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730.")
if err := worker.AddServer("tcp4", "127.0.0.1:4730"); err != nil {
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}



Загрузка…
Отмена
Сохранить