From b7ee1d68f50581d1b1115657c7a0662142642b54 Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 30 Aug 2012 16:12:15 +0800 Subject: [PATCH] fixed a exec issue, timeout exec need to fix --HG-- branch : dev --- example/py/client.py | 2 +- example/worker.go | 6 +++--- worker/agent.go | 22 +++++++--------------- worker/worker.go | 32 +++++++++++++++++++------------- 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/example/py/client.py b/example/py/client.py index 592f8ee..d40551e 100755 --- a/example/py/client.py +++ b/example/py/client.py @@ -16,6 +16,6 @@ def main(): check_request_status(completed_job_request) if __name__ == '__main__': - for i in range(100): + for i in range(2): main() diff --git a/example/worker.go b/example/worker.go index 18fed29..a747584 100644 --- a/example/worker.go +++ b/example/worker.go @@ -9,7 +9,7 @@ import ( ) func ToUpper(job *worker.Job) ([]byte, error) { - log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n", + log.Printf("ToUpper -- Handle=[%s]; UID=[%s], Data=[%s]\n", job.Handle, job.UniqueId, job.Data) data := []byte(strings.ToUpper(string(job.Data))) return data, nil @@ -33,8 +33,8 @@ func main() { } } w.JobHandler = func(job *worker.Job) error { - log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle, - job.UniqueId, job.Data) + log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle, + job.UniqueId, job.Data, job.DataType) return nil } w.AddServer("127.0.0.1:4730") diff --git a/worker/agent.go b/worker/agent.go index 1e0fe45..7740acd 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -7,7 +7,6 @@ package worker import ( "io" "net" - "time" "bitbucket.org/mikespook/gearman-go/common" ) @@ -33,6 +32,8 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) { in: make(chan []byte, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE), } + // reset abilities + a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) return } @@ -52,21 +53,15 @@ func (a *agent) outLoop() { // inputing loop func (a *agent) inLoop() { defer func() { - recover() + if r := recover(); r != nil { + a.worker.err(common.Errorf("Exiting: %s", r)) + } close(a.in) close(a.out) a.worker.removeAgent(a) }() - noop := true - go func() { - for a.worker.running { - if noop && len(a.in) == 0 { - a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) - } - <-time.After(time.Second) - } - }() for a.worker.running { + a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) RESTART: // got noop msg and in queue is zero, grab job rel, err := a.read() @@ -94,10 +89,7 @@ func (a *agent) inLoop() { } switch job.DataType { case common.NOOP: - noop = true - case common.NO_JOB: - noop = false - a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil)) case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: job.agent = a a.worker.in <- job diff --git a/worker/worker.go b/worker/worker.go index dd99c44..b248aaa 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -272,20 +272,26 @@ func (worker *Worker) exec(job *Job) (err error) { if !ok { return common.Errorf("The function does not exist: %s", funcname) } - rslt := make(chan *result) - defer close(rslt) - go func() { - defer func() {recover()}() - var r result + var r result + if f.timeout == 0 { r.data, r.err = f.f(job) - rslt <- &r - }() - var r *result - select { - case r = <-rslt: - case <-time.After(time.Duration(f.timeout) * time.Second): - r = &result{data:nil, err: common.ErrExecTimeOut} - job.cancel() + } else { + rslt := make(chan *result) + defer close(rslt) + go func() { + defer func() {recover()}() + var r result + r.data, r.err = f.f(job) + rslt <- &r + }() + select { + case re := <-rslt: + r.data = re.data + r.err = re.err + case <-time.After(time.Duration(f.timeout) * time.Second): + r.err = common.ErrExecTimeOut + job.cancel() + } } var datatype uint32 if r.err == nil {