From 09c626f48830df67b05f5a54ca5e41428a9286dc Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Tue, 10 Jun 2014 03:46:21 +0100 Subject: [PATCH] Cope with io.EOF as a disconnect error --- worker/agent.go | 23 ++++++++++++++--------- worker/worker.go | 7 +++++-- worker/worker_disconnect_test.go | 3 --- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 310cca8..9befa9c 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -4,6 +4,7 @@ import ( "bufio" "net" "sync" + "io" ) // The agent of job server. @@ -57,20 +58,14 @@ func (a *agent) work() { if opErr.Temporary() { continue }else{ - a.Lock() - if( a.conn != nil ){ - a.Unlock() - err = &WorkerDisconnectError{ - OpError : opErr, - agent : a, - } - a.worker.err(err) - } + a.disconnect_error(err) // else - we're probably dc'ing due to a Close() break } + } else if( err == io.EOF ){ + a.disconnect_error(err) } a.worker.err(err) // If it is unexpected error and the connection wasn't @@ -106,6 +101,16 @@ func (a *agent) work() { } } +func (a * agent) disconnect_error( err error ){ + if( a.conn != nil ){ + err = &WorkerDisconnectError{ + err : err, + agent : a, + } + a.worker.err(err) + } +} + func (a *agent) Close() { a.Lock() defer a.Unlock() diff --git a/worker/worker.go b/worker/worker.go index 3b9f695..0bb7b4c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -7,7 +7,6 @@ import ( "fmt" "sync" "time" - "net" ) const ( @@ -335,9 +334,13 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { // Error type passed when a worker connection disconnects type WorkerDisconnectError struct{ - *net.OpError + err error agent * agent } +func (e *WorkerDisconnectError) Error() ( string){ + return e.err.Error(); +} + // Responds to the error by asking the worker to reconnect func (e *WorkerDisconnectError) Reconnect() ( err error ){ return e.agent.reconnect() diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index 8925e7a..34a3621 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -36,7 +36,6 @@ func run_gearman() { // Make sure we clear up our gearman: defer func() { - log.Println("killing gearmand") gm_cmd.Process.Kill() }() @@ -58,7 +57,6 @@ func run_gearman() { func check_gearman_present() bool { con, err := net.Dial(`tcp`, `127.0.0.1:`+port) if err != nil { - log.Println("gearman not ready " + err.Error()) return false } con.Close() @@ -233,7 +231,6 @@ func TestDcRc(t *testing.T) { } func send_client_request() { - log.Println("sending client request") c, err := client.New(Network, "127.0.0.1:"+port) if err == nil { _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh)