From d66157f79e2c5b57b7178953ebe2e94d5aeeffaf Mon Sep 17 00:00:00 2001 From: mikespook Date: Sat, 5 Jan 2013 15:24:06 +0800 Subject: [PATCH] a dirty implementing of pooling, refactoring needed --- client/pool.go | 135 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 111 insertions(+), 24 deletions(-) diff --git a/client/pool.go b/client/pool.go index d0f00cd..86dc67f 100644 --- a/client/pool.go +++ b/client/pool.go @@ -6,55 +6,142 @@ package client import ( + "time" + "errors" + "math/rand" + "github.com/mikespook/gearman-go/common" +) + +const ( + PoolSize = 10 + DefaultRetry = 5 +) + +var ( + ErrTooMany = errors.New("Too many errors occurred.") ) type poolItem struct { - + *Client + Rate int + Addr string +} + +func (item *poolItem) connect(pool *Pool) (err error) { + item.Client, err = New(item.Addr); + item.ErrHandler = pool.ErrHandler + item.JobHandler = pool.JobHandler + item.StatusHandler = pool.StatusHandler + item.TimeOut = pool.TimeOut + return +} + + +type SelectionHandler func(map[string]*poolItem, string) string + +func SelectWithRate(pool map[string]*poolItem, +last string) (addr string) { + total := 0 + for _, item := range pool { + total += item.Rate + if rand.Intn(total) < item.Rate { + return item.Addr + } + } + return last +} + +func SelectRandom(pool map[string]*poolItem, +last string) (addr string) { + r := rand.Intn(len(pool)) + i := 0 + for k, _ := range pool { + if r == i { + return k + } + i ++ + } + return last } -/* -The client pool -usage: -pool := client.NewPool() -pool.Add("127.0.0.1:4730", 1) -handle := pool.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG) -*/ type Pool struct { + SelectionHandler SelectionHandler + ErrHandler common.ErrorHandler + JobHandler JobHandler + StatusHandler StatusHandler + TimeOut time.Duration + Retry int + + items map[string]*poolItem + last string + handles map[string]string } -// Create a new client. -// Connect to "addr" through "network" -// Eg. -// client, err := client.New("127.0.0.1:4730") +// Create a new pool. func NewPool() (pool *Pool) { + return &Pool{ + items: make(map[string]*poolItem, PoolSize), + Retry: DefaultRetry, + SelectionHandler: SelectWithRate, + } } +// Add a server with rate. func (pool *Pool) Add(addr string, rate int) { - // init a poolItem with Client & rate + var item *poolItem + var ok bool + if item, ok = pool.items[addr]; ok { + item.Rate = rate + } else { + item = &poolItem{Rate: rate, Addr: addr} + item.connect(pool) + pool.items[addr] = item + } } -// Do the function. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, -// and if it is background job: JOB_BG. -// JOB_LOW | JOB_BG means the job is running with low level in background. -func (pool *Pool) Do(funcname string, data []byte, flag byte) (handle string, err error) { - // Select a job server +func (pool *Pool) Do(funcname string, data []byte, +flag byte) (addr, handle string, err error) { + for i := 0; i < pool.Retry; i ++ { + addr = pool.SelectionHandler(pool.items, pool.last) + item, ok := pool.items[addr] + if ok { + pool.last = addr + handle, err = item.Do(funcname, data, flag) + // error handling + // mapping the handle to the server + return + } + } + err = ErrTooMany + return } // Get job status from job server. // !!!Not fully tested.!!! -func (pool *Pool) Status(handle string) { - // +func (pool *Pool) Status(addr, handle string) { + if item, ok := pool.items[addr]; ok { + item.Status(handle) + } } // Send a something out, get the samething back. func (pool *Pool) Echo(data []byte) { + for i := 0; i < pool.Retry; i ++ { + addr = pool.SelectionHandler(pool.items, pool.last) + item, ok := pool.items[addr] + if ok { + pool.last = addr + item.Echo(data) + } + } } // Close -func (client *Client) Close() (err error) { +func (pool *Pool) Close() (err error) { + for _, c := range pool.items { + err = c.Close() + } + return }