diff --git a/worker/agent.go b/worker/agent.go index 2236173..194545b 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -53,13 +53,13 @@ func (a *agent) work() { for { if data, err = a.read(bufferSize); err != nil { if opErr, ok := err.(*net.OpError); ok { - if opErr.Timeout() { - a.worker.err(err) - } if opErr.Temporary() { continue + }else{ + a.worker.err(err) + break } - break + } a.worker.err(err) // If it is unexpected error and the connection wasn't diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index fcfe1d9..e87e228 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -1,6 +1,7 @@ package worker import ( + "../client" "log" "net" "os/exec" @@ -26,15 +27,16 @@ func init() { } func run_gearman() { - gm_cmd := exec.Command(`gearmand`, `--port`, port) + gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port) start_err := gm_cmd.Start() if start_err != nil { panic(`could not start gearman, aborting test :` + start_err.Error()) } - + // Make sure we clear up our gearman: - defer func(){ + defer func() { + log.Println("killing gearmand") gm_cmd.Process.Kill() }() @@ -64,20 +66,40 @@ func check_gearman_present() bool { return true } +func check_gearman_is_dead() bool { + + for tries := 10; tries > 0; tries-- { + if !check_gearman_present() { + return true + } + time.Sleep(250 * time.Millisecond) + } + return false +} + +/* + Checks for a disconnect whilst not working +*/ func TestBasicDisconnect(t *testing.T) { <- gearman_ready worker := New(Unlimited) + timeout := make(chan bool, 1) + done := make( chan bool, 1) if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { t.Error(err) } - if err := worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { + work_done := false; + if err := worker.AddFunc("gearman-go-workertest", + func(j Job)(b []byte, e error){ + work_done = true; + done <- true + return}, 0); + err != nil { t.Error(err) } - timeout := make(chan bool, 1) - done := make( chan bool, 1) worker.JobHandler = func( j Job ) error { if( ! worker.ready ){ @@ -90,6 +112,7 @@ func TestBasicDisconnect(t *testing.T) { c_error := make( chan bool) worker.ErrorHandler = func( e error ){ + log.Println( e ) handled_errors = true c_error <- true } @@ -99,34 +122,49 @@ func TestBasicDisconnect(t *testing.T) { timeout <- true }() - go func(){ - worker.Work(); - }() + err := worker.Ready() - // With the all-in-one Work() we don't know if the - // worker is ready at this stage so we may have to wait a sec: - go func(){ - tries := 3 - for( tries > 0 ){ - if worker.ready { - worker.Echo([]byte("Hello")) - kill_gearman <- true - log.Println("ok...") - worker.Echo([]byte("Hello")) - break - } - - // still waiting for it to be ready.. - time.Sleep(250 * time.Millisecond) - tries-- - } - }() - - - select{ - case <- c_error: - log.Println("eoo") - case <- timeout: - t.Error( "Test timed out waiting for the error handler" ) + if err != nil { + t.Error(err) } + + go worker.Work() + + kill_gearman <- true + + check_gearman_is_dead() + go run_gearman() + + select { + case <-gearman_ready: + case <-timeout: + } + + send_client_request() + + select { + case <- done: + t.Error("Client request handled (somehow), did we magically reconnect?") + case <-timeout: + t.Error("Test timed out waiting for the error handler") + case <-c_error: + // error was handled! + } + + kill_gearman <- true + } + +func send_client_request(){ + log.Println("sending client request"); + c, err := client.New( Network, "127.0.0.1:" + port ) + if err == nil { + _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) + if err != nil { + log.Println( "error sending client request " + err.Error() ) + } + + }else{ + log.Println( "error with client " + err.Error() ) + } +} \ No newline at end of file