diff --git a/worker/agent.go b/worker/agent.go index e6a95f5..76fbbde 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -3,6 +3,7 @@ package worker import ( "io" "net" + "strings" "sync" ) @@ -44,10 +45,10 @@ func (a *agent) work() { var data, leftdata []byte for { if data, err = a.read(bufferSize); err != nil { - a.worker.err(err) if err == ErrLostConn { break } + a.worker.err(err) // If it is unexpected error and the connection wasn't // closed by Gearmand, the agent should close the conection // and reconnect to job server. @@ -103,6 +104,16 @@ func (a *agent) PreSleep() { a.write(outpack) } +func isClosed(err error) bool { + switch { + case err == io.EOF: + fallthrough + case strings.Contains(err.Error(), "use of closed network connection"): + return true + } + return false +} + // read length bytes from the socket func (a *agent) read(length int) (data []byte, err error) { n := 0 @@ -110,7 +121,7 @@ func (a *agent) read(length int) (data []byte, err error) { // read until data can be unpacked for i := length; i > 0 || len(data) < minPacketLength; i -= n { if n, err = a.conn.Read(buf); err != nil { - if err == io.EOF { + if isClosed(err) { err = ErrLostConn } return diff --git a/worker/common.go b/worker/common.go index 99c0e6d..8d8ebc4 100644 --- a/worker/common.go +++ b/worker/common.go @@ -10,10 +10,10 @@ const ( minPacketLength = 12 // \x00REQ - req = 5391697 + req = 5391697 reqStr = "\x00REQ" // \x00RES - res = 5391699 + res = 5391699 resStr = "\x00RES" // package data type @@ -21,7 +21,7 @@ const ( dtCantDo = 2 dtResetAbilities = 3 dtPreSleep = 4 - dtNoop = 6 + dtNoop = 6 dtJobCreated = 8 dtGrabJob = 9 dtNoJob = 10 @@ -32,16 +32,16 @@ const ( dtGetStatus = 15 dtEchoReq = 16 dtEchoRes = 17 - dtError = 19 + dtError = 19 dtStatusRes = 20 - dtSetClientId = 22 - dtCanDoTimeout = 23 + dtSetClientId = 22 + dtCanDoTimeout = 23 dtAllYours = 24 dtWorkException = 25 dtWorkData = 28 dtWorkWarning = 29 - dtGrabJobUniq = 30 - dtJobAssignUniq = 31 + dtGrabJobUniq = 30 + dtJobAssignUniq = 31 ) func getBuffer(l int) (buf []byte) { diff --git a/worker/example_test.go b/worker/example_test.go index 52032e4..4fbaaed 100644 --- a/worker/example_test.go +++ b/worker/example_test.go @@ -11,7 +11,7 @@ func ExampleWorker() { w := worker.New(worker.Unlimited) defer w.Close() // Add a gearman job server - if err := w.AddServer(worker.NETWORK, "127.0.0.1:4730"); err != nil { + if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil { fmt.Println(err) return } diff --git a/worker/inpack_test.go b/worker/inpack_test.go index 4ae978e..b1c099f 100644 --- a/worker/inpack_test.go +++ b/worker/inpack_test.go @@ -7,19 +7,19 @@ import ( var ( inpackcases = map[uint32]map[string]string{ - noop: map[string]string{ + dtNoop: map[string]string{ "src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00", }, - noJob: map[string]string{ + dtNoJob: map[string]string{ "src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00", }, - jobAssign: map[string]string{ + dtJobAssign: map[string]string{ "src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz", "handle": "a", "fn": "b", "data": "xyz", }, - jobAssign_UNIQ: map[string]string{ + dtJobAssignUniq: map[string]string{ "src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz", "handle": "a", "fn": "b", diff --git a/worker/outpack_test.go b/worker/outpack_test.go index 65ba330..a377445 100644 --- a/worker/outpack_test.go +++ b/worker/outpack_test.go @@ -7,51 +7,51 @@ import ( var ( outpackcases = map[uint32]map[string]string{ - canDo: map[string]string{ + dtCanDo: map[string]string{ "src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a", "data": "a", }, - canDo_TIMEOUT: map[string]string{ + dtCanDoTimeout: map[string]string{ "src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01", "data": "a\x00\x00\x00\x00\x01", }, - cantDo: map[string]string{ + dtCantDo: map[string]string{ "src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a", "data": "a", }, - resetAbilities: map[string]string{ + dtResetAbilities: map[string]string{ "src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00", }, - preSleep: map[string]string{ + dtPreSleep: map[string]string{ "src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00", }, - GRAB_JOB: map[string]string{ + dtGrabJob: map[string]string{ "src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00", }, - GRAB_JOB_UNIQ: map[string]string{ + dtGrabJobUniq: map[string]string{ "src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00", }, - WORK_DATA: map[string]string{ + dtWorkData: map[string]string{ "src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b", "data": "a\x00b", }, - WORK_WARNING: map[string]string{ + dtWorkWarning: map[string]string{ "src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b", "data": "a\x00b", }, - workStatus: map[string]string{ + dtWorkStatus: map[string]string{ "src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100", "data": "a\x0050\x00100", }, - workComplete: map[string]string{ + dtWorkComplete: map[string]string{ "src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b", "data": "a\x00b", }, - workFail: map[string]string{ + dtWorkFail: map[string]string{ "src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a", "handle": "a", }, - WORK_EXCEPTION: map[string]string{ + dtWorkException: map[string]string{ "src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b", "data": "a\x00b", }, @@ -59,7 +59,7 @@ var ( "src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a", "data": "a", }, - ALL_YOURS: map[string]string{ + dtAllYours: map[string]string{ "src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00", }, } diff --git a/worker/worker.go b/worker/worker.go index f6cdb13..82d3747 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -168,8 +168,8 @@ func (worker *Worker) Ready() (err error) { if len(worker.funcs) == 0 { return ErrNoneFuncs } - for _, v := range worker.agents { - if err = v.Connect(); err != nil { + for _, a := range worker.agents { + if err = a.Connect(); err != nil { return } }