forked from yuxh/gearman-go
What a mess! -_-!
This commit is contained in:
parent
d66157f79e
commit
ab0fc4a6a5
@ -37,6 +37,7 @@ func TestClientDo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
func TestClientClose(t *testing.T) {
|
func TestClientClose(t *testing.T) {
|
||||||
|
return
|
||||||
if err := client.Close(); err != nil {
|
if err := client.Close(); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -51,7 +51,22 @@ func decodeJob(data []byte) (job *Job, err error) {
|
|||||||
return nil, common.Errorf("Invalid data: %V", data)
|
return nil, common.Errorf("Invalid data: %V", data)
|
||||||
}
|
}
|
||||||
data = data[12:]
|
data = data[12:]
|
||||||
return newJob(common.RES, datatype, data), nil
|
|
||||||
|
var handle string
|
||||||
|
switch datatype {
|
||||||
|
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
|
||||||
|
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
|
||||||
|
i := bytes.IndexByte(data, '\x00')
|
||||||
|
if i != -1 {
|
||||||
|
handle = string(data[:i])
|
||||||
|
data = data[i:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Job{magicCode: common.RES,
|
||||||
|
DataType: datatype,
|
||||||
|
Data: data,
|
||||||
|
Handle: handle}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode a job to byte slice
|
// Encode a job to byte slice
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
"errors"
|
"errors"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
@ -15,6 +16,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
PoolSize = 10
|
PoolSize = 10
|
||||||
DefaultRetry = 5
|
DefaultRetry = 5
|
||||||
|
DefaultTimeout = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -28,10 +30,18 @@ type poolItem struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (item *poolItem) connect(pool *Pool) (err error) {
|
func (item *poolItem) connect(pool *Pool) (err error) {
|
||||||
item.Client, err = New(item.Addr);
|
if item.Client, err = New(item.Addr); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if pool.ErrHandler != nil {
|
||||||
item.ErrHandler = pool.ErrHandler
|
item.ErrHandler = pool.ErrHandler
|
||||||
|
}
|
||||||
|
if pool.JobHandler != nil {
|
||||||
item.JobHandler = pool.JobHandler
|
item.JobHandler = pool.JobHandler
|
||||||
|
}
|
||||||
|
if pool.StatusHandler != nil {
|
||||||
item.StatusHandler = pool.StatusHandler
|
item.StatusHandler = pool.StatusHandler
|
||||||
|
}
|
||||||
item.TimeOut = pool.TimeOut
|
item.TimeOut = pool.TimeOut
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -85,20 +95,24 @@ func NewPool() (pool *Pool) {
|
|||||||
items: make(map[string]*poolItem, PoolSize),
|
items: make(map[string]*poolItem, PoolSize),
|
||||||
Retry: DefaultRetry,
|
Retry: DefaultRetry,
|
||||||
SelectionHandler: SelectWithRate,
|
SelectionHandler: SelectWithRate,
|
||||||
|
TimeOut: DefaultTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a server with rate.
|
// Add a server with rate.
|
||||||
func (pool *Pool) Add(addr string, rate int) {
|
func (pool *Pool) Add(addr string, rate int) (err error) {
|
||||||
var item *poolItem
|
var item *poolItem
|
||||||
var ok bool
|
var ok bool
|
||||||
if item, ok = pool.items[addr]; ok {
|
if item, ok = pool.items[addr]; ok {
|
||||||
item.Rate = rate
|
item.Rate = rate
|
||||||
} else {
|
} else {
|
||||||
item = &poolItem{Rate: rate, Addr: addr}
|
item = &poolItem{Rate: rate, Addr: addr}
|
||||||
item.connect(pool)
|
if err = item.connect(pool); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
pool.items[addr] = item
|
pool.items[addr] = item
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *Pool) Do(funcname string, data []byte,
|
func (pool *Pool) Do(funcname string, data []byte,
|
||||||
@ -129,7 +143,7 @@ func (pool *Pool) Status(addr, handle string) {
|
|||||||
// Send a something out, get the samething back.
|
// Send a something out, get the samething back.
|
||||||
func (pool *Pool) Echo(data []byte) {
|
func (pool *Pool) Echo(data []byte) {
|
||||||
for i := 0; i < pool.Retry; i ++ {
|
for i := 0; i < pool.Retry; i ++ {
|
||||||
addr = pool.SelectionHandler(pool.items, pool.last)
|
addr := pool.SelectionHandler(pool.items, pool.last)
|
||||||
item, ok := pool.items[addr]
|
item, ok := pool.items[addr]
|
||||||
if ok {
|
if ok {
|
||||||
pool.last = addr
|
pool.last = addr
|
||||||
@ -139,9 +153,13 @@ func (pool *Pool) Echo(data []byte) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Close
|
// Close
|
||||||
func (pool *Pool) Close() (err error) {
|
func (pool *Pool) Close() (err map[string]error) {
|
||||||
|
err = make(map[string]error)
|
||||||
for _, c := range pool.items {
|
for _, c := range pool.items {
|
||||||
err = c.Close()
|
fmt.Printf("begin")
|
||||||
|
err[c.Addr] = c.Close()
|
||||||
|
fmt.Printf("end")
|
||||||
}
|
}
|
||||||
|
fmt.Print("end-for")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
51
client/pool_test.go
Normal file
51
client/pool_test.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
pool = NewPool()
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPoolAdd(t *testing.T) {
|
||||||
|
t.Log("Add servers")
|
||||||
|
if err := pool.Add("127.0.0.1:4730", 1); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if err := pool.Add("127.0.0.2:4730", 1); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if len(pool.items) != 2 {
|
||||||
|
t.Error(errors.New("2 servers expected"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
func TestPoolEcho(t *testing.T) {
|
||||||
|
pool.JobHandler = func(job *Job) error {
|
||||||
|
echo := string(job.Data)
|
||||||
|
if echo == "Hello world" {
|
||||||
|
t.Log(echo)
|
||||||
|
} else {
|
||||||
|
t.Errorf("Invalid echo data: %s", job.Data)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
pool.Echo([]byte("Hello world"))
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
/*
|
||||||
|
func TestPoolDo(t *testing.T) {
|
||||||
|
if addr, handle, err := pool.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else {
|
||||||
|
t.Log(handle)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
func TestPoolClose(t *testing.T) {
|
||||||
|
if err := pool.Close(); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user