From 962846c2f7aa3bb4bd6e5a8a9ccebb048453f0dd Mon Sep 17 00:00:00 2001 From: mikespook Date: Fri, 25 Jan 2013 15:16:11 +0800 Subject: [PATCH] pooling finished --- client/pool.go | 52 ++++++++++++++++++++++++---- client/pool_test.go | 82 +++++++++++++++++++++++++++++++++++++-------- 2 files changed, 114 insertions(+), 20 deletions(-) diff --git a/client/pool.go b/client/pool.go index 9c5e643..03831b0 100644 --- a/client/pool.go +++ b/client/pool.go @@ -7,6 +7,7 @@ package client import ( "sync" + "errors" "math/rand" "github.com/mikespook/gearman-go/common" ) @@ -15,6 +16,10 @@ const ( PoolSize = 10 ) +var ( + ErrNotFound = errors.New("Server Not Found") +) + type poolClient struct { *Client Rate int @@ -52,7 +57,10 @@ last string) (addr string) { type Pool struct { SelectionHandler SelectionHandler ErrHandler common.ErrorHandler + clients map[string]*poolClient + last string + mutex sync.Mutex } @@ -90,25 +98,45 @@ func (pool *Pool) Remove(addr string) { } func (pool *Pool) Do(funcname string, data []byte, -flag byte, h JobHandler) (handle string, err error) { +flag byte, h JobHandler) (addr, handle string) { + client := pool.selectServer() + handle = client.Do(funcname, data, flag, h) + addr = client.addr return } func (pool *Pool) DoBg(funcname string, data []byte, -flag byte) (handle string, err error) { +flag byte) (addr, handle string) { + client := pool.selectServer() + handle = client.DoBg(funcname, data, flag) + addr = client.addr return } - - // Get job status from job server. // !!!Not fully tested.!!! -func (pool *Pool) Status(handle string) (status *Status) { +func (pool *Pool) Status(addr, handle string) (status *Status, err error) { + if client, ok := pool.clients[addr]; ok { + status = client.Status(handle) + } else { + err = ErrNotFound + } return } // Send a something out, get the samething back. -func (pool *Pool) Echo(data []byte) (r []byte) { +func (pool *Pool) Echo(addr string, data []byte) (r []byte, err error) { + var client *poolClient + if addr == "" { + client = pool.selectServer() + } else { + var ok bool + if client, ok = pool.clients[addr]; !ok { + err = ErrNotFound + return + } + } + r = client.Echo(data) return } @@ -120,3 +148,15 @@ func (pool *Pool) Close() (err map[string]error) { } return } + +func (pool *Pool) selectServer() (client *poolClient) { + for client == nil { + addr := pool.SelectionHandler(pool.clients, pool.last) + var ok bool + if client, ok = pool.clients[addr]; ok { + pool.last = addr + break + } + } + return +} diff --git a/client/pool_test.go b/client/pool_test.go index 9c86bc8..bf21f0c 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -20,30 +20,84 @@ func TestPoolAdd(t *testing.T) { t.Errorf("2 servers expected, %d got.", len(pool.clients)) } } -/* func TestPoolEcho(t *testing.T) { - pool.JobHandler = func(job *Job) error { - echo := string(job.Data) - if echo == "Hello world" { - t.Log(echo) - } else { - t.Errorf("Invalid echo data: %s", job.Data) - } - return nil + echo, err := pool.Echo("", []byte("Hello world")) + if err != nil { + t.Error(err) + return + } + if string(echo) != "Hello world" { + t.Errorf("Invalid echo data: %s", echo) + return + } + + _, err = pool.Echo("not exists", []byte("Hello world")) + if err != ErrNotFound { + t.Errorf("ErrNotFound expected, got %s", err) + } +} + +func TestPoolDoBg(t *testing.T) { + if addr, handle := pool.DoBg("ToUpper", []byte("abcdef"), + JOB_LOW); handle == "" { + t.Error("Handle is empty.") + } else { + t.Log(addr, handle) } - pool.Echo([]byte("Hello world")) } func TestPoolDo(t *testing.T) { - if addr, handle, err := pool.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { - t.Error(err) + jobHandler := func(job *Job) { + str := string(job.Data) + if str == "ABCDEF" { + t.Log(str) + } else { + t.Errorf("Invalid data: %s", job.Data) + } + return + } + if addr, handle := pool.Do("ToUpper", []byte("abcdef"), + JOB_LOW, jobHandler); handle == "" { + t.Error("Handle is empty.") } else { - t.Log(handle) + t.Log(addr, handle) + } +} + +func TestPoolStatus(t *testing.T) { + s1, err := pool.Status("127.0.0.1:4730", "handle not exists") + if err != nil { + t.Error(err) + return + } + if s1.Known { + t.Errorf("The job (%s) shouldn't be known.", s1.Handle) + } + if s1.Running { + t.Errorf("The job (%s) shouldn't be running.", s1.Handle) + } + + addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil); + s2, err := pool.Status(addr, handle) + if err != nil { + t.Error(err) + return + } + + if !s2.Known { + t.Errorf("The job (%s) should be known.", s2.Handle) + } + if s2.Running { + t.Errorf("The job (%s) shouldn't be running.", s2.Handle) + } + + _, err = pool.Status("not exists", "not exists") + if err != ErrNotFound { + t.Error(err) } } -*/ func TestPoolClose(t *testing.T) { return if err := pool.Close(); err != nil {