diff --git a/worker/agent.go b/worker/agent.go index 18c8f67..147e887 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -111,6 +111,9 @@ func (a *agent) work() { } func (a *agent) disconnect_error(err error) { + a.Lock() + defer a.Unlock() + if a.conn != nil { err = &WorkerDisconnectError{ err: err, diff --git a/worker/worker.go b/worker/worker.go index 8ee04e6..7ca1389 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -4,9 +4,9 @@ package worker import ( "fmt" + "strconv" "sync" "time" - "strconv" ) const ( @@ -72,7 +72,7 @@ func (worker *Worker) AddServer(net, addr string) (err error) { // Broadcast an outpack to all Gearman server. func (worker *Worker) broadcast(outpack *outPack) { for _, v := range worker.agents { - v.write(outpack) + v.Write(outpack) } } @@ -186,7 +186,7 @@ func (worker *Worker) Ready() (err error) { return } -// Main loop, block here +// Work start main loop (blocking) // Most of time, this should be evaluated in goroutine. func (worker *Worker) Work() { if !worker.ready { @@ -197,7 +197,10 @@ func (worker *Worker) Work() { } } + worker.Lock() worker.running = true + worker.Unlock() + for _, a := range worker.agents { a.Grab() } diff --git a/worker/worker_racy_test.go b/worker/worker_racy_test.go new file mode 100644 index 0000000..c60e387 --- /dev/null +++ b/worker/worker_racy_test.go @@ -0,0 +1,59 @@ +package worker + +import ( + "fmt" + "sync" + "testing" +) + +func TestWorkerRace(t *testing.T) { + // from example worker + // An example of worker + w := New(Unlimited) + defer w.Close() + // Add a gearman job server + if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil { + t.Fatal(err) + } + // A function for handling jobs + foobar := func(job Job) ([]byte, error) { + // Do nothing here + return nil, nil + } + // Add the function to worker + if err := w.AddFunc("foobar", foobar, 0); err != nil { + fmt.Println(err) + return + } + var wg sync.WaitGroup + // A custome handler, for handling other results, eg. ECHO, dtError. + w.JobHandler = func(job Job) error { + if job.Err() == nil { + fmt.Println(string(job.Data())) + } else { + fmt.Println(job.Err()) + } + wg.Done() + return nil + } + // An error handler for handling worker's internal errors. + w.ErrorHandler = func(e error) { + fmt.Println(e) + // Ignore the error or shutdown the worker + } + // Tell Gearman job server: I'm ready! + if err := w.Ready(); err != nil { + fmt.Println(err) + return + } + // Running main loop + go w.Work() + wg.Add(1) + // calling Echo + w.Echo([]byte("Hello")) + // Waiting results + wg.Wait() + + // tear down + w.Close() +}