From e9c29799fbab17bdf4ff5d6a0841b8c0bc05a524 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Fri, 20 Dec 2013 15:24:22 +0800 Subject: [PATCH] refactoring worker side data package --- client/request.go | 2 +- example/client/client.go | 13 ++++++------- example/worker/worker.go | 25 ++++++++++--------------- worker/{response.go => inpack.go} | 12 ++++++------ worker/{request.go => outpack.go} | 8 ++++---- worker/worker.go | 1 - 6 files changed, 27 insertions(+), 34 deletions(-) rename worker/{response.go => inpack.go} (86%) rename worker/{request.go => outpack.go} (88%) diff --git a/client/request.go b/client/request.go index 485eecc..f906692 100644 --- a/client/request.go +++ b/client/request.go @@ -9,7 +9,7 @@ import ( "encoding/binary" ) -// request +// Request from client type request struct { DataType uint32 Data []byte diff --git a/example/client/client.go b/example/client/client.go index cce04dc..bde0532 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -3,7 +3,6 @@ package main import ( "log" "sync" - "time" "github.com/mikespook/gearman-go/client" ) @@ -14,29 +13,29 @@ func main() { // by implementing IdGenerator interface. // client.IdGen = client.NewAutoIncId() - c, err := client.New("127.0.0.1:4730") + c, err := client.New("tcp4", "127.0.0.1:4730") if err != nil { log.Fatalln(err) } defer c.Close() - c.ErrHandler = func(e error) { + c.ErrorHandler = func(e error) { log.Println(e) } echo := []byte("Hello\x00 world") wg.Add(1) - echomsg, err := c.Echo(echo, time.Second) + echomsg, err := c.Echo(echo) if err != nil { log.Fatalln(err) } log.Println(string(echomsg)) wg.Done() - jobHandler := func(job *client.Job) { + jobHandler := func(job *client.Response) { log.Printf("%s", job.Data) wg.Done() } - handle := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) + handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) wg.Add(1) - status, err := c.Status(handle, time.Second) + status, err := c.Status(handle) if err != nil { log.Fatalln(err) } diff --git a/example/worker/worker.go b/example/worker/worker.go index bcb7154..f31b5fe 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -9,29 +9,25 @@ import ( "github.com/mikespook/gearman-go/worker" ) -func ToUpper(job *worker.Job) ([]byte, error) { - log.Printf("ToUpper: Handle=[%s]; UID=[%s], Data=[%s]\n", - job.Handle, job.UniqueId, job.Data) - data := []byte(strings.ToUpper(string(job.Data))) +func ToUpper(job worker.Job) ([]byte, error) { + log.Printf("ToUpper: Data=[%s]\n", job.Data()) + data := []byte(strings.ToUpper(string(job.Data()))) return data, nil } -func ToUpperDelay10(job *worker.Job) ([]byte, error) { - log.Printf("ToUpperDelay10: Handle=[%s]; UID=[%s], Data=[%s]\n", - job.Handle, job.UniqueId, job.Data) +func ToUpperDelay10(job worker.Job) ([]byte, error) { + log.Printf("ToUpper: Data=[%s]\n", job.Data()) time.Sleep(10 * time.Second) - data := []byte(strings.ToUpper(string(job.Data))) + data := []byte(strings.ToUpper(string(job.Data()))) return data, nil } - - func main() { log.Println("Starting ...") defer log.Println("Shutdown complete!") w := worker.New(worker.Unlimited) defer w.Close() - w.ErrHandler = func(e error) { + w.ErrorHandler = func(e error) { log.Println(e) if e == worker.ErrConnection { proc, err := os.FindProcess(os.Getpid()) @@ -43,12 +39,11 @@ func main() { } } } - w.JobHandler = func(job *worker.Job) error { - log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle, - job.UniqueId, job.Data, job.DataType) + w.JobHandler = func(job worker.Job) error { + log.Printf("Data=%s\n", job.Data()) return nil } - w.AddServer("127.0.0.1:4730") + w.AddServer("tcp4", "127.0.0.1:4730") w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) diff --git a/worker/response.go b/worker/inpack.go similarity index 86% rename from worker/response.go rename to worker/inpack.go index d528474..6eb03d1 100644 --- a/worker/response.go +++ b/worker/inpack.go @@ -12,20 +12,20 @@ import ( ) // Worker side job -type Response struct { +type InPack struct { DataType uint32 Data []byte Handle, UniqueId, Fn string - agentId string + a *agent } // Create a new job -func getResponse() (resp *Response) { - return &Response{} +func getInPack() (resp *InPack) { + return &InPack{} } // Decode job from byte slice -func decodeResponse(data []byte) (resp *Response, l int, err error) { +func decodeInPack(data []byte) (resp *InPack, l int, err error) { if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes err = fmt.Errorf("Invalid data: %V", data) return @@ -36,7 +36,7 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { err = fmt.Errorf("Invalid data: %V", data) return } - resp = getResponse() + resp = getInPack() resp.DataType = binary.BigEndian.Uint32(data[4:8]) switch resp.DataType { case JOB_ASSIGN: diff --git a/worker/request.go b/worker/outpack.go similarity index 88% rename from worker/request.go rename to worker/outpack.go index 5539ff9..17dce2b 100644 --- a/worker/request.go +++ b/worker/outpack.go @@ -10,19 +10,19 @@ import ( ) // Worker side job -type request struct { +type OutPack struct { DataType uint32 Data []byte Handle, UniqueId, Fn string } -func getRequest() (req *request) { +func getOutPack() (req *OutPack) { // TODO pool - return &request{} + return &OutPack{} } // Encode a job to byte slice -func (req *request) Encode() (data []byte) { +func (req *OutPack) Encode() (data []byte) { var l int if req.DataType == WORK_FAIL { l = len(req.Handle) diff --git a/worker/worker.go b/worker/worker.go index 82cc18b..ed9dfc9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -183,7 +183,6 @@ func (worker *Worker) Work() { } var resp *Response for resp = range worker.in { - fmt.Println(resp) go worker.dealResp(resp) } }