package client import ( "errors" "math/rand" "sync" ) const ( poolSize = 10 ) var ( ErrNotFound = errors.New("Server Not Found") ) type PoolClient struct { *Client Rate int mutex sync.Mutex } type SelectionHandler func(map[string]*PoolClient, string) string func SelectWithRate(pool map[string]*PoolClient, last string) (addr string) { total := 0 for _, item := range pool { total += item.Rate if rand.Intn(total) < item.Rate { return item.addr } } return last } func SelectRandom(pool map[string]*PoolClient, last string) (addr string) { r := rand.Intn(len(pool)) i := 0 for k, _ := range pool { if r == i { return k } i++ } return last } type Pool struct { SelectionHandler SelectionHandler ErrorHandler ErrorHandler Clients map[string]*PoolClient last string mutex sync.Mutex } // NewPool returns a new pool. func NewPool() (pool *Pool) { return &Pool{ Clients: make(map[string]*PoolClient, poolSize), SelectionHandler: SelectWithRate, } } // Add a server with rate. func (pool *Pool) Add(net, addr string, rate int) (err error) { pool.mutex.Lock() defer pool.mutex.Unlock() var item *PoolClient var ok bool if item, ok = pool.Clients[addr]; ok { item.Rate = rate } else { var client *Client client, err = New(net, addr) if err == nil { item = &PoolClient{Client: client, Rate: rate} pool.Clients[addr] = item } } return } // Remove a server. func (pool *Pool) Remove(addr string) { pool.mutex.Lock() defer pool.mutex.Unlock() delete(pool.Clients, addr) } func (pool *Pool) Do(funcname string, data []byte, flag byte, h ResponseHandler) (addr, handle string, err error) { client := pool.selectServer() client.Lock() defer client.Unlock() handle, err = client.Do(funcname, data, flag, h) addr = client.addr return } func (pool *Pool) DoBg(funcname string, data []byte, flag byte) (addr, handle string, err error) { client := pool.selectServer() client.Lock() defer client.Unlock() handle, err = client.DoBg(funcname, data, flag) addr = client.addr return } // Status gets job status from job server. // !!!Not fully tested.!!! func (pool *Pool) Status(addr, handle string) (status *Status, err error) { if client, ok := pool.Clients[addr]; ok { client.Lock() defer client.Unlock() status, err = client.Status(handle) } else { err = ErrNotFound } return } // Send a something out, get the samething back. func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { var client *PoolClient if addr == "" { client = pool.selectServer() } else { var ok bool if client, ok = pool.Clients[addr]; !ok { err = ErrNotFound return } } client.Lock() defer client.Unlock() echo, err = client.Echo(data) return } // Close func (pool *Pool) Close() (err map[string]error) { err = make(map[string]error) for _, c := range pool.Clients { err[c.addr] = c.Close() } return } // selecting server func (pool *Pool) selectServer() (client *PoolClient) { for client == nil { addr := pool.SelectionHandler(pool.Clients, pool.last) var ok bool if client, ok = pool.Clients[addr]; ok { pool.last = addr break } } return }