From fe5a31a51ed3a4a47c7ec2252fcc71478714595a Mon Sep 17 00:00:00 2001 From: mikespook Date: Sun, 2 Sep 2012 22:42:54 +0800 Subject: [PATCH] fixed resources leaking --HG-- branch : dev --- README.md | 3 ++- example/py/client.py | 22 +++++++++++----- example/worker.go | 2 +- worker/job.go | 8 +++++- worker/worker.go | 62 +++++++++++++++++++++++--------------------- 5 files changed, 59 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 1f24a68..8053be6 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Install both: log.Println(e) } w.AddServer("127.0.0.1:4730") - w.AddFunc("ToUpper", ToUpper, 0) + w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpper, 5) w.Work() @@ -63,6 +63,7 @@ Xing Xing # History + * 0.1.2 Fixed issues: timeout executing, resources leaking. * 0.1.1 Fixed the issue of grabbing jobs. * 0.1 Code refactoring; Redesign the API. * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. diff --git a/example/py/client.py b/example/py/client.py index 9f2f2e1..1cb3777 100755 --- a/example/py/client.py +++ b/example/py/client.py @@ -12,16 +12,26 @@ def check_request_status(job_request): def main(): client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730']) - completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") - check_request_status(completed_job_request) + try: + completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") + check_request_status(completed_job_request) + except Exception as e: + print type(e) - completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data") - check_request_status(completed_job_request) - completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data") - check_request_status(completed_job_request) + try: + completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data") + check_request_status(completed_job_request) + except Exception as e: + print type(e) + try: + completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data") + check_request_status(completed_job_request) + except Exception as e: + print type(e) + if __name__ == '__main__': main() diff --git a/example/worker.go b/example/worker.go index 6e06ca4..3975cac 100644 --- a/example/worker.go +++ b/example/worker.go @@ -49,7 +49,7 @@ func main() { return nil } w.AddServer("127.0.0.1:4730") - w.AddFunc("ToUpper", ToUpper, 0) + w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) go w.Work() diff --git a/worker/job.go b/worker/job.go index e3cebe0..278216a 100644 --- a/worker/job.go +++ b/worker/job.go @@ -91,12 +91,18 @@ func (job *Job) UpdateStatus(numerator, denominator int) { job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } +// close the job +func (job *Job) Close() { + close(job.c) +} + // cancel the job executing func (job *Job) cancel() { + defer func() {recover()}() job.c <- true } // When a job was canceled, return a true form a channel -func (job *Job) Canceled() chan bool { +func (job *Job) Canceled() <-chan bool { return job.c } diff --git a/worker/worker.go b/worker/worker.go index b248aaa..27f92a9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -13,6 +13,8 @@ import ( const ( Unlimited = 0 OneByOne = 1 + + Immediately = 0 ) var ( @@ -21,7 +23,7 @@ var ( // Job handler type JobHandler func(*Job) error -type JobFunc func(job *Job) ([]byte, error) +type JobFunc func(*Job) ([]byte, error) // The definition of the callback function. type jobFunc struct { @@ -179,21 +181,20 @@ func (worker *Worker) Work() { var job *Job for ok { if job, ok = <-worker.in; ok { - switch job.DataType { - case common.ERROR: - go func() { + go func() { + defer job.Close() + switch job.DataType { + case common.ERROR: _, err := common.GetError(job.Data) worker.err(err) - }() - case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: - go func() { + case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: if err := worker.exec(job); err != nil { worker.err(err) } - }() - default: - go worker.handleJob(job) - } + default: + worker.handleJob(job) + } + }() } } } @@ -272,26 +273,12 @@ func (worker *Worker) exec(job *Job) (err error) { if !ok { return common.Errorf("The function does not exist: %s", funcname) } - var r result + var r *result if f.timeout == 0 { - r.data, r.err = f.f(job) + d, e := f.f(job) + r = &result{data:d, err: e} } else { - rslt := make(chan *result) - defer close(rslt) - go func() { - defer func() {recover()}() - var r result - r.data, r.err = f.f(job) - rslt <- &r - }() - select { - case re := <-rslt: - r.data = re.data - r.err = re.err - case <-time.After(time.Duration(f.timeout) * time.Second): - r.err = common.ErrExecTimeOut - job.cancel() - } + r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second) } var datatype uint32 if r.err == nil { @@ -327,3 +314,20 @@ type result struct { data []byte err error } + +func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) { + rslt := make(chan *result) + defer close(rslt) + go func() { + defer func() {recover()}() + d, e := f(job) + rslt <- &result{data: d, err: e} + }() + select { + case r = <-rslt: + case <-time.After(timeout): + go job.cancel() + return &result{err:common.ErrExecTimeOut} + } + return r +}