diff --git a/example/worker/worker.go b/example/worker/worker.go index f31b5fe..a2c924a 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -1,56 +1,60 @@ package main import ( - "os" - "log" - "time" - "strings" - "github.com/mikespook/golib/signal" - "github.com/mikespook/gearman-go/worker" + "github.com/mikespook/gearman-go/worker" + "github.com/mikespook/golib/signal" + "log" + "os" + "strings" + "time" ) func ToUpper(job worker.Job) ([]byte, error) { - log.Printf("ToUpper: Data=[%s]\n", job.Data()) - data := []byte(strings.ToUpper(string(job.Data()))) - return data, nil + log.Printf("ToUpper: Data=[%s]\n", job.Data()) + data := []byte(strings.ToUpper(string(job.Data()))) + return data, nil } func ToUpperDelay10(job worker.Job) ([]byte, error) { - log.Printf("ToUpper: Data=[%s]\n", job.Data()) - time.Sleep(10 * time.Second) - data := []byte(strings.ToUpper(string(job.Data()))) - return data, nil + log.Printf("ToUpper: Data=[%s]\n", job.Data()) + time.Sleep(10 * time.Second) + data := []byte(strings.ToUpper(string(job.Data()))) + return data, nil } func main() { - log.Println("Starting ...") - defer log.Println("Shutdown complete!") - w := worker.New(worker.Unlimited) - defer w.Close() + log.Println("Starting ...") + defer log.Println("Shutdown complete!") + w := worker.New(worker.Unlimited) + defer w.Close() w.ErrorHandler = func(e error) { - log.Println(e) - if e == worker.ErrConnection { - proc, err := os.FindProcess(os.Getpid()) - if err != nil { - log.Println(err) - } - if err := proc.Signal(os.Interrupt); err != nil { - log.Println(err) - } - } - } - w.JobHandler = func(job worker.Job) error { - log.Printf("Data=%s\n", job.Data()) - return nil - } - w.AddServer("tcp4", "127.0.0.1:4730") - w.AddFunc("ToUpper", ToUpper, worker.Immediately) + log.Println(e) + if e == worker.ErrConnection { + proc, err := os.FindProcess(os.Getpid()) + if err != nil { + log.Println(err) + } + if err := proc.Signal(os.Interrupt); err != nil { + log.Println(err) + } + } + } + w.JobHandler = func(job worker.Job) error { + log.Printf("Data=%s\n", job.Data()) + return nil + } + w.AddServer("tcp4", "127.0.0.1:4730") + w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) - go w.Work() - sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool {return true}) - sh.Loop() + if err := w.Ready(); err != nil { + log.Fatal(err) + return + } + go w.Work() + sh := signal.NewHandler() + sh.Bind(os.Interrupt, func() bool { return true }) + sh.Loop() } diff --git a/worker/agent.go b/worker/agent.go index ec159d3..961f9e0 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -15,7 +15,6 @@ type agent struct { worker *Worker in chan []byte net, addr string - isConn bool } // Create the agent of job server. @@ -34,16 +33,16 @@ func (a *agent) Connect() (err error) { if err != nil { return } - a.isConn = true + go a.work() return } -func (a *agent) Work() { +func (a *agent) work() { var inpack *inPack var l int var err error var data, leftdata []byte - for a.isConn { + for { if data, err = a.read(BUFFER_SIZE); err != nil { if err == ErrConnClosed { break @@ -72,9 +71,7 @@ func (a *agent) Work() { } func (a *agent) Close() { - if a.conn != nil { - a.conn.Close() - } + a.conn.Close() } func (a *agent) Grab() { @@ -96,13 +93,11 @@ func (a *agent) read(length int) (data []byte, err error) { // read until data can be unpacked for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { if n, err = a.conn.Read(buf); err != nil { - if !a.isConn { - err = ErrConnClosed - return - } if err == io.EOF && n == 0 { if data == nil { err = ErrConnection + } else { + err = ErrConnClosed } } return diff --git a/worker/outpack.go b/worker/outpack.go index 109f888..4b8ff73 100644 --- a/worker/outpack.go +++ b/worker/outpack.go @@ -6,6 +6,7 @@ package worker import ( +// "fmt" "encoding/binary" ) @@ -43,8 +44,10 @@ func (outpack *outPack) Encode() (data []byte) { if outpack.dataType != WORK_FAIL { data[hi] = '\x00' } - i = i + hi + i = hi + 1 + } + if outpack.dataType != WORK_FAIL { + copy(data[i:], outpack.data) } - copy(data[i:], outpack.data) return } diff --git a/worker/worker.go b/worker/worker.go index 7c75c89..75cba34 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -174,9 +174,7 @@ func (worker *Worker) Ready() (err error) { if err = v.Connect(); err != nil { return } - go v.Work() } - worker.Reset() for funcname, f := range worker.funcs { worker.addFunc(funcname, f.timeout) } @@ -185,12 +183,10 @@ func (worker *Worker) Ready() (err error) { // Main loop func (worker *Worker) Work() { - defer func() { - for _, v := range worker.agents { - v.Close() - } - }() worker.running = true + for _, v := range worker.agents { + v.Grab() + } var inpack *inPack for inpack = range worker.in { go worker.handleInPack(inpack) @@ -263,19 +259,20 @@ func (worker *Worker) exec(inpack *inPack) (err error) { } else { r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) } - outpack := getOutPack() - if r.err == nil { - outpack.dataType = WORK_COMPLETE - } else { - if r.data == nil { - outpack.dataType = WORK_FAIL - } else { - outpack.dataType = WORK_EXCEPTION - } - err = r.err - } - outpack.data = r.data if worker.running { + outpack := getOutPack() + if r.err == nil { + outpack.dataType = WORK_COMPLETE + } else { + if len(r.data) == 0 { + outpack.dataType = WORK_FAIL + } else { + outpack.dataType = WORK_EXCEPTION + } + err = r.err + } + outpack.handle = inpack.handle + outpack.data = r.data inpack.a.write(outpack) inpack.a.Grab() }