fixed resources leaking

--HG--
branch : dev
This commit is contained in:
mikespook 2012-09-02 22:42:54 +08:00
parent adf3729627
commit fe5a31a51e
5 changed files with 59 additions and 38 deletions

View File

@ -32,7 +32,7 @@ Install both:
log.Println(e) log.Println(e)
} }
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, 0) w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpper, 5) w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
w.Work() w.Work()
@ -63,6 +63,7 @@ Xing Xing <mikespook@gmail.com>
# History # History
* 0.1.2 Fixed issues: timeout executing, resources leaking.
* 0.1.1 Fixed the issue of grabbing jobs. * 0.1.1 Fixed the issue of grabbing jobs.
* 0.1 Code refactoring; Redesign the API. * 0.1 Code refactoring; Redesign the API.
* 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API.

View File

@ -12,15 +12,25 @@ def check_request_status(job_request):
def main(): def main():
client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730']) client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
try:
completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") completed_job_request = client.submit_job("ToUpper", "arbitrary binary data")
check_request_status(completed_job_request) check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data") completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data")
check_request_status(completed_job_request) check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data") completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data")
check_request_status(completed_job_request) check_request_status(completed_job_request)
except Exception as e:
print type(e)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -49,7 +49,7 @@ func main() {
return nil return nil
} }
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, 0) w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
go w.Work() go w.Work()

View File

@ -91,12 +91,18 @@ func (job *Job) UpdateStatus(numerator, denominator int) {
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
} }
// close the job
func (job *Job) Close() {
close(job.c)
}
// cancel the job executing // cancel the job executing
func (job *Job) cancel() { func (job *Job) cancel() {
defer func() {recover()}()
job.c <- true job.c <- true
} }
// When a job was canceled, return a true form a channel // When a job was canceled, return a true form a channel
func (job *Job) Canceled() chan bool { func (job *Job) Canceled() <-chan bool {
return job.c return job.c
} }

View File

@ -13,6 +13,8 @@ import (
const ( const (
Unlimited = 0 Unlimited = 0
OneByOne = 1 OneByOne = 1
Immediately = 0
) )
var ( var (
@ -21,7 +23,7 @@ var (
// Job handler // Job handler
type JobHandler func(*Job) error type JobHandler func(*Job) error
type JobFunc func(job *Job) ([]byte, error) type JobFunc func(*Job) ([]byte, error)
// The definition of the callback function. // The definition of the callback function.
type jobFunc struct { type jobFunc struct {
@ -179,21 +181,20 @@ func (worker *Worker) Work() {
var job *Job var job *Job
for ok { for ok {
if job, ok = <-worker.in; ok { if job, ok = <-worker.in; ok {
go func() {
defer job.Close()
switch job.DataType { switch job.DataType {
case common.ERROR: case common.ERROR:
go func() {
_, err := common.GetError(job.Data) _, err := common.GetError(job.Data)
worker.err(err) worker.err(err)
}()
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
go func() {
if err := worker.exec(job); err != nil { if err := worker.exec(job); err != nil {
worker.err(err) worker.err(err)
} }
}()
default: default:
go worker.handleJob(job) worker.handleJob(job)
} }
}()
} }
} }
} }
@ -272,26 +273,12 @@ func (worker *Worker) exec(job *Job) (err error) {
if !ok { if !ok {
return common.Errorf("The function does not exist: %s", funcname) return common.Errorf("The function does not exist: %s", funcname)
} }
var r result var r *result
if f.timeout == 0 { if f.timeout == 0 {
r.data, r.err = f.f(job) d, e := f.f(job)
r = &result{data:d, err: e}
} else { } else {
rslt := make(chan *result) r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second)
defer close(rslt)
go func() {
defer func() {recover()}()
var r result
r.data, r.err = f.f(job)
rslt <- &r
}()
select {
case re := <-rslt:
r.data = re.data
r.err = re.err
case <-time.After(time.Duration(f.timeout) * time.Second):
r.err = common.ErrExecTimeOut
job.cancel()
}
} }
var datatype uint32 var datatype uint32
if r.err == nil { if r.err == nil {
@ -327,3 +314,20 @@ type result struct {
data []byte data []byte
err error err error
} }
func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
rslt := make(chan *result)
defer close(rslt)
go func() {
defer func() {recover()}()
d, e := f(job)
rslt <- &result{data: d, err: e}
}()
select {
case r = <-rslt:
case <-time.After(timeout):
go job.cancel()
return &result{err:common.ErrExecTimeOut}
}
return r
}