promoted the executing timeout

--HG--
branch : dev
This commit is contained in:
mikespook 2012-06-05 14:36:39 +08:00
parent 5ee7cdb7de
commit 79c4bc2e47
3 changed files with 39 additions and 6 deletions

View File

@ -72,7 +72,6 @@ func (a *agent) inLoop() {
rel, err := a.read() rel, err := a.read()
if err != nil { if err != nil {
if err == common.ErrConnection { if err == common.ErrConnection {
// TODO: reconnection
for i:= 0; i < 3 && a.worker.running; i++ { for i:= 0; i < 3 && a.worker.running; i++ {
if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { if conn, err := net.Dial(common.NETWORK, a.addr); err != nil {
a.worker.err(common.Errorf("Reconnection: %d faild", i)) a.worker.err(common.Errorf("Reconnection: %d faild", i))

View File

@ -16,13 +16,16 @@ type Job struct {
Handle, UniqueId string Handle, UniqueId string
agent *agent agent *agent
magicCode, DataType uint32 magicCode, DataType uint32
c chan bool
} }
// Create a new job // Create a new job
func newJob(magiccode, datatype uint32, data []byte) (job *Job) { func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
return &Job{magicCode: magiccode, return &Job{magicCode: magiccode,
DataType: datatype, DataType: datatype,
Data: data} Data: data,
c: make(chan bool),
}
} }
// Decode job from byte slice // Decode job from byte slice
@ -88,3 +91,13 @@ func (job *Job) UpdateStatus(numerator, denominator int) {
result = append(result, d...) result = append(result, d...)
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
} }
// cancel the job executing
func (job *Job) cancel() {
job.c <- true
}
// When a job was canceled, return a true form a channel
func (job *Job) Canceled() chan bool {
return job.c
}

View File

@ -5,6 +5,7 @@
package worker package worker
import ( import (
"time"
"bytes" "bytes"
"bitbucket.org/mikespook/gearman-go/common" "bitbucket.org/mikespook/gearman-go/common"
) )
@ -271,21 +272,36 @@ func (worker *Worker) exec(job *Job) (err error) {
if !ok { if !ok {
return common.Errorf("The function does not exist: %s", funcname) return common.Errorf("The function does not exist: %s", funcname)
} }
result, err := f.f(job) rslt := make(chan *result)
defer close(rslt)
go func() {
defer func() {recover()}()
var r result
r.data, r.err = f.f(job)
rslt <- &r
}()
var r *result
select {
case r = <-rslt:
case <-time.After(time.Duration(f.timeout) * time.Second):
r = &result{data:nil, err: common.ErrExecTimeOut}
job.cancel()
}
var datatype uint32 var datatype uint32
if err == nil { if r.err == nil {
datatype = common.WORK_COMPLETE datatype = common.WORK_COMPLETE
} else { } else {
if result == nil { if r.data == nil {
datatype = common.WORK_FAIL datatype = common.WORK_FAIL
} else { } else {
datatype = common.WORK_EXCEPTION datatype = common.WORK_EXCEPTION
} }
err = r.err
} }
job.magicCode = common.REQ job.magicCode = common.REQ
job.DataType = datatype job.DataType = datatype
job.Data = result job.Data = r.data
job.agent.WriteJob(job) job.agent.WriteJob(job)
return return
} }
@ -300,3 +316,8 @@ func (worker *Worker) removeAgent(a *agent) {
worker.err(common.ErrNoActiveAgent) worker.err(common.ErrNoActiveAgent)
} }
} }
type result struct {
data []byte
err error
}