A new example added
This commit is contained in:
parent
91b315d61b
commit
919dfeaff0
7
example/exec-worker/README.md
Normal file
7
example/exec-worker/README.md
Normal file
@ -0,0 +1,7 @@
|
||||
= Exec Worker
|
||||
|
||||
This program execute shell or php scripts as backend jobs.
|
||||
|
||||
Scripts can communicat with the worker through STDERR using formated output.
|
||||
|
||||
USE THIS AT YOUR OWN RASK!!!
|
40
example/exec-worker/client/client.php
Normal file
40
example/exec-worker/client/client.php
Normal file
@ -0,0 +1,40 @@
|
||||
<?php
|
||||
|
||||
# create our client object
|
||||
$gmclient= new GearmanClient();
|
||||
|
||||
# add the default server (localhost)
|
||||
$gmclient->addServer();
|
||||
$data = array(
|
||||
'Name' => 'foobar',
|
||||
'Args' => array("0", "1", "2", "3"),
|
||||
);
|
||||
|
||||
$c = isset($_SERVER['argv'][1]) ? $_SERVER['argv'][1] : 10;
|
||||
|
||||
for ($i = 0; $i < $c; $i ++) {
|
||||
|
||||
# run reverse client in the background
|
||||
$job_handle = $gmclient->doBackground("execphp", json_encode($data));
|
||||
|
||||
if ($gmclient->returnCode() != GEARMAN_SUCCESS) {
|
||||
echo "bad return code\n";
|
||||
exit;
|
||||
}
|
||||
}
|
||||
/*
|
||||
$data = array(
|
||||
'Name' => 'notexists',
|
||||
'Args' => array("0", "1", "2", "3"),
|
||||
);
|
||||
|
||||
# run reverse client in the background
|
||||
$job_handle = $gmclient->doBackground("exec", json_encode($data));
|
||||
|
||||
if ($gmclient->returnCode() != GEARMAN_SUCCESS)
|
||||
{
|
||||
echo "bad return code\n";
|
||||
exit;
|
||||
}
|
||||
*/
|
||||
echo "done!\n";
|
123
example/exec-worker/exec.go
Normal file
123
example/exec-worker/exec.go
Normal file
@ -0,0 +1,123 @@
|
||||
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a commercial
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"io"
|
||||
"bytes"
|
||||
"os/exec"
|
||||
"encoding/json"
|
||||
"bitbucket.org/mikespook/golib/log"
|
||||
"bitbucket.org/mikespook/gearman-go/worker"
|
||||
)
|
||||
|
||||
type outData struct {
|
||||
Numerator, Denominator int
|
||||
Warning bool
|
||||
Data []byte
|
||||
Debug string
|
||||
}
|
||||
|
||||
type ShExec struct {
|
||||
Name, basedir string
|
||||
Args []string
|
||||
*exec.Cmd
|
||||
job *worker.Job
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
func NewShExec(basedir string, job *worker.Job) (sh *ShExec, err error) {
|
||||
sh = &ShExec{
|
||||
basedir: basedir,
|
||||
job: job,
|
||||
Args: make([]string, 0),
|
||||
}
|
||||
if err = sh.parse(job.Data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (sh *ShExec) parse(data []byte) (err error) {
|
||||
if err = json.Unmarshal(data, sh); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (sh *ShExec) Append(args ... string) {
|
||||
sh.Args = append(sh.Args, args ...)
|
||||
}
|
||||
|
||||
func (sh *ShExec) Prepend(args ... string) {
|
||||
sh.Args = append(args, sh.Args ...)
|
||||
}
|
||||
|
||||
func (sh *ShExec) Exec() (rslt []byte, err error){
|
||||
sh.Logger.Debugf("Executing: Handle=%s, Exec=%s, Args=%v",
|
||||
sh.job.Handle, sh.Name, sh.Args)
|
||||
sh.Cmd = exec.Command(sh.Name, sh.Args ... )
|
||||
go func() {
|
||||
if ok := <-sh.job.Canceled(); ok {
|
||||
sh.Cmd.Process.Kill()
|
||||
}
|
||||
}()
|
||||
sh.Cmd.Dir = sh.basedir
|
||||
var buf bytes.Buffer
|
||||
sh.Cmd.Stdout = &buf
|
||||
var errPipe io.ReadCloser
|
||||
if errPipe, err = sh.Cmd.StderrPipe(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer errPipe.Close()
|
||||
go sh.processErr(errPipe)
|
||||
if err = sh.Cmd.Run(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rslt = buf.Bytes()
|
||||
return
|
||||
}
|
||||
|
||||
func (sh *ShExec) processErr(pipe io.ReadCloser) {
|
||||
result := make([]byte, 1024)
|
||||
var more []byte
|
||||
for {
|
||||
n, err := pipe.Read(result)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
sh.job.UpdateData([]byte(err.Error()), true)
|
||||
}
|
||||
return
|
||||
}
|
||||
if more != nil {
|
||||
result = append(more, result[:n]...)
|
||||
} else {
|
||||
result = result[:n]
|
||||
}
|
||||
if n < 1024 {
|
||||
var out outData
|
||||
if err := json.Unmarshal(result, &out); err != nil {
|
||||
sh.job.UpdateData([]byte(result), true)
|
||||
return
|
||||
}
|
||||
if out.Debug == "" {
|
||||
if out.Data != nil {
|
||||
sh.job.UpdateData(out.Data, out.Warning)
|
||||
}
|
||||
if out.Numerator != 0 || out.Denominator != 0 {
|
||||
sh.job.UpdateStatus(out.Numerator, out.Denominator)
|
||||
}
|
||||
} else {
|
||||
sh.Logger.Debugf("Debug: Handle=%s, Exec=%s, Args=%v, Data=%s",
|
||||
sh.job.Handle, sh.Name, sh.Args, out.Debug)
|
||||
}
|
||||
more = nil
|
||||
} else {
|
||||
more = result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
38
example/exec-worker/func.go
Normal file
38
example/exec-worker/func.go
Normal file
@ -0,0 +1,38 @@
|
||||
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a commercial
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bitbucket.org/mikespook/golib/log"
|
||||
"bitbucket.org/mikespook/gearman-go/worker"
|
||||
)
|
||||
|
||||
func execShell(job *worker.Job) (result []byte, err error) {
|
||||
log.Messagef("[Shell]Received: Handle=%s", job.Handle)
|
||||
defer log.Messagef("[Shell]Finished: Handle=%s", job.Handle)
|
||||
log.Debugf("[Shell]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data)
|
||||
var sh *ShExec
|
||||
if sh, err = NewShExec(*basedir, job); err != nil {
|
||||
return
|
||||
}
|
||||
sh.Logger = log.DefaultLogger
|
||||
return sh.Exec()
|
||||
}
|
||||
|
||||
func execPHP(job *worker.Job) (result []byte, err error) {
|
||||
log.Messagef("[PHP]Received: Handle=%s", job.Handle)
|
||||
defer log.Messagef("[PHP]Finished: Handle=%s", job.Handle)
|
||||
log.Debugf("[PHP]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data)
|
||||
var sh *ShExec
|
||||
if sh, err = NewShExec(*basedir, job); err != nil {
|
||||
return
|
||||
}
|
||||
sh.Prepend("-f", sh.Name + ".php")
|
||||
sh.Name = "php"
|
||||
sh.Logger = log.DefaultLogger
|
||||
return sh.Exec()
|
||||
}
|
||||
|
40
example/exec-worker/lib/X.php
Normal file
40
example/exec-worker/lib/X.php
Normal file
@ -0,0 +1,40 @@
|
||||
<?php
|
||||
|
||||
class X {
|
||||
private $stderr;
|
||||
|
||||
function __construct() {
|
||||
$this->stderr = fopen("php://stderr", "r");
|
||||
}
|
||||
|
||||
public function sendMsg($msg, $warning = false, $numerator = 0, $denominator = 0) {
|
||||
$result = array(
|
||||
'Numerator' => $numerator,
|
||||
'Denominator' => $denominator,
|
||||
'Warning' => $warning,
|
||||
'Data' => $msg,
|
||||
);
|
||||
fwrite($this->stderr, json_encode($result));
|
||||
}
|
||||
|
||||
public function debug($msg) {
|
||||
$result = array(
|
||||
'Debug' => $msg,
|
||||
);
|
||||
fwrite($this->stderr, json_encode($result));
|
||||
}
|
||||
|
||||
public function close() {
|
||||
fclose($this->stderr);
|
||||
}
|
||||
|
||||
public function argv($index) {
|
||||
$argv = $_SERVER['argv'];
|
||||
return isset($argv[$index]) ? $argv[$index] : null;
|
||||
}
|
||||
|
||||
public function argc() {
|
||||
$argc = $_SERVER['argc'];
|
||||
return $argc;
|
||||
}
|
||||
}
|
11
example/exec-worker/lib/bootstrap.php
Normal file
11
example/exec-worker/lib/bootstrap.php
Normal file
@ -0,0 +1,11 @@
|
||||
<?php
|
||||
function autoloader($class) {
|
||||
$names = split("_", $class);
|
||||
if (count($names) > 1) {
|
||||
include implode(DIRECTORY_SEPARATOR, $names) . '.php';
|
||||
} else {
|
||||
include $class . '.php';
|
||||
}
|
||||
}
|
||||
|
||||
spl_autoload_register('autoloader');
|
47
example/exec-worker/log.go
Normal file
47
example/exec-worker/log.go
Normal file
@ -0,0 +1,47 @@
|
||||
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a commercial
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bitbucket.org/mikespook/golib/log"
|
||||
"flag"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
logfile = flag.String("log", "",
|
||||
"Log file to write errors and information to." +
|
||||
" Empty string output to STDOUT.")
|
||||
loglevel = flag.String("log-level", "all", "Log level to record." +
|
||||
" Values 'error', 'warning', 'message', 'debug', 'all' and 'none'" +
|
||||
" are accepted. Use '|' to combine more levels.")
|
||||
)
|
||||
|
||||
func initLog() {
|
||||
level := log.LogNone
|
||||
levels := strings.SplitN(*loglevel, "|", -1)
|
||||
for _, v := range levels {
|
||||
switch v {
|
||||
case "none":
|
||||
level = level | log.LogNone
|
||||
break
|
||||
case "error":
|
||||
level = level | log.LogError
|
||||
case "warning":
|
||||
level = level | log.LogWarning
|
||||
case "message":
|
||||
level = level | log.LogMessage
|
||||
case "debug":
|
||||
level = level | log.LogDebug
|
||||
case "all":
|
||||
level = log.LogAll
|
||||
default:
|
||||
}
|
||||
}
|
||||
if err := log.Init(*logfile, level); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
8
example/exec-worker/script/default.php
Normal file
8
example/exec-worker/script/default.php
Normal file
@ -0,0 +1,8 @@
|
||||
<?php
|
||||
define('ISCLI', PHP_SAPI === 'cli');
|
||||
if (!ISCLI) {
|
||||
die("cli only!");
|
||||
}
|
||||
define("ROOT", dirname(__FILE__));
|
||||
define("LIB", ROOT . "/../lib/");
|
||||
include_once(LIB . "bootstrap.php");
|
8
example/exec-worker/script/foobar.php
Normal file
8
example/exec-worker/script/foobar.php
Normal file
@ -0,0 +1,8 @@
|
||||
<?php
|
||||
include("default.php");
|
||||
$x = new X();
|
||||
$x->sendMsg("test");
|
||||
$x->debug("debug message");
|
||||
sleep(10);
|
||||
$x->close();
|
||||
exit(0);
|
106
example/exec-worker/worker.go
Normal file
106
example/exec-worker/worker.go
Normal file
@ -0,0 +1,106 @@
|
||||
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
|
||||
// All rights reserved.
|
||||
// Use of this source code is governed by a commercial
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"flag"
|
||||
"time"
|
||||
"bitbucket.org/mikespook/golib/log"
|
||||
"bitbucket.org/mikespook/golib/pid"
|
||||
"bitbucket.org/mikespook/golib/prof"
|
||||
"bitbucket.org/mikespook/golib/signal"
|
||||
"bitbucket.org/mikespook/gearman-go/worker"
|
||||
)
|
||||
|
||||
var (
|
||||
pidfile = flag.String("pid", "/run/seedworker.pid",
|
||||
"PID file to write pid")
|
||||
proffile = flag.String("prof", "", "Profiling file")
|
||||
dumpfile = flag.String("dump", "", "Heap dumping file")
|
||||
dumptime = flag.Int("dumptime", 5, "Heap dumping time interval")
|
||||
|
||||
joblimit = flag.Int("job-limit", worker.Unlimited,
|
||||
"Maximum number of concurrently executing job." +
|
||||
" Zero is unlimited.")
|
||||
|
||||
basedir = flag.String("basedir", "", "Working directory of the php scripts.")
|
||||
timeout = flag.Uint("timeout", 30, "Executing time out.")
|
||||
gearmand = flag.String("gearmand", "127.0.0.1:4730", "Address and port of gearmand")
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
flag.Parse()
|
||||
initLog()
|
||||
|
||||
if *basedir == "" {
|
||||
*basedir = "./script/"
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
log.Message("Starting ... ")
|
||||
defer func() {
|
||||
time.Sleep(time.Second)
|
||||
log.Message("Shutdown complate!")
|
||||
}()
|
||||
|
||||
// init profiling file
|
||||
if *proffile != "" {
|
||||
log.Debugf("Open a profiling file: %s", *proffile)
|
||||
if err := prof.Start(*proffile); err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
defer prof.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// init heap dumping file
|
||||
if *dumpfile != "" {
|
||||
log.Debugf("Open a heap dumping file: %s", *dumpfile)
|
||||
if err := prof.NewDump(*dumpfile); err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
defer prof.CloseDump()
|
||||
go func() {
|
||||
for prof.Dumping {
|
||||
time.Sleep(time.Duration(*dumptime) * time.Second)
|
||||
prof.Dump()
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// init pid file
|
||||
log.Debugf("Open a pid file: %s", *pidfile)
|
||||
if pidFile, err := pid.New(*pidfile); err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
defer pidFile.Close()
|
||||
}
|
||||
|
||||
w := worker.New(*joblimit)
|
||||
if err := w.AddServer(*gearmand); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if err := w.AddFunc("exec", execShell, uint32(*timeout)); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if err := w.AddFunc("execphp", execPHP, uint32(*timeout)); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
defer w.Close()
|
||||
go w.Work()
|
||||
|
||||
// signal handler
|
||||
sh := signal.NewHandler()
|
||||
sh.Bind(os.Interrupt, func() bool {return true})
|
||||
sh.Loop()
|
||||
}
|
Loading…
Reference in New Issue
Block a user