From ddba5ab2f2d7357f0c7e0cf95be63f7b3c100bf9 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Wed, 25 Dec 2013 15:13:21 +0800 Subject: [PATCH] blocking on GRAB_JOB instead of executing blocked --- worker/worker.go | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index ff5278d..e77ff9e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -12,6 +12,9 @@ import ( ) const ( + Unlimited = 0 + OneByOne = 1 + Immediately = 0 ) @@ -19,7 +22,7 @@ const ( Worker side api for gearman usage: -w = worker.New() +w = worker.New(worker.Unlimited) w.AddFunction("foobar", foobar) w.AddServer("127.0.0.1:4730") w.Work() // Enter the worker's main loop @@ -34,6 +37,7 @@ func foobar(job *Job) (data []byte, err os.Error) { } */ type Worker struct { + sync.Mutex agents []*agent funcs JobFuncs in chan *inPack @@ -43,16 +47,19 @@ type Worker struct { // assign a ErrFunc to handle errors ErrorHandler ErrorHandler JobHandler JobHandler - mutex sync.Mutex + limit chan bool } // Get a new worker -func New() (worker *Worker) { +func New(limit int) (worker *Worker) { worker = &Worker{ agents: make([]*agent, 0), funcs: make(JobFuncs), in: make(chan *inPack, QUEUE_SIZE), } + if limit != Unlimited { + worker.limit = make(chan bool, limit - 1) + } return } @@ -89,8 +96,8 @@ func (worker *Worker) broadcast(outpack *outPack) { // The API will tell every connected job server that 'I can do this' func (worker *Worker) AddFunc(funcname string, f JobFunc, timeout uint32) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() + worker.Lock() + defer worker.Unlock() if _, ok := worker.funcs[funcname]; ok { return fmt.Errorf("The function already exists: %s", funcname) } @@ -120,8 +127,8 @@ func (worker *Worker) addFunc(funcname string, timeout uint32) { // Remove a function. func (worker *Worker) RemoveFunc(funcname string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() + worker.Lock() + defer worker.Unlock() if _, ok := worker.funcs[funcname]; !ok { return fmt.Errorf("The function does not exist: %s", funcname) } @@ -149,9 +156,17 @@ func (worker *Worker) handleInPack(inpack *inPack) { case ERROR: worker.err(GetError(inpack.data)) case JOB_ASSIGN, JOB_ASSIGN_UNIQ: - if err := worker.exec(inpack); err != nil { - worker.err(err) + go func() { + if err := worker.exec(inpack); err != nil { + worker.err(err) + } + }() + if (worker.limit != nil) { + worker.limit <- true } + inpack.a.Grab() + case ECHO_RES: + fallthrough default: worker.customeHandler(inpack) } @@ -177,7 +192,7 @@ func (worker *Worker) Work() { } var inpack *inPack for inpack = range worker.in { - go worker.handleInPack(inpack) + worker.handleInPack(inpack) } } @@ -225,6 +240,9 @@ func (worker *Worker) SetId(id string) { // Execute the job. And send back the result. func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { + if (worker.limit != nil) { + <-worker.limit + } if r := recover(); r != nil { if e, ok := r.(error); ok { err = e @@ -259,7 +277,6 @@ func (worker *Worker) exec(inpack *inPack) (err error) { outpack.handle = inpack.handle outpack.data = r.data inpack.a.write(outpack) - inpack.a.Grab() } return }