From a003eac543c881722412169a8f6c028967dcccf2 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Tue, 6 Jan 2015 11:34:39 +0800 Subject: [PATCH] fixed the closing method --- worker/worker.go | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 0bb7b4c..c8c91db 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -22,7 +22,7 @@ type Worker struct { funcs jobFuncs in chan *inPack running bool - ready bool + ready bool Id string ErrorHandler ErrorHandler @@ -30,7 +30,6 @@ type Worker struct { limit chan bool } - // Return a worker. // // If limit is set to Unlimited(=0), the worker will grab all jobs @@ -95,11 +94,11 @@ func (worker *Worker) AddFunc(funcname string, // inner add func (worker *Worker) addFunc(funcname string, timeout uint32) { - outpack := prepFuncOutpack( funcname, timeout) + outpack := prepFuncOutpack(funcname, timeout) worker.broadcast(outpack) } -func prepFuncOutpack(funcname string, timeout uint32) (*outPack){ +func prepFuncOutpack(funcname string, timeout uint32) *outPack { outpack := getOutPack() if timeout == 0 { outpack.dataType = dtCanDo @@ -188,19 +187,14 @@ func (worker *Worker) Ready() (err error) { // Main loop, block here // Most of time, this should be evaluated in goroutine. func (worker *Worker) Work() { - if ! worker.ready { + if !worker.ready { // didn't run Ready beforehand, so we'll have to do it: err := worker.Ready() if err != nil { - panic( err ) + panic(err) } } - defer func() { - for _, a := range worker.agents { - a.Close() - } - }() worker.running = true for _, a := range worker.agents { a.Grab() @@ -223,8 +217,11 @@ func (worker *Worker) customeHandler(inpack *inPack) { // Close connection and exit main loop func (worker *Worker) Close() { worker.Lock() - worker.Unlock() + defer worker.Unlock() if worker.running == true { + for _, a := range worker.agents { + a.Close() + } worker.running = false close(worker.in) } @@ -299,11 +296,11 @@ func (worker *Worker) exec(inpack *inPack) (err error) { } return } -func (worker *Worker)reRegisterFuncsForAgent( a * agent ){ +func (worker *Worker) reRegisterFuncsForAgent(a *agent) { worker.Lock() defer worker.Unlock() for funcname, f := range worker.funcs { - outpack := prepFuncOutpack( funcname, f.timeout) + outpack := prepFuncOutpack(funcname, f.timeout) a.write(outpack) } @@ -333,19 +330,21 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { } // Error type passed when a worker connection disconnects -type WorkerDisconnectError struct{ - err error - agent * agent +type WorkerDisconnectError struct { + err error + agent *agent } -func (e *WorkerDisconnectError) Error() ( string){ - return e.err.Error(); + +func (e *WorkerDisconnectError) Error() string { + return e.err.Error() } // Responds to the error by asking the worker to reconnect -func (e *WorkerDisconnectError) Reconnect() ( err error ){ +func (e *WorkerDisconnectError) Reconnect() (err error) { return e.agent.reconnect() } -// Which server was this for? -func(e *WorkerDisconnectError) Server() ( net string, addr string ){ + +// Which server was this for? +func (e *WorkerDisconnectError) Server() (net string, addr string) { return e.agent.net, e.agent.addr }