From 568c70b083399853f58d853ded6211ddfd2f98c5 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 23 Apr 2013 16:58:06 +0800 Subject: [PATCH] #12 fixed --- client/client.go | 24 +++++++++++---------- client/id.go | 2 +- client/job.go | 3 +-- client/pool.go | 5 +++-- common/error.go | 4 ++-- example/client.go | 15 ++++++++------ gearman_test.go | 53 ++++++++++++++++++++++++++++++++++------------- worker/job.go | 26 +++++++++++++++++++---- worker/worker.go | 22 +++----------------- 9 files changed, 93 insertions(+), 61 deletions(-) diff --git a/client/client.go b/client/client.go index 2cbd39b..d09e70f 100644 --- a/client/client.go +++ b/client/client.go @@ -82,16 +82,12 @@ func New(addr string) (client *Client, err error) { // func (client *Client) connect() (err error) { - client.mutex.Lock() - defer client.mutex.Unlock() client.conn, err = net.Dial(common.NETWORK, client.addr) return } // Internal write func (client *Client) write(buf []byte) (err error) { - client.mutex.RLock() - defer client.mutex.RUnlock() var n int for i := 0; i < len(buf); i += n { n, err = client.conn.Write(buf[i:]) @@ -104,8 +100,6 @@ func (client *Client) write(buf []byte) (err error) { // read length bytes from the socket func (client *Client) readData(length int) (data []byte, err error) { - client.mutex.RLock() - defer client.mutex.RUnlock() n := 0 buf := make([]byte, common.BUFFER_SIZE) // read until data can be unpacked @@ -236,9 +230,11 @@ func (client *Client) err (e error) { // job handler func (client *Client) handleJob(job *Job) { - if h, ok := client.jobhandlers[job.UniqueId]; ok { + client.mutex.RLock() + defer client.mutex.RUnlock() + if h, ok := client.jobhandlers[job.Handle]; ok { h(job) - delete(client.jobhandlers, job.UniqueId) + delete(client.jobhandlers, job.Handle) } } @@ -316,9 +312,11 @@ flag byte, jobhandler JobHandler) (handle string) { datatype = common.SUBMIT_JOB } id := IdGen.Id() + client.mutex.Lock() + defer client.mutex.Unlock() handle = client.do(funcname, data, datatype, id) if jobhandler != nil { - client.jobhandlers[id] = jobhandler + client.jobhandlers[handle] = jobhandler } return } @@ -341,9 +339,13 @@ flag byte) (handle string) { // Get job status from job server. // !!!Not fully tested.!!! -func (client *Client) Status(handle string) (status *Status) { +func (client *Client) Status(handle string, timeout time.Duration) (status *Status, err error) { client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle))) - status = <-client.status + select { + case status = <-client.status: + case <-time.NewTimer(timeout).C: + err = common.ErrTimeOut + } return } diff --git a/client/id.go b/client/id.go index e55c911..1b86751 100644 --- a/client/id.go +++ b/client/id.go @@ -16,7 +16,7 @@ type objectId struct { } func (id *objectId) Id() string { - return id.String() + return id.Hex() } func NewObjectId() IdGenerator { diff --git a/client/job.go b/client/job.go index 3679093..2402a58 100644 --- a/client/job.go +++ b/client/job.go @@ -51,7 +51,6 @@ func decodeJob(data []byte) (job *Job, err error) { return nil, common.Errorf("Invalid data: %V", data) } data = data[12:] - var handle string switch datatype { case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, @@ -59,7 +58,7 @@ func decodeJob(data []byte) (job *Job, err error) { i := bytes.IndexByte(data, '\x00') if i != -1 { handle = string(data[:i]) - data = data[i:] + data = data[i + 1:] } } diff --git a/client/pool.go b/client/pool.go index 03831b0..894cf56 100644 --- a/client/pool.go +++ b/client/pool.go @@ -7,6 +7,7 @@ package client import ( "sync" + "time" "errors" "math/rand" "github.com/mikespook/gearman-go/common" @@ -115,9 +116,9 @@ flag byte) (addr, handle string) { // Get job status from job server. // !!!Not fully tested.!!! -func (pool *Pool) Status(addr, handle string) (status *Status, err error) { +func (pool *Pool) Status(addr, handle string, timeout time.Duration) (status *Status, err error) { if client, ok := pool.clients[addr]; ok { - status = client.Status(handle) + status, err = client.Status(handle, timeout) } else { err = ErrNotFound } diff --git a/common/error.go b/common/error.go index b86ce38..4ee3195 100644 --- a/common/error.go +++ b/common/error.go @@ -24,8 +24,8 @@ var ( ErrFuncNotFound = errors.New("The function was not found.") ErrConnection = errors.New("Connection error.") ErrNoActiveAgent = errors.New("No active agent.") - ErrExecTimeOut = errors.New("Executing time out.") - ErrUnknown = errors.New("Unknown error.") + ErrTimeOut = errors.New("Executing time out.") + ErrUnknown = errors.New("Unknown error.") ) func DisablePanic() {recover()} diff --git a/example/client.go b/example/client.go index f7cdfa4..d377312 100644 --- a/example/client.go +++ b/example/client.go @@ -3,6 +3,7 @@ package main import ( "log" "sync" + "time" "github.com/mikespook/gearman-go/client" ) @@ -11,7 +12,7 @@ func main() { // Set the autoinc id generator // You can write your own id generator // by implementing IdGenerator interface. - client.IdGen = client.NewAutoIncId() + // client.IdGen = client.NewAutoIncId() c, err := client.New("127.0.0.1:4730") if err != nil { @@ -20,20 +21,22 @@ func main() { defer c.Close() c.ErrHandler = func(e error) { log.Println(e) - panic(e) } echo := []byte("Hello\x00 world") wg.Add(1) - c.Echo(echo) - wg.Add(1) + log.Println(string(c.Echo(echo))) + wg.Done() jobHandler := func(job *client.Job) { log.Printf("%s", job.Data) wg.Done() } handle := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) - wg.Add(1) - log.Printf("%t", c.Status(handle)) + status, err := c.Status(handle, time.Second) + if err != nil { + log.Fatalln(err) + } + log.Printf("%t", status) wg.Wait() } diff --git a/gearman_test.go b/gearman_test.go index 79081d4..b4c827c 100644 --- a/gearman_test.go +++ b/gearman_test.go @@ -10,9 +10,7 @@ The protocol was implemented by native way. package gearman import ( - "fmt" "time" - "errors" "testing" "strings" "github.com/mikespook/gearman-go/client" @@ -21,7 +19,6 @@ import ( const( STR = "The gearman-go is a pure go implemented library." - UPPERSTR = "THE GEARMAN-GO IS A PURE GO IMPLEMENTED LIRBRARY." GEARMAND = "127.0.0.1:4730" ) @@ -51,6 +48,10 @@ func TestJobs(t *testing.T) { t.Error(err) return } + + w.ErrHandler = func(e error) { + t.Error(e) + } go w.Work() c, err := client.New(GEARMAND) @@ -61,29 +62,37 @@ func TestJobs(t *testing.T) { defer c.Close() c.ErrHandler = func(e error) { - panic(e) - } - - jobHandler := func(job *client.Job) { - if (string(job.Data) != UPPERSTR) { - panic(errors.New(fmt.Sprintf("%s expected, got %s", UPPERSTR, job.Data))) - } + t.Error(e) } { + jobHandler := func(job *client.Job) { + upper := strings.ToUpper(STR) + if (string(job.Data) != upper) { + t.Error("%s expected, got %s", []byte(upper), job.Data) + } + } + handle := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler) - status := c.Status(handle) + status, err := c.Status(handle, time.Second) + if err != nil { + t.Error(err) + return + } if !status.Known { t.Errorf("%s should be known", status.Handle) return } } - { handle := c.DoBg("Sleep", nil, client.JOB_NORMAL) time.Sleep(time.Second) - status := c.Status(handle) + status, err := c.Status(handle, time.Second) + if err != nil { + t.Error(err) + return + } if !status.Known { t.Errorf("%s should be known", status.Handle) @@ -91,7 +100,23 @@ func TestJobs(t *testing.T) { } if !status.Running { - t.Errorf("%s shouldn be running", status.Handle) + t.Errorf("%s should be running", status.Handle) + } + } + { + status, err := c.Status("not exists handle", time.Second) + if err != nil { + t.Error(err) + return + } + + if status.Known { + t.Errorf("%s shouldn't be known", status.Handle) + return + } + + if status.Running { + t.Errorf("%s shouldn't be running", status.Handle) } } } diff --git a/worker/job.go b/worker/job.go index bbbe60b..01924d7 100644 --- a/worker/job.go +++ b/worker/job.go @@ -6,6 +6,7 @@ package worker import ( + "bytes" "strconv" "github.com/mikespook/gearman-go/common" ) @@ -13,7 +14,7 @@ import ( // Worker side job type Job struct { Data []byte - Handle, UniqueId string + Handle, UniqueId, Fn string agent *agent magicCode, DataType uint32 c chan bool @@ -24,8 +25,7 @@ func newJob(magiccode, datatype uint32, data []byte) (job *Job) { return &Job{magicCode: magiccode, DataType: datatype, Data: data, - c: make(chan bool), - } + c: make(chan bool),} } // Decode job from byte slice @@ -39,7 +39,25 @@ func decodeJob(data []byte) (job *Job, err error) { return nil, common.Errorf("Invalid data: %V", data) } data = data[12:] - job = newJob(common.RES, datatype, data) + job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool),} + switch datatype { + case common.JOB_ASSIGN: + s := bytes.SplitN(data, []byte{'\x00'}, 3) + if len(s) == 3 { + job.Handle = string(s[0]) + job.Fn = string(s[1]) + data = s[2] + } + case common.JOB_ASSIGN_UNIQ: + s := bytes.SplitN(data, []byte{'\x00'}, 4) + if len(s) == 4 { + job.Handle = string(s[0]) + job.Fn = string(s[1]) + job.UniqueId = string(s[2]) + data = s[3] + } + } + job.Data = data return } diff --git a/worker/worker.go b/worker/worker.go index 5f26b46..7ad830d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -6,7 +6,6 @@ package worker import ( "time" - "bytes" "github.com/mikespook/gearman-go/common" ) @@ -258,24 +257,9 @@ func (worker *Worker) exec(job *Job) (err error) { } } } () - var limit int - if job.DataType == common.JOB_ASSIGN { - limit = 3 - } else { - limit = 4 - } - jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) - job.Handle = string(jobdata[0]) - funcname := string(jobdata[1]) - if job.DataType == common.JOB_ASSIGN { - job.Data = jobdata[2] - } else { - job.UniqueId = string(jobdata[2]) - job.Data = jobdata[3] - } - f, ok := worker.funcs[funcname] + f, ok := worker.funcs[job.Fn] if !ok { - return common.Errorf("The function does not exist: %s", funcname) + return common.Errorf("The function does not exist: %s", job.Fn) } var r *result if f.timeout == 0 { @@ -333,7 +317,7 @@ func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) { case r = <-rslt: case <-time.After(timeout): go job.cancel() - return &result{err:common.ErrExecTimeOut} + return &result{err:common.ErrTimeOut} } return r }