merge, client need to refactor

This commit is contained in:
Xing Xing 2013-12-24 22:04:10 +08:00
commit 3aa95042e6
30 changed files with 565 additions and 856 deletions

View File

@ -1,3 +1,4 @@
language: go
- 1.2
before_install:
- sudo apt-get install -qq gearman-job-server

View File

@ -34,7 +34,12 @@ Usage
w.AddServer("127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
w.Work()
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
## Client

View File

@ -9,7 +9,7 @@ import (
"io"
"net"
"sync"
"fmt"
// "fmt"
)
/*
@ -21,13 +21,14 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct {
sync.Mutex
net, addr, lastcall string
respHandler map[string]ResponseHandler
innerHandler map[string]ResponseHandler
in chan []byte
isConn bool
conn net.Conn
mutex sync.RWMutex
ErrorHandler ErrorHandler
}
@ -115,7 +116,6 @@ func (client *Client) readLoop() {
client.err(err)
continue
}
fmt.Printf("[%X]", data)
client.in <- data
}
close(client.in)
@ -141,23 +141,25 @@ func (client *Client) processLoop() {
continue
}
leftdata = nil
switch resp.DataType {
case ERROR:
if client.lastcall != "" {
client.handleInner(client.lastcall, resp)
client.lastcall = ""
} else {
client.err(GetError(resp.Data))
for resp != nil {
switch resp.DataType {
case ERROR:
if client.lastcall != "" {
resp = client.handleInner(client.lastcall, resp)
client.lastcall = ""
} else {
client.err(GetError(resp.Data))
}
case STATUS_RES:
resp = client.handleInner("s"+resp.Handle, resp)
case JOB_CREATED:
resp = client.handleInner("c", resp)
case ECHO_RES:
resp = client.handleInner("e", resp)
case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE,
WORK_FAIL, WORK_EXCEPTION:
resp = client.handleResponse(resp.Handle, resp)
}
case STATUS_RES:
client.handleInner("s"+resp.Handle, resp)
case JOB_CREATED:
client.handleInner("c", resp)
case ECHO_RES:
client.handleInner("e", resp)
case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE,
WORK_FAIL, WORK_EXCEPTION:
client.handleResponse(resp.Handle, resp)
}
if len(data) > l {
leftdata = data[l:]
@ -173,44 +175,44 @@ func (client *Client) err(e error) {
}
// job handler
func (client *Client) handleResponse(key string, resp *Response) {
client.mutex.RLock()
defer client.mutex.RUnlock()
func (client *Client) handleResponse(key string, resp *Response) *Response {
if h, ok := client.respHandler[key]; ok {
h(resp)
delete(client.respHandler, key)
return nil
}
return resp
}
// job handler
func (client *Client) handleInner(key string, resp *Response) {
func (client *Client) handleInner(key string, resp *Response) * Response {
if h, ok := client.innerHandler[key]; ok {
h(resp)
delete(client.innerHandler, key)
return nil
}
return resp
}
// Internal do
func (client *Client) do(funcname string, data []byte,
flag uint32) (handle string, err error) {
var wg sync.WaitGroup
wg.Add(1)
client.mutex.RLock()
var mutex sync.Mutex
mutex.Lock()
client.lastcall = "c"
client.innerHandler["c"] = ResponseHandler(func(resp *Response) {
defer wg.Done()
defer client.mutex.RUnlock()
client.innerHandler["c"] = func(resp *Response) {
if resp.DataType == ERROR {
err = GetError(resp.Data)
return
}
handle = resp.Handle
})
mutex.Unlock()
}
id := IdGen.Id()
req := getJob(id, []byte(funcname), data)
req.DataType = flag
client.write(req)
wg.Wait()
mutex.Lock()
return
}
@ -232,8 +234,6 @@ func (client *Client) Do(funcname string, data []byte,
datatype = SUBMIT_JOB
}
handle, err = client.do(funcname, data, datatype)
client.mutex.Lock()
defer client.mutex.Unlock()
if h != nil {
client.respHandler[handle] = h
}
@ -262,41 +262,39 @@ func (client *Client) DoBg(funcname string, data []byte,
// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string) (status *Status, err error) {
var wg sync.WaitGroup
wg.Add(1)
client.mutex.Lock()
var mutex sync.Mutex
mutex.Lock()
client.lastcall = "s" + handle
client.innerHandler["s"+handle] = ResponseHandler(func(resp *Response) {
defer wg.Done()
defer client.mutex.Unlock()
client.innerHandler["s"+handle] = func(resp *Response) {
var err error
status, err = resp.Status()
if err != nil {
client.err(err)
}
})
mutex.Unlock()
}
req := getRequest()
req.DataType = GET_STATUS
req.Data = []byte(handle)
client.write(req)
wg.Wait()
mutex.Lock()
return
}
// Send a something out, get the samething back.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
var wg sync.WaitGroup
wg.Add(1)
client.innerHandler["e"] = ResponseHandler(func(resp *Response) {
defer wg.Done()
var mutex sync.Mutex
mutex.Lock()
client.innerHandler["e"] = func(resp *Response) {
echo = resp.Data
})
mutex.Unlock()
}
req := getRequest()
req.DataType = ECHO_REQ
req.Data = data
client.write(req)
client.lastcall = "e"
wg.Wait()
client.write(req)
mutex.Lock()
return
}

View File

@ -9,7 +9,7 @@ import (
"encoding/binary"
)
// request
// Request from client
type request struct {
DataType uint32
Data []byte

BIN
example/client/client Executable file

Binary file not shown.

View File

@ -1,35 +1,35 @@
package main
import (
"github.com/mikespook/gearman-go/client"
"log"
"sync"
"github.com/mikespook/gearman-go/client"
)
func main() {
var wg sync.WaitGroup
// Set the autoinc id generator
// You can write your own id generator
// You can write your own id generator
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()
c, err := client.New("tcp4", "127.0.0.1:4730")
c, err := client.New("tcp4", "127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
defer c.Close()
c.ErrorHandler = func(e error) {
c.ErrorHandler = func(e error) {
log.Println(e)
}
echo := []byte("Hello\x00 world")
wg.Add(1)
echomsg, err := c.Echo(echo)
echomsg, err := c.Echo(echo)
if err != nil {
log.Fatalln(err)
}
log.Println(string(echomsg))
wg.Done()
jobHandler := func(job *client.Response) {
jobHandler := func(job *client.Job) {
log.Printf("%s", job.Data)
wg.Done()
}
@ -38,7 +38,7 @@ func main() {
log.Fatalln(err)
}
wg.Add(1)
status, err := c.Status(handle)
status, err := c.Status(handle)
if err != nil {
log.Fatalln(err)
}

View File

@ -1,7 +0,0 @@
= Exec Worker
This program execute shell or php scripts as backend jobs.
Scripts can communicat with the worker through STDERR using formated output.
USE THIS AT YOUR OWN RASK!!!

View File

@ -1,40 +0,0 @@
<?php
# create our client object
$gmclient= new GearmanClient();
# add the default server (localhost)
$gmclient->addServer();
$data = array(
'Name' => 'foobar',
'Args' => array("0", "1", "2", "3"),
);
$c = isset($_SERVER['argv'][1]) ? $_SERVER['argv'][1] : 10;
for ($i = 0; $i < $c; $i ++) {
# run reverse client in the background
$job_handle = $gmclient->doBackground("execphp", json_encode($data));
if ($gmclient->returnCode() != GEARMAN_SUCCESS) {
echo "bad return code\n";
exit;
}
}
/*
$data = array(
'Name' => 'notexists',
'Args' => array("0", "1", "2", "3"),
);
# run reverse client in the background
$job_handle = $gmclient->doBackground("exec", json_encode($data));
if ($gmclient->returnCode() != GEARMAN_SUCCESS)
{
echo "bad return code\n";
exit;
}
*/
echo "done!\n";

View File

@ -1,123 +0,0 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"io"
"bytes"
"os/exec"
"encoding/json"
"github.com/mikespook/golib/log"
"github.com/mikespook/gearman-go/worker"
)
type outData struct {
Numerator, Denominator int
Warning bool
Data []byte
Debug string
}
type ShExec struct {
Name, basedir string
Args []string
*exec.Cmd
job *worker.Job
Logger *log.Logger
}
func NewShExec(basedir string, job *worker.Job) (sh *ShExec, err error) {
sh = &ShExec{
basedir: basedir,
job: job,
Args: make([]string, 0),
}
if err = sh.parse(job.Data); err != nil {
return nil, err
}
return
}
func (sh *ShExec) parse(data []byte) (err error) {
if err = json.Unmarshal(data, sh); err != nil {
return
}
return
}
func (sh *ShExec) Append(args ... string) {
sh.Args = append(sh.Args, args ...)
}
func (sh *ShExec) Prepend(args ... string) {
sh.Args = append(args, sh.Args ...)
}
func (sh *ShExec) Exec() (rslt []byte, err error){
sh.Logger.Debugf("Executing: Handle=%s, Exec=%s, Args=%v",
sh.job.Handle, sh.Name, sh.Args)
sh.Cmd = exec.Command(sh.Name, sh.Args ... )
go func() {
if ok := <-sh.job.Canceled(); ok {
sh.Cmd.Process.Kill()
}
}()
sh.Cmd.Dir = sh.basedir
var buf bytes.Buffer
sh.Cmd.Stdout = &buf
var errPipe io.ReadCloser
if errPipe, err = sh.Cmd.StderrPipe(); err != nil {
return nil, err
}
defer errPipe.Close()
go sh.processErr(errPipe)
if err = sh.Cmd.Run(); err != nil {
return nil, err
}
rslt = buf.Bytes()
return
}
func (sh *ShExec) processErr(pipe io.ReadCloser) {
result := make([]byte, 1024)
var more []byte
for {
n, err := pipe.Read(result)
if err != nil {
if err != io.EOF {
sh.job.UpdateData([]byte(err.Error()), true)
}
return
}
if more != nil {
result = append(more, result[:n]...)
} else {
result = result[:n]
}
if n < 1024 {
var out outData
if err := json.Unmarshal(result, &out); err != nil {
sh.job.UpdateData([]byte(result), true)
return
}
if out.Debug == "" {
if out.Data != nil {
sh.job.UpdateData(out.Data, out.Warning)
}
if out.Numerator != 0 || out.Denominator != 0 {
sh.job.UpdateStatus(out.Numerator, out.Denominator)
}
} else {
sh.Logger.Debugf("Debug: Handle=%s, Exec=%s, Args=%v, Data=%s",
sh.job.Handle, sh.Name, sh.Args, out.Debug)
}
more = nil
} else {
more = result
}
}
}

View File

@ -1,38 +0,0 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"github.com/mikespook/golib/log"
"github.com/mikespook/gearman-go/worker"
)
func execShell(job *worker.Job) (result []byte, err error) {
log.Messagef("[Shell]Received: Handle=%s", job.Handle)
defer log.Messagef("[Shell]Finished: Handle=%s", job.Handle)
log.Debugf("[Shell]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data)
var sh *ShExec
if sh, err = NewShExec(*basedir, job); err != nil {
return
}
sh.Logger = log.DefaultLogger
return sh.Exec()
}
func execPHP(job *worker.Job) (result []byte, err error) {
log.Messagef("[PHP]Received: Handle=%s", job.Handle)
defer log.Messagef("[PHP]Finished: Handle=%s", job.Handle)
log.Debugf("[PHP]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data)
var sh *ShExec
if sh, err = NewShExec(*basedir, job); err != nil {
return
}
sh.Prepend("-f", sh.Name + ".php")
sh.Name = "php"
sh.Logger = log.DefaultLogger
return sh.Exec()
}

View File

@ -1,40 +0,0 @@
<?php
class X {
private $stderr;
function __construct() {
$this->stderr = fopen("php://stderr", "r");
}
public function sendMsg($msg, $warning = false, $numerator = 0, $denominator = 0) {
$result = array(
'Numerator' => $numerator,
'Denominator' => $denominator,
'Warning' => $warning,
'Data' => $msg,
);
fwrite($this->stderr, json_encode($result));
}
public function debug($msg) {
$result = array(
'Debug' => $msg,
);
fwrite($this->stderr, json_encode($result));
}
public function close() {
fclose($this->stderr);
}
public function argv($index) {
$argv = $_SERVER['argv'];
return isset($argv[$index]) ? $argv[$index] : null;
}
public function argc() {
$argc = $_SERVER['argc'];
return $argc;
}
}

View File

@ -1,11 +0,0 @@
<?php
function autoloader($class) {
$names = split("_", $class);
if (count($names) > 1) {
include implode(DIRECTORY_SEPARATOR, $names) . '.php';
} else {
include $class . '.php';
}
}
spl_autoload_register('autoloader');

View File

@ -1,47 +0,0 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"github.com/mikespook/golib/log"
"flag"
"strings"
)
var (
logfile = flag.String("log", "",
"Log file to write errors and information to." +
" Empty string output to STDOUT.")
loglevel = flag.String("log-level", "all", "Log level to record." +
" Values 'error', 'warning', 'message', 'debug', 'all' and 'none'" +
" are accepted. Use '|' to combine more levels.")
)
func initLog() {
level := log.LogNone
levels := strings.SplitN(*loglevel, "|", -1)
for _, v := range levels {
switch v {
case "none":
level = level | log.LogNone
break
case "error":
level = level | log.LogError
case "warning":
level = level | log.LogWarning
case "message":
level = level | log.LogMessage
case "debug":
level = level | log.LogDebug
case "all":
level = log.LogAll
default:
}
}
if err := log.Init(*logfile, level); err != nil {
log.Error(err)
}
}

View File

@ -1,8 +0,0 @@
<?php
define('ISCLI', PHP_SAPI === 'cli');
if (!ISCLI) {
die("cli only!");
}
define("ROOT", dirname(__FILE__));
define("LIB", ROOT . "/../lib/");
include_once(LIB . "bootstrap.php");

View File

@ -1,8 +0,0 @@
<?php
include("default.php");
$x = new X();
$x->sendMsg("test");
$x->debug("debug message");
sleep(10);
$x->close();
exit(0);

View File

@ -1,106 +0,0 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"os"
"flag"
"time"
"github.com/mikespook/golib/log"
"github.com/mikespook/golib/pid"
"github.com/mikespook/golib/prof"
"github.com/mikespook/golib/signal"
"github.com/mikespook/gearman-go/worker"
)
var (
pidfile = flag.String("pid", "/run/seedworker.pid",
"PID file to write pid")
proffile = flag.String("prof", "", "Profiling file")
dumpfile = flag.String("dump", "", "Heap dumping file")
dumptime = flag.Int("dumptime", 5, "Heap dumping time interval")
joblimit = flag.Int("job-limit", worker.Unlimited,
"Maximum number of concurrently executing job." +
" Zero is unlimited.")
basedir = flag.String("basedir", "", "Working directory of the php scripts.")
timeout = flag.Uint("timeout", 30, "Executing time out.")
gearmand = flag.String("gearmand", "127.0.0.1:4730", "Address and port of gearmand")
)
func init() {
flag.Parse()
initLog()
if *basedir == "" {
*basedir = "./script/"
}
}
func main() {
log.Message("Starting ... ")
defer func() {
time.Sleep(time.Second)
log.Message("Shutdown complate!")
}()
// init profiling file
if *proffile != "" {
log.Debugf("Open a profiling file: %s", *proffile)
if err := prof.Start(*proffile); err != nil {
log.Error(err)
} else {
defer prof.Stop()
}
}
// init heap dumping file
if *dumpfile != "" {
log.Debugf("Open a heap dumping file: %s", *dumpfile)
if err := prof.NewDump(*dumpfile); err != nil {
log.Error(err)
} else {
defer prof.CloseDump()
go func() {
for prof.Dumping {
time.Sleep(time.Duration(*dumptime) * time.Second)
prof.Dump()
}
}()
}
}
// init pid file
log.Debugf("Open a pid file: %s", *pidfile)
if pidFile, err := pid.New(*pidfile); err != nil {
log.Error(err)
} else {
defer pidFile.Close()
}
w := worker.New(*joblimit)
if err := w.AddServer(*gearmand); err != nil {
log.Error(err)
return
}
if err := w.AddFunc("exec", execShell, uint32(*timeout)); err != nil {
log.Error(err)
return
}
if err := w.AddFunc("execphp", execPHP, uint32(*timeout)); err != nil {
log.Error(err)
return
}
defer w.Close()
go w.Work()
// signal handler
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {return true})
sh.Loop()
}

BIN
example/worker/worker Executable file

Binary file not shown.

View File

@ -1,61 +1,60 @@
package main
import (
"os"
"log"
"time"
"strings"
"github.com/mikespook/golib/signal"
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal"
"log"
"os"
"strings"
"time"
)
func ToUpper(job *worker.Job) ([]byte, error) {
log.Printf("ToUpper: Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
func ToUpper(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func ToUpperDelay10(job *worker.Job) ([]byte, error) {
log.Printf("ToUpperDelay10: Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
func ToUpperDelay10(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited)
defer w.Close()
w.ErrHandler = func(e error) {
log.Println(e)
if e == worker.ErrConnection {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)
}
if err := proc.Signal(os.Interrupt); err != nil {
log.Println(err)
}
}
}
w.JobHandler = func(job *worker.Job) error {
log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle,
job.UniqueId, job.Data, job.DataType)
return nil
}
w.AddServer("127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately)
w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately)
go w.Work()
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {return true})
sh.Loop()
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited)
defer w.Close()
w.ErrorHandler = func(e error) {
log.Println(e)
if e == worker.ErrConnection {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)
}
if err := proc.Signal(os.Interrupt); err != nil {
log.Println(err)
}
}
}
w.JobHandler = func(job worker.Job) error {
log.Printf("Data=%s\n", job.Data())
return nil
}
w.AddServer("tcp4", "127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately)
w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool { return true })
sh.Loop()
}

View File

@ -54,6 +54,10 @@ func TestJobs(t *testing.T) {
w.ErrorHandler = func(e error) {
t.Error(e)
}
if err := w.Ready(); err != nil {
t.Error(err)
return
}
go w.Work()
t.Log("Worker is running...")

View File

@ -11,17 +11,16 @@ import (
// The agent of job server.
type agent struct {
conn net.Conn
worker *Worker
in chan []byte
net, addr string
isConn bool
conn net.Conn
worker *Worker
in chan []byte
net, addr string
}
// Create the agent of job server.
func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
a = &agent{
net: net,
net: net,
addr: addr,
worker: worker,
in: make(chan []byte, QUEUE_SIZE),
@ -34,44 +33,16 @@ func (a *agent) Connect() (err error) {
if err != nil {
return
}
a.isConn = true
go a.work()
return
}
func (a *agent) Work() {
go a.readLoop()
var resp *Response
func (a *agent) work() {
var inpack *inPack
var l int
var err error
var data, leftdata []byte
for data = range a.in {
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
l = len(data)
if l < MIN_PACKET_LEN { // not enough data
leftdata = data
continue
}
if resp, l, err = decodeResponse(data); err != nil {
a.worker.err(err)
continue
}
leftdata = nil
resp.agentId = a.net + a.addr
a.worker.in <- resp
if len(data) > l {
leftdata = data[l:]
}
}
}
// read data from socket
func (a *agent) readLoop() {
var data []byte
var err error
for a.isConn {
for {
if data, err = a.read(BUFFER_SIZE); err != nil {
if err == ErrConnClosed {
break
@ -79,15 +50,42 @@ func (a *agent) readLoop() {
a.worker.err(err)
continue
}
a.in <- data
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
if len(data) < MIN_PACKET_LEN { // not enough data
leftdata = data
continue
}
if inpack, l, err = decodeInPack(data); err != nil {
a.worker.err(err)
continue
}
leftdata = nil
inpack.a = a
a.worker.in <- inpack
if len(data) > l {
leftdata = data[l:]
}
}
close(a.in)
}
func (a *agent) Close() {
a.conn.Close()
}
func (a *agent) Grab() {
outpack := getOutPack()
outpack.dataType = GRAB_JOB_UNIQ
a.write(outpack)
}
func (a *agent) PreSleep() {
outpack := getOutPack()
outpack.dataType = PRE_SLEEP
a.write(outpack)
}
// read length bytes from the socket
func (a *agent) read(length int) (data []byte, err error) {
n := 0
@ -95,13 +93,11 @@ func (a *agent) read(length int) (data []byte, err error) {
// read until data can be unpacked
for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n {
if n, err = a.conn.Read(buf); err != nil {
if !a.isConn {
err = ErrConnClosed
return
}
if err == io.EOF && n == 0 {
if data == nil {
err = ErrConnection
} else {
err = ErrConnClosed
}
}
return
@ -115,9 +111,9 @@ func (a *agent) read(length int) (data []byte, err error) {
}
// Internal write the encoded job.
func (a *agent) write(req *request) (err error) {
func (a *agent) write(outpack *outPack) (err error) {
var n int
buf := req.Encode()
buf := outpack.Encode()
for i := 0; i < len(buf); i += n {
n, err = a.conn.Write(buf[i:])
if err != nil {

View File

@ -41,6 +41,7 @@ const (
STATUS_RES = 20
SET_CLIENT_ID = 22
CAN_DO_TIMEOUT = 23
ALL_YOURS = 24
WORK_EXCEPTION = 25
WORK_DATA = 28
WORK_WARNING = 29

108
worker/inpack.go Normal file
View File

@ -0,0 +1,108 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Worker side job
type inPack struct {
dataType uint32
data []byte
handle, uniqueId, fn string
a *agent
}
// Create a new job
func getInPack() *inPack {
return &inPack{}
}
func (inpack *inPack) Data() []byte {
return inpack.data
}
// Send some datas to client.
// Using this in a job's executing.
func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack()
outpack.dataType = WORK_DATA
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack()
outpack.dataType = WORK_WARNING
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
// Update status.
// Tall client how many percent job has been executed.
func (inpack *inPack) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
outpack := getOutPack()
outpack.dataType = WORK_STATUS
hl := len(inpack.handle)
nl := len(n)
dl := len(d)
outpack.data = getBuffer(hl + nl + dl + 3)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], n)
copy(outpack.data[hl+nl+2:], d)
inpack.a.write(outpack)
}
// Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %V", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %V", data)
return
}
inpack = getInPack()
inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch inpack.dataType {
case JOB_ASSIGN:
s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.data = s[2]
}
case JOB_ASSIGN_UNIQ:
s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.uniqueId = string(s[2])
inpack.data = s[3]
}
default:
inpack.data = dt
}
l = dl + MIN_PACKET_LEN
return
}

62
worker/inpack_test.go Normal file
View File

@ -0,0 +1,62 @@
package worker
import (
"bytes"
"testing"
)
var (
inpackcases = map[uint32]map[string]string{
NOOP: map[string]string{
"src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00",
},
NO_JOB: map[string]string{
"src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00",
},
JOB_ASSIGN: map[string]string{
"src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz",
"handle": "a",
"fn": "b",
"data": "xyz",
},
JOB_ASSIGN_UNIQ: map[string]string{
"src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz",
"handle": "a",
"fn": "b",
"uid": "c",
"data": "xyz",
},
}
)
func TestInPack(t *testing.T) {
for k, v := range inpackcases {
inpack, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
t.Error(err)
}
if inpack.dataType != k {
t.Errorf("DataType: %d expected, %d got.", k, inpack.dataType)
}
if handle, ok := v["handle"]; ok {
if inpack.handle != handle {
t.Errorf("Handle: %s expected, %s got.", handle, inpack.handle)
}
}
if fn, ok := v["fn"]; ok {
if inpack.fn != fn {
t.Errorf("FuncName: %s expected, %s got.", fn, inpack.fn)
}
}
if uid, ok := v["uid"]; ok {
if inpack.uniqueId != uid {
t.Errorf("UID: %s expected, %s got.", uid, inpack.uniqueId)
}
}
if data, ok := v["data"]; ok {
if bytes.Compare([]byte(data), inpack.data) != 0 {
t.Errorf("UID: %v expected, %v got.", data, inpack.data)
}
}
}
}

View File

@ -1,68 +1,8 @@
package worker
import (
"strconv"
)
type Job interface {
Data() []byte
SendWarning(data []byte)
SendData(data []byte)
UpdateStatus(numerator, denominator int)
}
type _job struct {
a *agent
Handle string
data []byte
}
func getJob() *_job {
return &_job{}
}
func (j *_job) Data() []byte {
return j.data
}
// Send some datas to client.
// Using this in a job's executing.
func (j *_job) SendData(data []byte) {
req := getRequest()
req.DataType = WORK_DATA
hl := len(j.Handle)
l := hl + len(data) + 1
req.Data = getBuffer(l)
copy(req.Data, []byte(j.Handle))
copy(req.Data[hl + 1:], data)
j.a.write(req)
}
func (j *_job) SendWarning(data []byte) {
req := getRequest()
req.DataType = WORK_WARNING
hl := len(j.Handle)
l := hl + len(data) + 1
req.Data = getBuffer(l)
copy(req.Data, []byte(j.Handle))
copy(req.Data[hl + 1:], data)
j.a.write(req)
}
// Update status.
// Tall client how many percent job has been executed.
func (j *_job) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
req := getRequest()
req.DataType = WORK_STATUS
hl := len(j.Handle)
nl := len(n)
dl := len(d)
req.Data = getBuffer(hl + nl + dl + 3)
copy(req.Data, []byte(j.Handle))
copy(req.Data[hl+1:], n)
copy(req.Data[hl+nl+2:], d)
j.a.write(req)
}

53
worker/outpack.go Normal file
View File

@ -0,0 +1,53 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
// "fmt"
"encoding/binary"
)
// Worker side job
type outPack struct {
dataType uint32
data []byte
handle string
}
func getOutPack() (outpack *outPack) {
// TODO pool
return &outPack{}
}
// Encode a job to byte slice
func (outpack *outPack) Encode() (data []byte) {
var l int
if outpack.dataType == WORK_FAIL {
l = len(outpack.handle)
} else {
l = len(outpack.data)
if outpack.handle != "" {
l += len(outpack.handle) + 1
}
}
data = getBuffer(l + MIN_PACKET_LEN)
binary.BigEndian.PutUint32(data[:4], REQ)
binary.BigEndian.PutUint32(data[4:8], outpack.dataType)
binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l))
i := MIN_PACKET_LEN
if outpack.handle != "" {
hi := len(outpack.handle) + i
copy(data[i:hi], []byte(outpack.handle))
if outpack.dataType != WORK_FAIL {
data[hi] = '\x00'
}
i = hi + 1
}
if outpack.dataType != WORK_FAIL {
copy(data[i:], outpack.data)
}
return
}

83
worker/outpack_test.go Normal file
View File

@ -0,0 +1,83 @@
package worker
import (
"bytes"
"testing"
)
var (
outpackcases = map[uint32]map[string]string{
CAN_DO: map[string]string{
"src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a",
"data": "a",
},
CAN_DO_TIMEOUT: map[string]string{
"src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01",
"data": "a\x00\x00\x00\x00\x01",
},
CANT_DO: map[string]string{
"src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a",
"data": "a",
},
RESET_ABILITIES: map[string]string{
"src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00",
},
PRE_SLEEP: map[string]string{
"src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00",
},
GRAB_JOB: map[string]string{
"src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00",
},
GRAB_JOB_UNIQ: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00",
},
WORK_DATA: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
WORK_WARNING: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
WORK_STATUS: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100",
"data": "a\x0050\x00100",
},
WORK_COMPLETE: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
WORK_FAIL: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a",
"handle": "a",
},
WORK_EXCEPTION: map[string]string{
"src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
SET_CLIENT_ID: map[string]string{
"src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a",
"data": "a",
},
ALL_YOURS: map[string]string{
"src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00",
},
}
)
func TestOutPack(t *testing.T) {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
data := outpack.Encode()
if bytes.Compare([]byte(v["src"]), data) != 0 {
t.Errorf("%d: %X expected, %X got.", k, v["src"], data)
}
}
}

View File

@ -1,50 +0,0 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"encoding/binary"
)
// Worker side job
type request struct {
DataType uint32
Data []byte
Handle, UniqueId, Fn string
}
func getRequest() (req *request) {
// TODO pool
return &request{}
}
// Encode a job to byte slice
func (req *request) Encode() (data []byte) {
var l int
if req.DataType == WORK_FAIL {
l = len(req.Handle)
} else {
l = len(req.Data)
if req.Handle != "" {
l += len(req.Handle) + 1
}
}
data = getBuffer(l + MIN_PACKET_LEN)
binary.BigEndian.PutUint32(data[:4], REQ)
binary.BigEndian.PutUint32(data[4:8], req.DataType)
binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l))
i := MIN_PACKET_LEN
if req.Handle != "" {
hi := len(req.Handle) + i
copy(data[i:hi], []byte(req.Handle))
if req.DataType != WORK_FAIL {
data[hi] = '\x00'
}
i = i + hi
}
copy(data[i:], req.Data)
return
}

View File

@ -1,62 +0,0 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bytes"
"fmt"
"encoding/binary"
)
// Worker side job
type Response struct {
DataType uint32
Data []byte
Handle, UniqueId, Fn string
agentId string
}
// Create a new job
func getResponse() (resp *Response) {
return &Response{}
}
// Decode job from byte slice
func decodeResponse(data []byte) (resp *Response, l int, err error) {
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %V", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %V", data)
return
}
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
case JOB_ASSIGN:
s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 {
resp.Handle = string(s[0])
resp.Fn = string(s[1])
resp.Data = s[2]
}
case JOB_ASSIGN_UNIQ:
s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 {
resp.Handle = string(s[0])
resp.Fn = string(s[1])
resp.UniqueId = string(s[2])
resp.Data = s[3]
}
default:
resp.Data = dt
}
l = dl + MIN_PACKET_LEN
return
}

View File

@ -5,16 +5,13 @@
package worker
import (
"fmt"
"time"
"sync"
"encoding/binary"
"fmt"
"sync"
"time"
)
const (
Unlimited = 0
OneByOne = 1
Immediately = 0
)
@ -22,7 +19,7 @@ const (
Worker side api for gearman
usage:
w = worker.New(worker.Unlimited)
w = worker.New()
w.AddFunction("foobar", foobar)
w.AddServer("127.0.0.1:4730")
w.Work() // Enter the worker's main loop
@ -37,28 +34,24 @@ func foobar(job *Job) (data []byte, err os.Error) {
}
*/
type Worker struct {
agents map[string]*agent
agents []*agent
funcs JobFuncs
in chan *Response
in chan *inPack
running bool
limit chan bool
Id string
// assign a ErrFunc to handle errors
ErrorHandler ErrorHandler
JobHandler JobHandler
mutex sync.Mutex
JobHandler JobHandler
mutex sync.Mutex
}
// Get a new worker
func New(l int) (worker *Worker) {
func New() (worker *Worker) {
worker = &Worker{
agents: make(map[string]*agent, QUEUE_SIZE),
agents: make([]*agent, 0),
funcs: make(JobFuncs),
in: make(chan *Response, QUEUE_SIZE),
}
if l != Unlimited {
worker.limit = make(chan bool, l)
in: make(chan *inPack, QUEUE_SIZE),
}
return
}
@ -78,16 +71,16 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
if err != nil {
return err
}
worker.agents[net + addr] = a
worker.agents = append(worker.agents, a)
return
}
// Write a job to job server.
// Here, the job's mean is not the oraginal mean.
// Just looks like a network package for job's result or tell job server, there was a fail.
func (worker *Worker) broadcast(req *request) {
func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents {
v.write(req)
v.write(outpack)
}
}
@ -110,19 +103,19 @@ func (worker *Worker) AddFunc(funcname string,
// inner add function
func (worker *Worker) addFunc(funcname string, timeout uint32) {
req := getRequest()
outpack := getOutPack()
if timeout == 0 {
req.DataType = CAN_DO
req.Data = []byte(funcname)
outpack.dataType = CAN_DO
outpack.data = []byte(funcname)
} else {
req.DataType = CAN_DO_TIMEOUT
outpack.dataType = CAN_DO_TIMEOUT
l := len(funcname)
req.Data = getBuffer(l + 5)
copy(req.Data, []byte(funcname))
req.Data[l] = '\x00'
binary.BigEndian.PutUint32(req.Data[l + 1:], timeout)
outpack.data = getBuffer(l + 5)
copy(outpack.data, []byte(funcname))
outpack.data[l] = '\x00'
binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
}
worker.broadcast(req)
worker.broadcast(outpack)
}
// Remove a function.
@ -141,63 +134,57 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) {
// inner remove function
func (worker *Worker) removeFunc(funcname string) {
req := getRequest()
req.DataType = CANT_DO
req.Data = []byte(funcname)
worker.broadcast(req)
outpack := getOutPack()
outpack.dataType = CANT_DO
outpack.data = []byte(funcname)
worker.broadcast(outpack)
}
func (worker *Worker) dealResp(resp *Response) {
defer func() {
if worker.running && worker.limit != nil {
<-worker.limit
}
}()
switch resp.DataType {
func (worker *Worker) handleInPack(inpack *inPack) {
switch inpack.dataType {
case NO_JOB:
inpack.a.PreSleep()
case NOOP:
inpack.a.Grab()
case ERROR:
worker.err(GetError(resp.Data))
worker.err(GetError(inpack.data))
case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
if err := worker.exec(resp); err != nil {
if err := worker.exec(inpack); err != nil {
worker.err(err)
}
default:
worker.handleResponse(resp)
worker.customeHandler(inpack)
}
}
func (worker *Worker) Ready() (err error) {
for _, v := range worker.agents {
if err = v.Connect(); err != nil {
return
}
}
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
return
}
// Main loop
func (worker *Worker) Work() {
defer func() {
for _, v := range worker.agents {
v.Close()
}
}()
worker.running = true
for _, v := range worker.agents {
v.Connect()
go v.Work()
v.Grab()
}
worker.Reset()
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
var resp *Response
for resp = range worker.in {
fmt.Println(resp)
go worker.dealResp(resp)
var inpack *inPack
for inpack = range worker.in {
go worker.handleInPack(inpack)
}
}
// job handler
func (worker *Worker) handleResponse(resp *Response) {
func (worker *Worker) customeHandler(inpack *inPack) {
if worker.JobHandler != nil {
job := getJob()
job.a = worker.agents[resp.agentId]
job.Handle = resp.Handle
if resp.DataType == ECHO_RES {
job.data = resp.Data
}
if err := worker.JobHandler(job); err != nil {
if err := worker.JobHandler(inpack); err != nil {
worker.err(err)
}
}
@ -207,39 +194,36 @@ func (worker *Worker) handleResponse(resp *Response) {
func (worker *Worker) Close() {
worker.running = false
close(worker.in)
if worker.limit != nil {
close(worker.limit)
}
}
// Send a something out, get the samething back.
func (worker *Worker) Echo(data []byte) {
req := getRequest()
req.DataType = ECHO_REQ
req.Data = data
worker.broadcast(req)
outpack := getOutPack()
outpack.dataType = ECHO_REQ
outpack.data = data
worker.broadcast(outpack)
}
// Remove all of functions.
// Both from the worker or job servers.
func (worker *Worker) Reset() {
req := getRequest()
req.DataType = RESET_ABILITIES
worker.broadcast(req)
outpack := getOutPack()
outpack.dataType = RESET_ABILITIES
worker.broadcast(outpack)
worker.funcs = make(JobFuncs)
}
// Set the worker's unique id.
func (worker *Worker) SetId(id string) {
worker.Id = id
req := getRequest()
req.DataType = SET_CLIENT_ID
req.Data = []byte(id)
worker.broadcast(req)
outpack := getOutPack()
outpack.dataType = SET_CLIENT_ID
outpack.data = []byte(id)
worker.broadcast(outpack)
}
// Execute the job. And send back the result.
func (worker *Worker) exec(resp *Response) (err error) {
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(error); ok {
@ -249,34 +233,33 @@ func (worker *Worker) exec(resp *Response) (err error) {
}
}
}()
f, ok := worker.funcs[resp.Fn]
f, ok := worker.funcs[inpack.fn]
if !ok {
return fmt.Errorf("The function does not exist: %s", resp.Fn)
return fmt.Errorf("The function does not exist: %s", inpack.fn)
}
var r *result
job := getJob()
job.a = worker.agents[resp.agentId]
job.Handle = resp.Handle
if f.timeout == 0 {
d, e := f.f(job)
d, e := f.f(inpack)
r = &result{data: d, err: e}
} else {
r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second)
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
}
req := getRequest()
if r.err == nil {
req.DataType = WORK_COMPLETE
} else {
if r.data == nil {
req.DataType = WORK_FAIL
} else {
req.DataType = WORK_EXCEPTION
}
err = r.err
}
req.Data = r.data
if worker.running {
job.a.write(req)
outpack := getOutPack()
if r.err == nil {
outpack.dataType = WORK_COMPLETE
} else {
if len(r.data) == 0 {
outpack.dataType = WORK_FAIL
} else {
outpack.dataType = WORK_EXCEPTION
}
err = r.err
}
outpack.handle = inpack.handle
outpack.data = r.data
inpack.a.write(outpack)
inpack.a.Grab()
}
return
}

View File

@ -1,6 +1,9 @@
package worker
import "testing"
import (
"sync"
"testing"
)
var worker *Worker
@ -44,7 +47,20 @@ func TestWorkerRemoveFunc(t *testing.T) {
}
func TestWork(t *testing.T) {
var wg sync.WaitGroup
worker.JobHandler = func(job Job) error {
t.Logf("%s", job.Data())
wg.Done()
return nil
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
worker.Echo([]byte("Hello"))
wg.Wait()
}
func TestWorkerClose(t *testing.T) {