diff --git a/client/client.go b/client/client.go index 00dbd63..46b544b 100644 --- a/client/client.go +++ b/client/client.go @@ -1,66 +1,47 @@ +// The client package helps developers connect to Gearmand, send +// jobs and fetch result. package client import ( "io" "net" "sync" - // "fmt" ) -/* -The client side api for gearman - -usage: -c := client.New("tcp4", "127.0.0.1:4730") -handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) - -*/ +// One client connect to one server. +// Use Pool for multi-connections. type Client struct { sync.Mutex net, addr, lastcall string respHandler map[string]ResponseHandler innerHandler map[string]ResponseHandler - in chan []byte + in chan *Response isConn bool conn net.Conn ErrorHandler ErrorHandler } -// Create a new client. -// Connect to "addr" through "network" -// Eg. -// client, err := client.New("127.0.0.1:4730") -func New(net, addr string) (client *Client, err error) { +// Return a client. +func New(network, addr string) (client *Client, err error) { client = &Client{ - net: net, + net: network, addr: addr, - respHandler: make(map[string]ResponseHandler, QUEUE_SIZE), - innerHandler: make(map[string]ResponseHandler, QUEUE_SIZE), - in: make(chan []byte, QUEUE_SIZE), + respHandler: make(map[string]ResponseHandler, queueSize), + innerHandler: make(map[string]ResponseHandler, queueSize), + in: make(chan *Response, queueSize), } - if err = client.connect(); err != nil { - return - } - go client.readLoop() - go client.processLoop() - return -} - -// {{{ private functions - -// -func (client *Client) connect() (err error) { client.conn, err = net.Dial(client.net, client.addr) if err != nil { return } client.isConn = true + go client.readLoop() + go client.processLoop() return } -// Internal write func (client *Client) write(req *request) (err error) { var n int buf := req.Encode() @@ -73,61 +54,52 @@ func (client *Client) write(req *request) (err error) { return } -// read length bytes from the socket func (client *Client) read(length int) (data []byte, err error) { n := 0 - buf := getBuffer(BUFFER_SIZE) + buf := getBuffer(bufferSize) // read until data can be unpacked - for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { + for i := length; i > 0 || len(data) < minPacketLength; i -= n { if n, err = client.conn.Read(buf); err != nil { - if !client.isConn { - err = ErrConnClosed - return - } - if err == io.EOF && n == 0 { - if data == nil { - err = ErrConnection - } + if err == io.EOF { + err = ErrLostConn } return } data = append(data, buf[0:n]...) - if n < BUFFER_SIZE { + if n < bufferSize { break } } return } -// read data from socket func (client *Client) readLoop() { - var data []byte + defer close(client.in) + var data, leftdata []byte var err error - for client.isConn { - if data, err = client.read(BUFFER_SIZE); err != nil { - if err == ErrConnClosed { + var resp *Response + for { + if data, err = client.read(bufferSize); err != nil { + client.err(err) + if err == ErrLostConn { + break + } + // If it is unexpected error and the connection wasn't + // closed by Gearmand, the client should close the conection + // and reconnect to job server. + client.Close() + client.conn, err = net.Dial(client.net, client.addr) + if err != nil { + client.err(err) break } - client.err(err) continue } - client.in <- data - } - close(client.in) -} - -// decode data & process it -func (client *Client) processLoop() { - var resp *Response - var l int - var err error - var data, leftdata []byte - for data = range client.in { if len(leftdata) > 0 { // some data left for processing data = append(leftdata, data...) } - l = len(data) - if l < MIN_PACKET_LEN { // not enough data + l := len(data) + if l < minPacketLength { // not enough data leftdata = data continue } @@ -135,41 +107,43 @@ func (client *Client) processLoop() { client.err(err) continue } + client.in <- resp leftdata = nil - for resp != nil { - switch resp.DataType { - case ERROR: - if client.lastcall != "" { - resp = client.handleInner(client.lastcall, resp) - client.lastcall = "" - } else { - client.err(GetError(resp.Data)) - } - case STATUS_RES: - resp = client.handleInner("s"+resp.Handle, resp) - case JOB_CREATED: - resp = client.handleInner("c", resp) - case ECHO_RES: - resp = client.handleInner("e", resp) - case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, - WORK_FAIL, WORK_EXCEPTION: - resp = client.handleResponse(resp.Handle, resp) - } - } if len(data) > l { leftdata = data[l:] } } } -// error handler +func (client *Client) processLoop() { + for resp := range client.in { + switch resp.DataType { + case dtError: + if client.lastcall != "" { + resp = client.handleInner(client.lastcall, resp) + client.lastcall = "" + } else { + client.err(getError(resp.Data)) + } + case dtStatusRes: + resp = client.handleInner("s"+resp.Handle, resp) + case dtJobCreated: + resp = client.handleInner("c", resp) + case dtEchoRes: + resp = client.handleInner("e", resp) + case dtWorkData, dtWorkWarning, dtWorkStatus, dtWorkComplete, + dtWorkFail, dtWorkException: + resp = client.handleResponse(resp.Handle, resp) + } + } +} + func (client *Client) err(e error) { if client.ErrorHandler != nil { client.ErrorHandler(e) } } -// job handler func (client *Client) handleResponse(key string, resp *Response) *Response { if h, ok := client.respHandler[key]; ok { h(resp) @@ -179,7 +153,6 @@ func (client *Client) handleResponse(key string, resp *Response) *Response { return resp } -// job handler func (client *Client) handleInner(key string, resp *Response) *Response { if h, ok := client.innerHandler[key]; ok { h(resp) @@ -189,15 +162,14 @@ func (client *Client) handleInner(key string, resp *Response) *Response { return resp } -// Internal do func (client *Client) do(funcname string, data []byte, flag uint32) (handle string, err error) { var mutex sync.Mutex mutex.Lock() client.lastcall = "c" client.innerHandler["c"] = func(resp *Response) { - if resp.DataType == ERROR { - err = GetError(resp.Data) + if resp.DataType == dtError { + err = getError(resp.Data) return } handle = resp.Handle @@ -211,22 +183,18 @@ func (client *Client) do(funcname string, data []byte, return } -// }}} - -// Do the function. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH +// Call the function and get a response. +// flag can be set to: JobLow, JobNormal and JobHigh func (client *Client) Do(funcname string, data []byte, flag byte, h ResponseHandler) (handle string, err error) { var datatype uint32 switch flag { - case JOB_LOW: - datatype = SUBMIT_JOB_LOW - case JOB_HIGH: - datatype = SUBMIT_JOB_HIGH + case JobLow: + datatype = dtSubmitJobLow + case JobHigh: + datatype = dtSubmitJobHigh default: - datatype = SUBMIT_JOB + datatype = dtSubmitJob } handle, err = client.do(funcname, data, datatype) if h != nil { @@ -235,27 +203,24 @@ func (client *Client) Do(funcname string, data []byte, return } -// Do the function at background. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH +// Call the function in background, no response needed. +// flag can be set to: JobLow, JobNormal and JobHigh func (client *Client) DoBg(funcname string, data []byte, flag byte) (handle string, err error) { var datatype uint32 switch flag { - case JOB_LOW: - datatype = SUBMIT_JOB_LOW_BG - case JOB_HIGH: - datatype = SUBMIT_JOB_HIGH_BG + case JobLow: + datatype = dtSubmitJobLowBg + case JobHigh: + datatype = dtSubmitJobHighBg default: - datatype = SUBMIT_JOB_BG + datatype = dtSubmitJobBg } handle, err = client.do(funcname, data, datatype) return } // Get job status from job server. -// !!!Not fully tested.!!! func (client *Client) Status(handle string) (status *Status, err error) { var mutex sync.Mutex mutex.Lock() @@ -269,14 +234,14 @@ func (client *Client) Status(handle string) (status *Status, err error) { mutex.Unlock() } req := getRequest() - req.DataType = GET_STATUS + req.DataType = dtGetStatus req.Data = []byte(handle) client.write(req) mutex.Lock() return } -// Send a something out, get the samething back. +// Echo. func (client *Client) Echo(data []byte) (echo []byte, err error) { var mutex sync.Mutex mutex.Lock() @@ -285,7 +250,7 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) { mutex.Unlock() } req := getRequest() - req.DataType = ECHO_REQ + req.DataType = dtEchoReq req.Data = data client.lastcall = "e" client.write(req) @@ -293,8 +258,13 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) { return } -// Close +// Close connection func (client *Client) Close() (err error) { - client.isConn = false - return client.conn.Close() + client.Lock() + defer client.Unlock() + if client.conn != nil { + err = client.conn.Close() + client.conn = nil + } + return } diff --git a/client/client_test.go b/client/client_test.go index 1cbb48a..ca86868 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -13,7 +13,7 @@ var client *Client func TestClientAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730") var err error - if client, err = New("tcp4", "127.0.0.1:4730"); err != nil { + if client, err = New(Network, "127.0.0.1:4730"); err != nil { t.Fatal(err) } client.ErrorHandler = func(e error) { @@ -34,7 +34,7 @@ func TestClientEcho(t *testing.T) { } func TestClientDoBg(t *testing.T) { - handle, err := client.DoBg("ToUpper", []byte("abcdef"), JOB_LOW) + handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow) if err != nil { t.Error(err) return @@ -57,7 +57,7 @@ func TestClientDo(t *testing.T) { return } handle, err := client.Do("ToUpper", []byte("abcdef"), - JOB_LOW, jobHandler) + JobLow, jobHandler) if err != nil { t.Error(err) return @@ -84,7 +84,7 @@ func TestClientStatus(t *testing.T) { return } - handle, err := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil) + handle, err := client.Do("Delay5sec", []byte("abcdef"), JobLow, nil) if err != nil { t.Error(err) return diff --git a/client/common.go b/client/common.go index 34b1c81..b011662 100644 --- a/client/common.go +++ b/client/common.go @@ -1,66 +1,67 @@ package client const ( - NETWORK = "tcp" + Network = "tcp" // queue size - QUEUE_SIZE = 8 + queueSize = 8 // read buffer size - BUFFER_SIZE = 1024 + bufferSize = 1024 // min packet length - MIN_PACKET_LEN = 12 + minPacketLength = 12 // \x00REQ - REQ = 5391697 - REQ_STR = "\x00REQ" + req = 5391697 + reqStr = "\x00REQ" // \x00RES - RES = 5391699 - RES_STR = "\x00RES" + res = 5391699 + resStr = "\x00RES" // package data type - CAN_DO = 0x1 - CANT_DO = 0x2 - RESET_ABILITIES = 0x3 - PRE_SLEEP = 0x4 - NOOP = 0x6 - JOB_CREATED = 0x8 - GRAB_JOB = 0x9 - NO_JOB = 0xa - JOB_ASSIGN = 0xb - WORK_STATUS = 0xc - WORK_COMPLETE = 0xd - WORK_FAIL = 0xe - GET_STATUS = 0xf - ECHO_REQ = 0x10 - ECHO_RES = 0x11 - ERROR = 0x13 - STATUS_RES = 0x14 - SET_CLIENT_ID = 0x16 - CAN_DO_TIMEOUT = 0x17 - WORK_EXCEPTION = 0x19 - WORK_DATA = 0x1c - WORK_WARNING = 0x1d - GRAB_JOB_UNIQ = 0x1e - JOB_ASSIGN_UNIQ = 0x1f + dtCanDo = 1 + dtCantDo = 2 + dtResetAbilities = 3 + dtPreSleep = 4 + dtNoop = 6 + dtJobCreated = 8 + dtGrabJob = 9 + dtNoJob = 10 + dtJobAssign = 11 + dtWorkStatus = 12 + dtWorkComplete = 13 + dtWorkFail = 14 + dtGetStatus = 15 + dtEchoReq = 16 + dtEchoRes = 17 + dtError = 19 + dtStatusRes = 20 + dtSetClientId = 22 + dtCanDoTimeout = 23 + dtAllYours = 24 + dtWorkException = 25 + dtWorkData = 28 + dtWorkWarning = 29 + dtGrabJobUniq = 30 + dtJobAssignUniq = 31 - SUBMIT_JOB = 7 - SUBMIT_JOB_BG = 18 - SUBMIT_JOB_HIGH = 21 - SUBMIT_JOB_HIGH_BG = 32 - SUBMIT_JOB_LOW = 33 - SUBMIT_JOB_LOW_BG = 34 + dtSubmitJob = 7 + dtSubmitJobBg = 18 + dtSubmitJobHigh = 21 + dtSubmitJobHighBg = 32 + dtSubmitJobLow = 33 + dtSubmitJobLowBg = 34 ) const ( // Job type // JOB_NORMAL | JOB_BG means a normal level job run in background // normal level - JOB_NORMAL = 0 + JobNormal = 0 // background job - JOB_BG = 1 + JobBg = 1 // low level - JOB_LOW = 2 + JobLow = 2 // high level - JOB_HIGH = 4 + JobHigh = 4 ) func getBuffer(l int) (buf []byte) { diff --git a/client/error.go b/client/error.go index a10857c..b6c61ea 100644 --- a/client/error.go +++ b/client/error.go @@ -7,24 +7,16 @@ import ( ) var ( - ErrJobTimeOut = errors.New("Do a job time out") - ErrInvalidData = errors.New("Invalid data") ErrWorkWarning = errors.New("Work warning") + ErrInvalidData = errors.New("Invalid data") ErrWorkFail = errors.New("Work fail") ErrWorkException = errors.New("Work exeption") ErrDataType = errors.New("Invalid data type") - ErrOutOfCap = errors.New("Out of the capability") - ErrNotConn = errors.New("Did not connect to job server") - ErrFuncNotFound = errors.New("The function was not found") - ErrConnection = errors.New("Connection error") - ErrNoActiveAgent = errors.New("No active agent") - ErrTimeOut = errors.New("Executing time out") - ErrUnknown = errors.New("Unknown error") - ErrConnClosed = errors.New("Connection closed") + ErrLostConn = errors.New("Lost connection with Gearmand") ) // Extract the error message -func GetError(data []byte) (err error) { +func getError(data []byte) (err error) { rel := bytes.SplitN(data, []byte{'\x00'}, 2) if len(rel) != 2 { err = fmt.Errorf("Not a error data: %V", data) diff --git a/client/id.go b/client/id.go index eb9992a..4c92e99 100644 --- a/client/id.go +++ b/client/id.go @@ -7,6 +7,8 @@ import ( ) var ( + // Global ID generator + // Default is an autoincrement ID generator IdGen IdGenerator ) @@ -14,6 +16,8 @@ func init() { IdGen = NewAutoIncId() } +// ID generator interface. Users can implament this for +// their own generator. type IdGenerator interface { Id() string } @@ -28,6 +32,7 @@ func (ai *autoincId) Id() string { return strconv.FormatInt(next, 10) } +// Return an autoincrement ID generator func NewAutoIncId() IdGenerator { // we'll consider the nano fraction of a second at startup unique // and count up from there. diff --git a/client/pool.go b/client/pool.go index fb0e65c..1204908 100644 --- a/client/pool.go +++ b/client/pool.go @@ -7,11 +7,13 @@ import ( ) const ( - PoolSize = 10 + poolSize = 10 ) var ( ErrNotFound = errors.New("Server Not Found") + SelectWithRate = selectWithRate + SelectRandom = selectRandom ) type poolClient struct { @@ -21,7 +23,7 @@ type poolClient struct { type SelectionHandler func(map[string]*poolClient, string) string -func SelectWithRate(pool map[string]*poolClient, +func selectWithRate(pool map[string]*poolClient, last string) (addr string) { total := 0 for _, item := range pool { @@ -33,7 +35,7 @@ func SelectWithRate(pool map[string]*poolClient, return last } -func SelectRandom(pool map[string]*poolClient, +func selectRandom(pool map[string]*poolClient, last string) (addr string) { r := rand.Intn(len(pool)) i := 0 @@ -56,10 +58,10 @@ type Pool struct { mutex sync.Mutex } -// Create a new pool. +// Return a new pool. func NewPool() (pool *Pool) { return &Pool{ - clients: make(map[string]*poolClient, PoolSize), + clients: make(map[string]*poolClient, poolSize), SelectionHandler: SelectWithRate, } } diff --git a/client/pool_test.go b/client/pool_test.go index bd3402f..5324db9 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -42,7 +42,7 @@ func TestPoolEcho(t *testing.T) { func TestPoolDoBg(t *testing.T) { addr, handle, err := pool.DoBg("ToUpper", - []byte("abcdef"), JOB_LOW) + []byte("abcdef"), JobLow) if err != nil { t.Error(err) return @@ -65,7 +65,7 @@ func TestPoolDo(t *testing.T) { return } addr, handle, err := pool.Do("ToUpper", - []byte("abcdef"), JOB_LOW, jobHandler) + []byte("abcdef"), JobLow, jobHandler) if err != nil { t.Error(err) } @@ -89,7 +89,7 @@ func TestPoolStatus(t *testing.T) { t.Errorf("The job (%s) shouldn't be running.", status.Handle) } addr, handle, err := pool.Do("Delay5sec", - []byte("abcdef"), JOB_LOW, nil) + []byte("abcdef"), JobLow, nil) if err != nil { t.Error(err) return diff --git a/client/request.go b/client/request.go index 0c49b18..a374997 100644 --- a/client/request.go +++ b/client/request.go @@ -12,13 +12,13 @@ type request struct { // Encode a Request to byte slice func (req *request) Encode() (data []byte) { - l := len(req.Data) // length of data - tl := l + MIN_PACKET_LEN // add 12 bytes head + l := len(req.Data) // length of data + tl := l + minPacketLength // add 12 bytes head data = getBuffer(tl) - copy(data[:4], REQ_STR) + copy(data[:4], reqStr) binary.BigEndian.PutUint32(data[4:8], req.DataType) binary.BigEndian.PutUint32(data[8:12], uint32(l)) - copy(data[MIN_PACKET_LEN:], req.Data) + copy(data[minPacketLength:], req.Data) return } diff --git a/client/response.go b/client/response.go index 0e7d667..3903ab7 100644 --- a/client/response.go +++ b/client/response.go @@ -24,14 +24,14 @@ type Response struct { // after calling this method, the Response.Handle will be filled func (resp *Response) Result() (data []byte, err error) { switch resp.DataType { - case WORK_FAIL: + case dtWorkFail: resp.Handle = string(resp.Data) err = ErrWorkFail return - case WORK_EXCEPTION: + case dtWorkException: err = ErrWorkException fallthrough - case WORK_COMPLETE: + case dtWorkComplete: s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2) if len(s) != 2 { err = fmt.Errorf("Invalid data: %V", resp.Data) @@ -47,8 +47,8 @@ func (resp *Response) Result() (data []byte, err error) { // Extract the job's update func (resp *Response) Update() (data []byte, err error) { - if resp.DataType != WORK_DATA && - resp.DataType != WORK_WARNING { + if resp.DataType != dtWorkData && + resp.DataType != dtWorkWarning { err = ErrDataType return } @@ -57,7 +57,7 @@ func (resp *Response) Update() (data []byte, err error) { err = ErrInvalidData return } - if resp.DataType == WORK_WARNING { + if resp.DataType == dtWorkWarning { err = ErrWorkWarning } resp.Handle = string(s[0]) @@ -67,12 +67,12 @@ func (resp *Response) Update() (data []byte, err error) { // Decode a job from byte slice func decodeResponse(data []byte) (resp *Response, l int, err error) { - if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes + if len(data) < minPacketLength { // valid package should not less 12 bytes err = fmt.Errorf("Invalid data: %V", data) return } dl := int(binary.BigEndian.Uint32(data[8:12])) - dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN] + dt := data[minPacketLength : dl+minPacketLength] if len(dt) != int(dl) { // length not equal err = fmt.Errorf("Invalid data: %V", data) return @@ -80,10 +80,10 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { resp = getResponse() resp.DataType = binary.BigEndian.Uint32(data[4:8]) switch resp.DataType { - case JOB_CREATED: + case dtJobCreated: resp.Handle = string(dt) - case STATUS_RES, WORK_DATA, WORK_WARNING, WORK_STATUS, - WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus, + dtWorkComplete, dtWorkFail, dtWorkException: s := bytes.SplitN(dt, []byte{'\x00'}, 2) if len(s) >= 2 { resp.Handle = string(s[0]) @@ -92,12 +92,12 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { err = fmt.Errorf("Invalid data: %V", data) return } - case ECHO_RES: + case dtEchoRes: fallthrough default: resp.Data = dt } - l = dl + MIN_PACKET_LEN + l = dl + minPacketLength return } diff --git a/example/client/client b/example/client/client deleted file mode 100755 index c01975f..0000000 Binary files a/example/client/client and /dev/null differ diff --git a/example/client/client.go b/example/client/client.go index c65d7b3..f84178e 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -13,7 +13,7 @@ func main() { // by implementing IdGenerator interface. // client.IdGen = client.NewAutoIncId() - c, err := client.New("tcp4", "127.0.0.1:4730") + c, err := client.New(client.Network, "127.0.0.1:4730") if err != nil { log.Fatalln(err) } @@ -29,11 +29,11 @@ func main() { } log.Println(string(echomsg)) wg.Done() - jobHandler := func(job *client.Job) { - log.Printf("%s", job.Data) + jobHandler := func(resp *client.Response) { + log.Printf("%s", resp.Data) wg.Done() } - handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) + handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler) if err != nil { log.Fatalln(err) } diff --git a/example/worker/worker.go b/example/worker/worker.go index d610776..2fc6103 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -29,7 +29,7 @@ func main() { defer w.Close() w.ErrorHandler = func(e error) { log.Println(e) - if e == worker.ErrConnection { + if e == worker.ErrLostConn { proc, err := os.FindProcess(os.Getpid()) if err != nil { log.Println(err) diff --git a/worker/agent.go b/worker/agent.go index 8dd0ba6..e6a95f5 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -49,8 +49,9 @@ func (a *agent) work() { break } // If it is unexpected error and the connection wasn't - // closed by Gearmand, the agent should colse the conection + // closed by Gearmand, the agent should close the conection // and reconnect to job server. + a.Close() a.conn, err = net.Dial(a.net, a.addr) if err != nil { a.worker.err(err) diff --git a/worker/worker_test.go b/worker/worker_test.go index b76e6b9..7fa8f0b 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -20,7 +20,7 @@ func TestWorkerErrNoneAgents(t *testing.T) { func TestWorkerAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730.") - if err := worker.AddServer("tcp4", "127.0.0.1:4730"); err != nil { + if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil { t.Error(err) }