diff --git a/worker/agent.go b/worker/agent.go index 147e887..3996a61 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -98,6 +98,11 @@ func (a *agent) work() { } else { leftdata = nil inpack.a = a + select { + case <-a.worker.closed: + return + default: + } a.worker.in <- inpack if len(data) == l { break diff --git a/worker/worker.go b/worker/worker.go index 7ca1389..0ea022f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -18,16 +18,19 @@ const ( // It can connect to multi-server and grab jobs. type Worker struct { sync.Mutex - agents []*agent - funcs jobFuncs - in chan *inPack - running bool - ready bool + agents []*agent + funcs jobFuncs + in chan *inPack + running bool + ready bool + jobLeftNum int64 Id string ErrorHandler ErrorHandler JobHandler JobHandler limit chan bool + closed chan struct{} + leftJobs chan struct{} } // New returns a worker. @@ -147,14 +150,20 @@ func (worker *Worker) handleInPack(inpack *inPack) { inpack.a.Grab() case dtJobAssign, dtJobAssignUniq: go func() { - if err := worker.exec(inpack); err != nil { - worker.err(err) + go func() { + worker.incrExecJobNum() + defer func() { + worker.decrExecJobNum() + }() + if err := worker.exec(inpack); err != nil { + worker.err(err) + } + }() + if worker.limit != nil { + worker.limit <- true } + inpack.a.Grab() }() - if worker.limit != nil { - worker.limit <- true - } - inpack.a.Grab() case dtError: worker.err(inpack.Err()) fallthrough @@ -204,10 +213,22 @@ func (worker *Worker) Work() { for _, a := range worker.agents { a.Grab() } + // 执行任务(阻塞) var inpack *inPack for inpack = range worker.in { worker.handleInPack(inpack) } + // 关闭Worker进程后 等待任务完成后退出 + worker.Lock() + leftJobNum := int(worker.jobLeftNum) + worker.Unlock() + if worker.leftJobs != nil { + for i := 0; i < leftJobNum; i++ { + <-worker.leftJobs + } + } + worker.Reset() + worker.close() } // custome handling warper @@ -223,12 +244,21 @@ func (worker *Worker) customeHandler(inpack *inPack) { func (worker *Worker) Close() { worker.Lock() defer worker.Unlock() - if worker.running == true { - for _, a := range worker.agents { - a.Close() - } + if worker.running == true && worker.closed == nil { + worker.closed = make(chan struct{}, 1) + worker.closed <- struct{}{} worker.running = false close(worker.in) + // 创建关闭后执行中的任务列表 + if worker.jobLeftNum != 0 { + worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in))) + } + } +} + +func (worker *Worker) close() { + for _, a := range worker.agents { + a.Close() } } @@ -258,6 +288,23 @@ func (worker *Worker) SetId(id string) { worker.broadcast(outpack) } +func (worker *Worker) incrExecJobNum() int64 { + worker.Lock() + defer worker.Unlock() + worker.jobLeftNum++ + return worker.jobLeftNum +} + +func (worker *Worker) decrExecJobNum() int64 { + worker.Lock() + defer worker.Unlock() + worker.jobLeftNum-- + if worker.jobLeftNum < 0 { + worker.jobLeftNum = 0 + } + return worker.jobLeftNum +} + // inner job executing func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { @@ -283,22 +330,25 @@ func (worker *Worker) exec(inpack *inPack) (err error) { } else { r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) } - if worker.running { - outpack := getOutPack() - if r.err == nil { - outpack.dataType = dtWorkComplete + //if worker.running { + outpack := getOutPack() + if r.err == nil { + outpack.dataType = dtWorkComplete + } else { + if len(r.data) == 0 { + outpack.dataType = dtWorkFail } else { - if len(r.data) == 0 { - outpack.dataType = dtWorkFail - } else { - outpack.dataType = dtWorkException - } - err = r.err + outpack.dataType = dtWorkException } - outpack.handle = inpack.handle - outpack.data = r.data - inpack.a.Write(outpack) + err = r.err } + outpack.handle = inpack.handle + outpack.data = r.data + err = inpack.a.Write(outpack) + if worker.leftJobs != nil { + worker.leftJobs <- struct{}{} + } + //} return } func (worker *Worker) reRegisterFuncsForAgent(a *agent) {