forked from yuxh/gearman-go
Close friendly
This commit is contained in:
parent
7a8a5d4a6a
commit
9549d98a7c
@ -41,7 +41,7 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) {
|
|||||||
func (a *agent) outLoop() {
|
func (a *agent) outLoop() {
|
||||||
ok := true
|
ok := true
|
||||||
var job *Job
|
var job *Job
|
||||||
for ok {
|
for a.worker.running && ok {
|
||||||
if job, ok = <-a.out; ok {
|
if job, ok = <-a.out; ok {
|
||||||
if err := a.write(job.Encode()); err != nil {
|
if err := a.write(job.Encode()); err != nil {
|
||||||
a.worker.err(err)
|
a.worker.err(err)
|
||||||
@ -92,7 +92,9 @@ func (a *agent) inLoop() {
|
|||||||
a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
|
a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
|
||||||
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
|
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
|
||||||
job.agent = a
|
job.agent = a
|
||||||
a.worker.in <- job
|
if a.worker.running {
|
||||||
|
a.worker.in <- job
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -165,7 +165,7 @@ func (worker *Worker) removeFunc(funcname string) {
|
|||||||
func (worker *Worker) dealJob(job *Job) {
|
func (worker *Worker) dealJob(job *Job) {
|
||||||
defer func() {
|
defer func() {
|
||||||
job.Close()
|
job.Close()
|
||||||
if worker.limit != nil {
|
if worker.running && worker.limit != nil {
|
||||||
worker.limit <- true
|
worker.limit <- true
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -185,7 +185,6 @@ func (worker *Worker) dealJob(job *Job) {
|
|||||||
// Main loop
|
// Main loop
|
||||||
func (worker *Worker) Work() {
|
func (worker *Worker) Work() {
|
||||||
defer func() {
|
defer func() {
|
||||||
worker.running = false
|
|
||||||
for _, v := range worker.agents {
|
for _, v := range worker.agents {
|
||||||
v.Close()
|
v.Close()
|
||||||
}
|
}
|
||||||
@ -220,6 +219,7 @@ func (worker *Worker) handleJob(job *Job) {
|
|||||||
|
|
||||||
// Close.
|
// Close.
|
||||||
func (worker *Worker) Close() {
|
func (worker *Worker) Close() {
|
||||||
|
worker.running = false
|
||||||
close(worker.in)
|
close(worker.in)
|
||||||
if worker.limit != nil {
|
if worker.limit != nil {
|
||||||
close(worker.limit)
|
close(worker.limit)
|
||||||
@ -299,7 +299,9 @@ func (worker *Worker) exec(job *Job) (err error) {
|
|||||||
job.magicCode = common.REQ
|
job.magicCode = common.REQ
|
||||||
job.DataType = datatype
|
job.DataType = datatype
|
||||||
job.Data = r.data
|
job.Data = r.data
|
||||||
job.agent.WriteJob(job)
|
if worker.running {
|
||||||
|
job.agent.WriteJob(job)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user