0.2 refactoring begining

This commit is contained in:
Xing Xing 2013-08-29 16:51:23 +08:00
parent 2002bb1804
commit e5c30068cd
13 changed files with 798 additions and 712 deletions

View File

@ -6,28 +6,13 @@
package client
import (
"io"
"net"
"sync"
"time"
"bytes"
"strconv"
"github.com/mikespook/gearman-go/common"
"io"
"net"
"sync"
"github.com/mikespook/golib/idgen"
)
var (
IdGen IdGenerator
)
func init() {
IdGen = NewObjectId()
}
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
/*
/*
The client side api for gearman
usage:
@ -36,267 +21,164 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct {
ErrHandler common.ErrorHandler
TimeOut time.Duration
net, addr string
respHandler map[string]ResponseHandler
createdHandler ResponseHandler
in chan []byte
isConn bool
conn net.Conn
mutex sync.RWMutex
ErrorHandler ErrorHandler
in chan []byte
out chan *Job
created chan string
echo chan []byte
status chan *Status
jobhandlers map[string]JobHandler
isConn bool
conn net.Conn
addr string
mutex sync.RWMutex
IdGen idgen.IdGen
}
// Create a new client.
// Connect to "addr" through "network"
// Eg.
// client, err := client.New("127.0.0.1:4730")
func New(addr string) (client *Client, err error) {
client = &Client{
created: make(chan string, common.QUEUE_SIZE),
echo: make(chan []byte, common.QUEUE_SIZE),
status: make(chan *Status, common.QUEUE_SIZE),
jobhandlers: make(map[string]JobHandler, common.QUEUE_SIZE),
in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
addr: addr,
TimeOut: time.Second,
}
if err = client.connect(); err != nil {
return
}
client.isConn = true
go client.inLoop()
go client.outLoop()
return
func New(net, addr string) (client *Client, err error) {
client = &Client{
net: net,
addr: addr,
respHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
in: make(chan []byte, QUEUE_SIZE),
}
if err = client.connect(); err != nil {
return
}
client.isConn = true
go client.readLoop()
go client.processLoop()
return
}
// {{{ private functions
//
//
func (client *Client) connect() (err error) {
client.conn, err = net.Dial(common.NETWORK, client.addr)
return
client.conn, err = net.Dial(client.net, client.addr)
return
}
// Internal write
func (client *Client) write(buf []byte) (err error) {
var n int
for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:])
if err != nil {
return
}
}
return
func (client *Client) write(req *request) (err error) {
var n int
buf := req.Encode()
for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:])
if err != nil {
return
}
}
return
}
// read length bytes from the socket
func (client *Client) readData(length int) (data []byte, err error) {
n := 0
buf := make([]byte, common.BUFFER_SIZE)
// read until data can be unpacked
for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n {
if n, err = client.conn.Read(buf); err != nil {
if !client.isConn {
return nil, common.ErrConnClosed
}
if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrConnection
return
}
return data, nil
}
return
}
data = append(data, buf[0:n]...)
if n < common.BUFFER_SIZE {
break
}
}
return
func (client *Client) read(length int) (data []byte, err error) {
n := 0
buf := getBuffer(BUFFER_SIZE)
// read until data can be unpacked
for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n {
if n, err = client.conn.Read(buf); err != nil {
if !client.isConn {
err = ErrConnClosed
return
}
if err == io.EOF && n == 0 {
if data == nil {
err = ErrConnection
}
}
return
}
data = append(data, buf[0:n]...)
if n < BUFFER_SIZE {
break
}
}
return
}
// unpack data
func (client *Client) unpack(data []byte) ([]byte, int, bool) {
tl := len(data)
start := 0
for i := 0; i < tl+1-common.PACKET_LEN; i++ {
if start+common.PACKET_LEN > tl { // too few data to unpack, read more
return nil, common.PACKET_LEN, false
}
if string(data[start:start+4]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8],
data[start+9], data[start+10], data[start+11]}))
total := l + common.PACKET_LEN
if total == tl { // data is what we want
return data, common.PACKET_LEN, true
} else if total < tl { // data[:total] is what we want, data[total:] is the more
client.in <- data[total:]
data = data[start:total]
return data, common.PACKET_LEN, true
} else { // ops! It won't be possible.
return nil, total - tl, false
}
} else { // flag was not found, move to next step
start++
}
}
return nil, common.PACKET_LEN, false
// read data from socket
func (client *Client) readLoop() {
var data []byte
var err error
for client.isConn {
if data, err = client.read(BUFFER_SIZE); err != nil {
if err == ErrConnClosed {
break
}
client.err(err)
continue
}
client.in <- data
}
close(client.in)
}
// Internal read
func (client *Client) read() (rel []byte, err error) {
var data []byte
ok := false
l := common.PACKET_LEN
for !ok {
inlen := len(client.in)
if inlen > 0 {
// in queue is not empty
for i := 0; i < inlen; i++ {
data = append(data, <-client.in...)
}
} else {
var d []byte
d, err = client.readData(l)
if err != nil {
return
}
data = append(data, d...)
}
rel, l, ok = client.unpack(data)
}
return
}
// out loop
func (client *Client) outLoop() {
for job := range client.out {
if err := client.write(job.Encode()); err != nil {
client.err(err)
}
}
}
// in loop
func (client *Client) inLoop() {
defer common.DisablePanic()
for {
rel, err := client.read()
if err != nil {
if err == common.ErrConnection {
client.Close()
}
if err != common.ErrConnClosed {
client.err(err)
}
break
}
job, err := decodeJob(rel)
if err != nil {
client.err(err)
continue
//break
}
switch job.DataType {
case common.ERROR:
_, err := common.GetError(job.Data)
client.err(err)
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
client.handleJob(job)
case common.ECHO_RES:
client.handleEcho(job)
case common.JOB_CREATED:
client.handleCreated(job)
case common.STATUS_RES:
client.handleStatus(job)
default:
break
}
}
// decode data & process it
func (client *Client) processLoop() {
var resp *response
var l int
var err error
var data, leftdata []byte
for data = range client.in {
l = len(data)
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data ...)
}
if l < MIN_PACKET_LEN { // not enough data
leftdata = data
continue
}
if resp, l, err = decodeResponse(data); err != nil {
client.err(err)
continue
}
switch resp.DataType {
case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE,
WORK_FAIL, WORK_EXCEPTION:
client.handleResponse(string(resp.Handle), resp)
}
if len(data) > l {
leftdata = data[l:]
}
}
}
// error handler
func (client *Client) err (e error) {
if client.ErrHandler != nil {
client.ErrHandler(e)
}
func (client *Client) err(e error) {
if client.ErrorHandler != nil {
client.ErrorHandler(e)
}
}
// job handler
func (client *Client) handleJob(job *Job) {
client.mutex.RLock()
defer client.mutex.RUnlock()
if h, ok := client.jobhandlers[job.Handle]; ok {
h(job)
delete(client.jobhandlers, job.Handle)
}
}
func (client *Client) handleEcho(job *Job) {
client.echo <- job.Data
}
func (client *Client) handleCreated(job *Job) {
client.created <- string(job.Data)
}
// status handler
func (client *Client) handleStatus(job *Job) {
data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
if len(data) != 5 {
client.err(common.Errorf("Invalid data: %V", job.Data))
return
}
status := &Status{}
status.Handle = string(data[0])
status.Known = (data[1][0] == '1')
status.Running = (data[2][0] == '1')
var err error
status.Numerator, err = strconv.ParseUint(string(data[3]), 10, 0)
if err != nil {
client.err(common.Errorf("Invalid Integer: %s", data[3]))
return
}
status.Denominator, err = strconv.ParseUint(string(data[4]), 10, 0)
if err != nil {
client.err(common.Errorf("Invalid Integer: %s", data[4]))
return
}
client.status <- status
}
// Send the job to job server.
func (client *Client) writeJob(job *Job) {
client.out <- job
func (client *Client) handleResponse(key string, resp *response) {
client.mutex.RLock()
defer client.mutex.RUnlock()
if h, ok := client.respHandler[key]; ok {
h(resp)
delete(client.respHandler, string(resp.Handle))
}
}
// Internal do
func (client *Client) do(funcname string, data []byte,
flag uint32, id string) (handle string) {
l := len(funcname) + len(id) + len(data) + 2
rel := make([]byte, 0, l)
rel = append(rel, []byte(funcname)...) // len(funcname)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, []byte(id)...) // len(uid)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, flag, rel))
// Waiting for JOB_CREATED
handle = <-client.created
return
flag uint32) (handle []byte) {
req := getJob(funcname, client.IdGen.Id().(string), data)
client.mutex.Lock()
defer client.mutex.Unlock()
client.write(req)
var wg sync.WaitGroup
wg.Add(1)
client.createdHandler = func(resp *response) {
defer wg.Done()
handle = resp.Handle
}
wg.Wait()
return
}
// }}}
@ -304,77 +186,73 @@ flag uint32, id string) (handle string) {
// Do the function.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
// and if it is background job: JOB_BG.
// JOB_LOW | JOB_BG means the job is running with low level in background.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
func (client *Client) Do(funcname string, data []byte,
flag byte, jobhandler JobHandler) (handle string) {
var datatype uint32
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH
default:
datatype = common.SUBMIT_JOB
}
id := IdGen.Id()
client.mutex.Lock()
defer client.mutex.Unlock()
handle = client.do(funcname, data, datatype, id)
if jobhandler != nil {
client.jobhandlers[handle] = jobhandler
}
return
flag byte, h ResponseHandler) (handle []byte) {
var datatype uint32
switch flag {
case JOB_LOW:
datatype = SUBMIT_JOB_LOW
case JOB_HIGH:
datatype = SUBMIT_JOB_HIGH
default:
datatype = SUBMIT_JOB
}
handle = client.do(funcname, data, datatype)
client.mutex.Lock()
defer client.mutex.Unlock()
if h != nil {
client.respHandler[string(handle)] = h
}
return
}
// Do the function at background.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string) {
var datatype uint32
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW_BG
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH_BG
default:
datatype = common.SUBMIT_JOB_BG
}
handle = client.do(funcname, data, datatype, "")
return
flag byte) (handle []byte) {
var datatype uint32
switch flag {
case JOB_LOW:
datatype = SUBMIT_JOB_LOW_BG
case JOB_HIGH:
datatype = SUBMIT_JOB_HIGH_BG
default:
datatype = SUBMIT_JOB_BG
}
handle = client.do(funcname, data, datatype)
return
}
// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string, timeout time.Duration) (status *Status, err error) {
client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
select {
case status = <-client.status:
case <-time.After(timeout):
err = common.ErrTimeOut
}
return
func (client *Client) Status(handle []byte, h ResponseHandler) (err error) {
req := getRequest()
req.DataType = GET_STATUS
req.Data = handle
client.write(req)
if h != nil {
client.respHandler["status-" + string(handle)] = h
}
return
}
// Send a something out, get the samething back.
func (client *Client) Echo(data []byte, timeout time.Duration) (r []byte, err error) {
client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
select {
case r = <-client.echo:
case <-time.After(timeout):
err = common.ErrTimeOut
}
return
func (client *Client) Echo(data []byte, h ResponseHandler) (err error) {
req := getRequest()
req.DataType = ECHO_REQ
req.Data = data
client.write(req)
if h != nil {
client.respHandler["echo"] = h
}
return
}
// Close
func (client *Client) Close() (err error) {
client.isConn = false
close(client.in)
close(client.out)
close(client.echo)
close(client.created)
close(client.status)
return client.conn.Close();
client.isConn = false
return client.conn.Close()
}

View File

@ -1,96 +1,95 @@
package client
import (
"time"
"testing"
"testing"
"time"
)
var client *Client
func TestClientAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730")
var err error
if client, err = New("127.0.0.1:4730"); err != nil {
t.Error(err)
return
}
client.ErrHandler = func(e error) {
t.Log(e)
}
t.Log("Add local server 127.0.0.1:4730")
var err error
if client, err = New("127.0.0.1:4730"); err != nil {
t.Error(err)
return
}
client.ErrHandler = func(e error) {
t.Log(e)
}
}
func TestClientEcho(t *testing.T) {
echo, err := client.Echo([]byte("Hello world"), time.Second)
if err != nil {
t.Error(err)
return
}
if string(echo) != "Hello world" {
t.Errorf("Invalid echo data: %s", echo)
return
}
echo, err := client.Echo([]byte("Hello world"), time.Second)
if err != nil {
t.Error(err)
return
}
if string(echo) != "Hello world" {
t.Errorf("Invalid echo data: %s", echo)
return
}
}
func TestClientDoBg(t *testing.T) {
if handle := client.DoBg("ToUpper", []byte("abcdef"),
JOB_LOW); handle == "" {
t.Error("Handle is empty.")
}
if handle := client.DoBg("ToUpper", []byte("abcdef"),
JOB_LOW); handle == "" {
t.Error("Handle is empty.")
}
}
func TestClientDo(t *testing.T) {
jobHandler := func(job *Job) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
if handle := client.Do("ToUpper", []byte("abcdef"),
JOB_LOW, jobHandler); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
jobHandler := func(job *Job) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
if handle := client.Do("ToUpper", []byte("abcdef"),
JOB_LOW, jobHandler); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientStatus(t *testing.T) {
s1, err := client.Status("handle not exists", time.Second)
if err != nil {
t.Error(err)
return
}
if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
return
}
if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
return
}
s1, err := client.Status("handle not exists", time.Second)
if err != nil {
t.Error(err)
return
}
if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
return
}
if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
return
}
handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
s2, err := client.Status(handle, time.Second)
if err != nil {
t.Error(err)
return
}
if !s2.Known {
t.Errorf("The job (%s) should be known.", s2.Handle)
return
}
if s2.Running {
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
return
}
handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil)
s2, err := client.Status(handle, time.Second)
if err != nil {
t.Error(err)
return
}
if !s2.Known {
t.Errorf("The job (%s) should be known.", s2.Handle)
return
}
if s2.Running {
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
return
}
}
func TestClientClose(t *testing.T) {
if err := client.Close(); err != nil {
t.Error(err)
}
if err := client.Close(); err != nil {
t.Error(err)
}
}

75
client/common.go Normal file
View File

@ -0,0 +1,75 @@
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
const (
NETWORK = "tcp"
// queue size
QUEUE_SIZE = 8
// read buffer size
BUFFER_SIZE = 1024
// min packet length
MIN_PACKET_LEN = 12
// \x00REQ
REQ = 5391697
REQ_STR = "\x00REQ"
// \x00RES
RES = 5391699
RES_STR = "\x00RES"
// package data type
CAN_DO = 1
CANT_DO = 2
RESET_ABILITIES = 3
PRE_SLEEP = 4
NOOP = 6
JOB_CREATED = 8
GRAB_JOB = 9
NO_JOB = 10
JOB_ASSIGN = 11
WORK_STATUS = 12
WORK_COMPLETE = 13
WORK_FAIL = 14
GET_STATUS = 15
ECHO_REQ = 16
ECHO_RES = 17
ERROR = 19
STATUS_RES = 20
SET_CLIENT_ID = 22
CAN_DO_TIMEOUT = 23
WORK_EXCEPTION = 25
WORK_DATA = 28
WORK_WARNING = 29
GRAB_JOB_UNIQ = 30
JOB_ASSIGN_UNIQ = 31
SUBMIT_JOB = 7
SUBMIT_JOB_BG = 18
SUBMIT_JOB_HIGH = 21
SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34
)
const (
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}

48
client/common_test.go Normal file
View File

@ -0,0 +1,48 @@
package client
import (
"bytes"
"testing"
)
var (
testCase = map[uint32][4]byte{
0: [...]byte{0, 0, 0, 0},
1: [...]byte{0, 0, 0, 1},
256: [...]byte{0, 0, 1, 0},
256 * 256: [...]byte{0, 1, 0, 0},
256 * 256 * 256: [...]byte{1, 0, 0, 0},
256*256*256 + 256*256 + 256 + 1: [...]byte{1, 1, 1, 1},
4294967295: [...]byte{0xFF, 0xFF, 0xFF, 0xFF},
}
)
func TestUint32ToBytes(t *testing.T) {
for k, v := range testCase {
b := Uint32ToBytes(k)
if bytes.Compare(b[:], v[:]) != 0 {
t.Errorf("%v was expected, but %v was got", v, b)
}
}
}
func TestBytesToUint32s(t *testing.T) {
for k, v := range testCase {
u := BytesToUint32([4]byte(v))
if u != k {
t.Errorf("%v was expected, but %v was got", k, u)
}
}
}
func BenchmarkByteToUnit32(b *testing.B) {
for i := 0; i < b.N; i++ {
BytesToUint32([4]byte{0xF, 0xF, 0xF, 0xF})
}
}
func BenchmarkUint32ToByte(b *testing.B) {
for i := 0; i < b.N; i++ {
Uint32ToBytes(123456)
}
}

49
client/error.go Normal file
View File

@ -0,0 +1,49 @@
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bytes"
"errors"
"fmt"
"syscall"
"strconv"
)
var (
ErrJobTimeOut = errors.New("Do a job time out")
ErrInvalidData = errors.New("Invalid data")
ErrWorkWarning = errors.New("Work warning")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")
ErrOutOfCap = errors.New("Out of the capability")
ErrNotConn = errors.New("Did not connect to job server")
ErrFuncNotFound = errors.New("The function was not found")
ErrConnection = errors.New("Connection error")
ErrNoActiveAgent = errors.New("No active agent")
ErrTimeOut = errors.New("Executing time out")
ErrUnknown = errors.New("Unknown error")
ErrConnClosed = errors.New("Connection closed")
)
func DisablePanic() { recover() }
// Extract the error message
func GetError(data []byte) (eno syscall.Errno, err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %V", data)
return
}
var n uint64
if n, err = strconv.ParseUint(string(rel[0]), 10, 0); err != nil {
return
}
eno = syscall.Errno(n)
err = errors.New(string(rel[1]))
return
}

View File

@ -1,4 +1,10 @@
package client
// Job handler
type JobHandler func(*Job)
// Response handler
type ResponseHandler func(*response)
// Error handler
type ErrorHandler func(error)
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)

View File

@ -1,35 +1,35 @@
package client
import (
"strconv"
"labix.org/v2/mgo/bson"
"github.com/mikespook/golib/autoinc"
"github.com/mikespook/golib/autoinc"
"labix.org/v2/mgo/bson"
"strconv"
)
type IdGenerator interface {
Id() string
Id() string
}
// ObjectId
type objectId struct {}
type objectId struct{}
func (id *objectId) Id() string {
return bson.NewObjectId().Hex()
return bson.NewObjectId().Hex()
}
func NewObjectId() IdGenerator {
return &objectId{}
return &objectId{}
}
// AutoIncId
type autoincId struct {
*autoinc.AutoInc
*autoinc.AutoInc
}
func (id *autoincId) Id() string {
return strconv.Itoa(id.AutoInc.Id())
return strconv.Itoa(id.AutoInc.Id())
}
func NewAutoIncId() IdGenerator {
return &autoincId{autoinc.New(1, 1)}
return &autoincId{autoinc.New(1, 1)}
}

View File

@ -1,142 +0,0 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bytes"
"github.com/mikespook/gearman-go/common"
)
const (
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
)
// An error handler
type ErrorHandler func(error)
// Client side job
type Job struct {
Data []byte
Handle, UniqueId string
magicCode, DataType uint32
}
// Create a new job
func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
return &Job{magicCode: magiccode,
DataType: datatype,
Data: data}
}
// Decode a job from byte slice
func decodeJob(data []byte) (job *Job, err error) {
if len(data) < 12 {
return nil, common.Errorf("Invalid data: %V", data)
}
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
return nil, common.Errorf("Invalid data: %V", data)
}
data = data[12:]
var handle string
switch datatype {
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
i := bytes.IndexByte(data, '\x00')
if i != -1 {
handle = string(data[:i])
data = data[i + 1:]
}
}
return &Job{magicCode: common.RES,
DataType: datatype,
Data: data,
Handle: handle}, nil
}
// Encode a job to byte slice
func (job *Job) Encode() (data []byte) {
l := len(job.Data)
tl := l + 12
data = make([]byte, tl)
magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType)
datalength := common.Uint32ToBytes(uint32(l))
for i := 0; i < tl; i ++ {
switch {
case i < 4:
data[i] = magiccode[i]
case i < 8:
data[i] = datatype[i - 4]
case i < 12:
data[i] = datalength[i - 8]
default:
data[i] = job.Data[i - 12]
}
}
// Alternative
/*
data = append(data, magiccode[:] ...)
data = append(data, datatype[:] ...)
data = append(data, datalength[:] ...)
data = append(data, job.Data ...)
*/
return
}
// Extract the job's result.
func (job *Job) Result() (data []byte, err error) {
switch job.DataType {
case common.WORK_FAIL:
job.Handle = string(job.Data)
return nil, common.ErrWorkFail
case common.WORK_EXCEPTION:
err = common.ErrWorkException
fallthrough
case common.WORK_COMPLETE:
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
return nil, common.Errorf("Invalid data: %V", job.Data)
}
job.Handle = string(s[0])
data = s[1]
default:
err = common.ErrDataType
}
return
}
// Extract the job's update
func (job *Job) Update() (data []byte, err error) {
if job.DataType != common.WORK_DATA && job.DataType != common.WORK_WARNING {
err = common.ErrDataType
return
}
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = common.ErrInvalidData
return
}
if job.DataType == common.WORK_WARNING {
err = common.ErrWorkWarning
}
job.Handle = string(s[0])
data = s[1]
return
}

View File

@ -6,157 +6,154 @@
package client
import (
"sync"
"time"
"errors"
"math/rand"
"github.com/mikespook/gearman-go/common"
"errors"
"math/rand"
"sync"
)
const (
PoolSize = 10
PoolSize = 10
)
var (
ErrNotFound = errors.New("Server Not Found")
ErrNotFound = errors.New("Server Not Found")
)
type poolClient struct {
*Client
Rate int
*Client
Rate int
}
type SelectionHandler func(map[string]*poolClient, string) string
func SelectWithRate(pool map[string]*poolClient,
last string) (addr string) {
total := 0
for _, item := range pool {
total += item.Rate
if rand.Intn(total) < item.Rate {
return item.addr
}
}
return last
last string) (addr string) {
total := 0
for _, item := range pool {
total += item.Rate
if rand.Intn(total) < item.Rate {
return item.addr
}
}
return last
}
func SelectRandom(pool map[string]*poolClient,
last string) (addr string) {
r := rand.Intn(len(pool))
i := 0
for k, _ := range pool {
if r == i {
return k
}
i ++
}
return last
last string) (addr string) {
r := rand.Intn(len(pool))
i := 0
for k, _ := range pool {
if r == i {
return k
}
i++
}
return last
}
type Pool struct {
SelectionHandler SelectionHandler
ErrHandler common.ErrorHandler
SelectionHandler SelectionHandler
ErrorHandler ErrorHandler
clients map[string]*poolClient
last string
clients map[string]*poolClient
last string
mutex sync.Mutex
mutex sync.Mutex
}
// Create a new pool.
func NewPool() (pool *Pool) {
return &Pool{
clients: make(map[string]*poolClient, PoolSize),
SelectionHandler: SelectWithRate,
}
return &Pool{
clients: make(map[string]*poolClient, PoolSize),
SelectionHandler: SelectWithRate,
}
}
// Add a server with rate.
func (pool *Pool) Add(addr string, rate int) (err error) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
var item *poolClient
var ok bool
if item, ok = pool.clients[addr]; ok {
item.Rate = rate
} else {
var client *Client
client, err = New(addr)
item = &poolClient{Client: client, Rate: rate}
pool.clients[addr] = item
}
return
func (pool *Pool) Add(net, addr string, rate int) (err error) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
var item *poolClient
var ok bool
if item, ok = pool.clients[addr]; ok {
item.Rate = rate
} else {
var client *Client
client, err = New(net, addr)
item = &poolClient{Client: client, Rate: rate}
pool.clients[addr] = item
}
return
}
// Remove a server.
func (pool *Pool) Remove(addr string) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
delete(pool.clients, addr)
pool.mutex.Lock()
defer pool.mutex.Unlock()
delete(pool.clients, addr)
}
func (pool *Pool) Do(funcname string, data []byte,
flag byte, h JobHandler) (addr, handle string) {
client := pool.selectServer()
handle = client.Do(funcname, data, flag, h)
addr = client.addr
return
flag byte, h ResponseHandler) (addr string, handle []byte) {
client := pool.selectServer()
handle = client.Do(funcname, data, flag, h)
addr = client.addr
return
}
func (pool *Pool) DoBg(funcname string, data []byte,
flag byte) (addr, handle string) {
client := pool.selectServer()
handle = client.DoBg(funcname, data, flag)
addr = client.addr
return
flag byte) (addr string, handle []byte) {
client := pool.selectServer()
handle = client.DoBg(funcname, data, flag)
addr = client.addr
return
}
// Get job status from job server.
// !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string, timeout time.Duration) (status *Status, err error) {
if client, ok := pool.clients[addr]; ok {
status, err = client.Status(handle, timeout)
} else {
err = ErrNotFound
}
return
func (pool *Pool) Status(addr string, handle []byte, h ResponseHandler) (err error) {
if client, ok := pool.clients[addr]; ok {
err = client.Status(handle, h)
} else {
err = ErrNotFound
}
return
}
// Send a something out, get the samething back.
func (pool *Pool) Echo(addr string, data []byte, timeout time.Duration) (r []byte, err error) {
var client *poolClient
if addr == "" {
client = pool.selectServer()
} else {
var ok bool
if client, ok = pool.clients[addr]; !ok {
err = ErrNotFound
return
}
}
r, err = client.Echo(data, timeout)
return
func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (r []byte, err error) {
var client *poolClient
if addr == "" {
client = pool.selectServer()
} else {
var ok bool
if client, ok = pool.clients[addr]; !ok {
err = ErrNotFound
return
}
}
err = client.Echo(data, h)
return
}
// Close
func (pool *Pool) Close() (err map[string]error) {
err = make(map[string]error)
for _, c := range pool.clients {
err[c.addr] = c.Close()
}
return
err = make(map[string]error)
for _, c := range pool.clients {
err[c.addr] = c.Close()
}
return
}
// selecting server
func (pool *Pool) selectServer() (client *poolClient) {
for client == nil {
addr := pool.SelectionHandler(pool.clients, pool.last)
var ok bool
if client, ok = pool.clients[addr]; ok {
pool.last = addr
break
}
}
return
for client == nil {
addr := pool.SelectionHandler(pool.clients, pool.last)
var ok bool
if client, ok = pool.clients[addr]; ok {
pool.last = addr
break
}
}
return
}

View File

@ -1,107 +1,107 @@
package client
import (
"time"
"testing"
"testing"
"time"
)
var (
pool = NewPool()
pool = NewPool()
)
func TestPoolAdd(t *testing.T) {
t.Log("Add servers")
if err := pool.Add("127.0.0.1:4730", 1); err != nil {
t.Error(err)
}
if err := pool.Add("127.0.0.1:4730", 1); err != nil {
t.Error(err)
}
if len(pool.clients) != 2 {
t.Errorf("2 servers expected, %d got.", len(pool.clients))
}
t.Log("Add servers")
if err := pool.Add("127.0.0.1:4730", 1); err != nil {
t.Error(err)
}
if err := pool.Add("127.0.0.1:4730", 1); err != nil {
t.Error(err)
}
if len(pool.clients) != 2 {
t.Errorf("2 servers expected, %d got.", len(pool.clients))
}
}
func TestPoolEcho(t *testing.T) {
echo, err := pool.Echo("", []byte("Hello pool"), time.Second)
if err != nil {
t.Error(err)
return
}
if string(echo) != "Hello pool" {
t.Errorf("Invalid echo data: %s", echo)
return
}
echo, err := pool.Echo("", []byte("Hello pool"), time.Second)
if err != nil {
t.Error(err)
return
}
if string(echo) != "Hello pool" {
t.Errorf("Invalid echo data: %s", echo)
return
}
_, err = pool.Echo("not exists", []byte("Hello pool"), time.Second)
if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err)
}
_, err = pool.Echo("not exists", []byte("Hello pool"), time.Second)
if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err)
}
}
func TestPoolDoBg(t *testing.T) {
if addr, handle := pool.DoBg("ToUpper", []byte("abcdef"),
JOB_LOW); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
if addr, handle := pool.DoBg("ToUpper", []byte("abcdef"),
JOB_LOW); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolDo(t *testing.T) {
jobHandler := func(job *Job) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
if addr, handle := pool.Do("ToUpper", []byte("abcdef"),
JOB_LOW, jobHandler); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
jobHandler := func(job *Job) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
if addr, handle := pool.Do("ToUpper", []byte("abcdef"),
JOB_LOW, jobHandler); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolStatus(t *testing.T) {
s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second)
if err != nil {
t.Error(err)
return
}
if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
}
if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
}
s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second)
if err != nil {
t.Error(err)
return
}
if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
}
if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
}
addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
s2, err := pool.Status(addr, handle, time.Second)
if err != nil {
t.Error(err)
return
}
addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil)
s2, err := pool.Status(addr, handle, time.Second)
if err != nil {
t.Error(err)
return
}
if !s2.Known {
t.Errorf("The job (%s) should be known.", s2.Handle)
}
if s2.Running {
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
}
if !s2.Known {
t.Errorf("The job (%s) should be known.", s2.Handle)
}
if s2.Running {
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
}
_, err = pool.Status("not exists", "not exists", time.Second)
if err != ErrNotFound {
t.Error(err)
}
_, err = pool.Status("not exists", "not exists", time.Second)
if err != ErrNotFound {
t.Error(err)
}
}
func TestPoolClose(t *testing.T) {
return
if err := pool.Close(); err != nil {
t.Error(err)
}
return
if err := pool.Close(); err != nil {
t.Error(err)
}
}

47
client/request.go Normal file
View File

@ -0,0 +1,47 @@
// Copyright 2013 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"encoding/binary"
)
// request
type request struct {
DataType uint32
Data []byte
}
// Encode a Request to byte slice
func (req *request) Encode() (data []byte) {
l := len(req.Data) // length of data
tl := l + 12 // add 12 bytes head
data = getBuffer(tl)
copy(data[:4], REQ_STR)
binary.BigEndian.PutUint32(data[4:8], req.DataType)
binary.BigEndian.PutUint32(data[8:12], uint32(l))
copy(data[12:], req.Data)
return
}
func getRequest() (req *request) {
// TODO add a pool
req = &request{}
return
}
func getJob(funcname, id string, data []byte) (req *request) {
req = getRequest()
a := len(funcname)
b := len(id)
c := len(data)
l := a + b + c + 2
req.Data = getBuffer(l)
copy(req.Data[0:a], funcname)
copy(req.Data[a+1:a+b+1], []byte(id))
copy(req.Data[a+b+1:a+b+c+1], data)
return
}

129
client/response.go Normal file
View File

@ -0,0 +1,129 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"fmt"
"bytes"
"strconv"
"encoding/binary"
)
// response
type response struct {
DataType uint32
Data, Handle []byte
UID string
}
// Extract the Response's result.
// if data == nil, err != nil, then worker failing to execute job
// if data != nil, err != nil, then worker has a exception
// if data != nil, err == nil, then worker complate job
// after calling this method, the Response.Handle will be filled
func (resp *response) Result() (data []byte, err error) {
switch resp.DataType {
case WORK_FAIL:
resp.Handle = resp.Data
err = ErrWorkFail
return
case WORK_EXCEPTION:
err = ErrWorkException
fallthrough
case WORK_COMPLETE:
s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = fmt.Errorf("Invalid data: %V", resp.Data)
return
}
resp.Handle = s[0]
data = s[1]
default:
err = ErrDataType
}
return
}
// Extract the job's update
func (resp *response) Update() (data []byte, err error) {
if resp.DataType != WORK_DATA &&
resp.DataType != WORK_WARNING {
err = ErrDataType
return
}
s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = ErrInvalidData
return
}
if resp.DataType == WORK_WARNING {
err = ErrWorkWarning
}
resp.Handle = s[0]
data = s[1]
return
}
// Decode a job from byte slice
func decodeResponse(data []byte) (resp *response, l int, err error) {
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %V", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
dt := data[MIN_PACKET_LEN:dl+MIN_PACKET_LEN]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %V", data)
return
}
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
case WORK_DATA, WORK_WARNING, WORK_STATUS,
WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION:
s := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(s) >= 2 {
resp.Handle = s[0]
resp.Data = s[1]
} else {
err = fmt.Errorf("Invalid data: %V", data)
return
}
}
l = len(resp.Data) + MIN_PACKET_LEN
return
}
// status handler
func (resp *response) Status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 5)
if len(data) != 5 {
err = fmt.Errorf("Invalid data: %V", resp.Data)
return
}
status = &Status{}
status.Handle = data[0]
status.Known = (data[1][0] == '1')
status.Running = (data[2][0] == '1')
status.Numerator, err = strconv.ParseUint(string(data[3]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[3])
return
}
status.Denominator, err = strconv.ParseUint(string(data[4]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[4])
return
}
return
}
func getResponse() (resp *response) {
// TODO add a pool
resp = &response{}
return
}

View File

@ -1,7 +1,7 @@
package client
type Status struct {
Handle string
Known, Running bool
Numerator, Denominator uint64
Handle []byte
Known, Running bool
Numerator, Denominator uint64
}