From 2f72b28d762e42cdd94adc3f02181bf808591e2e Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:35:05 +0100 Subject: [PATCH 1/6] FIX: waiting for worker fireup was making the test slow Dodging the race condition with a sleep was fine, but we slept for a whole second which was longer than this test takes to run! I've scaled the sleep down to 250 miliseconds and increased the tries. Even this is probably excessively long but now the test is fast agin. --- worker/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker_test.go b/worker/worker_test.go index 06ce15f..7c47b14 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -115,7 +115,7 @@ func TestWorkWithoutReady(t * testing.T){ // With the all-in-one Work() we don't know if the // worker is ready at this stage so we may have to wait a sec: go func(){ - tries := 3 + tries := 5 for( tries > 0 ){ if other_worker.ready { other_worker.Echo([]byte("Hello")) @@ -123,7 +123,7 @@ func TestWorkWithoutReady(t * testing.T){ } // still waiting for it to be ready.. - time.Sleep(1 * time.Second) + time.Sleep(250 * time.Millisecond) tries-- } }() From 912db95c0a995dc9897f8e0a30627cca48ddcdee Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:35:05 +0100 Subject: [PATCH 2/6] FIX: waiting for worker fireup was making the test slow Dodging the race condition with a sleep was fine, but we slept for a whole second which was longer than this test takes to run! I've scaled the sleep down to 250 miliseconds and increased the tries. Even this is probably excessively long but now the test is fast agin. --- worker/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker_test.go b/worker/worker_test.go index 06ce15f..7c47b14 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -115,7 +115,7 @@ func TestWorkWithoutReady(t * testing.T){ // With the all-in-one Work() we don't know if the // worker is ready at this stage so we may have to wait a sec: go func(){ - tries := 3 + tries := 5 for( tries > 0 ){ if other_worker.ready { other_worker.Echo([]byte("Hello")) @@ -123,7 +123,7 @@ func TestWorkWithoutReady(t * testing.T){ } // still waiting for it to be ready.. - time.Sleep(1 * time.Second) + time.Sleep(250 * time.Millisecond) tries-- } }() From e0614657e058098f3f33797ccc7c003c53af2d16 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:50:58 +0100 Subject: [PATCH 3/6] Added the original gearmand to travis for testing: 1. It's easier to test with (it installs in the regular path), we need to start and stop it for network error tests 2. Can't hurt to test with both implementations! --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index b20dfe6..3d97d6d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,3 +5,4 @@ go: before_install: - sudo apt-get remove -y gearman-job-server - sudo apt-get install -y gearman-job-server + - sudo apt-get install -y gearman-server From 7ddad76b8530380150e61a2c3302d68062c84754 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 06:14:28 +0100 Subject: [PATCH 4/6] Early stab at disconnect handling test --- worker/worker_disconnect_test.go | 132 +++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 worker/worker_disconnect_test.go diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go new file mode 100644 index 0000000..fcfe1d9 --- /dev/null +++ b/worker/worker_disconnect_test.go @@ -0,0 +1,132 @@ +package worker + +import ( + "log" + "net" + "os/exec" + "testing" + "time" +) + +const port = `3700` + +var gearman_ready chan bool +var kill_gearman chan bool +var bye chan bool + +func init() { + + if check_gearman_present() { + panic(`Something already listening on our testing port. Chickening out of testing with it!`) + } + gearman_ready = make( chan bool ) + kill_gearman = make( chan bool ) + // TODO: verify port is clear + go run_gearman() +} + +func run_gearman() { + gm_cmd := exec.Command(`gearmand`, `--port`, port) + start_err := gm_cmd.Start() + + if start_err != nil { + panic(`could not start gearman, aborting test :` + start_err.Error()) + } + + // Make sure we clear up our gearman: + defer func(){ + gm_cmd.Process.Kill() + }() + + for tries := 10; tries > 0; tries-- { + if check_gearman_present() { + break + } + time.Sleep(250 * time.Millisecond) + } + + if !check_gearman_present() { + panic(`Unable to start gearman aborting test`) + } + gearman_ready <- true + + <- kill_gearman +} + +func check_gearman_present() bool { + con, err := net.Dial(`tcp`, `127.0.0.1:`+port) + if err != nil { + log.Println("gearman not ready " + err.Error()) + return false + } + log.Println("gearman ready") + con.Close() + return true +} + +func TestBasicDisconnect(t *testing.T) { + <- gearman_ready + + worker := New(Unlimited) + + if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { + t.Error(err) + } + if err := worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { + t.Error(err) + } + + timeout := make(chan bool, 1) + done := make( chan bool, 1) + + worker.JobHandler = func( j Job ) error { + if( ! worker.ready ){ + t.Error("Worker not ready as expected"); + } + done <-true + return nil + } + handled_errors := false + + c_error := make( chan bool) + worker.ErrorHandler = func( e error ){ + handled_errors = true + c_error <- true + } + + go func() { + time.Sleep(5 * time.Second) + timeout <- true + }() + + go func(){ + worker.Work(); + }() + + // With the all-in-one Work() we don't know if the + // worker is ready at this stage so we may have to wait a sec: + go func(){ + tries := 3 + for( tries > 0 ){ + if worker.ready { + worker.Echo([]byte("Hello")) + kill_gearman <- true + log.Println("ok...") + worker.Echo([]byte("Hello")) + break + } + + // still waiting for it to be ready.. + time.Sleep(250 * time.Millisecond) + tries-- + } + }() + + + select{ + case <- c_error: + log.Println("eoo") + case <- timeout: + t.Error( "Test timed out waiting for the error handler" ) + } +} From 827cf3fa3e68aae370d938d653b679673a64d693 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 06:36:24 +0100 Subject: [PATCH 5/6] Removed old gearman from travis gearman-go and original gearmand don't play well, possibly an issue? --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 3d97d6d..b20dfe6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,4 +5,3 @@ go: before_install: - sudo apt-get remove -y gearman-job-server - sudo apt-get install -y gearman-job-server - - sudo apt-get install -y gearman-server From 99bcf11768e767de7d79b177e1e6fba42317e4ce Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 07:22:40 +0100 Subject: [PATCH 6/6] FIX: The error handler is not called on disconnect Using the disconnect test showed the agent was silently closing on disconnect --- worker/agent.go | 8 +-- worker/worker_disconnect_test.go | 106 +++++++++++++++++++++---------- 2 files changed, 76 insertions(+), 38 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 2236173..194545b 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -53,13 +53,13 @@ func (a *agent) work() { for { if data, err = a.read(bufferSize); err != nil { if opErr, ok := err.(*net.OpError); ok { - if opErr.Timeout() { - a.worker.err(err) - } if opErr.Temporary() { continue + }else{ + a.worker.err(err) + break } - break + } a.worker.err(err) // If it is unexpected error and the connection wasn't diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index fcfe1d9..e87e228 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -1,6 +1,7 @@ package worker import ( + "../client" "log" "net" "os/exec" @@ -26,15 +27,16 @@ func init() { } func run_gearman() { - gm_cmd := exec.Command(`gearmand`, `--port`, port) + gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port) start_err := gm_cmd.Start() if start_err != nil { panic(`could not start gearman, aborting test :` + start_err.Error()) } - + // Make sure we clear up our gearman: - defer func(){ + defer func() { + log.Println("killing gearmand") gm_cmd.Process.Kill() }() @@ -64,20 +66,40 @@ func check_gearman_present() bool { return true } +func check_gearman_is_dead() bool { + + for tries := 10; tries > 0; tries-- { + if !check_gearman_present() { + return true + } + time.Sleep(250 * time.Millisecond) + } + return false +} + +/* + Checks for a disconnect whilst not working +*/ func TestBasicDisconnect(t *testing.T) { <- gearman_ready worker := New(Unlimited) + timeout := make(chan bool, 1) + done := make( chan bool, 1) if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { t.Error(err) } - if err := worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { + work_done := false; + if err := worker.AddFunc("gearman-go-workertest", + func(j Job)(b []byte, e error){ + work_done = true; + done <- true + return}, 0); + err != nil { t.Error(err) } - timeout := make(chan bool, 1) - done := make( chan bool, 1) worker.JobHandler = func( j Job ) error { if( ! worker.ready ){ @@ -90,6 +112,7 @@ func TestBasicDisconnect(t *testing.T) { c_error := make( chan bool) worker.ErrorHandler = func( e error ){ + log.Println( e ) handled_errors = true c_error <- true } @@ -99,34 +122,49 @@ func TestBasicDisconnect(t *testing.T) { timeout <- true }() - go func(){ - worker.Work(); - }() + err := worker.Ready() - // With the all-in-one Work() we don't know if the - // worker is ready at this stage so we may have to wait a sec: - go func(){ - tries := 3 - for( tries > 0 ){ - if worker.ready { - worker.Echo([]byte("Hello")) - kill_gearman <- true - log.Println("ok...") - worker.Echo([]byte("Hello")) - break - } - - // still waiting for it to be ready.. - time.Sleep(250 * time.Millisecond) - tries-- - } - }() - - - select{ - case <- c_error: - log.Println("eoo") - case <- timeout: - t.Error( "Test timed out waiting for the error handler" ) + if err != nil { + t.Error(err) } + + go worker.Work() + + kill_gearman <- true + + check_gearman_is_dead() + go run_gearman() + + select { + case <-gearman_ready: + case <-timeout: + } + + send_client_request() + + select { + case <- done: + t.Error("Client request handled (somehow), did we magically reconnect?") + case <-timeout: + t.Error("Test timed out waiting for the error handler") + case <-c_error: + // error was handled! + } + + kill_gearman <- true + } + +func send_client_request(){ + log.Println("sending client request"); + c, err := client.New( Network, "127.0.0.1:" + port ) + if err == nil { + _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) + if err != nil { + log.Println( "error sending client request " + err.Error() ) + } + + }else{ + log.Println( "error with client " + err.Error() ) + } +} \ No newline at end of file