fixed job limit bug

This commit is contained in:
suchj 2012-12-29 23:54:20 +08:00
parent b72825d48a
commit 37d7888544

View File

@ -162,6 +162,26 @@ func (worker *Worker) removeFunc(funcname string) {
worker.broadcast(job) worker.broadcast(job)
} }
func (worker *Worker) dealJob(job *Job) {
defer func() {
job.Close()
if worker.limit != nil {
worker.limit <- true
}
}()
switch job.DataType {
case common.ERROR:
_, err := common.GetError(job.Data)
worker.err(err)
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
if err := worker.exec(job); err != nil {
worker.err(err)
}
default:
worker.handleJob(job)
}
}
// Main loop // Main loop
func (worker *Worker) Work() { func (worker *Worker) Work() {
defer func() { defer func() {
@ -181,20 +201,10 @@ func (worker *Worker) Work() {
for ok { for ok {
var job *Job var job *Job
if job, ok = <-worker.in; ok { if job, ok = <-worker.in; ok {
go func() { if worker.limit != nil {
defer job.Close() <-worker.limit
switch job.DataType {
case common.ERROR:
_, err := common.GetError(job.Data)
worker.err(err)
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
if err := worker.exec(job); err != nil {
worker.err(err)
} }
default: go worker.dealJob(job)
worker.handleJob(job)
}
}()
} }
} }
} }
@ -248,12 +258,6 @@ func (worker *Worker) exec(job *Job) (err error) {
} }
} }
} () } ()
if worker.limit != nil {
<-worker.limit
defer func() {
worker.limit <- true
}()
}
var limit int var limit int
if job.DataType == common.JOB_ASSIGN { if job.DataType == common.JOB_ASSIGN {
limit = 3 limit = 3