diff --git a/client/client.go b/client/client.go index 082efb4..e7e6444 100644 --- a/client/client.go +++ b/client/client.go @@ -3,10 +3,10 @@ package client import ( + "bufio" "io" "net" "sync" - "bufio" ) // One client connect to one server. @@ -20,7 +20,7 @@ type Client struct { in chan *Response isConn bool conn net.Conn - rw *bufio.ReadWriter + rw *bufio.ReadWriter ErrorHandler ErrorHandler } diff --git a/client/common.go b/client/common.go index 89ec4c4..d28caff 100644 --- a/client/common.go +++ b/client/common.go @@ -50,11 +50,11 @@ const ( dtSubmitJobLow = 33 dtSubmitJobLowBg = 34 - WorkComplate = dtWorkComplete - WorkDate = dtWorkData - WorkStatus = dtWorkStatus - WorkWarning = dtWorkWarning - WorkFail = dtWorkFail + WorkComplate = dtWorkComplete + WorkDate = dtWorkData + WorkStatus = dtWorkStatus + WorkWarning = dtWorkWarning + WorkFail = dtWorkFail WorkException = dtWorkException ) diff --git a/client/pool.go b/client/pool.go index 1204908..d5db727 100644 --- a/client/pool.go +++ b/client/pool.go @@ -11,9 +11,9 @@ const ( ) var ( - ErrNotFound = errors.New("Server Not Found") + ErrNotFound = errors.New("Server Not Found") SelectWithRate = selectWithRate - SelectRandom = selectRandom + SelectRandom = selectRandom ) type poolClient struct { diff --git a/client/response.go b/client/response.go index c215cb1..1fbc449 100644 --- a/client/response.go +++ b/client/response.go @@ -61,7 +61,7 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { return } dl := int(binary.BigEndian.Uint32(data[8:12])) - if a < minPacketLength + dl { + if a < minPacketLength+dl { err = fmt.Errorf("Invalid data: %V", data) return } diff --git a/example/client/client.go b/example/client/client.go index f84178e..67c9e0a 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -7,7 +7,6 @@ import ( ) func main() { - var wg sync.WaitGroup // Set the autoinc id generator // You can write your own id generator // by implementing IdGenerator interface. @@ -22,27 +21,58 @@ func main() { log.Println(e) } echo := []byte("Hello\x00 world") - wg.Add(1) echomsg, err := c.Echo(echo) if err != nil { log.Fatalln(err) } log.Println(string(echomsg)) - wg.Done() jobHandler := func(resp *client.Response) { - log.Printf("%s", resp.Data) - wg.Done() + switch resp.DataType { + case client.WorkException: + fallthrough + case client.WorkFail: + fallthrough + case client.WorkComplate: + if data, err := resp.Result(); err == nil { + log.Printf("RESULT: %V\n", data) + } else { + log.Printf("RESULT: %s\n", err) + } + case client.WorkWarning: + fallthrough + case client.WorkDate: + if data, err := resp.Update(); err == nil { + log.Printf("UPDATE: %V\n", data) + } else { + log.Printf("UPDATE: %V, %s\n", data, err) + } + case client.WorkStatus: + if data, err := resp.Status(); err == nil { + log.Printf("STATUS: %V\n", data) + } else { + log.Printf("STATUS: %s\n", err) + } + default: + log.Printf("UNKNOWN: %V", resp.Data) + } } handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler) if err != nil { log.Fatalln(err) } - wg.Add(1) status, err := c.Status(handle) if err != nil { log.Fatalln(err) } log.Printf("%t", status) - wg.Wait() + _, err = c.Do("Foobar", echo, client.JobNormal, jobHandler) + if err != nil { + log.Fatalln(err) + } + + log.Println("Press Ctrl-C to exit ...") + var mutex sync.Mutex + mutex.Lock() + mutex.Lock() } diff --git a/example/worker/worker.go b/example/worker/worker.go index 2fc6103..6b07e06 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -22,6 +22,16 @@ func ToUpperDelay10(job worker.Job) ([]byte, error) { return data, nil } +func Foobar(job worker.Job) ([]byte, error) { + log.Printf("Foobar: Data=[%s]\n", job.Data()) + for i := 0; i < 10; i++ { + job.SendWarning([]byte{byte(i)}) + job.SendData([]byte{byte(i)}) + job.UpdateStatus(i+1, 100) + } + return job.Data(), nil +} + func main() { log.Println("Starting ...") defer log.Println("Shutdown complete!") @@ -44,11 +54,12 @@ func main() { return nil } w.AddServer("tcp4", "127.0.0.1:4730") - w.AddFunc("ToUpper", ToUpper, worker.Immediately) + w.AddFunc("Foobar", Foobar, worker.Unlimited) + w.AddFunc("ToUpper", ToUpper, worker.Unlimited) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) - w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) - w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) + w.AddFunc("SysInfo", worker.SysInfo, worker.Unlimited) + w.AddFunc("MemInfo", worker.MemInfo, worker.Unlimited) if err := w.Ready(); err != nil { log.Fatal(err) return diff --git a/worker/agent.go b/worker/agent.go index 9c6cf26..3d5e941 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -1,18 +1,18 @@ package worker import ( + "bufio" "io" "net" "strings" "sync" - "bufio" ) // The agent of job server. type agent struct { sync.Mutex conn net.Conn - rw *bufio.ReadWriter + rw *bufio.ReadWriter worker *Worker in chan []byte net, addr string diff --git a/worker/inpack_test.go b/worker/inpack_test.go index f852a04..7ce36fe 100644 --- a/worker/inpack_test.go +++ b/worker/inpack_test.go @@ -62,12 +62,12 @@ func TestInPack(t *testing.T) { } func BenchmarkDecode(b *testing.B) { - for i := 0; i < b.N; i++ { + for i := 0; i < b.N; i++ { for _, v := range inpackcases { _, _, err := decodeInPack([]byte(v["src"])) if err != nil { b.Error(err) } } - } + } } diff --git a/worker/outpack_test.go b/worker/outpack_test.go index 6ae9423..2dbb4b2 100644 --- a/worker/outpack_test.go +++ b/worker/outpack_test.go @@ -83,7 +83,7 @@ func TestOutPack(t *testing.T) { } func BenchmarkEncode(b *testing.B) { - for i := 0; i < b.N; i++ { + for i := 0; i < b.N; i++ { for k, v := range outpackcases { outpack := getOutPack() outpack.dataType = k @@ -95,5 +95,5 @@ func BenchmarkEncode(b *testing.B) { } outpack.Encode() } - } + } }