a dirty implementing of pooling, refactoring needed

This commit is contained in:
mikespook 2013-01-05 15:24:06 +08:00
parent 4de3ae3f49
commit d66157f79e

View File

@ -6,55 +6,142 @@
package client package client
import ( import (
"time"
"errors"
"math/rand"
"github.com/mikespook/gearman-go/common"
)
const (
PoolSize = 10
DefaultRetry = 5
)
var (
ErrTooMany = errors.New("Too many errors occurred.")
) )
type poolItem struct { type poolItem struct {
*Client
Rate int
Addr string
}
func (item *poolItem) connect(pool *Pool) (err error) {
item.Client, err = New(item.Addr);
item.ErrHandler = pool.ErrHandler
item.JobHandler = pool.JobHandler
item.StatusHandler = pool.StatusHandler
item.TimeOut = pool.TimeOut
return
}
type SelectionHandler func(map[string]*poolItem, string) string
func SelectWithRate(pool map[string]*poolItem,
last string) (addr string) {
total := 0
for _, item := range pool {
total += item.Rate
if rand.Intn(total) < item.Rate {
return item.Addr
}
}
return last
}
func SelectRandom(pool map[string]*poolItem,
last string) (addr string) {
r := rand.Intn(len(pool))
i := 0
for k, _ := range pool {
if r == i {
return k
}
i ++
}
return last
} }
/*
The client pool
usage:
pool := client.NewPool()
pool.Add("127.0.0.1:4730", 1)
handle := pool.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG)
*/
type Pool struct { type Pool struct {
SelectionHandler SelectionHandler
ErrHandler common.ErrorHandler
JobHandler JobHandler
StatusHandler StatusHandler
TimeOut time.Duration
Retry int
items map[string]*poolItem
last string
handles map[string]string
} }
// Create a new client. // Create a new pool.
// Connect to "addr" through "network"
// Eg.
// client, err := client.New("127.0.0.1:4730")
func NewPool() (pool *Pool) { func NewPool() (pool *Pool) {
return &Pool{
items: make(map[string]*poolItem, PoolSize),
Retry: DefaultRetry,
SelectionHandler: SelectWithRate,
}
} }
// Add a server with rate.
func (pool *Pool) Add(addr string, rate int) { func (pool *Pool) Add(addr string, rate int) {
// init a poolItem with Client & rate var item *poolItem
var ok bool
if item, ok = pool.items[addr]; ok {
item.Rate = rate
} else {
item = &poolItem{Rate: rate, Addr: addr}
item.connect(pool)
pool.items[addr] = item
}
} }
// Do the function. func (pool *Pool) Do(funcname string, data []byte,
// funcname is a string with function name. flag byte) (addr, handle string, err error) {
// data is encoding to byte array. for i := 0; i < pool.Retry; i ++ {
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, addr = pool.SelectionHandler(pool.items, pool.last)
// and if it is background job: JOB_BG. item, ok := pool.items[addr]
// JOB_LOW | JOB_BG means the job is running with low level in background. if ok {
func (pool *Pool) Do(funcname string, data []byte, flag byte) (handle string, err error) { pool.last = addr
// Select a job server handle, err = item.Do(funcname, data, flag)
// error handling
// mapping the handle to the server
return
}
}
err = ErrTooMany
return
} }
// Get job status from job server. // Get job status from job server.
// !!!Not fully tested.!!! // !!!Not fully tested.!!!
func (pool *Pool) Status(handle string) { func (pool *Pool) Status(addr, handle string) {
// if item, ok := pool.items[addr]; ok {
item.Status(handle)
}
} }
// Send a something out, get the samething back. // Send a something out, get the samething back.
func (pool *Pool) Echo(data []byte) { func (pool *Pool) Echo(data []byte) {
for i := 0; i < pool.Retry; i ++ {
addr = pool.SelectionHandler(pool.items, pool.last)
item, ok := pool.items[addr]
if ok {
pool.last = addr
item.Echo(data)
}
}
} }
// Close // Close
func (client *Client) Close() (err error) { func (pool *Pool) Close() (err error) {
for _, c := range pool.items {
err = c.Close()
}
return
} }