diff --git a/.travis.yml b/.travis.yml index d897dd8..b20dfe6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,7 @@ language: go +go: - 1.2 + before_install: - - sudo apt-get install -qq gearman-job-server + - sudo apt-get remove -y gearman-job-server + - sudo apt-get install -y gearman-job-server diff --git a/example/worker/worker b/example/worker/worker deleted file mode 100755 index c2c9df5..0000000 Binary files a/example/worker/worker and /dev/null differ diff --git a/worker/agent.go b/worker/agent.go index a260084..22f8781 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -3,10 +3,12 @@ package worker import ( "io" "net" + "sync" ) // The agent of job server. type agent struct { + sync.Mutex conn net.Conn worker *Worker in chan []byte @@ -25,6 +27,8 @@ func newAgent(net, addr string, worker *Worker) (a *agent, err error) { } func (a *agent) Connect() (err error) { + a.Lock() + defer a.Unlock() a.conn, err = net.Dial(a.net, a.addr) if err != nil { return @@ -40,11 +44,18 @@ func (a *agent) work() { var data, leftdata []byte for { if data, err = a.read(BUFFER_SIZE); err != nil { - if err == ErrConnClosed { + a.worker.err(err) + if err == ErrLostConn { + break + } + // If it is unexpected error and the connection wasn't + // closed by Gearmand, the agent should colse the conection + // and reconnect to job server. + a.conn, err = net.Dial(a.net, a.addr) + if err != nil { + a.worker.err(err) break } - a.worker.err(err) - continue } if len(leftdata) > 0 { // some data left for processing data = append(leftdata, data...) @@ -67,16 +78,25 @@ func (a *agent) work() { } func (a *agent) Close() { - a.conn.Close() + a.Lock() + defer a.Unlock() + if a.conn != nil { + a.conn.Close() + a.conn = nil + } } func (a *agent) Grab() { + a.Lock() + defer a.Unlock() outpack := getOutPack() outpack.dataType = GRAB_JOB_UNIQ a.write(outpack) } func (a *agent) PreSleep() { + a.Lock() + defer a.Unlock() outpack := getOutPack() outpack.dataType = PRE_SLEEP a.write(outpack) @@ -89,12 +109,8 @@ func (a *agent) read(length int) (data []byte, err error) { // read until data can be unpacked for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { if n, err = a.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - if data == nil { - err = ErrConnection - } else { - err = ErrConnClosed - } + if err == io.EOF { + err = ErrLostConn } return } diff --git a/worker/error.go b/worker/error.go index 3e2ae7f..ceade3e 100644 --- a/worker/error.go +++ b/worker/error.go @@ -7,20 +7,11 @@ import ( ) var ( - ErrJobTimeOut = errors.New("Do a job time out") - ErrInvalidData = errors.New("Invalid data") - ErrWorkWarning = errors.New("Work warning") - ErrWorkFail = errors.New("Work fail") - ErrWorkException = errors.New("Work exeption") - ErrDataType = errors.New("Invalid data type") - ErrOutOfCap = errors.New("Out of the capability") - ErrNotConn = errors.New("Did not connect to job server") - ErrFuncNotFound = errors.New("The function was not found") - ErrConnection = errors.New("Connection error") - ErrNoActiveAgent = errors.New("No active agent") - ErrTimeOut = errors.New("Executing time out") - ErrUnknown = errors.New("Unknown error") - ErrConnClosed = errors.New("Connection closed") + ErrNoneAgents = errors.New("None active agents") + ErrNoneFuncs = errors.New("None functions") + ErrTimeOut = errors.New("Executing time out") + ErrUnknown = errors.New("Unknown error") + ErrLostConn = errors.New("Lost connection with Gearmand") ) // Extract the error message diff --git a/worker/example_test.go b/worker/example_test.go new file mode 100644 index 0000000..f317783 --- /dev/null +++ b/worker/example_test.go @@ -0,0 +1,57 @@ +package worker_test + +import ( + "fmt" + "github.com/mikespook/gearman-go/worker" + "sync" +) + +func ExampleWorker() { + // An example of worker + w := worker.New(worker.Unlimited) + defer w.Close() + // Add a gearman job server + if err := w.AddServer("tcp4", "127.0.0.1:4730"); err != nil { + fmt.Println(err) + return + } + // A function for handling jobs + foobar := func(job worker.Job) ([]byte, error) { + // Do nothing here + return nil, nil + } + // Add the function to worker + if err := w.AddFunc("foobar", foobar, 0); err != nil { + fmt.Println(err) + return + } + var wg sync.WaitGroup + // A custome handler, for handling other results, eg. ECHO, ERROR. + w.JobHandler = func(job worker.Job) error { + if job.Err() == nil { + fmt.Println(string(job.Data())) + } else { + fmt.Println(job.Err()) + } + wg.Done() + return nil + } + // An error handler for handling worker's internal errors. + w.ErrorHandler = func(e error) { + fmt.Println(e) + // Ignore the error or shutdown the worker + } + // Tell Gearman job server: I'm ready! + if err := w.Ready(); err != nil { + fmt.Println(err) + return + } + // Running main loop + go w.Work() + wg.Add(1) + // calling Echo + w.Echo([]byte("Hello")) + // Waiting results + wg.Wait() + // Output: Hello +} diff --git a/worker/inpack.go b/worker/inpack.go index b00996b..c6edc9c 100644 --- a/worker/inpack.go +++ b/worker/inpack.go @@ -24,6 +24,13 @@ func (inpack *inPack) Data() []byte { return inpack.data } +func (inpack *inPack) Err() error { + if inpack.dataType == ERROR { + return GetError(inpack.data) + } + return nil +} + // Send some datas to client. // Using this in a job's executing. func (inpack *inPack) SendData(data []byte) { diff --git a/worker/job.go b/worker/job.go index 4f9950a..d2d7d6f 100644 --- a/worker/job.go +++ b/worker/job.go @@ -1,6 +1,7 @@ package worker type Job interface { + Err() error Data() []byte SendWarning(data []byte) SendData(data []byte) diff --git a/worker/worker.go b/worker/worker.go index bbc7c5c..478f00f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -10,10 +10,10 @@ import ( ) const ( - Unlimited = 0 - OneByOne = 1 + Unlimited = iota + OneByOne - Immediately = 0 + Immediately = iota ) // Worker is the only structure needed by worker side developing. @@ -139,8 +139,6 @@ func (worker *Worker) handleInPack(inpack *inPack) { inpack.a.PreSleep() case NOOP: inpack.a.Grab() - case ERROR: - worker.err(GetError(inpack.data)) case JOB_ASSIGN, JOB_ASSIGN_UNIQ: go func() { if err := worker.exec(inpack); err != nil { @@ -151,6 +149,9 @@ func (worker *Worker) handleInPack(inpack *inPack) { worker.limit <- true } inpack.a.Grab() + case ERROR: + worker.err(inpack.Err()) + fallthrough case ECHO_RES: fallthrough default: @@ -161,6 +162,12 @@ func (worker *Worker) handleInPack(inpack *inPack) { // Connect to Gearman server and tell every server // what can this worker do. func (worker *Worker) Ready() (err error) { + if len(worker.agents) == 0 { + return ErrNoneAgents + } + if len(worker.funcs) == 0 { + return ErrNoneFuncs + } for _, v := range worker.agents { if err = v.Connect(); err != nil { return @@ -201,8 +208,12 @@ func (worker *Worker) customeHandler(inpack *inPack) { // Close connection and exit main loop func (worker *Worker) Close() { - worker.running = false - close(worker.in) + worker.Lock() + worker.Unlock() + if worker.running == true { + worker.running = false + close(worker.in) + } } // Echo diff --git a/worker/worker_test.go b/worker/worker_test.go index bb0666c..b76e6b9 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -11,6 +11,13 @@ func init() { worker = New(Unlimited) } +func TestWorkerErrNoneAgents(t *testing.T) { + err := worker.Ready() + if err != ErrNoneAgents { + t.Error("ErrNoneAgents expected.") + } +} + func TestWorkerAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730.") if err := worker.AddServer("tcp4", "127.0.0.1:4730"); err != nil { @@ -23,6 +30,13 @@ func TestWorkerAddServer(t *testing.T) { } } +func TestWorkerErrNoneFuncs(t *testing.T) { + err := worker.Ready() + if err != ErrNoneFuncs { + t.Error("ErrNoneFuncs expected.") + } +} + func foobar(job Job) ([]byte, error) { return nil, nil }