diff --git a/client/pool.go b/client/pool.go
index d5db727..23efb74 100644
--- a/client/pool.go
+++ b/client/pool.go
@@ -18,7 +18,8 @@ var (
 
 type poolClient struct {
     *Client
-    Rate int
+    Rate  int
+    mutex sync.Mutex
 }
 
 type SelectionHandler func(map[string]*poolClient, string) string
@@ -95,6 +96,8 @@ func (pool *Pool) Remove(addr string) {
 func (pool *Pool) Do(funcname string, data []byte,
     flag byte, h ResponseHandler) (addr, handle string, err error) {
     client := pool.selectServer()
+    client.mutex.Lock()
+    defer client.mutex.Unlock()
     handle, err = client.Do(funcname, data, flag, h)
     addr = client.addr
     return
@@ -103,6 +106,8 @@ func (pool *Pool) Do(funcname string, data []byte,
 func (pool *Pool) DoBg(funcname string, data []byte,
     flag byte) (addr, handle string, err error) {
     client := pool.selectServer()
+    client.mutex.Lock()
+    defer client.mutex.Unlock()
     handle, err = client.DoBg(funcname, data, flag)
     addr = client.addr
     return
@@ -112,6 +117,8 @@ func (pool *Pool) DoBg(funcname string, data []byte,
 // !!!Not fully tested.!!!
 func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
     if client, ok := pool.clients[addr]; ok {
+        client.mutex.Lock()
+        defer client.mutex.Unlock()
         status, err = client.Status(handle)
     } else {
         err = ErrNotFound
@@ -131,6 +138,8 @@ func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
             return
         }
     }
+    client.mutex.Lock()
+    defer client.mutex.Unlock()
     echo, err = client.Echo(data)
     return
 }
diff --git a/worker/agent.go b/worker/agent.go
index 2236173..194545b 100644
--- a/worker/agent.go
+++ b/worker/agent.go
@@ -53,13 +53,13 @@ func (a *agent) work() {
     for {
         if data, err = a.read(bufferSize); err != nil {
             if opErr, ok := err.(*net.OpError); ok {
-                if opErr.Timeout() {
-                    a.worker.err(err)
-                }
                 if opErr.Temporary() {
                     continue
+                } else {
+                    a.worker.err(err)
+                    break
                 }
-                break
+
             }
             a.worker.err(err)
             // If it is unexpected error and the connection wasn't
diff --git a/worker/worker.go b/worker/worker.go
index 4d5fa7b..a81f4ab 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -22,6 +22,7 @@ type Worker struct {
     funcs   jobFuncs
     in      chan *inPack
     running bool
+    ready   bool
 
     Id string
     ErrorHandler ErrorHandler
@@ -174,12 +175,21 @@ func (worker *Worker) Ready() (err error) {
     for funcname, f := range worker.funcs {
         worker.addFunc(funcname, f.timeout)
     }
+    worker.ready = true
     return
 }
 
 // Main loop, block here
 // Most of time, this should be evaluated in goroutine.
 func (worker *Worker) Work() {
+    if !worker.ready {
+        // Ready() was not called beforehand, so run it now.
+        err := worker.Ready()
+        if err != nil {
+            panic(err)
+        }
+    }
+
     defer func() {
         for _, a := range worker.agents {
             a.Close()
diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go
new file mode 100644
index 0000000..e87e228
--- /dev/null
+++ b/worker/worker_disconnect_test.go
@@ -0,0 +1,170 @@
+package worker
+
+import (
+    "../client"
+    "log"
+    "net"
+    "os/exec"
+    "testing"
+    "time"
+)
+
+const port = `3700`
+
+var gearman_ready chan bool
+var kill_gearman chan bool
+var bye chan bool
+
+func init() {
+
+    if check_gearman_present() {
+        panic(`Something already listening on our testing port. 
+Chickening out of testing with it!`)
+    }
+    gearman_ready = make(chan bool)
+    kill_gearman = make(chan bool)
+    // TODO: verify port is clear
+    go run_gearman()
+}
+
+func run_gearman() {
+    gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port)
+    start_err := gm_cmd.Start()
+
+    if start_err != nil {
+        panic(`could not start gearman, aborting test: ` + start_err.Error())
+    }
+
+    // Make sure we clean up our gearmand:
+    defer func() {
+        log.Println("killing gearmand")
+        gm_cmd.Process.Kill()
+    }()
+
+    for tries := 10; tries > 0; tries-- {
+        if check_gearman_present() {
+            break
+        }
+        time.Sleep(250 * time.Millisecond)
+    }
+
+    if !check_gearman_present() {
+        panic(`Unable to start gearman, aborting test`)
+    }
+    gearman_ready <- true
+
+    <-kill_gearman
+}
+
+func check_gearman_present() bool {
+    con, err := net.Dial(`tcp`, `127.0.0.1:`+port)
+    if err != nil {
+        log.Println("gearman not ready " + err.Error())
+        return false
+    }
+    log.Println("gearman ready")
+    con.Close()
+    return true
+}
+
+func check_gearman_is_dead() bool {
+
+    for tries := 10; tries > 0; tries-- {
+        if !check_gearman_present() {
+            return true
+        }
+        time.Sleep(250 * time.Millisecond)
+    }
+    return false
+}
+
+/*
+    Checks for a disconnect whilst not working
+*/
+func TestBasicDisconnect(t *testing.T) {
+    <-gearman_ready
+
+    worker := New(Unlimited)
+    timeout := make(chan bool, 1)
+    done := make(chan bool, 1)
+
+    if err := worker.AddServer(Network, "127.0.0.1:"+port); err != nil {
+        t.Error(err)
+    }
+    work_done := false
+    if err := worker.AddFunc("gearman-go-workertest",
+        func(j Job) (b []byte, e error) {
+            work_done = true
+            done <- true
+            return
+        }, 0); err != nil {
+        t.Error(err)
+    }
+
+    worker.JobHandler = func(j Job) error {
+        if !worker.ready {
+            t.Error("Worker should be ready by the time it handles a job")
+        }
+        done <- true
+        return nil
+    }
+    handled_errors := false
+
+    c_error := make(chan bool)
+    worker.ErrorHandler = func(e error) {
+        log.Println(e)
+        handled_errors = true
+        c_error <- true
+    }
+
+    go func() {
+        time.Sleep(5 * time.Second)
+        timeout <- true
+    }()
+
+    err := worker.Ready()
+
+    if err != nil {
+        t.Error(err)
+    }
+
+    go worker.Work()
+
+    kill_gearman <- true
+
+    check_gearman_is_dead()
+    go run_gearman()
+
+    select {
+    case <-gearman_ready:
+    case <-timeout:
+    }
+
+    send_client_request()
+
+    select {
+    case <-done:
+        t.Error("Client request handled (somehow), did we magically reconnect?")
+    case <-timeout:
+        t.Error("Test timed out waiting for the error handler")
+    case <-c_error:
+        // the error was handled, as expected
+    }
+
+    kill_gearman <- true
+
+}
+
+func send_client_request() {
+    log.Println("sending client request")
+    c, err := client.New(Network, "127.0.0.1:"+port)
+    if err == nil {
+        _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh)
+        if err != nil {
+            log.Println("error sending client request " + err.Error())
+        }
+
+    } else {
+        log.Println("error with client " + err.Error())
+    }
+}
\ No newline at end of file
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 7fa8f0b..7c47b14 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -3,6 +3,7 @@ package worker
 import (
     "sync"
     "testing"
+    "time"
 )
 
 var worker *Worker
@@ -77,6 +78,92 @@ func TestWork(t *testing.T) {
     wg.Wait()
 }
 
+
 func TestWorkerClose(t *testing.T) {
     worker.Close()
 }
+
+func TestWorkWithoutReady(t *testing.T) {
+    other_worker := New(Unlimited)
+
+    if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
+        t.Error(err)
+    }
+    if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil {
+        t.Error(err)
+    }
+
+    timeout := make(chan bool, 1)
+    done := make(chan bool, 1)
+
+    other_worker.JobHandler = func(j Job) error {
+        if !other_worker.ready {
+            t.Error("Worker should be ready by the time it handles a job")
+        }
+        done <- true
+        return nil
+    }
+    go func() {
+        time.Sleep(5 * time.Second)
+        timeout <- true
+    }()
+
+    go func() {
+        other_worker.Work()
+    }()
+
+    // With the all-in-one Work() we don't know if the
+    // worker is ready at this stage, so we may have to wait a moment:
+    go func() {
+        tries := 5
+        for tries > 0 {
+            if other_worker.ready {
+                other_worker.Echo([]byte("Hello"))
+                break
+            }
+
+            // still waiting for it to become ready..
+            time.Sleep(250 * time.Millisecond)
+            tries--
+        }
+    }()
+
+    // determine if we've finished or timed out:
+    select {
+    case <-timeout:
+        t.Error("Test timed out waiting for the worker")
+    case <-done:
+    }
+}
+
+func TestWorkWithoutReadyWithPanic(t *testing.T) {
+    other_worker := New(Unlimited)
+
+    timeout := make(chan bool, 1)
+    done := make(chan bool, 1)
+
+    // Going to work with no worker set up.
+    // When Work() calls Ready() it will get an error, which should cause it to panic().
+    go func() {
+        defer func() {
+            if err := recover(); err != nil {
+                done <- true
+                return
+            }
+            t.Error("Work should raise a panic.")
+            done <- true
+        }()
+        other_worker.Work()
+    }()
+    go func() {
+        time.Sleep(2 * time.Second)
+        timeout <- true
+    }()
+
+    select {
+    case <-timeout:
+        t.Error("Test timed out waiting for the worker")
+    case <-done:
+    }
+
+}
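
Usage note (illustrative, not part of the patch): after this change a caller may skip the explicit
Ready() call, because Work() now runs Ready() itself and panics if registration fails. Below is a
minimal sketch of the simplified call pattern, assuming the exported worker API exercised by the
tests above (New, Unlimited, Network, AddServer, AddFunc, Job, Work, Close) and the usual
github.com/mikespook/gearman-go/worker import path:

    package main

    import (
        "log"

        "github.com/mikespook/gearman-go/worker"
    )

    func main() {
        w := worker.New(worker.Unlimited)
        defer w.Close()

        if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil {
            log.Fatal(err)
        }
        // The handler ignores the job payload; it only demonstrates the wiring.
        if err := w.AddFunc("gearman-go-workertest", func(j worker.Job) ([]byte, error) {
            return []byte("pong"), nil
        }, 0); err != nil {
            log.Fatal(err)
        }

        // No explicit w.Ready() call: Work() now performs it on demand and
        // panics if the worker cannot register with the job server.
        w.Work()
    }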