From af054607276549cf3795b72993b19a6236cf83e4 Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 24 Jan 2013 18:13:02 +0800 Subject: [PATCH] fixed the infinite loop of client --- client/client.go | 11 ++-- client/pool.go | 121 ++++++++++++-------------------------------- client/pool_test.go | 16 +++--- 3 files changed, 46 insertions(+), 102 deletions(-) diff --git a/client/client.go b/client/client.go index 963d9b9..33aa4c0 100644 --- a/client/client.go +++ b/client/client.go @@ -193,21 +193,21 @@ func (client *Client) inLoop() { if err != nil { if err == common.ErrConnection { client.Close() - break } client.err(err) - continue + break } job, err := decodeJob(rel) if err != nil { client.err(err) continue + //break } switch job.DataType { case common.ERROR: _, err := common.GetError(job.Data) client.err(err) - case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION: client.handleJob(job) case common.ECHO_RES: @@ -216,6 +216,8 @@ func (client *Client) inLoop() { client.handleCreated(job) case common.STATUS_RES: client.handleStatus(job) + default: + break } } } @@ -350,12 +352,11 @@ func (client *Client) Echo(data []byte) (r []byte) { // Close func (client *Client) Close() (err error) { -// close(client.in) + close(client.in) close(client.out) close(client.echo) close(client.created) close(client.status) - return client.conn.Close(); } diff --git a/client/pool.go b/client/pool.go index 8878454..9c5e643 100644 --- a/client/pool.go +++ b/client/pool.go @@ -6,55 +6,35 @@ package client import ( - "fmt" - "time" - "errors" + "sync" "math/rand" "github.com/mikespook/gearman-go/common" ) const ( PoolSize = 10 - DefaultRetry = 5 - DefaultTimeout = 30 * time.Second ) -var ( - ErrTooMany = errors.New("Too many errors occurred.") -) - -type poolItem struct { +type poolClient struct { *Client Rate int - Addr string } -func (item *poolItem) connect(pool *Pool) (err error) { - if item.Client, err = New(item.Addr); err != nil { - return - } - if pool.ErrHandler != nil { - item.ErrHandler = pool.ErrHandler - } - return -} +type SelectionHandler func(map[string]*poolClient, string) string - -type SelectionHandler func(map[string]*poolItem, string) string - -func SelectWithRate(pool map[string]*poolItem, +func SelectWithRate(pool map[string]*poolClient, last string) (addr string) { total := 0 for _, item := range pool { total += item.Rate if rand.Intn(total) < item.Rate { - return item.Addr + return item.addr } } return last } -func SelectRandom(pool map[string]*poolItem, +func SelectRandom(pool map[string]*poolClient, last string) (addr string) { r := rand.Intn(len(pool)) i := 0 @@ -72,73 +52,50 @@ last string) (addr string) { type Pool struct { SelectionHandler SelectionHandler ErrHandler common.ErrorHandler - JobHandler JobHandler - StatusHandler StatusHandler - TimeOut time.Duration - Retry int - - items map[string]*poolItem - last string - handles map[string]string + clients map[string]*poolClient + mutex sync.Mutex } // Create a new pool. func NewPool() (pool *Pool) { return &Pool{ - items: make(map[string]*poolItem, PoolSize), - Retry: DefaultRetry, + clients: make(map[string]*poolClient, PoolSize), SelectionHandler: SelectWithRate, - TimeOut: DefaultTimeout, } } // Add a server with rate. func (pool *Pool) Add(addr string, rate int) (err error) { - var item *poolItem + pool.mutex.Lock() + defer pool.mutex.Unlock() + var item *poolClient var ok bool - if item, ok = pool.items[addr]; ok { + if item, ok = pool.clients[addr]; ok { item.Rate = rate } else { - item = &poolItem{Rate: rate, Addr: addr} - if err = item.connect(pool); err != nil { - return - } - pool.items[addr] = item + var client *Client + client, err = New(addr) + item = &poolClient{Client: client, Rate: rate} + err = item.connect() + pool.clients[addr] = item } return } +// Remove a server. +func (pool *Pool) Remove(addr string) { + pool.mutex.Lock() + defer pool.mutex.Unlock() + delete(pool.clients, addr) +} + func (pool *Pool) Do(funcname string, data []byte, -flag byte, h JobHandler) (addr, handle string, err error) { - for i := 0; i < pool.Retry; i ++ { - addr = pool.SelectionHandler(pool.items, pool.last) - item, ok := pool.items[addr] - if ok { - pool.last = addr - handle = item.Do(funcname, data, flag, h) - // error handling - // mapping the handle to the server - return - } - } - err = ErrTooMany +flag byte, h JobHandler) (handle string, err error) { return } func (pool *Pool) DoBg(funcname string, data []byte, -flag byte) (addr, handle string, err error) { - for i := 0; i < pool.Retry; i ++ { - addr = pool.SelectionHandler(pool.items, pool.last) - item, ok := pool.items[addr] - if ok { - pool.last = addr - handle = item.DoBg(funcname, data, flag) - // error handling - // mapping the handle to the server - return - } - } - err = ErrTooMany +flag byte) (handle string, err error) { return } @@ -146,32 +103,20 @@ flag byte) (addr, handle string, err error) { // Get job status from job server. // !!!Not fully tested.!!! -func (pool *Pool) Status(addr, handle string) { - if item, ok := pool.items[addr]; ok { - item.Status(handle) - } +func (pool *Pool) Status(handle string) (status *Status) { + return } // Send a something out, get the samething back. -func (pool *Pool) Echo(data []byte) { - for i := 0; i < pool.Retry; i ++ { - addr := pool.SelectionHandler(pool.items, pool.last) - item, ok := pool.items[addr] - if ok { - pool.last = addr - item.Echo(data) - } - } +func (pool *Pool) Echo(data []byte) (r []byte) { + return } // Close func (pool *Pool) Close() (err map[string]error) { err = make(map[string]error) - for _, c := range pool.items { - fmt.Printf("begin") - err[c.Addr] = c.Close() - fmt.Printf("end") + for _, c := range pool.clients { + err[c.addr] = c.Close() } - fmt.Print("end-for") return } diff --git a/client/pool_test.go b/client/pool_test.go index 84afea8..9c86bc8 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -1,17 +1,14 @@ package client import ( -// "errors" -// "testing" + "testing" ) var ( pool = NewPool() ) -/* func TestPoolAdd(t *testing.T) { - return t.Log("Add servers") if err := pool.Add("127.0.0.1:4730", 1); err != nil { t.Error(err) @@ -19,10 +16,11 @@ func TestPoolAdd(t *testing.T) { if err := pool.Add("127.0.0.2:4730", 1); err != nil { t.Error(err) } - if len(pool.items) != 2 { - t.Error(errors.New("2 servers expected")) + if len(pool.clients) != 2 { + t.Errorf("2 servers expected, %d got.", len(pool.clients)) } } +/* func TestPoolEcho(t *testing.T) { pool.JobHandler = func(job *Job) error { @@ -36,8 +34,7 @@ func TestPoolEcho(t *testing.T) { } pool.Echo([]byte("Hello world")) } -*/ -/* + func TestPoolDo(t *testing.T) { if addr, handle, err := pool.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { t.Error(err) @@ -45,10 +42,11 @@ func TestPoolDo(t *testing.T) { t.Log(handle) } } + +*/ func TestPoolClose(t *testing.T) { return if err := pool.Close(); err != nil { t.Error(err) } } -*/