From 79c4bc2e47aa74211227ba175901b9f6180122c9 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 5 Jun 2012 14:36:39 +0800 Subject: [PATCH] promoted the executing timeout --HG-- branch : dev --- worker/agent.go | 1 - worker/job.go | 15 ++++++++++++++- worker/worker.go | 29 +++++++++++++++++++++++++---- 3 files changed, 39 insertions(+), 6 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 66bf4c1..1e0fe45 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -72,7 +72,6 @@ func (a *agent) inLoop() { rel, err := a.read() if err != nil { if err == common.ErrConnection { - // TODO: reconnection for i:= 0; i < 3 && a.worker.running; i++ { if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { a.worker.err(common.Errorf("Reconnection: %d faild", i)) diff --git a/worker/job.go b/worker/job.go index 687ffb1..71d9ce5 100644 --- a/worker/job.go +++ b/worker/job.go @@ -16,13 +16,16 @@ type Job struct { Handle, UniqueId string agent *agent magicCode, DataType uint32 + c chan bool } // Create a new job func newJob(magiccode, datatype uint32, data []byte) (job *Job) { return &Job{magicCode: magiccode, DataType: datatype, - Data: data} + Data: data, + c: make(chan bool), + } } // Decode job from byte slice @@ -88,3 +91,13 @@ func (job *Job) UpdateStatus(numerator, denominator int) { result = append(result, d...) job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } + +// cancel the job executing +func (job *Job) cancel() { + job.c <- true +} + +// When a job was canceled, return a true form a channel +func (job *Job) Canceled() chan bool { + return job.c +} diff --git a/worker/worker.go b/worker/worker.go index 5d25b42..dd99c44 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,6 +5,7 @@ package worker import ( + "time" "bytes" "bitbucket.org/mikespook/gearman-go/common" ) @@ -271,21 +272,36 @@ func (worker *Worker) exec(job *Job) (err error) { if !ok { return common.Errorf("The function does not exist: %s", funcname) } - result, err := f.f(job) + rslt := make(chan *result) + defer close(rslt) + go func() { + defer func() {recover()}() + var r result + r.data, r.err = f.f(job) + rslt <- &r + }() + var r *result + select { + case r = <-rslt: + case <-time.After(time.Duration(f.timeout) * time.Second): + r = &result{data:nil, err: common.ErrExecTimeOut} + job.cancel() + } var datatype uint32 - if err == nil { + if r.err == nil { datatype = common.WORK_COMPLETE } else { - if result == nil { + if r.data == nil { datatype = common.WORK_FAIL } else { datatype = common.WORK_EXCEPTION } + err = r.err } job.magicCode = common.REQ job.DataType = datatype - job.Data = result + job.Data = r.data job.agent.WriteJob(job) return } @@ -300,3 +316,8 @@ func (worker *Worker) removeAgent(a *agent) { worker.err(common.ErrNoActiveAgent) } } + +type result struct { + data []byte + err error +}