Compare commits

...

67 Commits

Author SHA1 Message Date
abd6ce9be3 调用Close后不再向JobServer获取任务 2023-02-13 01:01:37 +08:00
97187ecbf9 等待执行中的任务完成后再退出进程 2023-02-13 00:29:22 +08:00
Xing
2a518e8661
Merge pull request #96 from floringavrila/add-support-for-own-ids
add support for own ids (coalescing)
2022-05-20 15:14:03 +12:00
gavrila florin
fa71d7a37a add support for own ids 2022-05-04 00:04:38 +03:00
Xing
d36dcb7fc2
Merge pull request #94 from cameronpm/client_race
Fix two race conditions in client.Do
2021-05-03 16:56:33 +12:00
Paul Cameron
a8f0a04c3d Fix two race conditions in client.Do
* Common race condition is fixed by identifying that Client.respHandler
  can be completely removed since all respHandler operations (get, put
  and invocation) can be moved into the Client.processLoop goroutine,
  meaning that zero locking is required. This race condition resulted
  in a deadlock that was resolved by the response timeout at the
  end of client.Do, returning ErrLostConn

* Rare race condition is fixed by changing responseHandlerMap.get
  to .getAndRemove. This race condition resulted in the innerHandler
  for a new dtJobCreated assigned in client.Do overriding a stale older
  dtJobCreated request, and the newer innerHandler being removed by an
  older dtJobCreated in client.processLoop > client.handleInner. When
  the newer dtJobCreated response was received, the handler for it had
  already been deleted. This was resolved by the response timeout at the
  end of client.Do, returning ErrLostConn
2021-05-02 11:31:19 +10:00
galih rivanto
81d00aa9ce add synchronization to prevent race condition on worker shutdown (#89)
* add synchronization to prevent race condition on worker shutdown

* fix incorrect synchronization and add race unit test

* fix race while broadcasting outpack
2019-05-28 18:41:20 +08:00
Christoffer Fjellström
b902646ce8
Merge pull request #87 from CodeLingoBot/rewrite
Fix function comments based on best practices from Effective Go
2019-02-28 11:28:24 +01:00
CodeLingo Bot
133dd3716f Fix function comments based on best practices from Effective Go
Signed-off-by: CodeLingo Bot <bot@codelingo.io>
2019-02-28 02:11:17 +00:00
Christoffer Fjellström
f333ba6102 Fix job register with timeout set
Fixed issue where timeout is sent as binary encoded uint but server
expects a string
2019-02-13 11:45:35 +01:00
Md. Ashiquzzaman
d0e6ec4878 Fix Odd use of time.Duration (#29)
Ref - https://github.com/appscode/g2/issues/28
2018-11-21 17:16:29 +01:00
Sadlil Rhythom
f2f0349d2d Remove lastcall. 2018-11-21 17:00:42 +01:00
Sadlil Rhythom
5c665de2d6 fix worker reconnect errors, send cando after reconnection.
* fix worker reconnect errors, send cando after reconnection.
2018-11-19 17:10:05 +01:00
Christoffer Fjellström
2fba865e37 Merge pull request #81 from Detectify/master
Fix race condition in client.Do()
2017-10-18 14:34:29 +02:00
Christoffer Fjellström
0ca6dc2c6f Remove duplicate locking
Fixes duplicated code after merge
2017-09-30 09:30:29 +02:00
No-ops
7b6215604e Merge remote-tracking branch 'upstream/master' 2017-09-29 10:51:16 +02:00
No-ops
8a7ac5b450 Merge branch 'develop' 2017-09-27 16:57:19 +02:00
No-ops
a2eb7ba410 Remove unused imports 2017-09-27 16:57:03 +02:00
No-ops
53bb686b76 Merge branch 'develop' 2017-09-27 16:51:18 +02:00
Christoffer Fjellström
e701107e40 Remove broken test suite
OS and configuration specific integration test
2017-09-27 16:42:48 +02:00
Christoffer Fjellström
99d317427f Add skipping for integration tests when running unit tests
Run integration tests with the -integration flag
2017-09-27 16:42:21 +02:00
Sadlil Rhythom
d8cb3883ad Merge pull request #79 from John-Lin/fixed-typo
fixed typo on WorkComplete
2017-09-27 16:59:04 +06:00
Christoffer Fjellström
d9ad23413d Merge pull request #1 from Detectify/develop
Add lock on job handler assignment
2017-09-27 12:00:00 +02:00
Christoffer Fjellström
2dbf199260 Add lock on job handler assignment
Fixes race condition on jobs being done before handler is set.
2017-09-27 11:57:57 +02:00
John-Lin
e1224a8c20 fixed typo and keep WorkComplate for downward compatibility 2017-09-27 13:19:08 +08:00
Christoffer Fjellström
9735b2e54f Merge pull request #75 from No-ops/patch-1
Add sync lock to create job functions
2017-03-02 13:40:41 +01:00
Christoffer Fjellström
27942f55cd Add sync lock on do() to avoid race conditions when creating jobs 2017-02-03 10:06:27 +01:00
Christoffer Fjellström
d6791e2e42 Add sync lock to create job functions
Add sync lock to make create job calls thread safe.
2017-01-16 16:44:48 +01:00
Xing
b79fee2965 Merge pull request #70 from JessonChan/master
concurrent map bug fixed #70
2016-05-06 22:33:03 +12:00
JessonChan
dd82e211a3 concurrent map bug fixed 2016-05-06 18:00:58 +08:00
Xing
68777318f9 Merge pull request #65 from JessonChan/master
fixed #65
2016-04-24 11:03:26 +12:00
JessonChan
99c8032384 fix a bug when high qps 2016-04-20 12:54:21 +08:00
Xing
21cc8de64f Merge pull request #63 from kujohn/pool-custom-handler
Allow built in selectionHandlers to be reused outside of package
2015-12-14 18:41:29 +08:00
John Ku
9d99accce2 Allow built in selectionHandlers to be reused outside of package 2015-12-11 13:48:02 -08:00
Xing
6d9b2fba51 Merge pull request #62 from kujohn/pool-custom-handler
Export SelectionHandler
2015-12-11 12:44:35 +08:00
John Ku
1c4b8aa000 Export pool.clients for custom Pool instantiation 2015-12-10 13:54:48 -08:00
John Ku
d20c3c7bd1 Allow custom Pool without constructor 2015-12-10 11:19:04 -08:00
Xing Xing
c6c6c9cac2 don't use sigillum 2015-07-10 20:30:35 +08:00
Xing
503d523dbf Merge pull request #60 from micmac/master
Replace mutex in client.do() with a channel to avoid deadlock and int…
2015-07-10 20:24:16 +08:00
Endre Hirling
c615e74af8 Replace mutex in client.do() with a channel to avoid deadlock and introduce command timeout 2015-07-06 21:40:49 +02:00
Xing Xing
df1af4f8cb 80 chars/line 2015-01-20 10:28:36 +08:00
Xing Xing
b612b80f80 complete the list of contributors 2015-01-20 10:27:12 +08:00
Xing Xing
2c9d82830c merge, update README to the newest API design. 2015-01-20 09:43:54 +08:00
C.R. Kirkwood-Watts
37db439a4a Update README.md
Updated to reflect new type names.
2015-01-19 10:28:21 -08:00
Xing Xing
bc80b2f853 fixed signal method 2015-01-16 17:41:19 +08:00
Xing Xing
939189448e golib/signal upgrade 2015-01-15 10:28:21 +08:00
Xing Xing
9dbb3ea3fc added lock-write to co-ordinate package sequence #56 2015-01-15 10:26:53 +08:00
Xing Xing
c01a2e22c0 upgrade travis to Go 1.4 2015-01-10 00:09:38 +08:00
Xing Xing
ad9b3cb988 go fmt 2015-01-06 11:45:18 +08:00
Xing Xing
a003eac543 fixed the closing method 2015-01-06 11:34:39 +08:00
Xing Xing
d32eb195e1 Merge branch 'kdar-big-data' 2014-12-10 09:14:08 +08:00
Xing Xing
25735c8488 merge for issue #33 2014-12-10 09:13:49 +08:00
Xing
ea92c122a5 Merge pull request #50 from AzuraMeta/patch-1
Fixed WORK_FAIL needing 2 arguments
2014-08-21 16:32:24 +08:00
Gabriel Cristian Alecu
e005ea4442 Fixed WORK_FAIL needing 2 arguments
According to http://gearman.org/protocol/ , WORK_FAIL only has 1 argument: the handle
2014-08-21 10:41:23 +03:00
Xing
9387a36a0b Merge pull request #49 from gbarr/write-errors
Do not ignore write errors
2014-08-21 09:24:18 +08:00
Graham Barr
7bcc4de76f Do not ignore write errors 2014-08-20 11:27:32 -05:00
Xing
ccb6f4a24f Merge pull request #48 from gbarr/large-packet-read
Avoid read channel corruption when response size > bufferSize
2014-08-19 17:11:42 +08:00
Graham Barr
d82da8fd71 Avoid read channel corruption when response size > bufferSize
When receiving a response, what was happening

1. Read bufferSize and it gets assigned to leftdata
2. Read another bufferSize
3. 2 buffers get appended, but leftdata still points to first buffer
4. Process data buffer which contains only complete responses
5. Back to ReadLoop, but leftdata still points to first incomplete buffer
    causing corrupt data to be processed

Solution is to make leftdata nil once we have merged it with the second buffer
2014-08-18 12:35:31 -05:00
Graham Barr
49ea8c0ec1 Increase buffer size to 8K 2014-08-18 09:51:33 -05:00
Xing
1f303d8145 Merge pull request #47 from gbarr/memleak
When a job completes, remove its handler
2014-07-25 09:46:21 +08:00
Graham Barr
3e556edb2d When a job completes, remove its handler 2014-07-24 14:20:56 -05:00
Xing Xing
d40888817d Merge branch 'master' into 0.2-dev 2014-06-16 11:00:20 +08:00
Xing
dde0c3e9b3 Merge pull request #45 from draxil/worker_disconnect_testing
Allow reconnect from error handler
2014-06-16 10:58:57 +08:00
Joe Higton
97731e1774 FIX: EOF disconnect error also called raw handler afterwards 2014-06-10 04:09:27 +01:00
Joe Higton
09c626f488 Cope with io.EOF as a disconnect error 2014-06-10 03:46:21 +01:00
Joe Higton
1ebb3d5fcc Wrap disconnect errors and allow reconnect 2014-06-10 03:23:18 +01:00
Kevin Darlington
de91c999f7 Changed agent.read to handle big data. 2014-03-08 19:22:14 -05:00
20 changed files with 977 additions and 426 deletions

View File

@ -1,6 +1,6 @@
language: go language: go
go: go:
- 1.2 - 1.4
before_install: before_install:
- sudo apt-get remove -y gearman-job-server - sudo apt-get remove -y gearman-job-server

View File

@ -37,7 +37,9 @@ Usage
## Worker ## Worker
// Limit number of concurrent jobs execution. Use worker.Unlimited (0) if you want no limitation. ```go
// Limit number of concurrent jobs execution.
// Use worker.Unlimited (0) if you want no limitation.
w := worker.New(worker.OneByOne) w := worker.New(worker.OneByOne)
w.ErrHandler = func(e error) { w.ErrHandler = func(e error) {
log.Println(e) log.Println(e)
@ -47,15 +49,17 @@ Usage
w.AddFunc("ToUpper", ToUpper, worker.Unlimited) w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
// This will give a timeout of 5 seconds // This will give a timeout of 5 seconds
w.AddFunc("ToUpperTimeOut5", ToUpper, 5) w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
if err := w.Ready(); err != nil { if err := w.Ready(); err != nil {
log.Fatal(err) log.Fatal(err)
return return
} }
go w.Work() go w.Work()
```
## Client ## Client
```go
// ... // ...
c, err := client.New("tcp4", "127.0.0.1:4730") c, err := client.New("tcp4", "127.0.0.1:4730")
// ... error handling // ... error handling
@ -67,11 +71,12 @@ Usage
echomsg, err := c.Echo(echo) echomsg, err := c.Echo(echo)
// ... error handling // ... error handling
log.Println(string(echomsg)) log.Println(string(echomsg))
jobHandler := func(job *client.Job) { jobHandler := func(resp *client.Response) {
log.Printf("%s", job.Data) log.Printf("%s", resp.Data)
} }
handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
// ... // ...
```
Branches Branches
======== ========
@ -87,10 +92,15 @@ __Use at your own risk!__
Contributors Contributors
============ ============
Great thanks to all of you for your support and interest!
(_Alphabetic order_) (_Alphabetic order_)
* [Alex Zylman](https://github.com/azylman) * [Alex Zylman](https://github.com/azylman)
* [C.R. Kirkwood-Watts](https://github.com/kirkwood)
* [Damian Gryski](https://github.com/dgryski) * [Damian Gryski](https://github.com/dgryski)
* [Gabriel Cristian Alecu](https://github.com/AzuraMeta)
* [Graham Barr](https://github.com/gbarr)
* [Ingo Oeser](https://github.com/nightlyone) * [Ingo Oeser](https://github.com/nightlyone)
* [jake](https://github.com/jbaikge) * [jake](https://github.com/jbaikge)
* [Joe Higton](https://github.com/draxil) * [Joe Higton](https://github.com/draxil)
@ -98,9 +108,13 @@ Contributors
* [Kevin Darlington](https://github.com/kdar) * [Kevin Darlington](https://github.com/kdar)
* [miraclesu](https://github.com/miraclesu) * [miraclesu](https://github.com/miraclesu)
* [Paul Mach](https://github.com/paulmach) * [Paul Mach](https://github.com/paulmach)
* [Randall McPherson](https://github.com/rlmcpherson)
* [Sam Grimee](https://github.com/sgrimee) * [Sam Grimee](https://github.com/sgrimee)
* suchj
* [Xing Xing](http://mikespook.com) <mikespook@gmail.com> [@Twitter](http://twitter.com/mikespook) Maintainer
==========
* [Xing Xing](http://mikespook.com) &lt;<mikespook@gmail.com>&gt; [@Twitter](http://twitter.com/mikespook)
Open Source - MIT Software License Open Source - MIT Software License
================================== ==================================

View File

@ -6,6 +6,11 @@ import (
"bufio" "bufio"
"net" "net"
"sync" "sync"
"time"
)
var (
DefaultTimeout time.Duration = time.Second
) )
// One client connect to one server. // One client connect to one server.
@ -13,24 +18,63 @@ import (
type Client struct { type Client struct {
sync.Mutex sync.Mutex
net, addr, lastcall string net, addr string
respHandler map[string]ResponseHandler innerHandler *responseHandlerMap
innerHandler map[string]ResponseHandler
in chan *Response in chan *Response
conn net.Conn conn net.Conn
rw *bufio.ReadWriter rw *bufio.ReadWriter
ResponseTimeout time.Duration // response timeout for do()
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
} }
// Return a client. type responseHandlerMap struct {
sync.Mutex
holder map[string]handledResponse
}
type handledResponse struct {
internal ResponseHandler // internal handler, always non-nil
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
}
func newResponseHandlerMap() *responseHandlerMap {
return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)}
}
func (r *responseHandlerMap) remove(key string) {
r.Lock()
delete(r.holder, key)
r.Unlock()
}
func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) {
r.Lock()
rh, b := r.holder[key]
delete(r.holder, key)
r.Unlock()
return rh, b
}
func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) {
r.Lock()
r.holder[key] = handledResponse{internal: internal, external: external}
r.Unlock()
}
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
r.putWithExternalHandler(key, rh, nil)
}
// New returns a client.
func New(network, addr string) (client *Client, err error) { func New(network, addr string) (client *Client, err error) {
client = &Client{ client = &Client{
net: network, net: network,
addr: addr, addr: addr,
respHandler: make(map[string]ResponseHandler, queueSize), innerHandler: newResponseHandlerMap(),
innerHandler: make(map[string]ResponseHandler, queueSize),
in: make(chan *Response, queueSize), in: make(chan *Response, queueSize),
ResponseTimeout: DefaultTimeout,
} }
client.conn, err = net.Dial(client.net, client.addr) client.conn, err = net.Dial(client.net, client.addr)
if err != nil { if err != nil {
@ -104,6 +148,7 @@ ReadLoop:
} }
if len(leftdata) > 0 { // some data left for processing if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...) data = append(leftdata, data...)
leftdata = nil
} }
for { for {
l := len(data) l := len(data)
@ -127,27 +172,25 @@ ReadLoop:
} }
func (client *Client) processLoop() { func (client *Client) processLoop() {
rhandlers := map[string]ResponseHandler{}
for resp := range client.in { for resp := range client.in {
switch resp.DataType { switch resp.DataType {
case dtError: case dtError:
if client.lastcall != "" {
resp = client.handleInner(client.lastcall, resp)
client.lastcall = ""
} else {
client.err(getError(resp.Data)) client.err(getError(resp.Data))
}
case dtStatusRes: case dtStatusRes:
resp = client.handleInner("s"+resp.Handle, resp) client.handleInner("s"+resp.Handle, resp, nil)
case dtJobCreated: case dtJobCreated:
resp = client.handleInner("c", resp) client.handleInner("c", resp, rhandlers)
case dtEchoRes: case dtEchoRes:
resp = client.handleInner("e", resp) client.handleInner("e", resp, nil)
case dtWorkData, dtWorkWarning, dtWorkStatus: case dtWorkData, dtWorkWarning, dtWorkStatus:
resp = client.handleResponse(resp.Handle, resp) if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException: case dtWorkComplete, dtWorkFail, dtWorkException:
resp = client.handleResponse(resp.Handle, resp) if cb := rhandlers[resp.Handle]; cb != nil {
if resp != nil { cb(resp)
delete(client.respHandler, resp.Handle) delete(rhandlers, resp.Handle)
} }
} }
} }
@ -159,44 +202,54 @@ func (client *Client) err(e error) {
} }
} }
func (client *Client) handleResponse(key string, resp *Response) *Response { func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) {
if h, ok := client.respHandler[key]; ok { if h, ok := client.innerHandler.getAndRemove(key); ok {
h(resp) if h.external != nil && resp.Handle != "" {
return nil rhandlers[resp.Handle] = h.external
}
h.internal(resp)
} }
return resp
} }
func (client *Client) handleInner(key string, resp *Response) *Response { type handleOrError struct {
if h, ok := client.innerHandler[key]; ok { handle string
h(resp) err error
delete(client.innerHandler, key)
return nil
}
return resp
} }
func (client *Client) do(funcname string, data []byte, func (client *Client) do(funcname string, data []byte,
flag uint32) (handle string, err error) { flag uint32, h ResponseHandler, id string) (handle string, err error) {
if len(id) == 0 {
return "", ErrInvalidId
}
if client.conn == nil { if client.conn == nil {
return "", ErrLostConn return "", ErrLostConn
} }
var mutex sync.Mutex var result = make(chan handleOrError, 1)
mutex.Lock() client.Lock()
client.lastcall = "c" defer client.Unlock()
client.innerHandler["c"] = func(resp *Response) { client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
defer mutex.Unlock()
if resp.DataType == dtError { if resp.DataType == dtError {
err = getError(resp.Data) err = getError(resp.Data)
result <- handleOrError{"", err}
return return
} }
handle = resp.Handle handle = resp.Handle
} result <- handleOrError{handle, nil}
id := IdGen.Id() }, h)
req := getJob(id, []byte(funcname), data) req := getJob(id, []byte(funcname), data)
req.DataType = flag req.DataType = flag
client.write(req) if err = client.write(req); err != nil {
mutex.Lock() client.innerHandler.remove("c")
return
}
var timer = time.After(client.ResponseTimeout)
select {
case ret := <-result:
return ret.handle, ret.err
case <-timer:
client.innerHandler.remove("c")
return "", ErrLostConn
}
return return
} }
@ -204,19 +257,7 @@ func (client *Client) do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh // flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte, func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) { flag byte, h ResponseHandler) (handle string, err error) {
var datatype uint32 handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype)
if err == nil && h != nil {
client.respHandler[handle] = h
}
return return
} }
@ -224,38 +265,25 @@ func (client *Client) Do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh // flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte, func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) { flag byte) (handle string, err error) {
if client.conn == nil { handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype)
return return
} }
// Get job status from job server. // Status gets job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) { func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil { if client.conn == nil {
return nil, ErrLostConn return nil, ErrLostConn
} }
var mutex sync.Mutex var mutex sync.Mutex
mutex.Lock() mutex.Lock()
client.lastcall = "s" + handle client.innerHandler.put("s"+handle, func(resp *Response) {
client.innerHandler["s"+handle] = func(resp *Response) {
defer mutex.Unlock() defer mutex.Unlock()
var err error var err error
status, err = resp._status() status, err = resp._status()
if err != nil { if err != nil {
client.err(err) client.err(err)
} }
} })
req := getRequest() req := getRequest()
req.DataType = dtGetStatus req.DataType = dtGetStatus
req.Data = []byte(handle) req.Data = []byte(handle)
@ -271,14 +299,13 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) {
} }
var mutex sync.Mutex var mutex sync.Mutex
mutex.Lock() mutex.Lock()
client.innerHandler["e"] = func(resp *Response) { client.innerHandler.put("e", func(resp *Response) {
echo = resp.Data echo = resp.Data
mutex.Unlock() mutex.Unlock()
} })
req := getRequest() req := getRequest()
req.DataType = dtEchoReq req.DataType = dtEchoReq
req.Data = data req.Data = data
client.lastcall = "e"
client.write(req) client.write(req)
mutex.Lock() mutex.Lock()
return return
@ -294,3 +321,40 @@ func (client *Client) Close() (err error) {
} }
return return
} }
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoWithId(funcname string, data []byte,
flag byte, h ResponseHandler, id string) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
}

View File

@ -1,16 +1,39 @@
package client package client
import ( import (
"crypto/md5"
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
"testing" "testing"
"time"
) )
const ( const (
TestStr = "Hello world" TestStr = "Hello world"
) )
var client *Client var (
client *Client
runIntegrationTests bool
)
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestClientAddServer(t *testing.T) { func TestClientAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730") t.Log("Add local server 127.0.0.1:4730")
var err error var err error
if client, err = New(Network, "127.0.0.1:4730"); err != nil { if client, err = New(Network, "127.0.0.1:4730"); err != nil {
@ -22,6 +45,9 @@ func TestClientAddServer(t *testing.T) {
} }
func TestClientEcho(t *testing.T) { func TestClientEcho(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
echo, err := client.Echo([]byte(TestStr)) echo, err := client.Echo([]byte(TestStr))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -34,6 +60,9 @@ func TestClientEcho(t *testing.T) {
} }
func TestClientDoBg(t *testing.T) { func TestClientDoBg(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow) handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -46,7 +75,46 @@ func TestClientDoBg(t *testing.T) {
} }
} }
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDo(t *testing.T) { func TestClientDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) { jobHandler := func(job *Response) {
str := string(job.Data) str := string(job.Data)
if str == "ABCDEF" { if str == "ABCDEF" {
@ -69,7 +137,202 @@ func TestClientDo(t *testing.T) {
} }
} }
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) { func TestClientStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := client.Status("handle not exists") status, err := client.Status("handle not exists")
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -105,6 +368,9 @@ func TestClientStatus(t *testing.T) {
} }
func TestClientClose(t *testing.T) { func TestClientClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := client.Close(); err != nil { if err := client.Close(); err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -5,7 +5,7 @@ const (
// queue size // queue size
queueSize = 8 queueSize = 8
// read buffer size // read buffer size
bufferSize = 1024 bufferSize = 8192
// min packet length // min packet length
minPacketLength = 12 minPacketLength = 12
@ -51,6 +51,7 @@ const (
dtSubmitJobLowBg = 34 dtSubmitJobLowBg = 34
WorkComplate = dtWorkComplete WorkComplate = dtWorkComplete
WorkComplete = dtWorkComplete
WorkData = dtWorkData WorkData = dtWorkData
WorkStatus = dtWorkStatus WorkStatus = dtWorkStatus
WorkWarning = dtWorkWarning WorkWarning = dtWorkWarning

View File

@ -9,6 +9,7 @@ import (
var ( var (
ErrWorkWarning = errors.New("Work warning") ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data") ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail") ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption") ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type") ErrDataType = errors.New("Invalid data type")

View File

@ -32,7 +32,7 @@ func (ai *autoincId) Id() string {
return strconv.FormatInt(next, 10) return strconv.FormatInt(next, 10)
} }
// Return an autoincrement ID generator // NewAutoIncId returns an autoincrement ID generator
func NewAutoIncId() IdGenerator { func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique // we'll consider the nano fraction of a second at startup unique
// and count up from there. // and count up from there.

View File

@ -12,19 +12,17 @@ const (
var ( var (
ErrNotFound = errors.New("Server Not Found") ErrNotFound = errors.New("Server Not Found")
SelectWithRate = selectWithRate
SelectRandom = selectRandom
) )
type poolClient struct { type PoolClient struct {
*Client *Client
Rate int Rate int
mutex sync.Mutex mutex sync.Mutex
} }
type SelectionHandler func(map[string]*poolClient, string) string type SelectionHandler func(map[string]*PoolClient, string) string
func selectWithRate(pool map[string]*poolClient, func SelectWithRate(pool map[string]*PoolClient,
last string) (addr string) { last string) (addr string) {
total := 0 total := 0
for _, item := range pool { for _, item := range pool {
@ -36,7 +34,7 @@ func selectWithRate(pool map[string]*poolClient,
return last return last
} }
func selectRandom(pool map[string]*poolClient, func SelectRandom(pool map[string]*PoolClient,
last string) (addr string) { last string) (addr string) {
r := rand.Intn(len(pool)) r := rand.Intn(len(pool))
i := 0 i := 0
@ -52,17 +50,17 @@ func selectRandom(pool map[string]*poolClient,
type Pool struct { type Pool struct {
SelectionHandler SelectionHandler SelectionHandler SelectionHandler
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
Clients map[string]*PoolClient
clients map[string]*poolClient
last string last string
mutex sync.Mutex mutex sync.Mutex
} }
// Return a new pool. // NewPool returns a new pool.
func NewPool() (pool *Pool) { func NewPool() (pool *Pool) {
return &Pool{ return &Pool{
clients: make(map[string]*poolClient, poolSize), Clients: make(map[string]*PoolClient, poolSize),
SelectionHandler: SelectWithRate, SelectionHandler: SelectWithRate,
} }
} }
@ -71,16 +69,16 @@ func NewPool() (pool *Pool) {
func (pool *Pool) Add(net, addr string, rate int) (err error) { func (pool *Pool) Add(net, addr string, rate int) (err error) {
pool.mutex.Lock() pool.mutex.Lock()
defer pool.mutex.Unlock() defer pool.mutex.Unlock()
var item *poolClient var item *PoolClient
var ok bool var ok bool
if item, ok = pool.clients[addr]; ok { if item, ok = pool.Clients[addr]; ok {
item.Rate = rate item.Rate = rate
} else { } else {
var client *Client var client *Client
client, err = New(net, addr) client, err = New(net, addr)
if err == nil { if err == nil {
item = &poolClient{Client: client, Rate: rate} item = &PoolClient{Client: client, Rate: rate}
pool.clients[addr] = item pool.Clients[addr] = item
} }
} }
return return
@ -90,7 +88,7 @@ func (pool *Pool) Add(net, addr string, rate int) (err error) {
func (pool *Pool) Remove(addr string) { func (pool *Pool) Remove(addr string) {
pool.mutex.Lock() pool.mutex.Lock()
defer pool.mutex.Unlock() defer pool.mutex.Unlock()
delete(pool.clients, addr) delete(pool.Clients, addr)
} }
func (pool *Pool) Do(funcname string, data []byte, func (pool *Pool) Do(funcname string, data []byte,
@ -113,10 +111,10 @@ func (pool *Pool) DoBg(funcname string, data []byte,
return return
} }
// Get job status from job server. // Status gets job status from job server.
// !!!Not fully tested.!!! // !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) { func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.clients[addr]; ok { if client, ok := pool.Clients[addr]; ok {
client.Lock() client.Lock()
defer client.Unlock() defer client.Unlock()
status, err = client.Status(handle) status, err = client.Status(handle)
@ -128,12 +126,12 @@ func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
// Send a something out, get the samething back. // Send a something out, get the samething back.
func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
var client *poolClient var client *PoolClient
if addr == "" { if addr == "" {
client = pool.selectServer() client = pool.selectServer()
} else { } else {
var ok bool var ok bool
if client, ok = pool.clients[addr]; !ok { if client, ok = pool.Clients[addr]; !ok {
err = ErrNotFound err = ErrNotFound
return return
} }
@ -147,18 +145,18 @@ func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
// Close // Close
func (pool *Pool) Close() (err map[string]error) { func (pool *Pool) Close() (err map[string]error) {
err = make(map[string]error) err = make(map[string]error)
for _, c := range pool.clients { for _, c := range pool.Clients {
err[c.addr] = c.Close() err[c.addr] = c.Close()
} }
return return
} }
// selecting server // selecting server
func (pool *Pool) selectServer() (client *poolClient) { func (pool *Pool) selectServer() (client *PoolClient) {
for client == nil { for client == nil {
addr := pool.SelectionHandler(pool.clients, pool.last) addr := pool.SelectionHandler(pool.Clients, pool.last)
var ok bool var ok bool
if client, ok = pool.clients[addr]; ok { if client, ok = pool.Clients[addr]; ok {
pool.last = addr pool.last = addr
break break
} }

View File

@ -9,6 +9,9 @@ var (
) )
func TestPoolAdd(t *testing.T) { func TestPoolAdd(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add servers") t.Log("Add servers")
c := 2 c := 2
if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil { if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
@ -18,12 +21,15 @@ func TestPoolAdd(t *testing.T) {
t.Log(err) t.Log(err)
c -= 1 c -= 1
} }
if len(pool.clients) != c { if len(pool.Clients) != c {
t.Errorf("%d servers expected, %d got.", c, len(pool.clients)) t.Errorf("%d servers expected, %d got.", c, len(pool.Clients))
} }
} }
func TestPoolEcho(t *testing.T) { func TestPoolEcho(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
echo, err := pool.Echo("", []byte(TestStr)) echo, err := pool.Echo("", []byte(TestStr))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -41,6 +47,9 @@ func TestPoolEcho(t *testing.T) {
} }
func TestPoolDoBg(t *testing.T) { func TestPoolDoBg(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
addr, handle, err := pool.DoBg("ToUpper", addr, handle, err := pool.DoBg("ToUpper",
[]byte("abcdef"), JobLow) []byte("abcdef"), JobLow)
if err != nil { if err != nil {
@ -55,6 +64,9 @@ func TestPoolDoBg(t *testing.T) {
} }
func TestPoolDo(t *testing.T) { func TestPoolDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) { jobHandler := func(job *Response) {
str := string(job.Data) str := string(job.Data)
if str == "ABCDEF" { if str == "ABCDEF" {
@ -77,6 +89,9 @@ func TestPoolDo(t *testing.T) {
} }
func TestPoolStatus(t *testing.T) { func TestPoolStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
status, err := pool.Status("127.0.0.1:4730", "handle not exists") status, err := pool.Status("127.0.0.1:4730", "handle not exists")
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -114,6 +129,9 @@ func TestPoolStatus(t *testing.T) {
} }
func TestPoolClose(t *testing.T) { func TestPoolClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
return return
if err := pool.Close(); err != nil { if err := pool.Close(); err != nil {
t.Error(err) t.Error(err)

View File

@ -76,7 +76,7 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) {
case dtJobCreated: case dtJobCreated:
resp.Handle = string(dt) resp.Handle = string(dt)
case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus, case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
dtWorkComplete, dtWorkFail, dtWorkException: dtWorkComplete, dtWorkException:
s := bytes.SplitN(dt, []byte{'\x00'}, 2) s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 2 { if len(s) >= 2 {
resp.Handle = string(s[0]) resp.Handle = string(s[0])
@ -85,6 +85,14 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) {
err = fmt.Errorf("Invalid data: %v", data) err = fmt.Errorf("Invalid data: %v", data)
return return
} }
case dtWorkFail:
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
if len(s) >= 1 {
resp.Handle = string(s[0])
} else {
err = fmt.Errorf("Invalid data: %v", data)
return
}
case dtEchoRes: case dtEchoRes:
fallthrough fallthrough
default: default:

View File

@ -3,8 +3,8 @@ package main
import ( import (
"github.com/mikespook/gearman-go/client" "github.com/mikespook/gearman-go/client"
"log" "log"
"sync"
"os" "os"
"sync"
) )
func main() { func main() {

View File

@ -0,0 +1,33 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;

View File

@ -1,13 +1,14 @@
package main package main
import ( import (
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal"
"log" "log"
"net"
"os" "os"
"strings" "strings"
"time" "time"
"net"
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal"
) )
func ToUpper(job worker.Job) ([]byte, error) { func ToUpper(job worker.Job) ([]byte, error) {
@ -68,7 +69,6 @@ func main() {
return return
} }
go w.Work() go w.Work()
sh := signal.NewHandler() signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
sh.Bind(os.Interrupt, func() bool { return true }) signal.Wait()
sh.Loop()
} }

View File

@ -17,8 +17,3 @@ in an easy way.
import "github.com/mikespook/gearman-go/worker" import "github.com/mikespook/gearman-go/worker"
*/ */
package gearman package gearman
import (
_ "github.com/mikespook/gearman-go/client"
_ "github.com/mikespook/gearman-go/worker"
)

View File

@ -2,6 +2,9 @@ package worker
import ( import (
"bufio" "bufio"
"bytes"
"encoding/binary"
"io"
"net" "net"
"sync" "sync"
) )
@ -46,20 +49,26 @@ func (a *agent) work() {
a.worker.err(err.(error)) a.worker.err(err.(error))
} }
}() }()
var inpack *inPack var inpack *inPack
var l int var l int
var err error var err error
var data, leftdata []byte var data, leftdata []byte
for { for {
if data, err = a.read(bufferSize); err != nil { if data, err = a.read(); err != nil {
if opErr, ok := err.(*net.OpError); ok { if opErr, ok := err.(*net.OpError); ok {
if opErr.Temporary() { if opErr.Temporary() {
continue continue
} else { } else {
a.worker.err(err) a.disconnect_error(err)
// else - we're probably dc'ing due to a Close()
break break
} }
} else if err == io.EOF {
a.disconnect_error(err)
break
} }
a.worker.err(err) a.worker.err(err)
// If it is unexpected error and the connection wasn't // If it is unexpected error and the connection wasn't
@ -81,17 +90,41 @@ func (a *agent) work() {
leftdata = data leftdata = data
continue continue
} }
for {
if inpack, l, err = decodeInPack(data); err != nil { if inpack, l, err = decodeInPack(data); err != nil {
a.worker.err(err) a.worker.err(err)
leftdata = data leftdata = data
continue break
} } else {
leftdata = nil leftdata = nil
inpack.a = a inpack.a = a
a.worker.in <- inpack select {
if len(data) > l { case <-a.worker.closed:
leftdata = data[l:] return
default:
} }
a.worker.in <- inpack
if len(data) == l {
break
}
if len(data) > l {
data = data[l:]
}
}
}
}
}
func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil {
err = &WorkerDisconnectError{
err: err,
agent: a,
}
a.worker.err(err)
} }
} }
@ -107,9 +140,17 @@ func (a *agent) Close() {
func (a *agent) Grab() { func (a *agent) Grab() {
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
a.grab()
}
func (a *agent) grab() bool {
if a.worker.closed != nil {
return false
}
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = dtGrabJobUniq outpack.dataType = dtGrabJobUniq
a.write(outpack) a.write(outpack)
return true
} }
func (a *agent) PreSleep() { func (a *agent) PreSleep() {
@ -120,21 +161,51 @@ func (a *agent) PreSleep() {
a.write(outpack) a.write(outpack)
} }
func (a *agent) reconnect() error {
a.Lock()
defer a.Unlock()
conn, err := net.Dial(a.net, a.addr)
if err != nil {
return err
}
a.conn = conn
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a)
if a.grab() {
go a.work()
}
return nil
}
// read length bytes from the socket // read length bytes from the socket
func (a *agent) read(length int) (data []byte, err error) { func (a *agent) read() (data []byte, err error) {
n := 0 n := 0
buf := getBuffer(bufferSize)
// read until data can be unpacked tmp := getBuffer(bufferSize)
for i := length; i > 0 || len(data) < minPacketLength; i -= n { var buf bytes.Buffer
if n, err = a.rw.Read(buf); err != nil {
// read the header so we can get the length of the data
if n, err = a.rw.Read(tmp); err != nil {
return return
} }
data = append(data, buf[0:n]...) dl := int(binary.BigEndian.Uint32(tmp[8:12]))
if n < bufferSize {
break // write what we read so far
buf.Write(tmp[:n])
// read until we receive all the data
for buf.Len() < dl+minPacketLength {
if n, err = a.rw.Read(tmp); err != nil {
return buf.Bytes(), err
} }
buf.Write(tmp[:n])
} }
return
return buf.Bytes(), err
} }
// Internal write the encoded job. // Internal write the encoded job.
@ -149,3 +220,10 @@ func (a *agent) write(outpack *outPack) (err error) {
} }
return a.rw.Flush() return a.rw.Flush()
} }
// Write with lock
func (a *agent) Write(outpack *outPack) (err error) {
a.Lock()
defer a.Unlock()
return a.write(outpack)
}

View File

@ -53,5 +53,4 @@ func ExampleWorker() {
w.Echo([]byte("Hello")) w.Echo([]byte("Hello"))
// Waiting results // Waiting results
wg.Wait() wg.Wait()
// Output: Hello
} }

View File

@ -3,8 +3,8 @@
package worker package worker
import ( import (
"encoding/binary"
"fmt" "fmt"
"strconv"
"sync" "sync"
"time" "time"
) )
@ -23,14 +23,17 @@ type Worker struct {
in chan *inPack in chan *inPack
running bool running bool
ready bool ready bool
jobLeftNum int64
Id string Id string
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
JobHandler JobHandler JobHandler JobHandler
limit chan bool limit chan bool
closed chan struct{}
leftJobs chan struct{}
} }
// Return a worker. // New returns a worker.
// //
// If limit is set to Unlimited(=0), the worker will grab all jobs // If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them parallelly. // and execute them parallelly.
@ -56,7 +59,7 @@ func (worker *Worker) err(e error) {
} }
} }
// Add a Gearman job server. // AddServer adds a Gearman job server.
// //
// addr should be formated as 'host:port'. // addr should be formated as 'host:port'.
func (worker *Worker) AddServer(net, addr string) (err error) { func (worker *Worker) AddServer(net, addr string) (err error) {
@ -72,11 +75,11 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
// Broadcast an outpack to all Gearman server. // Broadcast an outpack to all Gearman server.
func (worker *Worker) broadcast(outpack *outPack) { func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents { for _, v := range worker.agents {
v.write(outpack) v.Write(outpack)
} }
} }
// Add a function. // AddFunc adds a function.
// Set timeout as Unlimited(=0) to disable executing timeout. // Set timeout as Unlimited(=0) to disable executing timeout.
func (worker *Worker) AddFunc(funcname string, func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) { f JobFunc, timeout uint32) (err error) {
@ -94,6 +97,11 @@ func (worker *Worker) AddFunc(funcname string,
// inner add // inner add
func (worker *Worker) addFunc(funcname string, timeout uint32) { func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack := prepFuncOutpack(funcname, timeout)
worker.broadcast(outpack)
}
func prepFuncOutpack(funcname string, timeout uint32) *outPack {
outpack := getOutPack() outpack := getOutPack()
if timeout == 0 { if timeout == 0 {
outpack.dataType = dtCanDo outpack.dataType = dtCanDo
@ -101,15 +109,17 @@ func (worker *Worker) addFunc(funcname string, timeout uint32) {
} else { } else {
outpack.dataType = dtCanDoTimeout outpack.dataType = dtCanDoTimeout
l := len(funcname) l := len(funcname)
outpack.data = getBuffer(l + 5)
timeoutString := strconv.FormatUint(uint64(timeout), 10)
outpack.data = getBuffer(l + len(timeoutString) + 1)
copy(outpack.data, []byte(funcname)) copy(outpack.data, []byte(funcname))
outpack.data[l] = '\x00' outpack.data[l] = '\x00'
binary.BigEndian.PutUint32(outpack.data[l+1:], timeout) copy(outpack.data[l+1:], []byte(timeoutString))
} }
worker.broadcast(outpack) return outpack
} }
// Remove a function. // RemoveFunc removes a function.
func (worker *Worker) RemoveFunc(funcname string) (err error) { func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock() worker.Lock()
defer worker.Unlock() defer worker.Unlock()
@ -140,6 +150,11 @@ func (worker *Worker) handleInPack(inpack *inPack) {
inpack.a.Grab() inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq: case dtJobAssign, dtJobAssignUniq:
go func() { go func() {
go func() {
worker.incrExecJobNum()
defer func() {
worker.decrExecJobNum()
}()
if err := worker.exec(inpack); err != nil { if err := worker.exec(inpack); err != nil {
worker.err(err) worker.err(err)
} }
@ -148,6 +163,7 @@ func (worker *Worker) handleInPack(inpack *inPack) {
worker.limit <- true worker.limit <- true
} }
inpack.a.Grab() inpack.a.Grab()
}()
case dtError: case dtError:
worker.err(inpack.Err()) worker.err(inpack.Err())
fallthrough fallthrough
@ -179,7 +195,7 @@ func (worker *Worker) Ready() (err error) {
return return
} }
// Main loop, block here // Work start main loop (blocking)
// Most of time, this should be evaluated in goroutine. // Most of time, this should be evaluated in goroutine.
func (worker *Worker) Work() { func (worker *Worker) Work() {
if !worker.ready { if !worker.ready {
@ -190,19 +206,29 @@ func (worker *Worker) Work() {
} }
} }
defer func() { worker.Lock()
for _, a := range worker.agents {
a.Close()
}
}()
worker.running = true worker.running = true
worker.Unlock()
for _, a := range worker.agents { for _, a := range worker.agents {
a.Grab() a.Grab()
} }
// 执行任务(阻塞)
var inpack *inPack var inpack *inPack
for inpack = range worker.in { for inpack = range worker.in {
worker.handleInPack(inpack) worker.handleInPack(inpack)
} }
// 关闭Worker进程后 等待任务完成后退出
worker.Lock()
leftJobNum := int(worker.jobLeftNum)
worker.Unlock()
if worker.leftJobs != nil {
for i := 0; i < leftJobNum; i++ {
<-worker.leftJobs
}
}
worker.Reset()
worker.close()
} }
// custome handling warper // custome handling warper
@ -217,10 +243,22 @@ func (worker *Worker) customeHandler(inpack *inPack) {
// Close connection and exit main loop // Close connection and exit main loop
func (worker *Worker) Close() { func (worker *Worker) Close() {
worker.Lock() worker.Lock()
worker.Unlock() defer worker.Unlock()
if worker.running == true { if worker.running == true && worker.closed == nil {
worker.closed = make(chan struct{}, 1)
worker.closed <- struct{}{}
worker.running = false worker.running = false
close(worker.in) close(worker.in)
// 创建关闭后执行中的任务列表
if worker.jobLeftNum != 0 {
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
}
}
func (worker *Worker) close() {
for _, a := range worker.agents {
a.Close()
} }
} }
@ -232,7 +270,7 @@ func (worker *Worker) Echo(data []byte) {
worker.broadcast(outpack) worker.broadcast(outpack)
} }
// Remove all of functions. // Reset removes all of functions.
// Both from the worker and job servers. // Both from the worker and job servers.
func (worker *Worker) Reset() { func (worker *Worker) Reset() {
outpack := getOutPack() outpack := getOutPack()
@ -250,6 +288,23 @@ func (worker *Worker) SetId(id string) {
worker.broadcast(outpack) worker.broadcast(outpack)
} }
func (worker *Worker) incrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum++
return worker.jobLeftNum
}
func (worker *Worker) decrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum--
if worker.jobLeftNum < 0 {
worker.jobLeftNum = 0
}
return worker.jobLeftNum
}
// inner job executing // inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) { func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() { defer func() {
@ -275,7 +330,7 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
} else { } else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
} }
if worker.running { //if worker.running {
outpack := getOutPack() outpack := getOutPack()
if r.err == nil { if r.err == nil {
outpack.dataType = dtWorkComplete outpack.dataType = dtWorkComplete
@ -289,10 +344,22 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
} }
outpack.handle = inpack.handle outpack.handle = inpack.handle
outpack.data = r.data outpack.data = r.data
inpack.a.write(outpack) _ = inpack.a.Write(outpack)
if worker.leftJobs != nil {
worker.leftJobs <- struct{}{}
} }
//}
return return
} }
func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
worker.Lock()
defer worker.Unlock()
for funcname, f := range worker.funcs {
outpack := prepFuncOutpack(funcname, f.timeout)
a.write(outpack)
}
}
// inner result // inner result
type result struct { type result struct {
@ -316,3 +383,23 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
} }
return r return r
} }
// Error type passed when a worker connection disconnects
type WorkerDisconnectError struct {
err error
agent *agent
}
func (e *WorkerDisconnectError) Error() string {
return e.err.Error()
}
// Responds to the error by asking the worker to reconnect
func (e *WorkerDisconnectError) Reconnect() (err error) {
return e.agent.reconnect()
}
// Which server was this for?
func (e *WorkerDisconnectError) Server() (net string, addr string) {
return e.agent.net, e.agent.addr
}

View File

@ -1,170 +0,0 @@
package worker
import (
"../client"
"log"
"net"
"os/exec"
"testing"
"time"
)
const port = `3700`
var gearman_ready chan bool
var kill_gearman chan bool
var bye chan bool
func init() {
if check_gearman_present() {
panic(`Something already listening on our testing port. Chickening out of testing with it!`)
}
gearman_ready = make( chan bool )
kill_gearman = make( chan bool )
// TODO: verify port is clear
go run_gearman()
}
func run_gearman() {
gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port)
start_err := gm_cmd.Start()
if start_err != nil {
panic(`could not start gearman, aborting test :` + start_err.Error())
}
// Make sure we clear up our gearman:
defer func() {
log.Println("killing gearmand")
gm_cmd.Process.Kill()
}()
for tries := 10; tries > 0; tries-- {
if check_gearman_present() {
break
}
time.Sleep(250 * time.Millisecond)
}
if !check_gearman_present() {
panic(`Unable to start gearman aborting test`)
}
gearman_ready <- true
<- kill_gearman
}
func check_gearman_present() bool {
con, err := net.Dial(`tcp`, `127.0.0.1:`+port)
if err != nil {
log.Println("gearman not ready " + err.Error())
return false
}
log.Println("gearman ready")
con.Close()
return true
}
func check_gearman_is_dead() bool {
for tries := 10; tries > 0; tries-- {
if !check_gearman_present() {
return true
}
time.Sleep(250 * time.Millisecond)
}
return false
}
/*
Checks for a disconnect whilst not working
*/
func TestBasicDisconnect(t *testing.T) {
<- gearman_ready
worker := New(Unlimited)
timeout := make(chan bool, 1)
done := make( chan bool, 1)
if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil {
t.Error(err)
}
work_done := false;
if err := worker.AddFunc("gearman-go-workertest",
func(j Job)(b []byte, e error){
work_done = true;
done <- true
return}, 0);
err != nil {
t.Error(err)
}
worker.JobHandler = func( j Job ) error {
if( ! worker.ready ){
t.Error("Worker not ready as expected");
}
done <-true
return nil
}
handled_errors := false
c_error := make( chan bool)
worker.ErrorHandler = func( e error ){
log.Println( e )
handled_errors = true
c_error <- true
}
go func() {
time.Sleep(5 * time.Second)
timeout <- true
}()
err := worker.Ready()
if err != nil {
t.Error(err)
}
go worker.Work()
kill_gearman <- true
check_gearman_is_dead()
go run_gearman()
select {
case <-gearman_ready:
case <-timeout:
}
send_client_request()
select {
case <- done:
t.Error("Client request handled (somehow), did we magically reconnect?")
case <-timeout:
t.Error("Test timed out waiting for the error handler")
case <-c_error:
// error was handled!
}
kill_gearman <- true
}
func send_client_request(){
log.Println("sending client request");
c, err := client.New( Network, "127.0.0.1:" + port )
if err == nil {
_, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh)
if err != nil {
log.Println( "error sending client request " + err.Error() )
}
}else{
log.Println( "error with client " + err.Error() )
}
}

View File

@ -0,0 +1,59 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}

View File

@ -1,18 +1,36 @@
package worker package worker
import ( import (
"bytes"
"flag"
"os"
"sync" "sync"
"testing" "testing"
"time" "time"
) )
var worker *Worker var (
worker *Worker
runIntegrationTests bool
)
func init() { func init() {
worker = New(Unlimited) worker = New(Unlimited)
} }
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
code := m.Run()
os.Exit(code)
}
func TestWorkerErrNoneAgents(t *testing.T) { func TestWorkerErrNoneAgents(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready() err := worker.Ready()
if err != ErrNoneAgents { if err != ErrNoneAgents {
t.Error("ErrNoneAgents expected.") t.Error("ErrNoneAgents expected.")
@ -20,6 +38,9 @@ func TestWorkerErrNoneAgents(t *testing.T) {
} }
func TestWorkerAddServer(t *testing.T) { func TestWorkerAddServer(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
t.Log("Add local server 127.0.0.1:4730.") t.Log("Add local server 127.0.0.1:4730.")
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil { if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err) t.Error(err)
@ -32,6 +53,9 @@ func TestWorkerAddServer(t *testing.T) {
} }
func TestWorkerErrNoneFuncs(t *testing.T) { func TestWorkerErrNoneFuncs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
err := worker.Ready() err := worker.Ready()
if err != ErrNoneFuncs { if err != ErrNoneFuncs {
t.Error("ErrNoneFuncs expected.") t.Error("ErrNoneFuncs expected.")
@ -43,6 +67,9 @@ func foobar(job Job) ([]byte, error) {
} }
func TestWorkerAddFunction(t *testing.T) { func TestWorkerAddFunction(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.AddFunc("foobar", foobar, 0); err != nil { if err := worker.AddFunc("foobar", foobar, 0); err != nil {
t.Error(err) t.Error(err)
} }
@ -56,12 +83,18 @@ func TestWorkerAddFunction(t *testing.T) {
} }
func TestWorkerRemoveFunc(t *testing.T) { func TestWorkerRemoveFunc(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
if err := worker.RemoveFunc("foobar"); err != nil { if err := worker.RemoveFunc("foobar"); err != nil {
t.Error(err) t.Error(err)
} }
} }
func TestWork(t *testing.T) { func TestWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
var wg sync.WaitGroup var wg sync.WaitGroup
worker.JobHandler = func(job Job) error { worker.JobHandler = func(job Job) error {
t.Logf("%s", job.Data()) t.Logf("%s", job.Data())
@ -78,12 +111,76 @@ func TestWork(t *testing.T) {
wg.Wait() wg.Wait()
} }
func TestLargeDataWork(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker := New(Unlimited)
defer worker.Close()
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
worker.Ready()
l := 5714
var wg sync.WaitGroup
bigdataHandler := func(job Job) error {
defer wg.Done()
if len(job.Data()) != l {
t.Errorf("expected length %d. got %d.", l, len(job.Data()))
}
return nil
}
if err := worker.AddFunc("bigdata", foobar, 0); err != nil {
defer wg.Done()
t.Error(err)
}
worker.JobHandler = bigdataHandler
worker.ErrorHandler = func(err error) {
t.Fatal("shouldn't have received an error")
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
// var cli *client.Client
// var err error
// if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
// t.Fatal(err)
// }
// cli.ErrorHandler = func(e error) {
// t.Error(e)
// }
// _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) {
// })
// if err != nil {
// t.Error(err)
// }
worker.Echo(bytes.Repeat([]byte("a"), l))
wg.Wait()
}
func TestWorkerClose(t *testing.T) { func TestWorkerClose(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
worker.Close() worker.Close()
} }
func TestWorkWithoutReady(t *testing.T) { func TestWorkWithoutReady(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited) other_worker := New(Unlimited)
if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil { if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
@ -97,8 +194,8 @@ func TestWorkWithoutReady(t * testing.T){
done := make(chan bool, 1) done := make(chan bool, 1)
other_worker.JobHandler = func(j Job) error { other_worker.JobHandler = func(j Job) error {
if( ! other_worker.ready ){ if !other_worker.ready {
t.Error("Worker not ready as expected"); t.Error("Worker not ready as expected")
} }
done <- true done <- true
return nil return nil
@ -109,14 +206,14 @@ func TestWorkWithoutReady(t * testing.T){
}() }()
go func() { go func() {
other_worker.Work(); other_worker.Work()
}() }()
// With the all-in-one Work() we don't know if the // With the all-in-one Work() we don't know if the
// worker is ready at this stage so we may have to wait a sec: // worker is ready at this stage so we may have to wait a sec:
go func() { go func() {
tries := 5 tries := 5
for( tries > 0 ){ for tries > 0 {
if other_worker.ready { if other_worker.ready {
other_worker.Echo([]byte("Hello")) other_worker.Echo([]byte("Hello"))
break break
@ -137,6 +234,9 @@ func TestWorkWithoutReady(t * testing.T){
} }
func TestWorkWithoutReadyWithPanic(t *testing.T) { func TestWorkWithoutReadyWithPanic(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
other_worker := New(Unlimited) other_worker := New(Unlimited)
timeout := make(chan bool, 1) timeout := make(chan bool, 1)
@ -153,7 +253,7 @@ func TestWorkWithoutReadyWithPanic(t * testing.T){
t.Error("Work should raise a panic.") t.Error("Work should raise a panic.")
done <- true done <- true
}() }()
other_worker.Work(); other_worker.Work()
}() }()
go func() { go func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)