pooling finished

This commit is contained in:
mikespook 2013-01-25 15:16:11 +08:00
parent af05460727
commit 962846c2f7
2 changed files with 114 additions and 20 deletions

View File

@ -7,6 +7,7 @@ package client
import ( import (
"sync" "sync"
"errors"
"math/rand" "math/rand"
"github.com/mikespook/gearman-go/common" "github.com/mikespook/gearman-go/common"
) )
@ -15,6 +16,10 @@ const (
PoolSize = 10 PoolSize = 10
) )
var (
ErrNotFound = errors.New("Server Not Found")
)
type poolClient struct { type poolClient struct {
*Client *Client
Rate int Rate int
@ -52,7 +57,10 @@ last string) (addr string) {
type Pool struct { type Pool struct {
SelectionHandler SelectionHandler SelectionHandler SelectionHandler
ErrHandler common.ErrorHandler ErrHandler common.ErrorHandler
clients map[string]*poolClient clients map[string]*poolClient
last string
mutex sync.Mutex mutex sync.Mutex
} }
@ -90,25 +98,45 @@ func (pool *Pool) Remove(addr string) {
} }
func (pool *Pool) Do(funcname string, data []byte, func (pool *Pool) Do(funcname string, data []byte,
flag byte, h JobHandler) (handle string, err error) { flag byte, h JobHandler) (addr, handle string) {
client := pool.selectServer()
handle = client.Do(funcname, data, flag, h)
addr = client.addr
return return
} }
func (pool *Pool) DoBg(funcname string, data []byte, func (pool *Pool) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) { flag byte) (addr, handle string) {
client := pool.selectServer()
handle = client.DoBg(funcname, data, flag)
addr = client.addr
return return
} }
// Get job status from job server. // Get job status from job server.
// !!!Not fully tested.!!! // !!!Not fully tested.!!!
func (pool *Pool) Status(handle string) (status *Status) { func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.clients[addr]; ok {
status = client.Status(handle)
} else {
err = ErrNotFound
}
return return
} }
// Send a something out, get the samething back. // Send a something out, get the samething back.
func (pool *Pool) Echo(data []byte) (r []byte) { func (pool *Pool) Echo(addr string, data []byte) (r []byte, err error) {
var client *poolClient
if addr == "" {
client = pool.selectServer()
} else {
var ok bool
if client, ok = pool.clients[addr]; !ok {
err = ErrNotFound
return
}
}
r = client.Echo(data)
return return
} }
@ -120,3 +148,15 @@ func (pool *Pool) Close() (err map[string]error) {
} }
return return
} }
func (pool *Pool) selectServer() (client *poolClient) {
for client == nil {
addr := pool.SelectionHandler(pool.clients, pool.last)
var ok bool
if client, ok = pool.clients[addr]; ok {
pool.last = addr
break
}
}
return
}

View File

@ -20,30 +20,84 @@ func TestPoolAdd(t *testing.T) {
t.Errorf("2 servers expected, %d got.", len(pool.clients)) t.Errorf("2 servers expected, %d got.", len(pool.clients))
} }
} }
/*
func TestPoolEcho(t *testing.T) { func TestPoolEcho(t *testing.T) {
pool.JobHandler = func(job *Job) error { echo, err := pool.Echo("", []byte("Hello world"))
echo := string(job.Data) if err != nil {
if echo == "Hello world" { t.Error(err)
t.Log(echo) return
} else { }
t.Errorf("Invalid echo data: %s", job.Data) if string(echo) != "Hello world" {
} t.Errorf("Invalid echo data: %s", echo)
return nil return
}
_, err = pool.Echo("not exists", []byte("Hello world"))
if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err)
}
}
func TestPoolDoBg(t *testing.T) {
if addr, handle := pool.DoBg("ToUpper", []byte("abcdef"),
JOB_LOW); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
} }
pool.Echo([]byte("Hello world"))
} }
func TestPoolDo(t *testing.T) { func TestPoolDo(t *testing.T) {
if addr, handle, err := pool.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { jobHandler := func(job *Job) {
t.Error(err) str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
if addr, handle := pool.Do("ToUpper", []byte("abcdef"),
JOB_LOW, jobHandler); handle == "" {
t.Error("Handle is empty.")
} else { } else {
t.Log(handle) t.Log(addr, handle)
}
}
func TestPoolStatus(t *testing.T) {
s1, err := pool.Status("127.0.0.1:4730", "handle not exists")
if err != nil {
t.Error(err)
return
}
if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
}
if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
}
addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
s2, err := pool.Status(addr, handle)
if err != nil {
t.Error(err)
return
}
if !s2.Known {
t.Errorf("The job (%s) should be known.", s2.Handle)
}
if s2.Running {
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
}
_, err = pool.Status("not exists", "not exists")
if err != ErrNotFound {
t.Error(err)
} }
} }
*/
func TestPoolClose(t *testing.T) { func TestPoolClose(t *testing.T) {
return return
if err := pool.Close(); err != nil { if err := pool.Close(); err != nil {