fixed a exec issue, timeout exec need to fix

--HG--
branch : dev
This commit is contained in:
mikespook 2012-08-30 16:12:15 +08:00
parent e82732a5cc
commit b7ee1d68f5
4 changed files with 30 additions and 32 deletions

View File

@ -16,6 +16,6 @@ def main():
check_request_status(completed_job_request)
if __name__ == '__main__':
for i in range(100):
for i in range(2):
main()

View File

@ -9,7 +9,7 @@ import (
)
func ToUpper(job *worker.Job) ([]byte, error) {
log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n",
log.Printf("ToUpper -- Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
@ -33,8 +33,8 @@ func main() {
}
}
w.JobHandler = func(job *worker.Job) error {
log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle,
job.UniqueId, job.Data)
log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle,
job.UniqueId, job.Data, job.DataType)
return nil
}
w.AddServer("127.0.0.1:4730")

View File

@ -7,7 +7,6 @@ package worker
import (
"io"
"net"
"time"
"bitbucket.org/mikespook/gearman-go/common"
)
@ -33,6 +32,8 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) {
in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
}
// reset abilities
a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil))
return
}
@ -52,21 +53,15 @@ func (a *agent) outLoop() {
// inputing loop
func (a *agent) inLoop() {
defer func() {
recover()
if r := recover(); r != nil {
a.worker.err(common.Errorf("Exiting: %s", r))
}
close(a.in)
close(a.out)
a.worker.removeAgent(a)
}()
noop := true
go func() {
for a.worker.running {
if noop && len(a.in) == 0 {
a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil))
}
<-time.After(time.Second)
}
}()
for a.worker.running {
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
RESTART:
// got noop msg and in queue is zero, grab job
rel, err := a.read()
@ -94,10 +89,7 @@ func (a *agent) inLoop() {
}
switch job.DataType {
case common.NOOP:
noop = true
case common.NO_JOB:
noop = false
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
job.agent = a
a.worker.in <- job

View File

@ -272,6 +272,10 @@ func (worker *Worker) exec(job *Job) (err error) {
if !ok {
return common.Errorf("The function does not exist: %s", funcname)
}
var r result
if f.timeout == 0 {
r.data, r.err = f.f(job)
} else {
rslt := make(chan *result)
defer close(rslt)
go func() {
@ -280,13 +284,15 @@ func (worker *Worker) exec(job *Job) (err error) {
r.data, r.err = f.f(job)
rslt <- &r
}()
var r *result
select {
case r = <-rslt:
case re := <-rslt:
r.data = re.data
r.err = re.err
case <-time.After(time.Duration(f.timeout) * time.Second):
r = &result{data:nil, err: common.ErrExecTimeOut}
r.err = common.ErrExecTimeOut
job.cancel()
}
}
var datatype uint32
if r.err == nil {
datatype = common.WORK_COMPLETE