diff --git a/worker/worker.go b/worker/worker.go index 0456067..ad9a7fb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -162,6 +162,26 @@ func (worker *Worker) removeFunc(funcname string) { worker.broadcast(job) } +func (worker *Worker) dealJob(job *Job) { + defer func() { + job.Close() + if worker.limit != nil { + worker.limit <- true + } + }() + switch job.DataType { + case common.ERROR: + _, err := common.GetError(job.Data) + worker.err(err) + case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: + if err := worker.exec(job); err != nil { + worker.err(err) + } + default: + worker.handleJob(job) + } +} + // Main loop func (worker *Worker) Work() { defer func() { @@ -181,20 +201,10 @@ func (worker *Worker) Work() { for ok { var job *Job if job, ok = <-worker.in; ok { - go func() { - defer job.Close() - switch job.DataType { - case common.ERROR: - _, err := common.GetError(job.Data) - worker.err(err) - case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: - if err := worker.exec(job); err != nil { - worker.err(err) - } - default: - worker.handleJob(job) - } - }() + if worker.limit != nil { + <-worker.limit + } + go worker.dealJob(job) } } } @@ -248,12 +258,6 @@ func (worker *Worker) exec(job *Job) (err error) { } } } () - if worker.limit != nil { - <-worker.limit - defer func() { - worker.limit <- true - }() - } var limit int if job.DataType == common.JOB_ASSIGN { limit = 3