Merge pull request #11 from miraclesu/close_friendly

Close friendly
This commit is contained in:
Xing 2013-04-02 23:35:00 -07:00
commit 1591d0521b
2 changed files with 9 additions and 5 deletions

View File

@ -41,7 +41,7 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) {
func (a *agent) outLoop() {
ok := true
var job *Job
for ok {
for a.worker.running && ok {
if job, ok = <-a.out; ok {
if err := a.write(job.Encode()); err != nil {
a.worker.err(err)
@ -92,10 +92,12 @@ func (a *agent) inLoop() {
a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
job.agent = a
if a.worker.running {
a.worker.in <- job
}
}
}
}
func (a *agent) Close() {
a.conn.Close()

View File

@ -165,7 +165,7 @@ func (worker *Worker) removeFunc(funcname string) {
func (worker *Worker) dealJob(job *Job) {
defer func() {
job.Close()
if worker.limit != nil {
if worker.running && worker.limit != nil {
worker.limit <- true
}
}()
@ -185,7 +185,6 @@ func (worker *Worker) dealJob(job *Job) {
// Main loop
func (worker *Worker) Work() {
defer func() {
worker.running = false
for _, v := range worker.agents {
v.Close()
}
@ -220,6 +219,7 @@ func (worker *Worker) handleJob(job *Job) {
// Close.
func (worker *Worker) Close() {
worker.running = false
close(worker.in)
if worker.limit != nil {
close(worker.limit)
@ -299,7 +299,9 @@ func (worker *Worker) exec(job *Job) (err error) {
job.magicCode = common.REQ
job.DataType = datatype
job.Data = r.data
if worker.running {
job.agent.WriteJob(job)
}
return
}