From 0591572d8ed918db05a15ae8ebb59dba2934dd1f Mon Sep 17 00:00:00 2001 From: Randall McPherson Date: Fri, 16 May 2014 10:23:44 -0400 Subject: [PATCH 01/10] Make pool clients safe for concurrent access with mutex. --- client/pool.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/client/pool.go b/client/pool.go index d5db727..23efb74 100644 --- a/client/pool.go +++ b/client/pool.go @@ -18,7 +18,8 @@ var ( type poolClient struct { *Client - Rate int + Rate int + mutex sync.Mutex } type SelectionHandler func(map[string]*poolClient, string) string @@ -95,6 +96,8 @@ func (pool *Pool) Remove(addr string) { func (pool *Pool) Do(funcname string, data []byte, flag byte, h ResponseHandler) (addr, handle string, err error) { client := pool.selectServer() + client.Lock() + defer client.Unlock() handle, err = client.Do(funcname, data, flag, h) addr = client.addr return @@ -103,6 +106,8 @@ func (pool *Pool) Do(funcname string, data []byte, func (pool *Pool) DoBg(funcname string, data []byte, flag byte) (addr, handle string, err error) { client := pool.selectServer() + client.Lock() + defer client.Unlock() handle, err = client.DoBg(funcname, data, flag) addr = client.addr return @@ -112,6 +117,8 @@ func (pool *Pool) DoBg(funcname string, data []byte, // !!!Not fully tested.!!! func (pool *Pool) Status(addr, handle string) (status *Status, err error) { if client, ok := pool.clients[addr]; ok { + client.Lock() + defer client.Unlock() status, err = client.Status(handle) } else { err = ErrNotFound @@ -131,6 +138,8 @@ func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { return } } + client.Lock() + defer client.Unlock() echo, err = client.Echo(data) return } From 24e93b4a2cba45dfb0dcfb777cdfc8cd2f513af3 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Thu, 29 May 2014 15:35:27 +0100 Subject: [PATCH 02/10] More helpful panic if you call Work() before you call Ready() --- worker/worker.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/worker/worker.go b/worker/worker.go index 4d5fa7b..f4d926e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -22,6 +22,7 @@ type Worker struct { funcs jobFuncs in chan *inPack running bool + ready bool Id string ErrorHandler ErrorHandler @@ -174,12 +175,17 @@ func (worker *Worker) Ready() (err error) { for funcname, f := range worker.funcs { worker.addFunc(funcname, f.timeout) } + worker.ready = true return } // Main loop, block here // Most of time, this should be evaluated in goroutine. func (worker *Worker) Work() { + if ! worker.ready { + panic( "worker: Work() called before Ready()") + } + defer func() { for _, a := range worker.agents { a.Close() From 6688c29c37c0e91da1032f2e19ef4fbf81ad8b89 Mon Sep 17 00:00:00 2001 From: draxil Date: Sun, 1 Jun 2014 16:59:57 +0100 Subject: [PATCH 03/10] Enahanced Work() without Ready() behaviour: Now if you try to call Work() without calling Ready(), it will trigger an attempt to run Ready(), and will only panic if there is an error. --- worker/worker.go | 6 +++++- worker/worker_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/worker/worker.go b/worker/worker.go index f4d926e..a81f4ab 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -183,7 +183,11 @@ func (worker *Worker) Ready() (err error) { // Most of time, this should be evaluated in goroutine. func (worker *Worker) Work() { if ! worker.ready { - panic( "worker: Work() called before Ready()") + // didn't run Ready beforehand, so we'll have to do it: + err := worker.Ready() + if err != nil { + panic( err ) + } } defer func() { diff --git a/worker/worker_test.go b/worker/worker_test.go index 7fa8f0b..9960591 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -77,6 +77,33 @@ func TestWork(t *testing.T) { wg.Wait() } + func TestWorkerClose(t *testing.T) { worker.Close() } + +func TestWorkWithoutReady(t * testing.T){ + other_worker := New(Unlimited) + var wg sync.WaitGroup + + if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil { + t.Error(err) + } + if err := other_worker.AddFunc("foobar", foobar, 0); err != nil { + t.Error(err) + } + + other_worker.JobHandler = func( j Job ) error { + if( ! other_worker.ready ){ + t.Error("Worker not ready as expected"); + } + wg.Done() + return nil + } + + go other_worker.Work(); + + wg.Add(1) + worker.Echo([]byte("Hello")) + wg.Wait(); +} From 0a4489d1fea167b2558a5bb6fd81e8845af55569 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Wed, 4 Jun 2014 13:31:25 +0100 Subject: [PATCH 04/10] Fixed and improved Work() without Ready() test: * FIX: committed test froze * FIX: committed test had a race condition! * Added properly handled panic test * Timeouts so that these tests should fail now if something goes wrong instead of failing. --- worker/worker_test.go | 74 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/worker/worker_test.go b/worker/worker_test.go index 9960591..06ce15f 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -3,6 +3,7 @@ package worker import ( "sync" "testing" + "time" ) var worker *Worker @@ -84,26 +85,85 @@ func TestWorkerClose(t *testing.T) { func TestWorkWithoutReady(t * testing.T){ other_worker := New(Unlimited) - var wg sync.WaitGroup if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil { t.Error(err) } - if err := other_worker.AddFunc("foobar", foobar, 0); err != nil { + if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { t.Error(err) } + + timeout := make(chan bool, 1) + done := make( chan bool, 1) other_worker.JobHandler = func( j Job ) error { if( ! other_worker.ready ){ t.Error("Worker not ready as expected"); } - wg.Done() + done <-true return nil } + go func() { + time.Sleep(5 * time.Second) + timeout <- true + }() + + go func(){ + other_worker.Work(); + }() + + // With the all-in-one Work() we don't know if the + // worker is ready at this stage so we may have to wait a sec: + go func(){ + tries := 3 + for( tries > 0 ){ + if other_worker.ready { + other_worker.Echo([]byte("Hello")) + break + } + + // still waiting for it to be ready.. + time.Sleep(1 * time.Second) + tries-- + } + }() + + // determine if we've finished or timed out: + select{ + case <- timeout: + t.Error("Test timed out waiting for the worker") + case <- done: + } +} - go other_worker.Work(); +func TestWorkWithoutReadyWithPanic(t * testing.T){ + other_worker := New(Unlimited) + + timeout := make(chan bool, 1) + done := make( chan bool, 1) + + // Going to work with no worker setup. + // when Work (hopefully) calls Ready it will get an error which should cause it to panic() + go func(){ + defer func() { + if err := recover(); err != nil { + done <- true + return + } + t.Error("Work should raise a panic.") + done <- true + }() + other_worker.Work(); + }() + go func() { + time.Sleep(2 * time.Second) + timeout <- true + }() + + select{ + case <- timeout: + t.Error("Test timed out waiting for the worker") + case <- done: + } - wg.Add(1) - worker.Echo([]byte("Hello")) - wg.Wait(); } From 2f72b28d762e42cdd94adc3f02181bf808591e2e Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:35:05 +0100 Subject: [PATCH 05/10] FIX: waiting for worker fireup was making the test slow Dodging the race condition with a sleep was fine, but we slept for a whole second which was longer than this test takes to run! I've scaled the sleep down to 250 miliseconds and increased the tries. Even this is probably excessively long but now the test is fast agin. --- worker/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker_test.go b/worker/worker_test.go index 06ce15f..7c47b14 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -115,7 +115,7 @@ func TestWorkWithoutReady(t * testing.T){ // With the all-in-one Work() we don't know if the // worker is ready at this stage so we may have to wait a sec: go func(){ - tries := 3 + tries := 5 for( tries > 0 ){ if other_worker.ready { other_worker.Echo([]byte("Hello")) @@ -123,7 +123,7 @@ func TestWorkWithoutReady(t * testing.T){ } // still waiting for it to be ready.. - time.Sleep(1 * time.Second) + time.Sleep(250 * time.Millisecond) tries-- } }() From 912db95c0a995dc9897f8e0a30627cca48ddcdee Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:35:05 +0100 Subject: [PATCH 06/10] FIX: waiting for worker fireup was making the test slow Dodging the race condition with a sleep was fine, but we slept for a whole second which was longer than this test takes to run! I've scaled the sleep down to 250 miliseconds and increased the tries. Even this is probably excessively long but now the test is fast agin. --- worker/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker_test.go b/worker/worker_test.go index 06ce15f..7c47b14 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -115,7 +115,7 @@ func TestWorkWithoutReady(t * testing.T){ // With the all-in-one Work() we don't know if the // worker is ready at this stage so we may have to wait a sec: go func(){ - tries := 3 + tries := 5 for( tries > 0 ){ if other_worker.ready { other_worker.Echo([]byte("Hello")) @@ -123,7 +123,7 @@ func TestWorkWithoutReady(t * testing.T){ } // still waiting for it to be ready.. - time.Sleep(1 * time.Second) + time.Sleep(250 * time.Millisecond) tries-- } }() From e0614657e058098f3f33797ccc7c003c53af2d16 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:50:58 +0100 Subject: [PATCH 07/10] Added the original gearmand to travis for testing: 1. It's easier to test with (it installs in the regular path), we need to start and stop it for network error tests 2. Can't hurt to test with both implementations! --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index b20dfe6..3d97d6d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,3 +5,4 @@ go: before_install: - sudo apt-get remove -y gearman-job-server - sudo apt-get install -y gearman-job-server + - sudo apt-get install -y gearman-server From 7ddad76b8530380150e61a2c3302d68062c84754 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 06:14:28 +0100 Subject: [PATCH 08/10] Early stab at disconnect handling test --- worker/worker_disconnect_test.go | 132 +++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 worker/worker_disconnect_test.go diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go new file mode 100644 index 0000000..fcfe1d9 --- /dev/null +++ b/worker/worker_disconnect_test.go @@ -0,0 +1,132 @@ +package worker + +import ( + "log" + "net" + "os/exec" + "testing" + "time" +) + +const port = `3700` + +var gearman_ready chan bool +var kill_gearman chan bool +var bye chan bool + +func init() { + + if check_gearman_present() { + panic(`Something already listening on our testing port. Chickening out of testing with it!`) + } + gearman_ready = make( chan bool ) + kill_gearman = make( chan bool ) + // TODO: verify port is clear + go run_gearman() +} + +func run_gearman() { + gm_cmd := exec.Command(`gearmand`, `--port`, port) + start_err := gm_cmd.Start() + + if start_err != nil { + panic(`could not start gearman, aborting test :` + start_err.Error()) + } + + // Make sure we clear up our gearman: + defer func(){ + gm_cmd.Process.Kill() + }() + + for tries := 10; tries > 0; tries-- { + if check_gearman_present() { + break + } + time.Sleep(250 * time.Millisecond) + } + + if !check_gearman_present() { + panic(`Unable to start gearman aborting test`) + } + gearman_ready <- true + + <- kill_gearman +} + +func check_gearman_present() bool { + con, err := net.Dial(`tcp`, `127.0.0.1:`+port) + if err != nil { + log.Println("gearman not ready " + err.Error()) + return false + } + log.Println("gearman ready") + con.Close() + return true +} + +func TestBasicDisconnect(t *testing.T) { + <- gearman_ready + + worker := New(Unlimited) + + if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { + t.Error(err) + } + if err := worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { + t.Error(err) + } + + timeout := make(chan bool, 1) + done := make( chan bool, 1) + + worker.JobHandler = func( j Job ) error { + if( ! worker.ready ){ + t.Error("Worker not ready as expected"); + } + done <-true + return nil + } + handled_errors := false + + c_error := make( chan bool) + worker.ErrorHandler = func( e error ){ + handled_errors = true + c_error <- true + } + + go func() { + time.Sleep(5 * time.Second) + timeout <- true + }() + + go func(){ + worker.Work(); + }() + + // With the all-in-one Work() we don't know if the + // worker is ready at this stage so we may have to wait a sec: + go func(){ + tries := 3 + for( tries > 0 ){ + if worker.ready { + worker.Echo([]byte("Hello")) + kill_gearman <- true + log.Println("ok...") + worker.Echo([]byte("Hello")) + break + } + + // still waiting for it to be ready.. + time.Sleep(250 * time.Millisecond) + tries-- + } + }() + + + select{ + case <- c_error: + log.Println("eoo") + case <- timeout: + t.Error( "Test timed out waiting for the error handler" ) + } +} From 827cf3fa3e68aae370d938d653b679673a64d693 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 06:36:24 +0100 Subject: [PATCH 09/10] Removed old gearman from travis gearman-go and original gearmand don't play well, possibly an issue? --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 3d97d6d..b20dfe6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,4 +5,3 @@ go: before_install: - sudo apt-get remove -y gearman-job-server - sudo apt-get install -y gearman-job-server - - sudo apt-get install -y gearman-server From 99bcf11768e767de7d79b177e1e6fba42317e4ce Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 07:22:40 +0100 Subject: [PATCH 10/10] FIX: The error handler is not called on disconnect Using the disconnect test showed the agent was silently closing on disconnect --- worker/agent.go | 8 +-- worker/worker_disconnect_test.go | 104 +++++++++++++++++++++---------- 2 files changed, 75 insertions(+), 37 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 2236173..194545b 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -53,13 +53,13 @@ func (a *agent) work() { for { if data, err = a.read(bufferSize); err != nil { if opErr, ok := err.(*net.OpError); ok { - if opErr.Timeout() { - a.worker.err(err) - } if opErr.Temporary() { continue + }else{ + a.worker.err(err) + break } - break + } a.worker.err(err) // If it is unexpected error and the connection wasn't diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index fcfe1d9..e87e228 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -1,6 +1,7 @@ package worker import ( + "../client" "log" "net" "os/exec" @@ -26,15 +27,16 @@ func init() { } func run_gearman() { - gm_cmd := exec.Command(`gearmand`, `--port`, port) + gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port) start_err := gm_cmd.Start() if start_err != nil { panic(`could not start gearman, aborting test :` + start_err.Error()) } - + // Make sure we clear up our gearman: - defer func(){ + defer func() { + log.Println("killing gearmand") gm_cmd.Process.Kill() }() @@ -64,20 +66,40 @@ func check_gearman_present() bool { return true } +func check_gearman_is_dead() bool { + + for tries := 10; tries > 0; tries-- { + if !check_gearman_present() { + return true + } + time.Sleep(250 * time.Millisecond) + } + return false +} + +/* + Checks for a disconnect whilst not working +*/ func TestBasicDisconnect(t *testing.T) { <- gearman_ready worker := New(Unlimited) + timeout := make(chan bool, 1) + done := make( chan bool, 1) if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { t.Error(err) } - if err := worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { + work_done := false; + if err := worker.AddFunc("gearman-go-workertest", + func(j Job)(b []byte, e error){ + work_done = true; + done <- true + return}, 0); + err != nil { t.Error(err) } - timeout := make(chan bool, 1) - done := make( chan bool, 1) worker.JobHandler = func( j Job ) error { if( ! worker.ready ){ @@ -90,6 +112,7 @@ func TestBasicDisconnect(t *testing.T) { c_error := make( chan bool) worker.ErrorHandler = func( e error ){ + log.Println( e ) handled_errors = true c_error <- true } @@ -99,34 +122,49 @@ func TestBasicDisconnect(t *testing.T) { timeout <- true }() - go func(){ - worker.Work(); - }() + err := worker.Ready() - // With the all-in-one Work() we don't know if the - // worker is ready at this stage so we may have to wait a sec: - go func(){ - tries := 3 - for( tries > 0 ){ - if worker.ready { - worker.Echo([]byte("Hello")) - kill_gearman <- true - log.Println("ok...") - worker.Echo([]byte("Hello")) - break - } - - // still waiting for it to be ready.. - time.Sleep(250 * time.Millisecond) - tries-- - } - }() - + if err != nil { + t.Error(err) + } + + go worker.Work() + + kill_gearman <- true + + check_gearman_is_dead() + go run_gearman() - select{ - case <- c_error: - log.Println("eoo") - case <- timeout: - t.Error( "Test timed out waiting for the error handler" ) + select { + case <-gearman_ready: + case <-timeout: } + + send_client_request() + + select { + case <- done: + t.Error("Client request handled (somehow), did we magically reconnect?") + case <-timeout: + t.Error("Test timed out waiting for the error handler") + case <-c_error: + // error was handled! + } + + kill_gearman <- true + } + +func send_client_request(){ + log.Println("sending client request"); + c, err := client.New( Network, "127.0.0.1:" + port ) + if err == nil { + _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) + if err != nil { + log.Println( "error sending client request " + err.Error() ) + } + + }else{ + log.Println( "error with client " + err.Error() ) + } +} \ No newline at end of file