From 919dfeaff0e2d584d3ccb3dba33820f0438170db Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 22 Nov 2012 17:15:20 +0800 Subject: [PATCH] A new example added --- example/exec-worker/README.md | 7 ++ example/exec-worker/client/client.php | 40 ++++++++ example/exec-worker/exec.go | 123 +++++++++++++++++++++++++ example/exec-worker/func.go | 38 ++++++++ example/exec-worker/lib/X.php | 40 ++++++++ example/exec-worker/lib/bootstrap.php | 11 +++ example/exec-worker/log.go | 47 ++++++++++ example/exec-worker/script/default.php | 8 ++ example/exec-worker/script/foobar.php | 8 ++ example/exec-worker/worker.go | 106 +++++++++++++++++++++ 10 files changed, 428 insertions(+) create mode 100644 example/exec-worker/README.md create mode 100644 example/exec-worker/client/client.php create mode 100644 example/exec-worker/exec.go create mode 100644 example/exec-worker/func.go create mode 100644 example/exec-worker/lib/X.php create mode 100644 example/exec-worker/lib/bootstrap.php create mode 100644 example/exec-worker/log.go create mode 100644 example/exec-worker/script/default.php create mode 100644 example/exec-worker/script/foobar.php create mode 100644 example/exec-worker/worker.go diff --git a/example/exec-worker/README.md b/example/exec-worker/README.md new file mode 100644 index 0000000..bfb7d55 --- /dev/null +++ b/example/exec-worker/README.md @@ -0,0 +1,7 @@ += Exec Worker + +This program execute shell or php scripts as backend jobs. + +Scripts can communicat with the worker through STDERR using formated output. + +USE THIS AT YOUR OWN RASK!!! diff --git a/example/exec-worker/client/client.php b/example/exec-worker/client/client.php new file mode 100644 index 0000000..38515f5 --- /dev/null +++ b/example/exec-worker/client/client.php @@ -0,0 +1,40 @@ +addServer(); +$data = array( + 'Name' => 'foobar', + 'Args' => array("0", "1", "2", "3"), +); + +$c = isset($_SERVER['argv'][1]) ? $_SERVER['argv'][1] : 10; + +for ($i = 0; $i < $c; $i ++) { + + # run reverse client in the background + $job_handle = $gmclient->doBackground("execphp", json_encode($data)); + + if ($gmclient->returnCode() != GEARMAN_SUCCESS) { + echo "bad return code\n"; + exit; + } +} +/* +$data = array( + 'Name' => 'notexists', + 'Args' => array("0", "1", "2", "3"), +); + +# run reverse client in the background +$job_handle = $gmclient->doBackground("exec", json_encode($data)); + +if ($gmclient->returnCode() != GEARMAN_SUCCESS) +{ + echo "bad return code\n"; + exit; +} + */ +echo "done!\n"; diff --git a/example/exec-worker/exec.go b/example/exec-worker/exec.go new file mode 100644 index 0000000..49f1fbb --- /dev/null +++ b/example/exec-worker/exec.go @@ -0,0 +1,123 @@ +// Copyright 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a commercial +// license that can be found in the LICENSE file. + +package main + +import ( + "io" + "bytes" + "os/exec" + "encoding/json" + "bitbucket.org/mikespook/golib/log" + "bitbucket.org/mikespook/gearman-go/worker" +) + +type outData struct { + Numerator, Denominator int + Warning bool + Data []byte + Debug string +} + +type ShExec struct { + Name, basedir string + Args []string + *exec.Cmd + job *worker.Job + Logger *log.Logger +} + +func NewShExec(basedir string, job *worker.Job) (sh *ShExec, err error) { + sh = &ShExec{ + basedir: basedir, + job: job, + Args: make([]string, 0), + } + if err = sh.parse(job.Data); err != nil { + return nil, err + } + return +} + +func (sh *ShExec) parse(data []byte) (err error) { + if err = json.Unmarshal(data, sh); err != nil { + return + } + return +} + +func (sh *ShExec) Append(args ... string) { + sh.Args = append(sh.Args, args ...) +} + +func (sh *ShExec) Prepend(args ... string) { + sh.Args = append(args, sh.Args ...) +} + +func (sh *ShExec) Exec() (rslt []byte, err error){ + sh.Logger.Debugf("Executing: Handle=%s, Exec=%s, Args=%v", + sh.job.Handle, sh.Name, sh.Args) + sh.Cmd = exec.Command(sh.Name, sh.Args ... ) + go func() { + if ok := <-sh.job.Canceled(); ok { + sh.Cmd.Process.Kill() + } + }() + sh.Cmd.Dir = sh.basedir + var buf bytes.Buffer + sh.Cmd.Stdout = &buf + var errPipe io.ReadCloser + if errPipe, err = sh.Cmd.StderrPipe(); err != nil { + return nil, err + } + defer errPipe.Close() + go sh.processErr(errPipe) + if err = sh.Cmd.Run(); err != nil { + return nil, err + } + rslt = buf.Bytes() + return +} + +func (sh *ShExec) processErr(pipe io.ReadCloser) { + result := make([]byte, 1024) + var more []byte + for { + n, err := pipe.Read(result) + if err != nil { + if err != io.EOF { + sh.job.UpdateData([]byte(err.Error()), true) + } + return + } + if more != nil { + result = append(more, result[:n]...) + } else { + result = result[:n] + } + if n < 1024 { + var out outData + if err := json.Unmarshal(result, &out); err != nil { + sh.job.UpdateData([]byte(result), true) + return + } + if out.Debug == "" { + if out.Data != nil { + sh.job.UpdateData(out.Data, out.Warning) + } + if out.Numerator != 0 || out.Denominator != 0 { + sh.job.UpdateStatus(out.Numerator, out.Denominator) + } + } else { + sh.Logger.Debugf("Debug: Handle=%s, Exec=%s, Args=%v, Data=%s", + sh.job.Handle, sh.Name, sh.Args, out.Debug) + } + more = nil + } else { + more = result + } + } +} + diff --git a/example/exec-worker/func.go b/example/exec-worker/func.go new file mode 100644 index 0000000..26d79f5 --- /dev/null +++ b/example/exec-worker/func.go @@ -0,0 +1,38 @@ +// Copyright 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a commercial +// license that can be found in the LICENSE file. + +package main + +import ( + "bitbucket.org/mikespook/golib/log" + "bitbucket.org/mikespook/gearman-go/worker" +) + +func execShell(job *worker.Job) (result []byte, err error) { + log.Messagef("[Shell]Received: Handle=%s", job.Handle) + defer log.Messagef("[Shell]Finished: Handle=%s", job.Handle) + log.Debugf("[Shell]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data) + var sh *ShExec + if sh, err = NewShExec(*basedir, job); err != nil { + return + } + sh.Logger = log.DefaultLogger + return sh.Exec() +} + +func execPHP(job *worker.Job) (result []byte, err error) { + log.Messagef("[PHP]Received: Handle=%s", job.Handle) + defer log.Messagef("[PHP]Finished: Handle=%s", job.Handle) + log.Debugf("[PHP]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data) + var sh *ShExec + if sh, err = NewShExec(*basedir, job); err != nil { + return + } + sh.Prepend("-f", sh.Name + ".php") + sh.Name = "php" + sh.Logger = log.DefaultLogger + return sh.Exec() +} + diff --git a/example/exec-worker/lib/X.php b/example/exec-worker/lib/X.php new file mode 100644 index 0000000..2079009 --- /dev/null +++ b/example/exec-worker/lib/X.php @@ -0,0 +1,40 @@ +stderr = fopen("php://stderr", "r"); + } + + public function sendMsg($msg, $warning = false, $numerator = 0, $denominator = 0) { + $result = array( + 'Numerator' => $numerator, + 'Denominator' => $denominator, + 'Warning' => $warning, + 'Data' => $msg, + ); + fwrite($this->stderr, json_encode($result)); + } + + public function debug($msg) { + $result = array( + 'Debug' => $msg, + ); + fwrite($this->stderr, json_encode($result)); + } + + public function close() { + fclose($this->stderr); + } + + public function argv($index) { + $argv = $_SERVER['argv']; + return isset($argv[$index]) ? $argv[$index] : null; + } + + public function argc() { + $argc = $_SERVER['argc']; + return $argc; + } +} diff --git a/example/exec-worker/lib/bootstrap.php b/example/exec-worker/lib/bootstrap.php new file mode 100644 index 0000000..788673d --- /dev/null +++ b/example/exec-worker/lib/bootstrap.php @@ -0,0 +1,11 @@ + 1) { + include implode(DIRECTORY_SEPARATOR, $names) . '.php'; + } else { + include $class . '.php'; + } +} + +spl_autoload_register('autoloader'); diff --git a/example/exec-worker/log.go b/example/exec-worker/log.go new file mode 100644 index 0000000..d4ad9dd --- /dev/null +++ b/example/exec-worker/log.go @@ -0,0 +1,47 @@ +// Copyright 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a commercial +// license that can be found in the LICENSE file. + +package main + +import ( + "bitbucket.org/mikespook/golib/log" + "flag" + "strings" +) + +var ( + logfile = flag.String("log", "", + "Log file to write errors and information to." + + " Empty string output to STDOUT.") + loglevel = flag.String("log-level", "all", "Log level to record." + + " Values 'error', 'warning', 'message', 'debug', 'all' and 'none'" + + " are accepted. Use '|' to combine more levels.") +) + +func initLog() { + level := log.LogNone + levels := strings.SplitN(*loglevel, "|", -1) + for _, v := range levels { + switch v { + case "none": + level = level | log.LogNone + break + case "error": + level = level | log.LogError + case "warning": + level = level | log.LogWarning + case "message": + level = level | log.LogMessage + case "debug": + level = level | log.LogDebug + case "all": + level = log.LogAll + default: + } + } + if err := log.Init(*logfile, level); err != nil { + log.Error(err) + } +} diff --git a/example/exec-worker/script/default.php b/example/exec-worker/script/default.php new file mode 100644 index 0000000..c6bd04a --- /dev/null +++ b/example/exec-worker/script/default.php @@ -0,0 +1,8 @@ +sendMsg("test"); +$x->debug("debug message"); +sleep(10); +$x->close(); +exit(0); diff --git a/example/exec-worker/worker.go b/example/exec-worker/worker.go new file mode 100644 index 0000000..39c32eb --- /dev/null +++ b/example/exec-worker/worker.go @@ -0,0 +1,106 @@ +// Copyright 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a commercial +// license that can be found in the LICENSE file. + +package main + +import ( + "os" + "flag" + "time" + "bitbucket.org/mikespook/golib/log" + "bitbucket.org/mikespook/golib/pid" + "bitbucket.org/mikespook/golib/prof" + "bitbucket.org/mikespook/golib/signal" + "bitbucket.org/mikespook/gearman-go/worker" +) + +var ( + pidfile = flag.String("pid", "/run/seedworker.pid", + "PID file to write pid") + proffile = flag.String("prof", "", "Profiling file") + dumpfile = flag.String("dump", "", "Heap dumping file") + dumptime = flag.Int("dumptime", 5, "Heap dumping time interval") + + joblimit = flag.Int("job-limit", worker.Unlimited, + "Maximum number of concurrently executing job." + + " Zero is unlimited.") + + basedir = flag.String("basedir", "", "Working directory of the php scripts.") + timeout = flag.Uint("timeout", 30, "Executing time out.") + gearmand = flag.String("gearmand", "127.0.0.1:4730", "Address and port of gearmand") +) + +func init() { + + flag.Parse() + initLog() + + if *basedir == "" { + *basedir = "./script/" + } +} + +func main() { + log.Message("Starting ... ") + defer func() { + time.Sleep(time.Second) + log.Message("Shutdown complate!") + }() + + // init profiling file + if *proffile != "" { + log.Debugf("Open a profiling file: %s", *proffile) + if err := prof.Start(*proffile); err != nil { + log.Error(err) + } else { + defer prof.Stop() + } + } + + // init heap dumping file + if *dumpfile != "" { + log.Debugf("Open a heap dumping file: %s", *dumpfile) + if err := prof.NewDump(*dumpfile); err != nil { + log.Error(err) + } else { + defer prof.CloseDump() + go func() { + for prof.Dumping { + time.Sleep(time.Duration(*dumptime) * time.Second) + prof.Dump() + } + }() + } + } + + // init pid file + log.Debugf("Open a pid file: %s", *pidfile) + if pidFile, err := pid.New(*pidfile); err != nil { + log.Error(err) + } else { + defer pidFile.Close() + } + + w := worker.New(*joblimit) + if err := w.AddServer(*gearmand); err != nil { + log.Error(err) + return + } + if err := w.AddFunc("exec", execShell, uint32(*timeout)); err != nil { + log.Error(err) + return + } + if err := w.AddFunc("execphp", execPHP, uint32(*timeout)); err != nil { + log.Error(err) + return + } + defer w.Close() + go w.Work() + + // signal handler + sh := signal.NewHandler() + sh.Bind(os.Interrupt, func() bool {return true}) + sh.Loop() +}