From 842edf79ba3e3c4a487088ecaa64f9356e15509d Mon Sep 17 00:00:00 2001 From: mikespook Date: Mon, 28 May 2012 10:34:16 +0800 Subject: [PATCH] fixed a infinite loop --- common/error.go | 1 + example/worker.go | 1 + worker/agent.go | 6 +++++- worker/worker.go | 8 ++++++-- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/common/error.go b/common/error.go index 463af2d..ede37f7 100644 --- a/common/error.go +++ b/common/error.go @@ -23,6 +23,7 @@ var ( ErrNotConn = errors.New("Did not connect to job server.") ErrFuncNotFound = errors.New("The function was not found.") ErrEmptyReading = errors.New("Empty reading.") + ErrNoActiveAgent = errors.New("No active agent.") ) func DisablePanic() {recover()} diff --git a/example/worker.go b/example/worker.go index 9a19338..d5f162f 100644 --- a/example/worker.go +++ b/example/worker.go @@ -19,6 +19,7 @@ func main() { log.Println("Starting ...") defer log.Println("Shutdown complete!") w := worker.New(worker.Unlimited) + defer w.Close() w.ErrHandler = func(e error) { log.Println(e) } diff --git a/worker/agent.go b/worker/agent.go index ceef56a..41c2faf 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -48,7 +48,7 @@ func (a *agent) outLoop() { // inputing loop func (a *agent) inLoop() { defer func() { - a.conn.Close() + recover() close(a.in) close(a.out) a.worker.removeAgent(a) @@ -85,6 +85,10 @@ func (a *agent) inLoop() { } } +func (a *agent) Close() { + a.conn.Close() +} + func (a *agent) Work() { go a.outLoop() go a.inLoop() diff --git a/worker/worker.go b/worker/worker.go index 667e16e..35af432 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -159,6 +159,9 @@ func (worker *Worker) removeFunc(funcname string) { func (worker *Worker) Work() { defer func() { worker.running = false + for _, v := range worker.agents { + v.Close() + } }() for funcname, f := range worker.funcs { worker.addFunc(funcname, f.timeout) @@ -168,8 +171,9 @@ func (worker *Worker) Work() { go v.Work() } ok := true + var job *Job for ok { - if job, ok := <-worker.in; ok { + if job, ok = <-worker.in; ok { switch job.DataType { case common.ERROR: go func() { @@ -280,6 +284,6 @@ func (worker *Worker) removeAgent(a *agent) { } } if len(worker.agents) == 0 { - worker.Close() + worker.err(common.ErrNoActiveAgent) } }