Compare commits


No commits in common. "master" and "0.0.1" have entirely different histories.

43 changed files with 1269 additions and 3276 deletions

.gitignore (22 changed lines)

@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

.hgtags (new file, 2 lines)

@ -0,0 +1,2 @@
b68aee2a48811a1bee5994b56437c393c6fb2f5b 2011-05-24
0dc8bc71d7e895caf5803c1905bf07a823462fba native.start


@ -1,7 +0,0 @@
language: go
go:
- 1.4
before_install:
- sudo apt-get remove -y gearman-job-server
- sudo apt-get install -y gearman-job-server

README.md (131 changed lines)

@ -1,122 +1,53 @@
Removed (master):

Gearman-Go
==========

This module is a [Gearman](http://gearman.org/) API for the [Go Programming Language](http://golang.org).
The protocols were written in pure Go. It contains two sub-packages:

The client package is used for sending jobs to the Gearman job server,
and getting responses from the server.

	"github.com/mikespook/gearman-go/client"

The worker package will help developers in developing Gearman worker
service easily.

	"github.com/mikespook/gearman-go/worker"

[![Build Status](https://travis-ci.org/mikespook/gearman-go.png?branch=master)](https://travis-ci.org/mikespook/gearman-go)
[![GoDoc](https://godoc.org/github.com/mikespook/gearman-go?status.png)](https://godoc.org/github.com/mikespook/gearman-go)

Install
=======

Install the client package:

> $ go get github.com/mikespook/gearman-go/client

Install the worker package:

> $ go get github.com/mikespook/gearman-go/worker

Both of them:

> $ go get github.com/mikespook/gearman-go

Usage
=====

## Worker

```go
// Limit number of concurrent jobs execution.
// Use worker.Unlimited (0) if you want no limitation.
w := worker.New(worker.OneByOne)
w.ErrHandler = func(e error) {
	log.Println(e)
}
w.AddServer("127.0.0.1:4730")
// Use worker.Unlimited (0) if you want no timeout
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
// This will give a timeout of 5 seconds
w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
if err := w.Ready(); err != nil {
	log.Fatal(err)
	return
}
go w.Work()
```

## Client

```go
// ...
c, err := client.New("tcp4", "127.0.0.1:4730")
// ... error handling
defer c.Close()
c.ErrorHandler = func(e error) {
	log.Println(e)
}
echo := []byte("Hello\x00 world")
echomsg, err := c.Echo(echo)
// ... error handling
log.Println(string(echomsg))
jobHandler := func(resp *client.Response) {
	log.Printf("%s", resp.Data)
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
// ...
```

Branches
========

Version 0.x means: _It is far far away from stable._
__Use at your own risk!__

* master current usable version
* 0.2-dev Refactoring a lot of things
* 0.1-testing Old API and some known issues, eg. [issue-14](https://github.com/mikespook/gearman-go/issues/14)

Contributors
============

Great thanks to all of you for your support and interest!

(_Alphabetic order_)

* [Alex Zylman](https://github.com/azylman)
* [C.R. Kirkwood-Watts](https://github.com/kirkwood)
* [Damian Gryski](https://github.com/dgryski)
* [Gabriel Cristian Alecu](https://github.com/AzuraMeta)
* [Graham Barr](https://github.com/gbarr)
* [Ingo Oeser](https://github.com/nightlyone)
* [jake](https://github.com/jbaikge)
* [Joe Higton](https://github.com/draxil)
* [Jonathan Wills](https://github.com/runningwild)
* [Kevin Darlington](https://github.com/kdar)
* [miraclesu](https://github.com/miraclesu)
* [Paul Mach](https://github.com/paulmach)
* [Randall McPherson](https://github.com/rlmcpherson)
* [Sam Grimee](https://github.com/sgrimee)

Maintainer
==========

* [Xing Xing](http://mikespook.com) &lt;<mikespook@gmail.com>&gt; [@Twitter](http://twitter.com/mikespook)

Open Source - MIT Software License
==================================

See LICENSE.

Added (0.0.1):

# Gearman API for golang

This module is Gearman API for golang. It was implemented a native
protocol for both worker and client API.

Copyright 2012 Xing Xing <mikespook@gmail.com> All rights reserved.
Use of this source code is governed by a MIT license that can be found
in the LICENSE file.

# INSTALL

This will install the client:

> $ go get bitbucket.org/mikespook/gearman-go/gearman/client

This will install the worker:

> $ go get bitbucket.org/mikespook/gearman-go/gearman/worker

This will install the client and the worker automatically:

> $ go get bitbucket.org/mikespook/gearman-go

# SAMPLE OF USAGE

## Worker

> $ cd example
>
> $ go build worker
>
> $ ./worker

## Client

> $ cd example
>
> $ go build client
>
> $ ./client

# Code format

> $ gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false $(DIR)

# Contacts

xingxing<mikespook@gmail.com>

http://mikespook.com

http://twitter.com/mikespook


@ -1,360 +0,0 @@
// The client package helps developers connect to Gearmand, send
// jobs and fetch result.
package client
import (
"bufio"
"net"
"sync"
"time"
)
var (
DefaultTimeout time.Duration = time.Second
)
// One client connect to one server.
// Use Pool for multi-connections.
type Client struct {
sync.Mutex
net, addr string
innerHandler *responseHandlerMap
in chan *Response
conn net.Conn
rw *bufio.ReadWriter
ResponseTimeout time.Duration // response timeout for do()
ErrorHandler ErrorHandler
}
type responseHandlerMap struct {
sync.Mutex
holder map[string]handledResponse
}
type handledResponse struct {
internal ResponseHandler // internal handler, always non-nil
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
}
func newResponseHandlerMap() *responseHandlerMap {
return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)}
}
func (r *responseHandlerMap) remove(key string) {
r.Lock()
delete(r.holder, key)
r.Unlock()
}
func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) {
r.Lock()
rh, b := r.holder[key]
delete(r.holder, key)
r.Unlock()
return rh, b
}
func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) {
r.Lock()
r.holder[key] = handledResponse{internal: internal, external: external}
r.Unlock()
}
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
r.putWithExternalHandler(key, rh, nil)
}
// New returns a client.
func New(network, addr string) (client *Client, err error) {
client = &Client{
net: network,
addr: addr,
innerHandler: newResponseHandlerMap(),
in: make(chan *Response, queueSize),
ResponseTimeout: DefaultTimeout,
}
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
return
}
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
bufio.NewWriter(client.conn))
go client.readLoop()
go client.processLoop()
return
}
func (client *Client) write(req *request) (err error) {
var n int
buf := req.Encode()
for i := 0; i < len(buf); i += n {
n, err = client.rw.Write(buf[i:])
if err != nil {
return
}
}
return client.rw.Flush()
}
func (client *Client) read(length int) (data []byte, err error) {
n := 0
buf := getBuffer(bufferSize)
// read until data can be unpacked
for i := length; i > 0 || len(data) < minPacketLength; i -= n {
if n, err = client.rw.Read(buf); err != nil {
return
}
data = append(data, buf[0:n]...)
if n < bufferSize {
break
}
}
return
}
func (client *Client) readLoop() {
defer close(client.in)
var data, leftdata []byte
var err error
var resp *Response
ReadLoop:
for client.conn != nil {
if data, err = client.read(bufferSize); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Timeout() {
client.err(err)
}
if opErr.Temporary() {
continue
}
break
}
client.err(err)
// If it is an unexpected error and the connection wasn't
// closed by Gearmand, the client should close the connection
// and reconnect to the job server.
client.Close()
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
client.err(err)
break
}
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
bufio.NewWriter(client.conn))
continue
}
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
leftdata = nil
}
for {
l := len(data)
if l < minPacketLength { // not enough data
leftdata = data
continue ReadLoop
}
if resp, l, err = decodeResponse(data); err != nil {
leftdata = data[l:]
continue ReadLoop
} else {
client.in <- resp
}
data = data[l:]
if len(data) > 0 {
continue
}
break
}
}
}
func (client *Client) processLoop() {
rhandlers := map[string]ResponseHandler{}
for resp := range client.in {
switch resp.DataType {
case dtError:
client.err(getError(resp.Data))
case dtStatusRes:
client.handleInner("s"+resp.Handle, resp, nil)
case dtJobCreated:
client.handleInner("c", resp, rhandlers)
case dtEchoRes:
client.handleInner("e", resp, nil)
case dtWorkData, dtWorkWarning, dtWorkStatus:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
delete(rhandlers, resp.Handle)
}
}
}
}
func (client *Client) err(e error) {
if client.ErrorHandler != nil {
client.ErrorHandler(e)
}
}
func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) {
if h, ok := client.innerHandler.getAndRemove(key); ok {
if h.external != nil && resp.Handle != "" {
rhandlers[resp.Handle] = h.external
}
h.internal(resp)
}
}
type handleOrError struct {
handle string
err error
}
func (client *Client) do(funcname string, data []byte,
flag uint32, h ResponseHandler, id string) (handle string, err error) {
if len(id) == 0 {
return "", ErrInvalidId
}
if client.conn == nil {
return "", ErrLostConn
}
var result = make(chan handleOrError, 1)
client.Lock()
defer client.Unlock()
client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
if resp.DataType == dtError {
err = getError(resp.Data)
result <- handleOrError{"", err}
return
}
handle = resp.Handle
result <- handleOrError{handle, nil}
}, h)
req := getJob(id, []byte(funcname), data)
req.DataType = flag
if err = client.write(req); err != nil {
client.innerHandler.remove("c")
return
}
var timer = time.After(client.ResponseTimeout)
select {
case ret := <-result:
return ret.handle, ret.err
case <-timer:
client.innerHandler.remove("c")
return "", ErrLostConn
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) {
handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) {
handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
return
}
// Status gets job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil {
return nil, ErrLostConn
}
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("s"+handle, func(resp *Response) {
defer mutex.Unlock()
var err error
status, err = resp._status()
if err != nil {
client.err(err)
}
})
req := getRequest()
req.DataType = dtGetStatus
req.Data = []byte(handle)
client.write(req)
mutex.Lock()
return
}
// Echo.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
if client.conn == nil {
return nil, ErrLostConn
}
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("e", func(resp *Response) {
echo = resp.Data
mutex.Unlock()
})
req := getRequest()
req.DataType = dtEchoReq
req.Data = data
client.write(req)
mutex.Lock()
return
}
// Close connection
func (client *Client) Close() (err error) {
client.Lock()
defer client.Unlock()
if client.conn != nil {
err = client.conn.Close()
client.conn = nil
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoWithId(funcname string, data []byte,
flag byte, h ResponseHandler, id string) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
}
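
The Do/DoBg calls above resolve the job handle through the internal "c" handler and invoke the per-handle callback later from processLoop. As a point of reference only (not part of this diff), a minimal sketch of driving the removed API synchronously, assuming a gearmand at 127.0.0.1:4730 with a "ToUpper" worker registered:

```go
package main

import (
	"log"

	"github.com/mikespook/gearman-go/client"
)

func main() {
	c, err := client.New(client.Network, "127.0.0.1:4730")
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	c.ErrorHandler = func(e error) { log.Println(e) }

	// Do returns as soon as JOB_CREATED arrives; the response handler runs
	// later on the processLoop goroutine. A channel makes the call synchronous.
	done := make(chan []byte, 1)
	handle, err := c.Do("ToUpper", []byte("abcdef"), client.JobNormal,
		func(resp *client.Response) {
			data, err := resp.Result()
			if err != nil {
				log.Println(err)
			}
			done <- data
		})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("handle=%s result=%s", handle, <-done)
}
```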


@ -1,377 +0,0 @@
package client
import (
"crypto/md5"
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
"testing"
"time"
)
const (
TestStr = "Hello world"
)
var (
client *Client
runIntegrationTests bool
)
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestClientAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730")
var err error
if client, err = New(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
client.ErrorHandler = func(e error) {
t.Log(e)
}
}
func TestClientEcho(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
echo, err := client.Echo([]byte(TestStr))
if err != nil {
t.Error(err)
return
}
if string(echo) != TestStr {
t.Errorf("Echo error, %s expected, %s got", TestStr, echo)
return
}
}
func TestClientDoBg(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
handle, err := client.Do("ToUpper", []byte("abcdef"),
JobLow, jobHandler)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := client.Status("handle not exists")
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle)
return
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
return
}
handle, err := client.Do("Delay5sec", []byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return
}
status, err = client.Status(handle)
if err != nil {
t.Error(err)
return
}
if !status.Known {
t.Errorf("The job (%s) should be known.", status.Handle)
return
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
return
}
}
func TestClientClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := client.Close(); err != nil {
t.Error(err)
}
}


@ -1,75 +0,0 @@
package client
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 8192
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
dtSubmitJob = 7
dtSubmitJobBg = 18
dtSubmitJobHigh = 21
dtSubmitJobHighBg = 32
dtSubmitJobLow = 33
dtSubmitJobLowBg = 34
WorkComplate = dtWorkComplete
WorkComplete = dtWorkComplete
WorkData = dtWorkData
WorkStatus = dtWorkStatus
WorkWarning = dtWorkWarning
WorkFail = dtWorkFail
WorkException = dtWorkException
)
const (
// Job type
JobNormal = iota
// low level
JobLow
// high level
JobHigh
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}


@ -1,31 +0,0 @@
package client
import (
"bytes"
"errors"
"fmt"
)
var (
ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")
ErrLostConn = errors.New("Lost connection with Gearmand")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// Error handler
type ErrorHandler func(error)


@ -1,42 +0,0 @@
package client
import (
"strconv"
"sync/atomic"
"time"
)
var (
// Global ID generator
// Default is an autoincrement ID generator
IdGen IdGenerator
)
func init() {
IdGen = NewAutoIncId()
}
// ID generator interface. Users can implement this for
// their own generator.
type IdGenerator interface {
Id() string
}
// AutoIncId
type autoincId struct {
value int64
}
func (ai *autoincId) Id() string {
next := atomic.AddInt64(&ai.value, 1)
return strconv.FormatInt(next, 10)
}
// NewAutoIncId returns an autoincrement ID generator
func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique
// and count up from there.
return &autoincId{
value: int64(time.Now().Nanosecond()) << 32,
}
}
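
IdGen is an exported package variable, so any type with an Id() string method can replace the default auto-increment generator. A hypothetical drop-in, not part of this diff, shown only to illustrate the IdGenerator interface:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"

	"github.com/mikespook/gearman-go/client"
)

// randId returns 16 random hex characters per call. Purely illustrative;
// any type with an Id() string method satisfies client.IdGenerator.
type randId struct{}

func (randId) Id() string {
	buf := make([]byte, 8)
	rand.Read(buf) // error ignored for brevity in this sketch
	return hex.EncodeToString(buf)
}

func main() {
	client.IdGen = randId{} // replace the default auto-increment generator
}
```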


@ -1,18 +0,0 @@
package client
import (
"testing"
)
func TestAutoInc(t *testing.T) {
ai := NewAutoIncId()
previous := ai.Id()
for i := 0; i < 10; i++ {
id := ai.Id()
if id == previous {
t.Errorf("Id not unique, previous and current %s", id)
}
previous = id
}
}


@ -1,165 +0,0 @@
package client
import (
"errors"
"math/rand"
"sync"
)
const (
poolSize = 10
)
var (
ErrNotFound = errors.New("Server Not Found")
)
type PoolClient struct {
*Client
Rate int
mutex sync.Mutex
}
type SelectionHandler func(map[string]*PoolClient, string) string
func SelectWithRate(pool map[string]*PoolClient,
last string) (addr string) {
total := 0
for _, item := range pool {
total += item.Rate
if rand.Intn(total) < item.Rate {
return item.addr
}
}
return last
}
func SelectRandom(pool map[string]*PoolClient,
last string) (addr string) {
r := rand.Intn(len(pool))
i := 0
for k, _ := range pool {
if r == i {
return k
}
i++
}
return last
}
type Pool struct {
SelectionHandler SelectionHandler
ErrorHandler ErrorHandler
Clients map[string]*PoolClient
last string
mutex sync.Mutex
}
// NewPool returns a new pool.
func NewPool() (pool *Pool) {
return &Pool{
Clients: make(map[string]*PoolClient, poolSize),
SelectionHandler: SelectWithRate,
}
}
// Add a server with rate.
func (pool *Pool) Add(net, addr string, rate int) (err error) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
var item *PoolClient
var ok bool
if item, ok = pool.Clients[addr]; ok {
item.Rate = rate
} else {
var client *Client
client, err = New(net, addr)
if err == nil {
item = &PoolClient{Client: client, Rate: rate}
pool.Clients[addr] = item
}
}
return
}
// Remove a server.
func (pool *Pool) Remove(addr string) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
delete(pool.Clients, addr)
}
func (pool *Pool) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (addr, handle string, err error) {
client := pool.selectServer()
client.Lock()
defer client.Unlock()
handle, err = client.Do(funcname, data, flag, h)
addr = client.addr
return
}
func (pool *Pool) DoBg(funcname string, data []byte,
flag byte) (addr, handle string, err error) {
client := pool.selectServer()
client.Lock()
defer client.Unlock()
handle, err = client.DoBg(funcname, data, flag)
addr = client.addr
return
}
// Status gets job status from job server.
// !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.Clients[addr]; ok {
client.Lock()
defer client.Unlock()
status, err = client.Status(handle)
} else {
err = ErrNotFound
}
return
}
// Send something out, get the same thing back.
func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
var client *PoolClient
if addr == "" {
client = pool.selectServer()
} else {
var ok bool
if client, ok = pool.Clients[addr]; !ok {
err = ErrNotFound
return
}
}
client.Lock()
defer client.Unlock()
echo, err = client.Echo(data)
return
}
// Close
func (pool *Pool) Close() (err map[string]error) {
err = make(map[string]error)
for _, c := range pool.Clients {
err[c.addr] = c.Close()
}
return
}
// selecting server
func (pool *Pool) selectServer() (client *PoolClient) {
for client == nil {
addr := pool.SelectionHandler(pool.Clients, pool.last)
var ok bool
if client, ok = pool.Clients[addr]; ok {
pool.last = addr
break
}
}
return
}
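
Pool.SelectionHandler is an exported field of type SelectionHandler, so the weighted and random pickers above can be swapped out. A sketch of a round-robin picker, illustrative only and not goroutine-safe:

```go
package main

import (
	"sort"

	"github.com/mikespook/gearman-go/client"
)

// roundRobin cycles through the pool's servers in address order.
func roundRobin() client.SelectionHandler {
	var next int
	return func(pool map[string]*client.PoolClient, last string) string {
		if len(pool) == 0 {
			return last
		}
		addrs := make([]string, 0, len(pool))
		for addr := range pool {
			addrs = append(addrs, addr)
		}
		sort.Strings(addrs) // map iteration order is random; sort for a stable cycle
		addr := addrs[next%len(addrs)]
		next++
		return addr
	}
}

func main() {
	pool := client.NewPool()
	pool.SelectionHandler = roundRobin()
	_ = pool.Add("tcp4", "127.0.0.1:4730", 1) // error handling omitted in this sketch
}
```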


@ -1,139 +0,0 @@
package client
import (
"testing"
)
var (
pool = NewPool()
)
func TestPoolAdd(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add servers")
c := 2
if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
t.Fatal(err)
}
if err := pool.Add("tcp4", "127.0.1.1:4730", 1); err != nil {
t.Log(err)
c -= 1
}
if len(pool.Clients) != c {
t.Errorf("%d servers expected, %d got.", c, len(pool.Clients))
}
}
func TestPoolEcho(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
echo, err := pool.Echo("", []byte(TestStr))
if err != nil {
t.Error(err)
return
}
if string(echo) != TestStr {
t.Errorf("Invalid echo data: %s", echo)
return
}
_, err = pool.Echo("not exists", []byte(TestStr))
if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err)
}
}
func TestPoolDoBg(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
addr, handle, err := pool.DoBg("ToUpper",
[]byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
addr, handle, err := pool.Do("ToUpper",
[]byte("abcdef"), JobLow, jobHandler)
if err != nil {
t.Error(err)
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := pool.Status("127.0.0.1:4730", "handle not exists")
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle)
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
}
addr, handle, err := pool.Do("Delay5sec",
[]byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return
}
status, err = pool.Status(addr, handle)
if err != nil {
t.Error(err)
return
}
if !status.Known {
t.Errorf("The job (%s) should be known.", status.Handle)
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
}
status, err = pool.Status("not exists", "not exists")
if err != ErrNotFound {
t.Error(err)
return
}
}
func TestPoolClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
return
if err := pool.Close(); err != nil {
t.Error(err)
}
}


@ -1,42 +0,0 @@
package client
import (
"encoding/binary"
)
// Request from client
type request struct {
DataType uint32
Data []byte
}
// Encode a Request to byte slice
func (req *request) Encode() (data []byte) {
l := len(req.Data) // length of data
tl := l + minPacketLength // add 12 bytes head
data = getBuffer(tl)
copy(data[:4], reqStr)
binary.BigEndian.PutUint32(data[4:8], req.DataType)
binary.BigEndian.PutUint32(data[8:12], uint32(l))
copy(data[minPacketLength:], req.Data)
return
}
func getRequest() (req *request) {
// TODO add a pool
req = &request{}
return
}
func getJob(id string, funcname, data []byte) (req *request) {
req = getRequest()
a := len(funcname)
b := len(id)
c := len(data)
l := a + b + c + 2
req.Data = getBuffer(l)
copy(req.Data[0:a], funcname)
copy(req.Data[a+1:a+b+1], []byte(id))
copy(req.Data[a+b+2:], data)
return
}
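
getJob packs the payload as funcname, NUL, id, NUL, data, and Encode prepends the 12-byte header: the "\x00REQ" magic, the data type, and the payload length, both big-endian. A standalone sketch of the resulting bytes, using the same layout but none of the package internals:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Payload: funcname \x00 id \x00 data
	payload := []byte("ToUpper\x001\x00abc")

	// Header: 4-byte magic "\x00REQ", 4-byte data type, 4-byte payload length.
	packet := make([]byte, 12+len(payload))
	copy(packet[0:4], "\x00REQ")
	binary.BigEndian.PutUint32(packet[4:8], 7) // 7 = dtSubmitJob
	binary.BigEndian.PutUint32(packet[8:12], uint32(len(payload)))
	copy(packet[12:], payload)

	fmt.Printf("%q\n", packet) // 12-byte header followed by the 13-byte payload
}
```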


@ -1,156 +0,0 @@
package client
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Response handler
type ResponseHandler func(*Response)
// response
type Response struct {
DataType uint32
Data, UID []byte
Handle string
}
// Extract the Response's result.
// if data == nil, err != nil, then the worker failed to execute the job
// if data != nil, err != nil, then the worker raised an exception
// if data != nil, err == nil, then the worker completed the job
// After calling this method, Response.Handle will be filled in.
func (resp *Response) Result() (data []byte, err error) {
switch resp.DataType {
case dtWorkFail:
resp.Handle = string(resp.Data)
err = ErrWorkFail
return
case dtWorkException:
err = ErrWorkException
fallthrough
case dtWorkComplete:
data = resp.Data
default:
err = ErrDataType
}
return
}
// Extract the job's update
func (resp *Response) Update() (data []byte, err error) {
if resp.DataType != dtWorkData &&
resp.DataType != dtWorkWarning {
err = ErrDataType
return
}
data = resp.Data
if resp.DataType == dtWorkWarning {
err = ErrWorkWarning
}
return
}
// Decode a job from byte slice
func decodeResponse(data []byte) (resp *Response, l int, err error) {
a := len(data)
if a < minPacketLength { // a valid packet is at least 12 bytes long
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if a < minPacketLength+dl {
err = fmt.Errorf("Invalid data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
case dtJobCreated:
resp.Handle = string(dt)
case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
dtWorkComplete, dtWorkException:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 2 {
resp.Handle = string(s[0])
resp.Data = s[1]
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtWorkFail:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 1 {
resp.Handle = string(s[0])
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtEchoRes:
fallthrough
default:
resp.Data = dt
}
l = dl + minPacketLength
return
}
func (resp *Response) Status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(data) != 2 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = true
status.Running = true
status.Numerator, err = strconv.ParseUint(string(data[0]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[0])
return
}
status.Denominator, err = strconv.ParseUint(string(data[1]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[1])
return
}
return
}
// status handler
func (resp *Response) _status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4)
if len(data) != 4 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = (data[0][0] == '1')
status.Running = (data[1][0] == '1')
status.Numerator, err = strconv.ParseUint(string(data[2]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[2])
return
}
status.Denominator, err = strconv.ParseUint(string(data[3]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[3])
return
}
return
}
func getResponse() (resp *Response) {
// TODO add a pool
resp = &Response{}
return
}


@ -1,11 +0,0 @@
package client
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
type Status struct {
Handle string
Known, Running bool
Numerator, Denominator uint64
}

example/client.go (new file, 46 lines)

@ -0,0 +1,46 @@
package main
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bitbucket.org/mikespook/gearman-go/gearman/client"
"log"
)
func main() {
client := client.New()
defer client.Close()
if err := client.AddServer("127.0.0.1:4730"); err != nil {
log.Fatalln(err)
}
echo := []byte("Hello\x00 world")
if data, err := client.Echo(echo); err != nil {
log.Fatalln(string(data))
}
handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL)
if err != nil {
log.Fatalln(err)
} else {
log.Println(handle)
job := <-client.JobQueue
if data, err := job.Result(); err != nil {
log.Fatalln(err)
} else {
log.Println(string(data))
}
}
known, running, numerator, denominator, err := client.Status(handle)
if err != nil {
log.Fatalln(err)
}
if !known {
log.Println("Unknown")
}
if running {
log.Printf("%g%%\n", float32(numerator)*100/float32(denominator))
} else {
log.Println("Not running")
}
}


@ -1,80 +0,0 @@
package main
import (
"github.com/mikespook/gearman-go/client"
"log"
"os"
"sync"
)
func main() {
// Set the autoinc id generator
// You can write your own id generator
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()
c, err := client.New(client.Network, "127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
defer c.Close()
c.ErrorHandler = func(e error) {
log.Println(e)
os.Exit(1)
}
echo := []byte("Hello\x00 world")
echomsg, err := c.Echo(echo)
if err != nil {
log.Fatalln(err)
}
log.Println(string(echomsg))
jobHandler := func(resp *client.Response) {
switch resp.DataType {
case client.WorkException:
fallthrough
case client.WorkFail:
fallthrough
case client.WorkComplate:
if data, err := resp.Result(); err == nil {
log.Printf("RESULT: %v\n", data)
} else {
log.Printf("RESULT: %s\n", err)
}
case client.WorkWarning:
fallthrough
case client.WorkData:
if data, err := resp.Update(); err == nil {
log.Printf("UPDATE: %v\n", data)
} else {
log.Printf("UPDATE: %v, %s\n", data, err)
}
case client.WorkStatus:
if data, err := resp.Status(); err == nil {
log.Printf("STATUS: %v\n", data)
} else {
log.Printf("STATUS: %s\n", err)
}
default:
log.Printf("UNKNOWN: %v", resp.Data)
}
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
if err != nil {
log.Fatalln(err)
}
status, err := c.Status(handle)
if err != nil {
log.Fatalln(err)
}
log.Printf("%v", *status)
_, err = c.Do("Foobar", echo, client.JobNormal, jobHandler)
if err != nil {
log.Fatalln(err)
}
log.Println("Press Ctrl-C to exit ...")
var mutex sync.Mutex
mutex.Lock()
mutex.Lock()
}


@ -1,33 +0,0 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;


@ -12,38 +12,10 @@ def check_request_status(job_request):
Removed (master):

def main():
    client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
    try:
        completed_job_request = client.submit_job("ToUpper", "arbitrary binary data")
        check_request_status(completed_job_request)
    except Exception as e:
        print type(e)
    try:
        completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data")
        check_request_status(completed_job_request)
    except Exception as e:
        print type(e)
    try:
        completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data")
        check_request_status(completed_job_request)
    except Exception as e:
        print type(e)
    try:
        completed_job_request = client.submit_job("SysInfo", "")
        check_request_status(completed_job_request)
    except Exception as e:
        print type(e)
    try:
        completed_job_request = client.submit_job("MemInfo", "")
        check_request_status(completed_job_request)
    except Exception as e:
        print type(e)

if __name__ == '__main__':
    main()

Added (0.0.1):

def main():
    client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
    completed_job_request = client.submit_job("ToUpper", "arbitrary binary data")
    check_request_status(completed_job_request)

if __name__ == '__main__':
    for i in range(100):
        main()

example/worker.go (new file, 61 lines)

@ -0,0 +1,61 @@
package main
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bitbucket.org/mikespook/gearman-go/gearman/worker"
"bitbucket.org/mikespook/golib/signal"
"os"
"fmt"
"log"
"strings"
)
func ToUpper(job *worker.WorkerJob) ([]byte, error) {
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
}
func main() {
w := worker.New(worker.Unlimit)
w.ErrFunc = func(e error) {
log.Println(e)
}
w.AddServer("127.0.0.1:4730")
w.AddFunction("ToUpper", ToUpper, 0)
w.AddFunction("ToUpperTimeOut5", ToUpper, 5)
// Catch the interrupt to exit the working loop.
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {
w.Close()
return true
})
go sh.Loop()
go func() {
log.Println("start worker")
for {
print("cmd: ")
var str string
fmt.Scan(&str)
switch str {
case "echo":
w.Echo([]byte("Hello world!"))
var job *worker.WorkerJob
for job = <-w.JobQueue; job.DataType != gearman.ECHO_RES; job = <-w.JobQueue {
log.Println(job)
}
log.Println(string(job.Data))
case "quit":
os.Exit(0)
return
case "result":
job := <-w.JobQueue
log.Println(string(job.Data))
default:
log.Println("Unknown command")
}
}
}()
w.Work()
}


@ -1,74 +0,0 @@
package main
import (
"log"
"net"
"os"
"strings"
"time"
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal"
)
func ToUpper(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func ToUpperDelay10(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func Foobar(job worker.Job) ([]byte, error) {
log.Printf("Foobar: Data=[%s]\n", job.Data())
for i := 0; i < 10; i++ {
job.SendWarning([]byte{byte(i)})
job.SendData([]byte{byte(i)})
job.UpdateStatus(i+1, 100)
}
return job.Data(), nil
}
func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited)
defer w.Close()
w.ErrorHandler = func(e error) {
log.Println(e)
if opErr, ok := e.(*net.OpError); ok {
if !opErr.Temporary() {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)
}
if err := proc.Signal(os.Interrupt); err != nil {
log.Println(err)
}
}
}
}
w.JobHandler = func(job worker.Job) error {
log.Printf("Data=%s\n", job.Data())
return nil
}
w.AddServer("tcp4", "127.0.0.1:4730")
w.AddFunc("Foobar", Foobar, worker.Unlimited)
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Unlimited)
w.AddFunc("MemInfo", worker.MemInfo, worker.Unlimited)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
signal.Wait()
}


@ -1,19 +1,15 @@
Removed (master):

// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.

/*
This module is a Gearman API for the Go Programming Language.
The protocols were written in pure Go. It contains two sub-packages:

The client package is used for sending jobs to the Gearman job server,
and getting responses from the server.

	import "github.com/mikespook/gearman-go/client"

The worker package will help developers to develop Gearman's worker
in an easy way.

	import "github.com/mikespook/gearman-go/worker"
*/
package gearman

Added (0.0.1):

// Copyright 2012 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.

/*
This module is Gearman API for golang.
The protocol was implemented by native way.
*/
package gearman

import (
	_ "bitbucket.org/mikespook/gearman-go/gearman/client"
	_ "bitbucket.org/mikespook/gearman-go/gearman/worker"
)

gearman/client/client.go (new file, 278 lines)

@ -0,0 +1,278 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bytes"
"io"
"net"
"strconv"
"sync"
)
/*
The client side api for gearman.
usage:
client = NewClient()
client.AddServer("127.0.0.1:4730")
handle := client.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct {
mutex sync.Mutex
conn net.Conn
incoming chan []byte
JobQueue chan *ClientJob
UId uint32
}
// Create a new client.
func New() (client *Client) {
return &Client{
JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP),
incoming: make(chan []byte, gearman.QUEUE_CAP),
UId:1}
}
// Add a server.
// In this version, one client connects to one job server.
// Simple is better: please do the load balancing yourself.
func (client *Client) AddServer(addr string) (err error) {
client.conn, err = net.Dial(gearman.TCP, addr)
if err != nil {
return
}
return
}
// Internal read
func (client *Client) read() (data []byte, err error) {
if len(client.incoming) > 0 {
// incoming queue is not empty
data = <-client.incoming
} else {
// empty queue, read data from socket
for {
buf := make([]byte, gearman.BUFFER_SIZE)
var n int
if n, err = client.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
break
}
return
}
data = append(data, buf[0:n]...)
if n < gearman.BUFFER_SIZE {
break
}
}
}
// split package
start, end := 0, 4
tl := len(data)
for i := 0; i < tl; i++ {
if string(data[start:end]) == gearman.RES_STR {
l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == tl {
return
} else {
client.incoming <- data[total:]
data = data[:total]
return
}
} else {
start++
end++
}
}
err = gearman.ErrInvalidData
return
}
// Read a job from job server.
// This function will return the job, and add it to the job queue.
func (client *Client) ReadJob() (job *ClientJob, err error) {
var rel []byte
if rel, err = client.read(); err != nil {
return
}
if job, err = DecodeClientJob(rel); err != nil {
return
} else {
switch job.DataType {
case gearman.ERROR:
_, err = gearman.GetError(job.Data)
return
case gearman.WORK_DATA, gearman.WORK_WARNING, gearman.WORK_STATUS, gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION:
client.JobQueue <- job
}
}
return
}
// Do submits a job to the server.
// funcname is the function name.
// data is the payload as a byte slice.
// flag sets the job type: a run level (JOB_LOW, JOB_NORMAL, JOB_HIGH),
// optionally combined with JOB_BG for a background job.
// JOB_LOW | JOB_BG means a low-priority job run in the background.
func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) {
var datatype uint32
if flag&gearman.JOB_LOW == gearman.JOB_LOW {
if flag&gearman.JOB_BG == gearman.JOB_BG {
datatype = gearman.SUBMIT_JOB_LOW_BG
} else {
datatype = gearman.SUBMIT_JOB_LOW
}
} else if flag&gearman.JOB_HIGH == gearman.JOB_HIGH {
if flag&gearman.JOB_BG == gearman.JOB_BG {
datatype = gearman.SUBMIT_JOB_HIGH_BG
} else {
datatype = gearman.SUBMIT_JOB_HIGH
}
} else if flag&gearman.JOB_BG == gearman.JOB_BG {
datatype = gearman.SUBMIT_JOB_BG
} else {
datatype = gearman.SUBMIT_JOB
}
rel := make([]byte, 0, 1024*64)
rel = append(rel, []byte(funcname)...)
rel = append(rel, '\x00')
client.mutex.Lock()
uid := strconv.Itoa(int(client.UId))
client.UId++
rel = append(rel, []byte(uid)...)
client.mutex.Unlock()
rel = append(rel, '\x00')
rel = append(rel, data...)
if err = client.WriteJob(NewClientJob(gearman.REQ, datatype, rel)); err != nil {
return
}
var job *ClientJob
if job, err = client.readLastJob(gearman.JOB_CREATED); err != nil {
return
}
handle = string(job.Data)
go func() {
if flag&gearman.JOB_BG != gearman.JOB_BG {
for {
if job, err = client.ReadJob(); err != nil {
return
}
switch job.DataType {
case gearman.WORK_DATA, gearman.WORK_WARNING:
case gearman.WORK_STATUS:
case gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION:
return
}
}
}
}()
return
}
// Internal read last job
func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err error) {
for {
if job, err = client.ReadJob(); err != nil {
return
}
if job.DataType == datatype {
break
}
}
if job.DataType != datatype {
err = gearman.ErrDataType
}
return
}
// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint64, err error) {
if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.GET_STATUS, []byte(handle))); err != nil {
return
}
var job *ClientJob
if job, err = client.readLastJob(gearman.STATUS_RES); err != nil {
return
}
data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
if len(data) != 5 {
err = gearman.ErrInvalidData
return
}
if handle != string(data[0]) {
err = gearman.ErrInvalidData
return
}
known = data[1][0] == '1'
running = data[2][0] == '1'
if numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0); err != nil {
return
}
if denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0); err != nil {
return
}
return
}
// Send something out, get the same thing back.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.ECHO_REQ, data)); err != nil {
return
}
var job *ClientJob
if job, err = client.readLastJob(gearman.ECHO_RES); err != nil {
return
}
echo = job.Data
return
}
// Get the last job.
// Here a job means a network packet.
// Normally, it is the result of an executed job.
func (client *Client) LastJob() (job *ClientJob) {
if l := len(client.JobQueue); l != 1 {
if l == 0 {
return
}
for i := 0; i < l-1; i++ {
<-client.JobQueue
}
}
return <-client.JobQueue
}
// Send the job to job server.
func (client *Client) WriteJob(job *ClientJob) (err error) {
return client.write(job.Encode())
}
// Internal write
func (client *Client) write(buf []byte) (err error) {
var n int
for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:])
if err != nil {
return
}
}
return
}
// Close.
func (client *Client) Close() (err error) {
err = client.conn.Close()
close(client.JobQueue)
return
}
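
The run level and the background bit are combined bitwise, so a low-priority background submission looks like the sketch below (hypothetical usage, assuming a reachable job server with a registered "ToUpper" function; error handling trimmed):

```go
package main

import (
	"log"

	"bitbucket.org/mikespook/gearman-go/gearman"
	"bitbucket.org/mikespook/gearman-go/gearman/client"
)

func main() {
	c := client.New()
	defer c.Close()
	if err := c.AddServer("127.0.0.1:4730"); err != nil {
		log.Fatalln(err)
	}
	// JOB_LOW | JOB_BG: low priority, run in the background (no result packet).
	handle, err := c.Do("ToUpper", []byte("abcdef"), gearman.JOB_LOW|gearman.JOB_BG)
	if err != nil {
		log.Fatalln(err)
	}
	log.Println(handle)
}
```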


@ -0,0 +1,41 @@
package client
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"testing"
)
var client *Client
func init() {
client = NewClient()
}
func TestClientAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730")
if err := client.AddServer("127.0.0.1:4730"); err != nil {
t.Error(err)
}
}
func TestClientEcho(t *testing.T) {
if echo, err := client.Echo([]byte("Hello world")); err != nil {
t.Error(err)
} else {
t.Log(echo)
}
}
func TestClientDo(t *testing.T) {
if handle, err := client.Do("ToUpper", []byte("abcdef"), gearman.JOB_LOW|gearman.JOB_BG); err != nil {
t.Error(err)
} else {
t.Log(handle)
}
}
func TestClientClose(t *testing.T) {
if err := client.Close(); err != nil {
t.Error(err)
}
}


@ -0,0 +1,98 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bytes"
)
// Client side job
type ClientJob struct {
Data []byte
Handle, UniqueId string
magicCode, DataType uint32
}
// Create a new job
func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) {
return &ClientJob{magicCode: magiccode,
DataType: datatype,
Data: data}
}
// Decode a job from byte slice
func DecodeClientJob(data []byte) (job *ClientJob, err error) {
if len(data) < 12 {
err = gearman.ErrInvalidData
return
}
datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
err = gearman.ErrInvalidData
return
}
data = data[12:]
job = NewClientJob(gearman.RES, datatype, data)
return
}
// Encode a job to byte slice
func (job *ClientJob) Encode() (data []byte) {
magiccode := gearman.Uint32ToBytes(job.magicCode)
datatype := gearman.Uint32ToBytes(job.DataType)
data = make([]byte, 0, 1024*64)
data = append(data, magiccode[:]...)
data = append(data, datatype[:]...)
l := len(job.Data)
datalength := gearman.Uint32ToBytes(uint32(l))
data = append(data, datalength[:]...)
data = append(data, job.Data...)
return
}
// Extract the job's result.
func (job *ClientJob) Result() (data []byte, err error) {
switch job.DataType {
case gearman.WORK_FAIL:
job.Handle = string(job.Data)
err = gearman.ErrWorkFail
return
case gearman.WORK_EXCEPTION:
err = gearman.ErrWorkException
fallthrough
case gearman.WORK_COMPLETE:
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = gearman.ErrInvalidData
return
}
job.Handle = string(s[0])
data = s[1]
default:
err = gearman.ErrDataType
}
return
}
// Extract the job's update
func (job *ClientJob) Update() (data []byte, err error) {
if job.DataType != gearman.WORK_DATA && job.DataType != gearman.WORK_WARNING {
err = gearman.ErrDataType
return
}
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = gearman.ErrInvalidData
return
}
if job.DataType == gearman.WORK_WARNING {
err = gearman.ErrWorkWarning
}
job.Handle = string(s[0])
data = s[1]
return
}

gearman/gearman.go (new file, 123 lines)

@ -0,0 +1,123 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
/*
This module is Gearman API for golang.
The protocol was implemented by native way.
*/
package gearman
import (
"bytes"
"errors"
"syscall"
)
const (
// tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6,
// or 'tcp6' only for ipv6.
TCP = "tcp4"
// the maximum number of job servers.
WORKER_SERVER_CAP = 32
// the maximum number of functions.
WORKER_FUNCTION_CAP = 512
// queue size
QUEUE_CAP = 512
// read buffer size
BUFFER_SIZE = 1024
// \x00REQ
REQ = 5391697
REQ_STR = "\x00REQ"
// \x00RES
RES = 5391699
RES_STR = "\x00RES"
// package data type
CAN_DO = 1
CANT_DO = 2
RESET_ABILITIES = 3
PRE_SLEEP = 4
NOOP = 6
JOB_CREATED = 8
GRAB_JOB = 9
NO_JOB = 10
JOB_ASSIGN = 11
WORK_STATUS = 12
WORK_COMPLETE = 13
WORK_FAIL = 14
GET_STATUS = 15
ECHO_REQ = 16
ECHO_RES = 17
ERROR = 19
STATUS_RES = 20
SET_CLIENT_ID = 22
CAN_DO_TIMEOUT = 23
WORK_EXCEPTION = 25
WORK_DATA = 28
WORK_WARNING = 29
GRAB_JOB_UNIQ = 30
JOB_ASSIGN_UNIQ = 31
SUBMIT_JOB = 7
SUBMIT_JOB_BG = 18
SUBMIT_JOB_HIGH = 21
SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
)
var (
ErrIsNotErr = errors.New("The input is not a error data.")
ErrInvalidData = errors.New("Invalid data.")
ErrWorkWarning = errors.New("Work warning.")
ErrWorkFail = errors.New("Work fail.")
ErrWorkException = errors.New("Work exeption.")
ErrDataType = errors.New("Invalid data type.")
ErrOutOfCap = errors.New("Out of the capability.")
ErrNotConn = errors.New("Did not connect to job server.")
ErrFuncNotFound = errors.New("The function was not found.")
)
// Extract the error message
func GetError(data []byte) (eno syscall.Errno, err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = ErrIsNotErr
return
}
l := len(rel[0])
eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]}))
err = errors.New(string(rel[1]))
return
}
// Decode [4]byte to uint32
func BytesToUint32(buf [4]byte) uint32 {
return uint32(buf[0])<<24 +
uint32(buf[1])<<16 +
uint32(buf[2])<<8 +
uint32(buf[3])
}
// Encode uint32 to [4]byte
func Uint32ToBytes(i uint32) (data [4]byte) {
data[0] = byte((i >> 24) & 0xff)
data[1] = byte((i >> 16) & 0xff)
data[2] = byte((i >> 8) & 0xff)
data[3] = byte(i & 0xff)
return
}
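
BytesToUint32 and Uint32ToBytes are plain big-endian conversions, equivalent to encoding/binary in the standard library. A quick illustrative check, assuming nothing beyond the REQ constant defined above:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], 5391697)  // REQ == 5391697
	fmt.Printf("%q\n", buf[:])                   // prints "\x00REQ"
	fmt.Println(binary.BigEndian.Uint32(buf[:])) // prints 5391697
}
```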

gearman/worker/jobagent.go (new file, 131 lines)

@ -0,0 +1,131 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"io"
"net"
)
// The agent of job server.
type jobAgent struct {
conn net.Conn
worker *Worker
running bool
incoming chan []byte
}
// Create the agent of job server.
func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) {
conn, err := net.Dial(gearman.TCP, addr)
if err != nil {
return nil, err
}
jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, gearman.QUEUE_CAP)}
return jobagent, err
}
// Internal read
func (agent *jobAgent) read() (data []byte, err error) {
if len(agent.incoming) > 0 {
// incoming queue is not empty
data = <-agent.incoming
} else {
for {
buf := make([]byte, gearman.BUFFER_SIZE)
var n int
if n, err = agent.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
err = nil
return
}
return
}
data = append(data, buf[0:n]...)
if n < gearman.BUFFER_SIZE {
break
}
}
}
// split package
start := 0
tl := len(data)
for i := 0; i < tl; i++ {
if string(data[start:start+4]) == gearman.RES_STR {
l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == tl {
return
} else {
agent.incoming <- data[total:]
data = data[:total]
return
}
} else {
start++
}
}
err = gearman.ErrInvalidData
return
}
// Main loop.
func (agent *jobAgent) Work() {
noop := true
for agent.running {
// got noop msg and incoming queue is zero, grab job
if noop && len(agent.incoming) == 0 {
agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.GRAB_JOB, nil))
}
rel, err := agent.read()
if err != nil {
agent.worker.err(err)
continue
}
job, err := DecodeWorkerJob(rel)
if err != nil {
agent.worker.err(err)
continue
} else {
switch job.DataType {
case gearman.NOOP:
noop = true
case gearman.NO_JOB:
noop = false
agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.PRE_SLEEP, nil))
case gearman.ECHO_RES, gearman.JOB_ASSIGN_UNIQ, gearman.JOB_ASSIGN:
job.agent = agent
agent.worker.incoming <- job
}
}
}
return
}
// Send a job to the job server.
func (agent *jobAgent) WriteJob(job *WorkerJob) (err error) {
return agent.write(job.Encode())
}
// Internal write the encoded job.
func (agent *jobAgent) write(buf []byte) (err error) {
var n int
for i := 0; i < len(buf); i += n {
n, err = agent.conn.Write(buf[i:])
if err != nil {
return err
}
}
return
}
// Close.
func (agent *jobAgent) Close() (err error) {
agent.running = false
close(agent.incoming)
err = agent.conn.Close()
return
}

gearman/worker/worker.go (new file, 286 lines)

@ -0,0 +1,286 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bytes"
"sync"
)
const (
Unlimit = 0
OneByOne = 1
)
// The definition of the callback function.
type JobFunction func(job *WorkerJob) ([]byte, error)
// Map of added functions.
type JobFunctionMap map[string]JobFunction
// Error Function
type ErrFunc func(e error)
/*
Worker-side API for Gearman.
usage:
w = worker.New(worker.Unlimit)
w.AddServer("127.0.0.1:4730")
w.AddFunction("foobar", foobar, 0)
w.Work() // Enter the worker's main loop
The callback function 'foobar' must match the type 'JobFunction'.
It looks like this:
func foobar(job *WorkerJob) (data []byte, err error) {
// do the real work here
return
}
*/
type Worker struct {
clients []*jobAgent
functions JobFunctionMap
running bool
incoming chan *WorkerJob
mutex sync.Mutex
limit chan bool
JobQueue chan *WorkerJob
// Assign an ErrFunc to handle errors.
// It must be assigned before AddServer is called.
ErrFunc ErrFunc
}
// Get a new worker
func New(l int) (worker *Worker) {
worker = &Worker{
// job server list
clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP),
// function list
functions: make(JobFunctionMap),
incoming: make(chan *WorkerJob, gearman.QUEUE_CAP),
JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP),
running: true,
}
if l != Unlimit {
worker.limit = make(chan bool, l)
for i := 0; i < l; i++ {
worker.limit <- true
}
}
return
}
// Call the assigned error handler, if any.
func (worker *Worker) err(e error) {
if worker.ErrFunc != nil {
worker.ErrFunc(e)
}
}
// Add a server. The addr should be in 'host:port' format.
// The connection is established at this time.
func (worker *Worker) AddServer(addr string) (err error) {
worker.mutex.Lock()
defer worker.mutex.Unlock()
if len(worker.clients) == cap(worker.clients) {
return gearman.ErrOutOfCap
}
// Create a new job server's client as an agent of the server
server, err := newJobAgent(addr, worker)
if err != nil {
return err
}
n := len(worker.clients)
worker.clients = worker.clients[0 : n+1]
worker.clients[n] = server
return
}
// Add a function.
// Add job servers first, then functions.
// The API will tell every connected job server 'I can do this'.
func (worker *Worker) AddFunction(funcname string,
f JobFunction, timeout uint32) (err error) {
if len(worker.clients) < 1 {
return gearman.ErrNotConn
}
worker.mutex.Lock()
defer worker.mutex.Unlock()
worker.functions[funcname] = f
var datatype uint32
var data []byte
if timeout == 0 {
datatype = gearman.CAN_DO
data = []byte(funcname)
} else {
datatype = gearman.CAN_DO_TIMEOUT
data = []byte(funcname + "\x00")
t := gearman.Uint32ToBytes(timeout)
data = append(data, t[:]...)
}
job := NewWorkerJob(gearman.REQ, datatype, data)
worker.WriteJob(job)
return
}
// Remove a function.
// This also tells the job servers 'I cannot do this now'.
func (worker *Worker) RemoveFunction(funcname string) (err error) {
worker.mutex.Lock()
defer worker.mutex.Unlock()
if worker.functions[funcname] == nil {
return gearman.ErrFuncNotFound
}
delete(worker.functions, funcname)
job := NewWorkerJob(gearman.REQ, gearman.CANT_DO, []byte(funcname))
worker.WriteJob(job)
return
}
// Main loop
func (worker *Worker) Work() {
for _, v := range worker.clients {
go v.Work()
}
for worker.running || len(worker.incoming) > 0 {
select {
case job := <-worker.incoming:
if job == nil {
break
}
switch job.DataType {
case gearman.NO_JOB:
// do nothing
case gearman.ERROR:
_, err := gearman.GetError(job.Data)
worker.err(err)
case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ:
go func() {
if err := worker.exec(job); err != nil {
worker.err(err)
}
}()
default:
worker.JobQueue <- job
}
}
}
close(worker.incoming)
}
// Get the last job in the queue.
// If there is more than one job in the queue,
// the last one is returned
// and the others are discarded.
func (worker *Worker) LastJob() (job *WorkerJob) {
if l := len(worker.JobQueue); l != 1 {
if l == 0 {
return
}
for i := 0; i < l-1; i++ {
<-worker.JobQueue
}
}
return <-worker.JobQueue
}
// Close.
func (worker *Worker) Close() (err error) {
for _, v := range worker.clients {
err = v.Close()
}
worker.running = false
return err
}
// Write a job to the job servers.
// Here, 'job' does not carry its original meaning;
// it is just a network packet that carries a job's result, or tells the job server a job has failed.
func (worker *Worker) WriteJob(job *WorkerJob) (err error) {
// Buffered so every agent's goroutine can send without blocking,
// even though only the first result is returned.
e := make(chan error, len(worker.clients))
for _, v := range worker.clients {
// Pass the agent as an argument so each goroutine captures its own copy.
go func(a *jobAgent) {
e <- a.WriteJob(job)
}(v)
}
return <-e
}
// Send something out and get the same thing back.
func (worker *Worker) Echo(data []byte) (err error) {
job := NewWorkerJob(gearman.REQ, gearman.ECHO_REQ, data)
return worker.WriteJob(job)
}
// Remove all functions,
// both from the worker and from the job servers.
func (worker *Worker) Reset() (err error) {
job := NewWorkerJob(gearman.REQ, gearman.RESET_ABILITIES, nil)
err = worker.WriteJob(job)
worker.functions = make(JobFunctionMap)
return
}
// Set the worker's unique id.
func (worker *Worker) SetId(id string) (err error) {
job := NewWorkerJob(gearman.REQ, gearman.SET_CLIENT_ID, []byte(id))
return worker.WriteJob(job)
}
// Execute the job and send back the result.
func (worker *Worker) exec(job *WorkerJob) (err error) {
if worker.limit != nil {
<-worker.limit
defer func() {
worker.limit <- true
}()
}
var limit int
if job.DataType == gearman.JOB_ASSIGN {
limit = 3
} else {
limit = 4
}
jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
job.Handle = string(jobdata[0])
funcname := string(jobdata[1])
if job.DataType == gearman.JOB_ASSIGN {
job.Data = jobdata[2]
} else {
job.UniqueId = string(jobdata[2])
job.Data = jobdata[3]
}
f, ok := worker.functions[funcname]
if !ok {
return gearman.ErrFuncNotFound
}
result, err := f(job)
var datatype uint32
if err == nil {
datatype = gearman.WORK_COMPLETE
} else {
if result == nil {
datatype = gearman.WORK_FAIL
} else {
datatype = gearman.WORK_EXCEPTION
}
}
job.magicCode = gearman.REQ
job.DataType = datatype
job.Data = result
worker.WriteJob(job)
return
}
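A slightly fuller version of the usage sketched in the package comment above, using only the API defined in this file. The server address and function name are assumptions, and the import path assumes the gearman/worker package layout shown in this diff; note that AddServer must be called before AddFunction, since AddFunction requires at least one connected server:

```go
package main

import (
	"log"

	"bitbucket.org/mikespook/gearman-go/gearman/worker"
)

// echo just returns the job payload unchanged; real work would go here.
func echo(job *worker.WorkerJob) ([]byte, error) {
	return job.Data, nil
}

func main() {
	w := worker.New(worker.Unlimit)
	defer w.Close()
	// The error handler must be assigned before AddServer.
	w.ErrFunc = func(e error) { log.Println(e) }
	if err := w.AddServer("127.0.0.1:4730"); err != nil { // assumed local gearmand
		log.Fatal(err)
	}
	if err := w.AddFunction("echo", echo, 0); err != nil {
		log.Fatal(err)
	}
	w.Work() // blocking main loop
}
```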

View File

@ -0,0 +1,72 @@
package worker
import "testing"
var worker *Worker
func init() {
worker = New(Unlimit)
}
func TestWorkerAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730.")
if err := worker.AddServer("127.0.0.1:4730"); err != nil {
t.Error(err)
}
if l := len(worker.clients); l != 1 {
t.Log(worker.clients)
t.Error("The length of server list should be 1.")
}
}
func foobar(job *WorkerJob) ([]byte, error) {
return nil, nil
}
func TestWorkerAddFunction(t *testing.T) {
if err := worker.AddFunction("foobar", foobar, 0); err != nil {
t.Error(err)
}
if err := worker.AddFunction("timeout", foobar, 5); err != nil {
t.Error(err)
}
if l := len(worker.functions); l != 2 {
t.Log(worker.functions)
t.Errorf("The length of function map should be %d.", 2)
}
}
func TestWorkerEcho(t *testing.T) {
if err := worker.Echo([]byte("Hello World")); err != nil {
t.Error(err)
}
}
/*
func TestWorkerResult(t *testing.T) {
if job := worker.LastResult(); job == nil {
t.Error("Nothing in result.")
} else {
t.Log(job)
}
}
*/
func TestWorkerRemoveFunction(t *testing.T) {
if err := worker.RemoveFunction("foobar"); err != nil {
t.Error(err)
}
}
func TestWorkerReset(t *testing.T) {
if err := worker.Reset(); err != nil {
t.Error(err)
}
}
func TestWorkerClose(t *testing.T) {
if err := worker.Close(); err != nil {
t.Error(err)
}
}

View File

@ -0,0 +1,87 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"strconv"
)
// Worker side job
type WorkerJob struct {
Data []byte
Handle, UniqueId string
agent *jobAgent
magicCode, DataType uint32
}
// Create a new job
func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) {
return &WorkerJob{magicCode: magiccode,
DataType: datatype,
Data: data}
}
// Decode job from byte slice
func DecodeWorkerJob(data []byte) (job *WorkerJob, err error) {
if len(data) < 12 {
err = gearman.ErrInvalidData
return
}
datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
err = gearman.ErrInvalidData
return
}
data = data[12:]
job = NewWorkerJob(gearman.RES, datatype, data)
return
}
// Encode a job to byte slice
func (job *WorkerJob) Encode() (data []byte) {
magiccode := gearman.Uint32ToBytes(job.magicCode)
datatype := gearman.Uint32ToBytes(job.DataType)
data = make([]byte, 0, 1024*64)
data = append(data, magiccode[:]...)
data = append(data, datatype[:]...)
data = append(data, []byte{0, 0, 0, 0}...)
l := len(job.Data)
if job.Handle != "" {
data = append(data, []byte(job.Handle)...)
data = append(data, 0)
l += len(job.Handle) + 1
}
data = append(data, job.Data...)
datalength := gearman.Uint32ToBytes(uint32(l))
copy(data[8:12], datalength[:])
return
}
// Send some data to the client.
// Use this while the job is executing.
func (job *WorkerJob) UpdateData(data []byte, iswarning bool) (err error) {
result := append([]byte(job.Handle), 0)
result = append(result, data...)
var datatype uint32
if iswarning {
datatype = gearman.WORK_WARNING
} else {
datatype = gearman.WORK_DATA
}
return job.agent.WriteJob(NewWorkerJob(gearman.REQ, datatype, result))
}
// Update status.
// Tell the client what percentage of the job has been executed.
func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err error) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
result := append([]byte(job.Handle), 0)
result = append(result, n...)
// The numerator and denominator must also be NUL-separated.
result = append(result, 0)
result = append(result, d...)
return job.agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.WORK_STATUS, result))
}
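For reference, the WORK_STATUS payload built by UpdateStatus separates the handle, numerator and denominator with NUL bytes, which matches the dtWorkStatus test vector later in this diff. A tiny standalone sketch of the payload construction:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	handle, numerator, denominator := "a", 50, 100

	payload := append([]byte(handle), 0)
	payload = append(payload, []byte(strconv.Itoa(numerator))...)
	payload = append(payload, 0)
	payload = append(payload, []byte(strconv.Itoa(denominator))...)

	fmt.Printf("%q\n", payload) // "a\x0050\x00100"
}
```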

View File

@ -1,229 +0,0 @@
package worker
import (
"bufio"
"bytes"
"encoding/binary"
"io"
"net"
"sync"
)
// The agent of a job server.
type agent struct {
sync.Mutex
conn net.Conn
rw *bufio.ReadWriter
worker *Worker
in chan []byte
net, addr string
}
// Create the agent of a job server.
func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
a = &agent{
net: net,
addr: addr,
worker: worker,
in: make(chan []byte, queueSize),
}
return
}
func (a *agent) Connect() (err error) {
a.Lock()
defer a.Unlock()
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
return
}
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
go a.work()
return
}
func (a *agent) work() {
defer func() {
if err := recover(); err != nil {
a.worker.err(err.(error))
}
}()
var inpack *inPack
var l int
var err error
var data, leftdata []byte
for {
if data, err = a.read(); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Temporary() {
continue
} else {
a.disconnect_error(err)
// else - we're probably dc'ing due to a Close()
break
}
} else if err == io.EOF {
a.disconnect_error(err)
break
}
a.worker.err(err)
// If it is an unexpected error and the connection wasn't
// closed by Gearmand, the agent should close the connection
// and reconnect to the job server.
a.Close()
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
a.worker.err(err)
break
}
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
}
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
if len(data) < minPacketLength { // not enough data
leftdata = data
continue
}
for {
if inpack, l, err = decodeInPack(data); err != nil {
a.worker.err(err)
leftdata = data
break
} else {
leftdata = nil
inpack.a = a
select {
case <-a.worker.closed:
return
default:
}
a.worker.in <- inpack
if len(data) == l {
break
}
if len(data) > l {
data = data[l:]
}
}
}
}
}
func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil {
err = &WorkerDisconnectError{
err: err,
agent: a,
}
a.worker.err(err)
}
}
func (a *agent) Close() {
a.Lock()
defer a.Unlock()
if a.conn != nil {
a.conn.Close()
a.conn = nil
}
}
func (a *agent) Grab() {
a.Lock()
defer a.Unlock()
a.grab()
}
func (a *agent) grab() bool {
if a.worker.closed != nil {
return false
}
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
a.write(outpack)
return true
}
func (a *agent) PreSleep() {
a.Lock()
defer a.Unlock()
outpack := getOutPack()
outpack.dataType = dtPreSleep
a.write(outpack)
}
func (a *agent) reconnect() error {
a.Lock()
defer a.Unlock()
conn, err := net.Dial(a.net, a.addr)
if err != nil {
return err
}
a.conn = conn
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a)
if a.grab() {
go a.work()
}
return nil
}
// read at least one full packet from the socket
func (a *agent) read() (data []byte, err error) {
n := 0
tmp := getBuffer(bufferSize)
var buf bytes.Buffer
// read the header so we can get the length of the data
if n, err = a.rw.Read(tmp); err != nil {
return
}
dl := int(binary.BigEndian.Uint32(tmp[8:12]))
// write what we read so far
buf.Write(tmp[:n])
// read until we receive all the data
for buf.Len() < dl+minPacketLength {
if n, err = a.rw.Read(tmp); err != nil {
return buf.Bytes(), err
}
buf.Write(tmp[:n])
}
return buf.Bytes(), err
}
// Internal write of the encoded job.
func (a *agent) write(outpack *outPack) (err error) {
var n int
buf := outpack.Encode()
for i := 0; i < len(buf); i += n {
n, err = a.rw.Write(buf[i:])
if err != nil {
return err
}
}
return a.rw.Flush()
}
// Write with lock
func (a *agent) Write(outpack *outPack) (err error) {
a.Lock()
defer a.Unlock()
return a.write(outpack)
}

View File

@ -1,51 +0,0 @@
package worker
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 1024
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}

View File

@ -1,28 +0,0 @@
package worker
import (
"bytes"
"errors"
"fmt"
)
var (
ErrNoneAgents = errors.New("None active agents")
ErrNoneFuncs = errors.New("None functions")
ErrTimeOut = errors.New("Executing time out")
ErrUnknown = errors.New("Unknown error")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// An error handler
type ErrorHandler func(error)

View File

@ -1,56 +0,0 @@
package worker_test
import (
"fmt"
"github.com/mikespook/gearman-go/worker"
"sync"
)
func ExampleWorker() {
// An example of a worker
w := worker.New(worker.Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil {
fmt.Println(err)
return
}
// A function for handling jobs
foobar := func(job worker.Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custom handler for other results, e.g. ECHO, dtError.
w.JobHandler = func(job worker.Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shut down the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Wait for the results
wg.Wait()
}

View File

@ -1,45 +0,0 @@
package worker
import (
"encoding/json"
"runtime"
)
// Job handler
type JobHandler func(Job) error
type JobFunc func(Job) ([]byte, error)
// The definition of the callback function.
type jobFunc struct {
f JobFunc
timeout uint32
}
// Map of added functions.
type jobFuncs map[string]*jobFunc
type systemInfo struct {
GOOS, GOARCH, GOROOT, Version string
NumCPU, NumGoroutine int
NumCgoCall int64
}
func SysInfo(job Job) ([]byte, error) {
return json.Marshal(&systemInfo{
GOOS: runtime.GOOS,
GOARCH: runtime.GOARCH,
GOROOT: runtime.GOROOT(),
Version: runtime.Version(),
NumCPU: runtime.NumCPU(),
NumGoroutine: runtime.NumGoroutine(),
NumCgoCall: runtime.NumCgoCall(),
})
}
var memState runtime.MemStats
func MemInfo(job Job) ([]byte, error) {
runtime.ReadMemStats(&memState)
return json.Marshal(&memState)
}
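SysInfo and MemInfo match the JobFunc signature, so they can be registered with AddFunc like any other job function. A minimal sketch, assuming a local gearmand at the usual address (the import path is the one used by the example test in this diff):

```go
package main

import (
	"log"

	"github.com/mikespook/gearman-go/worker"
)

func main() {
	w := worker.New(worker.Unlimited)
	defer w.Close()
	if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil { // assumed address
		log.Fatal(err)
	}
	// Both helpers marshal runtime information to JSON for the caller.
	if err := w.AddFunc("SysInfo", worker.SysInfo, 0); err != nil {
		log.Fatal(err)
	}
	if err := w.AddFunc("MemInfo", worker.MemInfo, 0); err != nil {
		log.Fatal(err)
	}
	if err := w.Ready(); err != nil {
		log.Fatal(err)
	}
	w.Work() // blocking main loop
}
```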

View File

@ -1,126 +0,0 @@
package worker
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Worker side job
type inPack struct {
dataType uint32
data []byte
handle, uniqueId, fn string
a *agent
}
// Create a new job
func getInPack() *inPack {
return &inPack{}
}
func (inpack *inPack) Data() []byte {
return inpack.data
}
func (inpack *inPack) Fn() string {
return inpack.fn
}
func (inpack *inPack) Handle() string {
return inpack.handle
}
func (inpack *inPack) UniqueId() string {
return inpack.uniqueId
}
func (inpack *inPack) Err() error {
if inpack.dataType == dtError {
return getError(inpack.data)
}
return nil
}
// Send some data to the client.
// Use this while the job is executing.
func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkData
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkWarning
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
// Update status.
// Tell the client what percentage of the job has been executed.
func (inpack *inPack) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
outpack := getOutPack()
outpack.dataType = dtWorkStatus
hl := len(inpack.handle)
nl := len(n)
dl := len(d)
outpack.data = getBuffer(hl + nl + dl + 2)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], n)
copy(outpack.data[hl+nl+2:], d)
inpack.a.write(outpack)
}
// Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < minPacketLength { // a valid packet is at least 12 bytes
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if len(data) < (dl + minPacketLength) {
err = fmt.Errorf("Not enough data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
inpack = getInPack()
inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch inpack.dataType {
case dtJobAssign:
s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.data = s[2]
}
case dtJobAssignUniq:
s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.uniqueId = string(s[2])
inpack.data = s[3]
}
default:
inpack.data = dt
}
l = dl + minPacketLength
return
}

View File

@ -1,73 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
inpackcases = map[uint32]map[string]string{
dtNoop: map[string]string{
"src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00",
},
dtNoJob: map[string]string{
"src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00",
},
dtJobAssign: map[string]string{
"src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz",
"handle": "a",
"fn": "b",
"data": "xyz",
},
dtJobAssignUniq: map[string]string{
"src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz",
"handle": "a",
"fn": "b",
"uid": "c",
"data": "xyz",
},
}
)
func TestInPack(t *testing.T) {
for k, v := range inpackcases {
inpack, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
t.Error(err)
}
if inpack.dataType != k {
t.Errorf("DataType: %d expected, %d got.", k, inpack.dataType)
}
if handle, ok := v["handle"]; ok {
if inpack.handle != handle {
t.Errorf("Handle: %s expected, %s got.", handle, inpack.handle)
}
}
if fn, ok := v["fn"]; ok {
if inpack.fn != fn {
t.Errorf("FuncName: %s expected, %s got.", fn, inpack.fn)
}
}
if uid, ok := v["uid"]; ok {
if inpack.uniqueId != uid {
t.Errorf("UID: %s expected, %s got.", uid, inpack.uniqueId)
}
}
if data, ok := v["data"]; ok {
if bytes.Compare([]byte(data), inpack.data) != 0 {
t.Errorf("UID: %v expected, %v got.", data, inpack.data)
}
}
}
}
func BenchmarkDecode(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, v := range inpackcases {
_, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
b.Error(err)
}
}
}
}

View File

@ -1,12 +0,0 @@
package worker
type Job interface {
Err() error
Data() []byte
Fn() string
SendWarning(data []byte)
SendData(data []byte)
UpdateStatus(numerator, denominator int)
Handle() string
UniqueId() string
}

View File

@ -1,47 +0,0 @@
package worker
import (
"encoding/binary"
)
// Worker side job
type outPack struct {
dataType uint32
data []byte
handle string
}
func getOutPack() (outpack *outPack) {
// TODO pool
return &outPack{}
}
// Encode a job to byte slice
func (outpack *outPack) Encode() (data []byte) {
var l int
if outpack.dataType == dtWorkFail {
l = len(outpack.handle)
} else {
l = len(outpack.data)
if outpack.handle != "" {
l += len(outpack.handle) + 1
}
}
data = getBuffer(l + minPacketLength)
binary.BigEndian.PutUint32(data[:4], req)
binary.BigEndian.PutUint32(data[4:8], outpack.dataType)
binary.BigEndian.PutUint32(data[8:minPacketLength], uint32(l))
i := minPacketLength
if outpack.handle != "" {
hi := len(outpack.handle) + i
copy(data[i:hi], []byte(outpack.handle))
if outpack.dataType != dtWorkFail {
data[hi] = '\x00'
}
i = hi + 1
}
if outpack.dataType != dtWorkFail {
copy(data[i:], outpack.data)
}
return
}
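A worked example of the bytes Encode produces for a WORK_COMPLETE packet with handle "a" and data "b"; the expected bytes are the same as the dtWorkComplete test vector below. A standalone sketch using encoding/binary:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	handle, data := []byte("a"), []byte("b")

	// Payload: handle, a NUL separator, then the data.
	payload := append(append(append([]byte{}, handle...), 0), data...)

	// 12-byte header: "\x00REQ" magic, data type, payload length.
	pkt := make([]byte, 12+len(payload))
	copy(pkt[0:4], "\x00REQ")
	binary.BigEndian.PutUint32(pkt[4:8], 13) // dtWorkComplete
	binary.BigEndian.PutUint32(pkt[8:12], uint32(len(payload)))
	copy(pkt[12:], payload)

	want := []byte("\x00REQ\x00\x00\x00\x0d\x00\x00\x00\x03a\x00b")
	fmt.Println(bytes.Equal(pkt, want)) // true
}
```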

View File

@ -1,99 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
outpackcases = map[uint32]map[string]string{
dtCanDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a",
"data": "a",
},
dtCanDoTimeout: map[string]string{
"src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01",
"data": "a\x00\x00\x00\x00\x01",
},
dtCantDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a",
"data": "a",
},
dtResetAbilities: map[string]string{
"src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00",
},
dtPreSleep: map[string]string{
"src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00",
},
dtGrabJob: map[string]string{
"src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00",
},
dtGrabJobUniq: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00",
},
dtWorkData: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkWarning: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkStatus: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100",
"data": "a\x0050\x00100",
},
dtWorkComplete: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkFail: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a",
"handle": "a",
},
dtWorkException: map[string]string{
"src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtSetClientId: map[string]string{
"src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a",
"data": "a",
},
dtAllYours: map[string]string{
"src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00",
},
}
)
func TestOutPack(t *testing.T) {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
data := outpack.Encode()
if bytes.Compare([]byte(v["src"]), data) != 0 {
t.Errorf("%d: %X expected, %X got.", k, v["src"], data)
}
}
}
func BenchmarkEncode(b *testing.B) {
for i := 0; i < b.N; i++ {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
outpack.Encode()
}
}
}

View File

@ -1,405 +0,0 @@
// The worker package helps developers develop Gearman workers
// in an easy way.
package worker
import (
"fmt"
"strconv"
"sync"
"time"
)
const (
Unlimited = iota
OneByOne
)
// Worker is the only structure needed for worker-side development.
// It can connect to multiple servers and grab jobs.
type Worker struct {
sync.Mutex
agents []*agent
funcs jobFuncs
in chan *inPack
running bool
ready bool
jobLeftNum int64
Id string
ErrorHandler ErrorHandler
JobHandler JobHandler
limit chan bool
closed chan struct{}
leftJobs chan struct{}
}
// New returns a worker.
//
// If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them in parallel.
// If limit is greater than zero, the number of jobs executing in
// parallel is limited to that number. If limit is assigned to
// OneByOne(=1), only one job is executed at a time.
func New(limit int) (worker *Worker) {
worker = &Worker{
agents: make([]*agent, 0, limit),
funcs: make(jobFuncs),
in: make(chan *inPack, queueSize),
}
if limit != Unlimited {
worker.limit = make(chan bool, limit-1)
}
return
}
// inner error handling
func (worker *Worker) err(e error) {
if worker.ErrorHandler != nil {
worker.ErrorHandler(e)
}
}
// AddServer adds a Gearman job server.
//
// addr should be formatted as 'host:port'.
func (worker *Worker) AddServer(net, addr string) (err error) {
// Create a new job server's client as an agent of the server
a, err := newAgent(net, addr, worker)
if err != nil {
return err
}
worker.agents = append(worker.agents, a)
return
}
// Broadcast an outpack to all Gearman servers.
func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents {
v.Write(outpack)
}
}
// AddFunc adds a function.
// Set timeout to Unlimited(=0) to disable the execution timeout.
func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) {
worker.Lock()
defer worker.Unlock()
if _, ok := worker.funcs[funcname]; ok {
return fmt.Errorf("The function already exists: %s", funcname)
}
worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
if worker.running {
worker.addFunc(funcname, timeout)
}
return
}
// inner add
func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack := prepFuncOutpack(funcname, timeout)
worker.broadcast(outpack)
}
func prepFuncOutpack(funcname string, timeout uint32) *outPack {
outpack := getOutPack()
if timeout == 0 {
outpack.dataType = dtCanDo
outpack.data = []byte(funcname)
} else {
outpack.dataType = dtCanDoTimeout
l := len(funcname)
timeoutString := strconv.FormatUint(uint64(timeout), 10)
outpack.data = getBuffer(l + len(timeoutString) + 1)
copy(outpack.data, []byte(funcname))
outpack.data[l] = '\x00'
copy(outpack.data[l+1:], []byte(timeoutString))
}
return outpack
}
// RemoveFunc removes a function.
func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock()
defer worker.Unlock()
if _, ok := worker.funcs[funcname]; !ok {
return fmt.Errorf("The function does not exist: %s", funcname)
}
delete(worker.funcs, funcname)
if worker.running {
worker.removeFunc(funcname)
}
return
}
// inner remove
func (worker *Worker) removeFunc(funcname string) {
outpack := getOutPack()
outpack.dataType = dtCantDo
outpack.data = []byte(funcname)
worker.broadcast(outpack)
}
// inner packet handling
func (worker *Worker) handleInPack(inpack *inPack) {
switch inpack.dataType {
case dtNoJob:
inpack.a.PreSleep()
case dtNoop:
inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq:
go func() {
go func() {
worker.incrExecJobNum()
defer func() {
worker.decrExecJobNum()
}()
if err := worker.exec(inpack); err != nil {
worker.err(err)
}
}()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
}()
case dtError:
worker.err(inpack.Err())
fallthrough
case dtEchoRes:
fallthrough
default:
worker.customeHandler(inpack)
}
}
// Connect to the Gearman servers and tell every server
// what this worker can do.
func (worker *Worker) Ready() (err error) {
if len(worker.agents) == 0 {
return ErrNoneAgents
}
if len(worker.funcs) == 0 {
return ErrNoneFuncs
}
for _, a := range worker.agents {
if err = a.Connect(); err != nil {
return
}
}
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
worker.ready = true
return
}
// Work starts the main loop (blocking).
// Most of the time, this should be run in a goroutine.
func (worker *Worker) Work() {
if !worker.ready {
// didn't run Ready beforehand, so we'll have to do it:
err := worker.Ready()
if err != nil {
panic(err)
}
}
worker.Lock()
worker.running = true
worker.Unlock()
for _, a := range worker.agents {
a.Grab()
}
// Execute jobs (blocking)
var inpack *inPack
for inpack = range worker.in {
worker.handleInPack(inpack)
}
// After the worker has been closed, wait for the remaining jobs to finish, then exit
worker.Lock()
leftJobNum := int(worker.jobLeftNum)
worker.Unlock()
if worker.leftJobs != nil {
for i := 0; i < leftJobNum; i++ {
<-worker.leftJobs
}
}
worker.Reset()
worker.close()
}
// custom handler wrapper
func (worker *Worker) customeHandler(inpack *inPack) {
if worker.JobHandler != nil {
if err := worker.JobHandler(inpack); err != nil {
worker.err(err)
}
}
}
// Close connection and exit main loop
func (worker *Worker) Close() {
worker.Lock()
defer worker.Unlock()
if worker.running == true && worker.closed == nil {
worker.closed = make(chan struct{}, 1)
worker.closed <- struct{}{}
worker.running = false
close(worker.in)
// Create a channel to track the jobs still executing after Close
if worker.jobLeftNum != 0 {
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
}
}
func (worker *Worker) close() {
for _, a := range worker.agents {
a.Close()
}
}
// Echo
func (worker *Worker) Echo(data []byte) {
outpack := getOutPack()
outpack.dataType = dtEchoReq
outpack.data = data
worker.broadcast(outpack)
}
// Reset removes all functions,
// both from the worker and the job servers.
func (worker *Worker) Reset() {
outpack := getOutPack()
outpack.dataType = dtResetAbilities
worker.broadcast(outpack)
worker.funcs = make(jobFuncs)
}
// Set the worker's unique id.
func (worker *Worker) SetId(id string) {
worker.Id = id
outpack := getOutPack()
outpack.dataType = dtSetClientId
outpack.data = []byte(id)
worker.broadcast(outpack)
}
func (worker *Worker) incrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum++
return worker.jobLeftNum
}
func (worker *Worker) decrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum--
if worker.jobLeftNum < 0 {
worker.jobLeftNum = 0
}
return worker.jobLeftNum
}
// inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() {
if worker.limit != nil {
<-worker.limit
}
if r := recover(); r != nil {
if e, ok := r.(error); ok {
err = e
} else {
err = ErrUnknown
}
}
}()
f, ok := worker.funcs[inpack.fn]
if !ok {
return fmt.Errorf("The function does not exist: %s", inpack.fn)
}
var r *result
if f.timeout == 0 {
d, e := f.f(inpack)
r = &result{data: d, err: e}
} else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
}
//if worker.running {
outpack := getOutPack()
if r.err == nil {
outpack.dataType = dtWorkComplete
} else {
if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else {
outpack.dataType = dtWorkException
}
err = r.err
}
outpack.handle = inpack.handle
outpack.data = r.data
_ = inpack.a.Write(outpack)
if worker.leftJobs != nil {
worker.leftJobs <- struct{}{}
}
//}
return
}
func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
worker.Lock()
defer worker.Unlock()
for funcname, f := range worker.funcs {
outpack := prepFuncOutpack(funcname, f.timeout)
a.write(outpack)
}
}
// inner result
type result struct {
data []byte
err error
}
// execute f with a timeout
func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
rslt := make(chan *result)
defer close(rslt)
go func() {
defer func() { recover() }()
d, e := f(job)
rslt <- &result{data: d, err: e}
}()
select {
case r = <-rslt:
case <-time.After(timeout):
return &result{err: ErrTimeOut}
}
return r
}
// Error type passed when a worker connection disconnects
type WorkerDisconnectError struct {
err error
agent *agent
}
func (e *WorkerDisconnectError) Error() string {
return e.err.Error()
}
// Responds to the error by asking the worker to reconnect
func (e *WorkerDisconnectError) Reconnect() (err error) {
return e.agent.reconnect()
}
// Which server was this for?
func (e *WorkerDisconnectError) Server() (net string, addr string) {
return e.agent.net, e.agent.addr
}
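execTimeout above uses a common Go pattern: run the function in its own goroutine and select between the result channel and time.After. A stripped-down standalone sketch of the same pattern; it uses a buffered channel instead of the close/recover combination in the original, which is a simpler way to let a late result be dropped:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("executing time out")

// runWithTimeout executes f and gives up after the given timeout.
func runWithTimeout(f func() ([]byte, error), timeout time.Duration) ([]byte, error) {
	type result struct {
		data []byte
		err  error
	}
	ch := make(chan result, 1) // buffered: a late result is simply dropped
	go func() {
		d, e := f()
		ch <- result{d, e}
	}()
	select {
	case r := <-ch:
		return r.data, r.err
	case <-time.After(timeout):
		return nil, errTimeout
	}
}

func main() {
	slow := func() ([]byte, error) {
		time.Sleep(2 * time.Second)
		return []byte("done"), nil
	}
	if _, err := runWithTimeout(slow, 500*time.Millisecond); err != nil {
		fmt.Println(err) // executing time out
	}
}
```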

View File

@ -1,59 +0,0 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of a worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custom handler for other results, e.g. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shut down the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Wait for the results
wg.Wait()
// tear down
w.Close()
}

View File

@ -1,269 +0,0 @@
package worker
import (
"bytes"
"flag"
"os"
"sync"
"testing"
"time"
)
var (
worker *Worker
runIntegrationTests bool
)
func init() {
worker = New(Unlimited)
}
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestWorkerErrNoneAgents(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneAgents {
t.Error("ErrNoneAgents expected.")
}
}
func TestWorkerAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730.")
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
if l := len(worker.agents); l != 1 {
t.Log(worker.agents)
t.Error("The length of server list should be 1.")
}
}
func TestWorkerErrNoneFuncs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneFuncs {
t.Error("ErrNoneFuncs expected.")
}
}
func foobar(job Job) ([]byte, error) {
return nil, nil
}
func TestWorkerAddFunction(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.AddFunc("foobar", foobar, 0); err != nil {
t.Error(err)
}
if err := worker.AddFunc("timeout", foobar, 5); err != nil {
t.Error(err)
}
if l := len(worker.funcs); l != 2 {
t.Log(worker.funcs)
t.Errorf("The length of function map should be %d.", 2)
}
}
func TestWorkerRemoveFunc(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.RemoveFunc("foobar"); err != nil {
t.Error(err)
}
}
func TestWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
var wg sync.WaitGroup
worker.JobHandler = func(job Job) error {
t.Logf("%s", job.Data())
wg.Done()
return nil
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
worker.Echo([]byte("Hello"))
wg.Wait()
}
func TestLargeDataWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker := New(Unlimited)
defer worker.Close()
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
worker.Ready()
l := 5714
var wg sync.WaitGroup
bigdataHandler := func(job Job) error {
defer wg.Done()
if len(job.Data()) != l {
t.Errorf("expected length %d. got %d.", l, len(job.Data()))
}
return nil
}
if err := worker.AddFunc("bigdata", foobar, 0); err != nil {
defer wg.Done()
t.Error(err)
}
worker.JobHandler = bigdataHandler
worker.ErrorHandler = func(err error) {
t.Fatal("shouldn't have received an error")
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
// var cli *client.Client
// var err error
// if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
// t.Fatal(err)
// }
// cli.ErrorHandler = func(e error) {
// t.Error(e)
// }
// _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) {
// })
// if err != nil {
// t.Error(err)
// }
worker.Echo(bytes.Repeat([]byte("a"), l))
wg.Wait()
}
func TestWorkerClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker.Close()
}
func TestWorkWithoutReady(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil {
t.Error(err)
}
timeout := make(chan bool, 1)
done := make(chan bool, 1)
other_worker.JobHandler = func(j Job) error {
if !other_worker.ready {
t.Error("Worker not ready as expected")
}
done <- true
return nil
}
go func() {
time.Sleep(5 * time.Second)
timeout <- true
}()
go func() {
other_worker.Work()
}()
// With the all-in-one Work() we don't know if the
// worker is ready at this stage so we may have to wait a sec:
go func() {
tries := 5
for tries > 0 {
if other_worker.ready {
other_worker.Echo([]byte("Hello"))
break
}
// still waiting for it to be ready..
time.Sleep(250 * time.Millisecond)
tries--
}
}()
// determine if we've finished or timed out:
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
}
func TestWorkWithoutReadyWithPanic(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
timeout := make(chan bool, 1)
done := make(chan bool, 1)
// Going to work with no worker setup.
// when Work (hopefully) calls Ready it will get an error which should cause it to panic()
go func() {
defer func() {
if err := recover(); err != nil {
done <- true
return
}
t.Error("Work should raise a panic.")
done <- true
}()
other_worker.Work()
}()
go func() {
time.Sleep(2 * time.Second)
timeout <- true
}()
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
}