Compare commits


No commits in common. "master" and "0.1.2" have entirely different histories.

39 changed files with 1317 additions and 3096 deletions

.gitignore vendored

@@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

.hgtags Normal file

@@ -0,0 +1,29 @@
b68aee2a48811a1bee5994b56437c393c6fb2f5b 2011-05-24
0dc8bc71d7e895caf5803c1905bf07a823462fba native.start
dee83cac69e07ed4f3efde162d981f5855101845 0.0.1
dee83cac69e07ed4f3efde162d981f5855101845 go1-0.0.1
dee83cac69e07ed4f3efde162d981f5855101845 0.0.1
0000000000000000000000000000000000000000 0.0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
b68aee2a48811a1bee5994b56437c393c6fb2f5b 2011-05-24
0000000000000000000000000000000000000000 2011-05-24
0dc8bc71d7e895caf5803c1905bf07a823462fba native.start
0000000000000000000000000000000000000000 native.start
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
7928d7ed58bc0e36abebb5602b4f8880551054a7 go1-0.1
0000000000000000000000000000000000000000 0.0.1
dee83cac69e07ed4f3efde162d981f5855101845 0.0.1
7928d7ed58bc0e36abebb5602b4f8880551054a7 0.1
7928d7ed58bc0e36abebb5602b4f8880551054a7 go1-0.1
0000000000000000000000000000000000000000 go1-0.1
dee83cac69e07ed4f3efde162d981f5855101845 go1-0.0.1
0000000000000000000000000000000000000000 go1-0.0.1
4e3bf88517539cf6a0342e57b86df6e17ceb228c 0.1.1
4e3bf88517539cf6a0342e57b86df6e17ceb228c 0.1.1
0000000000000000000000000000000000000000 0.1.1
0000000000000000000000000000000000000000 0.1.1
eea0878b43d209630d7b342b1b99c61c839b454f 0.1.1


@@ -1,7 +0,0 @@
language: go
go:
- 1.4
before_install:
- sudo apt-get remove -y gearman-job-server
- sudo apt-get install -y gearman-job-server

README.md

@@ -1,122 +1,71 @@
Gearman-Go # Gearman API for golang
==========
This module is a [Gearman](http://gearman.org/) API for the [Go Programming Language](http://golang.org). This package is a [Gearman](http://gearman.org/) API for [Golang](http://golang.org).
The protocols were written in pure Go. It contains two sub-packages: It was implemented a native protocol for both worker and client API.
The client package is used for sending jobs to the Gearman job server, Copyright 2012 Xing Xing <mikespook@gmail.com>
and getting responses from the server. All rights reserved.
Use of this source code is governed by a MIT license that can be found
in the LICENSE file.
"github.com/mikespook/gearman-go/client" # INSTALL
The worker package will help developers in developing Gearman worker
service easily.
"github.com/mikespook/gearman-go/worker"
[![Build Status](https://travis-ci.org/mikespook/gearman-go.png?branch=master)](https://travis-ci.org/mikespook/gearman-go)
[![GoDoc](https://godoc.org/github.com/mikespook/gearman-go?status.png)](https://godoc.org/github.com/mikespook/gearman-go)
Install
=======
Install the client package: Install the client package:
> $ go get github.com/mikespook/gearman-go/client > $ go get bitbucket.org/mikespook/gearman-go/client
Install the worker package: Install the worker package:
> $ go get github.com/mikespook/gearman-go/worker > $ go get bitbucket.org/mikespook/gearman-go/worker
Both of them: Install both:
> $ go get github.com/mikespook/gearman-go > $ go get bitbucket.org/mikespook/gearman-go
Usage
===== # SAMPLE OF USAGE
## Worker ## Worker
```go w := worker.New(worker.Unlimited)
// Limit number of concurrent jobs execution. w.ErrHandler = func(e error) {
// Use worker.Unlimited (0) if you want no limitation. log.Println(e)
w := worker.New(worker.OneByOne) }
w.ErrHandler = func(e error) { w.AddServer("127.0.0.1:4730")
log.Println(e) w.AddFunc("ToUpper", ToUpper, worker.Immediately)
} w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
w.AddServer("127.0.0.1:4730") w.Work()
// Use worker.Unlimited (0) if you want no timeout
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
// This will give a timeout of 5 seconds
w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
```
## Client ## Client
```go c, err := client.New("127.0.0.1:4730")
// ... // ...
c, err := client.New("tcp4", "127.0.0.1:4730") defer c.Close()
// ... error handling echo := []byte("Hello\x00 world")
defer c.Close() c.JobHandler = func(job *client.Job) error {
c.ErrorHandler = func(e error) { log.Printf("%s", job.Data)
log.Println(e) return nil
} }
echo := []byte("Hello\x00 world") c.ErrHandler = func(e error) {
echomsg, err := c.Echo(echo) log.Println(e)
// ... error handling panic(e)
log.Println(string(echomsg)) }
jobHandler := func(resp *client.Response) { handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL)
log.Printf("%s", resp.Data) // ...
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
// ...
```
Branches # Contacts
========
Version 0.x means: _It is far far away from stable._ Xing Xing <mikespook@gmail.com>
__Use at your own risk!__ [Blog](http://mikespook.com)
* master current usable version [@Twitter](http://twitter.com/mikespook)
* 0.2-dev Refactoring a lot of things
* 0.1-testing Old API and some known issues, eg. [issue-14](https://github.com/mikespook/gearman-go/issues/14)
Contributors # History
============
Great thanks to all of you for your support and interest! * 0.1.2 Fixed issues: timeout executing, resources leaking.
* 0.1.1 Fixed the issue of grabbing jobs.
* 0.1 Code refactoring; Redesign the API.
* 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API.
(_Alphabetic order_) # TODO
* [Alex Zylman](https://github.com/azylman)
* [C.R. Kirkwood-Watts](https://github.com/kirkwood)
* [Damian Gryski](https://github.com/dgryski)
* [Gabriel Cristian Alecu](https://github.com/AzuraMeta)
* [Graham Barr](https://github.com/gbarr)
* [Ingo Oeser](https://github.com/nightlyone)
* [jake](https://github.com/jbaikge)
* [Joe Higton](https://github.com/draxil)
* [Jonathan Wills](https://github.com/runningwild)
* [Kevin Darlington](https://github.com/kdar)
* [miraclesu](https://github.com/miraclesu)
* [Paul Mach](https://github.com/paulmach)
* [Randall McPherson](https://github.com/rlmcpherson)
* [Sam Grimee](https://github.com/sgrimee)
Maintainer
==========
* [Xing Xing](http://mikespook.com) &lt;<mikespook@gmail.com>&gt; [@Twitter](http://twitter.com/mikespook)
Open Source - MIT Software License
==================================
See LICENSE.
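The usage section above interleaves the master and 0.1.2 columns of the diff, which makes the snippets hard to follow. The sketch below reassembles the 0.1.2 (right-hand) client example as a self-contained program, using only the API visible in this compare view (`client.New`, `JobHandler`, `ErrHandler`, `Do` with `JOB_NORMAL`); it mirrors `example/client.go` further down. The server address and the "ToUpper" function name are the sample values used throughout the diff.

```go
package main

import (
	"log"
	"sync"

	"bitbucket.org/mikespook/gearman-go/client"
)

func main() {
	var wg sync.WaitGroup

	// The 0.1.2 constructor takes only the server address.
	c, err := client.New("127.0.0.1:4730")
	if err != nil {
		log.Fatalln(err)
	}
	defer c.Close()

	// Job results arrive asynchronously through the client-level handler.
	c.JobHandler = func(job *client.Job) error {
		log.Printf("%s", job.Data)
		wg.Done()
		return nil
	}
	c.ErrHandler = func(e error) {
		log.Println(e)
	}

	// Do blocks until the server answers with JOB_CREATED and returns the handle.
	wg.Add(1)
	handle, err := c.Do("ToUpper", []byte("Hello world"), client.JOB_NORMAL)
	if err != nil {
		log.Fatalln(err)
	}
	log.Println(handle)

	// Wait for the worker's reply before exiting.
	wg.Wait()
}
```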


@@ -1,360 +1,282 @@
// The client package helps developers connect to Gearmand, send // Copyright 2011 Xing Xing <mikespook@gmail.com>.
// jobs and fetch result. // All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client package client
import ( import (
"bufio" "io"
"net" "net"
"sync" "time"
"time" "bytes"
"strconv"
"bitbucket.org/mikespook/golib/autoinc"
"bitbucket.org/mikespook/gearman-go/common"
) )
var ( // Job handler
DefaultTimeout time.Duration = time.Second type JobHandler func(*Job) error
) // Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
// One client connect to one server. /*
// Use Pool for multi-connections. The client side api for gearman
usage:
c := client.New("tcp4", "127.0.0.1:4730")
handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct { type Client struct {
sync.Mutex ErrHandler common.ErrorHandler
JobHandler JobHandler
StatusHandler StatusHandler
TimeOut time.Duration
net, addr string in chan []byte
innerHandler *responseHandlerMap out chan *Job
in chan *Response jobCreated chan *Job
conn net.Conn conn net.Conn
rw *bufio.ReadWriter ai *autoinc.AutoInc
ResponseTimeout time.Duration // response timeout for do()
ErrorHandler ErrorHandler
} }
type responseHandlerMap struct { // Create a new client.
sync.Mutex // Connect to "addr" through "network"
holder map[string]handledResponse // Eg.
// client, err := client.New("127.0.0.1:4730")
func New(addr string) (client *Client, err error) {
conn, err := net.Dial(common.NETWORK, addr)
if err != nil {
return
}
client = &Client{
jobCreated: make(chan *Job),
in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
conn: conn,
ai: autoinc.New(0, 1),
TimeOut: time.Second,
}
go client.inLoop()
go client.outLoop()
return
} }
type handledResponse struct { // out loop
internal ResponseHandler // internal handler, always non-nil func (client *Client) outLoop() {
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil ok := true
for ok {
if job, ok := <-client.out; ok {
if err := client.write(job.Encode()); err != nil {
client.err(err)
}
}
}
} }
func newResponseHandlerMap() *responseHandlerMap { // in loop
return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)} func (client *Client) inLoop() {
defer common.DisablePanic()
for {
rel, err := client.read()
if err != nil {
if err == common.ErrConnection {
client.Close()
break
}
client.err(err)
continue
}
job, err := decodeJob(rel)
if err != nil {
client.err(err)
continue
}
switch job.DataType {
case common.ERROR:
_, err := common.GetError(job.Data)
client.err(err)
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION,
common.ECHO_RES:
go client.handleJob(job)
case common.JOB_CREATED:
client.jobCreated <- job
case common.STATUS_RES:
go client.handleStatus(job)
}
}
} }
func (r *responseHandlerMap) remove(key string) { // inner read
r.Lock() func (client *Client) read() (data []byte, err error) {
delete(r.holder, key) if len(client.in) > 0 {
r.Unlock() // incoming queue is not empty
data = <-client.in
} else {
// empty queue, read data from socket
for {
buf := make([]byte, common.BUFFER_SIZE)
var n int
if n, err = client.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrConnection
return
}
break
}
return
}
data = append(data, buf[0:n]...)
if n < common.BUFFER_SIZE {
break
}
}
}
// split package
tl := len(data)
start, end := 0, 4
for i := 0; i < tl; i++ {
if string(data[start:end]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == tl {
return
} else {
client.in <- data[total:]
data = data[:total]
return
}
} else {
start++
end++
}
}
return nil, common.Errorf("Invalid data: %V", data)
} }
func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) { // error handler
r.Lock() func (client *Client) err (e error) {
rh, b := r.holder[key] if client.ErrHandler != nil {
delete(r.holder, key) client.ErrHandler(e)
r.Unlock() }
return rh, b
} }
func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) { // job handler
r.Lock() func (client *Client) handleJob(job *Job) {
r.holder[key] = handledResponse{internal: internal, external: external} if client.JobHandler != nil {
r.Unlock() if err := client.JobHandler(job); err != nil {
client.err(err)
}
}
} }
func (r *responseHandlerMap) put(key string, rh ResponseHandler) { // status handler
r.putWithExternalHandler(key, rh, nil) func (client *Client) handleStatus(job *Job) {
if client.StatusHandler != nil {
data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
if len(data) != 5 {
client.err(common.Errorf("Invalid data: %V", job.Data))
return
}
handle := string(data[0])
known := (data[1][0] == '1')
running := (data[2][0] == '1')
numerator, err := strconv.ParseUint(string(data[3][0]), 10, 0)
if err != nil {
client.err(common.Errorf("Invalid handle: %s", data[3][0]))
return
}
denominator, err := strconv.ParseUint(string(data[4][0]), 10, 0)
if err != nil {
client.err(common.Errorf("Invalid handle: %s", data[4][0]))
return
}
client.StatusHandler(handle, known, running, numerator, denominator)
}
} }
// New returns a client. // Do the function.
func New(network, addr string) (client *Client, err error) { // funcname is a string with function name.
client = &Client{ // data is encoding to byte array.
net: network, // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
addr: addr, // and if it is background job: JOB_BG.
innerHandler: newResponseHandlerMap(), // JOB_LOW | JOB_BG means the job is running with low level in background.
in: make(chan *Response, queueSize), func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) {
ResponseTimeout: DefaultTimeout, var datatype uint32
} if flag & JOB_LOW == JOB_LOW {
client.conn, err = net.Dial(client.net, client.addr) if flag & JOB_BG == JOB_BG {
if err != nil { datatype = common.SUBMIT_JOB_LOW_BG
return } else {
} datatype = common.SUBMIT_JOB_LOW
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn), }
bufio.NewWriter(client.conn)) } else if flag & JOB_HIGH == JOB_HIGH {
go client.readLoop() if flag & JOB_BG == JOB_BG {
go client.processLoop() datatype = common.SUBMIT_JOB_HIGH_BG
return } else {
datatype = common.SUBMIT_JOB_HIGH
}
} else if flag & JOB_BG == JOB_BG {
datatype = common.SUBMIT_JOB_BG
} else {
datatype = common.SUBMIT_JOB
}
uid := strconv.Itoa(int(client.ai.Id()))
l := len(funcname) + len(uid) + len(data) + 2
rel := make([]byte, 0, l)
rel = append(rel, []byte(funcname)...) // len(funcname)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, []byte(uid)...) // len(uid)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, datatype, rel))
// Waiting for JOB_CREATED
select {
case job := <-client.jobCreated:
return string(job.Data), nil
case <-time.After(client.TimeOut):
return "", common.ErrJobTimeOut
}
return
} }
func (client *Client) write(req *request) (err error) { // Get job status from job server.
var n int // !!!Not fully tested.!!!
buf := req.Encode() func (client *Client) Status(handle string) {
for i := 0; i < len(buf); i += n { job := newJob(common.REQ, common.GET_STATUS, []byte(handle))
n, err = client.rw.Write(buf[i:]) client.writeJob(job)
if err != nil {
return
}
}
return client.rw.Flush()
} }
func (client *Client) read(length int) (data []byte, err error) { // Send a something out, get the samething back.
n := 0 func (client *Client) Echo(data []byte) {
buf := getBuffer(bufferSize) client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
// read until data can be unpacked
for i := length; i > 0 || len(data) < minPacketLength; i -= n {
if n, err = client.rw.Read(buf); err != nil {
return
}
data = append(data, buf[0:n]...)
if n < bufferSize {
break
}
}
return
} }
func (client *Client) readLoop() { // Send the job to job server.
defer close(client.in) func (client *Client) writeJob(job *Job) {
var data, leftdata []byte client.out <- job
var err error
var resp *Response
ReadLoop:
for client.conn != nil {
if data, err = client.read(bufferSize); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Timeout() {
client.err(err)
}
if opErr.Temporary() {
continue
}
break
}
client.err(err)
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the client should close the conection
// and reconnect to job server.
client.Close()
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
client.err(err)
break
}
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
bufio.NewWriter(client.conn))
continue
}
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
leftdata = nil
}
for {
l := len(data)
if l < minPacketLength { // not enough data
leftdata = data
continue ReadLoop
}
if resp, l, err = decodeResponse(data); err != nil {
leftdata = data[l:]
continue ReadLoop
} else {
client.in <- resp
}
data = data[l:]
if len(data) > 0 {
continue
}
break
}
}
} }
func (client *Client) processLoop() { // Internal write
rhandlers := map[string]ResponseHandler{} func (client *Client) write(buf []byte) (err error) {
for resp := range client.in { var n int
switch resp.DataType { for i := 0; i < len(buf); i += n {
case dtError: n, err = client.conn.Write(buf[i:])
client.err(getError(resp.Data)) if err != nil {
case dtStatusRes: return
client.handleInner("s"+resp.Handle, resp, nil) }
case dtJobCreated: }
client.handleInner("c", resp, rhandlers) return
case dtEchoRes:
client.handleInner("e", resp, nil)
case dtWorkData, dtWorkWarning, dtWorkStatus:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
delete(rhandlers, resp.Handle)
}
}
}
} }
func (client *Client) err(e error) { // Close
if client.ErrorHandler != nil {
client.ErrorHandler(e)
}
}
func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) {
if h, ok := client.innerHandler.getAndRemove(key); ok {
if h.external != nil && resp.Handle != "" {
rhandlers[resp.Handle] = h.external
}
h.internal(resp)
}
}
type handleOrError struct {
handle string
err error
}
func (client *Client) do(funcname string, data []byte,
flag uint32, h ResponseHandler, id string) (handle string, err error) {
if len(id) == 0 {
return "", ErrInvalidId
}
if client.conn == nil {
return "", ErrLostConn
}
var result = make(chan handleOrError, 1)
client.Lock()
defer client.Unlock()
client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
if resp.DataType == dtError {
err = getError(resp.Data)
result <- handleOrError{"", err}
return
}
handle = resp.Handle
result <- handleOrError{handle, nil}
}, h)
req := getJob(id, []byte(funcname), data)
req.DataType = flag
if err = client.write(req); err != nil {
client.innerHandler.remove("c")
return
}
var timer = time.After(client.ResponseTimeout)
select {
case ret := <-result:
return ret.handle, ret.err
case <-timer:
client.innerHandler.remove("c")
return "", ErrLostConn
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) {
handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) {
handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
return
}
// Status gets job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil {
return nil, ErrLostConn
}
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("s"+handle, func(resp *Response) {
defer mutex.Unlock()
var err error
status, err = resp._status()
if err != nil {
client.err(err)
}
})
req := getRequest()
req.DataType = dtGetStatus
req.Data = []byte(handle)
client.write(req)
mutex.Lock()
return
}
// Echo.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
if client.conn == nil {
return nil, ErrLostConn
}
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("e", func(resp *Response) {
echo = resp.Data
mutex.Unlock()
})
req := getRequest()
req.DataType = dtEchoReq
req.Data = data
client.write(req)
mutex.Lock()
return
}
// Close connection
func (client *Client) Close() (err error) { func (client *Client) Close() (err error) {
client.Lock() close(client.jobCreated)
defer client.Unlock() close(client.in)
if client.conn != nil { close(client.out)
err = client.conn.Close() return client.conn.Close();
client.conn = nil
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoWithId(funcname string, data []byte,
flag byte, h ResponseHandler, id string) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
} }
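The `Do` documentation in the 0.1.2 column above says the `flag` byte combines a priority (`JOB_LOW`, `JOB_NORMAL`, `JOB_HIGH`) with an optional background bit (`JOB_BG`). The sketch below restates that dispatch as a standalone function so the mapping is easier to see; the flag constants are copied from `client/job.go` shown later in this diff, and the returned strings stand in for the `common.SUBMIT_JOB_*` packet constants.

```go
package main

import "fmt"

// Priority/background flags as defined in client/job.go below.
const (
	JOB_NORMAL = 0
	JOB_BG     = 1
	JOB_LOW    = 2
	JOB_HIGH   = 4
)

// submitType mirrors the flag tests in Do() above: the LOW/HIGH bits pick the
// priority and the BG bit switches to the background variant of SUBMIT_JOB.
func submitType(flag byte) string {
	bg := flag&JOB_BG == JOB_BG
	switch {
	case flag&JOB_LOW == JOB_LOW:
		if bg {
			return "SUBMIT_JOB_LOW_BG"
		}
		return "SUBMIT_JOB_LOW"
	case flag&JOB_HIGH == JOB_HIGH:
		if bg {
			return "SUBMIT_JOB_HIGH_BG"
		}
		return "SUBMIT_JOB_HIGH"
	case bg:
		return "SUBMIT_JOB_BG"
	default:
		return "SUBMIT_JOB"
	}
}

func main() {
	fmt.Println(submitType(JOB_LOW | JOB_BG)) // SUBMIT_JOB_LOW_BG
}
```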


@@ -1,377 +1,43 @@
package client package client
import ( import (
"crypto/md5" "testing"
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
"testing"
"time"
) )
const ( var client *Client
TestStr = "Hello world"
)
var (
client *Client
runIntegrationTests bool
)
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestClientAddServer(t *testing.T) { func TestClientAddServer(t *testing.T) {
if !runIntegrationTests { t.Log("Add local server 127.0.0.1:4730")
t.Skip("To run this test, use: go test -integration") var err error
} if client, err = New("127.0.0.1:4730"); err != nil {
t.Log("Add local server 127.0.0.1:4730") t.Error(err)
var err error }
if client, err = New(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
client.ErrorHandler = func(e error) {
t.Log(e)
}
} }
/*
func TestClientEcho(t *testing.T) { func TestClientEcho(t *testing.T) {
if !runIntegrationTests { client.JobHandler = func(job *Job) error {
t.Skip("To run this test, use: go test -integration") echo := string(job.Data)
} if echo == "Hello world" {
echo, err := client.Echo([]byte(TestStr)) t.Log(echo)
if err != nil { } else {
t.Error(err) t.Errorf("Invalid echo data: %s", job.Data)
return }
} return nil
if string(echo) != TestStr { }
t.Errorf("Echo error, %s expected, %s got", TestStr, echo) client.Echo([]byte("Hello world"))
return
}
} }
*/
func TestClientDoBg(t *testing.T) { /*
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDo(t *testing.T) { func TestClientDo(t *testing.T) {
if !runIntegrationTests { if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil {
t.Skip("To run this test, use: go test -integration") t.Error(err)
} } else {
jobHandler := func(job *Response) { t.Log(handle)
str := string(job.Data) }
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
handle, err := client.Do("ToUpper", []byte("abcdef"),
JobLow, jobHandler)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
} }
*/
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := client.Status("handle not exists")
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle)
return
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
return
}
handle, err := client.Do("Delay5sec", []byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return
}
status, err = client.Status(handle)
if err != nil {
t.Error(err)
return
}
if !status.Known {
t.Errorf("The job (%s) should be known.", status.Handle)
return
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
return
}
}
func TestClientClose(t *testing.T) { func TestClientClose(t *testing.T) {
if !runIntegrationTests { if err := client.Close(); err != nil {
t.Skip("To run this test, use: go test -integration") t.Error(err)
} }
if err := client.Close(); err != nil {
t.Error(err)
}
} }


@@ -1,75 +0,0 @@
package client
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 8192
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
dtSubmitJob = 7
dtSubmitJobBg = 18
dtSubmitJobHigh = 21
dtSubmitJobHighBg = 32
dtSubmitJobLow = 33
dtSubmitJobLowBg = 34
WorkComplate = dtWorkComplete
WorkComplete = dtWorkComplete
WorkData = dtWorkData
WorkStatus = dtWorkStatus
WorkWarning = dtWorkWarning
WorkFail = dtWorkFail
WorkException = dtWorkException
)
const (
// Job type
JobNormal = iota
// low level
JobLow
// high level
JobHigh
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}


@@ -1,31 +0,0 @@
package client
import (
"bytes"
"errors"
"fmt"
)
var (
ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")
ErrLostConn = errors.New("Lost connection with Gearmand")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// Error handler
type ErrorHandler func(error)


@@ -1,42 +0,0 @@
package client
import (
"strconv"
"sync/atomic"
"time"
)
var (
// Global ID generator
// Default is an autoincrement ID generator
IdGen IdGenerator
)
func init() {
IdGen = NewAutoIncId()
}
// ID generator interface. Users can implament this for
// their own generator.
type IdGenerator interface {
Id() string
}
// AutoIncId
type autoincId struct {
value int64
}
func (ai *autoincId) Id() string {
next := atomic.AddInt64(&ai.value, 1)
return strconv.FormatInt(next, 10)
}
// NewAutoIncId returns an autoincrement ID generator
func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique
// and count up from there.
return &autoincId{
value: int64(time.Now().Nanosecond()) << 32,
}
}
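The removed id.go above notes that users can supply their own `IdGenerator`. A minimal hypothetical example follows; the prefix-plus-counter scheme is purely illustrative and not part of the package.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// IdGenerator matches the interface removed above: anything with Id() string.
type IdGenerator interface {
	Id() string
}

// prefixedId is a hypothetical custom generator: a fixed prefix plus an atomic
// counter, e.g. to make job ids traceable to the process that submitted them.
type prefixedId struct {
	prefix string
	n      int64
}

func (p *prefixedId) Id() string {
	return p.prefix + "-" + strconv.FormatInt(atomic.AddInt64(&p.n, 1), 10)
}

func main() {
	var gen IdGenerator = &prefixedId{prefix: "web01"}
	fmt.Println(gen.Id(), gen.Id()) // web01-1 web01-2
	// With the master-branch client this would be installed as:
	//   client.IdGen = &prefixedId{prefix: "web01"}
}
```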


@@ -1,18 +0,0 @@
package client
import (
"testing"
)
func TestAutoInc(t *testing.T) {
ai := NewAutoIncId()
previous := ai.Id()
for i := 0; i < 10; i++ {
id := ai.Id()
if id == previous {
t.Errorf("Id not unique, previous and current %s", id)
}
previous = id
}
}

client/job.go Normal file

@@ -0,0 +1,128 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bytes"
"bitbucket.org/mikespook/gearman-go/common"
)
const (
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
)
// An error handler
type ErrorHandler func(error)
// Client side job
type Job struct {
Data []byte
Handle, UniqueId string
magicCode, DataType uint32
}
// Create a new job
func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
return &Job{magicCode: magiccode,
DataType: datatype,
Data: data}
}
// Decode a job from byte slice
func decodeJob(data []byte) (job *Job, err error) {
if len(data) < 12 {
return nil, common.Errorf("Invalid data: %V", data)
}
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
return nil, common.Errorf("Invalid data: %V", data)
}
data = data[12:]
return newJob(common.RES, datatype, data), nil
}
// Encode a job to byte slice
func (job *Job) Encode() (data []byte) {
l := len(job.Data)
tl := l + 12
data = make([]byte, tl)
magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType)
datalength := common.Uint32ToBytes(uint32(l))
for i := 0; i < tl; i ++ {
switch {
case i < 4:
data[i] = magiccode[i]
case i < 8:
data[i] = datatype[i - 4]
case i < 12:
data[i] = datalength[i - 8]
default:
data[i] = job.Data[i - 12]
}
}
// Alternative
/*
data = append(data, magiccode[:] ...)
data = append(data, datatype[:] ...)
data = append(data, datalength[:] ...)
data = append(data, job.Data ...)
*/
return
}
// Extract the job's result.
func (job *Job) Result() (data []byte, err error) {
switch job.DataType {
case common.WORK_FAIL:
job.Handle = string(job.Data)
return nil, common.ErrWorkFail
case common.WORK_EXCEPTION:
err = common.ErrWorkException
fallthrough
case common.WORK_COMPLETE:
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
return nil, common.Errorf("Invalid data: %V", job.Data)
}
job.Handle = string(s[0])
data = s[1]
default:
err = common.ErrDataType
}
return
}
// Extract the job's update
func (job *Job) Update() (data []byte, err error) {
if job.DataType != common.WORK_DATA && job.DataType != common.WORK_WARNING {
err = common.ErrDataType
return
}
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = common.ErrInvalidData
return
}
if job.DataType == common.WORK_WARNING {
err = common.ErrWorkWarning
}
job.Handle = string(s[0])
data = s[1]
return
}
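`Encode` above writes the standard 12-byte Gearman frame header byte by byte. The sketch below produces the same layout with `encoding/binary`, which makes the structure explicit: a 4-byte magic code, a 4-byte data type and a 4-byte payload length, all big-endian, followed by the payload. The constant values are the ones defined in `common/gearman.go` later in this diff.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeFrame builds the same 12-byte header that Job.Encode above produces.
func encodeFrame(magic, dataType uint32, payload []byte) []byte {
	buf := make([]byte, 12+len(payload))
	binary.BigEndian.PutUint32(buf[0:4], magic)
	binary.BigEndian.PutUint32(buf[4:8], dataType)
	binary.BigEndian.PutUint32(buf[8:12], uint32(len(payload)))
	copy(buf[12:], payload)
	return buf
}

func main() {
	const RES = 5391699 // "\x00RES" as a big-endian uint32 (common.RES)
	const ECHO_RES = 17 // common.ECHO_RES
	frame := encodeFrame(RES, ECHO_RES, []byte("hello"))
	fmt.Printf("% x\n", frame)
	// 00 52 45 53 00 00 00 11 00 00 00 05 68 65 6c 6c 6f
}
```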


@@ -1,165 +0,0 @@
package client
import (
"errors"
"math/rand"
"sync"
)
const (
poolSize = 10
)
var (
ErrNotFound = errors.New("Server Not Found")
)
type PoolClient struct {
*Client
Rate int
mutex sync.Mutex
}
type SelectionHandler func(map[string]*PoolClient, string) string
func SelectWithRate(pool map[string]*PoolClient,
last string) (addr string) {
total := 0
for _, item := range pool {
total += item.Rate
if rand.Intn(total) < item.Rate {
return item.addr
}
}
return last
}
func SelectRandom(pool map[string]*PoolClient,
last string) (addr string) {
r := rand.Intn(len(pool))
i := 0
for k, _ := range pool {
if r == i {
return k
}
i++
}
return last
}
type Pool struct {
SelectionHandler SelectionHandler
ErrorHandler ErrorHandler
Clients map[string]*PoolClient
last string
mutex sync.Mutex
}
// NewPool returns a new pool.
func NewPool() (pool *Pool) {
return &Pool{
Clients: make(map[string]*PoolClient, poolSize),
SelectionHandler: SelectWithRate,
}
}
// Add a server with rate.
func (pool *Pool) Add(net, addr string, rate int) (err error) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
var item *PoolClient
var ok bool
if item, ok = pool.Clients[addr]; ok {
item.Rate = rate
} else {
var client *Client
client, err = New(net, addr)
if err == nil {
item = &PoolClient{Client: client, Rate: rate}
pool.Clients[addr] = item
}
}
return
}
// Remove a server.
func (pool *Pool) Remove(addr string) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
delete(pool.Clients, addr)
}
func (pool *Pool) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (addr, handle string, err error) {
client := pool.selectServer()
client.Lock()
defer client.Unlock()
handle, err = client.Do(funcname, data, flag, h)
addr = client.addr
return
}
func (pool *Pool) DoBg(funcname string, data []byte,
flag byte) (addr, handle string, err error) {
client := pool.selectServer()
client.Lock()
defer client.Unlock()
handle, err = client.DoBg(funcname, data, flag)
addr = client.addr
return
}
// Status gets job status from job server.
// !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.Clients[addr]; ok {
client.Lock()
defer client.Unlock()
status, err = client.Status(handle)
} else {
err = ErrNotFound
}
return
}
// Send a something out, get the samething back.
func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
var client *PoolClient
if addr == "" {
client = pool.selectServer()
} else {
var ok bool
if client, ok = pool.Clients[addr]; !ok {
err = ErrNotFound
return
}
}
client.Lock()
defer client.Unlock()
echo, err = client.Echo(data)
return
}
// Close
func (pool *Pool) Close() (err map[string]error) {
err = make(map[string]error)
for _, c := range pool.Clients {
err[c.addr] = c.Close()
}
return
}
// selecting server
func (pool *Pool) selectServer() (client *PoolClient) {
for client == nil {
addr := pool.SelectionHandler(pool.Clients, pool.last)
var ok bool
if client, ok = pool.Clients[addr]; ok {
pool.last = addr
break
}
}
return
}
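A short hedged sketch of how the master-branch pool above would be used, based only on the signatures shown here (`NewPool`, `Add` with a network, address and rate, `Do` with a `ResponseHandler`); the second server address is a placeholder, and the import path follows the master README.

```go
package main

import (
	"log"

	"github.com/mikespook/gearman-go/client"
)

func main() {
	pool := client.NewPool()

	// Add servers with selection weights; Add dials each server as it is added.
	if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
		log.Fatalln(err)
	}
	if err := pool.Add("tcp4", "127.0.0.1:4731", 2); err != nil {
		log.Println(err) // the second server is optional in this sketch
	}

	// Do picks a server through the SelectionHandler and returns its address
	// along with the job handle.
	addr, handle, err := pool.Do("ToUpper", []byte("abcdef"), client.JobNormal,
		func(resp *client.Response) { log.Printf("%s", resp.Data) })
	if err != nil {
		log.Fatalln(err)
	}
	log.Println(addr, handle)
}
```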


@@ -1,139 +0,0 @@
package client
import (
"testing"
)
var (
pool = NewPool()
)
func TestPoolAdd(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add servers")
c := 2
if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
t.Fatal(err)
}
if err := pool.Add("tcp4", "127.0.1.1:4730", 1); err != nil {
t.Log(err)
c -= 1
}
if len(pool.Clients) != c {
t.Errorf("%d servers expected, %d got.", c, len(pool.Clients))
}
}
func TestPoolEcho(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
echo, err := pool.Echo("", []byte(TestStr))
if err != nil {
t.Error(err)
return
}
if string(echo) != TestStr {
t.Errorf("Invalid echo data: %s", echo)
return
}
_, err = pool.Echo("not exists", []byte(TestStr))
if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err)
}
}
func TestPoolDoBg(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
addr, handle, err := pool.DoBg("ToUpper",
[]byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
addr, handle, err := pool.Do("ToUpper",
[]byte("abcdef"), JobLow, jobHandler)
if err != nil {
t.Error(err)
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := pool.Status("127.0.0.1:4730", "handle not exists")
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle)
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
}
addr, handle, err := pool.Do("Delay5sec",
[]byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return
}
status, err = pool.Status(addr, handle)
if err != nil {
t.Error(err)
return
}
if !status.Known {
t.Errorf("The job (%s) should be known.", status.Handle)
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
}
status, err = pool.Status("not exists", "not exists")
if err != ErrNotFound {
t.Error(err)
return
}
}
func TestPoolClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
return
if err := pool.Close(); err != nil {
t.Error(err)
}
}


@@ -1,42 +0,0 @@
package client
import (
"encoding/binary"
)
// Request from client
type request struct {
DataType uint32
Data []byte
}
// Encode a Request to byte slice
func (req *request) Encode() (data []byte) {
l := len(req.Data) // length of data
tl := l + minPacketLength // add 12 bytes head
data = getBuffer(tl)
copy(data[:4], reqStr)
binary.BigEndian.PutUint32(data[4:8], req.DataType)
binary.BigEndian.PutUint32(data[8:12], uint32(l))
copy(data[minPacketLength:], req.Data)
return
}
func getRequest() (req *request) {
// TODO add a pool
req = &request{}
return
}
func getJob(id string, funcname, data []byte) (req *request) {
req = getRequest()
a := len(funcname)
b := len(id)
c := len(data)
l := a + b + c + 2
req.Data = getBuffer(l)
copy(req.Data[0:a], funcname)
copy(req.Data[a+1:a+b+1], []byte(id))
copy(req.Data[a+b+2:], data)
return
}


@@ -1,156 +0,0 @@
package client
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Response handler
type ResponseHandler func(*Response)
// response
type Response struct {
DataType uint32
Data, UID []byte
Handle string
}
// Extract the Response's result.
// if data == nil, err != nil, then worker failing to execute job
// if data != nil, err != nil, then worker has a exception
// if data != nil, err == nil, then worker complate job
// after calling this method, the Response.Handle will be filled
func (resp *Response) Result() (data []byte, err error) {
switch resp.DataType {
case dtWorkFail:
resp.Handle = string(resp.Data)
err = ErrWorkFail
return
case dtWorkException:
err = ErrWorkException
fallthrough
case dtWorkComplete:
data = resp.Data
default:
err = ErrDataType
}
return
}
// Extract the job's update
func (resp *Response) Update() (data []byte, err error) {
if resp.DataType != dtWorkData &&
resp.DataType != dtWorkWarning {
err = ErrDataType
return
}
data = resp.Data
if resp.DataType == dtWorkWarning {
err = ErrWorkWarning
}
return
}
// Decode a job from byte slice
func decodeResponse(data []byte) (resp *Response, l int, err error) {
a := len(data)
if a < minPacketLength { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if a < minPacketLength+dl {
err = fmt.Errorf("Invalid data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
case dtJobCreated:
resp.Handle = string(dt)
case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
dtWorkComplete, dtWorkException:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 2 {
resp.Handle = string(s[0])
resp.Data = s[1]
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtWorkFail:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 1 {
resp.Handle = string(s[0])
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtEchoRes:
fallthrough
default:
resp.Data = dt
}
l = dl + minPacketLength
return
}
func (resp *Response) Status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(data) != 2 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = true
status.Running = true
status.Numerator, err = strconv.ParseUint(string(data[0]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[0])
return
}
status.Denominator, err = strconv.ParseUint(string(data[1]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[1])
return
}
return
}
// status handler
func (resp *Response) _status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4)
if len(data) != 4 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = (data[0][0] == '1')
status.Running = (data[1][0] == '1')
status.Numerator, err = strconv.ParseUint(string(data[2]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[2])
return
}
status.Denominator, err = strconv.ParseUint(string(data[3]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[3])
return
}
return
}
func getResponse() (resp *Response) {
// TODO add a pool
resp = &Response{}
return
}


@@ -1,11 +0,0 @@
package client
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
type Status struct {
Handle string
Known, Running bool
Numerator, Denominator uint64
}

common/error.go Normal file

@@ -0,0 +1,53 @@
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package common
import (
"fmt"
"bytes"
"errors"
"syscall"
)
var (
ErrJobTimeOut = errors.New("Do a job time out.")
ErrInvalidData = errors.New("Invalid data.")
ErrWorkWarning = errors.New("Work warning.")
ErrWorkFail = errors.New("Work fail.")
ErrWorkException = errors.New("Work exeption.")
ErrDataType = errors.New("Invalid data type.")
ErrOutOfCap = errors.New("Out of the capability.")
ErrNotConn = errors.New("Did not connect to job server.")
ErrFuncNotFound = errors.New("The function was not found.")
ErrConnection = errors.New("Connection error.")
ErrNoActiveAgent = errors.New("No active agent.")
ErrExecTimeOut = errors.New("Executing time out.")
ErrUnknown = errors.New("Unknown error.")
)
func DisablePanic() {recover()}
// Extract the error message
func GetError(data []byte) (eno syscall.Errno, err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = Errorf("Not a error data: %V", data)
return
}
l := len(rel[0])
eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]}))
err = errors.New(string(rel[1]))
return
}
// Get a formated error
func Errorf(format string, msg ... interface{}) error {
return errors.New(fmt.Sprintf(format, msg ... ))
}
// An error handler
type ErrorHandler func(error)

common/gearman.go Normal file

@@ -0,0 +1,84 @@
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package common
import (
"bytes"
"encoding/binary"
)
const (
NETWORK = "tcp"
// queue size
QUEUE_SIZE = 512
// read buffer size
BUFFER_SIZE = 1024
// \x00REQ
REQ = 5391697
REQ_STR = "\x00REQ"
// \x00RES
RES = 5391699
RES_STR = "\x00RES"
// package data type
CAN_DO = 1
CANT_DO = 2
RESET_ABILITIES = 3
PRE_SLEEP = 4
NOOP = 6
JOB_CREATED = 8
GRAB_JOB = 9
NO_JOB = 10
JOB_ASSIGN = 11
WORK_STATUS = 12
WORK_COMPLETE = 13
WORK_FAIL = 14
GET_STATUS = 15
ECHO_REQ = 16
ECHO_RES = 17
ERROR = 19
STATUS_RES = 20
SET_CLIENT_ID = 22
CAN_DO_TIMEOUT = 23
WORK_EXCEPTION = 25
WORK_DATA = 28
WORK_WARNING = 29
GRAB_JOB_UNIQ = 30
JOB_ASSIGN_UNIQ = 31
SUBMIT_JOB = 7
SUBMIT_JOB_BG = 18
SUBMIT_JOB_HIGH = 21
SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34
)
// Decode [4]byte to uint32
func BytesToUint32(buf [4]byte) uint32 {
var r uint32
b := bytes.NewBuffer(buf[:])
err := binary.Read(b, binary.BigEndian, &r)
if err != nil {
return 0
}
return r
}
// Encode uint32 to [4]byte
func Uint32ToBytes(i uint32) [4]byte {
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.BigEndian, i)
if err != nil {
return [4]byte{0, 0, 0, 0}
}
var r [4]byte
for k, v := range buf.Bytes() {
r[k] = v
}
return r
}
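The `REQ` and `RES` constants above are simply the magic strings "\x00REQ" and "\x00RES" read as big-endian uint32 values (0x00524551 = 5391697 and 0x00524553 = 5391699). A quick check using the conversion helper defined in this file, assuming the package is importable under the 0.1.2 import path shown in this diff:

```go
package main

import (
	"fmt"

	"bitbucket.org/mikespook/gearman-go/common"
)

func main() {
	// "\x00REQ" read big-endian is 0x00524551 = 5391697 (common.REQ).
	req := common.BytesToUint32([4]byte{'\x00', 'R', 'E', 'Q'})
	// "\x00RES" read big-endian is 0x00524553 = 5391699 (common.RES).
	res := common.BytesToUint32([4]byte{'\x00', 'R', 'E', 'S'})
	fmt.Println(req == common.REQ, res == common.RES) // true true
}
```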

common/gearman_test.go Normal file

@@ -0,0 +1,38 @@
package common
import (
"bytes"
"testing"
)
var (
testCase = map[uint32][4]byte {
0: [...]byte{0, 0, 0, 0},
1: [...]byte{0, 0, 0, 1},
256: [...]byte{0, 0, 1, 0},
256 * 256: [...]byte{0, 1, 0, 0},
256 * 256 * 256: [...]byte{1, 0, 0, 0},
256 * 256 * 256 + 256 * 256 + 256 + 1: [...]byte{1, 1, 1, 1},
4294967295 : [...]byte{0xFF, 0xFF, 0xFF, 0xFF},
}
)
func TestUint32ToBytes(t *testing.T) {
for k, v := range testCase {
b := Uint32ToBytes(k)
if bytes.Compare(b[:], v[:]) != 0 {
t.Errorf("%v was expected, but %v was got", v, b)
}
}
}
func TestBytesToUint32s(t *testing.T) {
for k, v := range testCase {
u := BytesToUint32([4]byte(v))
if u != k {
t.Errorf("%v was expected, but %v was got", k, u)
}
}
}

example/client.go Normal file

@@ -0,0 +1,46 @@
package main
import (
"log"
"sync"
"bitbucket.org/mikespook/gearman-go/client"
)
func main() {
var wg sync.WaitGroup
c, err := client.New("127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
defer c.Close()
echo := []byte("Hello\x00 world")
c.JobHandler = func(job *client.Job) error {
log.Printf("%s", job.Data)
wg.Done()
return nil
}
c.ErrHandler = func(e error) {
log.Println(e)
panic(e)
}
wg.Add(1)
c.Echo(echo)
wg.Add(1)
handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL)
if err != nil {
log.Fatalln(err)
} else {
log.Println(handle)
}
c.StatusHandler = func(handle string, known, running bool, numerator, denominator uint64) {
log.Printf("%s: %b, %b, %d, %d", handle, known, running, numerator, denominator)
wg.Done()
}
wg.Add(1)
c.Status(handle)
wg.Wait()
}


@@ -1,80 +0,0 @@
package main
import (
"github.com/mikespook/gearman-go/client"
"log"
"os"
"sync"
)
func main() {
// Set the autoinc id generator
// You can write your own id generator
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()
c, err := client.New(client.Network, "127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
defer c.Close()
c.ErrorHandler = func(e error) {
log.Println(e)
os.Exit(1)
}
echo := []byte("Hello\x00 world")
echomsg, err := c.Echo(echo)
if err != nil {
log.Fatalln(err)
}
log.Println(string(echomsg))
jobHandler := func(resp *client.Response) {
switch resp.DataType {
case client.WorkException:
fallthrough
case client.WorkFail:
fallthrough
case client.WorkComplate:
if data, err := resp.Result(); err == nil {
log.Printf("RESULT: %v\n", data)
} else {
log.Printf("RESULT: %s\n", err)
}
case client.WorkWarning:
fallthrough
case client.WorkData:
if data, err := resp.Update(); err == nil {
log.Printf("UPDATE: %v\n", data)
} else {
log.Printf("UPDATE: %v, %s\n", data, err)
}
case client.WorkStatus:
if data, err := resp.Status(); err == nil {
log.Printf("STATUS: %v\n", data)
} else {
log.Printf("STATUS: %s\n", err)
}
default:
log.Printf("UNKNOWN: %v", resp.Data)
}
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
if err != nil {
log.Fatalln(err)
}
status, err := c.Status(handle)
if err != nil {
log.Fatalln(err)
}
log.Printf("%v", *status)
_, err = c.Do("Foobar", echo, client.JobNormal, jobHandler)
if err != nil {
log.Fatalln(err)
}
log.Println("Press Ctrl-C to exit ...")
var mutex sync.Mutex
mutex.Lock()
mutex.Lock()
}


@@ -1,33 +0,0 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;


@@ -32,18 +32,6 @@ def main():
except Exception as e: except Exception as e:
print type(e) print type(e)
try:
completed_job_request = client.submit_job("SysInfo", "")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("MemInfo", "")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
if __name__ == '__main__': if __name__ == '__main__':
main() main()

example/worker.go Normal file

@@ -0,0 +1,59 @@
package main
import (
"os"
"log"
"time"
"strings"
"bitbucket.org/mikespook/golib/signal"
"bitbucket.org/mikespook/gearman-go/worker"
)
func ToUpper(job *worker.Job) ([]byte, error) {
log.Printf("ToUpper: Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
}
func ToUpperDelay10(job *worker.Job) ([]byte, error) {
log.Printf("ToUpperDelay10: Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
}
func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited)
defer w.Close()
w.ErrHandler = func(e error) {
log.Println(e)
if e == worker.ErrConnection {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)
}
if err := proc.Signal(os.Interrupt); err != nil {
log.Println(err)
}
}
}
w.JobHandler = func(job *worker.Job) error {
log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle,
job.UniqueId, job.Data, job.DataType)
return nil
}
w.AddServer("127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
go w.Work()
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {return true})
sh.Loop()
}


@@ -1,74 +0,0 @@
package main
import (
"log"
"net"
"os"
"strings"
"time"
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal"
)
func ToUpper(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func ToUpperDelay10(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func Foobar(job worker.Job) ([]byte, error) {
log.Printf("Foobar: Data=[%s]\n", job.Data())
for i := 0; i < 10; i++ {
job.SendWarning([]byte{byte(i)})
job.SendData([]byte{byte(i)})
job.UpdateStatus(i+1, 100)
}
return job.Data(), nil
}
func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited)
defer w.Close()
w.ErrorHandler = func(e error) {
log.Println(e)
if opErr, ok := e.(*net.OpError); ok {
if !opErr.Temporary() {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)
}
if err := proc.Signal(os.Interrupt); err != nil {
log.Println(err)
}
}
}
}
w.JobHandler = func(job worker.Job) error {
log.Printf("Data=%s\n", job.Data())
return nil
}
w.AddServer("tcp4", "127.0.0.1:4730")
w.AddFunc("Foobar", Foobar, worker.Unlimited)
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Unlimited)
w.AddFunc("MemInfo", worker.MemInfo, worker.Unlimited)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
signal.Wait()
}


@@ -3,17 +3,14 @@
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
/* /*
This module is a Gearman API for the Go Programming Language. This module is Gearman API for golang.
The protocols were written in pure Go. It contains two sub-packages: The protocol was implemented by native way.
The client package is used for sending jobs to the Gearman job server,
and getting responses from the server.
import "github.com/mikespook/gearman-go/client"
The worker package will help developers to develop Gearman's worker
in an easy way.
import "github.com/mikespook/gearman-go/worker"
*/ */
package gearman package gearman
import (
_ "bitbucket.org/mikespook/gearman-go/common"
_ "bitbucket.org/mikespook/gearman-go/client"
_ "bitbucket.org/mikespook/gearman-go/worker"
)


@@ -1,229 +1,171 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker package worker
import ( import (
"bufio" "io"
"bytes" "net"
"encoding/binary" "bitbucket.org/mikespook/gearman-go/common"
"io"
"net"
"sync"
) )
// The agent of job server. // The agent of job server.
type agent struct { type agent struct {
sync.Mutex conn net.Conn
conn net.Conn worker *Worker
rw *bufio.ReadWriter in chan []byte
worker *Worker out chan *Job
in chan []byte addr string
net, addr string
} }
// Create the agent of job server. // Create the agent of job server.
func newAgent(net, addr string, worker *Worker) (a *agent, err error) { func newAgent(addr string, worker *Worker) (a *agent, err error) {
a = &agent{ conn, err := net.Dial(common.NETWORK, addr)
net: net, if err != nil {
addr: addr, return
worker: worker, }
in: make(chan []byte, queueSize), a = &agent{
} conn: conn,
return worker: worker,
addr: addr,
in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
}
// reset abilities
a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil))
return
} }
func (a *agent) Connect() (err error) { // outputing loop
a.Lock() func (a *agent) outLoop() {
defer a.Unlock() ok := true
a.conn, err = net.Dial(a.net, a.addr) var job *Job
if err != nil { for ok {
return if job, ok = <-a.out; ok {
} if err := a.write(job.Encode()); err != nil {
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn), a.worker.err(err)
bufio.NewWriter(a.conn)) }
go a.work() }
return }
} }
func (a *agent) work() { // inputing loop
defer func() { func (a *agent) inLoop() {
if err := recover(); err != nil { defer func() {
a.worker.err(err.(error)) if r := recover(); r != nil {
} a.worker.err(common.Errorf("Exiting: %s", r))
}() }
close(a.in)
var inpack *inPack close(a.out)
var l int a.worker.removeAgent(a)
var err error }()
var data, leftdata []byte for a.worker.running {
for { a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
if data, err = a.read(); err != nil { RESTART:
if opErr, ok := err.(*net.OpError); ok { // got noop msg and in queue is zero, grab job
if opErr.Temporary() { rel, err := a.read()
continue if err != nil {
} else { if err == common.ErrConnection {
a.disconnect_error(err) for i:= 0; i < 3 && a.worker.running; i++ {
// else - we're probably dc'ing due to a Close() if conn, err := net.Dial(common.NETWORK, a.addr); err != nil {
a.worker.err(common.Errorf("Reconnection: %d faild", i))
break continue
} } else {
a.conn = conn
} else if err == io.EOF { goto RESTART
a.disconnect_error(err) }
break }
} a.worker.err(err)
a.worker.err(err) break
// If it is unexpected error and the connection wasn't }
// closed by Gearmand, the agent should close the conection a.worker.err(err)
// and reconnect to job server. continue
a.Close() }
a.conn, err = net.Dial(a.net, a.addr) job, err := decodeJob(rel)
if err != nil { if err != nil {
a.worker.err(err) a.worker.err(err)
break continue
} }
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn), switch job.DataType {
bufio.NewWriter(a.conn)) case common.NOOP:
} a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
if len(leftdata) > 0 { // some data left for processing case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
data = append(leftdata, data...) job.agent = a
} a.worker.in <- job
if len(data) < minPacketLength { // not enough data }
leftdata = data }
continue
}
for {
if inpack, l, err = decodeInPack(data); err != nil {
a.worker.err(err)
leftdata = data
break
} else {
leftdata = nil
inpack.a = a
select {
case <-a.worker.closed:
return
default:
}
a.worker.in <- inpack
if len(data) == l {
break
}
if len(data) > l {
data = data[l:]
}
}
}
}
}
func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil {
err = &WorkerDisconnectError{
err: err,
agent: a,
}
a.worker.err(err)
}
} }
func (a *agent) Close() { func (a *agent) Close() {
a.Lock() a.conn.Close()
defer a.Unlock()
if a.conn != nil {
a.conn.Close()
a.conn = nil
}
} }
func (a *agent) Grab() { func (a *agent) Work() {
a.Lock() go a.outLoop()
defer a.Unlock() go a.inLoop()
a.grab()
} }
func (a *agent) grab() bool { // Internal read
if a.worker.closed != nil {
return false
}
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
a.write(outpack)
return true
}
func (a *agent) PreSleep() {
a.Lock()
defer a.Unlock()
outpack := getOutPack()
outpack.dataType = dtPreSleep
a.write(outpack)
}
func (a *agent) reconnect() error {
a.Lock()
defer a.Unlock()
conn, err := net.Dial(a.net, a.addr)
if err != nil {
return err
}
a.conn = conn
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a)
if a.grab() {
go a.work()
}
return nil
}
// read length bytes from the socket
func (a *agent) read() (data []byte, err error) { func (a *agent) read() (data []byte, err error) {
n := 0 if len(a.in) > 0 {
// in queue is not empty
data = <-a.in
} else {
for {
buf := make([]byte, common.BUFFER_SIZE)
var n int
if n, err = a.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrConnection
return
}
break
}
return
}
data = append(data, buf[0:n]...)
if n < common.BUFFER_SIZE {
break
}
}
}
// split package
tl := len(data)
start := 0
for i := 0; i < tl; i++ {
if string(data[start:start+4]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8],
data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == tl {
return
} else {
a.in <- data[total:]
data = data[:total]
return
}
} else {
start++
}
}
return nil, common.Errorf("Invalid data: %v", data)
}
tmp := getBuffer(bufferSize) // Send a job to the job server.
var buf bytes.Buffer func (a *agent) WriteJob(job *Job) {
a.out <- job
// read the header so we can get the length of the data
if n, err = a.rw.Read(tmp); err != nil {
return
}
dl := int(binary.BigEndian.Uint32(tmp[8:12]))
// write what we read so far
buf.Write(tmp[:n])
// read until we receive all the data
for buf.Len() < dl+minPacketLength {
if n, err = a.rw.Read(tmp); err != nil {
return buf.Bytes(), err
}
buf.Write(tmp[:n])
}
return buf.Bytes(), err
} }
// Internal write the encoded job. // Internal write the encoded job.
func (a *agent) write(outpack *outPack) (err error) { func (a *agent) write(buf []byte) (err error) {
var n int var n int
buf := outpack.Encode() for i := 0; i < len(buf); i += n {
for i := 0; i < len(buf); i += n { n, err = a.conn.Write(buf[i:])
n, err = a.rw.Write(buf[i:]) if err != nil {
if err != nil { return err
return err }
} }
} return
return a.rw.Flush()
}
// Write with lock
func (a *agent) Write(outpack *outPack) (err error) {
a.Lock()
defer a.Unlock()
return a.write(outpack)
} }


@ -1,51 +0,0 @@
package worker
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 1024
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}
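A quick sanity check on the magic numbers above: req (5391697) and res (5391699) are simply the big-endian uint32 encodings of the 4-byte strings "\x00REQ" and "\x00RES". A minimal sketch:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	req := binary.BigEndian.Uint32([]byte("\x00REQ"))
	res := binary.BigEndian.Uint32([]byte("\x00RES"))
	fmt.Println(req == 5391697, res == 5391699) // true true

	// Going the other way recovers the magic string used on the wire.
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], 5391697)
	fmt.Printf("%q\n", buf[:]) // "\x00REQ"
}
```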


@ -1,28 +0,0 @@
package worker
import (
"bytes"
"errors"
"fmt"
)
var (
ErrNoneAgents = errors.New("None active agents")
ErrNoneFuncs = errors.New("None functions")
ErrTimeOut = errors.New("Executing time out")
ErrUnknown = errors.New("Unknown error")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// An error handler
type ErrorHandler func(error)
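getError expects the ERROR payload to be an error code and a human-readable message separated by a NUL byte. A self-contained sketch (the function is mirrored here only because it is unexported; the payload is hypothetical):

```go
package main

import (
	"bytes"
	"fmt"
)

// Mirrors worker.getError for illustration: the payload is "<code>\x00<message>".
func getError(data []byte) error {
	rel := bytes.SplitN(data, []byte{'\x00'}, 2)
	if len(rel) != 2 {
		return fmt.Errorf("Not error data: %v", data)
	}
	return fmt.Errorf("%s: %s", rel[0], rel[1])
}

func main() {
	// Hypothetical ERROR payload from the job server.
	payload := []byte("ERR_UNKNOWN_COMMAND\x00Unknown server command")
	fmt.Println(getError(payload)) // ERR_UNKNOWN_COMMAND: Unknown server command
}
```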


@ -1,56 +0,0 @@
package worker_test
import (
"fmt"
"github.com/mikespook/gearman-go/worker"
"sync"
)
func ExampleWorker() {
// An example of worker
w := worker.New(worker.Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil {
fmt.Println(err)
return
}
// A function for handling jobs
foobar := func(job worker.Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custom handler for other results, e.g. ECHO, dtError.
w.JobHandler = func(job worker.Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
}


@ -1,45 +0,0 @@
package worker
import (
"encoding/json"
"runtime"
)
// Job handler
type JobHandler func(Job) error
type JobFunc func(Job) ([]byte, error)
// The definition of the callback function.
type jobFunc struct {
f JobFunc
timeout uint32
}
// Map for added function.
type jobFuncs map[string]*jobFunc
type systemInfo struct {
GOOS, GOARCH, GOROOT, Version string
NumCPU, NumGoroutine int
NumCgoCall int64
}
func SysInfo(job Job) ([]byte, error) {
return json.Marshal(&systemInfo{
GOOS: runtime.GOOS,
GOARCH: runtime.GOARCH,
GOROOT: runtime.GOROOT(),
Version: runtime.Version(),
NumCPU: runtime.NumCPU(),
NumGoroutine: runtime.NumGoroutine(),
NumCgoCall: runtime.NumCgoCall(),
})
}
var memState runtime.MemStats
func MemInfo(job Job) ([]byte, error) {
runtime.ReadMemStats(&memState)
return json.Marshal(&memState)
}
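SysInfo and MemInfo are ready-made job functions: once registered with AddFunc (as the example worker above does), a client calling "SysInfo" gets back a JSON document with the fields of the unexported systemInfo struct. A hedged client-side decoding sketch; the struct is re-declared here and the payload is hypothetical:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of the unexported systemInfo struct, for decoding on the client side.
type sysInfo struct {
	GOOS, GOARCH, GOROOT, Version string
	NumCPU, NumGoroutine          int
	NumCgoCall                    int64
}

func main() {
	// Hypothetical payload returned by the "SysInfo" job function.
	payload := []byte(`{"GOOS":"linux","GOARCH":"amd64","GOROOT":"/usr/local/go",` +
		`"Version":"go1.4","NumCPU":4,"NumGoroutine":8,"NumCgoCall":0}`)
	var info sysInfo
	if err := json.Unmarshal(payload, &info); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%s/%s on %s, %d CPUs\n", info.GOOS, info.GOARCH, info.Version, info.NumCPU)
}
```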


@ -1,126 +0,0 @@
package worker
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Worker side job
type inPack struct {
dataType uint32
data []byte
handle, uniqueId, fn string
a *agent
}
// Create a new job
func getInPack() *inPack {
return &inPack{}
}
func (inpack *inPack) Data() []byte {
return inpack.data
}
func (inpack *inPack) Fn() string {
return inpack.fn
}
func (inpack *inPack) Handle() string {
return inpack.handle
}
func (inpack *inPack) UniqueId() string {
return inpack.uniqueId
}
func (inpack *inPack) Err() error {
if inpack.dataType == dtError {
return getError(inpack.data)
}
return nil
}
// Send some data to the client.
// Use this while the job is executing.
func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkData
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkWarning
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
// Update status.
// Tell the client what percentage of the job has been executed.
func (inpack *inPack) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
outpack := getOutPack()
outpack.dataType = dtWorkStatus
hl := len(inpack.handle)
nl := len(n)
dl := len(d)
outpack.data = getBuffer(hl + nl + dl + 2)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], n)
copy(outpack.data[hl+nl+2:], d)
inpack.a.write(outpack)
}
// Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < minPacketLength { // a valid packet is at least 12 bytes
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if len(data) < (dl + minPacketLength) {
err = fmt.Errorf("Not enough data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
inpack = getInPack()
inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch inpack.dataType {
case dtJobAssign:
s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.data = s[2]
}
case dtJobAssignUniq:
s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.uniqueId = string(s[2])
inpack.data = s[3]
}
default:
inpack.data = dt
}
l = dl + minPacketLength
return
}


@ -1,73 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
inpackcases = map[uint32]map[string]string{
dtNoop: map[string]string{
"src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00",
},
dtNoJob: map[string]string{
"src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00",
},
dtJobAssign: map[string]string{
"src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz",
"handle": "a",
"fn": "b",
"data": "xyz",
},
dtJobAssignUniq: map[string]string{
"src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz",
"handle": "a",
"fn": "b",
"uid": "c",
"data": "xyz",
},
}
)
func TestInPack(t *testing.T) {
for k, v := range inpackcases {
inpack, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
t.Error(err)
}
if inpack.dataType != k {
t.Errorf("DataType: %d expected, %d got.", k, inpack.dataType)
}
if handle, ok := v["handle"]; ok {
if inpack.handle != handle {
t.Errorf("Handle: %s expected, %s got.", handle, inpack.handle)
}
}
if fn, ok := v["fn"]; ok {
if inpack.fn != fn {
t.Errorf("FuncName: %s expected, %s got.", fn, inpack.fn)
}
}
if uid, ok := v["uid"]; ok {
if inpack.uniqueId != uid {
t.Errorf("UID: %s expected, %s got.", uid, inpack.uniqueId)
}
}
if data, ok := v["data"]; ok {
if bytes.Compare([]byte(data), inpack.data) != 0 {
t.Errorf("UID: %v expected, %v got.", data, inpack.data)
}
}
}
}
func BenchmarkDecode(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, v := range inpackcases {
_, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
b.Error(err)
}
}
}
}


@ -1,12 +1,108 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker package worker
type Job interface { import (
Err() error "strconv"
Data() []byte "bitbucket.org/mikespook/gearman-go/common"
Fn() string )
SendWarning(data []byte)
SendData(data []byte) // Worker side job
UpdateStatus(numerator, denominator int) type Job struct {
Handle() string Data []byte
UniqueId() string Handle, UniqueId string
agent *agent
magicCode, DataType uint32
c chan bool
}
// Create a new job
func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
return &Job{magicCode: magiccode,
DataType: datatype,
Data: data,
c: make(chan bool),
}
}
// Decode job from byte slice
func decodeJob(data []byte) (job *Job, err error) {
if len(data) < 12 {
return nil, common.Errorf("Invalid data: %v", data)
}
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
return nil, common.Errorf("Invalid data: %v", data)
}
data = data[12:]
job = newJob(common.RES, datatype, data)
return
}
// Encode a job to byte slice
func (job *Job) Encode() (data []byte) {
l := len(job.Data)
if job.Handle != "" {
l += len(job.Handle) + 1
}
data = make([]byte, 0, l + 12)
magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType)
datalength := common.Uint32ToBytes(uint32(l))
data = append(data, magiccode[:]...)
data = append(data, datatype[:]...)
data = append(data, datalength[:]...)
if job.Handle != "" {
data = append(data, []byte(job.Handle)...)
data = append(data, 0)
}
data = append(data, job.Data...)
return
}
// Send some datas to client.
// Using this in a job's executing.
func (job *Job) UpdateData(data []byte, iswarning bool) {
result := append([]byte(job.Handle), 0)
result = append(result, data...)
var datatype uint32
if iswarning {
datatype = common.WORK_WARNING
} else {
datatype = common.WORK_DATA
}
job.agent.WriteJob(newJob(common.REQ, datatype, result))
}
// Update status.
// Tall client how many percent job has been executed.
func (job *Job) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
result := append([]byte(job.Handle), 0)
result = append(result, n...)
result = append(result, d...)
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
}
// close the job
func (job *Job) Close() {
close(job.c)
}
// cancel the job executing
func (job *Job) cancel() {
defer func() {recover()}()
job.c <- true
}
// When a job was canceled, return a true form a channel
func (job *Job) Canceled() <-chan bool {
return job.c
} }


@ -1,47 +0,0 @@
package worker
import (
"encoding/binary"
)
// Worker side job
type outPack struct {
dataType uint32
data []byte
handle string
}
func getOutPack() (outpack *outPack) {
// TODO pool
return &outPack{}
}
// Encode a job to byte slice
func (outpack *outPack) Encode() (data []byte) {
var l int
if outpack.dataType == dtWorkFail {
l = len(outpack.handle)
} else {
l = len(outpack.data)
if outpack.handle != "" {
l += len(outpack.handle) + 1
}
}
data = getBuffer(l + minPacketLength)
binary.BigEndian.PutUint32(data[:4], req)
binary.BigEndian.PutUint32(data[4:8], outpack.dataType)
binary.BigEndian.PutUint32(data[8:minPacketLength], uint32(l))
i := minPacketLength
if outpack.handle != "" {
hi := len(outpack.handle) + i
copy(data[i:hi], []byte(outpack.handle))
if outpack.dataType != dtWorkFail {
data[hi] = '\x00'
}
i = hi + 1
}
if outpack.dataType != dtWorkFail {
copy(data[i:], outpack.data)
}
return
}

View File

@ -1,99 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
outpackcases = map[uint32]map[string]string{
dtCanDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a",
"data": "a",
},
dtCanDoTimeout: map[string]string{
"src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01",
"data": "a\x00\x00\x00\x00\x01",
},
dtCantDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a",
"data": "a",
},
dtResetAbilities: map[string]string{
"src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00",
},
dtPreSleep: map[string]string{
"src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00",
},
dtGrabJob: map[string]string{
"src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00",
},
dtGrabJobUniq: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00",
},
dtWorkData: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkWarning: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkStatus: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100",
"data": "a\x0050\x00100",
},
dtWorkComplete: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkFail: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a",
"handle": "a",
},
dtWorkException: map[string]string{
"src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtSetClientId: map[string]string{
"src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a",
"data": "a",
},
dtAllYours: map[string]string{
"src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00",
},
}
)
func TestOutPack(t *testing.T) {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
data := outpack.Encode()
if bytes.Compare([]byte(v["src"]), data) != 0 {
t.Errorf("%d: %X expected, %X got.", k, v["src"], data)
}
}
}
func BenchmarkEncode(b *testing.B) {
for i := 0; i < b.N; i++ {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
outpack.Encode()
}
}
}


@ -1,405 +1,333 @@
// The worker package helps developers to develop Gearman's worker // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// in an easy way. // Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker package worker
import ( import (
"fmt" "time"
"strconv" "bytes"
"sync" "bitbucket.org/mikespook/gearman-go/common"
"time"
) )
const ( const (
Unlimited = iota Unlimited = 0
OneByOne OneByOne = 1
Immediately = 0
) )
// Worker is the only structure needed by worker side developing. var (
// It can connect to multi-server and grab jobs. ErrConnection = common.ErrConnection
)
// Job handler
type JobHandler func(*Job) error
type JobFunc func(*Job) ([]byte, error)
// The definition of the callback function.
type jobFunc struct {
f JobFunc
timeout uint32
}
// Map for added function.
type JobFuncs map[string]*jobFunc
/*
Worker side api for gearman
usage:
w = worker.New(worker.Unlimited)
w.AddFunction("foobar", foobar)
w.AddServer("127.0.0.1:4730")
w.Work() // Enter the worker's main loop
The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
It looks like this:
func foobar(job *Job) (data []byte, err os.Error) {
//sth. here
//plaplapla...
return
}
*/
type Worker struct { type Worker struct {
sync.Mutex agents []*agent
agents []*agent funcs JobFuncs
funcs jobFuncs in chan *Job
in chan *inPack running bool
running bool limit chan bool
ready bool
jobLeftNum int64
Id string Id string
ErrorHandler ErrorHandler // assign a ErrFunc to handle errors
JobHandler JobHandler ErrHandler common.ErrorHandler
limit chan bool JobHandler JobHandler
closed chan struct{} }
leftJobs chan struct{}
// Get a new worker
func New(l int) (worker *Worker) {
worker = &Worker{
agents: make([]*agent, 0),
funcs: make(JobFuncs),
in: make(chan *Job, common.QUEUE_SIZE),
}
if l != Unlimited {
worker.limit = make(chan bool, l)
for i := 0; i < l; i ++ {
worker.limit <- true
}
}
return
} }
// New returns a worker.
// //
// If limit is set to Unlimited(=0), the worker will grab all jobs func (worker *Worker)err(e error) {
// and execute them parallelly. if worker.ErrHandler != nil {
// If limit is greater than zero, the number of paralled executing worker.ErrHandler(e)
// jobs are limited under the number. If limit is assgined to }
// OneByOne(=1), there will be only one job executed in a time.
func New(limit int) (worker *Worker) {
worker = &Worker{
agents: make([]*agent, 0, limit),
funcs: make(jobFuncs),
in: make(chan *inPack, queueSize),
}
if limit != Unlimited {
worker.limit = make(chan bool, limit-1)
}
return
} }
// inner error handling // Add a server. The addr should be 'host:port' format.
func (worker *Worker) err(e error) { // The connection is established at this time.
if worker.ErrorHandler != nil { func (worker *Worker) AddServer(addr string) (err error) {
worker.ErrorHandler(e) // Create a new job server's client as a agent of server
} server, err := newAgent(addr, worker)
if err != nil {
return err
}
worker.agents = append(worker.agents, server)
return
} }
// AddServer adds a Gearman job server. // Write a job to job server.
// // Here, the job's mean is not the oraginal mean.
// addr should be formated as 'host:port'. // Just looks like a network package for job's result or tell job server, there was a fail.
func (worker *Worker) AddServer(net, addr string) (err error) { func (worker *Worker) broadcast(job *Job) {
// Create a new job server's client as a agent of server for _, v := range worker.agents {
a, err := newAgent(net, addr, worker) v.WriteJob(job)
if err != nil { }
return err
}
worker.agents = append(worker.agents, a)
return
} }
// Broadcast an outpack to all Gearman server. // Add a function.
func (worker *Worker) broadcast(outpack *outPack) { // Plz added job servers first, then functions.
for _, v := range worker.agents { // The API will tell every connected job server that 'I can do this'
v.Write(outpack)
}
}
// AddFunc adds a function.
// Set timeout as Unlimited(=0) to disable executing timeout.
func (worker *Worker) AddFunc(funcname string, func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) { f JobFunc, timeout uint32) (err error) {
worker.Lock() if _, ok := worker.funcs[funcname]; ok {
defer worker.Unlock() return common.Errorf("The function already exists: %s", funcname)
if _, ok := worker.funcs[funcname]; ok { }
return fmt.Errorf("The function already exists: %s", funcname) worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
}
worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} if worker.running {
if worker.running { worker.addFunc(funcname, timeout)
worker.addFunc(funcname, timeout) }
} return
return
} }
// inner add // inner add function
func (worker *Worker) addFunc(funcname string, timeout uint32) { func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack := prepFuncOutpack(funcname, timeout) var datatype uint32
worker.broadcast(outpack) var data []byte
if timeout == 0 {
datatype = common.CAN_DO
data = []byte(funcname)
} else {
datatype = common.CAN_DO_TIMEOUT
data = []byte(funcname + "\x00")
t := common.Uint32ToBytes(timeout)
data = append(data, t[:]...)
}
job := newJob(common.REQ, datatype, data)
worker.broadcast(job)
} }
func prepFuncOutpack(funcname string, timeout uint32) *outPack { // Remove a function.
outpack := getOutPack() // Tell job servers 'I can not do this now' at the same time.
if timeout == 0 {
outpack.dataType = dtCanDo
outpack.data = []byte(funcname)
} else {
outpack.dataType = dtCanDoTimeout
l := len(funcname)
timeoutString := strconv.FormatUint(uint64(timeout), 10)
outpack.data = getBuffer(l + len(timeoutString) + 1)
copy(outpack.data, []byte(funcname))
outpack.data[l] = '\x00'
copy(outpack.data[l+1:], []byte(timeoutString))
}
return outpack
}
// RemoveFunc removes a function.
func (worker *Worker) RemoveFunc(funcname string) (err error) { func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock() if _, ok := worker.funcs[funcname]; !ok {
defer worker.Unlock() return common.Errorf("The function does not exist: %s", funcname)
if _, ok := worker.funcs[funcname]; !ok { }
return fmt.Errorf("The function does not exist: %s", funcname) delete(worker.funcs, funcname)
} if worker.running {
delete(worker.funcs, funcname) worker.removeFunc(funcname)
if worker.running { }
worker.removeFunc(funcname) return
}
return
} }
// inner remove // inner remove function
func (worker *Worker) removeFunc(funcname string) { func (worker *Worker) removeFunc(funcname string) {
outpack := getOutPack() job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
outpack.dataType = dtCantDo worker.broadcast(job)
outpack.data = []byte(funcname)
worker.broadcast(outpack)
} }
// inner package handling // Main loop
func (worker *Worker) handleInPack(inpack *inPack) {
switch inpack.dataType {
case dtNoJob:
inpack.a.PreSleep()
case dtNoop:
inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq:
go func() {
go func() {
worker.incrExecJobNum()
defer func() {
worker.decrExecJobNum()
}()
if err := worker.exec(inpack); err != nil {
worker.err(err)
}
}()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
}()
case dtError:
worker.err(inpack.Err())
fallthrough
case dtEchoRes:
fallthrough
default:
worker.customeHandler(inpack)
}
}
// Connect to Gearman server and tell every server
// what can this worker do.
func (worker *Worker) Ready() (err error) {
if len(worker.agents) == 0 {
return ErrNoneAgents
}
if len(worker.funcs) == 0 {
return ErrNoneFuncs
}
for _, a := range worker.agents {
if err = a.Connect(); err != nil {
return
}
}
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
worker.ready = true
return
}
// Work start main loop (blocking)
// Most of time, this should be evaluated in goroutine.
func (worker *Worker) Work() { func (worker *Worker) Work() {
if !worker.ready { defer func() {
// didn't run Ready beforehand, so we'll have to do it: worker.running = false
err := worker.Ready() for _, v := range worker.agents {
if err != nil { v.Close()
panic(err) }
} }()
} for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
worker.Lock() }
worker.running = true worker.running = true
worker.Unlock() for _, v := range worker.agents {
go v.Work()
for _, a := range worker.agents { }
a.Grab() ok := true
} var job *Job
// 执行任务(阻塞) for ok {
var inpack *inPack if job, ok = <-worker.in; ok {
for inpack = range worker.in { go func() {
worker.handleInPack(inpack) defer job.Close()
} switch job.DataType {
// 关闭Worker进程后 等待任务完成后退出 case common.ERROR:
worker.Lock() _, err := common.GetError(job.Data)
leftJobNum := int(worker.jobLeftNum) worker.err(err)
worker.Unlock() case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
if worker.leftJobs != nil { if err := worker.exec(job); err != nil {
for i := 0; i < leftJobNum; i++ { worker.err(err)
<-worker.leftJobs }
} default:
} worker.handleJob(job)
worker.Reset() }
worker.close() }()
}
}
} }
// custome handling warper // job handler
func (worker *Worker) customeHandler(inpack *inPack) { func (worker *Worker) handleJob(job *Job) {
if worker.JobHandler != nil { if worker.JobHandler != nil {
if err := worker.JobHandler(inpack); err != nil { if err := worker.JobHandler(job); err != nil {
worker.err(err) worker.err(err)
} }
} }
} }
// Close connection and exit main loop // Close.
func (worker *Worker) Close() { func (worker *Worker) Close() {
worker.Lock() close(worker.in)
defer worker.Unlock() if worker.limit != nil {
if worker.running == true && worker.closed == nil { close(worker.limit)
worker.closed = make(chan struct{}, 1) }
worker.closed <- struct{}{}
worker.running = false
close(worker.in)
// 创建关闭后执行中的任务列表
if worker.jobLeftNum != 0 {
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
}
} }
func (worker *Worker) close() { // Send a something out, get the samething back.
for _, a := range worker.agents {
a.Close()
}
}
// Echo
func (worker *Worker) Echo(data []byte) { func (worker *Worker) Echo(data []byte) {
outpack := getOutPack() job := newJob(common.REQ, common.ECHO_REQ, data)
outpack.dataType = dtEchoReq worker.broadcast(job)
outpack.data = data
worker.broadcast(outpack)
} }
// Reset removes all of functions. // Remove all of functions.
// Both from the worker and job servers. // Both from the worker or job servers.
func (worker *Worker) Reset() { func (worker *Worker) Reset() {
outpack := getOutPack() job := newJob(common.REQ, common.RESET_ABILITIES, nil)
outpack.dataType = dtResetAbilities worker.broadcast(job)
worker.broadcast(outpack) worker.funcs = make(JobFuncs)
worker.funcs = make(jobFuncs)
} }
// Set the worker's unique id. // Set the worker's unique id.
func (worker *Worker) SetId(id string) { func (worker *Worker) SetId(id string) {
worker.Id = id worker.Id = id
outpack := getOutPack() job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
outpack.dataType = dtSetClientId worker.broadcast(job)
outpack.data = []byte(id)
worker.broadcast(outpack)
} }
func (worker *Worker) incrExecJobNum() int64 { // Execute the job. And send back the result.
worker.Lock() func (worker *Worker) exec(job *Job) (err error) {
defer worker.Unlock() defer func() {
worker.jobLeftNum++ if r := recover(); r != nil {
return worker.jobLeftNum if e, ok := r.(error); ok {
err = e
} else {
err = common.ErrUnknown
}
}
} ()
if worker.limit != nil {
<-worker.limit
defer func() {
worker.limit <- true
}()
}
var limit int
if job.DataType == common.JOB_ASSIGN {
limit = 3
} else {
limit = 4
}
jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
job.Handle = string(jobdata[0])
funcname := string(jobdata[1])
if job.DataType == common.JOB_ASSIGN {
job.Data = jobdata[2]
} else {
job.UniqueId = string(jobdata[2])
job.Data = jobdata[3]
}
f, ok := worker.funcs[funcname]
if !ok {
return common.Errorf("The function does not exist: %s", funcname)
}
var r *result
if f.timeout == 0 {
d, e := f.f(job)
r = &result{data:d, err: e}
} else {
r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second)
}
var datatype uint32
if r.err == nil {
datatype = common.WORK_COMPLETE
} else {
if r.data == nil {
datatype = common.WORK_FAIL
} else {
datatype = common.WORK_EXCEPTION
}
err = r.err
}
job.magicCode = common.REQ
job.DataType = datatype
job.Data = r.data
job.agent.WriteJob(job)
return
} }
func (worker *Worker) decrExecJobNum() int64 { func (worker *Worker) removeAgent(a *agent) {
worker.Lock() for k, v := range worker.agents {
defer worker.Unlock() if v == a {
worker.jobLeftNum-- worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
if worker.jobLeftNum < 0 { }
worker.jobLeftNum = 0 }
} if len(worker.agents) == 0 {
return worker.jobLeftNum worker.err(common.ErrNoActiveAgent)
}
} }
// inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() {
if worker.limit != nil {
<-worker.limit
}
if r := recover(); r != nil {
if e, ok := r.(error); ok {
err = e
} else {
err = ErrUnknown
}
}
}()
f, ok := worker.funcs[inpack.fn]
if !ok {
return fmt.Errorf("The function does not exist: %s", inpack.fn)
}
var r *result
if f.timeout == 0 {
d, e := f.f(inpack)
r = &result{data: d, err: e}
} else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
}
//if worker.running {
outpack := getOutPack()
if r.err == nil {
outpack.dataType = dtWorkComplete
} else {
if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else {
outpack.dataType = dtWorkException
}
err = r.err
}
outpack.handle = inpack.handle
outpack.data = r.data
_ = inpack.a.Write(outpack)
if worker.leftJobs != nil {
worker.leftJobs <- struct{}{}
}
//}
return
}
func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
worker.Lock()
defer worker.Unlock()
for funcname, f := range worker.funcs {
outpack := prepFuncOutpack(funcname, f.timeout)
a.write(outpack)
}
}
// inner result
type result struct { type result struct {
data []byte data []byte
err error err error
} }
// executing timer func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { rslt := make(chan *result)
rslt := make(chan *result) defer close(rslt)
defer close(rslt) go func() {
go func() { defer func() {recover()}()
defer func() { recover() }() d, e := f(job)
d, e := f(job) rslt <- &result{data: d, err: e}
rslt <- &result{data: d, err: e} }()
}() select {
select { case r = <-rslt:
case r = <-rslt: case <-time.After(timeout):
case <-time.After(timeout): go job.cancel()
return &result{err: ErrTimeOut} return &result{err:common.ErrExecTimeOut}
} }
return r return r
}
// Error type passed when a worker connection disconnects
type WorkerDisconnectError struct {
err error
agent *agent
}
func (e *WorkerDisconnectError) Error() string {
return e.err.Error()
}
// Responds to the error by asking the worker to reconnect
func (e *WorkerDisconnectError) Reconnect() (err error) {
return e.agent.reconnect()
}
// Which server was this for?
func (e *WorkerDisconnectError) Server() (net string, addr string) {
return e.agent.net, e.agent.addr
} }


@ -1,59 +0,0 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custom handler for other results, e.g. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}


@ -1,269 +1,44 @@
package worker package worker
import ( import "testing"
"bytes"
"flag"
"os"
"sync"
"testing"
"time"
)
var ( var worker *Worker
worker *Worker
runIntegrationTests bool
)
func init() { func init() {
worker = New(Unlimited) worker = New(Unlimited)
}
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestWorkerErrNoneAgents(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneAgents {
t.Error("ErrNoneAgents expected.")
}
} }
func TestWorkerAddServer(t *testing.T) { func TestWorkerAddServer(t *testing.T) {
if !runIntegrationTests { t.Log("Add local server 127.0.0.1:4730.")
t.Skip("To run this test, use: go test -integration") if err := worker.AddServer("127.0.0.1:4730"); err != nil {
} t.Error(err)
t.Log("Add local server 127.0.0.1:4730.") }
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
if l := len(worker.agents); l != 1 { if l := len(worker.agents); l != 1 {
t.Log(worker.agents) t.Log(worker.agents)
t.Error("The length of server list should be 1.") t.Error("The length of server list should be 1.")
} }
} }
func TestWorkerErrNoneFuncs(t *testing.T) { func foobar(job *Job) ([]byte, error) {
if !runIntegrationTests { return nil, nil
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneFuncs {
t.Error("ErrNoneFuncs expected.")
}
}
func foobar(job Job) ([]byte, error) {
return nil, nil
} }
func TestWorkerAddFunction(t *testing.T) { func TestWorkerAddFunction(t *testing.T) {
if !runIntegrationTests { if err := worker.AddFunc("foobar", foobar, 0); err != nil {
t.Skip("To run this test, use: go test -integration") t.Error(err)
} }
if err := worker.AddFunc("foobar", foobar, 0); err != nil { if err := worker.AddFunc("timeout", foobar, 5); err != nil {
t.Error(err) t.Error(err)
} }
if err := worker.AddFunc("timeout", foobar, 5); err != nil { if l := len(worker.funcs); l != 2 {
t.Error(err) t.Log(worker.funcs)
} t.Errorf("The length of function map should be %d.", 2)
if l := len(worker.funcs); l != 2 { }
t.Log(worker.funcs)
t.Errorf("The length of function map should be %d.", 2)
}
} }
func TestWorkerRemoveFunc(t *testing.T) { func TestWorkerRemoveFunc(t *testing.T) {
if !runIntegrationTests { if err := worker.RemoveFunc("foobar"); err != nil {
t.Skip("To run this test, use: go test -integration") t.Error(err)
} }
if err := worker.RemoveFunc("foobar"); err != nil {
t.Error(err)
}
}
func TestWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
var wg sync.WaitGroup
worker.JobHandler = func(job Job) error {
t.Logf("%s", job.Data())
wg.Done()
return nil
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
worker.Echo([]byte("Hello"))
wg.Wait()
}
func TestLargeDataWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker := New(Unlimited)
defer worker.Close()
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
worker.Ready()
l := 5714
var wg sync.WaitGroup
bigdataHandler := func(job Job) error {
defer wg.Done()
if len(job.Data()) != l {
t.Errorf("expected length %d. got %d.", l, len(job.Data()))
}
return nil
}
if err := worker.AddFunc("bigdata", foobar, 0); err != nil {
defer wg.Done()
t.Error(err)
}
worker.JobHandler = bigdataHandler
worker.ErrorHandler = func(err error) {
t.Fatal("shouldn't have received an error")
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
// var cli *client.Client
// var err error
// if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
// t.Fatal(err)
// }
// cli.ErrorHandler = func(e error) {
// t.Error(e)
// }
// _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) {
// })
// if err != nil {
// t.Error(err)
// }
worker.Echo(bytes.Repeat([]byte("a"), l))
wg.Wait()
}
func TestWorkerClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker.Close()
}
func TestWorkWithoutReady(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil {
t.Error(err)
}
timeout := make(chan bool, 1)
done := make(chan bool, 1)
other_worker.JobHandler = func(j Job) error {
if !other_worker.ready {
t.Error("Worker not ready as expected")
}
done <- true
return nil
}
go func() {
time.Sleep(5 * time.Second)
timeout <- true
}()
go func() {
other_worker.Work()
}()
// With the all-in-one Work() we don't know if the
// worker is ready at this stage so we may have to wait a sec:
go func() {
tries := 5
for tries > 0 {
if other_worker.ready {
other_worker.Echo([]byte("Hello"))
break
}
// still waiting for it to be ready..
time.Sleep(250 * time.Millisecond)
tries--
}
}()
// determine if we've finished or timed out:
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
}
func TestWorkWithoutReadyWithPanic(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
timeout := make(chan bool, 1)
done := make(chan bool, 1)
// Going to work with no worker setup.
// when Work (hopefully) calls Ready it will get an error which should cause it to panic()
go func() {
defer func() {
if err := recover(); err != nil {
done <- true
return
}
t.Error("Work should raise a panic.")
done <- true
}()
other_worker.Work()
}()
go func() {
time.Sleep(2 * time.Second)
timeout <- true
}()
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
} }