diff --git a/README.md b/README.md index c9a341a..4095447 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,12 @@ Usage w.AddServer("127.0.0.1:4730") w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpperTimeOut5", ToUpper, 5) - w.Work() + if err := w.Ready(); err != nil { + log.Fatal(err) + return + } + go w.Work() + ## Client diff --git a/example/client/client b/example/client/client new file mode 100755 index 0000000..c01975f Binary files /dev/null and b/example/client/client differ diff --git a/example/client/client.go b/example/client/client.go index bde0532..cb8e4d1 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -1,45 +1,45 @@ package main import ( - "log" - "sync" - "github.com/mikespook/gearman-go/client" + "github.com/mikespook/gearman-go/client" + "log" + "sync" ) func main() { - var wg sync.WaitGroup - // Set the autoinc id generator - // You can write your own id generator - // by implementing IdGenerator interface. - // client.IdGen = client.NewAutoIncId() + var wg sync.WaitGroup + // Set the autoinc id generator + // You can write your own id generator + // by implementing IdGenerator interface. + // client.IdGen = client.NewAutoIncId() - c, err := client.New("tcp4", "127.0.0.1:4730") - if err != nil { - log.Fatalln(err) - } - defer c.Close() - c.ErrorHandler = func(e error) { - log.Println(e) - } - echo := []byte("Hello\x00 world") - wg.Add(1) - echomsg, err := c.Echo(echo) - if err != nil { - log.Fatalln(err) - } - log.Println(string(echomsg)) - wg.Done() - jobHandler := func(job *client.Response) { - log.Printf("%s", job.Data) - wg.Done() - } - handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) - wg.Add(1) - status, err := c.Status(handle) - if err != nil { - log.Fatalln(err) - } - log.Printf("%t", status) + c, err := client.New("tcp4", "127.0.0.1:4730") + if err != nil { + log.Fatalln(err) + } + defer c.Close() + c.ErrorHandler = func(e error) { + log.Println(e) + } + echo := []byte("Hello\x00 world") + wg.Add(1) + echomsg, err := c.Echo(echo) + if err != nil { + log.Fatalln(err) + } + log.Println(string(echomsg)) + wg.Done() + jobHandler := func(job *client.Response) { + log.Printf("%s", job.Data) + wg.Done() + } + handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) + wg.Add(1) + status, err := c.Status(handle) + if err != nil { + log.Fatalln(err) + } + log.Printf("%t", status) - wg.Wait() + wg.Wait() } diff --git a/example/exec-worker/README.md b/example/exec-worker/README.md deleted file mode 100644 index bfb7d55..0000000 --- a/example/exec-worker/README.md +++ /dev/null @@ -1,7 +0,0 @@ -= Exec Worker - -This program execute shell or php scripts as backend jobs. - -Scripts can communicat with the worker through STDERR using formated output. - -USE THIS AT YOUR OWN RASK!!! diff --git a/example/exec-worker/client/client.php b/example/exec-worker/client/client.php deleted file mode 100644 index 38515f5..0000000 --- a/example/exec-worker/client/client.php +++ /dev/null @@ -1,40 +0,0 @@ -addServer(); -$data = array( - 'Name' => 'foobar', - 'Args' => array("0", "1", "2", "3"), -); - -$c = isset($_SERVER['argv'][1]) ? $_SERVER['argv'][1] : 10; - -for ($i = 0; $i < $c; $i ++) { - - # run reverse client in the background - $job_handle = $gmclient->doBackground("execphp", json_encode($data)); - - if ($gmclient->returnCode() != GEARMAN_SUCCESS) { - echo "bad return code\n"; - exit; - } -} -/* -$data = array( - 'Name' => 'notexists', - 'Args' => array("0", "1", "2", "3"), -); - -# run reverse client in the background -$job_handle = $gmclient->doBackground("exec", json_encode($data)); - -if ($gmclient->returnCode() != GEARMAN_SUCCESS) -{ - echo "bad return code\n"; - exit; -} - */ -echo "done!\n"; diff --git a/example/exec-worker/exec.go b/example/exec-worker/exec.go deleted file mode 100644 index 760f97b..0000000 --- a/example/exec-worker/exec.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "io" - "bytes" - "os/exec" - "encoding/json" - "github.com/mikespook/golib/log" - "github.com/mikespook/gearman-go/worker" -) - -type outData struct { - Numerator, Denominator int - Warning bool - Data []byte - Debug string -} - -type ShExec struct { - Name, basedir string - Args []string - *exec.Cmd - job *worker.Job - Logger *log.Logger -} - -func NewShExec(basedir string, job *worker.Job) (sh *ShExec, err error) { - sh = &ShExec{ - basedir: basedir, - job: job, - Args: make([]string, 0), - } - if err = sh.parse(job.Data); err != nil { - return nil, err - } - return -} - -func (sh *ShExec) parse(data []byte) (err error) { - if err = json.Unmarshal(data, sh); err != nil { - return - } - return -} - -func (sh *ShExec) Append(args ... string) { - sh.Args = append(sh.Args, args ...) -} - -func (sh *ShExec) Prepend(args ... string) { - sh.Args = append(args, sh.Args ...) -} - -func (sh *ShExec) Exec() (rslt []byte, err error){ - sh.Logger.Debugf("Executing: Handle=%s, Exec=%s, Args=%v", - sh.job.Handle, sh.Name, sh.Args) - sh.Cmd = exec.Command(sh.Name, sh.Args ... ) - go func() { - if ok := <-sh.job.Canceled(); ok { - sh.Cmd.Process.Kill() - } - }() - sh.Cmd.Dir = sh.basedir - var buf bytes.Buffer - sh.Cmd.Stdout = &buf - var errPipe io.ReadCloser - if errPipe, err = sh.Cmd.StderrPipe(); err != nil { - return nil, err - } - defer errPipe.Close() - go sh.processErr(errPipe) - if err = sh.Cmd.Run(); err != nil { - return nil, err - } - rslt = buf.Bytes() - return -} - -func (sh *ShExec) processErr(pipe io.ReadCloser) { - result := make([]byte, 1024) - var more []byte - for { - n, err := pipe.Read(result) - if err != nil { - if err != io.EOF { - sh.job.UpdateData([]byte(err.Error()), true) - } - return - } - if more != nil { - result = append(more, result[:n]...) - } else { - result = result[:n] - } - if n < 1024 { - var out outData - if err := json.Unmarshal(result, &out); err != nil { - sh.job.UpdateData([]byte(result), true) - return - } - if out.Debug == "" { - if out.Data != nil { - sh.job.UpdateData(out.Data, out.Warning) - } - if out.Numerator != 0 || out.Denominator != 0 { - sh.job.UpdateStatus(out.Numerator, out.Denominator) - } - } else { - sh.Logger.Debugf("Debug: Handle=%s, Exec=%s, Args=%v, Data=%s", - sh.job.Handle, sh.Name, sh.Args, out.Debug) - } - more = nil - } else { - more = result - } - } -} - diff --git a/example/exec-worker/func.go b/example/exec-worker/func.go deleted file mode 100644 index 383861c..0000000 --- a/example/exec-worker/func.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "github.com/mikespook/golib/log" - "github.com/mikespook/gearman-go/worker" -) - -func execShell(job *worker.Job) (result []byte, err error) { - log.Messagef("[Shell]Received: Handle=%s", job.Handle) - defer log.Messagef("[Shell]Finished: Handle=%s", job.Handle) - log.Debugf("[Shell]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data) - var sh *ShExec - if sh, err = NewShExec(*basedir, job); err != nil { - return - } - sh.Logger = log.DefaultLogger - return sh.Exec() -} - -func execPHP(job *worker.Job) (result []byte, err error) { - log.Messagef("[PHP]Received: Handle=%s", job.Handle) - defer log.Messagef("[PHP]Finished: Handle=%s", job.Handle) - log.Debugf("[PHP]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data) - var sh *ShExec - if sh, err = NewShExec(*basedir, job); err != nil { - return - } - sh.Prepend("-f", sh.Name + ".php") - sh.Name = "php" - sh.Logger = log.DefaultLogger - return sh.Exec() -} - diff --git a/example/exec-worker/lib/X.php b/example/exec-worker/lib/X.php deleted file mode 100644 index 2079009..0000000 --- a/example/exec-worker/lib/X.php +++ /dev/null @@ -1,40 +0,0 @@ -stderr = fopen("php://stderr", "r"); - } - - public function sendMsg($msg, $warning = false, $numerator = 0, $denominator = 0) { - $result = array( - 'Numerator' => $numerator, - 'Denominator' => $denominator, - 'Warning' => $warning, - 'Data' => $msg, - ); - fwrite($this->stderr, json_encode($result)); - } - - public function debug($msg) { - $result = array( - 'Debug' => $msg, - ); - fwrite($this->stderr, json_encode($result)); - } - - public function close() { - fclose($this->stderr); - } - - public function argv($index) { - $argv = $_SERVER['argv']; - return isset($argv[$index]) ? $argv[$index] : null; - } - - public function argc() { - $argc = $_SERVER['argc']; - return $argc; - } -} diff --git a/example/exec-worker/lib/bootstrap.php b/example/exec-worker/lib/bootstrap.php deleted file mode 100644 index 788673d..0000000 --- a/example/exec-worker/lib/bootstrap.php +++ /dev/null @@ -1,11 +0,0 @@ - 1) { - include implode(DIRECTORY_SEPARATOR, $names) . '.php'; - } else { - include $class . '.php'; - } -} - -spl_autoload_register('autoloader'); diff --git a/example/exec-worker/log.go b/example/exec-worker/log.go deleted file mode 100644 index bc1aaae..0000000 --- a/example/exec-worker/log.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "github.com/mikespook/golib/log" - "flag" - "strings" -) - -var ( - logfile = flag.String("log", "", - "Log file to write errors and information to." + - " Empty string output to STDOUT.") - loglevel = flag.String("log-level", "all", "Log level to record." + - " Values 'error', 'warning', 'message', 'debug', 'all' and 'none'" + - " are accepted. Use '|' to combine more levels.") -) - -func initLog() { - level := log.LogNone - levels := strings.SplitN(*loglevel, "|", -1) - for _, v := range levels { - switch v { - case "none": - level = level | log.LogNone - break - case "error": - level = level | log.LogError - case "warning": - level = level | log.LogWarning - case "message": - level = level | log.LogMessage - case "debug": - level = level | log.LogDebug - case "all": - level = log.LogAll - default: - } - } - if err := log.Init(*logfile, level); err != nil { - log.Error(err) - } -} diff --git a/example/exec-worker/script/default.php b/example/exec-worker/script/default.php deleted file mode 100644 index c6bd04a..0000000 --- a/example/exec-worker/script/default.php +++ /dev/null @@ -1,8 +0,0 @@ -sendMsg("test"); -$x->debug("debug message"); -sleep(10); -$x->close(); -exit(0); diff --git a/example/exec-worker/worker.go b/example/exec-worker/worker.go deleted file mode 100644 index 271639a..0000000 --- a/example/exec-worker/worker.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a commercial -// license that can be found in the LICENSE file. - -package main - -import ( - "os" - "flag" - "time" - "github.com/mikespook/golib/log" - "github.com/mikespook/golib/pid" - "github.com/mikespook/golib/prof" - "github.com/mikespook/golib/signal" - "github.com/mikespook/gearman-go/worker" -) - -var ( - pidfile = flag.String("pid", "/run/seedworker.pid", - "PID file to write pid") - proffile = flag.String("prof", "", "Profiling file") - dumpfile = flag.String("dump", "", "Heap dumping file") - dumptime = flag.Int("dumptime", 5, "Heap dumping time interval") - - joblimit = flag.Int("job-limit", worker.Unlimited, - "Maximum number of concurrently executing job." + - " Zero is unlimited.") - - basedir = flag.String("basedir", "", "Working directory of the php scripts.") - timeout = flag.Uint("timeout", 30, "Executing time out.") - gearmand = flag.String("gearmand", "127.0.0.1:4730", "Address and port of gearmand") -) - -func init() { - - flag.Parse() - initLog() - - if *basedir == "" { - *basedir = "./script/" - } -} - -func main() { - log.Message("Starting ... ") - defer func() { - time.Sleep(time.Second) - log.Message("Shutdown complate!") - }() - - // init profiling file - if *proffile != "" { - log.Debugf("Open a profiling file: %s", *proffile) - if err := prof.Start(*proffile); err != nil { - log.Error(err) - } else { - defer prof.Stop() - } - } - - // init heap dumping file - if *dumpfile != "" { - log.Debugf("Open a heap dumping file: %s", *dumpfile) - if err := prof.NewDump(*dumpfile); err != nil { - log.Error(err) - } else { - defer prof.CloseDump() - go func() { - for prof.Dumping { - time.Sleep(time.Duration(*dumptime) * time.Second) - prof.Dump() - } - }() - } - } - - // init pid file - log.Debugf("Open a pid file: %s", *pidfile) - if pidFile, err := pid.New(*pidfile); err != nil { - log.Error(err) - } else { - defer pidFile.Close() - } - - w := worker.New(*joblimit) - if err := w.AddServer(*gearmand); err != nil { - log.Error(err) - return - } - if err := w.AddFunc("exec", execShell, uint32(*timeout)); err != nil { - log.Error(err) - return - } - if err := w.AddFunc("execphp", execPHP, uint32(*timeout)); err != nil { - log.Error(err) - return - } - defer w.Close() - go w.Work() - - // signal handler - sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool {return true}) - sh.Loop() -} diff --git a/example/worker/worker b/example/worker/worker new file mode 100755 index 0000000..c7ea095 Binary files /dev/null and b/example/worker/worker differ diff --git a/example/worker/worker.go b/example/worker/worker.go index a2c924a..d610776 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -45,10 +45,10 @@ func main() { } w.AddServer("tcp4", "127.0.0.1:4730") w.AddFunc("ToUpper", ToUpper, worker.Immediately) - w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) - w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) - w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) - w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) + w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) + w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) + w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) + w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) if err := w.Ready(); err != nil { log.Fatal(err) return diff --git a/worker/outpack.go b/worker/outpack.go index 4b8ff73..45e7c58 100644 --- a/worker/outpack.go +++ b/worker/outpack.go @@ -6,7 +6,7 @@ package worker import ( -// "fmt" + // "fmt" "encoding/binary" )