Compare commits

...

9 Commits
0.2 ... master

Author SHA1 Message Date
abd6ce9be3 调用Close后不再向JobServer获取任务 2023-02-13 01:01:37 +08:00
97187ecbf9 等待执行中的任务完成后再退出进程 2023-02-13 00:29:22 +08:00
Xing
2a518e8661
Merge pull request #96 from floringavrila/add-support-for-own-ids
add support for own ids (coalescing)
2022-05-20 15:14:03 +12:00
gavrila florin
fa71d7a37a add support for own ids 2022-05-04 00:04:38 +03:00
Xing
d36dcb7fc2
Merge pull request #94 from cameronpm/client_race
Fix two race conditions in client.Do
2021-05-03 16:56:33 +12:00
Paul Cameron
a8f0a04c3d Fix two race conditions in client.Do
* Common race condition is fixed by identifying that Client.respHandler
  can be completely removed since all respHandler operations (get, put
  and invocation) can be moved into the Client.processLoop goroutine,
  meaning that zero locking is required. This race condition resulted
  in a deadlock that was resolved by the response timeout at the
  end of client.Do, returning ErrLostConn

* Rare race condition is fixed by changing responseHandlerMap.get
  to .getAndRemove. This race condition resulted in the innerHandler
  for a new dtJobCreated assigned in client.Do overriding a stale older
  dtJobCreated request, and the newer innerHandler being removed by an
  older dtJobCreated in client.processLoop > client.handleInner. When
  the newer dtJobCreated response was received, the handler for it had
  already been deleted. This was resolved by the response timeout at the
  end of client.Do, returning ErrLostConn
2021-05-02 11:31:19 +10:00
galih rivanto
81d00aa9ce add synchronization to prevent race condition on worker shutdown (#89)
* add synchronization to prevent race condition on worker shutdown

* fix incorrect synchronization and add race unit test

* fix race while broadcasting outpack
2019-05-28 18:41:20 +08:00
Christoffer Fjellström
b902646ce8
Merge pull request #87 from CodeLingoBot/rewrite
Fix function comments based on best practices from Effective Go
2019-02-28 11:28:24 +01:00
CodeLingo Bot
133dd3716f Fix function comments based on best practices from Effective Go
Signed-off-by: CodeLingo Bot <bot@codelingo.io>
2019-02-28 02:11:17 +00:00
9 changed files with 512 additions and 106 deletions

View File

@ -19,7 +19,6 @@ type Client struct {
sync.Mutex sync.Mutex
net, addr string net, addr string
respHandler *responseHandlerMap
innerHandler *responseHandlerMap innerHandler *responseHandlerMap
in chan *Response in chan *Response
conn net.Conn conn net.Conn
@ -32,11 +31,16 @@ type Client struct {
type responseHandlerMap struct { type responseHandlerMap struct {
sync.Mutex sync.Mutex
holder map[string]ResponseHandler holder map[string]handledResponse
}
type handledResponse struct {
internal ResponseHandler // internal handler, always non-nil
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
} }
func newResponseHandlerMap() *responseHandlerMap { func newResponseHandlerMap() *responseHandlerMap {
return &responseHandlerMap{holder: make(map[string]ResponseHandler, queueSize)} return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)}
} }
func (r *responseHandlerMap) remove(key string) { func (r *responseHandlerMap) remove(key string) {
@ -45,29 +49,29 @@ func (r *responseHandlerMap) remove(key string) {
r.Unlock() r.Unlock()
} }
func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) { func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) {
r.Lock() r.Lock()
rh, b := r.holder[key] rh, b := r.holder[key]
delete(r.holder, key)
r.Unlock() r.Unlock()
return rh, b return rh, b
} }
func (r *responseHandlerMap) put(key string, rh ResponseHandler) { func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) {
r.Lock() r.Lock()
r.holder[key] = rh r.holder[key] = handledResponse{internal: internal, external: external}
r.Unlock() r.Unlock()
} }
func (r *responseHandlerMap) putNoLock(key string, rh ResponseHandler) { func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
r.holder[key] = rh r.putWithExternalHandler(key, rh, nil)
} }
// Return a client. // New returns a client.
func New(network, addr string) (client *Client, err error) { func New(network, addr string) (client *Client, err error) {
client = &Client{ client = &Client{
net: network, net: network,
addr: addr, addr: addr,
respHandler: newResponseHandlerMap(),
innerHandler: newResponseHandlerMap(), innerHandler: newResponseHandlerMap(),
in: make(chan *Response, queueSize), in: make(chan *Response, queueSize),
ResponseTimeout: DefaultTimeout, ResponseTimeout: DefaultTimeout,
@ -168,21 +172,26 @@ ReadLoop:
} }
func (client *Client) processLoop() { func (client *Client) processLoop() {
rhandlers := map[string]ResponseHandler{}
for resp := range client.in { for resp := range client.in {
switch resp.DataType { switch resp.DataType {
case dtError: case dtError:
client.err(getError(resp.Data)) client.err(getError(resp.Data))
case dtStatusRes: case dtStatusRes:
resp = client.handleInner("s"+resp.Handle, resp) client.handleInner("s"+resp.Handle, resp, nil)
case dtJobCreated: case dtJobCreated:
resp = client.handleInner("c", resp) client.handleInner("c", resp, rhandlers)
case dtEchoRes: case dtEchoRes:
resp = client.handleInner("e", resp) client.handleInner("e", resp, nil)
case dtWorkData, dtWorkWarning, dtWorkStatus: case dtWorkData, dtWorkWarning, dtWorkStatus:
resp = client.handleResponse(resp.Handle, resp) if cb := rhandlers[resp.Handle]; cb != nil {
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException: case dtWorkComplete, dtWorkFail, dtWorkException:
client.handleResponse(resp.Handle, resp) if cb := rhandlers[resp.Handle]; cb != nil {
client.respHandler.remove(resp.Handle) cb(resp)
delete(rhandlers, resp.Handle)
}
} }
} }
} }
@ -193,21 +202,13 @@ func (client *Client) err(e error) {
} }
} }
func (client *Client) handleResponse(key string, resp *Response) *Response { func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) {
if h, ok := client.respHandler.get(key); ok { if h, ok := client.innerHandler.getAndRemove(key); ok {
h(resp) if h.external != nil && resp.Handle != "" {
return nil rhandlers[resp.Handle] = h.external
}
h.internal(resp)
} }
return resp
}
func (client *Client) handleInner(key string, resp *Response) *Response {
if h, ok := client.innerHandler.get(key); ok {
h(resp)
client.innerHandler.remove(key)
return nil
}
return resp
} }
type handleOrError struct { type handleOrError struct {
@ -216,14 +217,17 @@ type handleOrError struct {
} }
func (client *Client) do(funcname string, data []byte, func (client *Client) do(funcname string, data []byte,
flag uint32) (handle string, err error) { flag uint32, h ResponseHandler, id string) (handle string, err error) {
if len(id) == 0 {
return "", ErrInvalidId
}
if client.conn == nil { if client.conn == nil {
return "", ErrLostConn return "", ErrLostConn
} }
var result = make(chan handleOrError, 1) var result = make(chan handleOrError, 1)
client.Lock() client.Lock()
defer client.Unlock() defer client.Unlock()
client.innerHandler.put("c", func(resp *Response) { client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
if resp.DataType == dtError { if resp.DataType == dtError {
err = getError(resp.Data) err = getError(resp.Data)
result <- handleOrError{"", err} result <- handleOrError{"", err}
@ -231,8 +235,7 @@ func (client *Client) do(funcname string, data []byte,
} }
handle = resp.Handle handle = resp.Handle
result <- handleOrError{handle, nil} result <- handleOrError{handle, nil}
}) }, h)
id := IdGen.Id()
req := getJob(id, []byte(funcname), data) req := getJob(id, []byte(funcname), data)
req.DataType = flag req.DataType = flag
if err = client.write(req); err != nil { if err = client.write(req); err != nil {
@ -254,22 +257,7 @@ func (client *Client) do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh // flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte, func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) { flag byte, h ResponseHandler) (handle string, err error) {
var datatype uint32 handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
client.respHandler.Lock()
defer client.respHandler.Unlock()
handle, err = client.do(funcname, data, datatype)
if err == nil && h != nil {
client.respHandler.putNoLock(handle, h)
}
return return
} }
@ -277,23 +265,11 @@ func (client *Client) Do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh // flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte, func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) { flag byte) (handle string, err error) {
if client.conn == nil { handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype)
return return
} }
// Get job status from job server. // Status gets job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) { func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil { if client.conn == nil {
return nil, ErrLostConn return nil, ErrLostConn
@ -345,3 +321,40 @@ func (client *Client) Close() (err error) {
} }
return return
} }
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoWithId(funcname string, data []byte,
flag byte, h ResponseHandler, id string) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
}

View File

@ -1,9 +1,14 @@
package client package client
import ( import (
"crypto/md5"
"encoding/hex"
"errors"
"flag" "flag"
"fmt"
"os" "os"
"testing" "testing"
"time"
) )
const ( const (
@ -17,6 +22,7 @@ var (
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)") integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil { if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag runIntegrationTests = *integrationsTestFlag
} }
@ -69,6 +75,42 @@ func TestClientDoBg(t *testing.T) {
} }
} }
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDo(t *testing.T) { func TestClientDo(t *testing.T) {
if !runIntegrationTests { if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration") t.Skip("To run this test, use: go test -integration")
@ -95,6 +137,198 @@ func TestClientDo(t *testing.T) {
} }
} }
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) { func TestClientStatus(t *testing.T) {
if !runIntegrationTests { if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration") t.Skip("To run this test, use: go test -integration")

View File

@ -9,6 +9,7 @@ import (
var ( var (
ErrWorkWarning = errors.New("Work warning") ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data") ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail") ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption") ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type") ErrDataType = errors.New("Invalid data type")

View File

@ -32,7 +32,7 @@ func (ai *autoincId) Id() string {
return strconv.FormatInt(next, 10) return strconv.FormatInt(next, 10)
} }
// Return an autoincrement ID generator // NewAutoIncId returns an autoincrement ID generator
func NewAutoIncId() IdGenerator { func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique // we'll consider the nano fraction of a second at startup unique
// and count up from there. // and count up from there.

View File

@ -57,7 +57,7 @@ type Pool struct {
mutex sync.Mutex mutex sync.Mutex
} }
// Return a new pool. // NewPool returns a new pool.
func NewPool() (pool *Pool) { func NewPool() (pool *Pool) {
return &Pool{ return &Pool{
Clients: make(map[string]*PoolClient, poolSize), Clients: make(map[string]*PoolClient, poolSize),
@ -111,7 +111,7 @@ func (pool *Pool) DoBg(funcname string, data []byte,
return return
} }
// Get job status from job server. // Status gets job status from job server.
// !!!Not fully tested.!!! // !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) { func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.Clients[addr]; ok { if client, ok := pool.Clients[addr]; ok {

View File

@ -0,0 +1,33 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;

View File

@ -98,6 +98,11 @@ func (a *agent) work() {
} else { } else {
leftdata = nil leftdata = nil
inpack.a = a inpack.a = a
select {
case <-a.worker.closed:
return
default:
}
a.worker.in <- inpack a.worker.in <- inpack
if len(data) == l { if len(data) == l {
break break
@ -111,6 +116,9 @@ func (a *agent) work() {
} }
func (a *agent) disconnect_error(err error) { func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil { if a.conn != nil {
err = &WorkerDisconnectError{ err = &WorkerDisconnectError{
err: err, err: err,
@ -135,10 +143,14 @@ func (a *agent) Grab() {
a.grab() a.grab()
} }
func (a *agent) grab() { func (a *agent) grab() bool {
if a.worker.closed != nil {
return false
}
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = dtGrabJobUniq outpack.dataType = dtGrabJobUniq
a.write(outpack) a.write(outpack)
return true
} }
func (a *agent) PreSleep() { func (a *agent) PreSleep() {
@ -161,9 +173,10 @@ func (a *agent) reconnect() error {
bufio.NewWriter(a.conn)) bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a) a.worker.reRegisterFuncsForAgent(a)
a.grab() if a.grab() {
go a.work()
}
go a.work()
return nil return nil
} }

View File

@ -4,9 +4,9 @@ package worker
import ( import (
"fmt" "fmt"
"strconv"
"sync" "sync"
"time" "time"
"strconv"
) )
const ( const (
@ -18,19 +18,22 @@ const (
// It can connect to multi-server and grab jobs. // It can connect to multi-server and grab jobs.
type Worker struct { type Worker struct {
sync.Mutex sync.Mutex
agents []*agent agents []*agent
funcs jobFuncs funcs jobFuncs
in chan *inPack in chan *inPack
running bool running bool
ready bool ready bool
jobLeftNum int64
Id string Id string
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
JobHandler JobHandler JobHandler JobHandler
limit chan bool limit chan bool
closed chan struct{}
leftJobs chan struct{}
} }
// Return a worker. // New returns a worker.
// //
// If limit is set to Unlimited(=0), the worker will grab all jobs // If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them parallelly. // and execute them parallelly.
@ -56,7 +59,7 @@ func (worker *Worker) err(e error) {
} }
} }
// Add a Gearman job server. // AddServer adds a Gearman job server.
// //
// addr should be formated as 'host:port'. // addr should be formated as 'host:port'.
func (worker *Worker) AddServer(net, addr string) (err error) { func (worker *Worker) AddServer(net, addr string) (err error) {
@ -72,11 +75,11 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
// Broadcast an outpack to all Gearman server. // Broadcast an outpack to all Gearman server.
func (worker *Worker) broadcast(outpack *outPack) { func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents { for _, v := range worker.agents {
v.write(outpack) v.Write(outpack)
} }
} }
// Add a function. // AddFunc adds a function.
// Set timeout as Unlimited(=0) to disable executing timeout. // Set timeout as Unlimited(=0) to disable executing timeout.
func (worker *Worker) AddFunc(funcname string, func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) { f JobFunc, timeout uint32) (err error) {
@ -116,7 +119,7 @@ func prepFuncOutpack(funcname string, timeout uint32) *outPack {
return outpack return outpack
} }
// Remove a function. // RemoveFunc removes a function.
func (worker *Worker) RemoveFunc(funcname string) (err error) { func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock() worker.Lock()
defer worker.Unlock() defer worker.Unlock()
@ -147,14 +150,20 @@ func (worker *Worker) handleInPack(inpack *inPack) {
inpack.a.Grab() inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq: case dtJobAssign, dtJobAssignUniq:
go func() { go func() {
if err := worker.exec(inpack); err != nil { go func() {
worker.err(err) worker.incrExecJobNum()
defer func() {
worker.decrExecJobNum()
}()
if err := worker.exec(inpack); err != nil {
worker.err(err)
}
}()
if worker.limit != nil {
worker.limit <- true
} }
inpack.a.Grab()
}() }()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
case dtError: case dtError:
worker.err(inpack.Err()) worker.err(inpack.Err())
fallthrough fallthrough
@ -186,7 +195,7 @@ func (worker *Worker) Ready() (err error) {
return return
} }
// Main loop, block here // Work start main loop (blocking)
// Most of time, this should be evaluated in goroutine. // Most of time, this should be evaluated in goroutine.
func (worker *Worker) Work() { func (worker *Worker) Work() {
if !worker.ready { if !worker.ready {
@ -197,14 +206,29 @@ func (worker *Worker) Work() {
} }
} }
worker.Lock()
worker.running = true worker.running = true
worker.Unlock()
for _, a := range worker.agents { for _, a := range worker.agents {
a.Grab() a.Grab()
} }
// 执行任务(阻塞)
var inpack *inPack var inpack *inPack
for inpack = range worker.in { for inpack = range worker.in {
worker.handleInPack(inpack) worker.handleInPack(inpack)
} }
// 关闭Worker进程后 等待任务完成后退出
worker.Lock()
leftJobNum := int(worker.jobLeftNum)
worker.Unlock()
if worker.leftJobs != nil {
for i := 0; i < leftJobNum; i++ {
<-worker.leftJobs
}
}
worker.Reset()
worker.close()
} }
// custome handling warper // custome handling warper
@ -220,12 +244,21 @@ func (worker *Worker) customeHandler(inpack *inPack) {
func (worker *Worker) Close() { func (worker *Worker) Close() {
worker.Lock() worker.Lock()
defer worker.Unlock() defer worker.Unlock()
if worker.running == true { if worker.running == true && worker.closed == nil {
for _, a := range worker.agents { worker.closed = make(chan struct{}, 1)
a.Close() worker.closed <- struct{}{}
}
worker.running = false worker.running = false
close(worker.in) close(worker.in)
// 创建关闭后执行中的任务列表
if worker.jobLeftNum != 0 {
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
}
}
func (worker *Worker) close() {
for _, a := range worker.agents {
a.Close()
} }
} }
@ -237,7 +270,7 @@ func (worker *Worker) Echo(data []byte) {
worker.broadcast(outpack) worker.broadcast(outpack)
} }
// Remove all of functions. // Reset removes all of functions.
// Both from the worker and job servers. // Both from the worker and job servers.
func (worker *Worker) Reset() { func (worker *Worker) Reset() {
outpack := getOutPack() outpack := getOutPack()
@ -255,6 +288,23 @@ func (worker *Worker) SetId(id string) {
worker.broadcast(outpack) worker.broadcast(outpack)
} }
func (worker *Worker) incrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum++
return worker.jobLeftNum
}
func (worker *Worker) decrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum--
if worker.jobLeftNum < 0 {
worker.jobLeftNum = 0
}
return worker.jobLeftNum
}
// inner job executing // inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) { func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() { defer func() {
@ -280,22 +330,25 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
} else { } else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
} }
if worker.running { //if worker.running {
outpack := getOutPack() outpack := getOutPack()
if r.err == nil { if r.err == nil {
outpack.dataType = dtWorkComplete outpack.dataType = dtWorkComplete
} else {
if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else { } else {
if len(r.data) == 0 { outpack.dataType = dtWorkException
outpack.dataType = dtWorkFail
} else {
outpack.dataType = dtWorkException
}
err = r.err
} }
outpack.handle = inpack.handle err = r.err
outpack.data = r.data
inpack.a.Write(outpack)
} }
outpack.handle = inpack.handle
outpack.data = r.data
_ = inpack.a.Write(outpack)
if worker.leftJobs != nil {
worker.leftJobs <- struct{}{}
}
//}
return return
} }
func (worker *Worker) reRegisterFuncsForAgent(a *agent) { func (worker *Worker) reRegisterFuncsForAgent(a *agent) {

View File

@ -0,0 +1,59 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}