diff --git a/README.md b/README.md index 891ff69..c9a341a 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ Usage ## Client - c, err := client.New("127.0.0.1:4730") + c, err := client.New("tcp4", "127.0.0.1:4730") // ... defer c.Close() data := []byte("Hello\x00 world") @@ -52,6 +52,17 @@ Usage handle := c.Do("ToUpper", data, client.JOB_NORMAL, jobHandler) // ... +Branches +======== + +Version 0.x means: _It is far far away from stable._ + +__Use at your own risk!__ + + * 0.1-testing Old API and some known issues, eg. [issue-14](https://github.com/mikespook/gearman-go/issues/14) + * 0.2-dev Refactoring a lot of things + * master current usable version + Authors ======= diff --git a/client/client.go b/client/client.go index f2d6efd..cf4da57 100644 --- a/client/client.go +++ b/client/client.go @@ -6,10 +6,10 @@ package client import ( - "github.com/mikespook/golib/idgen" "io" "net" "sync" + "github.com/mikespook/golib/idgen" ) /* @@ -21,15 +21,15 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { - net, addr string - respHandler map[string]ResponseHandler - innerHandler map[string]ResponseHandler - in chan []byte - isConn bool - conn net.Conn - mutex sync.RWMutex - ErrorHandler ErrorHandler + net, addr, lastcall string + respHandler map[string]ResponseHandler + innerHandler map[string]ResponseHandler + in chan []byte + isConn bool + conn net.Conn + mutex sync.RWMutex + ErrorHandler ErrorHandler IdGen idgen.IdGen } @@ -39,12 +39,12 @@ type Client struct { // client, err := client.New("127.0.0.1:4730") func New(net, addr string) (client *Client, err error) { client = &Client{ - net: net, - addr: addr, - respHandler: make(map[string]ResponseHandler, QUEUE_SIZE), + net: net, + addr: addr, + respHandler: make(map[string]ResponseHandler, QUEUE_SIZE), innerHandler: make(map[string]ResponseHandler, QUEUE_SIZE), - in: make(chan []byte, QUEUE_SIZE), - IdGen: idgen.NewObjectId(), + in: make(chan []byte, QUEUE_SIZE), + IdGen: idgen.NewObjectId(), } if err = client.connect(); err != nil { return @@ -126,10 +126,10 @@ func (client *Client) processLoop() { var err error var data, leftdata []byte for data = range client.in { - l = len(data) if len(leftdata) > 0 { // some data left for processing data = append(leftdata, data...) } + l = len(data) if l < MIN_PACKET_LEN { // not enough data leftdata = data continue @@ -138,13 +138,21 @@ func (client *Client) processLoop() { client.err(err) continue } + leftdata = nil switch resp.DataType { + case ERROR: + if client.lastcall != "" { + client.handleInner(client.lastcall, resp) + client.lastcall = "" + } else { + client.err(GetError(resp.Data)) + } case STATUS_RES: - client.handleInner("status-" + resp.Handle, resp) + client.handleInner("s"+resp.Handle, resp) case JOB_CREATED: - client.handleInner("created", resp) + client.handleInner("c", resp) case ECHO_RES: - client.handleInner("echo", resp) + client.handleInner("e", resp) case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: client.handleResponse(resp.Handle, resp) @@ -182,16 +190,22 @@ func (client *Client) handleInner(key string, resp *Response) { // Internal do func (client *Client) do(funcname string, data []byte, - flag uint32) (handle string) { + flag uint32) (handle string, err error) { id := client.IdGen.Id().(string) - req := getJob(funcname, id, data) + req := getJob(id, []byte(funcname), data) + req.DataType = flag client.write(req) var wg sync.WaitGroup wg.Add(1) client.mutex.RLock() - client.innerHandler["created"] = ResponseHandler(func(resp *Response) { + client.lastcall = "c" + client.innerHandler["c"] = ResponseHandler(func(resp *Response) { defer wg.Done() defer client.mutex.RUnlock() + if resp.DataType == ERROR { + err = GetError(resp.Data) + return + } handle = resp.Handle }) wg.Wait() @@ -205,7 +219,7 @@ func (client *Client) do(funcname string, data []byte, // data is encoding to byte array. // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH func (client *Client) Do(funcname string, data []byte, - flag byte, h ResponseHandler) (handle string) { + flag byte, h ResponseHandler) (handle string, err error) { var datatype uint32 switch flag { case JOB_LOW: @@ -215,7 +229,7 @@ func (client *Client) Do(funcname string, data []byte, default: datatype = SUBMIT_JOB } - handle = client.do(funcname, data, datatype) + handle, err = client.do(funcname, data, datatype) client.mutex.Lock() defer client.mutex.Unlock() if h != nil { @@ -229,7 +243,7 @@ func (client *Client) Do(funcname string, data []byte, // data is encoding to byte array. // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH func (client *Client) DoBg(funcname string, data []byte, - flag byte) (handle string) { + flag byte) (handle string, err error) { var datatype uint32 switch flag { case JOB_LOW: @@ -239,13 +253,13 @@ func (client *Client) DoBg(funcname string, data []byte, default: datatype = SUBMIT_JOB_BG } - handle = client.do(funcname, data, datatype) + handle, err = client.do(funcname, data, datatype) return } // Get job status from job server. // !!!Not fully tested.!!! -func (client *Client) Status(handle string, h ResponseHandler) (status *Status, err error) { +func (client *Client) Status(handle string) (status *Status, err error) { req := getRequest() req.DataType = GET_STATUS req.Data = []byte(handle) @@ -253,7 +267,8 @@ func (client *Client) Status(handle string, h ResponseHandler) (status *Status, var wg sync.WaitGroup wg.Add(1) client.mutex.Lock() - client.innerHandler["status-" + handle] = ResponseHandler(func(resp *Response) { + client.lastcall = "s" + handle + client.innerHandler["s" + handle] = ResponseHandler(func(resp *Response) { defer wg.Done() defer client.mutex.Unlock() var err error @@ -275,7 +290,8 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) { var wg sync.WaitGroup wg.Add(1) client.mutex.Lock() - client.innerHandler["echo"] = ResponseHandler(func(resp *Response) { + client.lastcall = "e" + client.innerHandler["e"] = ResponseHandler(func(resp *Response) { defer wg.Done() defer client.mutex.Unlock() echo = resp.Data diff --git a/client/client_test.go b/client/client_test.go index c877930..1a8c13c 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,15 +1,14 @@ package client import ( - "fmt" "testing" ) -var client *Client +const ( + TestStr = "Hello world" +) -func printHandle(resp *Response) { - fmt.Printf("%V", resp) -} +var client *Client func TestClientAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730") @@ -24,17 +23,23 @@ func TestClientAddServer(t *testing.T) { } func TestClientEcho(t *testing.T) { - err := client.Echo([]byte("Hello world"), printHandle) + echo, err := client.Echo([]byte(TestStr)) if err != nil { t.Error(err) return } + if string(echo) != TestStr { + t.Errorf("Echo error, %s expected, %s got", TestStr, echo) + return + } } func TestClientDoBg(t *testing.T) { if handle := client.DoBg("ToUpper", []byte("abcdef"), JOB_LOW); handle == "" { t.Error("Handle is empty.") + } else { + t.Log(handle) } } @@ -57,38 +62,34 @@ func TestClientDo(t *testing.T) { } func TestClientStatus(t *testing.T) { - - err := client.Status("handle not exists", printHandle) + status, err := client.Status("handle not exists") if err != nil { t.Error(err) return } - /* - if s1.Known { - t.Errorf("The job (%s) shouldn't be known.", s1.Handle) - return - } - if s1.Running { - t.Errorf("The job (%s) shouldn't be running.", s1.Handle) - return - }*/ + if status.Known { + t.Errorf("The job (%s) shouldn't be known.", status.Handle) + return + } + if status.Running { + t.Errorf("The job (%s) shouldn't be running.", status.Handle) + return + } handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil) - err = client.Status(handle, printHandle) - /* - if err != nil { - t.Error(err) - return - } - if !s2.Known { - t.Errorf("The job (%s) should be known.", s2.Handle) - return - } - if s2.Running { - t.Errorf("The job (%s) shouldn't be running.", s2.Handle) - return - } - */ + status, err = client.Status(handle) + if err != nil { + t.Error(err) + return + } + if !status.Known { + t.Errorf("The job (%s) should be known.", status.Handle) + return + } + if status.Running { + t.Errorf("The job (%s) shouldn't be running.", status.Handle) + return + } } func TestClientClose(t *testing.T) { diff --git a/client/error.go b/client/error.go index d6be69b..5600158 100644 --- a/client/error.go +++ b/client/error.go @@ -6,11 +6,9 @@ package client import ( + "fmt" "bytes" "errors" - "fmt" - "strconv" - "syscall" ) var ( @@ -33,17 +31,12 @@ var ( func DisablePanic() { recover() } // Extract the error message -func GetError(data []byte) (eno syscall.Errno, err error) { +func GetError(data []byte) (err error) { rel := bytes.SplitN(data, []byte{'\x00'}, 2) if len(rel) != 2 { err = fmt.Errorf("Not a error data: %V", data) return } - var n uint64 - if n, err = strconv.ParseUint(string(rel[0]), 10, 0); err != nil { - return - } - eno = syscall.Errno(n) - err = errors.New(string(rel[1])) + err = errors.New(fmt.Sprintf("%s: %s", rel[0], rel[1])) return } diff --git a/client/pool.go b/client/pool.go index ddc2ac6..9c82a3d 100644 --- a/client/pool.go +++ b/client/pool.go @@ -94,26 +94,26 @@ func (pool *Pool) Remove(addr string) { } func (pool *Pool) Do(funcname string, data []byte, - flag byte, h ResponseHandler) (addr, handle string) { + flag byte, h ResponseHandler) (addr, handle string, err error) { client := pool.selectServer() - handle = client.Do(funcname, data, flag, h) + handle, err = client.Do(funcname, data, flag, h) addr = client.addr return } func (pool *Pool) DoBg(funcname string, data []byte, - flag byte) (addr, handle string) { + flag byte) (addr, handle string, err error) { client := pool.selectServer() - handle = client.DoBg(funcname, data, flag) + handle, err = client.DoBg(funcname, data, flag) addr = client.addr return } // Get job status from job server. // !!!Not fully tested.!!! -func (pool *Pool) Status(addr, handle string, h ResponseHandler) (status *Status, err error) { +func (pool *Pool) Status(addr, handle string) (status *Status, err error) { if client, ok := pool.clients[addr]; ok { - status, err = client.Status(handle, h) + status, err = client.Status(handle) } else { err = ErrNotFound } @@ -121,7 +121,7 @@ func (pool *Pool) Status(addr, handle string, h ResponseHandler) (status *Status } // Send a something out, get the samething back. -func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (echo []byte, err error) { +func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { var client *poolClient if addr == "" { client = pool.selectServer() diff --git a/client/pool_test.go b/client/pool_test.go index f96fbc1..a44133e 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -13,7 +13,7 @@ func TestPoolAdd(t *testing.T) { if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil { t.Error(err) } - if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil { + if err := pool.Add("tcp4", "127.0.1.1:4730", 1); err != nil { t.Error(err) } if len(pool.clients) != 2 { @@ -22,17 +22,17 @@ func TestPoolAdd(t *testing.T) { } func TestPoolEcho(t *testing.T) { - echo, err := pool.Echo("", []byte("Hello pool"), printHandle) + echo, err := pool.Echo("", []byte(TestStr)) if err != nil { t.Error(err) return } - if string(echo) != "Hello pool" { + if string(echo) != TestStr { t.Errorf("Invalid echo data: %s", echo) return } - _, err = pool.Echo("not exists", []byte("Hello pool"), printHandle) + _, err = pool.Echo("not exists", []byte(TestStr)) if err != ErrNotFound { t.Errorf("ErrNotFound expected, got %s", err) } @@ -66,36 +66,34 @@ func TestPoolDo(t *testing.T) { } func TestPoolStatus(t *testing.T) { - err := pool.Status("127.0.0.1:4730", "handle not exists", printHandle) + status, err := pool.Status("127.0.0.1:4730", "handle not exists") if err != nil { t.Error(err) return } - /* - if s1.Known { - t.Errorf("The job (%s) shouldn't be known.", s1.Handle) - } - if s1.Running { - t.Errorf("The job (%s) shouldn't be running.", s1.Handle) - } - */ + if status.Known { + t.Errorf("The job (%s) shouldn't be known.", status.Handle) + } + if status.Running { + t.Errorf("The job (%s) shouldn't be running.", status.Handle) + } addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil) - err = pool.Status(addr, handle, printHandle) + status, err = pool.Status(addr, handle) if err != nil { t.Error(err) return } - /* - if !s2.Known { - t.Errorf("The job (%s) should be known.", s2.Handle) - } - if s2.Running { - t.Errorf("The job (%s) shouldn't be running.", s2.Handle) - } - */ - err = pool.Status("not exists", "not exists", printHandle) + + if !status.Known { + t.Errorf("The job (%s) should be known.", status.Handle) + } + if status.Running { + t.Errorf("The job (%s) shouldn't be running.", status.Handle) + } + status, err = pool.Status("not exists", "not exists") if err != ErrNotFound { t.Error(err) + return } } diff --git a/client/request.go b/client/request.go index f5f62ce..9939a5f 100644 --- a/client/request.go +++ b/client/request.go @@ -33,7 +33,7 @@ func getRequest() (req *request) { return } -func getJob(funcname, id string, data []byte) (req *request) { +func getJob(id string, funcname, data []byte) (req *request) { req = getRequest() a := len(funcname) b := len(id) diff --git a/client/response.go b/client/response.go index 5d41fc3..89076f0 100644 --- a/client/response.go +++ b/client/response.go @@ -6,17 +6,17 @@ package client import ( - "bytes" - "encoding/binary" "fmt" + "bytes" "strconv" + "encoding/binary" ) // response type Response struct { DataType uint32 - Data []byte - UID, Handle string + Data, UID []byte + Handle string } // Extract the Response's result. @@ -82,9 +82,9 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { resp = getResponse() resp.DataType = binary.BigEndian.Uint32(data[4:8]) switch resp.DataType { - case ECHO_RES: - resp.Data = dt - case WORK_DATA, WORK_WARNING, WORK_STATUS, + case JOB_CREATED: + resp.Handle = string(dt) + case STATUS_RES, WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: s := bytes.SplitN(dt, []byte{'\x00'}, 2) if len(s) >= 2 { @@ -94,8 +94,12 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { err = fmt.Errorf("Invalid data: %V", data) return } + case ECHO_RES: + fallthrough + default: + resp.Data = dt } - l = len(resp.Data) + MIN_PACKET_LEN + l = dl + MIN_PACKET_LEN return } @@ -109,23 +113,23 @@ func (resp *Response) IsStatus() bool { // status handler func (resp *Response) Status() (status *Status, err error) { - data := bytes.SplitN(resp.Data, []byte{'\x00'}, 5) - if len(data) != 5 { + data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4) + if len(data) != 4 { err = fmt.Errorf("Invalid data: %V", resp.Data) return } status = &Status{} - status.Handle = data[0] - status.Known = (data[1][0] == '1') - status.Running = (data[2][0] == '1') - status.Numerator, err = strconv.ParseUint(string(data[3]), 10, 0) + status.Handle = resp.Handle + status.Known = (data[0][0] == '1') + status.Running = (data[1][0] == '1') + status.Numerator, err = strconv.ParseUint(string(data[2]), 10, 0) if err != nil { - err = fmt.Errorf("Invalid Integer: %s", data[3]) + err = fmt.Errorf("Invalid Integer: %s", data[2]) return } - status.Denominator, err = strconv.ParseUint(string(data[4]), 10, 0) + status.Denominator, err = strconv.ParseUint(string(data[3]), 10, 0) if err != nil { - err = fmt.Errorf("Invalid Integer: %s", data[4]) + err = fmt.Errorf("Invalid Integer: %s", data[3]) return } return diff --git a/client/status.go b/client/status.go index f656203..4370c33 100644 --- a/client/status.go +++ b/client/status.go @@ -1,7 +1,7 @@ package client type Status struct { - Handle []byte + Handle string Known, Running bool Numerator, Denominator uint64 } diff --git a/gearman_test.go b/gearman_test.go index 9e35b1c..58d0303 100644 --- a/gearman_test.go +++ b/gearman_test.go @@ -41,6 +41,7 @@ func TestJobs(t *testing.T) { return } defer w.Close() + t.Log("Servers added...") if err := w.AddFunc("ToUpper", ToUpper, 0); err != nil { t.Error(err) return @@ -49,38 +50,42 @@ func TestJobs(t *testing.T) { t.Error(err) return } - + t.Log("Functions added...") w.ErrHandler = func(e error) { t.Error(e) } go w.Work() + t.Log("Worker is running...") - c, err := client.New(GEARMAND) + c, err := client.New("tcp4", GEARMAND) if err != nil { t.Error(err) return } defer c.Close() - c.ErrHandler = func(e error) { -// t.Error(e) + c.ErrorHandler = func(e error) { t.Log(e) } { var w sync.WaitGroup - jobHandler := func(job *client.Job) { + jobHandler := func(job *client.Response) { upper := strings.ToUpper(STR) if (string(job.Data) != upper) { - t.Error("%s expected, got %s", []byte(upper), job.Data) + t.Errorf("%s expected, got %s", upper, job.Data) } w.Done() } w.Add(1) - handle := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler) + handle, err := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler) + if err != nil { + t.Error(err) + return + } w.Wait() - status, err := c.Status(handle, time.Second) + status, err := c.Status(handle) if err != nil { t.Error(err) return @@ -95,9 +100,13 @@ func TestJobs(t *testing.T) { } } { - handle := c.DoBg("Sleep", nil, client.JOB_NORMAL) + handle, err := c.DoBg("Sleep", nil, client.JOB_NORMAL) + if err != nil { + t.Error(err) + return + } time.Sleep(time.Second) - status, err := c.Status(handle, time.Second) + status, err := c.Status(handle) if err != nil { t.Error(err) return @@ -113,7 +122,7 @@ func TestJobs(t *testing.T) { } } { - status, err := c.Status("not exists handle", time.Second) + status, err := c.Status("not exists handle") if err != nil { t.Error(err) return