Compare commits


9 Commits
0.2...master

Author SHA1 Message Date
abd6ce9be3 Stop grabbing jobs from the job server after Close is called 2023-02-13 01:01:37 +08:00
97187ecbf9 Wait for in-flight jobs to finish before the process exits 2023-02-13 00:29:22 +08:00
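
[Editor's note] These two commits implement graceful shutdown for the worker: once Close is called the agents stop grabbing jobs, and Work drains the jobs still executing before returning (see the worker.go changes below, around jobLeftNum and leftJobs). A minimal standalone sketch of the same drain pattern, using a sync.WaitGroup in place of the library's counter; all names here are illustrative, not gearman-go's API:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var inFlight sync.WaitGroup
        jobs := make(chan int)

        // Worker loop: stops grabbing as soon as the channel is closed.
        go func() {
            for j := range jobs {
                j := j
                go func() {
                    defer inFlight.Done()
                    fmt.Println("finished job", j)
                }()
            }
        }()

        for i := 0; i < 3; i++ {
            inFlight.Add(1) // count the job before handing it over
            jobs <- i
        }
        close(jobs)     // "Close": nothing new is grabbed after this
        inFlight.Wait() // wait for in-flight jobs, then exit cleanly
    }
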
Xing
2a518e8661
Merge pull request #96 from floringavrila/add-support-for-own-ids
add support for own ids (coalescing)
2022-05-20 15:14:03 +12:00
gavrila florin
fa71d7a37a add support for own ids 2022-05-04 00:04:38 +03:00
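
[Editor's note] The *WithId variants added here let the caller supply the job's unique id instead of relying on the autoincrement generator, and Gearman coalesces submissions that share an id. A sketch of the intended use with a content-derived id, assuming a connected *Client and a worker registered for "ToUpper"; submitOnce is a hypothetical helper modeled on the tests below:

    package client

    import (
        "crypto/md5"
        "encoding/hex"
        "fmt"
    )

    // submitOnce derives a stable id from the payload, so resubmitting
    // the same payload while the first job is still queued coalesces
    // onto the same server-side handle.
    func submitOnce(c *Client, data []byte) (string, error) {
        sum := md5.Sum(data)
        id := hex.EncodeToString(sum[:])
        return c.DoWithId("ToUpper", data, JobNormal, func(r *Response) {
            fmt.Println(string(r.Data)) // "ABCDEF" once the worker replies
        }, id)
    }

Submitting the same payload twice while the job is still queued should return the same handle, which is what TestClientDoWithIdCheckSameHandle verifies.
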
Xing
d36dcb7fc2
Merge pull request #94 from cameronpm/client_race
Fix two race conditions in client.Do
2021-05-03 16:56:33 +12:00
Paul Cameron
a8f0a04c3d Fix two race conditions in client.Do
* Common race condition is fixed by identifying that Client.respHandler
  can be completely removed since all respHandler operations (get, put
  and invocation) can be moved into the Client.processLoop goroutine,
  meaning that zero locking is required. This race condition resulted
  in a deadlock that was resolved by the response timeout at the
  end of client.Do, returning ErrLostConn

* Rare race condition is fixed by changing responseHandlerMap.get
  to .getAndRemove. This race condition resulted in the innerHandler
  for a new dtJobCreated assigned in client.Do overriding a stale older
  dtJobCreated request, and the newer innerHandler being removed by an
  older dtJobCreated in client.processLoop > client.handleInner. When
  the newer dtJobCreated response was received, the handler for it had
  already been deleted. This was resolved by the response timeout at the
  end of client.Do, returning ErrLostConn
2021-05-02 11:31:19 +10:00
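
[Editor's note] The first fix is the standard Go remedy for this class of race: confine the map to the single goroutine that processes responses and communicate over a channel, so the map needs no mutex at all. A rough standalone sketch of the pattern, with illustrative names rather than the actual client code:

    package main

    import "fmt"

    func main() {
        type event struct {
            handle string
            reg    func(string) // non-nil: register a handler for handle
            data   string       // otherwise: payload to dispatch
        }
        in := make(chan event)
        done := make(chan struct{})

        // The only goroutine that ever touches `handlers`: no locks, no races.
        go func() {
            handlers := map[string]func(string){}
            for e := range in {
                if e.reg != nil {
                    handlers[e.handle] = e.reg
                    continue
                }
                if h, ok := handlers[e.handle]; ok {
                    h(e.data)
                    delete(handlers, e.handle) // getAndRemove: fire at most once
                }
            }
            close(done)
        }()

        in <- event{handle: "H:1", reg: func(s string) { fmt.Println("got", s) }}
        in <- event{handle: "H:1", data: "ABCDEF"}
        close(in)
        <-done
    }
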
galih rivanto
81d00aa9ce add synchronization to prevent race condition on worker shutdown (#89)
* add synchronization to prevent race condition on worker shutdown

* fix incorrect synchronization and add race unit test

* fix race while broadcasting outpack
2019-05-28 18:41:20 +08:00
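
[Editor's note] The synchronization added here is the usual closed-channel guard: before an agent forwards a job to the worker's input channel it selects on the shutdown channel, so the send no longer races with worker shutdown. A simplified standalone sketch of that check (the real code signals shutdown through worker.closed rather than close()):

    package main

    import "fmt"

    // deliver forwards a job unless shutdown has been signalled first.
    func deliver(closed <-chan struct{}, in chan<- string, job string) bool {
        select {
        case <-closed:
            return false // worker is shutting down: drop instead of sending
        default:
        }
        in <- job
        return true
    }

    func main() {
        closed := make(chan struct{})
        in := make(chan string, 1)
        fmt.Println(deliver(closed, in, "job-1")) // true
        close(closed)
        fmt.Println(deliver(closed, in, "job-2")) // false
    }
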
Christoffer Fjellström
b902646ce8
Merge pull request #87 from CodeLingoBot/rewrite
Fix function comments based on best practices from Effective Go
2019-02-28 11:28:24 +01:00
CodeLingo Bot
133dd3716f Fix function comments based on best practices from Effective Go
Signed-off-by: CodeLingo Bot <bot@codelingo.io>
2019-02-28 02:11:17 +00:00
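
[Editor's note] The convention applied here comes from Effective Go: a doc comment is a complete sentence that begins with the name of the thing it declares, which is why "// Return a client." becomes "// New returns a client." throughout this diff. A tiny illustration:

    package sketch

    // Double returns n multiplied by two. Starting the doc comment with
    // the declared name is what go doc and most linters expect.
    func Double(n int) int { return 2 * n }
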
9 changed files with 512 additions and 106 deletions

View File

@@ -19,7 +19,6 @@ type Client struct {
sync.Mutex
net, addr string
respHandler *responseHandlerMap
innerHandler *responseHandlerMap
in chan *Response
conn net.Conn
@@ -32,11 +31,16 @@ type Client struct {
type responseHandlerMap struct {
sync.Mutex
holder map[string]ResponseHandler
holder map[string]handledResponse
}
type handledResponse struct {
internal ResponseHandler // internal handler, always non-nil
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
}
func newResponseHandlerMap() *responseHandlerMap {
return &responseHandlerMap{holder: make(map[string]ResponseHandler, queueSize)}
return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)}
}
func (r *responseHandlerMap) remove(key string) {
@@ -45,29 +49,29 @@ func (r *responseHandlerMap) remove(key string) {
r.Unlock()
}
func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) {
func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) {
r.Lock()
rh, b := r.holder[key]
delete(r.holder, key)
r.Unlock()
return rh, b
}
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) {
r.Lock()
r.holder[key] = rh
r.holder[key] = handledResponse{internal: internal, external: external}
r.Unlock()
}
func (r *responseHandlerMap) putNoLock(key string, rh ResponseHandler) {
r.holder[key] = rh
func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
r.putWithExternalHandler(key, rh, nil)
}
// Return a client.
// New returns a client.
func New(network, addr string) (client *Client, err error) {
client = &Client{
net: network,
addr: addr,
respHandler: newResponseHandlerMap(),
innerHandler: newResponseHandlerMap(),
in: make(chan *Response, queueSize),
ResponseTimeout: DefaultTimeout,
@@ -168,21 +172,26 @@ ReadLoop:
}
func (client *Client) processLoop() {
rhandlers := map[string]ResponseHandler{}
for resp := range client.in {
switch resp.DataType {
case dtError:
client.err(getError(resp.Data))
case dtStatusRes:
resp = client.handleInner("s"+resp.Handle, resp)
client.handleInner("s"+resp.Handle, resp, nil)
case dtJobCreated:
resp = client.handleInner("c", resp)
client.handleInner("c", resp, rhandlers)
case dtEchoRes:
resp = client.handleInner("e", resp)
client.handleInner("e", resp, nil)
case dtWorkData, dtWorkWarning, dtWorkStatus:
resp = client.handleResponse(resp.Handle, resp)
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException:
client.handleResponse(resp.Handle, resp)
client.respHandler.remove(resp.Handle)
if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
delete(rhandlers, resp.Handle)
}
}
}
}
@@ -193,21 +202,13 @@ func (client *Client) err(e error) {
}
}
func (client *Client) handleResponse(key string, resp *Response) *Response {
if h, ok := client.respHandler.get(key); ok {
h(resp)
return nil
func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) {
if h, ok := client.innerHandler.getAndRemove(key); ok {
if h.external != nil && resp.Handle != "" {
rhandlers[resp.Handle] = h.external
}
h.internal(resp)
}
return resp
}
func (client *Client) handleInner(key string, resp *Response) *Response {
if h, ok := client.innerHandler.get(key); ok {
h(resp)
client.innerHandler.remove(key)
return nil
}
return resp
}
type handleOrError struct {
@@ -216,14 +217,17 @@ type handleOrError struct {
}
func (client *Client) do(funcname string, data []byte,
flag uint32) (handle string, err error) {
flag uint32, h ResponseHandler, id string) (handle string, err error) {
if len(id) == 0 {
return "", ErrInvalidId
}
if client.conn == nil {
return "", ErrLostConn
}
var result = make(chan handleOrError, 1)
client.Lock()
defer client.Unlock()
client.innerHandler.put("c", func(resp *Response) {
client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
if resp.DataType == dtError {
err = getError(resp.Data)
result <- handleOrError{"", err}
@@ -231,8 +235,7 @@ func (client *Client) do(funcname string, data []byte,
}
handle = resp.Handle
result <- handleOrError{handle, nil}
})
id := IdGen.Id()
}, h)
req := getJob(id, []byte(funcname), data)
req.DataType = flag
if err = client.write(req); err != nil {
@@ -254,22 +257,7 @@ func (client *Client) do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
client.respHandler.Lock()
defer client.respHandler.Unlock()
handle, err = client.do(funcname, data, datatype)
if err == nil && h != nil {
client.respHandler.putNoLock(handle, h)
}
handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
return
}
@@ -277,23 +265,11 @@ func (client *Client) Do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype)
handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
return
}
// Get job status from job server.
// Status gets job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil {
return nil, ErrLostConn
@@ -345,3 +321,40 @@ func (client *Client) Close() (err error) {
}
return
}
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoWithId(funcname string, data []byte,
flag byte, h ResponseHandler, id string) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
}

View File

@@ -1,9 +1,14 @@
package client
import (
"crypto/md5"
"encoding/hex"
"errors"
"flag"
"fmt"
"os"
"testing"
"time"
)
const (
@@ -17,6 +22,7 @@ var (
func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag
}
@@ -69,6 +75,42 @@ func TestClientDoBg(t *testing.T) {
}
}
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
@@ -95,6 +137,198 @@ func TestClientDo(t *testing.T) {
}
}
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")

View File

@@ -9,6 +9,7 @@ import (
var (
ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type")

View File

@@ -32,7 +32,7 @@ func (ai *autoincId) Id() string {
return strconv.FormatInt(next, 10)
}
// Return an autoincrement ID generator
// NewAutoIncId returns an autoincrement ID generator
func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique
// and count up from there.

View File

@@ -57,7 +57,7 @@ type Pool struct {
mutex sync.Mutex
}
// Return a new pool.
// NewPool returns a new pool.
func NewPool() (pool *Pool) {
return &Pool{
Clients: make(map[string]*PoolClient, poolSize),
@@ -111,7 +111,7 @@ func (pool *Pool) DoBg(funcname string, data []byte,
return
}
// Get job status from job server.
// Status gets job status from job server.
// !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.Clients[addr]; ok {

View File

@@ -0,0 +1,33 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;

View File

@@ -98,6 +98,11 @@ func (a *agent) work() {
} else {
leftdata = nil
inpack.a = a
select {
case <-a.worker.closed:
return
default:
}
a.worker.in <- inpack
if len(data) == l {
break
@@ -111,6 +116,9 @@
}
func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil {
err = &WorkerDisconnectError{
err: err,
@@ -135,10 +143,14 @@ func (a *agent) Grab() {
a.grab()
}
func (a *agent) grab() {
func (a *agent) grab() bool {
if a.worker.closed != nil {
return false
}
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
a.write(outpack)
return true
}
func (a *agent) PreSleep() {
@@ -161,9 +173,10 @@ func (a *agent) reconnect() error {
bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a)
a.grab()
if a.grab() {
go a.work()
}
go a.work()
return nil
}

View File

@@ -4,9 +4,9 @@ package worker
import (
"fmt"
"strconv"
"sync"
"time"
"strconv"
)
const (
@@ -18,19 +18,22 @@
// It can connect to multi-server and grab jobs.
type Worker struct {
sync.Mutex
agents []*agent
funcs jobFuncs
in chan *inPack
running bool
ready bool
agents []*agent
funcs jobFuncs
in chan *inPack
running bool
ready bool
jobLeftNum int64
Id string
ErrorHandler ErrorHandler
JobHandler JobHandler
limit chan bool
closed chan struct{}
leftJobs chan struct{}
}
// Return a worker.
// New returns a worker.
//
// If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them in parallel.
@@ -56,7 +59,7 @@ func (worker *Worker) err(e error) {
}
}
// Add a Gearman job server.
// AddServer adds a Gearman job server.
//
// addr should be formatted as 'host:port'.
func (worker *Worker) AddServer(net, addr string) (err error) {
@@ -72,11 +75,11 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
// Broadcast an outpack to all Gearman servers.
func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents {
v.write(outpack)
v.Write(outpack)
}
}
// Add a function.
// AddFunc adds a function.
// Set timeout as Unlimited(=0) to disable executing timeout.
func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) {
@@ -116,7 +119,7 @@ func prepFuncOutpack(funcname string, timeout uint32) *outPack {
return outpack
}
// Remove a function.
// RemoveFunc removes a function.
func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock()
defer worker.Unlock()
@@ -147,14 +150,20 @@ func (worker *Worker) handleInPack(inpack *inPack) {
inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq:
go func() {
if err := worker.exec(inpack); err != nil {
worker.err(err)
go func() {
worker.incrExecJobNum()
defer func() {
worker.decrExecJobNum()
}()
if err := worker.exec(inpack); err != nil {
worker.err(err)
}
}()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
}()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
case dtError:
worker.err(inpack.Err())
fallthrough
@@ -186,7 +195,7 @@ func (worker *Worker) Ready() (err error) {
return
}
// Main loop, block here
// Work starts the main loop (blocking).
// Most of the time, this should be run in a goroutine.
func (worker *Worker) Work() {
if !worker.ready {
@@ -197,14 +206,29 @@ func (worker *Worker) Work() {
}
}
worker.Lock()
worker.running = true
worker.Unlock()
for _, a := range worker.agents {
a.Grab()
}
// execute jobs (blocking)
var inpack *inPack
for inpack = range worker.in {
worker.handleInPack(inpack)
}
// after the worker is closed, wait for running jobs to finish before exiting
worker.Lock()
leftJobNum := int(worker.jobLeftNum)
worker.Unlock()
if worker.leftJobs != nil {
for i := 0; i < leftJobNum; i++ {
<-worker.leftJobs
}
}
worker.Reset()
worker.close()
}
// custom handling wrapper
@@ -220,12 +244,21 @@ func (worker *Worker) customeHandler(inpack *inPack) {
func (worker *Worker) Close() {
worker.Lock()
defer worker.Unlock()
if worker.running == true {
for _, a := range worker.agents {
a.Close()
}
if worker.running == true && worker.closed == nil {
worker.closed = make(chan struct{}, 1)
worker.closed <- struct{}{}
worker.running = false
close(worker.in)
// track the jobs still executing after close
if worker.jobLeftNum != 0 {
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
}
}
func (worker *Worker) close() {
for _, a := range worker.agents {
a.Close()
}
}
@@ -237,7 +270,7 @@ func (worker *Worker) Echo(data []byte) {
worker.broadcast(outpack)
}
// Remove all of functions.
// Reset removes all of functions.
// Both from the worker and job servers.
func (worker *Worker) Reset() {
outpack := getOutPack()
@@ -255,6 +288,23 @@ func (worker *Worker) SetId(id string) {
worker.broadcast(outpack)
}
func (worker *Worker) incrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum++
return worker.jobLeftNum
}
func (worker *Worker) decrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum--
if worker.jobLeftNum < 0 {
worker.jobLeftNum = 0
}
return worker.jobLeftNum
}
// inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() {
@@ -280,22 +330,25 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
} else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
}
if worker.running {
outpack := getOutPack()
if r.err == nil {
outpack.dataType = dtWorkComplete
//if worker.running {
outpack := getOutPack()
if r.err == nil {
outpack.dataType = dtWorkComplete
} else {
if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else {
if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else {
outpack.dataType = dtWorkException
}
err = r.err
outpack.dataType = dtWorkException
}
outpack.handle = inpack.handle
outpack.data = r.data
inpack.a.Write(outpack)
err = r.err
}
outpack.handle = inpack.handle
outpack.data = r.data
_ = inpack.a.Write(outpack)
if worker.leftJobs != nil {
worker.leftJobs <- struct{}{}
}
//}
return
}
func (worker *Worker) reRegisterFuncsForAgent(a *agent) {

View File

@@ -0,0 +1,59 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// based on the example worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custom handler for other results, e.g. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shut down the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}