diff --git a/.travis.yml b/.travis.yml index b66a8c6..d897dd8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ language: go + - 1.2 before_install: - sudo apt-get install -qq gearman-job-server diff --git a/README.md b/README.md index c9a341a..4095447 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,12 @@ Usage w.AddServer("127.0.0.1:4730") w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpper, 5) - w.Work() + if err := w.Ready(); err != nil { + log.Fatal(err) + return + } + go w.Work() + ## Client diff --git a/client/client.go b/client/client.go index c426226..99d760b 100644 --- a/client/client.go +++ b/client/client.go @@ -9,7 +9,7 @@ import ( "io" "net" "sync" - "fmt" +// "fmt" ) /* @@ -21,13 +21,14 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { + sync.Mutex + net, addr, lastcall string respHandler map[string]ResponseHandler innerHandler map[string]ResponseHandler in chan []byte isConn bool conn net.Conn - mutex sync.RWMutex ErrorHandler ErrorHandler } @@ -115,7 +116,6 @@ func (client *Client) readLoop() { client.err(err) continue } - fmt.Printf("[%X]", data) client.in <- data } close(client.in) @@ -141,23 +141,25 @@ func (client *Client) processLoop() { continue } leftdata = nil - switch resp.DataType { - case ERROR: - if client.lastcall != "" { - client.handleInner(client.lastcall, resp) - client.lastcall = "" - } else { - client.err(GetError(resp.Data)) + for resp != nil { + switch resp.DataType { + case ERROR: + if client.lastcall != "" { + resp = client.handleInner(client.lastcall, resp) + client.lastcall = "" + } else { + client.err(GetError(resp.Data)) + } + case STATUS_RES: + resp = client.handleInner("s"+resp.Handle, resp) + case JOB_CREATED: + resp = client.handleInner("c", resp) + case ECHO_RES: + resp = client.handleInner("e", resp) + case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, + WORK_FAIL, WORK_EXCEPTION: + resp = client.handleResponse(resp.Handle, resp) } - case STATUS_RES: - client.handleInner("s"+resp.Handle, resp) - case JOB_CREATED: - client.handleInner("c", resp) - case ECHO_RES: - client.handleInner("e", resp) - case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, - WORK_FAIL, WORK_EXCEPTION: - client.handleResponse(resp.Handle, resp) } if len(data) > l { leftdata = data[l:] @@ -173,44 +175,44 @@ func (client *Client) err(e error) { } // job handler -func (client *Client) handleResponse(key string, resp *Response) { - client.mutex.RLock() - defer client.mutex.RUnlock() +func (client *Client) handleResponse(key string, resp *Response) *Response { if h, ok := client.respHandler[key]; ok { h(resp) delete(client.respHandler, key) + return nil } + return resp } // job handler -func (client *Client) handleInner(key string, resp *Response) { +func (client *Client) handleInner(key string, resp *Response) * Response { if h, ok := client.innerHandler[key]; ok { h(resp) delete(client.innerHandler, key) + return nil } + return resp } // Internal do func (client *Client) do(funcname string, data []byte, flag uint32) (handle string, err error) { - var wg sync.WaitGroup - wg.Add(1) - client.mutex.RLock() + var mutex sync.Mutex + mutex.Lock() client.lastcall = "c" - client.innerHandler["c"] = ResponseHandler(func(resp *Response) { - defer wg.Done() - defer client.mutex.RUnlock() + client.innerHandler["c"] = func(resp *Response) { if resp.DataType == ERROR { err = GetError(resp.Data) return } handle = resp.Handle - }) + mutex.Unlock() + } id := IdGen.Id() req := getJob(id, []byte(funcname), data) req.DataType = flag client.write(req) - wg.Wait() + mutex.Lock() return } @@ -232,8 +234,6 @@ func (client *Client) Do(funcname string, data []byte, datatype = SUBMIT_JOB } handle, err = client.do(funcname, data, datatype) - client.mutex.Lock() - defer client.mutex.Unlock() if h != nil { client.respHandler[handle] = h } @@ -262,41 +262,39 @@ func (client *Client) DoBg(funcname string, data []byte, // Get job status from job server. // !!!Not fully tested.!!! func (client *Client) Status(handle string) (status *Status, err error) { - var wg sync.WaitGroup - wg.Add(1) - client.mutex.Lock() + var mutex sync.Mutex + mutex.Lock() client.lastcall = "s" + handle - client.innerHandler["s"+handle] = ResponseHandler(func(resp *Response) { - defer wg.Done() - defer client.mutex.Unlock() + client.innerHandler["s"+handle] = func(resp *Response) { var err error status, err = resp.Status() if err != nil { client.err(err) } - }) + mutex.Unlock() + } req := getRequest() req.DataType = GET_STATUS req.Data = []byte(handle) client.write(req) - wg.Wait() + mutex.Lock() return } // Send a something out, get the samething back. func (client *Client) Echo(data []byte) (echo []byte, err error) { - var wg sync.WaitGroup - wg.Add(1) - client.innerHandler["e"] = ResponseHandler(func(resp *Response) { - defer wg.Done() + var mutex sync.Mutex + mutex.Lock() + client.innerHandler["e"] = func(resp *Response) { echo = resp.Data - }) + mutex.Unlock() + } req := getRequest() req.DataType = ECHO_REQ req.Data = data - client.write(req) client.lastcall = "e" - wg.Wait() + client.write(req) + mutex.Lock() return } diff --git a/client/request.go b/client/request.go index 485eecc..f906692 100644 --- a/client/request.go +++ b/client/request.go @@ -9,7 +9,7 @@ import ( "encoding/binary" ) -// request +// Request from client type request struct { DataType uint32 Data []byte diff --git a/example/client/client b/example/client/client new file mode 100755 index 0000000..c01975f Binary files /dev/null and b/example/client/client differ diff --git a/example/client/client.go b/example/client/client.go index 9161c57..65cb58e 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -1,35 +1,35 @@ package main import ( + "github.com/mikespook/gearman-go/client" "log" "sync" - "github.com/mikespook/gearman-go/client" ) func main() { var wg sync.WaitGroup // Set the autoinc id generator - // You can write your own id generator + // You can write your own id generator // by implementing IdGenerator interface. // client.IdGen = client.NewAutoIncId() - c, err := client.New("tcp4", "127.0.0.1:4730") + c, err := client.New("tcp4", "127.0.0.1:4730") if err != nil { log.Fatalln(err) } defer c.Close() - c.ErrorHandler = func(e error) { + c.ErrorHandler = func(e error) { log.Println(e) } echo := []byte("Hello\x00 world") wg.Add(1) - echomsg, err := c.Echo(echo) + echomsg, err := c.Echo(echo) if err != nil { log.Fatalln(err) } log.Println(string(echomsg)) wg.Done() - jobHandler := func(job *client.Response) { + jobHandler := func(job *client.Job) { log.Printf("%s", job.Data) wg.Done() } @@ -38,7 +38,7 @@ func main() { log.Fatalln(err) } wg.Add(1) - status, err := c.Status(handle) + status, err := c.Status(handle) if err != nil { log.Fatalln(err) } diff --git a/example/exec-worker/README.md b/example/exec-worker/README.md deleted file mode 100644 index bfb7d55..0000000 --- a/example/exec-worker/README.md +++ /dev/null @@ -1,7 +0,0 @@ -= Exec Worker - -This program execute shell or php scripts as backend jobs. - -Scripts can communicat with the worker through STDERR using formated output. - -USE THIS AT YOUR OWN RASK!!! diff --git a/example/exec-worker/client/client.php b/example/exec-worker/client/client.php deleted file mode 100644 index 38515f5..0000000 --- a/example/exec-worker/client/client.php +++ /dev/null @@ -1,40 +0,0 @@ -addServer(); -$data = array( - 'Name' => 'foobar', - 'Args' => array("0", "1", "2", "3"), -); - -$c = isset($_SERVER['argv'][1]) ? $_SERVER['argv'][1] : 10; - -for ($i = 0; $i < $c; $i ++) { - - # run reverse client in the background - $job_handle = $gmclient->doBackground("execphp", json_encode($data)); - - if ($gmclient->returnCode() != GEARMAN_SUCCESS) { - echo "bad return code\n"; - exit; - } -} -/* -$data = array( - 'Name' => 'notexists', - 'Args' => array("0", "1", "2", "3"), -); - -# run reverse client in the background -$job_handle = $gmclient->doBackground("exec", json_encode($data)); - -if ($gmclient->returnCode() != GEARMAN_SUCCESS) -{ - echo "bad return code\n"; - exit; -} - */ -echo "done!\n"; diff --git a/example/exec-worker/exec.go b/example/exec-worker/exec.go deleted file mode 100644 index 760f97b..0000000 --- a/example/exec-worker/exec.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "io" - "bytes" - "os/exec" - "encoding/json" - "github.com/mikespook/golib/log" - "github.com/mikespook/gearman-go/worker" -) - -type outData struct { - Numerator, Denominator int - Warning bool - Data []byte - Debug string -} - -type ShExec struct { - Name, basedir string - Args []string - *exec.Cmd - job *worker.Job - Logger *log.Logger -} - -func NewShExec(basedir string, job *worker.Job) (sh *ShExec, err error) { - sh = &ShExec{ - basedir: basedir, - job: job, - Args: make([]string, 0), - } - if err = sh.parse(job.Data); err != nil { - return nil, err - } - return -} - -func (sh *ShExec) parse(data []byte) (err error) { - if err = json.Unmarshal(data, sh); err != nil { - return - } - return -} - -func (sh *ShExec) Append(args ... string) { - sh.Args = append(sh.Args, args ...) -} - -func (sh *ShExec) Prepend(args ... string) { - sh.Args = append(args, sh.Args ...) -} - -func (sh *ShExec) Exec() (rslt []byte, err error){ - sh.Logger.Debugf("Executing: Handle=%s, Exec=%s, Args=%v", - sh.job.Handle, sh.Name, sh.Args) - sh.Cmd = exec.Command(sh.Name, sh.Args ... ) - go func() { - if ok := <-sh.job.Canceled(); ok { - sh.Cmd.Process.Kill() - } - }() - sh.Cmd.Dir = sh.basedir - var buf bytes.Buffer - sh.Cmd.Stdout = &buf - var errPipe io.ReadCloser - if errPipe, err = sh.Cmd.StderrPipe(); err != nil { - return nil, err - } - defer errPipe.Close() - go sh.processErr(errPipe) - if err = sh.Cmd.Run(); err != nil { - return nil, err - } - rslt = buf.Bytes() - return -} - -func (sh *ShExec) processErr(pipe io.ReadCloser) { - result := make([]byte, 1024) - var more []byte - for { - n, err := pipe.Read(result) - if err != nil { - if err != io.EOF { - sh.job.UpdateData([]byte(err.Error()), true) - } - return - } - if more != nil { - result = append(more, result[:n]...) - } else { - result = result[:n] - } - if n < 1024 { - var out outData - if err := json.Unmarshal(result, &out); err != nil { - sh.job.UpdateData([]byte(result), true) - return - } - if out.Debug == "" { - if out.Data != nil { - sh.job.UpdateData(out.Data, out.Warning) - } - if out.Numerator != 0 || out.Denominator != 0 { - sh.job.UpdateStatus(out.Numerator, out.Denominator) - } - } else { - sh.Logger.Debugf("Debug: Handle=%s, Exec=%s, Args=%v, Data=%s", - sh.job.Handle, sh.Name, sh.Args, out.Debug) - } - more = nil - } else { - more = result - } - } -} - diff --git a/example/exec-worker/func.go b/example/exec-worker/func.go deleted file mode 100644 index 383861c..0000000 --- a/example/exec-worker/func.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "github.com/mikespook/golib/log" - "github.com/mikespook/gearman-go/worker" -) - -func execShell(job *worker.Job) (result []byte, err error) { - log.Messagef("[Shell]Received: Handle=%s", job.Handle) - defer log.Messagef("[Shell]Finished: Handle=%s", job.Handle) - log.Debugf("[Shell]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data) - var sh *ShExec - if sh, err = NewShExec(*basedir, job); err != nil { - return - } - sh.Logger = log.DefaultLogger - return sh.Exec() -} - -func execPHP(job *worker.Job) (result []byte, err error) { - log.Messagef("[PHP]Received: Handle=%s", job.Handle) - defer log.Messagef("[PHP]Finished: Handle=%s", job.Handle) - log.Debugf("[PHP]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data) - var sh *ShExec - if sh, err = NewShExec(*basedir, job); err != nil { - return - } - sh.Prepend("-f", sh.Name + ".php") - sh.Name = "php" - sh.Logger = log.DefaultLogger - return sh.Exec() -} - diff --git a/example/exec-worker/lib/X.php b/example/exec-worker/lib/X.php deleted file mode 100644 index 2079009..0000000 --- a/example/exec-worker/lib/X.php +++ /dev/null @@ -1,40 +0,0 @@ -stderr = fopen("php://stderr", "r"); - } - - public function sendMsg($msg, $warning = false, $numerator = 0, $denominator = 0) { - $result = array( - 'Numerator' => $numerator, - 'Denominator' => $denominator, - 'Warning' => $warning, - 'Data' => $msg, - ); - fwrite($this->stderr, json_encode($result)); - } - - public function debug($msg) { - $result = array( - 'Debug' => $msg, - ); - fwrite($this->stderr, json_encode($result)); - } - - public function close() { - fclose($this->stderr); - } - - public function argv($index) { - $argv = $_SERVER['argv']; - return isset($argv[$index]) ? $argv[$index] : null; - } - - public function argc() { - $argc = $_SERVER['argc']; - return $argc; - } -} diff --git a/example/exec-worker/lib/bootstrap.php b/example/exec-worker/lib/bootstrap.php deleted file mode 100644 index 788673d..0000000 --- a/example/exec-worker/lib/bootstrap.php +++ /dev/null @@ -1,11 +0,0 @@ - 1) { - include implode(DIRECTORY_SEPARATOR, $names) . '.php'; - } else { - include $class . '.php'; - } -} - -spl_autoload_register('autoloader'); diff --git a/example/exec-worker/log.go b/example/exec-worker/log.go deleted file mode 100644 index bc1aaae..0000000 --- a/example/exec-worker/log.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "github.com/mikespook/golib/log" - "flag" - "strings" -) - -var ( - logfile = flag.String("log", "", - "Log file to write errors and information to." + - " Empty string output to STDOUT.") - loglevel = flag.String("log-level", "all", "Log level to record." + - " Values 'error', 'warning', 'message', 'debug', 'all' and 'none'" + - " are accepted. Use '|' to combine more levels.") -) - -func initLog() { - level := log.LogNone - levels := strings.SplitN(*loglevel, "|", -1) - for _, v := range levels { - switch v { - case "none": - level = level | log.LogNone - break - case "error": - level = level | log.LogError - case "warning": - level = level | log.LogWarning - case "message": - level = level | log.LogMessage - case "debug": - level = level | log.LogDebug - case "all": - level = log.LogAll - default: - } - } - if err := log.Init(*logfile, level); err != nil { - log.Error(err) - } -} diff --git a/example/exec-worker/script/default.php b/example/exec-worker/script/default.php deleted file mode 100644 index c6bd04a..0000000 --- a/example/exec-worker/script/default.php +++ /dev/null @@ -1,8 +0,0 @@ -sendMsg("test"); -$x->debug("debug message"); -sleep(10); -$x->close(); -exit(0); diff --git a/example/exec-worker/worker.go b/example/exec-worker/worker.go deleted file mode 100644 index 271639a..0000000 --- a/example/exec-worker/worker.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "os" - "flag" - "time" - "github.com/mikespook/golib/log" - "github.com/mikespook/golib/pid" - "github.com/mikespook/golib/prof" - "github.com/mikespook/golib/signal" - "github.com/mikespook/gearman-go/worker" -) - -var ( - pidfile = flag.String("pid", "/run/seedworker.pid", - "PID file to write pid") - proffile = flag.String("prof", "", "Profiling file") - dumpfile = flag.String("dump", "", "Heap dumping file") - dumptime = flag.Int("dumptime", 5, "Heap dumping time interval") - - joblimit = flag.Int("job-limit", worker.Unlimited, - "Maximum number of concurrently executing job." + - " Zero is unlimited.") - - basedir = flag.String("basedir", "", "Working directory of the php scripts.") - timeout = flag.Uint("timeout", 30, "Executing time out.") - gearmand = flag.String("gearmand", "127.0.0.1:4730", "Address and port of gearmand") -) - -func init() { - - flag.Parse() - initLog() - - if *basedir == "" { - *basedir = "./script/" - } -} - -func main() { - log.Message("Starting ... ") - defer func() { - time.Sleep(time.Second) - log.Message("Shutdown complate!") - }() - - // init profiling file - if *proffile != "" { - log.Debugf("Open a profiling file: %s", *proffile) - if err := prof.Start(*proffile); err != nil { - log.Error(err) - } else { - defer prof.Stop() - } - } - - // init heap dumping file - if *dumpfile != "" { - log.Debugf("Open a heap dumping file: %s", *dumpfile) - if err := prof.NewDump(*dumpfile); err != nil { - log.Error(err) - } else { - defer prof.CloseDump() - go func() { - for prof.Dumping { - time.Sleep(time.Duration(*dumptime) * time.Second) - prof.Dump() - } - }() - } - } - - // init pid file - log.Debugf("Open a pid file: %s", *pidfile) - if pidFile, err := pid.New(*pidfile); err != nil { - log.Error(err) - } else { - defer pidFile.Close() - } - - w := worker.New(*joblimit) - if err := w.AddServer(*gearmand); err != nil { - log.Error(err) - return - } - if err := w.AddFunc("exec", execShell, uint32(*timeout)); err != nil { - log.Error(err) - return - } - if err := w.AddFunc("execphp", execPHP, uint32(*timeout)); err != nil { - log.Error(err) - return - } - defer w.Close() - go w.Work() - - // signal handler - sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool {return true}) - sh.Loop() -} diff --git a/example/worker/worker b/example/worker/worker new file mode 100755 index 0000000..c7ea095 Binary files /dev/null and b/example/worker/worker differ diff --git a/example/worker/worker.go b/example/worker/worker.go index bcb7154..d610776 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -1,61 +1,60 @@ package main import ( - "os" - "log" - "time" - "strings" - "github.com/mikespook/golib/signal" - "github.com/mikespook/gearman-go/worker" + "github.com/mikespook/gearman-go/worker" + "github.com/mikespook/golib/signal" + "log" + "os" + "strings" + "time" ) -func ToUpper(job *worker.Job) ([]byte, error) { - log.Printf("ToUpper: Handle=[%s]; UID=[%s], Data=[%s]\n", - job.Handle, job.UniqueId, job.Data) - data := []byte(strings.ToUpper(string(job.Data))) - return data, nil +func ToUpper(job worker.Job) ([]byte, error) { + log.Printf("ToUpper: Data=[%s]\n", job.Data()) + data := []byte(strings.ToUpper(string(job.Data()))) + return data, nil } -func ToUpperDelay10(job *worker.Job) ([]byte, error) { - log.Printf("ToUpperDelay10: Handle=[%s]; UID=[%s], Data=[%s]\n", - job.Handle, job.UniqueId, job.Data) - time.Sleep(10 * time.Second) - data := []byte(strings.ToUpper(string(job.Data))) - return data, nil +func ToUpperDelay10(job worker.Job) ([]byte, error) { + log.Printf("ToUpper: Data=[%s]\n", job.Data()) + time.Sleep(10 * time.Second) + data := []byte(strings.ToUpper(string(job.Data()))) + return data, nil } - - func main() { - log.Println("Starting ...") - defer log.Println("Shutdown complete!") - w := worker.New(worker.Unlimited) - defer w.Close() - w.ErrHandler = func(e error) { - log.Println(e) - if e == worker.ErrConnection { - proc, err := os.FindProcess(os.Getpid()) - if err != nil { - log.Println(err) - } - if err := proc.Signal(os.Interrupt); err != nil { - log.Println(err) - } - } - } - w.JobHandler = func(job *worker.Job) error { - log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle, - job.UniqueId, job.Data, job.DataType) - return nil - } - w.AddServer("127.0.0.1:4730") - w.AddFunc("ToUpper", ToUpper, worker.Immediately) - w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) - w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) - w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) - w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) - go w.Work() - sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool {return true}) - sh.Loop() + log.Println("Starting ...") + defer log.Println("Shutdown complete!") + w := worker.New(worker.Unlimited) + defer w.Close() + w.ErrorHandler = func(e error) { + log.Println(e) + if e == worker.ErrConnection { + proc, err := os.FindProcess(os.Getpid()) + if err != nil { + log.Println(err) + } + if err := proc.Signal(os.Interrupt); err != nil { + log.Println(err) + } + } + } + w.JobHandler = func(job worker.Job) error { + log.Printf("Data=%s\n", job.Data()) + return nil + } + w.AddServer("tcp4", "127.0.0.1:4730") + w.AddFunc("ToUpper", ToUpper, worker.Immediately) + w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) + w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) + w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) + w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) + if err := w.Ready(); err != nil { + log.Fatal(err) + return + } + go w.Work() + sh := signal.NewHandler() + sh.Bind(os.Interrupt, func() bool { return true }) + sh.Loop() } diff --git a/gearman_test.go b/gearman_test.go index b8d41d4..fbb6dd6 100644 --- a/gearman_test.go +++ b/gearman_test.go @@ -54,6 +54,10 @@ func TestJobs(t *testing.T) { w.ErrorHandler = func(e error) { t.Error(e) } + if err := w.Ready(); err != nil { + t.Error(err) + return + } go w.Work() t.Log("Worker is running...") diff --git a/worker/agent.go b/worker/agent.go index 7d4fa56..961f9e0 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -11,17 +11,16 @@ import ( // The agent of job server. type agent struct { - conn net.Conn - worker *Worker - in chan []byte - net, addr string - isConn bool + conn net.Conn + worker *Worker + in chan []byte + net, addr string } // Create the agent of job server. func newAgent(net, addr string, worker *Worker) (a *agent, err error) { a = &agent{ - net: net, + net: net, addr: addr, worker: worker, in: make(chan []byte, QUEUE_SIZE), @@ -34,44 +33,16 @@ func (a *agent) Connect() (err error) { if err != nil { return } - a.isConn = true + go a.work() return } -func (a *agent) Work() { - go a.readLoop() - - var resp *Response +func (a *agent) work() { + var inpack *inPack var l int var err error var data, leftdata []byte - for data = range a.in { - if len(leftdata) > 0 { // some data left for processing - data = append(leftdata, data...) - } - l = len(data) - if l < MIN_PACKET_LEN { // not enough data - leftdata = data - continue - } - if resp, l, err = decodeResponse(data); err != nil { - a.worker.err(err) - continue - } - leftdata = nil - resp.agentId = a.net + a.addr - a.worker.in <- resp - if len(data) > l { - leftdata = data[l:] - } - } -} - -// read data from socket -func (a *agent) readLoop() { - var data []byte - var err error - for a.isConn { + for { if data, err = a.read(BUFFER_SIZE); err != nil { if err == ErrConnClosed { break @@ -79,15 +50,42 @@ func (a *agent) readLoop() { a.worker.err(err) continue } - a.in <- data + if len(leftdata) > 0 { // some data left for processing + data = append(leftdata, data...) + } + if len(data) < MIN_PACKET_LEN { // not enough data + leftdata = data + continue + } + if inpack, l, err = decodeInPack(data); err != nil { + a.worker.err(err) + continue + } + leftdata = nil + inpack.a = a + a.worker.in <- inpack + if len(data) > l { + leftdata = data[l:] + } } - close(a.in) } func (a *agent) Close() { a.conn.Close() } +func (a *agent) Grab() { + outpack := getOutPack() + outpack.dataType = GRAB_JOB_UNIQ + a.write(outpack) +} + +func (a *agent) PreSleep() { + outpack := getOutPack() + outpack.dataType = PRE_SLEEP + a.write(outpack) +} + // read length bytes from the socket func (a *agent) read(length int) (data []byte, err error) { n := 0 @@ -95,13 +93,11 @@ func (a *agent) read(length int) (data []byte, err error) { // read until data can be unpacked for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { if n, err = a.conn.Read(buf); err != nil { - if !a.isConn { - err = ErrConnClosed - return - } if err == io.EOF && n == 0 { if data == nil { err = ErrConnection + } else { + err = ErrConnClosed } } return @@ -115,9 +111,9 @@ func (a *agent) read(length int) (data []byte, err error) { } // Internal write the encoded job. -func (a *agent) write(req *request) (err error) { +func (a *agent) write(outpack *outPack) (err error) { var n int - buf := req.Encode() + buf := outpack.Encode() for i := 0; i < len(buf); i += n { n, err = a.conn.Write(buf[i:]) if err != nil { diff --git a/worker/common.go b/worker/common.go index 8fbcf10..991bff4 100644 --- a/worker/common.go +++ b/worker/common.go @@ -41,6 +41,7 @@ const ( STATUS_RES = 20 SET_CLIENT_ID = 22 CAN_DO_TIMEOUT = 23 + ALL_YOURS = 24 WORK_EXCEPTION = 25 WORK_DATA = 28 WORK_WARNING = 29 diff --git a/worker/inpack.go b/worker/inpack.go new file mode 100644 index 0000000..25d9da4 --- /dev/null +++ b/worker/inpack.go @@ -0,0 +1,108 @@ +// Copyright 2011 Xing Xing +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bytes" + "encoding/binary" + "fmt" + "strconv" +) + +// Worker side job +type inPack struct { + dataType uint32 + data []byte + handle, uniqueId, fn string + a *agent +} + +// Create a new job +func getInPack() *inPack { + return &inPack{} +} + +func (inpack *inPack) Data() []byte { + return inpack.data +} + +// Send some datas to client. +// Using this in a job's executing. +func (inpack *inPack) SendData(data []byte) { + outpack := getOutPack() + outpack.dataType = WORK_DATA + hl := len(inpack.handle) + l := hl + len(data) + 1 + outpack.data = getBuffer(l) + copy(outpack.data, []byte(inpack.handle)) + copy(outpack.data[hl+1:], data) + inpack.a.write(outpack) +} + +func (inpack *inPack) SendWarning(data []byte) { + outpack := getOutPack() + outpack.dataType = WORK_WARNING + hl := len(inpack.handle) + l := hl + len(data) + 1 + outpack.data = getBuffer(l) + copy(outpack.data, []byte(inpack.handle)) + copy(outpack.data[hl+1:], data) + inpack.a.write(outpack) +} + +// Update status. +// Tall client how many percent job has been executed. +func (inpack *inPack) UpdateStatus(numerator, denominator int) { + n := []byte(strconv.Itoa(numerator)) + d := []byte(strconv.Itoa(denominator)) + outpack := getOutPack() + outpack.dataType = WORK_STATUS + hl := len(inpack.handle) + nl := len(n) + dl := len(d) + outpack.data = getBuffer(hl + nl + dl + 3) + copy(outpack.data, []byte(inpack.handle)) + copy(outpack.data[hl+1:], n) + copy(outpack.data[hl+nl+2:], d) + inpack.a.write(outpack) +} + +// Decode job from byte slice +func decodeInPack(data []byte) (inpack *inPack, l int, err error) { + if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes + err = fmt.Errorf("Invalid data: %V", data) + return + } + dl := int(binary.BigEndian.Uint32(data[8:12])) + dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN] + if len(dt) != int(dl) { // length not equal + err = fmt.Errorf("Invalid data: %V", data) + return + } + inpack = getInPack() + inpack.dataType = binary.BigEndian.Uint32(data[4:8]) + switch inpack.dataType { + case JOB_ASSIGN: + s := bytes.SplitN(dt, []byte{'\x00'}, 3) + if len(s) == 3 { + inpack.handle = string(s[0]) + inpack.fn = string(s[1]) + inpack.data = s[2] + } + case JOB_ASSIGN_UNIQ: + s := bytes.SplitN(dt, []byte{'\x00'}, 4) + if len(s) == 4 { + inpack.handle = string(s[0]) + inpack.fn = string(s[1]) + inpack.uniqueId = string(s[2]) + inpack.data = s[3] + } + default: + inpack.data = dt + } + l = dl + MIN_PACKET_LEN + return +} diff --git a/worker/inpack_test.go b/worker/inpack_test.go new file mode 100644 index 0000000..75b9ba5 --- /dev/null +++ b/worker/inpack_test.go @@ -0,0 +1,62 @@ +package worker + +import ( + "bytes" + "testing" +) + +var ( + inpackcases = map[uint32]map[string]string{ + NOOP: map[string]string{ + "src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00", + }, + NO_JOB: map[string]string{ + "src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00", + }, + JOB_ASSIGN: map[string]string{ + "src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz", + "handle": "a", + "fn": "b", + "data": "xyz", + }, + JOB_ASSIGN_UNIQ: map[string]string{ + "src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz", + "handle": "a", + "fn": "b", + "uid": "c", + "data": "xyz", + }, + } +) + +func TestInPack(t *testing.T) { + for k, v := range inpackcases { + inpack, _, err := decodeInPack([]byte(v["src"])) + if err != nil { + t.Error(err) + } + if inpack.dataType != k { + t.Errorf("DataType: %d expected, %d got.", k, inpack.dataType) + } + if handle, ok := v["handle"]; ok { + if inpack.handle != handle { + t.Errorf("Handle: %s expected, %s got.", handle, inpack.handle) + } + } + if fn, ok := v["fn"]; ok { + if inpack.fn != fn { + t.Errorf("FuncName: %s expected, %s got.", fn, inpack.fn) + } + } + if uid, ok := v["uid"]; ok { + if inpack.uniqueId != uid { + t.Errorf("UID: %s expected, %s got.", uid, inpack.uniqueId) + } + } + if data, ok := v["data"]; ok { + if bytes.Compare([]byte(data), inpack.data) != 0 { + t.Errorf("UID: %v expected, %v got.", data, inpack.data) + } + } + } +} diff --git a/worker/job.go b/worker/job.go index af032df..4f9950a 100644 --- a/worker/job.go +++ b/worker/job.go @@ -1,68 +1,8 @@ package worker -import ( - "strconv" -) - type Job interface { Data() []byte SendWarning(data []byte) SendData(data []byte) UpdateStatus(numerator, denominator int) } - -type _job struct { - a *agent - Handle string - data []byte -} - -func getJob() *_job { - return &_job{} -} - -func (j *_job) Data() []byte { - return j.data -} - -// Send some datas to client. -// Using this in a job's executing. -func (j *_job) SendData(data []byte) { - req := getRequest() - req.DataType = WORK_DATA - hl := len(j.Handle) - l := hl + len(data) + 1 - req.Data = getBuffer(l) - copy(req.Data, []byte(j.Handle)) - copy(req.Data[hl + 1:], data) - j.a.write(req) -} - -func (j *_job) SendWarning(data []byte) { - req := getRequest() - req.DataType = WORK_WARNING - hl := len(j.Handle) - l := hl + len(data) + 1 - req.Data = getBuffer(l) - copy(req.Data, []byte(j.Handle)) - copy(req.Data[hl + 1:], data) - j.a.write(req) -} - -// Update status. -// Tall client how many percent job has been executed. -func (j *_job) UpdateStatus(numerator, denominator int) { - n := []byte(strconv.Itoa(numerator)) - d := []byte(strconv.Itoa(denominator)) - req := getRequest() - req.DataType = WORK_STATUS - hl := len(j.Handle) - nl := len(n) - dl := len(d) - req.Data = getBuffer(hl + nl + dl + 3) - copy(req.Data, []byte(j.Handle)) - copy(req.Data[hl+1:], n) - copy(req.Data[hl+nl+2:], d) - j.a.write(req) -} - diff --git a/worker/outpack.go b/worker/outpack.go new file mode 100644 index 0000000..9464b8c --- /dev/null +++ b/worker/outpack.go @@ -0,0 +1,53 @@ +// Copyright 2011 Xing Xing +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + // "fmt" + "encoding/binary" +) + +// Worker side job +type outPack struct { + dataType uint32 + data []byte + handle string +} + +func getOutPack() (outpack *outPack) { + // TODO pool + return &outPack{} +} + +// Encode a job to byte slice +func (outpack *outPack) Encode() (data []byte) { + var l int + if outpack.dataType == WORK_FAIL { + l = len(outpack.handle) + } else { + l = len(outpack.data) + if outpack.handle != "" { + l += len(outpack.handle) + 1 + } + } + data = getBuffer(l + MIN_PACKET_LEN) + binary.BigEndian.PutUint32(data[:4], REQ) + binary.BigEndian.PutUint32(data[4:8], outpack.dataType) + binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l)) + i := MIN_PACKET_LEN + if outpack.handle != "" { + hi := len(outpack.handle) + i + copy(data[i:hi], []byte(outpack.handle)) + if outpack.dataType != WORK_FAIL { + data[hi] = '\x00' + } + i = hi + 1 + } + if outpack.dataType != WORK_FAIL { + copy(data[i:], outpack.data) + } + return +} diff --git a/worker/outpack_test.go b/worker/outpack_test.go new file mode 100644 index 0000000..dca1d8e --- /dev/null +++ b/worker/outpack_test.go @@ -0,0 +1,83 @@ +package worker + +import ( + "bytes" + "testing" +) + +var ( + outpackcases = map[uint32]map[string]string{ + CAN_DO: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a", + "data": "a", + }, + CAN_DO_TIMEOUT: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01", + "data": "a\x00\x00\x00\x00\x01", + }, + CANT_DO: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a", + "data": "a", + }, + RESET_ABILITIES: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00", + }, + PRE_SLEEP: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00", + }, + GRAB_JOB: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00", + }, + GRAB_JOB_UNIQ: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00", + }, + WORK_DATA: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b", + "data": "a\x00b", + }, + WORK_WARNING: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b", + "data": "a\x00b", + }, + WORK_STATUS: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100", + "data": "a\x0050\x00100", + }, + WORK_COMPLETE: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b", + "data": "a\x00b", + }, + WORK_FAIL: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a", + "handle": "a", + }, + WORK_EXCEPTION: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b", + "data": "a\x00b", + }, + SET_CLIENT_ID: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a", + "data": "a", + }, + ALL_YOURS: map[string]string{ + "src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00", + }, + } +) + +func TestOutPack(t *testing.T) { + for k, v := range outpackcases { + outpack := getOutPack() + outpack.dataType = k + if handle, ok := v["handle"]; ok { + outpack.handle = handle + } + if data, ok := v["data"]; ok { + outpack.data = []byte(data) + } + data := outpack.Encode() + if bytes.Compare([]byte(v["src"]), data) != 0 { + t.Errorf("%d: %X expected, %X got.", k, v["src"], data) + } + } +} diff --git a/worker/request.go b/worker/request.go deleted file mode 100644 index 5539ff9..0000000 --- a/worker/request.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2011 Xing Xing -// All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package worker - -import ( - "encoding/binary" -) - -// Worker side job -type request struct { - DataType uint32 - Data []byte - Handle, UniqueId, Fn string -} - -func getRequest() (req *request) { - // TODO pool - return &request{} -} - -// Encode a job to byte slice -func (req *request) Encode() (data []byte) { - var l int - if req.DataType == WORK_FAIL { - l = len(req.Handle) - } else { - l = len(req.Data) - if req.Handle != "" { - l += len(req.Handle) + 1 - } - } - data = getBuffer(l + MIN_PACKET_LEN) - binary.BigEndian.PutUint32(data[:4], REQ) - binary.BigEndian.PutUint32(data[4:8], req.DataType) - binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l)) - i := MIN_PACKET_LEN - if req.Handle != "" { - hi := len(req.Handle) + i - copy(data[i:hi], []byte(req.Handle)) - if req.DataType != WORK_FAIL { - data[hi] = '\x00' - } - i = i + hi - } - copy(data[i:], req.Data) - return -} diff --git a/worker/response.go b/worker/response.go deleted file mode 100644 index d528474..0000000 --- a/worker/response.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2011 Xing Xing -// All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package worker - -import ( - "bytes" - "fmt" - "encoding/binary" -) - -// Worker side job -type Response struct { - DataType uint32 - Data []byte - Handle, UniqueId, Fn string - agentId string -} - -// Create a new job -func getResponse() (resp *Response) { - return &Response{} -} - -// Decode job from byte slice -func decodeResponse(data []byte) (resp *Response, l int, err error) { - if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes - err = fmt.Errorf("Invalid data: %V", data) - return - } - dl := int(binary.BigEndian.Uint32(data[8:12])) - dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN] - if len(dt) != int(dl) { // length not equal - err = fmt.Errorf("Invalid data: %V", data) - return - } - resp = getResponse() - resp.DataType = binary.BigEndian.Uint32(data[4:8]) - switch resp.DataType { - case JOB_ASSIGN: - s := bytes.SplitN(dt, []byte{'\x00'}, 3) - if len(s) == 3 { - resp.Handle = string(s[0]) - resp.Fn = string(s[1]) - resp.Data = s[2] - } - case JOB_ASSIGN_UNIQ: - s := bytes.SplitN(dt, []byte{'\x00'}, 4) - if len(s) == 4 { - resp.Handle = string(s[0]) - resp.Fn = string(s[1]) - resp.UniqueId = string(s[2]) - resp.Data = s[3] - } - default: - resp.Data = dt - } - l = dl + MIN_PACKET_LEN - return -} diff --git a/worker/worker.go b/worker/worker.go index 82cc18b..ff5278d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,16 +5,13 @@ package worker import ( - "fmt" - "time" - "sync" "encoding/binary" + "fmt" + "sync" + "time" ) const ( - Unlimited = 0 - OneByOne = 1 - Immediately = 0 ) @@ -22,7 +19,7 @@ const ( Worker side api for gearman usage: -w = worker.New(worker.Unlimited) +w = worker.New() w.AddFunction("foobar", foobar) w.AddServer("127.0.0.1:4730") w.Work() // Enter the worker's main loop @@ -37,28 +34,24 @@ func foobar(job *Job) (data []byte, err os.Error) { } */ type Worker struct { - agents map[string]*agent + agents []*agent funcs JobFuncs - in chan *Response + in chan *inPack running bool - limit chan bool Id string // assign a ErrFunc to handle errors ErrorHandler ErrorHandler - JobHandler JobHandler - mutex sync.Mutex + JobHandler JobHandler + mutex sync.Mutex } // Get a new worker -func New(l int) (worker *Worker) { +func New() (worker *Worker) { worker = &Worker{ - agents: make(map[string]*agent, QUEUE_SIZE), + agents: make([]*agent, 0), funcs: make(JobFuncs), - in: make(chan *Response, QUEUE_SIZE), - } - if l != Unlimited { - worker.limit = make(chan bool, l) + in: make(chan *inPack, QUEUE_SIZE), } return } @@ -78,16 +71,16 @@ func (worker *Worker) AddServer(net, addr string) (err error) { if err != nil { return err } - worker.agents[net + addr] = a + worker.agents = append(worker.agents, a) return } // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) broadcast(req *request) { +func (worker *Worker) broadcast(outpack *outPack) { for _, v := range worker.agents { - v.write(req) + v.write(outpack) } } @@ -110,19 +103,19 @@ func (worker *Worker) AddFunc(funcname string, // inner add function func (worker *Worker) addFunc(funcname string, timeout uint32) { - req := getRequest() + outpack := getOutPack() if timeout == 0 { - req.DataType = CAN_DO - req.Data = []byte(funcname) + outpack.dataType = CAN_DO + outpack.data = []byte(funcname) } else { - req.DataType = CAN_DO_TIMEOUT + outpack.dataType = CAN_DO_TIMEOUT l := len(funcname) - req.Data = getBuffer(l + 5) - copy(req.Data, []byte(funcname)) - req.Data[l] = '\x00' - binary.BigEndian.PutUint32(req.Data[l + 1:], timeout) + outpack.data = getBuffer(l + 5) + copy(outpack.data, []byte(funcname)) + outpack.data[l] = '\x00' + binary.BigEndian.PutUint32(outpack.data[l+1:], timeout) } - worker.broadcast(req) + worker.broadcast(outpack) } // Remove a function. @@ -141,63 +134,57 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) { // inner remove function func (worker *Worker) removeFunc(funcname string) { - req := getRequest() - req.DataType = CANT_DO - req.Data = []byte(funcname) - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = CANT_DO + outpack.data = []byte(funcname) + worker.broadcast(outpack) } -func (worker *Worker) dealResp(resp *Response) { - defer func() { - if worker.running && worker.limit != nil { - <-worker.limit - } - }() - switch resp.DataType { +func (worker *Worker) handleInPack(inpack *inPack) { + switch inpack.dataType { + case NO_JOB: + inpack.a.PreSleep() + case NOOP: + inpack.a.Grab() case ERROR: - worker.err(GetError(resp.Data)) + worker.err(GetError(inpack.data)) case JOB_ASSIGN, JOB_ASSIGN_UNIQ: - if err := worker.exec(resp); err != nil { + if err := worker.exec(inpack); err != nil { worker.err(err) } default: - worker.handleResponse(resp) + worker.customeHandler(inpack) } } +func (worker *Worker) Ready() (err error) { + for _, v := range worker.agents { + if err = v.Connect(); err != nil { + return + } + } + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + return +} + // Main loop func (worker *Worker) Work() { - defer func() { - for _, v := range worker.agents { - v.Close() - } - }() worker.running = true for _, v := range worker.agents { - v.Connect() - go v.Work() + v.Grab() } - worker.Reset() - for funcname, f := range worker.funcs { - worker.addFunc(funcname, f.timeout) - } - var resp *Response - for resp = range worker.in { - fmt.Println(resp) - go worker.dealResp(resp) + var inpack *inPack + for inpack = range worker.in { + go worker.handleInPack(inpack) } } // job handler -func (worker *Worker) handleResponse(resp *Response) { +func (worker *Worker) customeHandler(inpack *inPack) { if worker.JobHandler != nil { - job := getJob() - job.a = worker.agents[resp.agentId] - job.Handle = resp.Handle - if resp.DataType == ECHO_RES { - job.data = resp.Data - } - if err := worker.JobHandler(job); err != nil { + if err := worker.JobHandler(inpack); err != nil { worker.err(err) } } @@ -207,39 +194,36 @@ func (worker *Worker) handleResponse(resp *Response) { func (worker *Worker) Close() { worker.running = false close(worker.in) - if worker.limit != nil { - close(worker.limit) - } } // Send a something out, get the samething back. func (worker *Worker) Echo(data []byte) { - req := getRequest() - req.DataType = ECHO_REQ - req.Data = data - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = ECHO_REQ + outpack.data = data + worker.broadcast(outpack) } // Remove all of functions. // Both from the worker or job servers. func (worker *Worker) Reset() { - req := getRequest() - req.DataType = RESET_ABILITIES - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = RESET_ABILITIES + worker.broadcast(outpack) worker.funcs = make(JobFuncs) } // Set the worker's unique id. func (worker *Worker) SetId(id string) { worker.Id = id - req := getRequest() - req.DataType = SET_CLIENT_ID - req.Data = []byte(id) - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = SET_CLIENT_ID + outpack.data = []byte(id) + worker.broadcast(outpack) } // Execute the job. And send back the result. -func (worker *Worker) exec(resp *Response) (err error) { +func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { if r := recover(); r != nil { if e, ok := r.(error); ok { @@ -249,34 +233,33 @@ func (worker *Worker) exec(resp *Response) (err error) { } } }() - f, ok := worker.funcs[resp.Fn] + f, ok := worker.funcs[inpack.fn] if !ok { - return fmt.Errorf("The function does not exist: %s", resp.Fn) + return fmt.Errorf("The function does not exist: %s", inpack.fn) } var r *result - job := getJob() - job.a = worker.agents[resp.agentId] - job.Handle = resp.Handle if f.timeout == 0 { - d, e := f.f(job) + d, e := f.f(inpack) r = &result{data: d, err: e} } else { - r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second) + r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) } - req := getRequest() - if r.err == nil { - req.DataType = WORK_COMPLETE - } else { - if r.data == nil { - req.DataType = WORK_FAIL - } else { - req.DataType = WORK_EXCEPTION - } - err = r.err - } - req.Data = r.data if worker.running { - job.a.write(req) + outpack := getOutPack() + if r.err == nil { + outpack.dataType = WORK_COMPLETE + } else { + if len(r.data) == 0 { + outpack.dataType = WORK_FAIL + } else { + outpack.dataType = WORK_EXCEPTION + } + err = r.err + } + outpack.handle = inpack.handle + outpack.data = r.data + inpack.a.write(outpack) + inpack.a.Grab() } return } diff --git a/worker/worker_test.go b/worker/worker_test.go index c875ff7..bb0666c 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,6 +1,9 @@ package worker -import "testing" +import ( + "sync" + "testing" +) var worker *Worker @@ -44,7 +47,20 @@ func TestWorkerRemoveFunc(t *testing.T) { } func TestWork(t *testing.T) { + var wg sync.WaitGroup + worker.JobHandler = func(job Job) error { + t.Logf("%s", job.Data()) + wg.Done() + return nil + } + if err := worker.Ready(); err != nil { + t.Error(err) + return + } go worker.Work() + wg.Add(1) + worker.Echo([]byte("Hello")) + wg.Wait() } func TestWorkerClose(t *testing.T) {