fixed the worker's testing cases

This commit is contained in:
Xing Xing 2013-12-26 15:55:16 +08:00
parent bf25cc1728
commit 9f96247384
6 changed files with 42 additions and 31 deletions

View File

@ -3,6 +3,7 @@ package worker
import ( import (
"io" "io"
"net" "net"
"strings"
"sync" "sync"
) )
@ -44,10 +45,10 @@ func (a *agent) work() {
var data, leftdata []byte var data, leftdata []byte
for { for {
if data, err = a.read(bufferSize); err != nil { if data, err = a.read(bufferSize); err != nil {
a.worker.err(err)
if err == ErrLostConn { if err == ErrLostConn {
break break
} }
a.worker.err(err)
// If it is unexpected error and the connection wasn't // If it is unexpected error and the connection wasn't
// closed by Gearmand, the agent should close the conection // closed by Gearmand, the agent should close the conection
// and reconnect to job server. // and reconnect to job server.
@ -103,6 +104,16 @@ func (a *agent) PreSleep() {
a.write(outpack) a.write(outpack)
} }
func isClosed(err error) bool {
switch {
case err == io.EOF:
fallthrough
case strings.Contains(err.Error(), "use of closed network connection"):
return true
}
return false
}
// read length bytes from the socket // read length bytes from the socket
func (a *agent) read(length int) (data []byte, err error) { func (a *agent) read(length int) (data []byte, err error) {
n := 0 n := 0
@ -110,7 +121,7 @@ func (a *agent) read(length int) (data []byte, err error) {
// read until data can be unpacked // read until data can be unpacked
for i := length; i > 0 || len(data) < minPacketLength; i -= n { for i := length; i > 0 || len(data) < minPacketLength; i -= n {
if n, err = a.conn.Read(buf); err != nil { if n, err = a.conn.Read(buf); err != nil {
if err == io.EOF { if isClosed(err) {
err = ErrLostConn err = ErrLostConn
} }
return return

View File

@ -10,10 +10,10 @@ const (
minPacketLength = 12 minPacketLength = 12
// \x00REQ // \x00REQ
req = 5391697 req = 5391697
reqStr = "\x00REQ" reqStr = "\x00REQ"
// \x00RES // \x00RES
res = 5391699 res = 5391699
resStr = "\x00RES" resStr = "\x00RES"
// package data type // package data type
@ -21,7 +21,7 @@ const (
dtCantDo = 2 dtCantDo = 2
dtResetAbilities = 3 dtResetAbilities = 3
dtPreSleep = 4 dtPreSleep = 4
dtNoop = 6 dtNoop = 6
dtJobCreated = 8 dtJobCreated = 8
dtGrabJob = 9 dtGrabJob = 9
dtNoJob = 10 dtNoJob = 10
@ -32,16 +32,16 @@ const (
dtGetStatus = 15 dtGetStatus = 15
dtEchoReq = 16 dtEchoReq = 16
dtEchoRes = 17 dtEchoRes = 17
dtError = 19 dtError = 19
dtStatusRes = 20 dtStatusRes = 20
dtSetClientId = 22 dtSetClientId = 22
dtCanDoTimeout = 23 dtCanDoTimeout = 23
dtAllYours = 24 dtAllYours = 24
dtWorkException = 25 dtWorkException = 25
dtWorkData = 28 dtWorkData = 28
dtWorkWarning = 29 dtWorkWarning = 29
dtGrabJobUniq = 30 dtGrabJobUniq = 30
dtJobAssignUniq = 31 dtJobAssignUniq = 31
) )
func getBuffer(l int) (buf []byte) { func getBuffer(l int) (buf []byte) {

View File

@ -11,7 +11,7 @@ func ExampleWorker() {
w := worker.New(worker.Unlimited) w := worker.New(worker.Unlimited)
defer w.Close() defer w.Close()
// Add a gearman job server // Add a gearman job server
if err := w.AddServer(worker.NETWORK, "127.0.0.1:4730"); err != nil { if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil {
fmt.Println(err) fmt.Println(err)
return return
} }

View File

@ -7,19 +7,19 @@ import (
var ( var (
inpackcases = map[uint32]map[string]string{ inpackcases = map[uint32]map[string]string{
noop: map[string]string{ dtNoop: map[string]string{
"src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00", "src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00",
}, },
noJob: map[string]string{ dtNoJob: map[string]string{
"src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00", "src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00",
}, },
jobAssign: map[string]string{ dtJobAssign: map[string]string{
"src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz", "src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz",
"handle": "a", "handle": "a",
"fn": "b", "fn": "b",
"data": "xyz", "data": "xyz",
}, },
jobAssign_UNIQ: map[string]string{ dtJobAssignUniq: map[string]string{
"src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz", "src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz",
"handle": "a", "handle": "a",
"fn": "b", "fn": "b",

View File

@ -7,51 +7,51 @@ import (
var ( var (
outpackcases = map[uint32]map[string]string{ outpackcases = map[uint32]map[string]string{
canDo: map[string]string{ dtCanDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a",
"data": "a", "data": "a",
}, },
canDo_TIMEOUT: map[string]string{ dtCanDoTimeout: map[string]string{
"src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01", "src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01",
"data": "a\x00\x00\x00\x00\x01", "data": "a\x00\x00\x00\x00\x01",
}, },
cantDo: map[string]string{ dtCantDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a",
"data": "a", "data": "a",
}, },
resetAbilities: map[string]string{ dtResetAbilities: map[string]string{
"src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00", "src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00",
}, },
preSleep: map[string]string{ dtPreSleep: map[string]string{
"src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00", "src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00",
}, },
GRAB_JOB: map[string]string{ dtGrabJob: map[string]string{
"src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00", "src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00",
}, },
GRAB_JOB_UNIQ: map[string]string{ dtGrabJobUniq: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00", "src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00",
}, },
WORK_DATA: map[string]string{ dtWorkData: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b", "src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b",
"data": "a\x00b", "data": "a\x00b",
}, },
WORK_WARNING: map[string]string{ dtWorkWarning: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b", "src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b",
"data": "a\x00b", "data": "a\x00b",
}, },
workStatus: map[string]string{ dtWorkStatus: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100", "src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100",
"data": "a\x0050\x00100", "data": "a\x0050\x00100",
}, },
workComplete: map[string]string{ dtWorkComplete: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b", "src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b",
"data": "a\x00b", "data": "a\x00b",
}, },
workFail: map[string]string{ dtWorkFail: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a",
"handle": "a", "handle": "a",
}, },
WORK_EXCEPTION: map[string]string{ dtWorkException: map[string]string{
"src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b", "src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b",
"data": "a\x00b", "data": "a\x00b",
}, },
@ -59,7 +59,7 @@ var (
"src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a",
"data": "a", "data": "a",
}, },
ALL_YOURS: map[string]string{ dtAllYours: map[string]string{
"src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00", "src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00",
}, },
} }

View File

@ -168,8 +168,8 @@ func (worker *Worker) Ready() (err error) {
if len(worker.funcs) == 0 { if len(worker.funcs) == 0 {
return ErrNoneFuncs return ErrNoneFuncs
} }
for _, v := range worker.agents { for _, a := range worker.agents {
if err = v.Connect(); err != nil { if err = a.Connect(); err != nil {
return return
} }
} }