From c8f2f5085c4e3700b4c35b63dc3d7f682e0d2624 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Sun, 22 Sep 2013 22:58:22 +0800 Subject: [PATCH] race is bad, how should we do? --- client/client.go | 32 +++++++++++++-------------- client/common.go | 48 ++++++++++++++++++++-------------------- example/client/client.go | 16 ++++++++------ 3 files changed, 49 insertions(+), 47 deletions(-) diff --git a/client/client.go b/client/client.go index 355b1e7..c426226 100644 --- a/client/client.go +++ b/client/client.go @@ -9,6 +9,7 @@ import ( "io" "net" "sync" + "fmt" ) /* @@ -114,6 +115,7 @@ func (client *Client) readLoop() { client.err(err) continue } + fmt.Printf("[%X]", data) client.in <- data } close(client.in) @@ -191,10 +193,6 @@ func (client *Client) handleInner(key string, resp *Response) { // Internal do func (client *Client) do(funcname string, data []byte, flag uint32) (handle string, err error) { - id := IdGen.Id() - req := getJob(id, []byte(funcname), data) - req.DataType = flag - client.write(req) var wg sync.WaitGroup wg.Add(1) client.mutex.RLock() @@ -208,6 +206,10 @@ func (client *Client) do(funcname string, data []byte, } handle = resp.Handle }) + id := IdGen.Id() + req := getJob(id, []byte(funcname), data) + req.DataType = flag + client.write(req) wg.Wait() return } @@ -260,10 +262,6 @@ func (client *Client) DoBg(funcname string, data []byte, // Get job status from job server. // !!!Not fully tested.!!! func (client *Client) Status(handle string) (status *Status, err error) { - req := getRequest() - req.DataType = GET_STATUS - req.Data = []byte(handle) - client.write(req) var wg sync.WaitGroup wg.Add(1) client.mutex.Lock() @@ -277,25 +275,27 @@ func (client *Client) Status(handle string) (status *Status, err error) { client.err(err) } }) + req := getRequest() + req.DataType = GET_STATUS + req.Data = []byte(handle) + client.write(req) wg.Wait() return } // Send a something out, get the samething back. func (client *Client) Echo(data []byte) (echo []byte, err error) { + var wg sync.WaitGroup + wg.Add(1) + client.innerHandler["e"] = ResponseHandler(func(resp *Response) { + defer wg.Done() + echo = resp.Data + }) req := getRequest() req.DataType = ECHO_REQ req.Data = data client.write(req) - var wg sync.WaitGroup - wg.Add(1) - client.mutex.Lock() client.lastcall = "e" - client.innerHandler["e"] = ResponseHandler(func(resp *Response) { - defer wg.Done() - defer client.mutex.Unlock() - echo = resp.Data - }) wg.Wait() return } diff --git a/client/common.go b/client/common.go index 4636bfb..cc40750 100644 --- a/client/common.go +++ b/client/common.go @@ -22,30 +22,30 @@ const ( RES_STR = "\x00RES" // package data type - CAN_DO = 1 - CANT_DO = 2 - RESET_ABILITIES = 3 - PRE_SLEEP = 4 - NOOP = 6 - JOB_CREATED = 8 - GRAB_JOB = 9 - NO_JOB = 10 - JOB_ASSIGN = 11 - WORK_STATUS = 12 - WORK_COMPLETE = 13 - WORK_FAIL = 14 - GET_STATUS = 15 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - STATUS_RES = 20 - SET_CLIENT_ID = 22 - CAN_DO_TIMEOUT = 23 - WORK_EXCEPTION = 25 - WORK_DATA = 28 - WORK_WARNING = 29 - GRAB_JOB_UNIQ = 30 - JOB_ASSIGN_UNIQ = 31 + CAN_DO = 0x1 + CANT_DO = 0x2 + RESET_ABILITIES = 0x3 + PRE_SLEEP = 0x4 + NOOP = 0x6 + JOB_CREATED = 0x8 + GRAB_JOB = 0x9 + NO_JOB = 0xa + JOB_ASSIGN = 0xb + WORK_STATUS = 0xc + WORK_COMPLETE = 0xd + WORK_FAIL = 0xe + GET_STATUS = 0xf + ECHO_REQ = 0x10 + ECHO_RES = 0x11 + ERROR = 0x13 + STATUS_RES = 0x14 + SET_CLIENT_ID = 0x16 + CAN_DO_TIMEOUT = 0x17 + WORK_EXCEPTION = 0x19 + WORK_DATA = 0x1c + WORK_WARNING = 0x1d + GRAB_JOB_UNIQ = 0x1e + JOB_ASSIGN_UNIQ = 0x1f SUBMIT_JOB = 7 SUBMIT_JOB_BG = 18 diff --git a/example/client/client.go b/example/client/client.go index cce04dc..9161c57 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -3,7 +3,6 @@ package main import ( "log" "sync" - "time" "github.com/mikespook/gearman-go/client" ) @@ -14,29 +13,32 @@ func main() { // by implementing IdGenerator interface. // client.IdGen = client.NewAutoIncId() - c, err := client.New("127.0.0.1:4730") + c, err := client.New("tcp4", "127.0.0.1:4730") if err != nil { log.Fatalln(err) } defer c.Close() - c.ErrHandler = func(e error) { + c.ErrorHandler = func(e error) { log.Println(e) } echo := []byte("Hello\x00 world") wg.Add(1) - echomsg, err := c.Echo(echo, time.Second) + echomsg, err := c.Echo(echo) if err != nil { log.Fatalln(err) } log.Println(string(echomsg)) wg.Done() - jobHandler := func(job *client.Job) { + jobHandler := func(job *client.Response) { log.Printf("%s", job.Data) wg.Done() } - handle := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) + handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) + if err != nil { + log.Fatalln(err) + } wg.Add(1) - status, err := c.Status(handle, time.Second) + status, err := c.Status(handle) if err != nil { log.Fatalln(err) }