@@ -20,13 +20,13 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) | |||
*/ | |||
type Client struct { | |||
net, addr, lastcall string | |||
respHandler map[string]ResponseHandler | |||
innerHandler map[string]ResponseHandler | |||
in chan []byte | |||
isConn bool | |||
conn net.Conn | |||
mutex sync.RWMutex | |||
net, addr, lastcall string | |||
respHandler map[string]ResponseHandler | |||
innerHandler map[string]ResponseHandler | |||
in chan []byte | |||
isConn bool | |||
conn net.Conn | |||
mutex sync.RWMutex | |||
ErrorHandler ErrorHandler | |||
} | |||
@@ -265,7 +265,7 @@ func (client *Client) Status(handle string) (status *Status, err error) { | |||
wg.Add(1) | |||
client.mutex.Lock() | |||
client.lastcall = "s" + handle | |||
client.innerHandler["s" + handle] = ResponseHandler(func(resp *Response) { | |||
client.innerHandler["s"+handle] = ResponseHandler(func(resp *Response) { | |||
defer wg.Done() | |||
defer client.mutex.Unlock() | |||
var err error | |||
@@ -6,9 +6,9 @@ | |||
package client | |||
import ( | |||
"fmt" | |||
"bytes" | |||
"errors" | |||
"fmt" | |||
) | |||
var ( | |||
@@ -40,3 +40,6 @@ func GetError(data []byte) (err error) { | |||
err = errors.New(fmt.Sprintf("%s: %s", rel[0], rel[1])) | |||
return | |||
} | |||
// Error handler | |||
type ErrorHandler func(error) |
@@ -1,11 +0,0 @@ | |||
package client | |||
// Response handler | |||
type ResponseHandler func(*Response) | |||
// Error handler | |||
type ErrorHandler func(error) | |||
// Status handler | |||
// handle, known, running, numerator, denominator | |||
type StatusHandler func(string, bool, bool, uint64, uint64) |
@@ -1,9 +1,9 @@ | |||
package client | |||
import ( | |||
"time" | |||
"strconv" | |||
"sync/atomic" | |||
"time" | |||
) | |||
var ( | |||
@@ -40,7 +40,7 @@ func TestPoolEcho(t *testing.T) { | |||
func TestPoolDoBg(t *testing.T) { | |||
addr, handle, err := pool.DoBg("ToUpper", | |||
[]byte("abcdef"), JOB_LOW); | |||
[]byte("abcdef"), JOB_LOW) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
@@ -6,17 +6,20 @@ | |||
package client | |||
import ( | |||
"fmt" | |||
"bytes" | |||
"strconv" | |||
"encoding/binary" | |||
"fmt" | |||
"strconv" | |||
) | |||
// Response handler | |||
type ResponseHandler func(*Response) | |||
// response | |||
type Response struct { | |||
DataType uint32 | |||
Data, UID []byte | |||
Handle string | |||
DataType uint32 | |||
Data, UID []byte | |||
Handle string | |||
} | |||
// Extract the Response's result. | |||
@@ -1,5 +1,9 @@ | |||
package client | |||
// Status handler | |||
// handle, known, running, numerator, denominator | |||
type StatusHandler func(string, bool, bool, uint64, uint64) | |||
type Status struct { | |||
Handle string | |||
Known, Running bool | |||
@@ -1,54 +0,0 @@ | |||
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>. | |||
// All rights reserved. | |||
// Use of this source code is governed by a MIT | |||
// license that can be found in the LICENSE file. | |||
package common | |||
import ( | |||
"fmt" | |||
"bytes" | |||
"errors" | |||
"syscall" | |||
) | |||
var ( | |||
ErrJobTimeOut = errors.New("Do a job time out") | |||
ErrInvalidData = errors.New("Invalid data") | |||
ErrWorkWarning = errors.New("Work warning") | |||
ErrWorkFail = errors.New("Work fail") | |||
ErrWorkException = errors.New("Work exeption") | |||
ErrDataType = errors.New("Invalid data type") | |||
ErrOutOfCap = errors.New("Out of the capability") | |||
ErrNotConn = errors.New("Did not connect to job server") | |||
ErrFuncNotFound = errors.New("The function was not found") | |||
ErrConnection = errors.New("Connection error") | |||
ErrNoActiveAgent = errors.New("No active agent") | |||
ErrTimeOut = errors.New("Executing time out") | |||
ErrUnknown = errors.New("Unknown error") | |||
ErrConnClosed = errors.New("Connection closed") | |||
) | |||
func DisablePanic() {recover()} | |||
// Extract the error message | |||
func GetError(data []byte) (eno syscall.Errno, err error) { | |||
rel := bytes.SplitN(data, []byte{'\x00'}, 2) | |||
if len(rel) != 2 { | |||
err = Errorf("Not a error data: %V", data) | |||
return | |||
} | |||
l := len(rel[0]) | |||
eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) | |||
err = errors.New(string(rel[1])) | |||
return | |||
} | |||
// Get a formated error | |||
func Errorf(format string, msg ... interface{}) error { | |||
return errors.New(fmt.Sprintf(format, msg ... )) | |||
} | |||
// An error handler | |||
type ErrorHandler func(error) | |||
@@ -1,86 +0,0 @@ | |||
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>. | |||
// All rights reserved. | |||
// Use of this source code is governed by a MIT | |||
// license that can be found in the LICENSE file. | |||
package common | |||
import ( | |||
"bytes" | |||
"encoding/binary" | |||
) | |||
const ( | |||
NETWORK = "tcp" | |||
// queue size | |||
QUEUE_SIZE = 8 | |||
// read buffer size | |||
BUFFER_SIZE = 1024 | |||
// min packet length | |||
PACKET_LEN = 12 | |||
// \x00REQ | |||
REQ = 5391697 | |||
REQ_STR = "\x00REQ" | |||
// \x00RES | |||
RES = 5391699 | |||
RES_STR = "\x00RES" | |||
// package data type | |||
CAN_DO = 1 | |||
CANT_DO = 2 | |||
RESET_ABILITIES = 3 | |||
PRE_SLEEP = 4 | |||
NOOP = 6 | |||
JOB_CREATED = 8 | |||
GRAB_JOB = 9 | |||
NO_JOB = 10 | |||
JOB_ASSIGN = 11 | |||
WORK_STATUS = 12 | |||
WORK_COMPLETE = 13 | |||
WORK_FAIL = 14 | |||
GET_STATUS = 15 | |||
ECHO_REQ = 16 | |||
ECHO_RES = 17 | |||
ERROR = 19 | |||
STATUS_RES = 20 | |||
SET_CLIENT_ID = 22 | |||
CAN_DO_TIMEOUT = 23 | |||
WORK_EXCEPTION = 25 | |||
WORK_DATA = 28 | |||
WORK_WARNING = 29 | |||
GRAB_JOB_UNIQ = 30 | |||
JOB_ASSIGN_UNIQ = 31 | |||
SUBMIT_JOB = 7 | |||
SUBMIT_JOB_BG = 18 | |||
SUBMIT_JOB_HIGH = 21 | |||
SUBMIT_JOB_HIGH_BG = 32 | |||
SUBMIT_JOB_LOW = 33 | |||
SUBMIT_JOB_LOW_BG = 34 | |||
) | |||
// Decode [4]byte to uint32 | |||
func BytesToUint32(buf [4]byte) uint32 { | |||
var r uint32 | |||
b := bytes.NewBuffer(buf[:]) | |||
err := binary.Read(b, binary.BigEndian, &r) | |||
if err != nil { | |||
return 0 | |||
} | |||
return r | |||
} | |||
// Encode uint32 to [4]byte | |||
func Uint32ToBytes(i uint32) [4]byte { | |||
buf := new(bytes.Buffer) | |||
err := binary.Write(buf, binary.BigEndian, i) | |||
if err != nil { | |||
return [4]byte{0, 0, 0, 0} | |||
} | |||
var r [4]byte | |||
for k, v := range buf.Bytes() { | |||
r[k] = v | |||
} | |||
return r | |||
} |
@@ -1,48 +0,0 @@ | |||
package common | |||
import ( | |||
"bytes" | |||
"testing" | |||
) | |||
var ( | |||
testCase = map[uint32][4]byte { | |||
0: [...]byte{0, 0, 0, 0}, | |||
1: [...]byte{0, 0, 0, 1}, | |||
256: [...]byte{0, 0, 1, 0}, | |||
256 * 256: [...]byte{0, 1, 0, 0}, | |||
256 * 256 * 256: [...]byte{1, 0, 0, 0}, | |||
256 * 256 * 256 + 256 * 256 + 256 + 1: [...]byte{1, 1, 1, 1}, | |||
4294967295 : [...]byte{0xFF, 0xFF, 0xFF, 0xFF}, | |||
} | |||
) | |||
func TestUint32ToBytes(t *testing.T) { | |||
for k, v := range testCase { | |||
b := Uint32ToBytes(k) | |||
if bytes.Compare(b[:], v[:]) != 0 { | |||
t.Errorf("%v was expected, but %v was got", v, b) | |||
} | |||
} | |||
} | |||
func TestBytesToUint32s(t *testing.T) { | |||
for k, v := range testCase { | |||
u := BytesToUint32([4]byte(v)) | |||
if u != k { | |||
t.Errorf("%v was expected, but %v was got", k, u) | |||
} | |||
} | |||
} | |||
func BenchmarkByteToUnit32(b * testing.B) { | |||
for i := 0; i < b.N; i++ { | |||
BytesToUint32([4]byte{0xF, 0xF, 0xF, 0xF}); | |||
} | |||
} | |||
func BenchmarkUint32ToByte(b *testing.B) { | |||
for i := 0; i < b.N; i++ { | |||
Uint32ToBytes(123456); | |||
} | |||
} |
@@ -3,14 +3,14 @@ | |||
// license that can be found in the LICENSE file. | |||
/* | |||
This module is Gearman API for golang. | |||
This module is Gearman API for golang. | |||
The protocol was implemented by native way. | |||
*/ | |||
package gearman | |||
import ( | |||
_ "github.com/mikespook/gearman-go/common" | |||
_ "github.com/mikespook/gearman-go/client" | |||
_ "github.com/mikespook/gearman-go/worker" | |||
_ "github.com/mikespook/gearman-go/client" | |||
_ "github.com/mikespook/gearman-go/common" | |||
_ "github.com/mikespook/gearman-go/worker" | |||
) |
@@ -3,138 +3,137 @@ | |||
// license that can be found in the LICENSE file. | |||
/* | |||
This module is Gearman API for golang. | |||
This module is Gearman API for golang. | |||
The protocol was implemented by native way. | |||
*/ | |||
package gearman | |||
import ( | |||
"time" | |||
"sync" | |||
"testing" | |||
"strings" | |||
"github.com/mikespook/gearman-go/client" | |||
"github.com/mikespook/gearman-go/worker" | |||
"github.com/mikespook/gearman-go/client" | |||
"github.com/mikespook/gearman-go/worker" | |||
"strings" | |||
"sync" | |||
"testing" | |||
"time" | |||
) | |||
const( | |||
STR = "The gearman-go is a pure go implemented library." | |||
GEARMAND = "127.0.0.1:4730" | |||
const ( | |||
STR = "The gearman-go is a pure go implemented library." | |||
GEARMAND = "127.0.0.1:4730" | |||
) | |||
func ToUpper(job *worker.Job) ([]byte, error) { | |||
data := []byte(strings.ToUpper(string(job.Data))) | |||
return data, nil | |||
data := []byte(strings.ToUpper(string(job.Data))) | |||
return data, nil | |||
} | |||
func Sleep(job *worker.Job) ([]byte, error) { | |||
time.Sleep(time.Second * 5) | |||
return nil, nil | |||
time.Sleep(time.Second * 5) | |||
return nil, nil | |||
} | |||
func TestJobs(t *testing.T) { | |||
w := worker.New(worker.Unlimited) | |||
if err := w.AddServer(GEARMAND); err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
defer w.Close() | |||
w := worker.New(worker.Unlimited) | |||
if err := w.AddServer(GEARMAND); err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
defer w.Close() | |||
t.Log("Servers added...") | |||
if err := w.AddFunc("ToUpper", ToUpper, 0); err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if err := w.AddFunc("Sleep", Sleep, 0); err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if err := w.AddFunc("ToUpper", ToUpper, 0); err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if err := w.AddFunc("Sleep", Sleep, 0); err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
t.Log("Functions added...") | |||
w.ErrHandler = func(e error) { | |||
t.Error(e) | |||
} | |||
go w.Work() | |||
w.ErrHandler = func(e error) { | |||
t.Error(e) | |||
} | |||
go w.Work() | |||
t.Log("Worker is running...") | |||
c, err := client.New("tcp4", GEARMAND) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
defer c.Close() | |||
c.ErrorHandler = func(e error) { | |||
t.Log(e) | |||
} | |||
{ | |||
var w sync.WaitGroup | |||
jobHandler := func(job *client.Response) { | |||
upper := strings.ToUpper(STR) | |||
if (string(job.Data) != upper) { | |||
t.Errorf("%s expected, got %s", upper, job.Data) | |||
} | |||
w.Done() | |||
} | |||
w.Add(1) | |||
handle, err := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
defer c.Close() | |||
c.ErrorHandler = func(e error) { | |||
t.Log(e) | |||
} | |||
{ | |||
var w sync.WaitGroup | |||
jobHandler := func(job *client.Response) { | |||
upper := strings.ToUpper(STR) | |||
if string(job.Data) != upper { | |||
t.Errorf("%s expected, got %s", upper, job.Data) | |||
} | |||
w.Done() | |||
} | |||
w.Add(1) | |||
handle, err := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
w.Wait() | |||
status, err := c.Status(handle) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if status.Known { | |||
t.Errorf("%s shouldn't be known", status.Handle) | |||
return | |||
} | |||
if status.Running { | |||
t.Errorf("%s shouldn't be running", status.Handle) | |||
} | |||
} | |||
{ | |||
handle, err := c.DoBg("Sleep", nil, client.JOB_NORMAL) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
w.Wait() | |||
status, err := c.Status(handle) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if status.Known { | |||
t.Errorf("%s shouldn't be known", status.Handle) | |||
return | |||
} | |||
if status.Running { | |||
t.Errorf("%s shouldn't be running", status.Handle) | |||
} | |||
} | |||
{ | |||
handle, err := c.DoBg("Sleep", nil, client.JOB_NORMAL) | |||
time.Sleep(time.Second) | |||
status, err := c.Status(handle) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
time.Sleep(time.Second) | |||
status, err := c.Status(handle) | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if !status.Known { | |||
t.Errorf("%s should be known", status.Handle) | |||
return | |||
} | |||
if !status.Running { | |||
t.Errorf("%s should be running", status.Handle) | |||
} | |||
} | |||
{ | |||
status, err := c.Status("not exists handle") | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if status.Known { | |||
t.Errorf("%s shouldn't be known", status.Handle) | |||
return | |||
} | |||
if status.Running { | |||
t.Errorf("%s shouldn't be running", status.Handle) | |||
} | |||
} | |||
if !status.Known { | |||
t.Errorf("%s should be known", status.Handle) | |||
return | |||
} | |||
if !status.Running { | |||
t.Errorf("%s should be running", status.Handle) | |||
} | |||
} | |||
{ | |||
status, err := c.Status("not exists handle") | |||
if err != nil { | |||
t.Error(err) | |||
return | |||
} | |||
if status.Known { | |||
t.Errorf("%s shouldn't be known", status.Handle) | |||
return | |||
} | |||
if status.Running { | |||
t.Errorf("%s shouldn't be running", status.Handle) | |||
} | |||
} | |||
} |
@@ -5,199 +5,198 @@ | |||
package worker | |||
import ( | |||
"io" | |||
"net" | |||
"github.com/mikespook/gearman-go/common" | |||
"io" | |||
"net" | |||
) | |||
// The agent of job server. | |||
type agent struct { | |||
conn net.Conn | |||
worker *Worker | |||
in chan []byte | |||
out chan *Job | |||
addr string | |||
conn net.Conn | |||
worker *Worker | |||
in chan []byte | |||
out chan *Job | |||
addr string | |||
} | |||
// Create the agent of job server. | |||
func newAgent(addr string, worker *Worker) (a *agent, err error) { | |||
conn, err := net.Dial(common.NETWORK, addr) | |||
if err != nil { | |||
return | |||
} | |||
a = &agent{ | |||
conn: conn, | |||
worker: worker, | |||
addr: addr, | |||
in: make(chan []byte, common.QUEUE_SIZE), | |||
out: make(chan *Job, common.QUEUE_SIZE), | |||
} | |||
// reset abilities | |||
a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) | |||
return | |||
conn, err := net.Dial(common.NETWORK, addr) | |||
if err != nil { | |||
return | |||
} | |||
a = &agent{ | |||
conn: conn, | |||
worker: worker, | |||
addr: addr, | |||
in: make(chan []byte, common.QUEUE_SIZE), | |||
out: make(chan *Job, common.QUEUE_SIZE), | |||
} | |||
// reset abilities | |||
a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) | |||
return | |||
} | |||
// outputing loop | |||
func (a *agent) outLoop() { | |||
ok := true | |||
var job *Job | |||
for a.worker.running && ok { | |||
if job, ok = <-a.out; ok { | |||
if err := a.write(job.Encode()); err != nil { | |||
a.worker.err(err) | |||
} | |||
} | |||
} | |||
ok := true | |||
var job *Job | |||
for a.worker.running && ok { | |||
if job, ok = <-a.out; ok { | |||
if err := a.write(job.Encode()); err != nil { | |||
a.worker.err(err) | |||
} | |||
} | |||
} | |||
} | |||
// inputing loop | |||
func (a *agent) inLoop() { | |||
defer func() { | |||
if r := recover(); r != nil { | |||
a.worker.err(common.Errorf("Exiting: %s", r)) | |||
} | |||
close(a.in) | |||
close(a.out) | |||
a.worker.removeAgent(a) | |||
}() | |||
for a.worker.running { | |||
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) | |||
RESTART: | |||
// got noop msg and in queue is zero, grab job | |||
rel, err := a.read() | |||
if err != nil { | |||
if err == common.ErrConnection { | |||
for i := 0; i < 3 && a.worker.running; i++ { | |||
if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { | |||
a.worker.err(common.Errorf("Reconnection: %d faild", i)) | |||
continue | |||
} else { | |||
a.conn = conn | |||
goto RESTART | |||
} | |||
} | |||
a.worker.err(err) | |||
break | |||
} | |||
a.worker.err(err) | |||
continue | |||
} | |||
job, err := decodeJob(rel) | |||
if err != nil { | |||
a.worker.err(err) | |||
continue | |||
} | |||
switch job.DataType { | |||
case common.NOOP: | |||
a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil)) | |||
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: | |||
if a.worker.running { | |||
if a.worker.limit != nil { | |||
a.worker.limit <- true | |||
} | |||
job.agent = a | |||
a.worker.in <- job | |||
} | |||
} | |||
} | |||
defer func() { | |||
if r := recover(); r != nil { | |||
a.worker.err(common.Errorf("Exiting: %s", r)) | |||
} | |||
close(a.in) | |||
close(a.out) | |||
a.worker.removeAgent(a) | |||
}() | |||
for a.worker.running { | |||
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) | |||
RESTART: | |||
// got noop msg and in queue is zero, grab job | |||
rel, err := a.read() | |||
if err != nil { | |||
if err == common.ErrConnection { | |||
for i := 0; i < 3 && a.worker.running; i++ { | |||
if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { | |||
a.worker.err(common.Errorf("Reconnection: %d faild", i)) | |||
continue | |||
} else { | |||
a.conn = conn | |||
goto RESTART | |||
} | |||
} | |||
a.worker.err(err) | |||
break | |||
} | |||
a.worker.err(err) | |||
continue | |||
} | |||
job, err := decodeJob(rel) | |||
if err != nil { | |||
a.worker.err(err) | |||
continue | |||
} | |||
switch job.DataType { | |||
case common.NOOP: | |||
a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil)) | |||
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: | |||
if a.worker.running { | |||
if a.worker.limit != nil { | |||
a.worker.limit <- true | |||
} | |||
job.agent = a | |||
a.worker.in <- job | |||
} | |||
} | |||
} | |||
} | |||
func (a *agent) Close() { | |||
a.conn.Close() | |||
a.conn.Close() | |||
} | |||
func (a *agent) Work() { | |||
go a.outLoop() | |||
go a.inLoop() | |||
go a.outLoop() | |||
go a.inLoop() | |||
} | |||
func (a *agent) readData(length int) (data []byte, err error) { | |||
n := 0 | |||
buf := make([]byte, common.BUFFER_SIZE) | |||
// read until data can be unpacked | |||
for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { | |||
if n, err = a.conn.Read(buf); err != nil { | |||
if err == io.EOF && n == 0 { | |||
if data == nil { | |||
err = common.ErrConnection | |||
return | |||
} | |||
return data, nil | |||
} | |||
return | |||
} | |||
data = append(data, buf[0:n]...) | |||
if n < common.BUFFER_SIZE { | |||
break | |||
} | |||
} | |||
return | |||
n := 0 | |||
buf := make([]byte, common.BUFFER_SIZE) | |||
// read until data can be unpacked | |||
for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { | |||
if n, err = a.conn.Read(buf); err != nil { | |||
if err == io.EOF && n == 0 { | |||
if data == nil { | |||
err = common.ErrConnection | |||
return | |||
} | |||
return data, nil | |||
} | |||
return | |||
} | |||
data = append(data, buf[0:n]...) | |||
if n < common.BUFFER_SIZE { | |||
break | |||
} | |||
} | |||
return | |||
} | |||
func (a *agent) unpack(data []byte) ([]byte, int, bool) { | |||
tl := len(data) | |||
start := 0 | |||
for i := 0; i < tl+1-common.PACKET_LEN; i++ { | |||
if start+common.PACKET_LEN > tl { // too few data to unpack, read more | |||
return nil, common.PACKET_LEN, false | |||
} | |||
if string(data[start:start+4]) == common.RES_STR { | |||
l := int(common.BytesToUint32([4]byte{data[start+8], | |||
data[start+9], data[start+10], data[start+11]})) | |||
total := l + common.PACKET_LEN | |||
if total == tl { // data is what we want | |||
return data, common.PACKET_LEN, true | |||
} else if total < tl { // data[:total] is what we want, data[total:] is the more | |||
a.in <- data[total:] | |||
data = data[start:total] | |||
return data, common.PACKET_LEN, true | |||
} else { // ops! It won't be possible. | |||
return nil, total - tl, false | |||
} | |||
} else { // flag was not found, move to next step | |||
start++ | |||
} | |||
} | |||
return nil, common.PACKET_LEN, false | |||
tl := len(data) | |||
start := 0 | |||
for i := 0; i < tl+1-common.PACKET_LEN; i++ { | |||
if start+common.PACKET_LEN > tl { // too few data to unpack, read more | |||
return nil, common.PACKET_LEN, false | |||
} | |||
if string(data[start:start+4]) == common.RES_STR { | |||
l := int(common.BytesToUint32([4]byte{data[start+8], | |||
data[start+9], data[start+10], data[start+11]})) | |||
total := l + common.PACKET_LEN | |||
if total == tl { // data is what we want | |||
return data, common.PACKET_LEN, true | |||
} else if total < tl { // data[:total] is what we want, data[total:] is the more | |||
a.in <- data[total:] | |||
data = data[start:total] | |||
return data, common.PACKET_LEN, true | |||
} else { // ops! It won't be possible. | |||
return nil, total - tl, false | |||
} | |||
} else { // flag was not found, move to next step | |||
start++ | |||
} | |||
} | |||
return nil, common.PACKET_LEN, false | |||
} | |||
func (a *agent) read() (rel []byte, err error) { | |||
var data []byte | |||
ok := false | |||
l := common.PACKET_LEN | |||
for !ok { | |||
inlen := len(a.in) | |||
if inlen > 0 { | |||
// in queue is not empty | |||
for i := 0; i < inlen; i++ { | |||
data = append(data, <-a.in...) | |||
} | |||
} else { | |||
var d []byte | |||
d, err = a.readData(l) | |||
if err != nil { | |||
return | |||
} | |||
data = append(data, d...) | |||
} | |||
rel, l, ok = a.unpack(data) | |||
} | |||
return | |||
var data []byte | |||
ok := false | |||
l := common.PACKET_LEN | |||
for !ok { | |||
inlen := len(a.in) | |||
if inlen > 0 { | |||
// in queue is not empty | |||
for i := 0; i < inlen; i++ { | |||
data = append(data, <-a.in...) | |||
} | |||
} else { | |||
var d []byte | |||
d, err = a.readData(l) | |||
if err != nil { | |||
return | |||
} | |||
data = append(data, d...) | |||
} | |||
rel, l, ok = a.unpack(data) | |||
} | |||
return | |||
} | |||
// Send a job to the job server. | |||
func (a *agent) WriteJob(job *Job) { | |||
a.out <- job | |||
a.out <- job | |||
} | |||
// Internal write the encoded job. | |||
func (a *agent) write(buf []byte) (err error) { | |||
var n int | |||
for i := 0; i < len(buf); i += n { | |||
n, err = a.conn.Write(buf[i:]) | |||
if err != nil { | |||
return err | |||
} | |||
} | |||
return | |||
var n int | |||
for i := 0; i < len(buf); i += n { | |||
n, err = a.conn.Write(buf[i:]) | |||
if err != nil { | |||
return err | |||
} | |||
} | |||
return | |||
} |
@@ -0,0 +1,56 @@ | |||
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>. | |||
// All rights reserved. | |||
// Use of this source code is governed by a MIT | |||
// license that can be found in the LICENSE file. | |||
package worker | |||
const ( | |||
NETWORK = "tcp" | |||
// queue size | |||
QUEUE_SIZE = 8 | |||
// read buffer size | |||
BUFFER_SIZE = 1024 | |||
// min packet length | |||
MIN_PACKET_LEN = 12 | |||
// \x00REQ | |||
REQ = 5391697 | |||
REQ_STR = "\x00REQ" | |||
// \x00RES | |||
RES = 5391699 | |||
RES_STR = "\x00RES" | |||
// package data type | |||
CAN_DO = 1 | |||
CANT_DO = 2 | |||
RESET_ABILITIES = 3 | |||
PRE_SLEEP = 4 | |||
NOOP = 6 | |||
JOB_CREATED = 8 | |||
GRAB_JOB = 9 | |||
NO_JOB = 10 | |||
JOB_ASSIGN = 11 | |||
WORK_STATUS = 12 | |||
WORK_COMPLETE = 13 | |||
WORK_FAIL = 14 | |||
GET_STATUS = 15 | |||
ECHO_REQ = 16 | |||
ECHO_RES = 17 | |||
ERROR = 19 | |||
STATUS_RES = 20 | |||
SET_CLIENT_ID = 22 | |||
CAN_DO_TIMEOUT = 23 | |||
WORK_EXCEPTION = 25 | |||
WORK_DATA = 28 | |||
WORK_WARNING = 29 | |||
GRAB_JOB_UNIQ = 30 | |||
JOB_ASSIGN_UNIQ = 31 | |||
SUBMIT_JOB = 7 | |||
SUBMIT_JOB_BG = 18 | |||
SUBMIT_JOB_HIGH = 21 | |||
SUBMIT_JOB_HIGH_BG = 32 | |||
SUBMIT_JOB_LOW = 33 | |||
SUBMIT_JOB_LOW_BG = 34 | |||
) |
@@ -0,0 +1,45 @@ | |||
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>. | |||
// All rights reserved. | |||
// Use of this source code is governed by a MIT | |||
// license that can be found in the LICENSE file. | |||
package worker | |||
import ( | |||
"bytes" | |||
"errors" | |||
"fmt" | |||
) | |||
var ( | |||
ErrJobTimeOut = errors.New("Do a job time out") | |||
ErrInvalidData = errors.New("Invalid data") | |||
ErrWorkWarning = errors.New("Work warning") | |||
ErrWorkFail = errors.New("Work fail") | |||
ErrWorkException = errors.New("Work exeption") | |||
ErrDataType = errors.New("Invalid data type") | |||
ErrOutOfCap = errors.New("Out of the capability") | |||
ErrNotConn = errors.New("Did not connect to job server") | |||
ErrFuncNotFound = errors.New("The function was not found") | |||
ErrConnection = errors.New("Connection error") | |||
ErrNoActiveAgent = errors.New("No active agent") | |||
ErrTimeOut = errors.New("Executing time out") | |||
ErrUnknown = errors.New("Unknown error") | |||
ErrConnClosed = errors.New("Connection closed") | |||
) | |||
func DisablePanic() { recover() } | |||
// Extract the error message | |||
func GetError(data []byte) (err error) { | |||
rel := bytes.SplitN(data, []byte{'\x00'}, 2) | |||
if len(rel) != 2 { | |||
err = fmt.Errorf("Not a error data: %V", data) | |||
return | |||
} | |||
err = errors.New(fmt.Sprintf("%s: %s", rel[0], rel[1])) | |||
return | |||
} | |||
// An error handler | |||
type ErrorHandler func(error) |
@@ -1,31 +1,31 @@ | |||
package worker | |||
import ( | |||
"runtime" | |||
"encoding/json" | |||
"encoding/json" | |||
"runtime" | |||
) | |||
type systemInfo struct { | |||
GOOS, GOARCH, GOROOT, Version string | |||
NumCPU, NumGoroutine int | |||
NumCgoCall int64 | |||
GOOS, GOARCH, GOROOT, Version string | |||
NumCPU, NumGoroutine int | |||
NumCgoCall int64 | |||
} | |||
func SysInfo(job *Job) ([]byte, error) { | |||
return json.Marshal(&systemInfo{ | |||
GOOS: runtime.GOOS, | |||
GOARCH: runtime.GOARCH, | |||
GOROOT: runtime.GOROOT(), | |||
Version: runtime.Version(), | |||
NumCPU: runtime.NumCPU(), | |||
NumGoroutine: runtime.NumGoroutine(), | |||
NumCgoCall: runtime.NumCgoCall(), | |||
}) | |||
return json.Marshal(&systemInfo{ | |||
GOOS: runtime.GOOS, | |||
GOARCH: runtime.GOARCH, | |||
GOROOT: runtime.GOROOT(), | |||
Version: runtime.Version(), | |||
NumCPU: runtime.NumCPU(), | |||
NumGoroutine: runtime.NumGoroutine(), | |||
NumCgoCall: runtime.NumCgoCall(), | |||
}) | |||
} | |||
var memState runtime.MemStats | |||
func MemInfo(job *Job) ([]byte, error) { | |||
runtime.ReadMemStats(&memState) | |||
return json.Marshal(&memState) | |||
runtime.ReadMemStats(&memState) | |||
return json.Marshal(&memState) | |||
} |
@@ -1,134 +0,0 @@ | |||
// Copyright 2011 Xing Xing <mikespook@gmail.com> | |||
// All rights reserved. | |||
// Use of this source code is governed by a MIT | |||
// license that can be found in the LICENSE file. | |||
package worker | |||
import ( | |||
"bytes" | |||
"strconv" | |||
"github.com/mikespook/gearman-go/common" | |||
) | |||
// Worker side job | |||
type Job struct { | |||
Data []byte | |||
Handle, UniqueId, Fn string | |||
agent *agent | |||
magicCode, DataType uint32 | |||
c chan bool | |||
} | |||
// Create a new job | |||
func newJob(magiccode, datatype uint32, data []byte) (job *Job) { | |||
return &Job{magicCode: magiccode, | |||
DataType: datatype, | |||
Data: data, | |||
c: make(chan bool),} | |||
} | |||
// Decode job from byte slice | |||
func decodeJob(data []byte) (job *Job, err error) { | |||
if len(data) < 12 { | |||
return nil, common.Errorf("Invalid data: %V", data) | |||
} | |||
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) | |||
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) | |||
if len(data[12:]) != int(l) { | |||
return nil, common.Errorf("Invalid data: %V", data) | |||
} | |||
data = data[12:] | |||
job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool),} | |||
switch datatype { | |||
case common.JOB_ASSIGN: | |||
s := bytes.SplitN(data, []byte{'\x00'}, 3) | |||
if len(s) == 3 { | |||
job.Handle = string(s[0]) | |||
job.Fn = string(s[1]) | |||
data = s[2] | |||
} | |||
case common.JOB_ASSIGN_UNIQ: | |||
s := bytes.SplitN(data, []byte{'\x00'}, 4) | |||
if len(s) == 4 { | |||
job.Handle = string(s[0]) | |||
job.Fn = string(s[1]) | |||
job.UniqueId = string(s[2]) | |||
data = s[3] | |||
} | |||
} | |||
job.Data = data | |||
return | |||
} | |||
// Encode a job to byte slice | |||
func (job *Job) Encode() (data []byte) { | |||
var l int | |||
if job.DataType == common.WORK_FAIL { | |||
l = len(job.Handle) | |||
} else { | |||
l = len(job.Data) | |||
if job.Handle != "" { | |||
l += len(job.Handle) + 1 | |||
} | |||
} | |||
data = make([]byte, 0, l + 12) | |||
magiccode := common.Uint32ToBytes(job.magicCode) | |||
datatype := common.Uint32ToBytes(job.DataType) | |||
datalength := common.Uint32ToBytes(uint32(l)) | |||
data = append(data, magiccode[:]...) | |||
data = append(data, datatype[:]...) | |||
data = append(data, datalength[:]...) | |||
if job.Handle != "" { | |||
data = append(data, []byte(job.Handle)...) | |||
if job.DataType != common.WORK_FAIL { | |||
data = append(data, 0) | |||
} | |||
} | |||
data = append(data, job.Data...) | |||
return | |||
} | |||
// Send some datas to client. | |||
// Using this in a job's executing. | |||
func (job *Job) UpdateData(data []byte, iswarning bool) { | |||
result := append([]byte(job.Handle), 0) | |||
result = append(result, data...) | |||
var datatype uint32 | |||
if iswarning { | |||
datatype = common.WORK_WARNING | |||
} else { | |||
datatype = common.WORK_DATA | |||
} | |||
job.agent.WriteJob(newJob(common.REQ, datatype, result)) | |||
} | |||
// Update status. | |||
// Tall client how many percent job has been executed. | |||
func (job *Job) UpdateStatus(numerator, denominator int) { | |||
n := []byte(strconv.Itoa(numerator)) | |||
d := []byte(strconv.Itoa(denominator)) | |||
result := append([]byte(job.Handle), '\x00') | |||
result = append(result, n...) | |||
result = append(result, '\x00') | |||
result = append(result, d...) | |||
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) | |||
} | |||
// close the job | |||
func (job *Job) Close() { | |||
close(job.c) | |||
} | |||
// cancel the job executing | |||
func (job *Job) cancel() { | |||
defer func() {recover()}() | |||
job.c <- true | |||
} | |||
// When a job was canceled, return a true form a channel | |||
func (job *Job) Canceled() <-chan bool { | |||
return job.c | |||
} |
@@ -0,0 +1,133 @@ | |||
// Copyright 2011 Xing Xing <mikespook@gmail.com> | |||
// All rights reserved. | |||
// Use of this source code is governed by a MIT | |||
// license that can be found in the LICENSE file. | |||
package worker | |||
import ( | |||
"bytes" | |||
"strconv" | |||
) | |||
// Worker side job | |||
type Job struct { | |||
Data []byte | |||
Handle, UniqueId, Fn string | |||
agent *agent | |||
magicCode, DataType uint32 | |||
c chan bool | |||
} | |||
// Create a new job | |||
func newJob(magiccode, datatype uint32, data []byte) (job *Job) { | |||
return &Job{magicCode: magiccode, | |||
DataType: datatype, | |||
Data: data, | |||
c: make(chan bool)} | |||
} | |||
// Decode job from byte slice | |||
func decodeJob(data []byte) (job *Job, err error) { | |||
if len(data) < 12 { | |||
return nil, common.Errorf("Invalid data: %V", data) | |||
} | |||
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) | |||
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) | |||
if len(data[12:]) != int(l) { | |||
return nil, common.Errorf("Invalid data: %V", data) | |||
} | |||
data = data[12:] | |||
job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool)} | |||
switch datatype { | |||
case common.JOB_ASSIGN: | |||
s := bytes.SplitN(data, []byte{'\x00'}, 3) | |||
if len(s) == 3 { | |||
job.Handle = string(s[0]) | |||
job.Fn = string(s[1]) | |||
data = s[2] | |||
} | |||
case common.JOB_ASSIGN_UNIQ: | |||
s := bytes.SplitN(data, []byte{'\x00'}, 4) | |||
if len(s) == 4 { | |||
job.Handle = string(s[0]) | |||
job.Fn = string(s[1]) | |||
job.UniqueId = string(s[2]) | |||
data = s[3] | |||
} | |||
} | |||
job.Data = data | |||
return | |||
} | |||
// Encode a job to byte slice | |||
func (job *Job) Encode() (data []byte) { | |||
var l int | |||
if job.DataType == common.WORK_FAIL { | |||
l = len(job.Handle) | |||
} else { | |||
l = len(job.Data) | |||
if job.Handle != "" { | |||
l += len(job.Handle) + 1 | |||
} | |||
} | |||
data = make([]byte, 0, l+12) | |||
magiccode := common.Uint32ToBytes(job.magicCode) | |||
datatype := common.Uint32ToBytes(job.DataType) | |||
datalength := common.Uint32ToBytes(uint32(l)) | |||
data = append(data, magiccode[:]...) | |||
data = append(data, datatype[:]...) | |||
data = append(data, datalength[:]...) | |||
if job.Handle != "" { | |||
data = append(data, []byte(job.Handle)...) | |||
if job.DataType != common.WORK_FAIL { | |||
data = append(data, 0) | |||
} | |||
} | |||
data = append(data, job.Data...) | |||
return | |||
} | |||
// Send some datas to client. | |||
// Using this in a job's executing. | |||
func (job *Job) UpdateData(data []byte, iswarning bool) { | |||
result := append([]byte(job.Handle), 0) | |||
result = append(result, data...) | |||
var datatype uint32 | |||
if iswarning { | |||
datatype = common.WORK_WARNING | |||
} else { | |||
datatype = common.WORK_DATA | |||
} | |||
job.agent.WriteJob(newJob(common.REQ, datatype, result)) | |||
} | |||
// Update status. | |||
// Tall client how many percent job has been executed. | |||
func (job *Job) UpdateStatus(numerator, denominator int) { | |||
n := []byte(strconv.Itoa(numerator)) | |||
d := []byte(strconv.Itoa(denominator)) | |||
result := append([]byte(job.Handle), '\x00') | |||
result = append(result, n...) | |||
result = append(result, '\x00') | |||
result = append(result, d...) | |||
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) | |||
} | |||
// close the job | |||
func (job *Job) Close() { | |||
close(job.c) | |||
} | |||
// cancel the job executing | |||
func (job *Job) cancel() { | |||
defer func() { recover() }() | |||
job.c <- true | |||
} | |||
// When a job was canceled, return a true form a channel | |||
func (job *Job) Canceled() <-chan bool { | |||
return job.c | |||
} |
@@ -0,0 +1,133 @@ | |||
// Copyright 2011 Xing Xing <mikespook@gmail.com> | |||
// All rights reserved. | |||
// Use of this source code is governed by a MIT | |||
// license that can be found in the LICENSE file. | |||
package worker | |||
import ( | |||
"bytes" | |||
"strconv" | |||
) | |||
// Worker side job | |||
type Job struct { | |||
Data []byte | |||
Handle, UniqueId, Fn string | |||
agent *agent | |||
magicCode, DataType uint32 | |||
c chan bool | |||
} | |||
// Create a new job | |||
func newJob(magiccode, datatype uint32, data []byte) (job *Job) { | |||
return &Job{magicCode: magiccode, | |||
DataType: datatype, | |||
Data: data, | |||
c: make(chan bool)} | |||
} | |||
// Decode job from byte slice | |||
func decodeJob(data []byte) (job *Job, err error) { | |||
if len(data) < 12 { | |||
return nil, common.Errorf("Invalid data: %V", data) | |||
} | |||
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) | |||
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) | |||
if len(data[12:]) != int(l) { | |||
return nil, common.Errorf("Invalid data: %V", data) | |||
} | |||
data = data[12:] | |||
job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool)} | |||
switch datatype { | |||
case common.JOB_ASSIGN: | |||
s := bytes.SplitN(data, []byte{'\x00'}, 3) | |||
if len(s) == 3 { | |||
job.Handle = string(s[0]) | |||
job.Fn = string(s[1]) | |||
data = s[2] | |||
} | |||
case common.JOB_ASSIGN_UNIQ: | |||
s := bytes.SplitN(data, []byte{'\x00'}, 4) | |||
if len(s) == 4 { | |||
job.Handle = string(s[0]) | |||
job.Fn = string(s[1]) | |||
job.UniqueId = string(s[2]) | |||
data = s[3] | |||
} | |||
} | |||
job.Data = data | |||
return | |||
} | |||
// Encode a job to byte slice | |||
func (job *Job) Encode() (data []byte) { | |||
var l int | |||
if job.DataType == common.WORK_FAIL { | |||
l = len(job.Handle) | |||
} else { | |||
l = len(job.Data) | |||
if job.Handle != "" { | |||
l += len(job.Handle) + 1 | |||
} | |||
} | |||
data = make([]byte, 0, l+12) | |||
magiccode := common.Uint32ToBytes(job.magicCode) | |||
datatype := common.Uint32ToBytes(job.DataType) | |||
datalength := common.Uint32ToBytes(uint32(l)) | |||
data = append(data, magiccode[:]...) | |||
data = append(data, datatype[:]...) | |||
data = append(data, datalength[:]...) | |||
if job.Handle != "" { | |||
data = append(data, []byte(job.Handle)...) | |||
if job.DataType != common.WORK_FAIL { | |||
data = append(data, 0) | |||
} | |||
} | |||
data = append(data, job.Data...) | |||
return | |||
} | |||
// Send some datas to client. | |||
// Using this in a job's executing. | |||
func (job *Job) UpdateData(data []byte, iswarning bool) { | |||
result := append([]byte(job.Handle), 0) | |||
result = append(result, data...) | |||
var datatype uint32 | |||
if iswarning { | |||
datatype = common.WORK_WARNING | |||
} else { | |||
datatype = common.WORK_DATA | |||
} | |||
job.agent.WriteJob(newJob(common.REQ, datatype, result)) | |||
} | |||
// Update status. | |||
// Tall client how many percent job has been executed. | |||
func (job *Job) UpdateStatus(numerator, denominator int) { | |||
n := []byte(strconv.Itoa(numerator)) | |||
d := []byte(strconv.Itoa(denominator)) | |||
result := append([]byte(job.Handle), '\x00') | |||
result = append(result, n...) | |||
result = append(result, '\x00') | |||
result = append(result, d...) | |||
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) | |||
} | |||
// close the job | |||
func (job *Job) Close() { | |||
close(job.c) | |||
} | |||
// cancel the job executing | |||
func (job *Job) cancel() { | |||
defer func() { recover() }() | |||
job.c <- true | |||
} | |||
// When a job was canceled, return a true form a channel | |||
func (job *Job) Canceled() <-chan bool { | |||
return job.c | |||
} |
@@ -5,20 +5,20 @@ | |||
package worker | |||
import ( | |||
"time" | |||
"github.com/mikespook/gearman-go/common" | |||
"time" | |||
) | |||
const ( | |||
Unlimited = 0 | |||
OneByOne = 1 | |||
Unlimited = 0 | |||
OneByOne = 1 | |||
Immediately = 0 | |||
Immediately = 0 | |||
) | |||
var ( | |||
ErrConnection = common.ErrConnection | |||
ErrConnection = common.ErrConnection | |||
) | |||
// Job handler | |||
type JobHandler func(*Job) error | |||
@@ -26,8 +26,8 @@ type JobFunc func(*Job) ([]byte, error) | |||
// The definition of the callback function. | |||
type jobFunc struct { | |||
f JobFunc | |||
timeout uint32 | |||
f JobFunc | |||
timeout uint32 | |||
} | |||
// Map for added function. | |||
@@ -52,266 +52,266 @@ func foobar(job *Job) (data []byte, err os.Error) { | |||
} | |||
*/ | |||
type Worker struct { | |||
agents []*agent | |||
funcs JobFuncs | |||
in chan *Job | |||
running bool | |||
limit chan bool | |||
Id string | |||
// assign a ErrFunc to handle errors | |||
ErrHandler common.ErrorHandler | |||
JobHandler JobHandler | |||
agents []*agent | |||
funcs JobFuncs | |||
in chan *Job | |||
running bool | |||
limit chan bool | |||
Id string | |||
// assign a ErrFunc to handle errors | |||
ErrHandler common.ErrorHandler | |||
JobHandler JobHandler | |||
} | |||
// Get a new worker | |||
func New(l int) (worker *Worker) { | |||
worker = &Worker{ | |||
agents: make([]*agent, 0), | |||
funcs: make(JobFuncs), | |||
in: make(chan *Job, common.QUEUE_SIZE), | |||
} | |||
if l != Unlimited { | |||
worker.limit = make(chan bool, l) | |||
} | |||
return | |||
worker = &Worker{ | |||
agents: make([]*agent, 0), | |||
funcs: make(JobFuncs), | |||
in: make(chan *Job, common.QUEUE_SIZE), | |||
} | |||
if l != Unlimited { | |||
worker.limit = make(chan bool, l) | |||
} | |||
return | |||
} | |||
// | |||
func (worker *Worker)err(e error) { | |||
if worker.ErrHandler != nil { | |||
worker.ErrHandler(e) | |||
} | |||
// | |||
func (worker *Worker) err(e error) { | |||
if worker.ErrHandler != nil { | |||
worker.ErrHandler(e) | |||
} | |||
} | |||
// Add a server. The addr should be 'host:port' format. | |||
// The connection is established at this time. | |||
func (worker *Worker) AddServer(addr string) (err error) { | |||
// Create a new job server's client as a agent of server | |||
server, err := newAgent(addr, worker) | |||
if err != nil { | |||
return err | |||
} | |||
worker.agents = append(worker.agents, server) | |||
return | |||
// Create a new job server's client as a agent of server | |||
server, err := newAgent(addr, worker) | |||
if err != nil { | |||
return err | |||
} | |||
worker.agents = append(worker.agents, server) | |||
return | |||
} | |||
// Write a job to job server. | |||
// Here, the job's mean is not the oraginal mean. | |||
// Just looks like a network package for job's result or tell job server, there was a fail. | |||
func (worker *Worker) broadcast(job *Job) { | |||
for _, v := range worker.agents { | |||
v.WriteJob(job) | |||
} | |||
for _, v := range worker.agents { | |||
v.WriteJob(job) | |||
} | |||
} | |||
// Add a function. | |||
// Plz added job servers first, then functions. | |||
// The API will tell every connected job server that 'I can do this' | |||
func (worker *Worker) AddFunc(funcname string, | |||
f JobFunc, timeout uint32) (err error) { | |||
if _, ok := worker.funcs[funcname]; ok { | |||
return common.Errorf("The function already exists: %s", funcname) | |||
} | |||
worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} | |||
if worker.running { | |||
worker.addFunc(funcname, timeout) | |||
} | |||
return | |||
f JobFunc, timeout uint32) (err error) { | |||
if _, ok := worker.funcs[funcname]; ok { | |||
return common.Errorf("The function already exists: %s", funcname) | |||
} | |||
worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} | |||
if worker.running { | |||
worker.addFunc(funcname, timeout) | |||
} | |||
return | |||
} | |||
// inner add function | |||
func (worker *Worker) addFunc(funcname string, timeout uint32) { | |||
var datatype uint32 | |||
var data []byte | |||
if timeout == 0 { | |||
datatype = common.CAN_DO | |||
data = []byte(funcname) | |||
} else { | |||
datatype = common.CAN_DO_TIMEOUT | |||
data = []byte(funcname + "\x00") | |||
t := common.Uint32ToBytes(timeout) | |||
data = append(data, t[:]...) | |||
} | |||
job := newJob(common.REQ, datatype, data) | |||
worker.broadcast(job) | |||
var datatype uint32 | |||
var data []byte | |||
if timeout == 0 { | |||
datatype = common.CAN_DO | |||
data = []byte(funcname) | |||
} else { | |||
datatype = common.CAN_DO_TIMEOUT | |||
data = []byte(funcname + "\x00") | |||
t := common.Uint32ToBytes(timeout) | |||
data = append(data, t[:]...) | |||
} | |||
job := newJob(common.REQ, datatype, data) | |||
worker.broadcast(job) | |||
} | |||
// Remove a function. | |||
// Tell job servers 'I can not do this now' at the same time. | |||
func (worker *Worker) RemoveFunc(funcname string) (err error) { | |||
if _, ok := worker.funcs[funcname]; !ok { | |||
return common.Errorf("The function does not exist: %s", funcname) | |||
} | |||
delete(worker.funcs, funcname) | |||
if worker.running { | |||
worker.removeFunc(funcname) | |||
} | |||
return | |||
if _, ok := worker.funcs[funcname]; !ok { | |||
return common.Errorf("The function does not exist: %s", funcname) | |||
} | |||
delete(worker.funcs, funcname) | |||
if worker.running { | |||
worker.removeFunc(funcname) | |||
} | |||
return | |||
} | |||
// inner remove function | |||
func (worker *Worker) removeFunc(funcname string) { | |||
job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) | |||
worker.broadcast(job) | |||
job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) | |||
worker.broadcast(job) | |||
} | |||
func (worker *Worker) dealJob(job *Job) { | |||
defer func() { | |||
job.Close() | |||
if worker.running && worker.limit != nil { | |||
<-worker.limit | |||
} | |||
}() | |||
switch job.DataType { | |||
case common.ERROR: | |||
_, err := common.GetError(job.Data) | |||
worker.err(err) | |||
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: | |||
if err := worker.exec(job); err != nil { | |||
worker.err(err) | |||
} | |||
default: | |||
worker.handleJob(job) | |||
} | |||
defer func() { | |||
job.Close() | |||
if worker.running && worker.limit != nil { | |||
<-worker.limit | |||
} | |||
}() | |||
switch job.DataType { | |||
case common.ERROR: | |||
_, err := common.GetError(job.Data) | |||
worker.err(err) | |||
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: | |||
if err := worker.exec(job); err != nil { | |||
worker.err(err) | |||
} | |||
default: | |||
worker.handleJob(job) | |||
} | |||
} | |||
// Main loop | |||
func (worker *Worker) Work() { | |||
defer func() { | |||
for _, v := range worker.agents { | |||
v.Close() | |||
} | |||
}() | |||
worker.running = true | |||
for _, v := range worker.agents { | |||
go v.Work() | |||
} | |||
for funcname, f := range worker.funcs { | |||
worker.addFunc(funcname, f.timeout) | |||
} | |||
ok := true | |||
for ok { | |||
var job *Job | |||
if job, ok = <-worker.in; ok { | |||
go worker.dealJob(job) | |||
} | |||
} | |||
defer func() { | |||
for _, v := range worker.agents { | |||
v.Close() | |||
} | |||
}() | |||
worker.running = true | |||
for _, v := range worker.agents { | |||
go v.Work() | |||
} | |||
for funcname, f := range worker.funcs { | |||
worker.addFunc(funcname, f.timeout) | |||
} | |||
ok := true | |||
for ok { | |||
var job *Job | |||
if job, ok = <-worker.in; ok { | |||
go worker.dealJob(job) | |||
} | |||
} | |||
} | |||
// job handler | |||
func (worker *Worker) handleJob(job *Job) { | |||
if worker.JobHandler != nil { | |||
if err := worker.JobHandler(job); err != nil { | |||
worker.err(err) | |||
} | |||
} | |||
if worker.JobHandler != nil { | |||
if err := worker.JobHandler(job); err != nil { | |||
worker.err(err) | |||
} | |||
} | |||
} | |||
// Close. | |||
func (worker *Worker) Close() { | |||
worker.running = false | |||
close(worker.in) | |||
if worker.limit != nil { | |||
close(worker.limit) | |||
} | |||
worker.running = false | |||
close(worker.in) | |||
if worker.limit != nil { | |||
close(worker.limit) | |||
} | |||
} | |||
// Send a something out, get the samething back. | |||
func (worker *Worker) Echo(data []byte) { | |||
job := newJob(common.REQ, common.ECHO_REQ, data) | |||
worker.broadcast(job) | |||
job := newJob(common.REQ, common.ECHO_REQ, data) | |||
worker.broadcast(job) | |||
} | |||
// Remove all of functions. | |||
// Both from the worker or job servers. | |||
func (worker *Worker) Reset() { | |||
job := newJob(common.REQ, common.RESET_ABILITIES, nil) | |||
worker.broadcast(job) | |||
worker.funcs = make(JobFuncs) | |||
job := newJob(common.REQ, common.RESET_ABILITIES, nil) | |||
worker.broadcast(job) | |||
worker.funcs = make(JobFuncs) | |||
} | |||
// Set the worker's unique id. | |||
func (worker *Worker) SetId(id string) { | |||
worker.Id = id | |||
job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) | |||
worker.broadcast(job) | |||
worker.Id = id | |||
job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) | |||
worker.broadcast(job) | |||
} | |||
// Execute the job. And send back the result. | |||
func (worker *Worker) exec(job *Job) (err error) { | |||
defer func() { | |||
if r := recover(); r != nil { | |||
if e, ok := r.(error); ok { | |||
err = e | |||
} else { | |||
err = common.ErrUnknown | |||
} | |||
} | |||
} () | |||
f, ok := worker.funcs[job.Fn] | |||
if !ok { | |||
return common.Errorf("The function does not exist: %s", job.Fn) | |||
} | |||
var r *result | |||
if f.timeout == 0 { | |||
d, e := f.f(job) | |||
r = &result{data:d, err: e} | |||
} else { | |||
r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second) | |||
} | |||
var datatype uint32 | |||
if r.err == nil { | |||
datatype = common.WORK_COMPLETE | |||
} else { | |||
if r.data == nil { | |||
datatype = common.WORK_FAIL | |||
} else { | |||
datatype = common.WORK_EXCEPTION | |||
} | |||
err = r.err | |||
} | |||
job.magicCode = common.REQ | |||
job.DataType = datatype | |||
job.Data = r.data | |||
if worker.running { | |||
job.agent.WriteJob(job) | |||
} | |||
return | |||
defer func() { | |||
if r := recover(); r != nil { | |||
if e, ok := r.(error); ok { | |||
err = e | |||
} else { | |||
err = common.ErrUnknown | |||
} | |||
} | |||
}() | |||
f, ok := worker.funcs[job.Fn] | |||
if !ok { | |||
return common.Errorf("The function does not exist: %s", job.Fn) | |||
} | |||
var r *result | |||
if f.timeout == 0 { | |||
d, e := f.f(job) | |||
r = &result{data: d, err: e} | |||
} else { | |||
r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second) | |||
} | |||
var datatype uint32 | |||
if r.err == nil { | |||
datatype = common.WORK_COMPLETE | |||
} else { | |||
if r.data == nil { | |||
datatype = common.WORK_FAIL | |||
} else { | |||
datatype = common.WORK_EXCEPTION | |||
} | |||
err = r.err | |||
} | |||
job.magicCode = common.REQ | |||
job.DataType = datatype | |||
job.Data = r.data | |||
if worker.running { | |||
job.agent.WriteJob(job) | |||
} | |||
return | |||
} | |||
func (worker *Worker) removeAgent(a *agent) { | |||
for k, v := range worker.agents { | |||
if v == a { | |||
worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...) | |||
} | |||
} | |||
if len(worker.agents) == 0 { | |||
worker.err(common.ErrNoActiveAgent) | |||
} | |||
for k, v := range worker.agents { | |||
if v == a { | |||
worker.agents = append(worker.agents[:k], worker.agents[k+1:]...) | |||
} | |||
} | |||
if len(worker.agents) == 0 { | |||
worker.err(common.ErrNoActiveAgent) | |||
} | |||
} | |||
type result struct { | |||
data []byte | |||
err error | |||
data []byte | |||
err error | |||
} | |||
func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) { | |||
rslt := make(chan *result) | |||
defer close(rslt) | |||
go func() { | |||
defer func() {recover()}() | |||
d, e := f(job) | |||
rslt <- &result{data: d, err: e} | |||
}() | |||
select { | |||
case r = <-rslt: | |||
case <-time.After(timeout): | |||
go job.cancel() | |||
return &result{err:common.ErrTimeOut} | |||
} | |||
return r | |||
rslt := make(chan *result) | |||
defer close(rslt) | |||
go func() { | |||
defer func() { recover() }() | |||
d, e := f(job) | |||
rslt <- &result{data: d, err: e} | |||
}() | |||
select { | |||
case r = <-rslt: | |||
case <-time.After(timeout): | |||
go job.cancel() | |||
return &result{err: common.ErrTimeOut} | |||
} | |||
return r | |||
} |
@@ -5,40 +5,40 @@ import "testing" | |||
var worker *Worker | |||
func init() { | |||
worker = New(Unlimited) | |||
worker = New(Unlimited) | |||
} | |||
func TestWorkerAddServer(t *testing.T) { | |||
t.Log("Add local server 127.0.0.1:4730.") | |||
if err := worker.AddServer("127.0.0.1:4730"); err != nil { | |||
t.Error(err) | |||
} | |||
if l := len(worker.agents); l != 1 { | |||
t.Log(worker.agents) | |||
t.Error("The length of server list should be 1.") | |||
} | |||
t.Log("Add local server 127.0.0.1:4730.") | |||
if err := worker.AddServer("127.0.0.1:4730"); err != nil { | |||
t.Error(err) | |||
} | |||
if l := len(worker.agents); l != 1 { | |||
t.Log(worker.agents) | |||
t.Error("The length of server list should be 1.") | |||
} | |||
} | |||
func foobar(job *Job) ([]byte, error) { | |||
return nil, nil | |||
return nil, nil | |||
} | |||
func TestWorkerAddFunction(t *testing.T) { | |||
if err := worker.AddFunc("foobar", foobar, 0); err != nil { | |||
t.Error(err) | |||
} | |||
if err := worker.AddFunc("timeout", foobar, 5); err != nil { | |||
t.Error(err) | |||
} | |||
if l := len(worker.funcs); l != 2 { | |||
t.Log(worker.funcs) | |||
t.Errorf("The length of function map should be %d.", 2) | |||
} | |||
if err := worker.AddFunc("foobar", foobar, 0); err != nil { | |||
t.Error(err) | |||
} | |||
if err := worker.AddFunc("timeout", foobar, 5); err != nil { | |||
t.Error(err) | |||
} | |||
if l := len(worker.funcs); l != 2 { | |||
t.Log(worker.funcs) | |||
t.Errorf("The length of function map should be %d.", 2) | |||
} | |||
} | |||
func TestWorkerRemoveFunc(t *testing.T) { | |||
if err := worker.RemoveFunc("foobar"); err != nil { | |||
t.Error(err) | |||
} | |||
if err := worker.RemoveFunc("foobar"); err != nil { | |||
t.Error(err) | |||
} | |||
} |