From 4997e30a7742c84f7cb5e127176cddf676568414 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Thu, 29 Aug 2013 18:08:05 +0800 Subject: [PATCH] runabled 0.2 --- client/client.go | 107 +++++++++++++++++++++++++++--------------- client/client_test.go | 65 +++++++++++++------------ client/common_test.go | 48 ------------------- client/error.go | 2 +- client/handler.go | 3 +- client/id.go | 35 -------------- client/pool.go | 12 ++--- client/pool_test.go | 48 +++++++++---------- client/request.go | 10 ++-- client/response.go | 47 +++++++++++-------- 10 files changed, 170 insertions(+), 207 deletions(-) delete mode 100644 client/common_test.go delete mode 100644 client/id.go diff --git a/client/client.go b/client/client.go index 2bd209f..f2d6efd 100644 --- a/client/client.go +++ b/client/client.go @@ -6,10 +6,10 @@ package client import ( + "github.com/mikespook/golib/idgen" "io" "net" "sync" - "github.com/mikespook/golib/idgen" ) /* @@ -21,14 +21,14 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { - net, addr string - respHandler map[string]ResponseHandler - createdHandler ResponseHandler - in chan []byte - isConn bool - conn net.Conn - mutex sync.RWMutex - ErrorHandler ErrorHandler + net, addr string + respHandler map[string]ResponseHandler + innerHandler map[string]ResponseHandler + in chan []byte + isConn bool + conn net.Conn + mutex sync.RWMutex + ErrorHandler ErrorHandler IdGen idgen.IdGen } @@ -39,10 +39,12 @@ type Client struct { // client, err := client.New("127.0.0.1:4730") func New(net, addr string) (client *Client, err error) { client = &Client{ - net: net, - addr: addr, + net: net, + addr: addr, respHandler: make(map[string]ResponseHandler, QUEUE_SIZE), - in: make(chan []byte, QUEUE_SIZE), + innerHandler: make(map[string]ResponseHandler, QUEUE_SIZE), + in: make(chan []byte, QUEUE_SIZE), + IdGen: idgen.NewObjectId(), } if err = client.connect(); err != nil { return @@ -119,14 +121,14 @@ func (client *Client) readLoop() { // decode data & process it func (client *Client) processLoop() { - var resp *response + var resp *Response var l int var err error var data, leftdata []byte for data = range client.in { l = len(data) if len(leftdata) > 0 { // some data left for processing - data = append(leftdata, data ...) + data = append(leftdata, data...) } if l < MIN_PACKET_LEN { // not enough data leftdata = data @@ -137,9 +139,15 @@ func (client *Client) processLoop() { continue } switch resp.DataType { - case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, -WORK_FAIL, WORK_EXCEPTION: - client.handleResponse(string(resp.Handle), resp) + case STATUS_RES: + client.handleInner("status-" + resp.Handle, resp) + case JOB_CREATED: + client.handleInner("created", resp) + case ECHO_RES: + client.handleInner("echo", resp) + case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, + WORK_FAIL, WORK_EXCEPTION: + client.handleResponse(resp.Handle, resp) } if len(data) > l { leftdata = data[l:] @@ -155,28 +163,37 @@ func (client *Client) err(e error) { } // job handler -func (client *Client) handleResponse(key string, resp *response) { +func (client *Client) handleResponse(key string, resp *Response) { client.mutex.RLock() defer client.mutex.RUnlock() if h, ok := client.respHandler[key]; ok { h(resp) - delete(client.respHandler, string(resp.Handle)) + delete(client.respHandler, key) + } +} + +// job handler +func (client *Client) handleInner(key string, resp *Response) { + if h, ok := client.innerHandler[key]; ok { + h(resp) + delete(client.innerHandler, key) } } // Internal do func (client *Client) do(funcname string, data []byte, - flag uint32) (handle []byte) { - req := getJob(funcname, client.IdGen.Id().(string), data) - client.mutex.Lock() - defer client.mutex.Unlock() + flag uint32) (handle string) { + id := client.IdGen.Id().(string) + req := getJob(funcname, id, data) client.write(req) var wg sync.WaitGroup wg.Add(1) - client.createdHandler = func(resp *response) { + client.mutex.RLock() + client.innerHandler["created"] = ResponseHandler(func(resp *Response) { defer wg.Done() + defer client.mutex.RUnlock() handle = resp.Handle - } + }) wg.Wait() return } @@ -188,7 +205,7 @@ func (client *Client) do(funcname string, data []byte, // data is encoding to byte array. // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH func (client *Client) Do(funcname string, data []byte, - flag byte, h ResponseHandler) (handle []byte) { + flag byte, h ResponseHandler) (handle string) { var datatype uint32 switch flag { case JOB_LOW: @@ -202,7 +219,7 @@ func (client *Client) Do(funcname string, data []byte, client.mutex.Lock() defer client.mutex.Unlock() if h != nil { - client.respHandler[string(handle)] = h + client.respHandler[handle] = h } return } @@ -212,7 +229,7 @@ func (client *Client) Do(funcname string, data []byte, // data is encoding to byte array. // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH func (client *Client) DoBg(funcname string, data []byte, - flag byte) (handle []byte) { + flag byte) (handle string) { var datatype uint32 switch flag { case JOB_LOW: @@ -228,26 +245,42 @@ func (client *Client) DoBg(funcname string, data []byte, // Get job status from job server. // !!!Not fully tested.!!! -func (client *Client) Status(handle []byte, h ResponseHandler) (err error) { +func (client *Client) Status(handle string, h ResponseHandler) (status *Status, err error) { req := getRequest() req.DataType = GET_STATUS - req.Data = handle + req.Data = []byte(handle) client.write(req) - if h != nil { - client.respHandler["status-" + string(handle)] = h - } + var wg sync.WaitGroup + wg.Add(1) + client.mutex.Lock() + client.innerHandler["status-" + handle] = ResponseHandler(func(resp *Response) { + defer wg.Done() + defer client.mutex.Unlock() + var err error + status, err = resp.Status() + if err != nil { + client.err(err) + } + }) + wg.Wait() return } // Send a something out, get the samething back. -func (client *Client) Echo(data []byte, h ResponseHandler) (err error) { +func (client *Client) Echo(data []byte) (echo []byte, err error) { req := getRequest() req.DataType = ECHO_REQ req.Data = data client.write(req) - if h != nil { - client.respHandler["echo"] = h - } + var wg sync.WaitGroup + wg.Add(1) + client.mutex.Lock() + client.innerHandler["echo"] = ResponseHandler(func(resp *Response) { + defer wg.Done() + defer client.mutex.Unlock() + echo = resp.Data + }) + wg.Wait() return } diff --git a/client/client_test.go b/client/client_test.go index 4fb1910..c877930 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,34 +1,34 @@ package client import ( + "fmt" "testing" - "time" ) var client *Client +func printHandle(resp *Response) { + fmt.Printf("%V", resp) +} + func TestClientAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730") var err error - if client, err = New("127.0.0.1:4730"); err != nil { + if client, err = New("tcp4", "127.0.0.1:4730"); err != nil { t.Error(err) return } - client.ErrHandler = func(e error) { + client.ErrorHandler = func(e error) { t.Log(e) } } func TestClientEcho(t *testing.T) { - echo, err := client.Echo([]byte("Hello world"), time.Second) + err := client.Echo([]byte("Hello world"), printHandle) if err != nil { t.Error(err) return } - if string(echo) != "Hello world" { - t.Errorf("Invalid echo data: %s", echo) - return - } } func TestClientDoBg(t *testing.T) { @@ -39,7 +39,7 @@ func TestClientDoBg(t *testing.T) { } func TestClientDo(t *testing.T) { - jobHandler := func(job *Job) { + jobHandler := func(job *Response) { str := string(job.Data) if str == "ABCDEF" { t.Log(str) @@ -58,34 +58,37 @@ func TestClientDo(t *testing.T) { func TestClientStatus(t *testing.T) { - s1, err := client.Status("handle not exists", time.Second) + err := client.Status("handle not exists", printHandle) if err != nil { t.Error(err) return } - if s1.Known { - t.Errorf("The job (%s) shouldn't be known.", s1.Handle) - return - } - if s1.Running { - t.Errorf("The job (%s) shouldn't be running.", s1.Handle) - return - } + /* + if s1.Known { + t.Errorf("The job (%s) shouldn't be known.", s1.Handle) + return + } + if s1.Running { + t.Errorf("The job (%s) shouldn't be running.", s1.Handle) + return + }*/ handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil) - s2, err := client.Status(handle, time.Second) - if err != nil { - t.Error(err) - return - } - if !s2.Known { - t.Errorf("The job (%s) should be known.", s2.Handle) - return - } - if s2.Running { - t.Errorf("The job (%s) shouldn't be running.", s2.Handle) - return - } + err = client.Status(handle, printHandle) + /* + if err != nil { + t.Error(err) + return + } + if !s2.Known { + t.Errorf("The job (%s) should be known.", s2.Handle) + return + } + if s2.Running { + t.Errorf("The job (%s) shouldn't be running.", s2.Handle) + return + } + */ } func TestClientClose(t *testing.T) { diff --git a/client/common_test.go b/client/common_test.go deleted file mode 100644 index fa8ded9..0000000 --- a/client/common_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package client - -import ( - "bytes" - "testing" -) - -var ( - testCase = map[uint32][4]byte{ - 0: [...]byte{0, 0, 0, 0}, - 1: [...]byte{0, 0, 0, 1}, - 256: [...]byte{0, 0, 1, 0}, - 256 * 256: [...]byte{0, 1, 0, 0}, - 256 * 256 * 256: [...]byte{1, 0, 0, 0}, - 256*256*256 + 256*256 + 256 + 1: [...]byte{1, 1, 1, 1}, - 4294967295: [...]byte{0xFF, 0xFF, 0xFF, 0xFF}, - } -) - -func TestUint32ToBytes(t *testing.T) { - for k, v := range testCase { - b := Uint32ToBytes(k) - if bytes.Compare(b[:], v[:]) != 0 { - t.Errorf("%v was expected, but %v was got", v, b) - } - } -} - -func TestBytesToUint32s(t *testing.T) { - for k, v := range testCase { - u := BytesToUint32([4]byte(v)) - if u != k { - t.Errorf("%v was expected, but %v was got", k, u) - } - } -} - -func BenchmarkByteToUnit32(b *testing.B) { - for i := 0; i < b.N; i++ { - BytesToUint32([4]byte{0xF, 0xF, 0xF, 0xF}) - } -} - -func BenchmarkUint32ToByte(b *testing.B) { - for i := 0; i < b.N; i++ { - Uint32ToBytes(123456) - } -} diff --git a/client/error.go b/client/error.go index 8c77616..d6be69b 100644 --- a/client/error.go +++ b/client/error.go @@ -9,8 +9,8 @@ import ( "bytes" "errors" "fmt" - "syscall" "strconv" + "syscall" ) var ( diff --git a/client/handler.go b/client/handler.go index cee4cdd..6e5fbaf 100644 --- a/client/handler.go +++ b/client/handler.go @@ -1,7 +1,8 @@ package client // Response handler -type ResponseHandler func(*response) +type ResponseHandler func(*Response) + // Error handler type ErrorHandler func(error) diff --git a/client/id.go b/client/id.go deleted file mode 100644 index 0fe8ab7..0000000 --- a/client/id.go +++ /dev/null @@ -1,35 +0,0 @@ -package client - -import ( - "github.com/mikespook/golib/autoinc" - "labix.org/v2/mgo/bson" - "strconv" -) - -type IdGenerator interface { - Id() string -} - -// ObjectId -type objectId struct{} - -func (id *objectId) Id() string { - return bson.NewObjectId().Hex() -} - -func NewObjectId() IdGenerator { - return &objectId{} -} - -// AutoIncId -type autoincId struct { - *autoinc.AutoInc -} - -func (id *autoincId) Id() string { - return strconv.Itoa(id.AutoInc.Id()) -} - -func NewAutoIncId() IdGenerator { - return &autoincId{autoinc.New(1, 1)} -} diff --git a/client/pool.go b/client/pool.go index 8bcab3e..ddc2ac6 100644 --- a/client/pool.go +++ b/client/pool.go @@ -94,7 +94,7 @@ func (pool *Pool) Remove(addr string) { } func (pool *Pool) Do(funcname string, data []byte, - flag byte, h ResponseHandler) (addr string, handle []byte) { + flag byte, h ResponseHandler) (addr, handle string) { client := pool.selectServer() handle = client.Do(funcname, data, flag, h) addr = client.addr @@ -102,7 +102,7 @@ func (pool *Pool) Do(funcname string, data []byte, } func (pool *Pool) DoBg(funcname string, data []byte, - flag byte) (addr string, handle []byte) { + flag byte) (addr, handle string) { client := pool.selectServer() handle = client.DoBg(funcname, data, flag) addr = client.addr @@ -111,9 +111,9 @@ func (pool *Pool) DoBg(funcname string, data []byte, // Get job status from job server. // !!!Not fully tested.!!! -func (pool *Pool) Status(addr string, handle []byte, h ResponseHandler) (err error) { +func (pool *Pool) Status(addr, handle string, h ResponseHandler) (status *Status, err error) { if client, ok := pool.clients[addr]; ok { - err = client.Status(handle, h) + status, err = client.Status(handle, h) } else { err = ErrNotFound } @@ -121,7 +121,7 @@ func (pool *Pool) Status(addr string, handle []byte, h ResponseHandler) (err err } // Send a something out, get the samething back. -func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (r []byte, err error) { +func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (echo []byte, err error) { var client *poolClient if addr == "" { client = pool.selectServer() @@ -132,7 +132,7 @@ func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (r []byte, e return } } - err = client.Echo(data, h) + echo, err = client.Echo(data) return } diff --git a/client/pool_test.go b/client/pool_test.go index 778c43c..f96fbc1 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -2,7 +2,6 @@ package client import ( "testing" - "time" ) var ( @@ -11,10 +10,10 @@ var ( func TestPoolAdd(t *testing.T) { t.Log("Add servers") - if err := pool.Add("127.0.0.1:4730", 1); err != nil { + if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil { t.Error(err) } - if err := pool.Add("127.0.0.1:4730", 1); err != nil { + if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil { t.Error(err) } if len(pool.clients) != 2 { @@ -23,7 +22,7 @@ func TestPoolAdd(t *testing.T) { } func TestPoolEcho(t *testing.T) { - echo, err := pool.Echo("", []byte("Hello pool"), time.Second) + echo, err := pool.Echo("", []byte("Hello pool"), printHandle) if err != nil { t.Error(err) return @@ -33,7 +32,7 @@ func TestPoolEcho(t *testing.T) { return } - _, err = pool.Echo("not exists", []byte("Hello pool"), time.Second) + _, err = pool.Echo("not exists", []byte("Hello pool"), printHandle) if err != ErrNotFound { t.Errorf("ErrNotFound expected, got %s", err) } @@ -49,7 +48,7 @@ func TestPoolDoBg(t *testing.T) { } func TestPoolDo(t *testing.T) { - jobHandler := func(job *Job) { + jobHandler := func(job *Response) { str := string(job.Data) if str == "ABCDEF" { t.Log(str) @@ -67,33 +66,34 @@ func TestPoolDo(t *testing.T) { } func TestPoolStatus(t *testing.T) { - s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second) + err := pool.Status("127.0.0.1:4730", "handle not exists", printHandle) if err != nil { t.Error(err) return } - if s1.Known { - t.Errorf("The job (%s) shouldn't be known.", s1.Handle) - } - if s1.Running { - t.Errorf("The job (%s) shouldn't be running.", s1.Handle) - } - + /* + if s1.Known { + t.Errorf("The job (%s) shouldn't be known.", s1.Handle) + } + if s1.Running { + t.Errorf("The job (%s) shouldn't be running.", s1.Handle) + } + */ addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil) - s2, err := pool.Status(addr, handle, time.Second) + err = pool.Status(addr, handle, printHandle) if err != nil { t.Error(err) return } - - if !s2.Known { - t.Errorf("The job (%s) should be known.", s2.Handle) - } - if s2.Running { - t.Errorf("The job (%s) shouldn't be running.", s2.Handle) - } - - _, err = pool.Status("not exists", "not exists", time.Second) + /* + if !s2.Known { + t.Errorf("The job (%s) should be known.", s2.Handle) + } + if s2.Running { + t.Errorf("The job (%s) shouldn't be running.", s2.Handle) + } + */ + err = pool.Status("not exists", "not exists", printHandle) if err != ErrNotFound { t.Error(err) } diff --git a/client/request.go b/client/request.go index 28f925c..f5f62ce 100644 --- a/client/request.go +++ b/client/request.go @@ -11,19 +11,19 @@ import ( // request type request struct { - DataType uint32 - Data []byte + DataType uint32 + Data []byte } // Encode a Request to byte slice func (req *request) Encode() (data []byte) { - l := len(req.Data) // length of data - tl := l + 12 // add 12 bytes head + l := len(req.Data) // length of data + tl := l + MIN_PACKET_LEN // add 12 bytes head data = getBuffer(tl) copy(data[:4], REQ_STR) binary.BigEndian.PutUint32(data[4:8], req.DataType) binary.BigEndian.PutUint32(data[8:12], uint32(l)) - copy(data[12:], req.Data) + copy(data[MIN_PACKET_LEN:], req.Data) return } diff --git a/client/response.go b/client/response.go index b98ec81..5d41fc3 100644 --- a/client/response.go +++ b/client/response.go @@ -6,17 +6,17 @@ package client import ( - "fmt" "bytes" - "strconv" "encoding/binary" + "fmt" + "strconv" ) // response -type response struct { - DataType uint32 - Data, Handle []byte - UID string +type Response struct { + DataType uint32 + Data []byte + UID, Handle string } // Extract the Response's result. @@ -24,10 +24,10 @@ type response struct { // if data != nil, err != nil, then worker has a exception // if data != nil, err == nil, then worker complate job // after calling this method, the Response.Handle will be filled -func (resp *response) Result() (data []byte, err error) { +func (resp *Response) Result() (data []byte, err error) { switch resp.DataType { case WORK_FAIL: - resp.Handle = resp.Data + resp.Handle = string(resp.Data) err = ErrWorkFail return case WORK_EXCEPTION: @@ -39,7 +39,7 @@ func (resp *response) Result() (data []byte, err error) { err = fmt.Errorf("Invalid data: %V", resp.Data) return } - resp.Handle = s[0] + resp.Handle = string(s[0]) data = s[1] default: err = ErrDataType @@ -48,7 +48,7 @@ func (resp *response) Result() (data []byte, err error) { } // Extract the job's update -func (resp *response) Update() (data []byte, err error) { +func (resp *Response) Update() (data []byte, err error) { if resp.DataType != WORK_DATA && resp.DataType != WORK_WARNING { err = ErrDataType @@ -62,19 +62,19 @@ func (resp *response) Update() (data []byte, err error) { if resp.DataType == WORK_WARNING { err = ErrWorkWarning } - resp.Handle = s[0] + resp.Handle = string(s[0]) data = s[1] return } // Decode a job from byte slice -func decodeResponse(data []byte) (resp *response, l int, err error) { +func decodeResponse(data []byte) (resp *Response, l int, err error) { if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes err = fmt.Errorf("Invalid data: %V", data) return } dl := int(binary.BigEndian.Uint32(data[8:12])) - dt := data[MIN_PACKET_LEN:dl+MIN_PACKET_LEN] + dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN] if len(dt) != int(dl) { // length not equal err = fmt.Errorf("Invalid data: %V", data) return @@ -82,11 +82,13 @@ func decodeResponse(data []byte) (resp *response, l int, err error) { resp = getResponse() resp.DataType = binary.BigEndian.Uint32(data[4:8]) switch resp.DataType { + case ECHO_RES: + resp.Data = dt case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: - s := bytes.SplitN(data, []byte{'\x00'}, 2) + s := bytes.SplitN(dt, []byte{'\x00'}, 2) if len(s) >= 2 { - resp.Handle = s[0] + resp.Handle = string(s[0]) resp.Data = s[1] } else { err = fmt.Errorf("Invalid data: %V", data) @@ -97,8 +99,16 @@ func decodeResponse(data []byte) (resp *response, l int, err error) { return } +func (resp *Response) IsEcho() bool { + return resp.DataType == ECHO_RES +} + +func (resp *Response) IsStatus() bool { + return resp.DataType == STATUS_RES +} + // status handler -func (resp *response) Status() (status *Status, err error) { +func (resp *Response) Status() (status *Status, err error) { data := bytes.SplitN(resp.Data, []byte{'\x00'}, 5) if len(data) != 5 { err = fmt.Errorf("Invalid data: %V", resp.Data) @@ -121,9 +131,8 @@ func (resp *response) Status() (status *Status, err error) { return } - -func getResponse() (resp *response) { +func getResponse() (resp *Response) { // TODO add a pool - resp = &response{} + resp = &Response{} return }