add synchronization to prevent race condition on worker shutdown (#89)

* add synchronization to prevent race condition on worker shutdown

* fix incorrect synchronization and add race unit test

* fix race while broadcasting outpack
This commit is contained in:
galih rivanto 2019-05-28 17:41:20 +07:00 committed by Sadlil Rhythom
parent b902646ce8
commit 81d00aa9ce
3 changed files with 68 additions and 3 deletions

View File

@ -111,6 +111,9 @@ func (a *agent) work() {
} }
func (a *agent) disconnect_error(err error) { func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil { if a.conn != nil {
err = &WorkerDisconnectError{ err = &WorkerDisconnectError{
err: err, err: err,

View File

@ -4,9 +4,9 @@ package worker
import ( import (
"fmt" "fmt"
"strconv"
"sync" "sync"
"time" "time"
"strconv"
) )
const ( const (
@ -72,7 +72,7 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
// Broadcast an outpack to all Gearman server. // Broadcast an outpack to all Gearman server.
func (worker *Worker) broadcast(outpack *outPack) { func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents { for _, v := range worker.agents {
v.write(outpack) v.Write(outpack)
} }
} }
@ -186,7 +186,7 @@ func (worker *Worker) Ready() (err error) {
return return
} }
// Main loop, block here // Work start main loop (blocking)
// Most of time, this should be evaluated in goroutine. // Most of time, this should be evaluated in goroutine.
func (worker *Worker) Work() { func (worker *Worker) Work() {
if !worker.ready { if !worker.ready {
@ -197,7 +197,10 @@ func (worker *Worker) Work() {
} }
} }
worker.Lock()
worker.running = true worker.running = true
worker.Unlock()
for _, a := range worker.agents { for _, a := range worker.agents {
a.Grab() a.Grab()
} }

View File

@ -0,0 +1,59 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}