Compare commits

..

No commits in common. "master" and "0.1-testing" have entirely different histories.

47 changed files with 2265 additions and 2882 deletions

30
.hgtags Normal file
View File

@ -0,0 +1,30 @@
b68aee2a48811a1bee5994b56437c393c6fb2f5b 2011-05-24
0dc8bc71d7e895caf5803c1905bf07a823462fba native.start
dee83cac69e07ed4f3efde162d981f5855101845 0.0.1
dee83cac69e07ed4f3efde162d981f5855101845 go1-0.0.1
dee83cac69e07ed4f3efde162d981f5855101845 0.0.1
0000000000000000000000000000000000000000 0.0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
b68aee2a48811a1bee5994b56437c393c6fb2f5b 2011-05-24
0000000000000000000000000000000000000000 2011-05-24
0dc8bc71d7e895caf5803c1905bf07a823462fba native.start
0000000000000000000000000000000000000000 native.start
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
a3b64f831c51ae215ce3b2c354483a4746e20555 go1-0.1
7928d7ed58bc0e36abebb5602b4f8880551054a7 go1-0.1
0000000000000000000000000000000000000000 0.0.1
dee83cac69e07ed4f3efde162d981f5855101845 0.0.1
7928d7ed58bc0e36abebb5602b4f8880551054a7 0.1
7928d7ed58bc0e36abebb5602b4f8880551054a7 go1-0.1
0000000000000000000000000000000000000000 go1-0.1
dee83cac69e07ed4f3efde162d981f5855101845 go1-0.0.1
0000000000000000000000000000000000000000 go1-0.0.1
4e3bf88517539cf6a0342e57b86df6e17ceb228c 0.1.1
4e3bf88517539cf6a0342e57b86df6e17ceb228c 0.1.1
0000000000000000000000000000000000000000 0.1.1
0000000000000000000000000000000000000000 0.1.1
eea0878b43d209630d7b342b1b99c61c839b454f 0.1.1
67f11fa2301f5e74a436883f66ce7fb121a4df82 0.1.2

View File

@ -1,7 +1,3 @@
language: go language: go
go:
- 1.4
before_install: before_install:
- sudo apt-get remove -y gearman-job-server - sudo apt-get install -qq gearman-job-server
- sudo apt-get install -y gearman-job-server

120
README.md
View File

@ -1,21 +1,10 @@
Gearman-Go Gearman-Go
========== ==========
This module is a [Gearman](http://gearman.org/) API for the [Go Programming Language](http://golang.org).
The protocols were written in pure Go. It contains two sub-packages:
The client package is used for sending jobs to the Gearman job server,
and getting responses from the server.
"github.com/mikespook/gearman-go/client"
The worker package will help developers in developing Gearman worker
service easily.
"github.com/mikespook/gearman-go/worker"
[![Build Status](https://travis-ci.org/mikespook/gearman-go.png?branch=master)](https://travis-ci.org/mikespook/gearman-go) [![Build Status](https://travis-ci.org/mikespook/gearman-go.png?branch=master)](https://travis-ci.org/mikespook/gearman-go)
[![GoDoc](https://godoc.org/github.com/mikespook/gearman-go?status.png)](https://godoc.org/github.com/mikespook/gearman-go)
This package is a [Gearman](http://gearman.org/) API for [Golang](http://golang.org).
It was implemented a native protocol for both worker and client API.
Install Install
======= =======
@ -28,95 +17,52 @@ Install the worker package:
> $ go get github.com/mikespook/gearman-go/worker > $ go get github.com/mikespook/gearman-go/worker
Both of them: Install both:
> $ go get github.com/mikespook/gearman-go > $ go get github.com/mikespook/gearman-go
Usage Usage
===== =====
## Worker ## Worker
```go w := worker.New(worker.Unlimited)
// Limit number of concurrent jobs execution. w.ErrHandler = func(e error) {
// Use worker.Unlimited (0) if you want no limitation.
w := worker.New(worker.OneByOne)
w.ErrHandler = func(e error) {
log.Println(e) log.Println(e)
} }
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
// Use worker.Unlimited (0) if you want no timeout w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpper", ToUpper, worker.Unlimited) w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
// This will give a timeout of 5 seconds w.Work()
w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work()
```
## Client ## Client
```go c, err := client.New("127.0.0.1:4730")
// ... // ...
c, err := client.New("tcp4", "127.0.0.1:4730") defer c.Close()
// ... error handling data := []byte("Hello\x00 world")
defer c.Close() c.ErrHandler = func(e error) {
c.ErrorHandler = func(e error) {
log.Println(e) log.Println(e)
} panic(e)
echo := []byte("Hello\x00 world") }
echomsg, err := c.Echo(echo) jobHandler := func(job *client.Job) {
// ... error handling log.Printf("%s", job.Data)
log.Println(string(echomsg)) }
jobHandler := func(resp *client.Response) { handle := c.Do("ToUpper", data, client.JOB_NORMAL, jobHandler)
log.Printf("%s", resp.Data) // ...
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
// ...
```
Branches Authors
======== =======
Version 0.x means: _It is far far away from stable._ * Xing Xing <mikespook@gmail.com> [Blog](http://mikespook.com) [@Twitter](http://twitter.com/mikespook)
__Use at your own risk!__
* master current usable version
* 0.2-dev Refactoring a lot of things
* 0.1-testing Old API and some known issues, eg. [issue-14](https://github.com/mikespook/gearman-go/issues/14)
Contributors
============
Great thanks to all of you for your support and interest!
(_Alphabetic order_)
* [Alex Zylman](https://github.com/azylman)
* [C.R. Kirkwood-Watts](https://github.com/kirkwood)
* [Damian Gryski](https://github.com/dgryski)
* [Gabriel Cristian Alecu](https://github.com/AzuraMeta)
* [Graham Barr](https://github.com/gbarr)
* [Ingo Oeser](https://github.com/nightlyone)
* [jake](https://github.com/jbaikge)
* [Joe Higton](https://github.com/draxil)
* [Jonathan Wills](https://github.com/runningwild)
* [Kevin Darlington](https://github.com/kdar)
* [miraclesu](https://github.com/miraclesu)
* [Paul Mach](https://github.com/paulmach)
* [Randall McPherson](https://github.com/rlmcpherson)
* [Sam Grimee](https://github.com/sgrimee)
Maintainer
==========
* [Xing Xing](http://mikespook.com) &lt;<mikespook@gmail.com>&gt; [@Twitter](http://twitter.com/mikespook)
Open Source - MIT Software License Open Source - MIT Software License
================================== ==================================
Copyright (c) 2012 Xing Xing
See LICENSE. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -1,360 +1,380 @@
// The client package helps developers connect to Gearmand, send // Copyright 2011 Xing Xing <mikespook@gmail.com>.
// jobs and fetch result. // All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client package client
import ( import (
"bufio" "io"
"net" "net"
"sync" "sync"
"time" "time"
"bytes"
"strconv"
"github.com/mikespook/gearman-go/common"
) )
var ( var (
DefaultTimeout time.Duration = time.Second IdGen IdGenerator
) )
// One client connect to one server. func init() {
// Use Pool for multi-connections. IdGen = NewObjectId()
}
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
/*
The client side api for gearman
usage:
c := client.New("tcp4", "127.0.0.1:4730")
handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct { type Client struct {
sync.Mutex ErrHandler common.ErrorHandler
TimeOut time.Duration
net, addr string in chan []byte
innerHandler *responseHandlerMap out chan *Job
in chan *Response
created chan string
echo chan []byte
status chan *Status
jobhandlers map[string]JobHandler
isConn bool
conn net.Conn conn net.Conn
rw *bufio.ReadWriter addr string
mutex sync.RWMutex
ResponseTimeout time.Duration // response timeout for do()
ErrorHandler ErrorHandler
} }
type responseHandlerMap struct { // Create a new client.
sync.Mutex // Connect to "addr" through "network"
holder map[string]handledResponse // Eg.
} // client, err := client.New("127.0.0.1:4730")
func New(addr string) (client *Client, err error) {
type handledResponse struct {
internal ResponseHandler // internal handler, always non-nil
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
}
func newResponseHandlerMap() *responseHandlerMap {
return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)}
}
func (r *responseHandlerMap) remove(key string) {
r.Lock()
delete(r.holder, key)
r.Unlock()
}
func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) {
r.Lock()
rh, b := r.holder[key]
delete(r.holder, key)
r.Unlock()
return rh, b
}
func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) {
r.Lock()
r.holder[key] = handledResponse{internal: internal, external: external}
r.Unlock()
}
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
r.putWithExternalHandler(key, rh, nil)
}
// New returns a client.
func New(network, addr string) (client *Client, err error) {
client = &Client{ client = &Client{
net: network, created: make(chan string, common.QUEUE_SIZE),
echo: make(chan []byte, common.QUEUE_SIZE),
status: make(chan *Status, common.QUEUE_SIZE),
jobhandlers: make(map[string]JobHandler, common.QUEUE_SIZE),
in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
addr: addr, addr: addr,
innerHandler: newResponseHandlerMap(), TimeOut: time.Second,
in: make(chan *Response, queueSize),
ResponseTimeout: DefaultTimeout,
} }
client.conn, err = net.Dial(client.net, client.addr) if err = client.connect(); err != nil {
if err != nil {
return return
} }
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn), client.isConn = true
bufio.NewWriter(client.conn)) go client.inLoop()
go client.readLoop() go client.outLoop()
go client.processLoop()
return return
} }
func (client *Client) write(req *request) (err error) { // {{{ private functions
//
func (client *Client) connect() (err error) {
client.conn, err = net.Dial(common.NETWORK, client.addr)
return
}
// Internal write
func (client *Client) write(buf []byte) (err error) {
var n int var n int
buf := req.Encode()
for i := 0; i < len(buf); i += n { for i := 0; i < len(buf); i += n {
n, err = client.rw.Write(buf[i:]) n, err = client.conn.Write(buf[i:])
if err != nil { if err != nil {
return return
} }
} }
return client.rw.Flush() return
} }
func (client *Client) read(length int) (data []byte, err error) { // read length bytes from the socket
func (client *Client) readData(length int) (data []byte, err error) {
n := 0 n := 0
buf := getBuffer(bufferSize) buf := make([]byte, common.BUFFER_SIZE)
// read until data can be unpacked // read until data can be unpacked
for i := length; i > 0 || len(data) < minPacketLength; i -= n { for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n {
if n, err = client.rw.Read(buf); err != nil { if n, err = client.conn.Read(buf); err != nil {
if !client.isConn {
return nil, common.ErrConnClosed
}
if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrConnection
return
}
return data, nil
}
return return
} }
data = append(data, buf[0:n]...) data = append(data, buf[0:n]...)
if n < bufferSize { if n < common.BUFFER_SIZE {
break break
} }
} }
return return
} }
func (client *Client) readLoop() { // unpack data
defer close(client.in) func (client *Client) unpack(data []byte) ([]byte, int, bool) {
var data, leftdata []byte tl := len(data)
var err error start := 0
var resp *Response for i := 0; i < tl+1-common.PACKET_LEN; i++ {
ReadLoop: if start+common.PACKET_LEN > tl { // too few data to unpack, read more
for client.conn != nil { return nil, common.PACKET_LEN, false
if data, err = client.read(bufferSize); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Timeout() {
client.err(err)
} }
if opErr.Temporary() { if string(data[start:start+4]) == common.RES_STR {
continue l := int(common.BytesToUint32([4]byte{data[start+8],
data[start+9], data[start+10], data[start+11]}))
total := l + common.PACKET_LEN
if total == tl { // data is what we want
return data, common.PACKET_LEN, true
} else if total < tl { // data[:total] is what we want, data[total:] is the more
client.in <- data[total:]
data = data[start:total]
return data, common.PACKET_LEN, true
} else { // ops! It won't be possible.
return nil, total - tl, false
} }
break } else { // flag was not found, move to next step
start++
} }
client.err(err)
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the client should close the conection
// and reconnect to job server.
client.Close()
client.conn, err = net.Dial(client.net, client.addr)
if err != nil {
client.err(err)
break
} }
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn), return nil, common.PACKET_LEN, false
bufio.NewWriter(client.conn)) }
continue
// Internal read
func (client *Client) read() (rel []byte, err error) {
var data []byte
ok := false
l := common.PACKET_LEN
for !ok {
inlen := len(client.in)
if inlen > 0 {
// in queue is not empty
for i := 0; i < inlen; i++ {
data = append(data, <-client.in...)
} }
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
leftdata = nil
}
for {
l := len(data)
if l < minPacketLength { // not enough data
leftdata = data
continue ReadLoop
}
if resp, l, err = decodeResponse(data); err != nil {
leftdata = data[l:]
continue ReadLoop
} else { } else {
client.in <- resp var d []byte
d, err = client.readData(l)
if err != nil {
return
} }
data = data[l:] data = append(data, d...)
if len(data) > 0 { }
rel, l, ok = client.unpack(data)
}
return
}
// out loop
func (client *Client) outLoop() {
for job := range client.out {
if err := client.write(job.Encode()); err != nil {
client.err(err)
}
}
}
// in loop
func (client *Client) inLoop() {
defer common.DisablePanic()
for {
rel, err := client.read()
if err != nil {
if err == common.ErrConnection {
client.Close()
}
if err != common.ErrConnClosed {
client.err(err)
}
break
}
job, err := decodeJob(rel)
if err != nil {
client.err(err)
continue continue
//break
} }
switch job.DataType {
case common.ERROR:
_, err := common.GetError(job.Data)
client.err(err)
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
client.handleJob(job)
case common.ECHO_RES:
client.handleEcho(job)
case common.JOB_CREATED:
client.handleCreated(job)
case common.STATUS_RES:
client.handleStatus(job)
default:
break break
} }
} }
} }
func (client *Client) processLoop() { // error handler
rhandlers := map[string]ResponseHandler{} func (client *Client) err (e error) {
for resp := range client.in { if client.ErrHandler != nil {
switch resp.DataType { client.ErrHandler(e)
case dtError:
client.err(getError(resp.Data))
case dtStatusRes:
client.handleInner("s"+resp.Handle, resp, nil)
case dtJobCreated:
client.handleInner("c", resp, rhandlers)
case dtEchoRes:
client.handleInner("e", resp, nil)
case dtWorkData, dtWorkWarning, dtWorkStatus:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException:
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
delete(rhandlers, resp.Handle)
}
}
} }
} }
func (client *Client) err(e error) { // job handler
if client.ErrorHandler != nil { func (client *Client) handleJob(job *Job) {
client.ErrorHandler(e) client.mutex.RLock()
defer client.mutex.RUnlock()
if h, ok := client.jobhandlers[job.Handle]; ok {
h(job)
delete(client.jobhandlers, job.Handle)
} }
} }
func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) { func (client *Client) handleEcho(job *Job) {
if h, ok := client.innerHandler.getAndRemove(key); ok { client.echo <- job.Data
if h.external != nil && resp.Handle != "" {
rhandlers[resp.Handle] = h.external
}
h.internal(resp)
}
} }
type handleOrError struct { func (client *Client) handleCreated(job *Job) {
handle string client.created <- string(job.Data)
err error
} }
func (client *Client) do(funcname string, data []byte, // status handler
flag uint32, h ResponseHandler, id string) (handle string, err error) { func (client *Client) handleStatus(job *Job) {
if len(id) == 0 { data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
return "", ErrInvalidId if len(data) != 5 {
} client.err(common.Errorf("Invalid data: %V", job.Data))
if client.conn == nil {
return "", ErrLostConn
}
var result = make(chan handleOrError, 1)
client.Lock()
defer client.Unlock()
client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
if resp.DataType == dtError {
err = getError(resp.Data)
result <- handleOrError{"", err}
return return
} }
handle = resp.Handle status := &Status{}
result <- handleOrError{handle, nil} status.Handle = string(data[0])
}, h) status.Known = (data[1][0] == '1')
req := getJob(id, []byte(funcname), data) status.Running = (data[2][0] == '1')
req.DataType = flag
if err = client.write(req); err != nil {
client.innerHandler.remove("c")
return
}
var timer = time.After(client.ResponseTimeout)
select {
case ret := <-result:
return ret.handle, ret.err
case <-timer:
client.innerHandler.remove("c")
return "", ErrLostConn
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) {
handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) {
handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
return
}
// Status gets job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil {
return nil, ErrLostConn
}
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("s"+handle, func(resp *Response) {
defer mutex.Unlock()
var err error var err error
status, err = resp._status() status.Numerator, err = strconv.ParseUint(string(data[3]), 10, 0)
if err != nil { if err != nil {
client.err(err) client.err(common.Errorf("Invalid Integer: %s", data[3]))
return
} }
}) status.Denominator, err = strconv.ParseUint(string(data[4]), 10, 0)
req := getRequest() if err != nil {
req.DataType = dtGetStatus client.err(common.Errorf("Invalid Integer: %s", data[4]))
req.Data = []byte(handle) return
client.write(req) }
mutex.Lock() client.status <- status
}
// Send the job to job server.
func (client *Client) writeJob(job *Job) {
client.out <- job
}
// Internal do
func (client *Client) do(funcname string, data []byte,
flag uint32, id string) (handle string) {
l := len(funcname) + len(id) + len(data) + 2
rel := make([]byte, 0, l)
rel = append(rel, []byte(funcname)...) // len(funcname)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, []byte(id)...) // len(uid)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, flag, rel))
// Waiting for JOB_CREATED
handle = <-client.created
return return
} }
// Echo. // }}}
func (client *Client) Echo(data []byte) (echo []byte, err error) {
if client.conn == nil { // Do the function.
return nil, ErrLostConn // funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
// and if it is background job: JOB_BG.
// JOB_LOW | JOB_BG means the job is running with low level in background.
func (client *Client) Do(funcname string, data []byte,
flag byte, jobhandler JobHandler) (handle string) {
var datatype uint32
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH
default:
datatype = common.SUBMIT_JOB
}
id := IdGen.Id()
client.mutex.Lock()
defer client.mutex.Unlock()
handle = client.do(funcname, data, datatype, id)
if jobhandler != nil {
client.jobhandlers[handle] = jobhandler
} }
var mutex sync.Mutex
mutex.Lock()
client.innerHandler.put("e", func(resp *Response) {
echo = resp.Data
mutex.Unlock()
})
req := getRequest()
req.DataType = dtEchoReq
req.Data = data
client.write(req)
mutex.Lock()
return return
} }
// Close connection func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string) {
var datatype uint32
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW_BG
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH_BG
default:
datatype = common.SUBMIT_JOB_BG
}
handle = client.do(funcname, data, datatype, "")
return
}
// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string, timeout time.Duration) (status *Status, err error) {
client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
select {
case status = <-client.status:
case <-time.After(timeout):
err = common.ErrTimeOut
}
return
}
// Send a something out, get the samething back.
func (client *Client) Echo(data []byte, timeout time.Duration) (r []byte, err error) {
client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
select {
case r = <-client.echo:
case <-time.After(timeout):
err = common.ErrTimeOut
}
return
}
// Close
func (client *Client) Close() (err error) { func (client *Client) Close() (err error) {
client.Lock() client.isConn = false
defer client.Unlock() close(client.in)
if client.conn != nil { close(client.out)
err = client.conn.Close()
client.conn = nil
}
return
}
// Call the function and get a response. close(client.echo)
// flag can be set to: JobLow, JobNormal and JobHigh close(client.created)
func (client *Client) DoWithId(funcname string, data []byte, close(client.status)
flag byte, h ResponseHandler, id string) (handle string, err error) { return client.conn.Close();
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
} }

View File

@ -1,121 +1,45 @@
package client package client
import ( import (
"crypto/md5"
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
"testing"
"time" "time"
"testing"
) )
const ( var client *Client
TestStr = "Hello world"
)
var (
client *Client
runIntegrationTests bool
)
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestClientAddServer(t *testing.T) { func TestClientAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730") t.Log("Add local server 127.0.0.1:4730")
var err error var err error
if client, err = New(Network, "127.0.0.1:4730"); err != nil { if client, err = New("127.0.0.1:4730"); err != nil {
t.Fatal(err) t.Error(err)
return
} }
client.ErrorHandler = func(e error) { client.ErrHandler = func(e error) {
t.Log(e) t.Log(e)
} }
} }
func TestClientEcho(t *testing.T) { func TestClientEcho(t *testing.T) {
if !runIntegrationTests { echo, err := client.Echo([]byte("Hello world"), time.Second)
t.Skip("To run this test, use: go test -integration")
}
echo, err := client.Echo([]byte(TestStr))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
if string(echo) != TestStr { if string(echo) != "Hello world" {
t.Errorf("Echo error, %s expected, %s got", TestStr, echo) t.Errorf("Invalid echo data: %s", echo)
return return
} }
} }
func TestClientDoBg(t *testing.T) { func TestClientDoBg(t *testing.T) {
if !runIntegrationTests { if handle := client.DoBg("ToUpper", []byte("abcdef"),
t.Skip("To run this test, use: go test -integration") JOB_LOW); handle == "" {
}
handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.") t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
} }
} }
func TestClientDo(t *testing.T) { func TestClientDo(t *testing.T) {
if !runIntegrationTests { jobHandler := func(job *Job) {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data) str := string(job.Data)
if str == "ABCDEF" { if str == "ABCDEF" {
t.Log(str) t.Log(str)
@ -124,253 +48,48 @@ func TestClientDo(t *testing.T) {
} }
return return
} }
handle, err := client.Do("ToUpper", []byte("abcdef"), if handle := client.Do("ToUpper", []byte("abcdef"),
JobLow, jobHandler) JOB_LOW, jobHandler); handle == "" {
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.") t.Error("Handle is empty.")
} else { } else {
t.Log(handle) t.Log(handle)
} }
} }
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) { func TestClientStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration") s1, err := client.Status("handle not exists", time.Second)
}
status, err := client.Status("handle not exists")
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
if status.Known { if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle) t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
return return
} }
if status.Running { if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle) t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
return return
} }
handle, err := client.Do("Delay5sec", []byte("abcdef"), JobLow, nil) handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
s2, err := client.Status(handle, time.Second)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
status, err = client.Status(handle) if !s2.Known {
if err != nil { t.Errorf("The job (%s) should be known.", s2.Handle)
t.Error(err)
return return
} }
if !status.Known { if s2.Running {
t.Errorf("The job (%s) should be known.", status.Handle) t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
return
}
if status.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle)
return return
} }
} }
func TestClientClose(t *testing.T) { func TestClientClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := client.Close(); err != nil { if err := client.Close(); err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -1,75 +0,0 @@
package client
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 8192
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
dtSubmitJob = 7
dtSubmitJobBg = 18
dtSubmitJobHigh = 21
dtSubmitJobHighBg = 32
dtSubmitJobLow = 33
dtSubmitJobLowBg = 34
WorkComplate = dtWorkComplete
WorkComplete = dtWorkComplete
WorkData = dtWorkData
WorkStatus = dtWorkStatus
WorkWarning = dtWorkWarning
WorkFail = dtWorkFail
WorkException = dtWorkException
)
const (
// Job type
JobNormal = iota
// low level
JobLow
// high level
JobHigh
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}

View File

@ -1,31 +0,0 @@
package client
import (
"bytes"
"errors"
"fmt"
)
var (
ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")
ErrLostConn = errors.New("Lost connection with Gearmand")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// Error handler
type ErrorHandler func(error)

4
client/handler.go Normal file
View File

@ -0,0 +1,4 @@
package client
// Job handler
type JobHandler func(*Job)

View File

@ -2,41 +2,34 @@ package client
import ( import (
"strconv" "strconv"
"sync/atomic" "labix.org/v2/mgo/bson"
"time" "github.com/mikespook/golib/autoinc"
) )
var (
// Global ID generator
// Default is an autoincrement ID generator
IdGen IdGenerator
)
func init() {
IdGen = NewAutoIncId()
}
// ID generator interface. Users can implament this for
// their own generator.
type IdGenerator interface { type IdGenerator interface {
Id() string Id() string
} }
// ObjectId
type objectId struct {}
func (id *objectId) Id() string {
return bson.NewObjectId().Hex()
}
func NewObjectId() IdGenerator {
return &objectId{}
}
// AutoIncId // AutoIncId
type autoincId struct { type autoincId struct {
value int64 *autoinc.AutoInc
} }
func (ai *autoincId) Id() string { func (id *autoincId) Id() string {
next := atomic.AddInt64(&ai.value, 1) return strconv.Itoa(id.AutoInc.Id())
return strconv.FormatInt(next, 10)
} }
// NewAutoIncId returns an autoincrement ID generator
func NewAutoIncId() IdGenerator { func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique return &autoincId{autoinc.New(1, 1)}
// and count up from there.
return &autoincId{
value: int64(time.Now().Nanosecond()) << 32,
}
} }

View File

@ -1,18 +0,0 @@
package client
import (
"testing"
)
func TestAutoInc(t *testing.T) {
ai := NewAutoIncId()
previous := ai.Id()
for i := 0; i < 10; i++ {
id := ai.Id()
if id == previous {
t.Errorf("Id not unique, previous and current %s", id)
}
previous = id
}
}

142
client/job.go Normal file
View File

@ -0,0 +1,142 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"bytes"
"github.com/mikespook/gearman-go/common"
)
const (
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
)
// An error handler
type ErrorHandler func(error)
// Client side job
type Job struct {
Data []byte
Handle, UniqueId string
magicCode, DataType uint32
}
// Create a new job
func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
return &Job{magicCode: magiccode,
DataType: datatype,
Data: data}
}
// Decode a job from byte slice
func decodeJob(data []byte) (job *Job, err error) {
if len(data) < 12 {
return nil, common.Errorf("Invalid data: %V", data)
}
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
return nil, common.Errorf("Invalid data: %V", data)
}
data = data[12:]
var handle string
switch datatype {
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
i := bytes.IndexByte(data, '\x00')
if i != -1 {
handle = string(data[:i])
data = data[i + 1:]
}
}
return &Job{magicCode: common.RES,
DataType: datatype,
Data: data,
Handle: handle}, nil
}
// Encode a job to byte slice
func (job *Job) Encode() (data []byte) {
l := len(job.Data)
tl := l + 12
data = make([]byte, tl)
magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType)
datalength := common.Uint32ToBytes(uint32(l))
for i := 0; i < tl; i ++ {
switch {
case i < 4:
data[i] = magiccode[i]
case i < 8:
data[i] = datatype[i - 4]
case i < 12:
data[i] = datalength[i - 8]
default:
data[i] = job.Data[i - 12]
}
}
// Alternative
/*
data = append(data, magiccode[:] ...)
data = append(data, datatype[:] ...)
data = append(data, datalength[:] ...)
data = append(data, job.Data ...)
*/
return
}
// Extract the job's result.
func (job *Job) Result() (data []byte, err error) {
switch job.DataType {
case common.WORK_FAIL:
job.Handle = string(job.Data)
return nil, common.ErrWorkFail
case common.WORK_EXCEPTION:
err = common.ErrWorkException
fallthrough
case common.WORK_COMPLETE:
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
return nil, common.Errorf("Invalid data: %V", job.Data)
}
job.Handle = string(s[0])
data = s[1]
default:
err = common.ErrDataType
}
return
}
// Extract the job's update
func (job *Job) Update() (data []byte, err error) {
if job.DataType != common.WORK_DATA && job.DataType != common.WORK_WARNING {
err = common.ErrDataType
return
}
s := bytes.SplitN(job.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = common.ErrInvalidData
return
}
if job.DataType == common.WORK_WARNING {
err = common.ErrWorkWarning
}
job.Handle = string(s[0])
data = s[1]
return
}

View File

@ -1,29 +1,35 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client package client
import ( import (
"sync"
"time"
"errors" "errors"
"math/rand" "math/rand"
"sync" "github.com/mikespook/gearman-go/common"
) )
const ( const (
poolSize = 10 PoolSize = 10
) )
var ( var (
ErrNotFound = errors.New("Server Not Found") ErrNotFound = errors.New("Server Not Found")
) )
type PoolClient struct { type poolClient struct {
*Client *Client
Rate int Rate int
mutex sync.Mutex
} }
type SelectionHandler func(map[string]*PoolClient, string) string type SelectionHandler func(map[string]*poolClient, string) string
func SelectWithRate(pool map[string]*PoolClient, func SelectWithRate(pool map[string]*poolClient,
last string) (addr string) { last string) (addr string) {
total := 0 total := 0
for _, item := range pool { for _, item := range pool {
total += item.Rate total += item.Rate
@ -34,52 +40,52 @@ func SelectWithRate(pool map[string]*PoolClient,
return last return last
} }
func SelectRandom(pool map[string]*PoolClient, func SelectRandom(pool map[string]*poolClient,
last string) (addr string) { last string) (addr string) {
r := rand.Intn(len(pool)) r := rand.Intn(len(pool))
i := 0 i := 0
for k, _ := range pool { for k, _ := range pool {
if r == i { if r == i {
return k return k
} }
i++ i ++
} }
return last return last
} }
type Pool struct { type Pool struct {
SelectionHandler SelectionHandler SelectionHandler SelectionHandler
ErrorHandler ErrorHandler ErrHandler common.ErrorHandler
Clients map[string]*PoolClient
clients map[string]*poolClient
last string last string
mutex sync.Mutex mutex sync.Mutex
} }
// NewPool returns a new pool. // Create a new pool.
func NewPool() (pool *Pool) { func NewPool() (pool *Pool) {
return &Pool{ return &Pool{
Clients: make(map[string]*PoolClient, poolSize), clients: make(map[string]*poolClient, PoolSize),
SelectionHandler: SelectWithRate, SelectionHandler: SelectWithRate,
} }
} }
// Add a server with rate. // Add a server with rate.
func (pool *Pool) Add(net, addr string, rate int) (err error) { func (pool *Pool) Add(addr string, rate int) (err error) {
pool.mutex.Lock() pool.mutex.Lock()
defer pool.mutex.Unlock() defer pool.mutex.Unlock()
var item *PoolClient var item *poolClient
var ok bool var ok bool
if item, ok = pool.Clients[addr]; ok { if item, ok = pool.clients[addr]; ok {
item.Rate = rate item.Rate = rate
} else { } else {
var client *Client var client *Client
client, err = New(net, addr) client, err = New(addr)
if err == nil { item = &poolClient{Client: client, Rate: rate}
item = &PoolClient{Client: client, Rate: rate} pool.clients[addr] = item
pool.Clients[addr] = item
}
} }
return return
} }
@ -88,36 +94,30 @@ func (pool *Pool) Add(net, addr string, rate int) (err error) {
func (pool *Pool) Remove(addr string) { func (pool *Pool) Remove(addr string) {
pool.mutex.Lock() pool.mutex.Lock()
defer pool.mutex.Unlock() defer pool.mutex.Unlock()
delete(pool.Clients, addr) delete(pool.clients, addr)
} }
func (pool *Pool) Do(funcname string, data []byte, func (pool *Pool) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (addr, handle string, err error) { flag byte, h JobHandler) (addr, handle string) {
client := pool.selectServer() client := pool.selectServer()
client.Lock() handle = client.Do(funcname, data, flag, h)
defer client.Unlock()
handle, err = client.Do(funcname, data, flag, h)
addr = client.addr addr = client.addr
return return
} }
func (pool *Pool) DoBg(funcname string, data []byte, func (pool *Pool) DoBg(funcname string, data []byte,
flag byte) (addr, handle string, err error) { flag byte) (addr, handle string) {
client := pool.selectServer() client := pool.selectServer()
client.Lock() handle = client.DoBg(funcname, data, flag)
defer client.Unlock()
handle, err = client.DoBg(funcname, data, flag)
addr = client.addr addr = client.addr
return return
} }
// Status gets job status from job server. // Get job status from job server.
// !!!Not fully tested.!!! // !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) { func (pool *Pool) Status(addr, handle string, timeout time.Duration) (status *Status, err error) {
if client, ok := pool.Clients[addr]; ok { if client, ok := pool.clients[addr]; ok {
client.Lock() status, err = client.Status(handle, timeout)
defer client.Unlock()
status, err = client.Status(handle)
} else { } else {
err = ErrNotFound err = ErrNotFound
} }
@ -125,38 +125,35 @@ func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
} }
// Send a something out, get the samething back. // Send a something out, get the samething back.
func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { func (pool *Pool) Echo(addr string, data []byte, timeout time.Duration) (r []byte, err error) {
var client *PoolClient var client *poolClient
if addr == "" { if addr == "" {
client = pool.selectServer() client = pool.selectServer()
} else { } else {
var ok bool var ok bool
if client, ok = pool.Clients[addr]; !ok { if client, ok = pool.clients[addr]; !ok {
err = ErrNotFound err = ErrNotFound
return return
} }
} }
client.Lock() r, err = client.Echo(data, timeout)
defer client.Unlock()
echo, err = client.Echo(data)
return return
} }
// Close // Close
func (pool *Pool) Close() (err map[string]error) { func (pool *Pool) Close() (err map[string]error) {
err = make(map[string]error) err = make(map[string]error)
for _, c := range pool.Clients { for _, c := range pool.clients {
err[c.addr] = c.Close() err[c.addr] = c.Close()
} }
return return
} }
// selecting server func (pool *Pool) selectServer() (client *poolClient) {
func (pool *Pool) selectServer() (client *PoolClient) {
for client == nil { for client == nil {
addr := pool.SelectionHandler(pool.Clients, pool.last) addr := pool.SelectionHandler(pool.clients, pool.last)
var ok bool var ok bool
if client, ok = pool.Clients[addr]; ok { if client, ok = pool.clients[addr]; ok {
pool.last = addr pool.last = addr
break break
} }

View File

@ -1,6 +1,7 @@
package client package client
import ( import (
"time"
"testing" "testing"
) )
@ -9,54 +10,38 @@ var (
) )
func TestPoolAdd(t *testing.T) { func TestPoolAdd(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add servers") t.Log("Add servers")
c := 2 if err := pool.Add("127.0.0.1:4730", 1); err != nil {
if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil { t.Error(err)
t.Fatal(err)
} }
if err := pool.Add("tcp4", "127.0.1.1:4730", 1); err != nil { if err := pool.Add("127.0.0.1:4730", 1); err != nil {
t.Log(err) t.Error(err)
c -= 1
} }
if len(pool.Clients) != c { if len(pool.clients) != 2 {
t.Errorf("%d servers expected, %d got.", c, len(pool.Clients)) t.Errorf("2 servers expected, %d got.", len(pool.clients))
} }
} }
func TestPoolEcho(t *testing.T) { func TestPoolEcho(t *testing.T) {
if !runIntegrationTests { echo, err := pool.Echo("", []byte("Hello pool"), time.Second)
t.Skip("To run this test, use: go test -integration")
}
echo, err := pool.Echo("", []byte(TestStr))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
if string(echo) != TestStr { if string(echo) != "Hello pool" {
t.Errorf("Invalid echo data: %s", echo) t.Errorf("Invalid echo data: %s", echo)
return return
} }
_, err = pool.Echo("not exists", []byte(TestStr)) _, err = pool.Echo("not exists", []byte("Hello pool"), time.Second)
if err != ErrNotFound { if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err) t.Errorf("ErrNotFound expected, got %s", err)
} }
} }
func TestPoolDoBg(t *testing.T) { func TestPoolDoBg(t *testing.T) {
if !runIntegrationTests { if addr, handle := pool.DoBg("ToUpper", []byte("abcdef"),
t.Skip("To run this test, use: go test -integration") JOB_LOW); handle == "" {
}
addr, handle, err := pool.DoBg("ToUpper",
[]byte("abcdef"), JobLow)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.") t.Error("Handle is empty.")
} else { } else {
t.Log(addr, handle) t.Log(addr, handle)
@ -64,10 +49,7 @@ func TestPoolDoBg(t *testing.T) {
} }
func TestPoolDo(t *testing.T) { func TestPoolDo(t *testing.T) {
if !runIntegrationTests { jobHandler := func(job *Job) {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data) str := string(job.Data)
if str == "ABCDEF" { if str == "ABCDEF" {
t.Log(str) t.Log(str)
@ -76,12 +58,8 @@ func TestPoolDo(t *testing.T) {
} }
return return
} }
addr, handle, err := pool.Do("ToUpper", if addr, handle := pool.Do("ToUpper", []byte("abcdef"),
[]byte("abcdef"), JobLow, jobHandler) JOB_LOW, jobHandler); handle == "" {
if err != nil {
t.Error(err)
}
if handle == "" {
t.Error("Handle is empty.") t.Error("Handle is empty.")
} else { } else {
t.Log(addr, handle) t.Log(addr, handle)
@ -89,49 +67,39 @@ func TestPoolDo(t *testing.T) {
} }
func TestPoolStatus(t *testing.T) { func TestPoolStatus(t *testing.T) {
if !runIntegrationTests { s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second)
t.Skip("To run this test, use: go test -integration")
}
status, err := pool.Status("127.0.0.1:4730", "handle not exists")
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
if status.Known { if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", status.Handle) t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
} }
if status.Running { if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle) t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
} }
addr, handle, err := pool.Do("Delay5sec",
[]byte("abcdef"), JobLow, nil) addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
if err != nil { s2, err := pool.Status(addr, handle, time.Second)
t.Error(err)
return
}
status, err = pool.Status(addr, handle)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
if !status.Known { if !s2.Known {
t.Errorf("The job (%s) should be known.", status.Handle) t.Errorf("The job (%s) should be known.", s2.Handle)
} }
if status.Running { if s2.Running {
t.Errorf("The job (%s) shouldn't be running.", status.Handle) t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
} }
status, err = pool.Status("not exists", "not exists")
_, err = pool.Status("not exists", "not exists", time.Second)
if err != ErrNotFound { if err != ErrNotFound {
t.Error(err) t.Error(err)
return
} }
} }
func TestPoolClose(t *testing.T) { func TestPoolClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
return return
if err := pool.Close(); err != nil { if err := pool.Close(); err != nil {
t.Error(err) t.Error(err)

View File

@ -1,42 +0,0 @@
package client
import (
"encoding/binary"
)
// Request from client
type request struct {
DataType uint32
Data []byte
}
// Encode a Request to byte slice
func (req *request) Encode() (data []byte) {
l := len(req.Data) // length of data
tl := l + minPacketLength // add 12 bytes head
data = getBuffer(tl)
copy(data[:4], reqStr)
binary.BigEndian.PutUint32(data[4:8], req.DataType)
binary.BigEndian.PutUint32(data[8:12], uint32(l))
copy(data[minPacketLength:], req.Data)
return
}
func getRequest() (req *request) {
// TODO add a pool
req = &request{}
return
}
func getJob(id string, funcname, data []byte) (req *request) {
req = getRequest()
a := len(funcname)
b := len(id)
c := len(data)
l := a + b + c + 2
req.Data = getBuffer(l)
copy(req.Data[0:a], funcname)
copy(req.Data[a+1:a+b+1], []byte(id))
copy(req.Data[a+b+2:], data)
return
}

View File

@ -1,156 +0,0 @@
package client
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Response handler
type ResponseHandler func(*Response)
// response
type Response struct {
DataType uint32
Data, UID []byte
Handle string
}
// Extract the Response's result.
// if data == nil, err != nil, then worker failing to execute job
// if data != nil, err != nil, then worker has a exception
// if data != nil, err == nil, then worker complate job
// after calling this method, the Response.Handle will be filled
func (resp *Response) Result() (data []byte, err error) {
switch resp.DataType {
case dtWorkFail:
resp.Handle = string(resp.Data)
err = ErrWorkFail
return
case dtWorkException:
err = ErrWorkException
fallthrough
case dtWorkComplete:
data = resp.Data
default:
err = ErrDataType
}
return
}
// Extract the job's update
func (resp *Response) Update() (data []byte, err error) {
if resp.DataType != dtWorkData &&
resp.DataType != dtWorkWarning {
err = ErrDataType
return
}
data = resp.Data
if resp.DataType == dtWorkWarning {
err = ErrWorkWarning
}
return
}
// Decode a job from byte slice
func decodeResponse(data []byte) (resp *Response, l int, err error) {
a := len(data)
if a < minPacketLength { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if a < minPacketLength+dl {
err = fmt.Errorf("Invalid data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
case dtJobCreated:
resp.Handle = string(dt)
case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
dtWorkComplete, dtWorkException:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 2 {
resp.Handle = string(s[0])
resp.Data = s[1]
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtWorkFail:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 1 {
resp.Handle = string(s[0])
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtEchoRes:
fallthrough
default:
resp.Data = dt
}
l = dl + minPacketLength
return
}
func (resp *Response) Status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(data) != 2 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = true
status.Running = true
status.Numerator, err = strconv.ParseUint(string(data[0]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[0])
return
}
status.Denominator, err = strconv.ParseUint(string(data[1]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[1])
return
}
return
}
// status handler
func (resp *Response) _status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4)
if len(data) != 4 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
return
}
status = &Status{}
status.Handle = resp.Handle
status.Known = (data[0][0] == '1')
status.Running = (data[1][0] == '1')
status.Numerator, err = strconv.ParseUint(string(data[2]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[2])
return
}
status.Denominator, err = strconv.ParseUint(string(data[3]), 10, 0)
if err != nil {
err = fmt.Errorf("Invalid Integer: %s", data[3])
return
}
return
}
func getResponse() (resp *Response) {
// TODO add a pool
resp = &Response{}
return
}

View File

@ -1,9 +1,5 @@
package client package client
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
type Status struct { type Status struct {
Handle string Handle string
Known, Running bool Known, Running bool

54
common/error.go Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package common
import (
"fmt"
"bytes"
"errors"
"syscall"
)
var (
ErrJobTimeOut = errors.New("Do a job time out")
ErrInvalidData = errors.New("Invalid data")
ErrWorkWarning = errors.New("Work warning")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")
ErrOutOfCap = errors.New("Out of the capability")
ErrNotConn = errors.New("Did not connect to job server")
ErrFuncNotFound = errors.New("The function was not found")
ErrConnection = errors.New("Connection error")
ErrNoActiveAgent = errors.New("No active agent")
ErrTimeOut = errors.New("Executing time out")
ErrUnknown = errors.New("Unknown error")
ErrConnClosed = errors.New("Connection closed")
)
func DisablePanic() {recover()}
// Extract the error message
func GetError(data []byte) (eno syscall.Errno, err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = Errorf("Not a error data: %V", data)
return
}
l := len(rel[0])
eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]}))
err = errors.New(string(rel[1]))
return
}
// Get a formated error
func Errorf(format string, msg ... interface{}) error {
return errors.New(fmt.Sprintf(format, msg ... ))
}
// An error handler
type ErrorHandler func(error)

86
common/gearman.go Normal file
View File

@ -0,0 +1,86 @@
// Copyright 2011 - 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package common
import (
"bytes"
"encoding/binary"
)
const (
NETWORK = "tcp"
// queue size
QUEUE_SIZE = 8
// read buffer size
BUFFER_SIZE = 1024
// min packet length
PACKET_LEN = 12
// \x00REQ
REQ = 5391697
REQ_STR = "\x00REQ"
// \x00RES
RES = 5391699
RES_STR = "\x00RES"
// package data type
CAN_DO = 1
CANT_DO = 2
RESET_ABILITIES = 3
PRE_SLEEP = 4
NOOP = 6
JOB_CREATED = 8
GRAB_JOB = 9
NO_JOB = 10
JOB_ASSIGN = 11
WORK_STATUS = 12
WORK_COMPLETE = 13
WORK_FAIL = 14
GET_STATUS = 15
ECHO_REQ = 16
ECHO_RES = 17
ERROR = 19
STATUS_RES = 20
SET_CLIENT_ID = 22
CAN_DO_TIMEOUT = 23
WORK_EXCEPTION = 25
WORK_DATA = 28
WORK_WARNING = 29
GRAB_JOB_UNIQ = 30
JOB_ASSIGN_UNIQ = 31
SUBMIT_JOB = 7
SUBMIT_JOB_BG = 18
SUBMIT_JOB_HIGH = 21
SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34
)
// Decode [4]byte to uint32
func BytesToUint32(buf [4]byte) uint32 {
var r uint32
b := bytes.NewBuffer(buf[:])
err := binary.Read(b, binary.BigEndian, &r)
if err != nil {
return 0
}
return r
}
// Encode uint32 to [4]byte
func Uint32ToBytes(i uint32) [4]byte {
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.BigEndian, i)
if err != nil {
return [4]byte{0, 0, 0, 0}
}
var r [4]byte
for k, v := range buf.Bytes() {
r[k] = v
}
return r
}

48
common/gearman_test.go Normal file
View File

@ -0,0 +1,48 @@
package common
import (
"bytes"
"testing"
)
var (
testCase = map[uint32][4]byte {
0: [...]byte{0, 0, 0, 0},
1: [...]byte{0, 0, 0, 1},
256: [...]byte{0, 0, 1, 0},
256 * 256: [...]byte{0, 1, 0, 0},
256 * 256 * 256: [...]byte{1, 0, 0, 0},
256 * 256 * 256 + 256 * 256 + 256 + 1: [...]byte{1, 1, 1, 1},
4294967295 : [...]byte{0xFF, 0xFF, 0xFF, 0xFF},
}
)
func TestUint32ToBytes(t *testing.T) {
for k, v := range testCase {
b := Uint32ToBytes(k)
if bytes.Compare(b[:], v[:]) != 0 {
t.Errorf("%v was expected, but %v was got", v, b)
}
}
}
func TestBytesToUint32s(t *testing.T) {
for k, v := range testCase {
u := BytesToUint32([4]byte(v))
if u != k {
t.Errorf("%v was expected, but %v was got", k, u)
}
}
}
func BenchmarkByteToUnit32(b * testing.B) {
for i := 0; i < b.N; i++ {
BytesToUint32([4]byte{0xF, 0xF, 0xF, 0xF});
}
}
func BenchmarkUint32ToByte(b *testing.B) {
for i := 0; i < b.N; i++ {
Uint32ToBytes(123456);
}
}

View File

@ -1,80 +1,46 @@
package main package main
import ( import (
"github.com/mikespook/gearman-go/client"
"log" "log"
"os"
"sync" "sync"
"time"
"github.com/mikespook/gearman-go/client"
) )
func main() { func main() {
var wg sync.WaitGroup
// Set the autoinc id generator // Set the autoinc id generator
// You can write your own id generator // You can write your own id generator
// by implementing IdGenerator interface. // by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId() // client.IdGen = client.NewAutoIncId()
c, err := client.New(client.Network, "127.0.0.1:4730") c, err := client.New("127.0.0.1:4730")
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
defer c.Close() defer c.Close()
c.ErrorHandler = func(e error) { c.ErrHandler = func(e error) {
log.Println(e) log.Println(e)
os.Exit(1)
} }
echo := []byte("Hello\x00 world") echo := []byte("Hello\x00 world")
echomsg, err := c.Echo(echo) wg.Add(1)
echomsg, err := c.Echo(echo, time.Second)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
log.Println(string(echomsg)) log.Println(string(echomsg))
jobHandler := func(resp *client.Response) { wg.Done()
switch resp.DataType { jobHandler := func(job *client.Job) {
case client.WorkException: log.Printf("%s", job.Data)
fallthrough wg.Done()
case client.WorkFail:
fallthrough
case client.WorkComplate:
if data, err := resp.Result(); err == nil {
log.Printf("RESULT: %v\n", data)
} else {
log.Printf("RESULT: %s\n", err)
} }
case client.WorkWarning: handle := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler)
fallthrough wg.Add(1)
case client.WorkData: status, err := c.Status(handle, time.Second)
if data, err := resp.Update(); err == nil {
log.Printf("UPDATE: %v\n", data)
} else {
log.Printf("UPDATE: %v, %s\n", data, err)
}
case client.WorkStatus:
if data, err := resp.Status(); err == nil {
log.Printf("STATUS: %v\n", data)
} else {
log.Printf("STATUS: %s\n", err)
}
default:
log.Printf("UNKNOWN: %v", resp.Data)
}
}
handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
status, err := c.Status(handle) log.Printf("%t", status)
if err != nil {
log.Fatalln(err)
}
log.Printf("%v", *status)
_, err = c.Do("Foobar", echo, client.JobNormal, jobHandler) wg.Wait()
if err != nil {
log.Fatalln(err)
}
log.Println("Press Ctrl-C to exit ...")
var mutex sync.Mutex
mutex.Lock()
mutex.Lock()
} }

View File

@ -0,0 +1,7 @@
= Exec Worker
This program execute shell or php scripts as backend jobs.
Scripts can communicat with the worker through STDERR using formated output.
USE THIS AT YOUR OWN RASK!!!

View File

@ -0,0 +1,40 @@
<?php
# create our client object
$gmclient= new GearmanClient();
# add the default server (localhost)
$gmclient->addServer();
$data = array(
'Name' => 'foobar',
'Args' => array("0", "1", "2", "3"),
);
$c = isset($_SERVER['argv'][1]) ? $_SERVER['argv'][1] : 10;
for ($i = 0; $i < $c; $i ++) {
# run reverse client in the background
$job_handle = $gmclient->doBackground("execphp", json_encode($data));
if ($gmclient->returnCode() != GEARMAN_SUCCESS) {
echo "bad return code\n";
exit;
}
}
/*
$data = array(
'Name' => 'notexists',
'Args' => array("0", "1", "2", "3"),
);
# run reverse client in the background
$job_handle = $gmclient->doBackground("exec", json_encode($data));
if ($gmclient->returnCode() != GEARMAN_SUCCESS)
{
echo "bad return code\n";
exit;
}
*/
echo "done!\n";

123
example/exec-worker/exec.go Normal file
View File

@ -0,0 +1,123 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"io"
"bytes"
"os/exec"
"encoding/json"
"github.com/mikespook/golib/log"
"github.com/mikespook/gearman-go/worker"
)
type outData struct {
Numerator, Denominator int
Warning bool
Data []byte
Debug string
}
type ShExec struct {
Name, basedir string
Args []string
*exec.Cmd
job *worker.Job
Logger *log.Logger
}
func NewShExec(basedir string, job *worker.Job) (sh *ShExec, err error) {
sh = &ShExec{
basedir: basedir,
job: job,
Args: make([]string, 0),
}
if err = sh.parse(job.Data); err != nil {
return nil, err
}
return
}
func (sh *ShExec) parse(data []byte) (err error) {
if err = json.Unmarshal(data, sh); err != nil {
return
}
return
}
func (sh *ShExec) Append(args ... string) {
sh.Args = append(sh.Args, args ...)
}
func (sh *ShExec) Prepend(args ... string) {
sh.Args = append(args, sh.Args ...)
}
func (sh *ShExec) Exec() (rslt []byte, err error){
sh.Logger.Debugf("Executing: Handle=%s, Exec=%s, Args=%v",
sh.job.Handle, sh.Name, sh.Args)
sh.Cmd = exec.Command(sh.Name, sh.Args ... )
go func() {
if ok := <-sh.job.Canceled(); ok {
sh.Cmd.Process.Kill()
}
}()
sh.Cmd.Dir = sh.basedir
var buf bytes.Buffer
sh.Cmd.Stdout = &buf
var errPipe io.ReadCloser
if errPipe, err = sh.Cmd.StderrPipe(); err != nil {
return nil, err
}
defer errPipe.Close()
go sh.processErr(errPipe)
if err = sh.Cmd.Run(); err != nil {
return nil, err
}
rslt = buf.Bytes()
return
}
func (sh *ShExec) processErr(pipe io.ReadCloser) {
result := make([]byte, 1024)
var more []byte
for {
n, err := pipe.Read(result)
if err != nil {
if err != io.EOF {
sh.job.UpdateData([]byte(err.Error()), true)
}
return
}
if more != nil {
result = append(more, result[:n]...)
} else {
result = result[:n]
}
if n < 1024 {
var out outData
if err := json.Unmarshal(result, &out); err != nil {
sh.job.UpdateData([]byte(result), true)
return
}
if out.Debug == "" {
if out.Data != nil {
sh.job.UpdateData(out.Data, out.Warning)
}
if out.Numerator != 0 || out.Denominator != 0 {
sh.job.UpdateStatus(out.Numerator, out.Denominator)
}
} else {
sh.Logger.Debugf("Debug: Handle=%s, Exec=%s, Args=%v, Data=%s",
sh.job.Handle, sh.Name, sh.Args, out.Debug)
}
more = nil
} else {
more = result
}
}
}

View File

@ -0,0 +1,38 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"github.com/mikespook/golib/log"
"github.com/mikespook/gearman-go/worker"
)
func execShell(job *worker.Job) (result []byte, err error) {
log.Messagef("[Shell]Received: Handle=%s", job.Handle)
defer log.Messagef("[Shell]Finished: Handle=%s", job.Handle)
log.Debugf("[Shell]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data)
var sh *ShExec
if sh, err = NewShExec(*basedir, job); err != nil {
return
}
sh.Logger = log.DefaultLogger
return sh.Exec()
}
func execPHP(job *worker.Job) (result []byte, err error) {
log.Messagef("[PHP]Received: Handle=%s", job.Handle)
defer log.Messagef("[PHP]Finished: Handle=%s", job.Handle)
log.Debugf("[PHP]Received: Handle=%s, UID=%s, Data=%v", job.Handle, job.UniqueId, job.Data)
var sh *ShExec
if sh, err = NewShExec(*basedir, job); err != nil {
return
}
sh.Prepend("-f", sh.Name + ".php")
sh.Name = "php"
sh.Logger = log.DefaultLogger
return sh.Exec()
}

View File

@ -0,0 +1,40 @@
<?php
class X {
private $stderr;
function __construct() {
$this->stderr = fopen("php://stderr", "r");
}
public function sendMsg($msg, $warning = false, $numerator = 0, $denominator = 0) {
$result = array(
'Numerator' => $numerator,
'Denominator' => $denominator,
'Warning' => $warning,
'Data' => $msg,
);
fwrite($this->stderr, json_encode($result));
}
public function debug($msg) {
$result = array(
'Debug' => $msg,
);
fwrite($this->stderr, json_encode($result));
}
public function close() {
fclose($this->stderr);
}
public function argv($index) {
$argv = $_SERVER['argv'];
return isset($argv[$index]) ? $argv[$index] : null;
}
public function argc() {
$argc = $_SERVER['argc'];
return $argc;
}
}

View File

@ -0,0 +1,11 @@
<?php
function autoloader($class) {
$names = split("_", $class);
if (count($names) > 1) {
include implode(DIRECTORY_SEPARATOR, $names) . '.php';
} else {
include $class . '.php';
}
}
spl_autoload_register('autoloader');

View File

@ -0,0 +1,47 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"github.com/mikespook/golib/log"
"flag"
"strings"
)
var (
logfile = flag.String("log", "",
"Log file to write errors and information to." +
" Empty string output to STDOUT.")
loglevel = flag.String("log-level", "all", "Log level to record." +
" Values 'error', 'warning', 'message', 'debug', 'all' and 'none'" +
" are accepted. Use '|' to combine more levels.")
)
func initLog() {
level := log.LogNone
levels := strings.SplitN(*loglevel, "|", -1)
for _, v := range levels {
switch v {
case "none":
level = level | log.LogNone
break
case "error":
level = level | log.LogError
case "warning":
level = level | log.LogWarning
case "message":
level = level | log.LogMessage
case "debug":
level = level | log.LogDebug
case "all":
level = log.LogAll
default:
}
}
if err := log.Init(*logfile, level); err != nil {
log.Error(err)
}
}

View File

@ -0,0 +1,8 @@
<?php
define('ISCLI', PHP_SAPI === 'cli');
if (!ISCLI) {
die("cli only!");
}
define("ROOT", dirname(__FILE__));
define("LIB", ROOT . "/../lib/");
include_once(LIB . "bootstrap.php");

View File

@ -0,0 +1,8 @@
<?php
include("default.php");
$x = new X();
$x->sendMsg("test");
$x->debug("debug message");
sleep(10);
$x->close();
exit(0);

View File

@ -0,0 +1,106 @@
// Copyright 2012 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a commercial
// license that can be found in the LICENSE file.
package main
import (
"os"
"flag"
"time"
"github.com/mikespook/golib/log"
"github.com/mikespook/golib/pid"
"github.com/mikespook/golib/prof"
"github.com/mikespook/golib/signal"
"github.com/mikespook/gearman-go/worker"
)
var (
pidfile = flag.String("pid", "/run/seedworker.pid",
"PID file to write pid")
proffile = flag.String("prof", "", "Profiling file")
dumpfile = flag.String("dump", "", "Heap dumping file")
dumptime = flag.Int("dumptime", 5, "Heap dumping time interval")
joblimit = flag.Int("job-limit", worker.Unlimited,
"Maximum number of concurrently executing job." +
" Zero is unlimited.")
basedir = flag.String("basedir", "", "Working directory of the php scripts.")
timeout = flag.Uint("timeout", 30, "Executing time out.")
gearmand = flag.String("gearmand", "127.0.0.1:4730", "Address and port of gearmand")
)
func init() {
flag.Parse()
initLog()
if *basedir == "" {
*basedir = "./script/"
}
}
func main() {
log.Message("Starting ... ")
defer func() {
time.Sleep(time.Second)
log.Message("Shutdown complate!")
}()
// init profiling file
if *proffile != "" {
log.Debugf("Open a profiling file: %s", *proffile)
if err := prof.Start(*proffile); err != nil {
log.Error(err)
} else {
defer prof.Stop()
}
}
// init heap dumping file
if *dumpfile != "" {
log.Debugf("Open a heap dumping file: %s", *dumpfile)
if err := prof.NewDump(*dumpfile); err != nil {
log.Error(err)
} else {
defer prof.CloseDump()
go func() {
for prof.Dumping {
time.Sleep(time.Duration(*dumptime) * time.Second)
prof.Dump()
}
}()
}
}
// init pid file
log.Debugf("Open a pid file: %s", *pidfile)
if pidFile, err := pid.New(*pidfile); err != nil {
log.Error(err)
} else {
defer pidFile.Close()
}
w := worker.New(*joblimit)
if err := w.AddServer(*gearmand); err != nil {
log.Error(err)
return
}
if err := w.AddFunc("exec", execShell, uint32(*timeout)); err != nil {
log.Error(err)
return
}
if err := w.AddFunc("execphp", execPHP, uint32(*timeout)); err != nil {
log.Error(err)
return
}
defer w.Close()
go w.Work()
// signal handler
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {return true})
sh.Loop()
}

View File

@ -1,33 +0,0 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;

View File

@ -1,48 +1,39 @@
package main package main
import ( import (
"log"
"net"
"os" "os"
"strings" "log"
"time" "time"
"strings"
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal" "github.com/mikespook/golib/signal"
"github.com/mikespook/gearman-go/worker"
) )
func ToUpper(job worker.Job) ([]byte, error) { func ToUpper(job *worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data()) log.Printf("ToUpper: Handle=[%s]; UID=[%s], Data=[%s]\n",
data := []byte(strings.ToUpper(string(job.Data()))) job.Handle, job.UniqueId, job.Data)
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil return data, nil
} }
func ToUpperDelay10(job worker.Job) ([]byte, error) { func ToUpperDelay10(job *worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data()) log.Printf("ToUpperDelay10: Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data()))) data := []byte(strings.ToUpper(string(job.Data)))
return data, nil return data, nil
} }
func Foobar(job worker.Job) ([]byte, error) {
log.Printf("Foobar: Data=[%s]\n", job.Data())
for i := 0; i < 10; i++ {
job.SendWarning([]byte{byte(i)})
job.SendData([]byte{byte(i)})
job.UpdateStatus(i+1, 100)
}
return job.Data(), nil
}
func main() { func main() {
log.Println("Starting ...") log.Println("Starting ...")
defer log.Println("Shutdown complete!") defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited) w := worker.New(worker.Unlimited)
defer w.Close() defer w.Close()
w.ErrorHandler = func(e error) { w.ErrHandler = func(e error) {
log.Println(e) log.Println(e)
if opErr, ok := e.(*net.OpError); ok { if e == worker.ErrConnection {
if !opErr.Temporary() {
proc, err := os.FindProcess(os.Getpid()) proc, err := os.FindProcess(os.Getpid())
if err != nil { if err != nil {
log.Println(err) log.Println(err)
@ -52,23 +43,19 @@ func main() {
} }
} }
} }
} w.JobHandler = func(job *worker.Job) error {
w.JobHandler = func(job worker.Job) error { log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle,
log.Printf("Data=%s\n", job.Data()) job.UniqueId, job.Data, job.DataType)
return nil return nil
} }
w.AddServer("tcp4", "127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.AddFunc("Foobar", Foobar, worker.Unlimited) w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Unlimited) w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately)
w.AddFunc("MemInfo", worker.MemInfo, worker.Unlimited) w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately)
if err := w.Ready(); err != nil {
log.Fatal(err)
return
}
go w.Work() go w.Work()
signal.Bind(os.Interrupt, func() uint { return signal.BreakExit }) sh := signal.NewHandler()
signal.Wait() sh.Bind(os.Interrupt, func() bool {return true})
sh.Loop()
} }

View File

@ -3,17 +3,14 @@
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
/* /*
This module is a Gearman API for the Go Programming Language. This module is Gearman API for golang.
The protocols were written in pure Go. It contains two sub-packages: The protocol was implemented by native way.
The client package is used for sending jobs to the Gearman job server,
and getting responses from the server.
import "github.com/mikespook/gearman-go/client"
The worker package will help developers to develop Gearman's worker
in an easy way.
import "github.com/mikespook/gearman-go/worker"
*/ */
package gearman package gearman
import (
_ "github.com/mikespook/gearman-go/common"
_ "github.com/mikespook/gearman-go/client"
_ "github.com/mikespook/gearman-go/worker"
)

131
gearman_test.go Normal file
View File

@ -0,0 +1,131 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
/*
This module is Gearman API for golang.
The protocol was implemented by native way.
*/
package gearman
import (
"time"
"sync"
"testing"
"strings"
"github.com/mikespook/gearman-go/client"
"github.com/mikespook/gearman-go/worker"
)
const(
STR = "The gearman-go is a pure go implemented library."
GEARMAND = "127.0.0.1:4730"
)
func ToUpper(job *worker.Job) ([]byte, error) {
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
}
func Sleep(job *worker.Job) ([]byte, error) {
time.Sleep(time.Second * 5)
return nil, nil
}
func TestJobs(t *testing.T) {
w := worker.New(worker.Unlimited)
if err := w.AddServer(GEARMAND); err != nil {
t.Error(err)
return
}
defer w.Close()
if err := w.AddFunc("ToUpper", ToUpper, 0); err != nil {
t.Error(err)
return
}
if err := w.AddFunc("Sleep", Sleep, 0); err != nil {
t.Error(err)
return
}
w.ErrHandler = func(e error) {
t.Error(e)
}
go w.Work()
c, err := client.New(GEARMAND)
if err != nil {
t.Error(err)
return
}
defer c.Close()
c.ErrHandler = func(e error) {
// t.Error(e)
t.Log(e)
}
{
var w sync.WaitGroup
jobHandler := func(job *client.Job) {
upper := strings.ToUpper(STR)
if (string(job.Data) != upper) {
t.Error("%s expected, got %s", []byte(upper), job.Data)
}
w.Done()
}
w.Add(1)
handle := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler)
w.Wait()
status, err := c.Status(handle, time.Second)
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("%s shouldn't be known", status.Handle)
return
}
if status.Running {
t.Errorf("%s shouldn't be running", status.Handle)
}
}
{
handle := c.DoBg("Sleep", nil, client.JOB_NORMAL)
time.Sleep(time.Second)
status, err := c.Status(handle, time.Second)
if err != nil {
t.Error(err)
return
}
if !status.Known {
t.Errorf("%s should be known", status.Handle)
return
}
if !status.Running {
t.Errorf("%s should be running", status.Handle)
}
}
{
status, err := c.Status("not exists handle", time.Second)
if err != nil {
t.Error(err)
return
}
if status.Known {
t.Errorf("%s shouldn't be known", status.Handle)
return
}
if status.Running {
t.Errorf("%s shouldn't be running", status.Handle)
}
}
}

View File

@ -1,229 +1,203 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker package worker
import ( import (
"bufio"
"bytes"
"encoding/binary"
"io" "io"
"net" "net"
"sync" "github.com/mikespook/gearman-go/common"
) )
// The agent of job server. // The agent of job server.
type agent struct { type agent struct {
sync.Mutex
conn net.Conn conn net.Conn
rw *bufio.ReadWriter
worker *Worker worker *Worker
in chan []byte in chan []byte
net, addr string out chan *Job
addr string
} }
// Create the agent of job server. // Create the agent of job server.
func newAgent(net, addr string, worker *Worker) (a *agent, err error) { func newAgent(addr string, worker *Worker) (a *agent, err error) {
conn, err := net.Dial(common.NETWORK, addr)
if err != nil {
return
}
a = &agent{ a = &agent{
net: net, conn: conn,
addr: addr,
worker: worker, worker: worker,
in: make(chan []byte, queueSize), addr: addr,
in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
} }
// reset abilities
a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil))
return return
} }
func (a *agent) Connect() (err error) { // outputing loop
a.Lock() func (a *agent) outLoop() {
defer a.Unlock() ok := true
a.conn, err = net.Dial(a.net, a.addr) var job *Job
if err != nil { for a.worker.running && ok {
return if job, ok = <-a.out; ok {
if err := a.write(job.Encode()); err != nil {
a.worker.err(err)
}
}
} }
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
go a.work()
return
} }
func (a *agent) work() { // inputing loop
func (a *agent) inLoop() {
defer func() { defer func() {
if err := recover(); err != nil { if r := recover(); r != nil {
a.worker.err(err.(error)) a.worker.err(common.Errorf("Exiting: %s", r))
} }
close(a.in)
close(a.out)
a.worker.removeAgent(a)
}() }()
for a.worker.running {
var inpack *inPack a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
var l int RESTART:
var err error // got noop msg and in queue is zero, grab job
var data, leftdata []byte rel, err := a.read()
for { if err != nil {
if data, err = a.read(); err != nil { if err == common.ErrConnection {
if opErr, ok := err.(*net.OpError); ok { for i := 0; i < 3 && a.worker.running; i++ {
if opErr.Temporary() { if conn, err := net.Dial(common.NETWORK, a.addr); err != nil {
a.worker.err(common.Errorf("Reconnection: %d faild", i))
continue continue
} else { } else {
a.disconnect_error(err) a.conn = conn
// else - we're probably dc'ing due to a Close() goto RESTART
break
} }
}
} else if err == io.EOF { a.worker.err(err)
a.disconnect_error(err)
break break
} }
a.worker.err(err) a.worker.err(err)
// If it is unexpected error and the connection wasn't continue
// closed by Gearmand, the agent should close the conection }
// and reconnect to job server. job, err := decodeJob(rel)
a.Close()
a.conn, err = net.Dial(a.net, a.addr)
if err != nil { if err != nil {
a.worker.err(err) a.worker.err(err)
break
}
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
}
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
if len(data) < minPacketLength { // not enough data
leftdata = data
continue continue
} }
for { switch job.DataType {
if inpack, l, err = decodeInPack(data); err != nil { case common.NOOP:
a.worker.err(err) a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
leftdata = data case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
break if a.worker.running {
} else { if a.worker.limit != nil {
leftdata = nil a.worker.limit <- true
inpack.a = a
select {
case <-a.worker.closed:
return
default:
} }
a.worker.in <- inpack job.agent = a
if len(data) == l { a.worker.in <- job
break
}
if len(data) > l {
data = data[l:]
} }
} }
} }
}
}
func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil {
err = &WorkerDisconnectError{
err: err,
agent: a,
}
a.worker.err(err)
}
} }
func (a *agent) Close() { func (a *agent) Close() {
a.Lock()
defer a.Unlock()
if a.conn != nil {
a.conn.Close() a.conn.Close()
a.conn = nil
}
} }
func (a *agent) Grab() { func (a *agent) Work() {
a.Lock() go a.outLoop()
defer a.Unlock() go a.inLoop()
a.grab()
} }
func (a *agent) grab() bool { func (a *agent) readData(length int) (data []byte, err error) {
if a.worker.closed != nil {
return false
}
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
a.write(outpack)
return true
}
func (a *agent) PreSleep() {
a.Lock()
defer a.Unlock()
outpack := getOutPack()
outpack.dataType = dtPreSleep
a.write(outpack)
}
func (a *agent) reconnect() error {
a.Lock()
defer a.Unlock()
conn, err := net.Dial(a.net, a.addr)
if err != nil {
return err
}
a.conn = conn
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a)
if a.grab() {
go a.work()
}
return nil
}
// read length bytes from the socket
func (a *agent) read() (data []byte, err error) {
n := 0 n := 0
buf := make([]byte, common.BUFFER_SIZE)
tmp := getBuffer(bufferSize) // read until data can be unpacked
var buf bytes.Buffer for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n {
if n, err = a.conn.Read(buf); err != nil {
// read the header so we can get the length of the data if err == io.EOF && n == 0 {
if n, err = a.rw.Read(tmp); err != nil { if data == nil {
err = common.ErrConnection
return return
} }
dl := int(binary.BigEndian.Uint32(tmp[8:12])) return data, nil
// write what we read so far
buf.Write(tmp[:n])
// read until we receive all the data
for buf.Len() < dl+minPacketLength {
if n, err = a.rw.Read(tmp); err != nil {
return buf.Bytes(), err
} }
return
buf.Write(tmp[:n])
} }
data = append(data, buf[0:n]...)
if n < common.BUFFER_SIZE {
break
}
}
return
}
return buf.Bytes(), err func (a *agent) unpack(data []byte) ([]byte, int, bool) {
tl := len(data)
start := 0
for i := 0; i < tl+1-common.PACKET_LEN; i++ {
if start+common.PACKET_LEN > tl { // too few data to unpack, read more
return nil, common.PACKET_LEN, false
}
if string(data[start:start+4]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8],
data[start+9], data[start+10], data[start+11]}))
total := l + common.PACKET_LEN
if total == tl { // data is what we want
return data, common.PACKET_LEN, true
} else if total < tl { // data[:total] is what we want, data[total:] is the more
a.in <- data[total:]
data = data[start:total]
return data, common.PACKET_LEN, true
} else { // ops! It won't be possible.
return nil, total - tl, false
}
} else { // flag was not found, move to next step
start++
}
}
return nil, common.PACKET_LEN, false
}
func (a *agent) read() (rel []byte, err error) {
var data []byte
ok := false
l := common.PACKET_LEN
for !ok {
inlen := len(a.in)
if inlen > 0 {
// in queue is not empty
for i := 0; i < inlen; i++ {
data = append(data, <-a.in...)
}
} else {
var d []byte
d, err = a.readData(l)
if err != nil {
return
}
data = append(data, d...)
}
rel, l, ok = a.unpack(data)
}
return
}
// Send a job to the job server.
func (a *agent) WriteJob(job *Job) {
a.out <- job
} }
// Internal write the encoded job. // Internal write the encoded job.
func (a *agent) write(outpack *outPack) (err error) { func (a *agent) write(buf []byte) (err error) {
var n int var n int
buf := outpack.Encode()
for i := 0; i < len(buf); i += n { for i := 0; i < len(buf); i += n {
n, err = a.rw.Write(buf[i:]) n, err = a.conn.Write(buf[i:])
if err != nil { if err != nil {
return err return err
} }
} }
return a.rw.Flush() return
}
// Write with lock
func (a *agent) Write(outpack *outPack) (err error) {
a.Lock()
defer a.Unlock()
return a.write(outpack)
} }

View File

@ -1,51 +0,0 @@
package worker
const (
Network = "tcp"
// queue size
queueSize = 8
// read buffer size
bufferSize = 1024
// min packet length
minPacketLength = 12
// \x00REQ
req = 5391697
reqStr = "\x00REQ"
// \x00RES
res = 5391699
resStr = "\x00RES"
// package data type
dtCanDo = 1
dtCantDo = 2
dtResetAbilities = 3
dtPreSleep = 4
dtNoop = 6
dtJobCreated = 8
dtGrabJob = 9
dtNoJob = 10
dtJobAssign = 11
dtWorkStatus = 12
dtWorkComplete = 13
dtWorkFail = 14
dtGetStatus = 15
dtEchoReq = 16
dtEchoRes = 17
dtError = 19
dtStatusRes = 20
dtSetClientId = 22
dtCanDoTimeout = 23
dtAllYours = 24
dtWorkException = 25
dtWorkData = 28
dtWorkWarning = 29
dtGrabJobUniq = 30
dtJobAssignUniq = 31
)
func getBuffer(l int) (buf []byte) {
// TODO add byte buffer pool
buf = make([]byte, l)
return
}

View File

@ -1,28 +0,0 @@
package worker
import (
"bytes"
"errors"
"fmt"
)
var (
ErrNoneAgents = errors.New("None active agents")
ErrNoneFuncs = errors.New("None functions")
ErrTimeOut = errors.New("Executing time out")
ErrUnknown = errors.New("Unknown error")
)
// Extract the error message
func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %v", data)
return
}
err = fmt.Errorf("%s: %s", rel[0], rel[1])
return
}
// An error handler
type ErrorHandler func(error)

View File

@ -1,56 +0,0 @@
package worker_test
import (
"fmt"
"github.com/mikespook/gearman-go/worker"
"sync"
)
func ExampleWorker() {
// An example of worker
w := worker.New(worker.Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil {
fmt.Println(err)
return
}
// A function for handling jobs
foobar := func(job worker.Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job worker.Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
}

View File

@ -1,31 +1,17 @@
package worker package worker
import ( import (
"encoding/json"
"runtime" "runtime"
"encoding/json"
) )
// Job handler
type JobHandler func(Job) error
type JobFunc func(Job) ([]byte, error)
// The definition of the callback function.
type jobFunc struct {
f JobFunc
timeout uint32
}
// Map for added function.
type jobFuncs map[string]*jobFunc
type systemInfo struct { type systemInfo struct {
GOOS, GOARCH, GOROOT, Version string GOOS, GOARCH, GOROOT, Version string
NumCPU, NumGoroutine int NumCPU, NumGoroutine int
NumCgoCall int64 NumCgoCall int64
} }
func SysInfo(job Job) ([]byte, error) { func SysInfo(job *Job) ([]byte, error) {
return json.Marshal(&systemInfo{ return json.Marshal(&systemInfo{
GOOS: runtime.GOOS, GOOS: runtime.GOOS,
GOARCH: runtime.GOARCH, GOARCH: runtime.GOARCH,
@ -39,7 +25,7 @@ func SysInfo(job Job) ([]byte, error) {
var memState runtime.MemStats var memState runtime.MemStats
func MemInfo(job Job) ([]byte, error) { func MemInfo(job *Job) ([]byte, error) {
runtime.ReadMemStats(&memState) runtime.ReadMemStats(&memState)
return json.Marshal(&memState) return json.Marshal(&memState)
} }

View File

@ -1,126 +0,0 @@
package worker
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Worker side job
type inPack struct {
dataType uint32
data []byte
handle, uniqueId, fn string
a *agent
}
// Create a new job
func getInPack() *inPack {
return &inPack{}
}
func (inpack *inPack) Data() []byte {
return inpack.data
}
func (inpack *inPack) Fn() string {
return inpack.fn
}
func (inpack *inPack) Handle() string {
return inpack.handle
}
func (inpack *inPack) UniqueId() string {
return inpack.uniqueId
}
func (inpack *inPack) Err() error {
if inpack.dataType == dtError {
return getError(inpack.data)
}
return nil
}
// Send some datas to client.
// Using this in a job's executing.
func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkData
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkWarning
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
// Update status.
// Tall client how many percent job has been executed.
func (inpack *inPack) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
outpack := getOutPack()
outpack.dataType = dtWorkStatus
hl := len(inpack.handle)
nl := len(n)
dl := len(d)
outpack.data = getBuffer(hl + nl + dl + 2)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], n)
copy(outpack.data[hl+nl+2:], d)
inpack.a.write(outpack)
}
// Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < minPacketLength { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %v", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
if len(data) < (dl + minPacketLength) {
err = fmt.Errorf("Not enough data: %v", data)
return
}
dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %v", data)
return
}
inpack = getInPack()
inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch inpack.dataType {
case dtJobAssign:
s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.data = s[2]
}
case dtJobAssignUniq:
s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.uniqueId = string(s[2])
inpack.data = s[3]
}
default:
inpack.data = dt
}
l = dl + minPacketLength
return
}

View File

@ -1,73 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
inpackcases = map[uint32]map[string]string{
dtNoop: map[string]string{
"src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00",
},
dtNoJob: map[string]string{
"src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00",
},
dtJobAssign: map[string]string{
"src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz",
"handle": "a",
"fn": "b",
"data": "xyz",
},
dtJobAssignUniq: map[string]string{
"src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz",
"handle": "a",
"fn": "b",
"uid": "c",
"data": "xyz",
},
}
)
func TestInPack(t *testing.T) {
for k, v := range inpackcases {
inpack, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
t.Error(err)
}
if inpack.dataType != k {
t.Errorf("DataType: %d expected, %d got.", k, inpack.dataType)
}
if handle, ok := v["handle"]; ok {
if inpack.handle != handle {
t.Errorf("Handle: %s expected, %s got.", handle, inpack.handle)
}
}
if fn, ok := v["fn"]; ok {
if inpack.fn != fn {
t.Errorf("FuncName: %s expected, %s got.", fn, inpack.fn)
}
}
if uid, ok := v["uid"]; ok {
if inpack.uniqueId != uid {
t.Errorf("UID: %s expected, %s got.", uid, inpack.uniqueId)
}
}
if data, ok := v["data"]; ok {
if bytes.Compare([]byte(data), inpack.data) != 0 {
t.Errorf("UID: %v expected, %v got.", data, inpack.data)
}
}
}
}
func BenchmarkDecode(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, v := range inpackcases {
_, _, err := decodeInPack([]byte(v["src"]))
if err != nil {
b.Error(err)
}
}
}
}

View File

@ -1,12 +1,134 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com>
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker package worker
type Job interface { import (
Err() error "bytes"
Data() []byte "strconv"
Fn() string "github.com/mikespook/gearman-go/common"
SendWarning(data []byte) )
SendData(data []byte)
UpdateStatus(numerator, denominator int) // Worker side job
Handle() string type Job struct {
UniqueId() string Data []byte
Handle, UniqueId, Fn string
agent *agent
magicCode, DataType uint32
c chan bool
}
// Create a new job
func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
return &Job{magicCode: magiccode,
DataType: datatype,
Data: data,
c: make(chan bool),}
}
// Decode job from byte slice
func decodeJob(data []byte) (job *Job, err error) {
if len(data) < 12 {
return nil, common.Errorf("Invalid data: %V", data)
}
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) {
return nil, common.Errorf("Invalid data: %V", data)
}
data = data[12:]
job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool),}
switch datatype {
case common.JOB_ASSIGN:
s := bytes.SplitN(data, []byte{'\x00'}, 3)
if len(s) == 3 {
job.Handle = string(s[0])
job.Fn = string(s[1])
data = s[2]
}
case common.JOB_ASSIGN_UNIQ:
s := bytes.SplitN(data, []byte{'\x00'}, 4)
if len(s) == 4 {
job.Handle = string(s[0])
job.Fn = string(s[1])
job.UniqueId = string(s[2])
data = s[3]
}
}
job.Data = data
return
}
// Encode a job to byte slice
func (job *Job) Encode() (data []byte) {
var l int
if job.DataType == common.WORK_FAIL {
l = len(job.Handle)
} else {
l = len(job.Data)
if job.Handle != "" {
l += len(job.Handle) + 1
}
}
data = make([]byte, 0, l + 12)
magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType)
datalength := common.Uint32ToBytes(uint32(l))
data = append(data, magiccode[:]...)
data = append(data, datatype[:]...)
data = append(data, datalength[:]...)
if job.Handle != "" {
data = append(data, []byte(job.Handle)...)
if job.DataType != common.WORK_FAIL {
data = append(data, 0)
}
}
data = append(data, job.Data...)
return
}
// Send some datas to client.
// Using this in a job's executing.
func (job *Job) UpdateData(data []byte, iswarning bool) {
result := append([]byte(job.Handle), 0)
result = append(result, data...)
var datatype uint32
if iswarning {
datatype = common.WORK_WARNING
} else {
datatype = common.WORK_DATA
}
job.agent.WriteJob(newJob(common.REQ, datatype, result))
}
// Update status.
// Tall client how many percent job has been executed.
func (job *Job) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
result := append([]byte(job.Handle), '\x00')
result = append(result, n...)
result = append(result, '\x00')
result = append(result, d...)
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
}
// close the job
func (job *Job) Close() {
close(job.c)
}
// cancel the job executing
func (job *Job) cancel() {
defer func() {recover()}()
job.c <- true
}
// When a job was canceled, return a true form a channel
func (job *Job) Canceled() <-chan bool {
return job.c
} }

View File

@ -1,47 +0,0 @@
package worker
import (
"encoding/binary"
)
// Worker side job
type outPack struct {
dataType uint32
data []byte
handle string
}
func getOutPack() (outpack *outPack) {
// TODO pool
return &outPack{}
}
// Encode a job to byte slice
func (outpack *outPack) Encode() (data []byte) {
var l int
if outpack.dataType == dtWorkFail {
l = len(outpack.handle)
} else {
l = len(outpack.data)
if outpack.handle != "" {
l += len(outpack.handle) + 1
}
}
data = getBuffer(l + minPacketLength)
binary.BigEndian.PutUint32(data[:4], req)
binary.BigEndian.PutUint32(data[4:8], outpack.dataType)
binary.BigEndian.PutUint32(data[8:minPacketLength], uint32(l))
i := minPacketLength
if outpack.handle != "" {
hi := len(outpack.handle) + i
copy(data[i:hi], []byte(outpack.handle))
if outpack.dataType != dtWorkFail {
data[hi] = '\x00'
}
i = hi + 1
}
if outpack.dataType != dtWorkFail {
copy(data[i:], outpack.data)
}
return
}

View File

@ -1,99 +0,0 @@
package worker
import (
"bytes"
"testing"
)
var (
outpackcases = map[uint32]map[string]string{
dtCanDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a",
"data": "a",
},
dtCanDoTimeout: map[string]string{
"src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01",
"data": "a\x00\x00\x00\x00\x01",
},
dtCantDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a",
"data": "a",
},
dtResetAbilities: map[string]string{
"src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00",
},
dtPreSleep: map[string]string{
"src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00",
},
dtGrabJob: map[string]string{
"src": "\x00REQ\x00\x00\x00\x09\x00\x00\x00\x00",
},
dtGrabJobUniq: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1E\x00\x00\x00\x00",
},
dtWorkData: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1C\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkWarning: map[string]string{
"src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkStatus: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100",
"data": "a\x0050\x00100",
},
dtWorkComplete: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtWorkFail: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a",
"handle": "a",
},
dtWorkException: map[string]string{
"src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b",
"data": "a\x00b",
},
dtSetClientId: map[string]string{
"src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a",
"data": "a",
},
dtAllYours: map[string]string{
"src": "\x00REQ\x00\x00\x00\x18\x00\x00\x00\x00",
},
}
)
func TestOutPack(t *testing.T) {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
data := outpack.Encode()
if bytes.Compare([]byte(v["src"]), data) != 0 {
t.Errorf("%d: %X expected, %X got.", k, v["src"], data)
}
}
}
func BenchmarkEncode(b *testing.B) {
for i := 0; i < b.N; i++ {
for k, v := range outpackcases {
outpack := getOutPack()
outpack.dataType = k
if handle, ok := v["handle"]; ok {
outpack.handle = handle
}
if data, ok := v["data"]; ok {
outpack.data = []byte(data)
}
outpack.Encode()
}
}
}

View File

@ -1,130 +1,149 @@
// The worker package helps developers to develop Gearman's worker // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
// in an easy way. // Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package worker package worker
import ( import (
"fmt"
"strconv"
"sync"
"time" "time"
"github.com/mikespook/gearman-go/common"
) )
const ( const (
Unlimited = iota Unlimited = 0
OneByOne OneByOne = 1
Immediately = 0
) )
// Worker is the only structure needed by worker side developing. var (
// It can connect to multi-server and grab jobs. ErrConnection = common.ErrConnection
type Worker struct { )
sync.Mutex // Job handler
agents []*agent type JobHandler func(*Job) error
funcs jobFuncs
in chan *inPack
running bool
ready bool
jobLeftNum int64
Id string type JobFunc func(*Job) ([]byte, error)
ErrorHandler ErrorHandler
JobHandler JobHandler // The definition of the callback function.
limit chan bool type jobFunc struct {
closed chan struct{} f JobFunc
leftJobs chan struct{} timeout uint32
} }
// New returns a worker. // Map for added function.
// type JobFuncs map[string]*jobFunc
// If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them parallelly. /*
// If limit is greater than zero, the number of paralled executing Worker side api for gearman
// jobs are limited under the number. If limit is assgined to
// OneByOne(=1), there will be only one job executed in a time. usage:
func New(limit int) (worker *Worker) { w = worker.New(worker.Unlimited)
w.AddFunction("foobar", foobar)
w.AddServer("127.0.0.1:4730")
w.Work() // Enter the worker's main loop
The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
It looks like this:
func foobar(job *Job) (data []byte, err os.Error) {
//sth. here
//plaplapla...
return
}
*/
type Worker struct {
agents []*agent
funcs JobFuncs
in chan *Job
running bool
limit chan bool
Id string
// assign a ErrFunc to handle errors
ErrHandler common.ErrorHandler
JobHandler JobHandler
}
// Get a new worker
func New(l int) (worker *Worker) {
worker = &Worker{ worker = &Worker{
agents: make([]*agent, 0, limit), agents: make([]*agent, 0),
funcs: make(jobFuncs), funcs: make(JobFuncs),
in: make(chan *inPack, queueSize), in: make(chan *Job, common.QUEUE_SIZE),
} }
if limit != Unlimited { if l != Unlimited {
worker.limit = make(chan bool, limit-1) worker.limit = make(chan bool, l)
} }
return return
} }
// inner error handling //
func (worker *Worker) err(e error) { func (worker *Worker)err(e error) {
if worker.ErrorHandler != nil { if worker.ErrHandler != nil {
worker.ErrorHandler(e) worker.ErrHandler(e)
} }
} }
// AddServer adds a Gearman job server. // Add a server. The addr should be 'host:port' format.
// // The connection is established at this time.
// addr should be formated as 'host:port'. func (worker *Worker) AddServer(addr string) (err error) {
func (worker *Worker) AddServer(net, addr string) (err error) {
// Create a new job server's client as a agent of server // Create a new job server's client as a agent of server
a, err := newAgent(net, addr, worker) server, err := newAgent(addr, worker)
if err != nil { if err != nil {
return err return err
} }
worker.agents = append(worker.agents, a) worker.agents = append(worker.agents, server)
return return
} }
// Broadcast an outpack to all Gearman server. // Write a job to job server.
func (worker *Worker) broadcast(outpack *outPack) { // Here, the job's mean is not the oraginal mean.
// Just looks like a network package for job's result or tell job server, there was a fail.
func (worker *Worker) broadcast(job *Job) {
for _, v := range worker.agents { for _, v := range worker.agents {
v.Write(outpack) v.WriteJob(job)
} }
} }
// AddFunc adds a function. // Add a function.
// Set timeout as Unlimited(=0) to disable executing timeout. // Plz added job servers first, then functions.
// The API will tell every connected job server that 'I can do this'
func (worker *Worker) AddFunc(funcname string, func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) { f JobFunc, timeout uint32) (err error) {
worker.Lock()
defer worker.Unlock()
if _, ok := worker.funcs[funcname]; ok { if _, ok := worker.funcs[funcname]; ok {
return fmt.Errorf("The function already exists: %s", funcname) return common.Errorf("The function already exists: %s", funcname)
} }
worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
if worker.running { if worker.running {
worker.addFunc(funcname, timeout) worker.addFunc(funcname, timeout)
} }
return return
} }
// inner add // inner add function
func (worker *Worker) addFunc(funcname string, timeout uint32) { func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack := prepFuncOutpack(funcname, timeout) var datatype uint32
worker.broadcast(outpack) var data []byte
}
func prepFuncOutpack(funcname string, timeout uint32) *outPack {
outpack := getOutPack()
if timeout == 0 { if timeout == 0 {
outpack.dataType = dtCanDo datatype = common.CAN_DO
outpack.data = []byte(funcname) data = []byte(funcname)
} else { } else {
outpack.dataType = dtCanDoTimeout datatype = common.CAN_DO_TIMEOUT
l := len(funcname) data = []byte(funcname + "\x00")
t := common.Uint32ToBytes(timeout)
timeoutString := strconv.FormatUint(uint64(timeout), 10) data = append(data, t[:]...)
outpack.data = getBuffer(l + len(timeoutString) + 1)
copy(outpack.data, []byte(funcname))
outpack.data[l] = '\x00'
copy(outpack.data[l+1:], []byte(timeoutString))
} }
return outpack job := newJob(common.REQ, datatype, data)
worker.broadcast(job)
} }
// RemoveFunc removes a function. // Remove a function.
// Tell job servers 'I can not do this now' at the same time.
func (worker *Worker) RemoveFunc(funcname string) (err error) { func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock()
defer worker.Unlock()
if _, ok := worker.funcs[funcname]; !ok { if _, ok := worker.funcs[funcname]; !ok {
return fmt.Errorf("The function does not exist: %s", funcname) return common.Errorf("The function does not exist: %s", funcname)
} }
delete(worker.funcs, funcname) delete(worker.funcs, funcname)
if worker.running { if worker.running {
@ -133,273 +152,166 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) {
return return
} }
// inner remove // inner remove function
func (worker *Worker) removeFunc(funcname string) { func (worker *Worker) removeFunc(funcname string) {
outpack := getOutPack() job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
outpack.dataType = dtCantDo worker.broadcast(job)
outpack.data = []byte(funcname)
worker.broadcast(outpack)
} }
// inner package handling func (worker *Worker) dealJob(job *Job) {
func (worker *Worker) handleInPack(inpack *inPack) {
switch inpack.dataType {
case dtNoJob:
inpack.a.PreSleep()
case dtNoop:
inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq:
go func() {
go func() {
worker.incrExecJobNum()
defer func() { defer func() {
worker.decrExecJobNum() job.Close()
if worker.running && worker.limit != nil {
<-worker.limit
}
}() }()
if err := worker.exec(inpack); err != nil { switch job.DataType {
case common.ERROR:
_, err := common.GetError(job.Data)
worker.err(err)
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
if err := worker.exec(job); err != nil {
worker.err(err) worker.err(err)
} }
}()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
}()
case dtError:
worker.err(inpack.Err())
fallthrough
case dtEchoRes:
fallthrough
default: default:
worker.customeHandler(inpack) worker.handleJob(job)
} }
} }
// Connect to Gearman server and tell every server // Main loop
// what can this worker do. func (worker *Worker) Work() {
func (worker *Worker) Ready() (err error) { defer func() {
if len(worker.agents) == 0 { for _, v := range worker.agents {
return ErrNoneAgents v.Close()
}
if len(worker.funcs) == 0 {
return ErrNoneFuncs
}
for _, a := range worker.agents {
if err = a.Connect(); err != nil {
return
} }
}()
worker.running = true
for _, v := range worker.agents {
go v.Work()
} }
for funcname, f := range worker.funcs { for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout) worker.addFunc(funcname, f.timeout)
} }
worker.ready = true ok := true
return for ok {
var job *Job
if job, ok = <-worker.in; ok {
go worker.dealJob(job)
}
}
} }
// Work start main loop (blocking) // job handler
// Most of time, this should be evaluated in goroutine. func (worker *Worker) handleJob(job *Job) {
func (worker *Worker) Work() {
if !worker.ready {
// didn't run Ready beforehand, so we'll have to do it:
err := worker.Ready()
if err != nil {
panic(err)
}
}
worker.Lock()
worker.running = true
worker.Unlock()
for _, a := range worker.agents {
a.Grab()
}
// 执行任务(阻塞)
var inpack *inPack
for inpack = range worker.in {
worker.handleInPack(inpack)
}
// 关闭Worker进程后 等待任务完成后退出
worker.Lock()
leftJobNum := int(worker.jobLeftNum)
worker.Unlock()
if worker.leftJobs != nil {
for i := 0; i < leftJobNum; i++ {
<-worker.leftJobs
}
}
worker.Reset()
worker.close()
}
// custome handling warper
func (worker *Worker) customeHandler(inpack *inPack) {
if worker.JobHandler != nil { if worker.JobHandler != nil {
if err := worker.JobHandler(inpack); err != nil { if err := worker.JobHandler(job); err != nil {
worker.err(err) worker.err(err)
} }
} }
} }
// Close connection and exit main loop // Close.
func (worker *Worker) Close() { func (worker *Worker) Close() {
worker.Lock()
defer worker.Unlock()
if worker.running == true && worker.closed == nil {
worker.closed = make(chan struct{}, 1)
worker.closed <- struct{}{}
worker.running = false worker.running = false
close(worker.in) close(worker.in)
// 创建关闭后执行中的任务列表 if worker.limit != nil {
if worker.jobLeftNum != 0 { close(worker.limit)
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
} }
} }
func (worker *Worker) close() { // Send a something out, get the samething back.
for _, a := range worker.agents {
a.Close()
}
}
// Echo
func (worker *Worker) Echo(data []byte) { func (worker *Worker) Echo(data []byte) {
outpack := getOutPack() job := newJob(common.REQ, common.ECHO_REQ, data)
outpack.dataType = dtEchoReq worker.broadcast(job)
outpack.data = data
worker.broadcast(outpack)
} }
// Reset removes all of functions. // Remove all of functions.
// Both from the worker and job servers. // Both from the worker or job servers.
func (worker *Worker) Reset() { func (worker *Worker) Reset() {
outpack := getOutPack() job := newJob(common.REQ, common.RESET_ABILITIES, nil)
outpack.dataType = dtResetAbilities worker.broadcast(job)
worker.broadcast(outpack) worker.funcs = make(JobFuncs)
worker.funcs = make(jobFuncs)
} }
// Set the worker's unique id. // Set the worker's unique id.
func (worker *Worker) SetId(id string) { func (worker *Worker) SetId(id string) {
worker.Id = id worker.Id = id
outpack := getOutPack() job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
outpack.dataType = dtSetClientId worker.broadcast(job)
outpack.data = []byte(id)
worker.broadcast(outpack)
} }
func (worker *Worker) incrExecJobNum() int64 { // Execute the job. And send back the result.
worker.Lock() func (worker *Worker) exec(job *Job) (err error) {
defer worker.Unlock()
worker.jobLeftNum++
return worker.jobLeftNum
}
func (worker *Worker) decrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum--
if worker.jobLeftNum < 0 {
worker.jobLeftNum = 0
}
return worker.jobLeftNum
}
// inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() { defer func() {
if worker.limit != nil {
<-worker.limit
}
if r := recover(); r != nil { if r := recover(); r != nil {
if e, ok := r.(error); ok { if e, ok := r.(error); ok {
err = e err = e
} else { } else {
err = ErrUnknown err = common.ErrUnknown
} }
} }
}() } ()
f, ok := worker.funcs[inpack.fn] f, ok := worker.funcs[job.Fn]
if !ok { if !ok {
return fmt.Errorf("The function does not exist: %s", inpack.fn) return common.Errorf("The function does not exist: %s", job.Fn)
} }
var r *result var r *result
if f.timeout == 0 { if f.timeout == 0 {
d, e := f.f(inpack) d, e := f.f(job)
r = &result{data: d, err: e} r = &result{data:d, err: e}
} else { } else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second)
} }
//if worker.running { var datatype uint32
outpack := getOutPack()
if r.err == nil { if r.err == nil {
outpack.dataType = dtWorkComplete datatype = common.WORK_COMPLETE
} else { } else {
if len(r.data) == 0 { if r.data == nil {
outpack.dataType = dtWorkFail datatype = common.WORK_FAIL
} else { } else {
outpack.dataType = dtWorkException datatype = common.WORK_EXCEPTION
} }
err = r.err err = r.err
} }
outpack.handle = inpack.handle
outpack.data = r.data job.magicCode = common.REQ
_ = inpack.a.Write(outpack) job.DataType = datatype
if worker.leftJobs != nil { job.Data = r.data
worker.leftJobs <- struct{}{} if worker.running {
job.agent.WriteJob(job)
} }
//}
return return
} }
func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
worker.Lock()
defer worker.Unlock()
for funcname, f := range worker.funcs {
outpack := prepFuncOutpack(funcname, f.timeout)
a.write(outpack)
}
func (worker *Worker) removeAgent(a *agent) {
for k, v := range worker.agents {
if v == a {
worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
}
}
if len(worker.agents) == 0 {
worker.err(common.ErrNoActiveAgent)
}
} }
// inner result
type result struct { type result struct {
data []byte data []byte
err error err error
} }
// executing timer func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
rslt := make(chan *result) rslt := make(chan *result)
defer close(rslt) defer close(rslt)
go func() { go func() {
defer func() { recover() }() defer func() {recover()}()
d, e := f(job) d, e := f(job)
rslt <- &result{data: d, err: e} rslt <- &result{data: d, err: e}
}() }()
select { select {
case r = <-rslt: case r = <-rslt:
case <-time.After(timeout): case <-time.After(timeout):
return &result{err: ErrTimeOut} go job.cancel()
return &result{err:common.ErrTimeOut}
} }
return r return r
} }
// Error type passed when a worker connection disconnects
type WorkerDisconnectError struct {
err error
agent *agent
}
func (e *WorkerDisconnectError) Error() string {
return e.err.Error()
}
// Responds to the error by asking the worker to reconnect
func (e *WorkerDisconnectError) Reconnect() (err error) {
return e.agent.reconnect()
}
// Which server was this for?
func (e *WorkerDisconnectError) Server() (net string, addr string) {
return e.agent.net, e.agent.addr
}

View File

@ -1,59 +0,0 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}

View File

@ -1,48 +1,16 @@
package worker package worker
import ( import "testing"
"bytes"
"flag"
"os"
"sync"
"testing"
"time"
)
var ( var worker *Worker
worker *Worker
runIntegrationTests bool
)
func init() { func init() {
worker = New(Unlimited) worker = New(Unlimited)
} }
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestWorkerErrNoneAgents(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneAgents {
t.Error("ErrNoneAgents expected.")
}
}
func TestWorkerAddServer(t *testing.T) { func TestWorkerAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730.") t.Log("Add local server 127.0.0.1:4730.")
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil { if err := worker.AddServer("127.0.0.1:4730"); err != nil {
t.Error(err) t.Error(err)
} }
@ -52,24 +20,11 @@ func TestWorkerAddServer(t *testing.T) {
} }
} }
func TestWorkerErrNoneFuncs(t *testing.T) { func foobar(job *Job) ([]byte, error) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready()
if err != ErrNoneFuncs {
t.Error("ErrNoneFuncs expected.")
}
}
func foobar(job Job) ([]byte, error) {
return nil, nil return nil, nil
} }
func TestWorkerAddFunction(t *testing.T) { func TestWorkerAddFunction(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.AddFunc("foobar", foobar, 0); err != nil { if err := worker.AddFunc("foobar", foobar, 0); err != nil {
t.Error(err) t.Error(err)
} }
@ -83,187 +38,7 @@ func TestWorkerAddFunction(t *testing.T) {
} }
func TestWorkerRemoveFunc(t *testing.T) { func TestWorkerRemoveFunc(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.RemoveFunc("foobar"); err != nil { if err := worker.RemoveFunc("foobar"); err != nil {
t.Error(err) t.Error(err)
} }
} }
func TestWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
var wg sync.WaitGroup
worker.JobHandler = func(job Job) error {
t.Logf("%s", job.Data())
wg.Done()
return nil
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
worker.Echo([]byte("Hello"))
wg.Wait()
}
func TestLargeDataWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker := New(Unlimited)
defer worker.Close()
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
worker.Ready()
l := 5714
var wg sync.WaitGroup
bigdataHandler := func(job Job) error {
defer wg.Done()
if len(job.Data()) != l {
t.Errorf("expected length %d. got %d.", l, len(job.Data()))
}
return nil
}
if err := worker.AddFunc("bigdata", foobar, 0); err != nil {
defer wg.Done()
t.Error(err)
}
worker.JobHandler = bigdataHandler
worker.ErrorHandler = func(err error) {
t.Fatal("shouldn't have received an error")
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
// var cli *client.Client
// var err error
// if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
// t.Fatal(err)
// }
// cli.ErrorHandler = func(e error) {
// t.Error(e)
// }
// _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) {
// })
// if err != nil {
// t.Error(err)
// }
worker.Echo(bytes.Repeat([]byte("a"), l))
wg.Wait()
}
func TestWorkerClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker.Close()
}
func TestWorkWithoutReady(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil {
t.Error(err)
}
timeout := make(chan bool, 1)
done := make(chan bool, 1)
other_worker.JobHandler = func(j Job) error {
if !other_worker.ready {
t.Error("Worker not ready as expected")
}
done <- true
return nil
}
go func() {
time.Sleep(5 * time.Second)
timeout <- true
}()
go func() {
other_worker.Work()
}()
// With the all-in-one Work() we don't know if the
// worker is ready at this stage so we may have to wait a sec:
go func() {
tries := 5
for tries > 0 {
if other_worker.ready {
other_worker.Echo([]byte("Hello"))
break
}
// still waiting for it to be ready..
time.Sleep(250 * time.Millisecond)
tries--
}
}()
// determine if we've finished or timed out:
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
}
func TestWorkWithoutReadyWithPanic(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited)
timeout := make(chan bool, 1)
done := make(chan bool, 1)
// Going to work with no worker setup.
// when Work (hopefully) calls Ready it will get an error which should cause it to panic()
go func() {
defer func() {
if err := recover(); err != nil {
done <- true
return
}
t.Error("Work should raise a panic.")
done <- true
}()
other_worker.Work()
}()
go func() {
time.Sleep(2 * time.Second)
timeout <- true
}()
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <-done:
}
}