diff --git a/client/client.go b/client/client.go index e19f6b2..0f8d00a 100644 --- a/client/client.go +++ b/client/client.go @@ -6,9 +6,9 @@ package client import ( - "bytes" "io" "net" + "bytes" "strconv" "bitbucket.org/mikespook/golib/autoinc" "bitbucket.org/mikespook/gearman-go/common" @@ -24,8 +24,8 @@ type StatusHandler func(string, bool, bool, uint64, uint64) The client side api for gearman usage: - c := client.New("tcp4", "127.0.0.1:4730") - handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) +c := client.New("tcp4", "127.0.0.1:4730") +handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { @@ -43,9 +43,9 @@ type Client struct { // Create a new client. // Connect to "addr" through "network" // Eg. -// client, err := client.New("tcp4", "127.0.0.1:4730") -func New(network, addr string) (client *Client, err error) { - conn, err := net.Dial(network, addr) +// client, err := client.New("127.0.0.1:4730") +func New(addr string) (client *Client, err error) { + conn, err := net.Dial("tcp", addr) if err != nil { return } @@ -56,8 +56,8 @@ func New(network, addr string) (client *Client, err error) { conn: conn, ai: autoinc.New(0, 1), } - go client.outLoop() go client.inLoop() + go client.outLoop() return } @@ -66,7 +66,9 @@ func (client *Client) outLoop() { ok := true for ok { if job, ok := <-client.out; ok { - client.write(job.Encode()) + if err := client.write(job.Encode()); err != nil { + client.err(err) + } } } } @@ -88,14 +90,14 @@ func (client *Client) inLoop() { case common.ERROR: _, err := common.GetError(job.Data) client.err(err) - case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, common.ECHO_RES: - client.handleJob(job) + go client.handleJob(job) case common.JOB_CREATED: client.jobCreated <- job case common.STATUS_RES: - client.handleStatus(job) + go client.handleStatus(job) } } } @@ -112,6 +114,10 @@ func (client *Client) read() (data []byte, err error) { var n int if n, err = client.conn.Read(buf); err != nil { if err == io.EOF && n == 0 { + if data == nil { + err = common.ErrEmptyReading + return + } break } return @@ -123,8 +129,8 @@ func (client *Client) read() (data []byte, err error) { } } // split package - start, end := 0, 4 tl := len(data) + start, end := 0, 4 for i := 0; i < tl; i++ { if string(data[start:end]) == common.RES_STR { l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) @@ -193,19 +199,19 @@ func (client *Client) handleStatus(job *Job) { // JOB_LOW | JOB_BG means the job is running with low level in background. func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { var datatype uint32 - if flag & common.JOB_LOW == common.JOB_LOW { - if flag & common.JOB_BG == common.JOB_BG { + if flag & JOB_LOW == JOB_LOW { + if flag & JOB_BG == JOB_BG { datatype = common.SUBMIT_JOB_LOW_BG } else { datatype = common.SUBMIT_JOB_LOW } - } else if flag & common.JOB_HIGH == common.JOB_HIGH { - if flag & common.JOB_BG == common.JOB_BG { + } else if flag & JOB_HIGH == JOB_HIGH { + if flag & JOB_BG == JOB_BG { datatype = common.SUBMIT_JOB_HIGH_BG } else { datatype = common.SUBMIT_JOB_HIGH } - } else if flag & common.JOB_BG == common.JOB_BG { + } else if flag & JOB_BG == JOB_BG { datatype = common.SUBMIT_JOB_BG } else { datatype = common.SUBMIT_JOB @@ -213,14 +219,13 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string uid := strconv.Itoa(int(client.ai.Id())) l := len(funcname) + len(uid) + len(data) + 2 - rel := make([]byte, l) + rel := make([]byte, 0, l) rel = append(rel, []byte(funcname)...) // len(funcname) rel = append(rel, '\x00') // 1 Byte rel = append(rel, []byte(uid)...) // len(uid) rel = append(rel, '\x00') // 1 Byte rel = append(rel, data...) // len(data) client.writeJob(newJob(common.REQ, datatype, rel)) - // Waiting for JOB_CREATED job := <-client.jobCreated return string(job.Data), nil @@ -259,5 +264,6 @@ func (client *Client) write(buf []byte) (err error) { func (client *Client) Close() (err error) { close(client.jobCreated) close(client.in) + close(client.out) return client.conn.Close(); } diff --git a/client/client_test.go b/client/client_test.go index a85832f..3853642 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,7 +1,6 @@ package client import ( - "bitbucket.org/mikespook/gearman-go/common" "testing" ) @@ -10,9 +9,12 @@ var client *Client func TestClientAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730") var err error - if client, err = New("tcp4", "127.0.0.1:4730"); err != nil { + if client, err = New("127.0.0.1:4730"); err != nil { t.Error(err) } + client.ErrHandler = func(e error) { + t.Error(e) + } } func TestClientEcho(t *testing.T) { @@ -29,7 +31,7 @@ func TestClientEcho(t *testing.T) { } func TestClientDo(t *testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), common.JOB_LOW|common.JOB_BG); err != nil { + if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { t.Error(err) } else { t.Log(handle) diff --git a/client/job.go b/client/job.go index c381464..6b7246a 100644 --- a/client/job.go +++ b/client/job.go @@ -9,6 +9,20 @@ import ( "bytes" "bitbucket.org/mikespook/gearman-go/common" ) + +const ( + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 +) + // An error handler type ErrorHandler func(error) @@ -42,13 +56,15 @@ func decodeJob(data []byte) (job *Job, err error) { // Encode a job to byte slice func (job *Job) Encode() (data []byte) { - l := len(job.Data) + 12 - data = make([]byte, l) + l := len(job.Data) + tl := l + 12 + data = make([]byte, tl) magiccode := common.Uint32ToBytes(job.magicCode) datatype := common.Uint32ToBytes(job.DataType) datalength := common.Uint32ToBytes(uint32(l)) - for i := 0; i < l; i ++ { + + for i := 0; i < tl; i ++ { switch { case i < 4: data[i] = magiccode[i] @@ -60,6 +76,13 @@ func (job *Job) Encode() (data []byte) { data[i] = job.Data[i - 12] } } + // Alternative + /* + data = append(data, magiccode[:] ...) + data = append(data, datatype[:] ...) + data = append(data, datalength[:] ...) + data = append(data, job.Data ...) + */ return } diff --git a/common/error.go b/common/error.go index 25ca67e..8c1e8fa 100644 --- a/common/error.go +++ b/common/error.go @@ -21,6 +21,7 @@ var ( ErrOutOfCap = errors.New("Out of the capability.") ErrNotConn = errors.New("Did not connect to job server.") ErrFuncNotFound = errors.New("The function was not found.") + ErrEmptyReading = errors.New("Empty reading.") ) // Extract the error message diff --git a/common/gearman.go b/common/gearman.go index 1bdd6f7..eebaec1 100644 --- a/common/gearman.go +++ b/common/gearman.go @@ -54,17 +54,6 @@ const ( SUBMIT_JOB_HIGH_BG = 32 SUBMIT_JOB_LOW = 33 SUBMIT_JOB_LOW_BG = 34 - - // Job type - // JOB_NORMAL | JOB_BG means a normal level job run in background - // normal level - JOB_NORMAL = 0 - // background job - JOB_BG = 1 - // low level - JOB_LOW = 2 - // high level - JOB_HIGH = 4 ) // Decode [4]byte to uint32 diff --git a/example/client.go b/example/client.go index d406eef..d1ad7a2 100644 --- a/example/client.go +++ b/example/client.go @@ -3,29 +3,32 @@ package main import ( "log" "sync" - "bitbucket.org/mikespook/gearman-go" "bitbucket.org/mikespook/gearman-go/client" ) func main() { var wg sync.WaitGroup - c, err := client.New("tcp4", "127.0.0.1:4730") + c, err := client.New("127.0.0.1:4730") if err != nil { log.Fatalln(err) } defer c.Close() echo := []byte("Hello\x00 world") c.JobHandler = func(job *client.Job) error { - log.Printf("%V", job) + log.Printf("%s", job.Data) wg.Done() return nil } + c.ErrHandler = func(e error) { + log.Println(e) + panic(e) + } wg.Add(1) c.Echo(echo) - - handle, err := c.Do("ToUpper", echo, gearman.JOB_NORMAL) + wg.Add(1) + handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL) if err != nil { log.Fatalln(err) } else { @@ -38,5 +41,6 @@ func main() { } wg.Add(1) c.Status(handle) + wg.Wait() }