Compare commits

..

No commits in common. "master" and "0.0.1" have entirely different histories.

43 changed files with 1269 additions and 3276 deletions

22
.gitignore vendored
View File

@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

2
.hgtags Normal file
View File

@ -0,0 +1,2 @@
b68aee2a48811a1bee5994b56437c393c6fb2f5b 2011-05-24
0dc8bc71d7e895caf5803c1905bf07a823462fba native.start

View File

@ -1,7 +0,0 @@
language: go
go:
- 1.4
before_install:
- sudo apt-get remove -y gearman-job-server
- sudo apt-get install -y gearman-job-server

131
README.md
View File

@ -1,122 +1,53 @@
Gearman-Go
==========
# Gearman API for golang
This module is a [Gearman](http://gearman.org/) API for the [Go Programming Language](http://golang.org).
The protocols were written in pure Go. It contains two sub-packages:
This module is Gearman API for golang. It was implemented a native
protocol for both worker and client API.
The client package is used for sending jobs to the Gearman job server,
and getting responses from the server.
Copyright 2012 Xing Xing <mikespook@gmail.com> All rights reserved.
Use of this source code is governed by a MIT license that can be found
in the LICENSE file.
"github.com/mikespook/gearman-go/client"
# INSTALL
The worker package will help developers in developing Gearman worker
service easily.
This will install the client:
"github.com/mikespook/gearman-go/worker"
[![Build Status](https://travis-ci.org/mikespook/gearman-go.png?branch=master)](https://travis-ci.org/mikespook/gearman-go)
[![GoDoc](https://godoc.org/github.com/mikespook/gearman-go?status.png)](https://godoc.org/github.com/mikespook/gearman-go)
Install
=======
Install the client package:
> $ go get github.com/mikespook/gearman-go/client
> $ go get bitbucket.org/mikespook/gearman-go/gearman/client
Install the worker package:
This will install the worker:
> $ go get github.com/mikespook/gearman-go/worker
> $ go get bitbucket.org/mikespook/gearman-go/gearman/worker
Both of them:
This will install the client and the worker automatically:
> $ go get github.com/mikespook/gearman-go
> $ go get bitbucket.org/mikespook/gearman-go
Usage
=====
# SAMPLE OF USAGE
## Worker
```go
// Limit number of concurrent jobs execution.
// Use worker.Unlimited (0) if you want no limitation.
w := worker.New(worker.OneByOne)
w.ErrHandler = func(e error) {
log.Println(e)
}
w.AddServer("127.0.0.1:4730")
// Use worker.Unlimited (0) if you want no timeout
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
// This will give a timeout of 5 seconds
w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
```
> $ cd example
>
> $ go build worker
>
> $ ./worker
## Client
```go
// ...
c, err := client.New("tcp4", "127.0.0.1:4730")
// ... error handling
defer c.Close()
c.ErrorHandler = func(e error) {
log.Println(e)
}
echo := []byte("Hello\x00 world")
echomsg, err := c.Echo(echo)
// ... error handling
log.Println(string(echomsg))
jobHandler := func(resp *client.Response) {
log.Printf("%s", resp.Data)
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
// ...
```
> $ cd example
>
> $ go build client
>
> $ ./client
Branches
========
# Code format
Version 0.x means: _It is far far away from stable._
> $ gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false $(DIR)
__Use at your own risk!__
# Contacts
* master current usable version
* 0.2-dev Refactoring a lot of things
* 0.1-testing Old API and some known issues, eg. [issue-14](https://github.com/mikespook/gearman-go/issues/14)
xingxing<mikespook@gmail.com>
Contributors
============
http://mikespook.com
Great thanks to all of you for your support and interest!
(_Alphabetic order_)
* [Alex Zylman](https://github.com/azylman)
* [C.R. Kirkwood-Watts](https://github.com/kirkwood)
* [Damian Gryski](https://github.com/dgryski)
* [Gabriel Cristian Alecu](https://github.com/AzuraMeta)
* [Graham Barr](https://github.com/gbarr)
* [Ingo Oeser](https://github.com/nightlyone)
* [jake](https://github.com/jbaikge)
* [Joe Higton](https://github.com/draxil)
* [Jonathan Wills](https://github.com/runningwild)
* [Kevin Darlington](https://github.com/kdar)
* [miraclesu](https://github.com/miraclesu)
* [Paul Mach](https://github.com/paulmach)
* [Randall McPherson](https://github.com/rlmcpherson)
* [Sam Grimee](https://github.com/sgrimee)
Maintainer
==========
* [Xing Xing](http://mikespook.com) &lt;<mikespook@gmail.com>&gt; [@Twitter](http://twitter.com/mikespook)
Open Source - MIT Software License
==================================
See LICENSE.
http://twitter.com/mikespook

View File

@ -1,360 +0,0 @@
// The client package helps developers connect to Gearmand, send
// jobs and fetch result.
package client
import (
"bufio"
"net"
"sync"
"time"
)
var (
DefaultTimeout time.Duration = time.Second
)
// One client connect to one server.
// Use Pool for multi-connections.
type Client struct {
sync.Mutex
net, addr string
innerHandler *responseHandlerMap
in chan *Response
conn net.Conn
rw *bufio.ReadWriter
ResponseTimeout time.Duration // response timeout for do()
ErrorHandler ErrorHandler
}
type responseHandlerMap struct {
sync.Mutex
holder map[string]handledResponse
}
type handledResponse struct {
internal ResponseHandler // internal handler, always non-nil
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
}
func newResponseHandlerMap() *responseHandlerMap {
return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)}
}
func (r *responseHandlerMap) remove(key string) {
r.Lock()
delete(r.holder, key)
r.Unlock()
}
func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) {
r.Lock()
rh, b := r.holder[key]
delete(r.holder, key)
r.Unlock()
return rh, b
}
func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) {
r.Lock()
r.holder[key] = handledResponse{internal: internal, external: external}
r.Unlock()
}
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
r.putWithExternalHandler(key, rh, nil)
}
// New returns a client.
func New(network, addr string) (client *Client, err error) {
client = &Client{
net: network,
addr: addr,
innerHandler: newResponseHandlerMap(),
in: make(chan *Response, queueSize),
ResponseTimeout: DefaultTimeout,
}
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
return
}
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
bufio.NewWriter(client.conn))
go client.readLoop()
go client.processLoop()
return
}
func (client *Client) write(req *request) (err error) {
var n int
buf := req.Encode()
for i := 0; i < len(buf); i += n {
n, err = client.rw.Write(buf[i:])
if err != nil {
return
}
}
return client.rw.Flush()
}
func (client *Client) read(length int) (data []byte, err error) {
n := 0
buf := getBuffer(bufferSize)
// read until data can be unpacked
for i := length; i > 0 || len(data) < minPacketLength; i -= n {
if n, err = client.rw.Read(buf); err != nil {
return
}
data = append(data, buf[0:n]...)
if n < bufferSize {
break
}
}
return
}
func (client *Client) readLoop() {
defer close(client.in)
var data, leftdata []byte
var err error
var resp *Response
ReadLoop:
for client.conn != nil {
if data, err = client.read(bufferSize); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Timeout() {
client.err(err)
}
if opErr.Temporary() {
continue
}
break
}
client.err(err)
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the client should close the conection
// and reconnect to job server.
client.Close()
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
client.err(err)
break
}
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
bufio.NewWriter(client.conn))
continue
}
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
leftdata = nil
}
for {
l := len(data)
if l < minPacketLength { // not enough data
leftdata = data
continue ReadLoop
}
if resp, l, err = decodeResponse(data); err != nil {
leftdata = data[l:]
continue ReadLoop
} else {
client.in <- resp
}
data = data[l:]
if len(data) > 0 {
continue
}
break
}
}
}
func (client *Client) processLoop() {
rhandlers := map[string]ResponseHandler{}
for resp := range client.in {
switch resp.DataType {
case dtError:
client.err(getError(resp.Data))
case dtStatusRes:
client.handleInner("s"+resp.Handle, resp, nil)
case dtJobCreated:
client.handleInner("c", resp, rhandlers)
case dtEchoRes:
client.handleInner("e", resp, nil)
case dtWorkData, dtWorkWarning, dtWorkStatus:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
delete(rhandlers, resp.Handle)
}
}
}
}
func (client *Client) err(e error) {
if client.ErrorHandler != nil {
client.ErrorHandler(e)
}
}
func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) {
if h, ok := client.innerHandler.getAndRemove(key); ok {
if h.external != nil && resp.Handle != "" {
rhandlers[resp.Handle] = h.external
}
h.internal(resp)
}
}
type handleOrError struct {
handle string
err error
}
func (client *Client) do(funcname string, data []byte,
flag uint32, h ResponseHandler, id string) (handle string, err error) {
if len(id) == 0 {
return "", ErrInvalidId
}
if client.conn == nil {
return "", ErrLostConn
}
var result = make(chan handleOrError, 1)
client.Lock()
defer client.Unlock()
client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
if resp.DataType == dtError {
err = getError(resp.Data)
result <- handleOrError{"", err}
return
}
handle = resp.Handle
result <- handleOrError{handle, nil}
}, h)
req := getJob(id, []byte(funcname), data)
req.DataType = flag
if err = client.write(req); err != nil {
client.innerHandler.remove("c")
return
}
var timer = time.After(client.ResponseTimeout)
select {
case ret := <-result:
return ret.handle, ret.err
case <-timer:
client.innerHandler.remove("c")
return "", ErrLostConn
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) {
handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) {
handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
return
}
// Status gets job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil {
return nil, ErrLostConn
}
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("s"+handle, func(resp *Response) {
defer mutex.Unlock()
var err error
status, err = resp._status()
if err != nil {
client.err(err)
}
})
req := getRequest()
req.DataType = dtGetStatus
req.Data = []byte(handle)
client.write(req)
mutex.Lock()
return
}
// Echo.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
if client.conn == nil {
return nil, ErrLostConn
}
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("e", func(resp *Response) {
echo = resp.Data
mutex.Unlock()
})
req := getRequest()
req.DataType = dtEchoReq
req.Data = data
client.write(req)
mutex.Lock()
return
}
// Close connection
func (client *Client) Close() (err error) {
client.Lock()
defer client.Unlock()
if client.conn != nil {
err = client.conn.Close()
client.conn = nil
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoWithId(funcname string, data []byte,
flag byte, h ResponseHandler, id string) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
}

View File

@ -1,377 +0,0 @@
package client
import (
"crypto/md5"
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
"testing"
"time"
)
const (
TestStr = "Hello world"
)
var (
client *Client
runIntegrationTests bool
)
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestClientAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730")
var err error
if client, err = New(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
client.ErrorHandler = func(e error) {
t.Log(e)
}
}
func TestClientEcho(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
echo, err := client.Echo([]byte(TestStr))
if err != nil {
t.Error(err)
return
}
if string(echo) != TestStr {
t.Errorf("Echo error, %s expected, %s got", TestStr, echo)
return
}
}
func TestClientDoBg(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
handle, err := client.Do("ToUpper", []byte("abcdef"),
JobLow, jobHandler)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := client.Status("handle not exists")
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle)
return
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
return
}
handle, err := client.Do("Delay5sec", []byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return
}
status, err = client.Status(handle)
if err != nil {
t.Error(err)
return
}
if !status.Known {
t.Errorf("The job (%s) should be known.", status.Handle)
return
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
return
}
}
func TestClientClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := client.Close(); err != nil {
t.Error(err)
}
}

View File

@ -1,75 +0,0 @@
package client
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 8192
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
dtSubmitJob = 7
dtSubmitJobBg = 18
dtSubmitJobHigh = 21
dtSubmitJobHighBg = 32
dtSubmitJobLow = 33
dtSubmitJobLowBg = 34
WorkComplate = dtWorkComplete
WorkComplete = dtWorkComplete
WorkData = dtWorkData
WorkStatus = dtWorkStatus
WorkWarning = dtWorkWarning
WorkFail = dtWorkFail
WorkException = dtWorkException
)
const (
// Job type
JobNormal = iota
// low level
JobLow
// high level
JobHigh
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}

View File

@ -1,31 +0,0 @@
package client
import (
"bytes"
"errors"
"fmt"
)
var (
ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")
ErrLostConn = errors.New("Lost connection with Gearmand")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// Error handler
type ErrorHandler func(error)

View File

@ -1,42 +0,0 @@
package client
import (
"strconv"
"sync/atomic"
"time"
)
var (
// Global ID generator
// Default is an autoincrement ID generator
IdGen IdGenerator
)
func init() {
IdGen = NewAutoIncId()
}
// ID generator interface. Users can implament this for
// their own generator.
type IdGenerator interface {
Id() string
}
// AutoIncId
type autoincId struct {
value int64
}
func (ai *autoincId) Id() string {
next := atomic.AddInt64(&ai.value, 1)
return strconv.FormatInt(next, 10)
}
// NewAutoIncId returns an autoincrement ID generator
func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique
// and count up from there.
return &autoincId{
value: int64(time.Now().Nanosecond()) << 32,
}
}

View File

@ -1,18 +0,0 @@
package client
import (
"testing"
)
func TestAutoInc(t *testing.T) {
ai := NewAutoIncId()
previous := ai.Id()
for i := 0; i < 10; i++ {
id := ai.Id()
if id == previous {
t.Errorf("Id not unique, previous and current %s", id)
}
previous = id
}
}

View File

@ -1,165 +0,0 @@
package client
import (
"errors"
"math/rand"
"sync"
)
const (
poolSize = 10
)
var (
ErrNotFound = errors.New("Server Not Found")
)
type PoolClient struct {
*Client
Rate int
mutex sync.Mutex
}
type SelectionHandler func(map[string]*PoolClient, string) string
func SelectWithRate(pool map[string]*PoolClient,
last string) (addr string) {
total := 0
for _, item := range pool {
total += item.Rate
if rand.Intn(total) < item.Rate {
return item.addr
}
}
return last
}
func SelectRandom(pool map[string]*PoolClient,
last string) (addr string) {
r := rand.Intn(len(pool))
i := 0
for k, _ := range pool {
if r == i {
return k
}
i++
}
return last
}
type Pool struct {
SelectionHandler SelectionHandler
ErrorHandler ErrorHandler
Clients map[string]*PoolClient
last string
mutex sync.Mutex
}
// NewPool returns a new pool.
func NewPool() (pool *Pool) {
return &Pool{
Clients: make(map[string]*PoolClient, poolSize),
SelectionHandler: SelectWithRate,
}
}
// Add a server with rate.
func (pool *Pool) Add(net, addr string, rate int) (err error) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
var item *PoolClient
var ok bool
if item, ok = pool.Clients[addr]; ok {
item.Rate = rate
} else {
var client *Client
client, err = New(net, addr)
if err == nil {
item = &PoolClient{Client: client, Rate: rate}
pool.Clients[addr] = item
}
}
return
}
// Remove a server.
func (pool *Pool) Remove(addr string) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
delete(pool.Clients, addr)
}
func (pool *Pool) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (addr, handle string, err error) {
client := pool.selectServer()
client.Lock()
defer client.Unlock()
handle, err = client.Do(funcname, data, flag, h)
addr = client.addr
return
}
func (pool *Pool) DoBg(funcname string, data []byte,
flag byte) (addr, handle string, err error) {
client := pool.selectServer()
client.Lock()
defer client.Unlock()
handle, err = client.DoBg(funcname, data, flag)
addr = client.addr
return
}
// Status gets job status from job server.
// !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.Clients[addr]; ok {
client.Lock()
defer client.Unlock()
status, err = client.Status(handle)
} else {
err = ErrNotFound
}
return
}
// Send a something out, get the samething back.
func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
var client *PoolClient
if addr == "" {
client = pool.selectServer()
} else {
var ok bool
if client, ok = pool.Clients[addr]; !ok {
err = ErrNotFound
return
}
}
client.Lock()
defer client.Unlock()
echo, err = client.Echo(data)
return
}
// Close
func (pool *Pool) Close() (err map[string]error) {
err = make(map[string]error)
for _, c := range pool.Clients {
err[c.addr] = c.Close()
}
return
}
// selecting server
func (pool *Pool) selectServer() (client *PoolClient) {
for client == nil {
addr := pool.SelectionHandler(pool.Clients, pool.last)
var ok bool
if client, ok = pool.Clients[addr]; ok {
pool.last = addr
break
}
}
return
}

View File

@ -1,139 +0,0 @@
package client
import (
"testing"
)
var (
pool = NewPool()
)
func TestPoolAdd(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add servers")
c := 2
if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
t.Fatal(err)
}
if err := pool.Add("tcp4", "127.0.1.1:4730", 1); err != nil {
t.Log(err)
c -= 1
}
if len(pool.Clients) != c {
t.Errorf("%d servers expected, %d got.", c, len(pool.Clients))
}
}
func TestPoolEcho(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
echo, err := pool.Echo("", []byte(TestStr))
if err != nil {
t.Error(err)
return
}
if string(echo) != TestStr {
t.Errorf("Invalid echo data: %s", echo)
return
}
_, err = pool.Echo("not exists", []byte(TestStr))
if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err)
}
}
func TestPoolDoBg(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
addr, handle, err := pool.DoBg("ToUpper",
[]byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
addr, handle, err := pool.Do("ToUpper",
[]byte("abcdef"), JobLow, jobHandler)
if err != nil {
t.Error(err)
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(addr, handle)
}
}
func TestPoolStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := pool.Status("127.0.0.1:4730", "handle not exists")
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle)
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
}
addr, handle, err := pool.Do("Delay5sec",
[]byte("abcdef"), JobLow, nil)
if err != nil {
t.Error(err)
return
}
status, err = pool.Status(addr, handle)
if err != nil {
t.Error(err)
return
}
if !status.Known {
t.Errorf("The job (%s) should be known.", status.Handle)
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
}
status, err = pool.Status("not exists", "not exists")
if err != ErrNotFound {
t.Error(err)
return
}
}
func TestPoolClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
return
if err := pool.Close(); err != nil {
t.Error(err)
}
}

View File

@ -1,42 +0,0 @@
package client
import (
"encoding/binary"
)
// Request from client
type request struct {
DataType uint32
Data []byte
}
// Encode a Request to byte slice
func (req *request) Encode() (data []byte) {
l := len(req.Data) // length of data
tl := l + minPacketLength // add 12 bytes head
data = getBuffer(tl)
copy(data[:4], reqStr)
binary.BigEndian.PutUint32(data[4:8], req.DataType)
binary.BigEndian.PutUint32(data[8:12], uint32(l))
copy(data[minPacketLength:], req.Data)
return
}
func getRequest() (req *request) {
// TODO add a pool
req = &request{}
return
}
func getJob(id string, funcname, data []byte) (req *request) {
req = getRequest()
a := len(funcname)
b := len(id)
c := len(data)
l := a + b + c + 2
req.Data = getBuffer(l)
copy(req.Data[0:a], funcname)
copy(req.Data[a+1:a+b+1], []byte(id))
copy(req.Data[a+b+2:], data)
return
}

View File

@ -1,156 +0,0 @@
package client
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Response handler
type ResponseHandler func(*Response)
// response
type Response struct {
DataType uint32
Data, UID []byte
Handle string
}
// Extract the Response's result.
// if data == nil, err != nil, then worker failing to execute job
// if data != nil, err != nil, then worker has a exception
// if data != nil, err == nil, then worker complate job
// after calling this method, the Response.Handle will be filled
func (resp *Response) Result() (data []byte, err error) {
switch resp.DataType {
case dtWorkFail:
resp.Handle = string(resp.Data)
err = ErrWorkFail
return
case dtWorkException:
err = ErrWorkException
fallthrough
case dtWorkComplete:
data = resp.Data
default:
err = ErrDataType
}
return
}
// Extract the job's update
func (resp *Response) Update() (data []byte, err error) {
if resp.DataType != dtWorkData &&
resp.DataType != dtWorkWarning {
err = ErrDataType
return
}
data = resp.Data
if resp.DataType == dtWorkWarning {
err = ErrWorkWarning
}
return
}
// Decode a job from byte slice
func decodeResponse(data []byte) (resp *Response, l int, err error) {
a := len(data)
if a < minPacketLength { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if a < minPacketLength+dl {
err = fmt.Errorf("Invalid data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
case dtJobCreated:
resp.Handle = string(dt)
case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
dtWorkComplete, dtWorkException:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 2 {
resp.Handle = string(s[0])
resp.Data = s[1]
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtWorkFail:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 1 {
resp.Handle = string(s[0])
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtEchoRes:
fallthrough
default:
resp.Data = dt
}
l = dl + minPacketLength
return
}
func (resp *Response) Status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(data) != 2 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = true
status.Running = true
status.Numerator, err = strconv.ParseUint(string(data[0]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[0])
return
}
status.Denominator, err = strconv.ParseUint(string(data[1]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[1])
return
}
return
}
// status handler
func (resp *Response) _status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4)
if len(data) != 4 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = (data[0][0] == '1')
status.Running = (data[1][0] == '1')
status.Numerator, err = strconv.ParseUint(string(data[2]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[2])
return
}
status.Denominator, err = strconv.ParseUint(string(data[3]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[3])
return
}
return
}
func getResponse() (resp *Response) {
// TODO add a pool
resp = &Response{}
return
}

View File

@ -1,11 +0,0 @@
package client
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
type Status struct {
Handle string
Known, Running bool
Numerator, Denominator uint64
}

46
example/client.go Normal file
View File

@ -0,0 +1,46 @@
package main
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bitbucket.org/mikespook/gearman-go/gearman/client"
"log"
)
func main() {
client := client.New()
defer client.Close()
if err := client.AddServer("127.0.0.1:4730"); err != nil {
log.Fatalln(err)
}
echo := []byte("Hello\x00 world")
if data, err := client.Echo(echo); err != nil {
log.Fatalln(string(data))
}
handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL)
if err != nil {
log.Fatalln(err)
} else {
log.Println(handle)
job := <-client.JobQueue
if data, err := job.Result(); err != nil {
log.Fatalln(err)
} else {
log.Println(string(data))
}
}
known, running, numerator, denominator, err := client.Status(handle)
if err != nil {
log.Fatalln(err)
}
if !known {
log.Println("Unknown")
}
if running {
log.Printf("%g%%\n", float32(numerator)*100/float32(denominator))
} else {
log.Println("Not running")
}
}

View File

@ -1,80 +0,0 @@
package main
import (
"github.com/mikespook/gearman-go/client"
"log"
"os"
"sync"
)
func main() {
// Set the autoinc id generator
// You can write your own id generator
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()
c, err := client.New(client.Network, "127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
defer c.Close()
c.ErrorHandler = func(e error) {
log.Println(e)
os.Exit(1)
}
echo := []byte("Hello\x00 world")
echomsg, err := c.Echo(echo)
if err != nil {
log.Fatalln(err)
}
log.Println(string(echomsg))
jobHandler := func(resp *client.Response) {
switch resp.DataType {
case client.WorkException:
fallthrough
case client.WorkFail:
fallthrough
case client.WorkComplate:
if data, err := resp.Result(); err == nil {
log.Printf("RESULT: %v\n", data)
} else {
log.Printf("RESULT: %s\n", err)
}
case client.WorkWarning:
fallthrough
case client.WorkData:
if data, err := resp.Update(); err == nil {
log.Printf("UPDATE: %v\n", data)
} else {
log.Printf("UPDATE: %v, %s\n", data, err)
}
case client.WorkStatus:
if data, err := resp.Status(); err == nil {
log.Printf("STATUS: %v\n", data)
} else {
log.Printf("STATUS: %s\n", err)
}
default:
log.Printf("UNKNOWN: %v", resp.Data)
}
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
if err != nil {
log.Fatalln(err)
}
status, err := c.Status(handle)
if err != nil {
log.Fatalln(err)
}
log.Printf("%v", *status)
_, err = c.Do("Foobar", echo, client.JobNormal, jobHandler)
if err != nil {
log.Fatalln(err)
}
log.Println("Press Ctrl-C to exit ...")
var mutex sync.Mutex
mutex.Lock()
mutex.Lock()
}

View File

@ -1,33 +0,0 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;

View File

@ -12,38 +12,10 @@ def check_request_status(job_request):
def main():
client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
try:
completed_job_request = client.submit_job("ToUpper", "arbitrary binary data")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("SysInfo", "")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("MemInfo", "")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
completed_job_request = client.submit_job("ToUpper", "arbitrary binary data")
check_request_status(completed_job_request)
if __name__ == '__main__':
main()
for i in range(100):
main()

61
example/worker.go Normal file
View File

@ -0,0 +1,61 @@
package main
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bitbucket.org/mikespook/gearman-go/gearman/worker"
"bitbucket.org/mikespook/golib/signal"
"os"
"fmt"
"log"
"strings"
)
func ToUpper(job *worker.WorkerJob) ([]byte, error) {
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
}
func main() {
w := worker.New(worker.Unlimit)
w.ErrFunc = func(e error) {
log.Println(e)
}
w.AddServer("127.0.0.1:4730")
w.AddFunction("ToUpper", ToUpper, 0)
w.AddFunction("ToUpperTimeOut5", ToUpper, 5)
// Catch the interrupt to exit the working loop.
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {
w.Close()
return true
})
go sh.Loop()
go func() {
log.Println("start worker")
for {
print("cmd: ")
var str string
fmt.Scan(&str)
switch str {
case "echo":
w.Echo([]byte("Hello world!"))
var job *worker.WorkerJob
for job = <-w.JobQueue; job.DataType != gearman.ECHO_RES; job = <-w.JobQueue {
log.Println(job)
}
log.Println(string(job.Data))
case "quit":
os.Exit(0)
return
case "result":
job := <-w.JobQueue
log.Println(string(job.Data))
default:
log.Println("Unknown command")
}
}
}()
w.Work()
}

View File

@ -1,74 +0,0 @@
package main
import (
"log"
"net"
"os"
"strings"
"time"
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal"
)
func ToUpper(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func ToUpperDelay10(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data())
time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data())))
return data, nil
}
func Foobar(job worker.Job) ([]byte, error) {
log.Printf("Foobar: Data=[%s]\n", job.Data())
for i := 0; i < 10; i++ {
job.SendWarning([]byte{byte(i)})
job.SendData([]byte{byte(i)})
job.UpdateStatus(i+1, 100)
}
return job.Data(), nil
}
func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited)
defer w.Close()
w.ErrorHandler = func(e error) {
log.Println(e)
if opErr, ok := e.(*net.OpError); ok {
if !opErr.Temporary() {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)
}
if err := proc.Signal(os.Interrupt); err != nil {
log.Println(err)
}
}
}
}
w.JobHandler = func(job worker.Job) error {
log.Printf("Data=%s\n", job.Data())
return nil
}
w.AddServer("tcp4", "127.0.0.1:4730")
w.AddFunc("Foobar", Foobar, worker.Unlimited)
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Unlimited)
w.AddFunc("MemInfo", worker.MemInfo, worker.Unlimited)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
signal.Wait()
}

View File

@ -1,19 +1,15 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Copyright 2012 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
/*
This module is a Gearman API for the Go Programming Language.
The protocols were written in pure Go. It contains two sub-packages:
The client package is used for sending jobs to the Gearman job server,
and getting responses from the server.
import "github.com/mikespook/gearman-go/client"
The worker package will help developers to develop Gearman's worker
in an easy way.
import "github.com/mikespook/gearman-go/worker"
This module is Gearman API for golang.
The protocol was implemented by native way.
*/
package gearman
import (
_ "bitbucket.org/mikespook/gearman-go/gearman/client"
_ "bitbucket.org/mikespook/gearman-go/gearman/worker"
)

278
gearman/client/client.go Normal file
View File

@ -0,0 +1,278 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bytes"
"io"
"net"
"strconv"
"sync"
)
/*
The client side api for gearman.
usage:
client = NewClient()
client.AddServer("127.0.0.1:4730")
handle := client.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct {
mutex sync.Mutex
conn net.Conn
incoming chan []byte
JobQueue chan *ClientJob
UId uint32
}
// Create a new client.
func New() (client *Client) {
return &Client{
JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP),
incoming: make(chan []byte, gearman.QUEUE_CAP),
UId:1}
}
// Add a server.
// In this version, one client connect to one job server.
// Sample is better. Plz do the load balancing by your self.
func (client *Client) AddServer(addr string) (err error) {
client.conn, err = net.Dial(gearman.TCP, addr)
if err != nil {
return
}
return
}
// Internal read
func (client *Client) read() (data []byte, err error) {
if len(client.incoming) > 0 {
// incoming queue is not empty
data = <-client.incoming
} else {
// empty queue, read data from socket
for {
buf := make([]byte, gearman.BUFFER_SIZE)
var n int
if n, err = client.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
break
}
return
}
data = append(data, buf[0:n]...)
if n < gearman.BUFFER_SIZE {
break
}
}
}
// split package
start, end := 0, 4
tl := len(data)
for i := 0; i < tl; i++ {
if string(data[start:end]) == gearman.RES_STR {
l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == tl {
return
} else {
client.incoming <- data[total:]
data = data[:total]
return
}
} else {
start++
end++
}
}
err = gearman.ErrInvalidData
return
}
// Read a job from job server.
// This function will return the job, and add it to the job queue.
func (client *Client) ReadJob() (job *ClientJob, err error) {
var rel []byte
if rel, err = client.read(); err != nil {
return
}
if job, err = DecodeClientJob(rel); err != nil {
return
} else {
switch job.DataType {
case gearman.ERROR:
_, err = gearman.GetError(job.Data)
return
case gearman.WORK_DATA, gearman.WORK_WARNING, gearman.WORK_STATUS, gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION:
client.JobQueue <- job
}
}
return
}
// Do the function.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
// and if it is background job: JOB_BG.
// JOB_LOW | JOB_BG means the job is running with low level in background.
func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) {
var datatype uint32
if flag&gearman.JOB_LOW == gearman.JOB_LOW {
if flag&gearman.JOB_BG == gearman.JOB_BG {
datatype = gearman.SUBMIT_JOB_LOW_BG
} else {
datatype = gearman.SUBMIT_JOB_LOW
}
} else if flag&gearman.JOB_HIGH == gearman.JOB_HIGH {
if flag&gearman.JOB_BG == gearman.JOB_BG {
datatype = gearman.SUBMIT_JOB_HIGH_BG
} else {
datatype = gearman.SUBMIT_JOB_HIGH
}
} else if flag&gearman.JOB_BG == gearman.JOB_BG {
datatype = gearman.SUBMIT_JOB_BG
} else {
datatype = gearman.SUBMIT_JOB
}
rel := make([]byte, 0, 1024*64)
rel = append(rel, []byte(funcname)...)
rel = append(rel, '\x00')
client.mutex.Lock()
uid := strconv.Itoa(int(client.UId))
client.UId++
rel = append(rel, []byte(uid)...)
client.mutex.Unlock()
rel = append(rel, '\x00')
rel = append(rel, data...)
if err = client.WriteJob(NewClientJob(gearman.REQ, datatype, rel)); err != nil {
return
}
var job *ClientJob
if job, err = client.readLastJob(gearman.JOB_CREATED); err != nil {
return
}
handle = string(job.Data)
go func() {
if flag&gearman.JOB_BG != gearman.JOB_BG {
for {
if job, err = client.ReadJob(); err != nil {
return
}
switch job.DataType {
case gearman.WORK_DATA, gearman.WORK_WARNING:
case gearman.WORK_STATUS:
case gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION:
return
}
}
}
}()
return
}
// Internal read last job
func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err error) {
for {
if job, err = client.ReadJob(); err != nil {
return
}
if job.DataType == datatype {
break
}
}
if job.DataType != datatype {
err = gearman.ErrDataType
}
return
}
// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint64, err error) {
if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.GET_STATUS, []byte(handle))); err != nil {
return
}
var job *ClientJob
if job, err = client.readLastJob(gearman.STATUS_RES); err != nil {
return
}
data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
if len(data) != 5 {
err = gearman.ErrInvalidData
return
}
if handle != string(data[0]) {
err = gearman.ErrInvalidData
return
}
known = data[1][0] == '1'
running = data[2][0] == '1'
if numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0); err != nil {
return
}
if denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0); err != nil {
return
}
return
}
// Send a something out, get the samething back.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.ECHO_REQ, data)); err != nil {
return
}
var job *ClientJob
if job, err = client.readLastJob(gearman.ECHO_RES); err != nil {
return
}
echo = job.Data
return
}
// Get the last job.
// the job means a network package.
// Normally, it is the job executed result.
func (client *Client) LastJob() (job *ClientJob) {
if l := len(client.JobQueue); l != 1 {
if l == 0 {
return
}
for i := 0; i < l-1; i++ {
<-client.JobQueue
}
}
return <-client.JobQueue
}
// Send the job to job server.
func (client *Client) WriteJob(job *ClientJob) (err error) {
return client.write(job.Encode())
}
// Internal write
func (client *Client) write(buf []byte) (err error) {
var n int
for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:])
if err != nil {
return
}
}
return
}
// Close.
func (client *Client) Close() (err error) {
err = client.conn.Close()
close(client.JobQueue)
return
}

View File

@ -0,0 +1,41 @@
package client
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"testing"
)
var client *Client
func init() {
client = NewClient()
}
func TestClientAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730")
if err := client.AddServer("127.0.0.1:4730"); err != nil {
t.Error(err)
}
}
func TestClientEcho(t *testing.T) {
if echo, err := client.Echo([]byte("Hello world")); err != nil {
t.Error(err)
} else {
t.Log(echo)
}
}
func TestClientDo(t *testing.T) {
if handle, err := client.Do("ToUpper", []byte("abcdef"), gearman.JOB_LOW|gearman.JOB_BG); err != nil {
t.Error(err)
} else {
t.Log(handle)
}
}
func TestClientClose(t *testing.T) {
if err := client.Close(); err != nil {
t.Error(err)
}
}

View File

@ -0,0 +1,98 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bytes"
)
// Client side job
type ClientJob struct {
Data []byte
Handle, UniqueId string
magicCode, DataType uint32
}
// Create a new job
func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) {
return &ClientJob{magicCode: magiccode,
DataType: datatype,
Data: data}
}
// Decode a job from byte slice
func DecodeClientJob(data []byte) (job *ClientJob, err error) {
if len(data) < 12 {
err = gearman.ErrInvalidData
return
}
datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
err = gearman.ErrInvalidData
return
}
data = data[12:]
job = NewClientJob(gearman.RES, datatype, data)
return
}
// Encode a job to byte slice
func (job *ClientJob) Encode() (data []byte) {
magiccode := gearman.Uint32ToBytes(job.magicCode)
datatype := gearman.Uint32ToBytes(job.DataType)
data = make([]byte, 0, 1024*64)
data = append(data, magiccode[:]...)
data = append(data, datatype[:]...)
l := len(job.Data)
datalength := gearman.Uint32ToBytes(uint32(l))
data = append(data, datalength[:]...)
data = append(data, job.Data...)
return
}
// Extract the job's result.
func (job *ClientJob) Result() (data []byte, err error) {
switch job.DataType {
case gearman.WORK_FAIL:
job.Handle = string(job.Data)
err = gearman.ErrWorkFail
return
case gearman.WORK_EXCEPTION:
err = gearman.ErrWorkException
fallthrough
case gearman.WORK_COMPLETE:
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = gearman.ErrInvalidData
return
}
job.Handle = string(s[0])
data = s[1]
default:
err = gearman.ErrDataType
}
return
}
// Extract the job's update
func (job *ClientJob) Update() (data []byte, err error) {
if job.DataType != gearman.WORK_DATA && job.DataType != gearman.WORK_WARNING {
err = gearman.ErrDataType
return
}
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = gearman.ErrInvalidData
return
}
if job.DataType == gearman.WORK_WARNING {
err = gearman.ErrWorkWarning
}
job.Handle = string(s[0])
data = s[1]
return
}

123
gearman/gearman.go Normal file
View File

@ -0,0 +1,123 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
/*
This module is Gearman API for golang.
The protocol was implemented by native way.
*/
package gearman
import (
"bytes"
"errors"
"syscall"
)
const (
// tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6,
// or 'tcp6' only for ipv6.
TCP = "tcp4"
// the number limited for job servers.
WORKER_SERVER_CAP = 32
// the number limited for functions.
WORKER_FUNCTION_CAP = 512
// queue size
QUEUE_CAP = 512
// read buffer size
BUFFER_SIZE = 1024
// \x00REQ
REQ = 5391697
REQ_STR = "\x00REQ"
// \x00RES
RES = 5391699
RES_STR = "\x00RES"
// package data type
CAN_DO = 1
CANT_DO = 2
RESET_ABILITIES = 3
PRE_SLEEP = 4
NOOP = 6
JOB_CREATED = 8
GRAB_JOB = 9
NO_JOB = 10
JOB_ASSIGN = 11
WORK_STATUS = 12
WORK_COMPLETE = 13
WORK_FAIL = 14
GET_STATUS = 15
ECHO_REQ = 16
ECHO_RES = 17
ERROR = 19
STATUS_RES = 20
SET_CLIENT_ID = 22
CAN_DO_TIMEOUT = 23
WORK_EXCEPTION = 25
WORK_DATA = 28
WORK_WARNING = 29
GRAB_JOB_UNIQ = 30
JOB_ASSIGN_UNIQ = 31
SUBMIT_JOB = 7
SUBMIT_JOB_BG = 18
SUBMIT_JOB_HIGH = 21
SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
)
var (
ErrIsNotErr = errors.New("The input is not a error data.")
ErrInvalidData = errors.New("Invalid data.")
ErrWorkWarning = errors.New("Work warning.")
ErrWorkFail = errors.New("Work fail.")
ErrWorkException = errors.New("Work exeption.")
ErrDataType = errors.New("Invalid data type.")
ErrOutOfCap = errors.New("Out of the capability.")
ErrNotConn = errors.New("Did not connect to job server.")
ErrFuncNotFound = errors.New("The function was not found.")
)
// Extract the error message
func GetError(data []byte) (eno syscall.Errno, err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = ErrIsNotErr
return
}
l := len(rel[0])
eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]}))
err = errors.New(string(rel[1]))
return
}
// Decode [4]byte to uint32
func BytesToUint32(buf [4]byte) uint32 {
return uint32(buf[0])<<24 +
uint32(buf[1])<<16 +
uint32(buf[2])<<8 +
uint32(buf[3])
}
// Encode uint32 to [4]byte
func Uint32ToBytes(i uint32) (data [4]byte) {
data[0] = byte((i >> 24) & 0xff)
data[1] = byte((i >> 16) & 0xff)
data[2] = byte((i >> 8) & 0xff)
data[3] = byte(i & 0xff)
return
}

131
gearman/worker/jobagent.go Normal file
View File

@ -0,0 +1,131 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"io"
"net"
)
// The agent of job server.
type jobAgent struct {
conn net.Conn
worker *Worker
running bool
incoming chan []byte
}
// Create the agent of job server.
func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) {
conn, err := net.Dial(gearman.TCP, addr)
if err != nil {
return nil, err
}
jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, gearman.QUEUE_CAP)}
return jobagent, err
}
// Internal read
func (agent *jobAgent) read() (data []byte, err error) {
if len(agent.incoming) > 0 {
// incoming queue is not empty
data = <-agent.incoming
} else {
for {
buf := make([]byte, gearman.BUFFER_SIZE)
var n int
if n, err = agent.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
err = nil
return
}
return
}
data = append(data, buf[0:n]...)
if n < gearman.BUFFER_SIZE {
break
}
}
}
// split package
start := 0
tl := len(data)
for i := 0; i < tl; i++ {
if string(data[start:start+4]) == gearman.RES_STR {
l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == tl {
return
} else {
agent.incoming <- data[total:]
data = data[:total]
return
}
} else {
start++
}
}
err = gearman.ErrInvalidData
return
}
// Main loop.
func (agent *jobAgent) Work() {
noop := true
for agent.running {
// got noop msg and incoming queue is zero, grab job
if noop && len(agent.incoming) == 0 {
agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.GRAB_JOB, nil))
}
rel, err := agent.read()
if err != nil {
agent.worker.err(err)
continue
}
job, err := DecodeWorkerJob(rel)
if err != nil {
agent.worker.err(err)
continue
} else {
switch job.DataType {
case gearman.NOOP:
noop = true
case gearman.NO_JOB:
noop = false
agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.PRE_SLEEP, nil))
case gearman.ECHO_RES, gearman.JOB_ASSIGN_UNIQ, gearman.JOB_ASSIGN:
job.agent = agent
agent.worker.incoming <- job
}
}
}
return
}
// Send a job to the job server.
func (agent *jobAgent) WriteJob(job *WorkerJob) (err error) {
return agent.write(job.Encode())
}
// Internal write the encoded job.
func (agent *jobAgent) write(buf []byte) (err error) {
var n int
for i := 0; i < len(buf); i += n {
n, err = agent.conn.Write(buf[i:])
if err != nil {
return err
}
}
return
}
// Close.
func (agent *jobAgent) Close() (err error) {
agent.running = false
close(agent.incoming)
err = agent.conn.Close()
return
}

286
gearman/worker/worker.go Normal file
View File

@ -0,0 +1,286 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"bytes"
"sync"
)
const (
Unlimit = 0
OneByOne = 1
)
// The definition of the callback function.
type JobFunction func(job *WorkerJob) ([]byte, error)
// Map for added function.
type JobFunctionMap map[string]JobFunction
// Error Function
type ErrFunc func(e error)
/*
Worker side api for gearman.
usage:
w = worker.New(worker.Unlimit)
w.AddFunction("foobar", foobar)
w.AddServer("127.0.0.1:4730")
w.Work() // Enter the worker's main loop
The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
It looks like this:
func foobar(job *WorkerJob) (data []byte, err os.Error) {
//sth. here
//plaplapla...
return
}
*/
type Worker struct {
clients []*jobAgent
functions JobFunctionMap
running bool
incoming chan *WorkerJob
mutex sync.Mutex
limit chan bool
JobQueue chan *WorkerJob
// assign a ErrFunc to handle errors
// Must assign befor AddServer
ErrFunc ErrFunc
}
// Get a new worker
func New(l int) (worker *Worker) {
worker = &Worker{
// job server list
clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP),
// function list
functions: make(JobFunctionMap),
incoming: make(chan *WorkerJob, gearman.QUEUE_CAP),
JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP),
running: true,
}
if l != Unlimit {
worker.limit = make(chan bool, l)
for i := 0; i < l; i ++ {
worker.limit <- true
}
}
return
}
//
func (worker *Worker)err(e error) {
if worker.ErrFunc != nil {
worker.ErrFunc(e)
}
}
// Add a server. The addr should be 'host:port' format.
// The connection is established at this time.
func (worker *Worker) AddServer(addr string) (err error) {
worker.mutex.Lock()
defer worker.mutex.Unlock()
if len(worker.clients) == cap(worker.clients) {
return gearman.ErrOutOfCap
}
// Create a new job server's client as a agent of server
server, err := newJobAgent(addr, worker)
if err != nil {
return err
}
n := len(worker.clients)
worker.clients = worker.clients[0 : n+1]
worker.clients[n] = server
return
}
// Add a function.
// Plz added job servers first, then functions.
// The API will tell every connected job server that 'I can do this'
func (worker *Worker) AddFunction(funcname string,
f JobFunction, timeout uint32) (err error) {
if len(worker.clients) < 1 {
return gearman.ErrNotConn
}
worker.mutex.Lock()
defer worker.mutex.Unlock()
worker.functions[funcname] = f
var datatype uint32
var data []byte
if timeout == 0 {
datatype = gearman.CAN_DO
data = []byte(funcname)
} else {
datatype = gearman.CAN_DO_TIMEOUT
data = []byte(funcname + "\x00")
t := gearman.Uint32ToBytes(timeout)
data = append(data, t[:]...)
}
job := NewWorkerJob(gearman.REQ, datatype, data)
worker.WriteJob(job)
return
}
// Remove a function.
// Tell job servers 'I can not do this now' at the same time.
func (worker *Worker) RemoveFunction(funcname string) (err error) {
worker.mutex.Lock()
defer worker.mutex.Unlock()
if worker.functions[funcname] == nil {
return gearman.ErrFuncNotFound
}
delete(worker.functions, funcname)
job := NewWorkerJob(gearman.REQ, gearman.CANT_DO, []byte(funcname))
worker.WriteJob(job)
return
}
// Main loop
func (worker *Worker) Work() {
for _, v := range worker.clients {
go v.Work()
}
for worker.running || len(worker.incoming) > 0{
select {
case job := <-worker.incoming:
if job == nil {
break
}
switch job.DataType {
case gearman.NO_JOB:
// do nothing
case gearman.ERROR:
_, err := gearman.GetError(job.Data)
worker.err(err)
case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ:
go func() {
if err := worker.exec(job); err != nil {
worker.err(err)
}
}()
default:
worker.JobQueue <- job
}
}
}
close(worker.incoming)
}
// Get the last job in queue.
// If there are more than one job in the queue,
// the last one will be returned,
// the others will be lost.
func (worker *Worker) LastJob() (job *WorkerJob) {
if l := len(worker.JobQueue); l != 1 {
if l == 0 {
return
}
for i := 0; i < l-1; i++ {
<-worker.JobQueue
}
}
return <-worker.JobQueue
}
// Close.
func (worker *Worker) Close() (err error) {
for _, v := range worker.clients {
err = v.Close()
}
worker.running = false
return err
}
// Write a job to job server.
// Here, the job's mean is not the oraginal mean.
// Just looks like a network package for job's result or tell job server, there was a fail.
func (worker *Worker) WriteJob(job *WorkerJob) (err error) {
e := make(chan error)
for _, v := range worker.clients {
go func() {
e <- v.WriteJob(job)
}()
}
return <-e
}
// Send a something out, get the samething back.
func (worker *Worker) Echo(data []byte) (err error) {
job := NewWorkerJob(gearman.REQ, gearman.ECHO_REQ, data)
return worker.WriteJob(job)
}
// Remove all of functions.
// Both from the worker or job servers.
func (worker *Worker) Reset() (err error) {
job := NewWorkerJob(gearman.REQ, gearman.RESET_ABILITIES, nil)
err = worker.WriteJob(job)
worker.functions = make(JobFunctionMap)
return
}
// Set the worker's unique id.
func (worker *Worker) SetId(id string) (err error) {
job := NewWorkerJob(gearman.REQ, gearman.SET_CLIENT_ID, []byte(id))
return worker.WriteJob(job)
}
// Execute the job. And send back the result.
func (worker *Worker) exec(job *WorkerJob) (err error) {
if worker.limit != nil {
<- worker.limit
defer func() {
worker.limit <- true
}()
}
var limit int
if job.DataType == gearman.JOB_ASSIGN {
limit = 3
} else {
limit = 4
}
jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
job.Handle = string(jobdata[0])
funcname := string(jobdata[1])
if job.DataType == gearman.JOB_ASSIGN {
job.Data = jobdata[2]
} else {
job.UniqueId = string(jobdata[2])
job.Data = jobdata[3]
}
f, ok := worker.functions[funcname]
if !ok {
return gearman.ErrFuncNotFound
}
result, err := f(job)
var datatype uint32
if err == nil {
datatype = gearman.WORK_COMPLETE
} else {
if result == nil {
datatype = gearman.WORK_FAIL
} else {
datatype = gearman.WORK_EXCEPTION
}
}
job.magicCode = gearman.REQ
job.DataType = datatype
job.Data = result
worker.WriteJob(job)
return
}

View File

@ -0,0 +1,72 @@
package worker
import "testing"
var worker *Worker
func init() {
worker = NewWorker()
}
func TestWorkerAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730.")
if err := worker.AddServer("127.0.0.1:4730"); err != nil {
t.Error(err)
}
if l := len(worker.clients); l != 1 {
t.Log(worker.clients)
t.Error("The length of server list should be 1.")
}
}
func foobar(job *WorkerJob) ([]byte, error) {
return nil, nil
}
func TestWorkerAddFunction(t *testing.T) {
if err := worker.AddFunction("foobar", foobar, 0); err != nil {
t.Error(err)
}
if err := worker.AddFunction("timeout", foobar, 5); err != nil {
t.Error(err)
}
if l := len(worker.functions); l != 2 {
t.Log(worker.functions)
t.Errorf("The length of function map should be %d.", 2)
}
}
func TestWorkerEcho(t *testing.T) {
if err := worker.Echo([]byte("Hello World")); err != nil {
t.Error(err)
}
}
/*
func TestWorkerResult(t *testing.T) {
if job := worker.LastResult(); job == nil {
t.Error("Nothing in result.")
} else {
t.Log(job)
}
}
*/
func TestWorkerRemoveFunction(t *testing.T) {
if err := worker.RemoveFunction("foobar"); err != nil {
t.Error(err)
}
}
func TestWorkerReset(t *testing.T) {
if err := worker.Reset(); err != nil {
t.Error(err)
}
}
func TestWorkerClose(t *testing.T) {
if err := worker.Close(); err != nil {
t.Error(err)
}
}

View File

@ -0,0 +1,87 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker
import (
"bitbucket.org/mikespook/gearman-go/gearman"
"strconv"
)
// Worker side job
type WorkerJob struct {
Data []byte
Handle, UniqueId string
agent *jobAgent
magicCode, DataType uint32
}
// Create a new job
func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) {
return &WorkerJob{magicCode: magiccode,
DataType: datatype,
Data: data}
}
// Decode job from byte slice
func DecodeWorkerJob(data []byte) (job *WorkerJob, err error) {
if len(data) < 12 {
err = gearman.ErrInvalidData
return
}
datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
err = gearman.ErrInvalidData
return
}
data = data[12:]
job = NewWorkerJob(gearman.RES, datatype, data)
return
}
// Encode a job to byte slice
func (job *WorkerJob) Encode() (data []byte) {
magiccode := gearman.Uint32ToBytes(job.magicCode)
datatype := gearman.Uint32ToBytes(job.DataType)
data = make([]byte, 0, 1024*64)
data = append(data, magiccode[:]...)
data = append(data, datatype[:]...)
data = append(data, []byte{0, 0, 0, 0}...)
l := len(job.Data)
if job.Handle != "" {
data = append(data, []byte(job.Handle)...)
data = append(data, 0)
l += len(job.Handle) + 1
}
data = append(data, job.Data...)
datalength := gearman.Uint32ToBytes(uint32(l))
copy(data[8:12], datalength[:])
return
}
// Send some datas to client.
// Using this in a job's executing.
func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err error) {
result := append([]byte(job.Handle), 0)
result = append(result, data...)
var datatype uint32
if iswaring {
datatype = gearman.WORK_WARNING
} else {
datatype = gearman.WORK_DATA
}
return job.agent.WriteJob(NewWorkerJob(gearman.REQ, datatype, result))
}
// Update status.
// Tall client how many percent job has been executed.
func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err error) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
result := append([]byte(job.Handle), 0)
result = append(result, n...)
result = append(result, d...)
return job.agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.WORK_STATUS, result))
}

View File

@ -1,229 +0,0 @@
package worker
import (
"bufio"
"bytes"
"encoding/binary"
"io"
"net"
"sync"
)
// The agent of job server.
type agent struct {
sync.Mutex
conn net.Conn
rw *bufio.ReadWriter
worker *Worker
in chan []byte
net, addr string
}
// Create the agent of job server.
func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
a = &agent{
net: net,
addr: addr,
worker: worker,
in: make(chan []byte, queueSize),
}
return
}
func (a *agent) Connect() (err error) {
a.Lock()
defer a.Unlock()
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
return
}
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
go a.work()
return
}
func (a *agent) work() {
defer func() {
if err := recover(); err != nil {
a.worker.err(err.(error))
}
}()
var inpack *inPack
var l int
var err error
var data, leftdata []byte
for {
if data, err = a.read(); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Temporary() {
continue
} else {
a.disconnect_error(err)
// else - we're probably dc'ing due to a Close()
break
}
} else if err == io.EOF {
a.disconnect_error(err)
break
}
a.worker.err(err)
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the agent should close the conection
// and reconnect to job server.
a.Close()
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
a.worker.err(err)
break
}
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
}
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
if len(data) < minPacketLength { // not enough data
leftdata = data
continue
}
for {
if inpack, l, err = decodeInPack(data); err != nil {
a.worker.err(err)
leftdata = data
break
} else {
leftdata = nil
inpack.a = a
select {
case <-a.worker.closed:
return
default:
}
a.worker.in <- inpack
if len(data) == l {
break
}
if len(data) > l {
data = data[l:]
}
}
}
}
}
func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil {
err = &WorkerDisconnectError{
err: err,
agent: a,
}
a.worker.err(err)
}
}
func (a *agent) Close() {
a.Lock()
defer a.Unlock()
if a.conn != nil {
a.conn.Close()
a.conn = nil
}
}
func (a *agent) Grab() {
a.Lock()
defer a.Unlock()
a.grab()
}
func (a *agent) grab() bool {
if a.worker.closed != nil {
return false
}
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
a.write(outpack)
return true
}
func (a *agent) PreSleep() {
a.Lock()
defer a.Unlock()
outpack := getOutPack()
outpack.dataType = dtPreSleep
a.write(outpack)
}
func (a *agent) reconnect() error {
a.Lock()
defer a.Unlock()
conn, err := net.Dial(a.net, a.addr)
if err != nil {
return err
}
a.conn = conn
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a)
if a.grab() {
go a.work()
}
return nil
}
// read length bytes from the socket
func (a *agent) read() (data []byte, err error) {
n := 0
tmp := getBuffer(bufferSize)
var buf bytes.Buffer
// read the header so we can get the length of the data
if n, err = a.rw.Read(tmp); err != nil {
return
}
dl := int(binary.BigEndian.Uint32(tmp[8:12]))
// write what we read so far
buf.Write(tmp[:n])
// read until we receive all the data
for buf.Len() < dl+minPacketLength {
if n, err = a.rw.Read(tmp); err != nil {
return buf.Bytes(), err
}
buf.Write(tmp[:n])
}
return buf.Bytes(), err
}
// Internal write the encoded job.
func (a *agent) write(outpack *outPack) (err error) {
var n int
buf := outpack.Encode()
for i := 0; i < len(buf); i += n {
n, err = a.rw.Write(buf[i:])
if err != nil {
return err
}
}
return a.rw.Flush()
}
// Write with lock
func (a *agent) Write(outpack *outPack) (err error) {
a.Lock()
defer a.Unlock()
return a.write(outpack)
}

View File

@ -1,51 +0,0 @@
package worker
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 1024
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}

View File

@ -1,28 +0,0 @@
package worker
import (
"bytes"
"errors"
"fmt"
)
var (
ErrNoneAgents = errors.New("None active agents")
ErrNoneFuncs = errors.New("None functions")
ErrTimeOut = errors.New("Executing time out")
ErrUnknown = errors.New("Unknown error")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// An error handler
type ErrorHandler func(error)

View File

@ -1,56 +0,0 @@
package worker_test
import (
"fmt"
"github.com/mikespook/gearman-go/worker"
"sync"
)
func ExampleWorker() {
// An example of worker
w := worker.New(worker.Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil {
fmt.Println(err)
return
}
// A function for handling jobs
foobar := func(job worker.Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job worker.Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
}

View File

@ -1,45 +0,0 @@
package worker
import (
"encoding/json"
"runtime"
)
// Job handler
type JobHandler func(Job) error
type JobFunc func(Job) ([]byte, error)
// The definition of the callback function.
type jobFunc struct {
f JobFunc
timeout uint32
}
// Map for added function.
type jobFuncs map[string]*jobFunc
type systemInfo struct {
GOOS, GOARCH, GOROOT, Version string
NumCPU, NumGoroutine int
NumCgoCall int64
}
func SysInfo(job Job) ([]byte, error) {
return json.Marshal(&systemInfo{
GOOS: runtime.GOOS,
GOARCH: runtime.GOARCH,
GOROOT: runtime.GOROOT(),
Version: runtime.Version(),
NumCPU: runtime.NumCPU(),
NumGoroutine: runtime.NumGoroutine(),
NumCgoCall: runtime.NumCgoCall(),
})
}
var memState runtime.MemStats
func MemInfo(job Job) ([]byte, error) {
runtime.ReadMemStats(&memState)
return json.Marshal(&memState)
}

View File

@ -1,126 +0,0 @@
package worker
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Worker side job
type inPack struct {
dataType uint32
data []byte
handle, uniqueId, fn string
a *agent
}
// Create a new job
func getInPack() *inPack {
return &inPack{}
}
func (inpack *inPack) Data() []byte {
return inpack.data
}
func (inpack *inPack) Fn() string {
return inpack.fn
}
func (inpack *inPack) Handle() string {
return inpack.handle
}
func (inpack *inPack) UniqueId() string {
return inpack.uniqueId
}
func (inpack *inPack) Err() error {
if inpack.dataType == dtError {
return getError(inpack.data)
}
return nil
}
// Send some datas to client.
// Using this in a job's executing.
func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkData
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkWarning
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
// Update status.
// Tall client how many percent job has been executed.
func (inpack *inPack) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
outpack := getOutPack()
outpack.dataType = dtWorkStatus
hl := len(inpack.handle)
nl := len(n)
dl := len(d)
outpack.data = getBuffer(hl + nl + dl + 2)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], n)
copy(outpack.data[hl+nl+2:], d)
inpack.a.write(outpack)
}
// Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < minPacketLength { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if len(data) < (dl + minPacketLength) {
err = fmt.Errorf("Not enough data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
inpack = getInPack()
inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch inpack.dataType {
case dtJobAssign:
s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.data = s[2]
}
case dtJobAssignUniq:
s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.uniqueId = string(s[2])
inpack.data = s[3]
}
default:
inpack.data = dt
}
l = dl + minPacketLength
return
}

View File

@ -1,73 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
inpackcases = map[uint32]map[string]string{
dtNoop: map[string]string{
"src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00",
},
dtNoJob: map[string]string{
"src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00",
},
dtJobAssign: map[string]string{
"src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz",
"handle": "a",
"fn": "b",
"data": "xyz",
},
dtJobAssignUniq: map[string]string{
"src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz",
"handle": "a",
"fn": "b",
"uid": "c",
"data": "xyz",
},
}
)
func TestInPack(t *testing.T) {
for k, v := range inpackcases {
inpack, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
t.Error(err)
}
if inpack.dataType != k {
t.Errorf("DataType: %d expected, %d got.", k, inpack.dataType)
}
if handle, ok := v["handle"]; ok {
if inpack.handle != handle {
t.Errorf("Handle: %s expected, %s got.", handle, inpack.handle)
}
}
if fn, ok := v["fn"]; ok {
if inpack.fn != fn {
t.Errorf("FuncName: %s expected, %s got.", fn, inpack.fn)
}
}
if uid, ok := v["uid"]; ok {
if inpack.uniqueId != uid {
t.Errorf("UID: %s expected, %s got.", uid, inpack.uniqueId)
}
}
if data, ok := v["data"]; ok {
if bytes.Compare([]byte(data), inpack.data) != 0 {
t.Errorf("UID: %v expected, %v got.", data, inpack.data)
}
}
}
}
func BenchmarkDecode(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, v := range inpackcases {
_, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
b.Error(err)
}
}
}
}

View File

@ -1,12 +0,0 @@
package worker
type Job interface {
Err() error
Data() []byte
Fn() string
SendWarning(data []byte)
SendData(data []byte)
UpdateStatus(numerator, denominator int)
Handle() string
UniqueId() string
}

View File

@ -1,47 +0,0 @@
package worker
import (
"encoding/binary"
)
// Worker side job
type outPack struct {
dataType uint32
data []byte
handle string
}
func getOutPack() (outpack *outPack) {
// TODO pool
return &outPack{}
}
// Encode a job to byte slice
func (outpack *outPack) Encode() (data []byte) {
var l int
if outpack.dataType == dtWorkFail {
l = len(outpack.handle)
} else {
l = len(outpack.data)
if outpack.handle != "" {
l += len(outpack.handle) + 1
}
}
data = getBuffer(l + minPacketLength)
binary.BigEndian.PutUint32(data[:4], req)
binary.BigEndian.PutUint32(data[4:8], outpack.dataType)
binary.BigEndian.PutUint32(data[8:minPacketLength], uint32(l))
i := minPacketLength
if outpack.handle != "" {
hi := len(outpack.handle) + i
copy(data[i:hi], []byte(outpack.handle))
if outpack.dataType != dtWorkFail {
data[hi] = '\x00'
}
i = hi + 1
}
if outpack.dataType != dtWorkFail {
copy(data[i:], outpack.data)
}
return
}

View File

@ -1,99 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
outpackcases = map[uint32]map[string]string{
dtCanDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a",
"data": "a",
},
dtCanDoTimeout: map[string]string{
"src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01",
"data": "a\x00\x00\x00\x00\x01",
},
dtCantDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a",
"data": "a",
},
dtResetAbilities: map[string]string{
"src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00",
},
dtPreSleep: map[string]string{
"src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00",
},
dtGrabJob: map[string]string{
"src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00",
},
dtGrabJobUniq: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00",
},
dtWorkData: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkWarning: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkStatus: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100",
"data": "a\x0050\x00100",
},
dtWorkComplete: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkFail: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a",
"handle": "a",
},
dtWorkException: map[string]string{
"src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtSetClientId: map[string]string{
"src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a",
"data": "a",
},
dtAllYours: map[string]string{
"src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00",
},
}
)
func TestOutPack(t *testing.T) {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
data := outpack.Encode()
if bytes.Compare([]byte(v["src"]), data) != 0 {
t.Errorf("%d: %X expected, %X got.", k, v["src"], data)
}
}
}
func BenchmarkEncode(b *testing.B) {
for i := 0; i < b.N; i++ {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
outpack.Encode()
}
}
}

View File

@ -1,405 +0,0 @@
// The worker package helps developers to develop Gearman's worker
// in an easy way.
package worker
import (
"fmt"
"strconv"
"sync"
"time"
)
const (
Unlimited = iota
OneByOne
)
// Worker is the only structure needed by worker side developing.
// It can connect to multi-server and grab jobs.
type Worker struct {
sync.Mutex
agents []*agent
funcs jobFuncs
in chan *inPack
running bool
ready bool
jobLeftNum int64
Id string
ErrorHandler ErrorHandler
JobHandler JobHandler
limit chan bool
closed chan struct{}
leftJobs chan struct{}
}
// New returns a worker.
//
// If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them parallelly.
// If limit is greater than zero, the number of paralled executing
// jobs are limited under the number. If limit is assgined to
// OneByOne(=1), there will be only one job executed in a time.
func New(limit int) (worker *Worker) {
worker = &Worker{
agents: make([]*agent, 0, limit),
funcs: make(jobFuncs),
in: make(chan *inPack, queueSize),
}
if limit != Unlimited {
worker.limit = make(chan bool, limit-1)
}
return
}
// inner error handling
func (worker *Worker) err(e error) {
if worker.ErrorHandler != nil {
worker.ErrorHandler(e)
}
}
// AddServer adds a Gearman job server.
//
// addr should be formated as 'host:port'.
func (worker *Worker) AddServer(net, addr string) (err error) {
// Create a new job server's client as a agent of server
a, err := newAgent(net, addr, worker)
if err != nil {
return err
}
worker.agents = append(worker.agents, a)
return
}
// Broadcast an outpack to all Gearman server.
func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents {
v.Write(outpack)
}
}
// AddFunc adds a function.
// Set timeout as Unlimited(=0) to disable executing timeout.
func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) {
worker.Lock()
defer worker.Unlock()
if _, ok := worker.funcs[funcname]; ok {
return fmt.Errorf("The function already exists: %s", funcname)
}
worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
if worker.running {
worker.addFunc(funcname, timeout)
}
return
}
// inner add
func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack := prepFuncOutpack(funcname, timeout)
worker.broadcast(outpack)
}
func prepFuncOutpack(funcname string, timeout uint32) *outPack {
outpack := getOutPack()
if timeout == 0 {
outpack.dataType = dtCanDo
outpack.data = []byte(funcname)
} else {
outpack.dataType = dtCanDoTimeout
l := len(funcname)
timeoutString := strconv.FormatUint(uint64(timeout), 10)
outpack.data = getBuffer(l + len(timeoutString) + 1)
copy(outpack.data, []byte(funcname))
outpack.data[l] = '\x00'
copy(outpack.data[l+1:], []byte(timeoutString))
}
return outpack
}
// RemoveFunc removes a function.
func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock()
defer worker.Unlock()
if _, ok := worker.funcs[funcname]; !ok {
return fmt.Errorf("The function does not exist: %s", funcname)
}
delete(worker.funcs, funcname)
if worker.running {
worker.removeFunc(funcname)
}
return
}
// inner remove
func (worker *Worker) removeFunc(funcname string) {
outpack := getOutPack()
outpack.dataType = dtCantDo
outpack.data = []byte(funcname)
worker.broadcast(outpack)
}
// inner package handling
func (worker *Worker) handleInPack(inpack *inPack) {
switch inpack.dataType {
case dtNoJob:
inpack.a.PreSleep()
case dtNoop:
inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq:
go func() {
go func() {
worker.incrExecJobNum()
defer func() {
worker.decrExecJobNum()
}()
if err := worker.exec(inpack); err != nil {
worker.err(err)
}
}()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
}()
case dtError:
worker.err(inpack.Err())
fallthrough
case dtEchoRes:
fallthrough
default:
worker.customeHandler(inpack)
}
}
// Connect to Gearman server and tell every server
// what can this worker do.
func (worker *Worker) Ready() (err error) {
if len(worker.agents) == 0 {
return ErrNoneAgents
}
if len(worker.funcs) == 0 {
return ErrNoneFuncs
}
for _, a := range worker.agents {
if err = a.Connect(); err != nil {
return
}
}
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
worker.ready = true
return
}
// Work start main loop (blocking)
// Most of time, this should be evaluated in goroutine.
func (worker *Worker) Work() {
if !worker.ready {
// didn't run Ready beforehand, so we'll have to do it:
err := worker.Ready()
if err != nil {
panic(err)
}
}
worker.Lock()
worker.running = true
worker.Unlock()
for _, a := range worker.agents {
a.Grab()
}
// 执行任务(阻塞)
var inpack *inPack
for inpack = range worker.in {
worker.handleInPack(inpack)
}
// 关闭Worker进程后 等待任务完成后退出
worker.Lock()
leftJobNum := int(worker.jobLeftNum)
worker.Unlock()
if worker.leftJobs != nil {
for i := 0; i < leftJobNum; i++ {
<-worker.leftJobs
}
}
worker.Reset()
worker.close()
}
// custome handling warper
func (worker *Worker) customeHandler(inpack *inPack) {
if worker.JobHandler != nil {
if err := worker.JobHandler(inpack); err != nil {
worker.err(err)
}
}
}
// Close connection and exit main loop
func (worker *Worker) Close() {
worker.Lock()
defer worker.Unlock()
if worker.running == true && worker.closed == nil {
worker.closed = make(chan struct{}, 1)
worker.closed <- struct{}{}
worker.running = false
close(worker.in)
// 创建关闭后执行中的任务列表
if worker.jobLeftNum != 0 {
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
}
}
func (worker *Worker) close() {
for _, a := range worker.agents {
a.Close()
}
}
// Echo
func (worker *Worker) Echo(data []byte) {
outpack := getOutPack()
outpack.dataType = dtEchoReq
outpack.data = data
worker.broadcast(outpack)
}
// Reset removes all of functions.
// Both from the worker and job servers.
func (worker *Worker) Reset() {
outpack := getOutPack()
outpack.dataType = dtResetAbilities
worker.broadcast(outpack)
worker.funcs = make(jobFuncs)
}
// Set the worker's unique id.
func (worker *Worker) SetId(id string) {
worker.Id = id
outpack := getOutPack()
outpack.dataType = dtSetClientId
outpack.data = []byte(id)
worker.broadcast(outpack)
}
func (worker *Worker) incrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum++
return worker.jobLeftNum
}
func (worker *Worker) decrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum--
if worker.jobLeftNum < 0 {
worker.jobLeftNum = 0
}
return worker.jobLeftNum
}
// inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() {
if worker.limit != nil {
<-worker.limit
}
if r := recover(); r != nil {
if e, ok := r.(error); ok {
err = e
} else {
err = ErrUnknown
}
}
}()
f, ok := worker.funcs[inpack.fn]
if !ok {
return fmt.Errorf("The function does not exist: %s", inpack.fn)
}
var r *result
if f.timeout == 0 {
d, e := f.f(inpack)
r = &result{data: d, err: e}
} else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
}
//if worker.running {
outpack := getOutPack()
if r.err == nil {
outpack.dataType = dtWorkComplete
} else {
if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else {
outpack.dataType = dtWorkException
}
err = r.err
}
outpack.handle = inpack.handle
outpack.data = r.data
_ = inpack.a.Write(outpack)
if worker.leftJobs != nil {
worker.leftJobs <- struct{}{}
}
//}
return
}
func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
worker.Lock()
defer worker.Unlock()
for funcname, f := range worker.funcs {
outpack := prepFuncOutpack(funcname, f.timeout)
a.write(outpack)
}
}
// inner result
type result struct {
data []byte
err error
}
// executing timer
func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
rslt := make(chan *result)
defer close(rslt)
go func() {
defer func() { recover() }()
d, e := f(job)
rslt <- &result{data: d, err: e}
}()
select {
case r = <-rslt:
case <-time.After(timeout):
return &result{err: ErrTimeOut}
}
return r
}
// Error type passed when a worker connection disconnects
type WorkerDisconnectError struct {
err error
agent *agent
}
func (e *WorkerDisconnectError) Error() string {
return e.err.Error()
}
// Responds to the error by asking the worker to reconnect
func (e *WorkerDisconnectError) Reconnect() (err error) {
return e.agent.reconnect()
}
// Which server was this for?
func (e *WorkerDisconnectError) Server() (net string, addr string) {
return e.agent.net, e.agent.addr
}

View File

@ -1,59 +0,0 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}

View File

@ -1,269 +0,0 @@
package worker
import (
"bytes"
"flag"
"os"
"sync"
"testing"
"time"
)
var (
worker *Worker
runIntegrationTests bool
)
func init() {
worker = New(Unlimited)
}
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestWorkerErrNoneAgents(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneAgents {
t.Error("ErrNoneAgents expected.")
}
}
func TestWorkerAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730.")
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
if l := len(worker.agents); l != 1 {
t.Log(worker.agents)
t.Error("The length of server list should be 1.")
}
}
func TestWorkerErrNoneFuncs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneFuncs {
t.Error("ErrNoneFuncs expected.")
}
}
func foobar(job Job) ([]byte, error) {
return nil, nil
}
func TestWorkerAddFunction(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.AddFunc("foobar", foobar, 0); err != nil {
t.Error(err)
}
if err := worker.AddFunc("timeout", foobar, 5); err != nil {
t.Error(err)
}
if l := len(worker.funcs); l != 2 {
t.Log(worker.funcs)
t.Errorf("The length of function map should be %d.", 2)
}
}
func TestWorkerRemoveFunc(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.RemoveFunc("foobar"); err != nil {
t.Error(err)
}
}
func TestWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
var wg sync.WaitGroup
worker.JobHandler = func(job Job) error {
t.Logf("%s", job.Data())
wg.Done()
return nil
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
worker.Echo([]byte("Hello"))
wg.Wait()
}
func TestLargeDataWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker := New(Unlimited)
defer worker.Close()
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
worker.Ready()
l := 5714
var wg sync.WaitGroup
bigdataHandler := func(job Job) error {
defer wg.Done()
if len(job.Data()) != l {
t.Errorf("expected length %d. got %d.", l, len(job.Data()))
}
return nil
}
if err := worker.AddFunc("bigdata", foobar, 0); err != nil {
defer wg.Done()
t.Error(err)
}
worker.JobHandler = bigdataHandler
worker.ErrorHandler = func(err error) {
t.Fatal("shouldn't have received an error")
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
// var cli *client.Client
// var err error
// if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
// t.Fatal(err)
// }
// cli.ErrorHandler = func(e error) {
// t.Error(e)
// }
// _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) {
// })
// if err != nil {
// t.Error(err)
// }
worker.Echo(bytes.Repeat([]byte("a"), l))
wg.Wait()
}
func TestWorkerClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker.Close()
}
func TestWorkWithoutReady(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil {
t.Error(err)
}
timeout := make(chan bool, 1)
done := make(chan bool, 1)
other_worker.JobHandler = func(j Job) error {
if !other_worker.ready {
t.Error("Worker not ready as expected")
}
done <- true
return nil
}
go func() {
time.Sleep(5 * time.Second)
timeout <- true
}()
go func() {
other_worker.Work()
}()
// With the all-in-one Work() we don't know if the
// worker is ready at this stage so we may have to wait a sec:
go func() {
tries := 5
for tries > 0 {
if other_worker.ready {
other_worker.Echo([]byte("Hello"))
break
}
// still waiting for it to be ready..
time.Sleep(250 * time.Millisecond)
tries--
}
}()
// determine if we've finished or timed out:
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
}
func TestWorkWithoutReadyWithPanic(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
timeout := make(chan bool, 1)
done := make(chan bool, 1)
// Going to work with no worker setup.
// when Work (hopefully) calls Ready it will get an error which should cause it to panic()
go func() {
defer func() {
if err := recover(); err != nil {
done <- true
return
}
t.Error("Work should raise a panic.")
done <- true
}()
other_worker.Work()
}()
go func() {
time.Sleep(2 * time.Second)
timeout <- true
}()
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
}