diff --git a/common/error.go b/common/error.go index 23d2879..ede37f7 100644 --- a/common/error.go +++ b/common/error.go @@ -22,7 +22,8 @@ var ( ErrOutOfCap = errors.New("Out of the capability.") ErrNotConn = errors.New("Did not connect to job server.") ErrFuncNotFound = errors.New("The function was not found.") - ErrConnection = errors.New("Connection error.") + ErrEmptyReading = errors.New("Empty reading.") + ErrNoActiveAgent = errors.New("No active agent.") ) func DisablePanic() {recover()} diff --git a/example/worker.go b/example/worker.go index 9a19338..d5f162f 100644 --- a/example/worker.go +++ b/example/worker.go @@ -19,6 +19,7 @@ func main() { log.Println("Starting ...") defer log.Println("Shutdown complete!") w := worker.New(worker.Unlimited) + defer w.Close() w.ErrHandler = func(e error) { log.Println(e) } diff --git a/worker/agent.go b/worker/agent.go index 4e92209..a8ff6d2 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -48,7 +48,7 @@ func (a *agent) outLoop() { // inputing loop func (a *agent) inLoop() { defer func() { - a.conn.Close() + recover() close(a.in) close(a.out) a.worker.removeAgent(a) @@ -86,6 +86,10 @@ func (a *agent) inLoop() { } } +func (a *agent) Close() { + a.conn.Close() +} + func (a *agent) Work() { go a.outLoop() go a.inLoop() diff --git a/worker/worker.go b/worker/worker.go index 667e16e..35af432 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -159,6 +159,9 @@ func (worker *Worker) removeFunc(funcname string) { func (worker *Worker) Work() { defer func() { worker.running = false + for _, v := range worker.agents { + v.Close() + } }() for funcname, f := range worker.funcs { worker.addFunc(funcname, f.timeout) @@ -168,8 +171,9 @@ func (worker *Worker) Work() { go v.Work() } ok := true + var job *Job for ok { - if job, ok := <-worker.in; ok { + if job, ok = <-worker.in; ok { switch job.DataType { case common.ERROR: go func() { @@ -280,6 +284,6 @@ func (worker *Worker) removeAgent(a *agent) { } } if len(worker.agents) == 0 { - worker.Close() + worker.err(common.ErrNoActiveAgent) } }