From ab0fc4a6a572d499a8db876cec15b879f9a27337 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 8 Jan 2013 17:23:10 +0800 Subject: [PATCH] What a mess! -_-! --- client/client_test.go | 1 + client/job.go | 37 +++++++++++++++++++++---------- client/pool.go | 36 ++++++++++++++++++++++-------- client/pool_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 20 deletions(-) create mode 100644 client/pool_test.go diff --git a/client/client_test.go b/client/client_test.go index 1c8db92..b9cbefd 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -37,6 +37,7 @@ func TestClientDo(t *testing.T) { } */ func TestClientClose(t *testing.T) { + return if err := client.Close(); err != nil { t.Error(err) } diff --git a/client/job.go b/client/job.go index cbcfa20..3679093 100644 --- a/client/job.go +++ b/client/job.go @@ -36,8 +36,8 @@ type Job struct { // Create a new job func newJob(magiccode, datatype uint32, data []byte) (job *Job) { return &Job{magicCode: magiccode, - DataType: datatype, - Data: data} + DataType: datatype, + Data: data} } // Decode a job from byte slice @@ -51,7 +51,22 @@ func decodeJob(data []byte) (job *Job, err error) { return nil, common.Errorf("Invalid data: %V", data) } data = data[12:] - return newJob(common.RES, datatype, data), nil + + var handle string + switch datatype { + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION: + i := bytes.IndexByte(data, '\x00') + if i != -1 { + handle = string(data[:i]) + data = data[i:] + } + } + + return &Job{magicCode: common.RES, + DataType: datatype, + Data: data, + Handle: handle}, nil } // Encode a job to byte slice @@ -66,14 +81,14 @@ func (job *Job) Encode() (data []byte) { for i := 0; i < tl; i ++ { switch { - case i < 4: - data[i] = magiccode[i] - case i < 8: - data[i] = datatype[i - 4] - case i < 12: - data[i] = datalength[i - 8] - default: - data[i] = job.Data[i - 12] + case i < 4: + data[i] = magiccode[i] + case i < 8: + data[i] = datatype[i - 4] + case i < 12: + data[i] = datalength[i - 8] + default: + data[i] = job.Data[i - 12] } } // Alternative diff --git a/client/pool.go b/client/pool.go index 86dc67f..38ad7fa 100644 --- a/client/pool.go +++ b/client/pool.go @@ -6,6 +6,7 @@ package client import ( + "fmt" "time" "errors" "math/rand" @@ -15,6 +16,7 @@ import ( const ( PoolSize = 10 DefaultRetry = 5 + DefaultTimeout = 30 * time.Second ) var ( @@ -28,10 +30,18 @@ type poolItem struct { } func (item *poolItem) connect(pool *Pool) (err error) { - item.Client, err = New(item.Addr); - item.ErrHandler = pool.ErrHandler - item.JobHandler = pool.JobHandler - item.StatusHandler = pool.StatusHandler + if item.Client, err = New(item.Addr); err != nil { + return + } + if pool.ErrHandler != nil { + item.ErrHandler = pool.ErrHandler + } + if pool.JobHandler != nil { + item.JobHandler = pool.JobHandler + } + if pool.StatusHandler != nil { + item.StatusHandler = pool.StatusHandler + } item.TimeOut = pool.TimeOut return } @@ -85,20 +95,24 @@ func NewPool() (pool *Pool) { items: make(map[string]*poolItem, PoolSize), Retry: DefaultRetry, SelectionHandler: SelectWithRate, + TimeOut: DefaultTimeout, } } // Add a server with rate. -func (pool *Pool) Add(addr string, rate int) { +func (pool *Pool) Add(addr string, rate int) (err error) { var item *poolItem var ok bool if item, ok = pool.items[addr]; ok { item.Rate = rate } else { item = &poolItem{Rate: rate, Addr: addr} - item.connect(pool) + if err = item.connect(pool); err != nil { + return + } pool.items[addr] = item } + return } func (pool *Pool) Do(funcname string, data []byte, @@ -129,7 +143,7 @@ func (pool *Pool) Status(addr, handle string) { // Send a something out, get the samething back. func (pool *Pool) Echo(data []byte) { for i := 0; i < pool.Retry; i ++ { - addr = pool.SelectionHandler(pool.items, pool.last) + addr := pool.SelectionHandler(pool.items, pool.last) item, ok := pool.items[addr] if ok { pool.last = addr @@ -139,9 +153,13 @@ func (pool *Pool) Echo(data []byte) { } // Close -func (pool *Pool) Close() (err error) { +func (pool *Pool) Close() (err map[string]error) { + err = make(map[string]error) for _, c := range pool.items { - err = c.Close() + fmt.Printf("begin") + err[c.Addr] = c.Close() + fmt.Printf("end") } + fmt.Print("end-for") return } diff --git a/client/pool_test.go b/client/pool_test.go new file mode 100644 index 0000000..6522f54 --- /dev/null +++ b/client/pool_test.go @@ -0,0 +1,51 @@ +package client + +import ( + "errors" + "testing" +) + +var ( + pool = NewPool() +) + +func TestPoolAdd(t *testing.T) { + t.Log("Add servers") + if err := pool.Add("127.0.0.1:4730", 1); err != nil { + t.Error(err) + } + if err := pool.Add("127.0.0.2:4730", 1); err != nil { + t.Error(err) + } + if len(pool.items) != 2 { + t.Error(errors.New("2 servers expected")) + } +} +/* +func TestPoolEcho(t *testing.T) { + pool.JobHandler = func(job *Job) error { + echo := string(job.Data) + if echo == "Hello world" { + t.Log(echo) + } else { + t.Errorf("Invalid echo data: %s", job.Data) + } + return nil + } + pool.Echo([]byte("Hello world")) +} +*/ +/* +func TestPoolDo(t *testing.T) { + if addr, handle, err := pool.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { + t.Error(err) + } else { + t.Log(handle) + } +} +*/ +func TestPoolClose(t *testing.T) { + if err := pool.Close(); err != nil { + t.Error(err) + } +}