Compare commits

..

No commits in common. "master" and "0.2" have entirely different histories.
master ... 0.2

9 changed files with 106 additions and 512 deletions

View File

@ -19,6 +19,7 @@ type Client struct {
sync.Mutex sync.Mutex
net, addr string net, addr string
respHandler *responseHandlerMap
innerHandler *responseHandlerMap innerHandler *responseHandlerMap
in chan *Response in chan *Response
conn net.Conn conn net.Conn
@ -31,16 +32,11 @@ type Client struct {
type responseHandlerMap struct { type responseHandlerMap struct {
sync.Mutex sync.Mutex
holder map[string]handledResponse holder map[string]ResponseHandler
}
type handledResponse struct {
internal ResponseHandler // internal handler, always non-nil
external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
} }
func newResponseHandlerMap() *responseHandlerMap { func newResponseHandlerMap() *responseHandlerMap {
return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)} return &responseHandlerMap{holder: make(map[string]ResponseHandler, queueSize)}
} }
func (r *responseHandlerMap) remove(key string) { func (r *responseHandlerMap) remove(key string) {
@ -49,29 +45,29 @@ func (r *responseHandlerMap) remove(key string) {
r.Unlock() r.Unlock()
} }
func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) { func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) {
r.Lock() r.Lock()
rh, b := r.holder[key] rh, b := r.holder[key]
delete(r.holder, key)
r.Unlock() r.Unlock()
return rh, b return rh, b
} }
func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) { func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
r.Lock() r.Lock()
r.holder[key] = handledResponse{internal: internal, external: external} r.holder[key] = rh
r.Unlock() r.Unlock()
} }
func (r *responseHandlerMap) put(key string, rh ResponseHandler) { func (r *responseHandlerMap) putNoLock(key string, rh ResponseHandler) {
r.putWithExternalHandler(key, rh, nil) r.holder[key] = rh
} }
// New returns a client. // Return a client.
func New(network, addr string) (client *Client, err error) { func New(network, addr string) (client *Client, err error) {
client = &Client{ client = &Client{
net: network, net: network,
addr: addr, addr: addr,
respHandler: newResponseHandlerMap(),
innerHandler: newResponseHandlerMap(), innerHandler: newResponseHandlerMap(),
in: make(chan *Response, queueSize), in: make(chan *Response, queueSize),
ResponseTimeout: DefaultTimeout, ResponseTimeout: DefaultTimeout,
@ -172,26 +168,21 @@ ReadLoop:
} }
func (client *Client) processLoop() { func (client *Client) processLoop() {
rhandlers := map[string]ResponseHandler{}
for resp := range client.in { for resp := range client.in {
switch resp.DataType { switch resp.DataType {
case dtError: case dtError:
client.err(getError(resp.Data)) client.err(getError(resp.Data))
case dtStatusRes: case dtStatusRes:
client.handleInner("s"+resp.Handle, resp, nil) resp = client.handleInner("s"+resp.Handle, resp)
case dtJobCreated: case dtJobCreated:
client.handleInner("c", resp, rhandlers) resp = client.handleInner("c", resp)
case dtEchoRes: case dtEchoRes:
client.handleInner("e", resp, nil) resp = client.handleInner("e", resp)
case dtWorkData, dtWorkWarning, dtWorkStatus: case dtWorkData, dtWorkWarning, dtWorkStatus:
if cb := rhandlers[resp.Handle]; cb != nil { resp = client.handleResponse(resp.Handle, resp)
cb(resp)
}
case dtWorkComplete, dtWorkFail, dtWorkException: case dtWorkComplete, dtWorkFail, dtWorkException:
if cb := rhandlers[resp.Handle]; cb != nil { client.handleResponse(resp.Handle, resp)
cb(resp) client.respHandler.remove(resp.Handle)
delete(rhandlers, resp.Handle)
}
} }
} }
} }
@ -202,13 +193,21 @@ func (client *Client) err(e error) {
} }
} }
func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) { func (client *Client) handleResponse(key string, resp *Response) *Response {
if h, ok := client.innerHandler.getAndRemove(key); ok { if h, ok := client.respHandler.get(key); ok {
if h.external != nil && resp.Handle != "" { h(resp)
rhandlers[resp.Handle] = h.external return nil
}
h.internal(resp)
} }
return resp
}
func (client *Client) handleInner(key string, resp *Response) *Response {
if h, ok := client.innerHandler.get(key); ok {
h(resp)
client.innerHandler.remove(key)
return nil
}
return resp
} }
type handleOrError struct { type handleOrError struct {
@ -217,17 +216,14 @@ type handleOrError struct {
} }
func (client *Client) do(funcname string, data []byte, func (client *Client) do(funcname string, data []byte,
flag uint32, h ResponseHandler, id string) (handle string, err error) { flag uint32) (handle string, err error) {
if len(id) == 0 {
return "", ErrInvalidId
}
if client.conn == nil { if client.conn == nil {
return "", ErrLostConn return "", ErrLostConn
} }
var result = make(chan handleOrError, 1) var result = make(chan handleOrError, 1)
client.Lock() client.Lock()
defer client.Unlock() defer client.Unlock()
client.innerHandler.putWithExternalHandler("c", func(resp *Response) { client.innerHandler.put("c", func(resp *Response) {
if resp.DataType == dtError { if resp.DataType == dtError {
err = getError(resp.Data) err = getError(resp.Data)
result <- handleOrError{"", err} result <- handleOrError{"", err}
@ -235,7 +231,8 @@ func (client *Client) do(funcname string, data []byte,
} }
handle = resp.Handle handle = resp.Handle
result <- handleOrError{handle, nil} result <- handleOrError{handle, nil}
}, h) })
id := IdGen.Id()
req := getJob(id, []byte(funcname), data) req := getJob(id, []byte(funcname), data)
req.DataType = flag req.DataType = flag
if err = client.write(req); err != nil { if err = client.write(req); err != nil {
@ -257,7 +254,22 @@ func (client *Client) do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh // flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) Do(funcname string, data []byte, func (client *Client) Do(funcname string, data []byte,
flag byte, h ResponseHandler) (handle string, err error) { flag byte, h ResponseHandler) (handle string, err error) {
handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id()) var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
client.respHandler.Lock()
defer client.respHandler.Unlock()
handle, err = client.do(funcname, data, datatype)
if err == nil && h != nil {
client.respHandler.putNoLock(handle, h)
}
return return
} }
@ -265,11 +277,23 @@ func (client *Client) Do(funcname string, data []byte,
// flag can be set to: JobLow, JobNormal and JobHigh // flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBg(funcname string, data []byte, func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string, err error) { flag byte) (handle string, err error) {
handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id()) if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype)
return return
} }
// Status gets job status from job server. // Get job status from job server.
func (client *Client) Status(handle string) (status *Status, err error) { func (client *Client) Status(handle string) (status *Status, err error) {
if client.conn == nil { if client.conn == nil {
return nil, ErrLostConn return nil, ErrLostConn
@ -321,40 +345,3 @@ func (client *Client) Close() (err error) {
} }
return return
} }
// Call the function and get a response.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoWithId(funcname string, data []byte,
flag byte, h ResponseHandler, id string) (handle string, err error) {
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLow
case JobHigh:
datatype = dtSubmitJobHigh
default:
datatype = dtSubmitJob
}
handle, err = client.do(funcname, data, datatype, h, id)
return
}
// Call the function in background, no response needed.
// flag can be set to: JobLow, JobNormal and JobHigh
func (client *Client) DoBgWithId(funcname string, data []byte,
flag byte, id string) (handle string, err error) {
if client.conn == nil {
return "", ErrLostConn
}
var datatype uint32
switch flag {
case JobLow:
datatype = dtSubmitJobLowBg
case JobHigh:
datatype = dtSubmitJobHighBg
default:
datatype = dtSubmitJobBg
}
handle, err = client.do(funcname, data, datatype, nil, id)
return
}

View File

@ -1,14 +1,9 @@
package client package client
import ( import (
"crypto/md5"
"encoding/hex"
"errors"
"flag" "flag"
"fmt"
"os" "os"
"testing" "testing"
"time"
) )
const ( const (
@ -22,7 +17,6 @@ var (
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)") integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)")
flag.Parse()
if integrationsTestFlag != nil { if integrationsTestFlag != nil {
runIntegrationTests = *integrationsTestFlag runIntegrationTests = *integrationsTestFlag
} }
@ -75,42 +69,6 @@ func TestClientDoBg(t *testing.T) {
} }
} }
func TestClientDoBgWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoBgWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
data := []byte("abcdef")
id := ""
_, err := client.DoBgWithId("ToUpper", data, JobLow, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDo(t *testing.T) { func TestClientDo(t *testing.T) {
if !runIntegrationTests { if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration") t.Skip("To run this test, use: go test -integration")
@ -137,198 +95,6 @@ func TestClientDo(t *testing.T) {
} }
} }
func TestClientDoWithId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
hash := md5.Sum(data)
id := hex.EncodeToString(hash[:])
handle, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
func TestClientDoWithIdFailsIfNoId(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
data := []byte("abcdef")
id := ""
_, err := client.DoWithId("ToUpper", data,
JobLow, jobHandler, id)
if err == nil {
t.Error("Expecting error")
return
}
if err.Error() != "Invalid ID" {
t.Error(fmt.Sprintf("Expecting \"Invalid ID\" error, got %s.", err.Error()))
return
}
}
func TestClientDoWithIdCheckSameHandle(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123,categoryId:1}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 != handle2 {
t.Error("expecting the same handle when using the same id on the same Job name")
}
}
func TestClientDoWithIdCheckDifferentHandleOnDifferentJobs(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
jobHandler := func(job *Response) {
return
}
data := []byte("{productId:123}")
id := "123"
handle1, err := client.DoWithId("PublishProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle1 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle1)
}
handle2, err := client.DoWithId("DeleteProduct", data,
JobLow, jobHandler, id)
if err != nil {
t.Error(err)
return
}
if handle2 == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle2)
}
if handle1 == handle2 {
t.Error("expecting different handles because there are different job names")
}
}
func TestClientMultiDo(t *testing.T) {
if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration")
}
// This integration test requires that examples/pl/worker_multi.pl be running.
//
// Test invocation is:
// go test -integration -timeout 10s -run '^TestClient(AddServer|MultiDo)$'
//
// Send 1000 requests to go through all race conditions
const nreqs = 1000
errCh := make(chan error)
gotCh := make(chan string, nreqs)
olderrh := client.ErrorHandler
client.ErrorHandler = func(e error) { errCh <- e }
client.ResponseTimeout = 5 * time.Second
defer func() { client.ErrorHandler = olderrh }()
nextJobCh := make(chan struct{})
defer close(nextJobCh)
go func() {
for range nextJobCh {
start := time.Now()
handle, err := client.Do("PerlToUpper", []byte("abcdef"), JobNormal, func(r *Response) { gotCh <- string(r.Data) })
if err == ErrLostConn && time.Since(start) > client.ResponseTimeout {
errCh <- errors.New("Impossible 'lost conn', deadlock bug detected")
} else if err != nil {
errCh <- err
}
if handle == "" {
errCh <- errors.New("Handle is empty.")
}
}
}()
for i := 0; i < nreqs; i++ {
select {
case err := <-errCh:
t.Fatal(err)
case nextJobCh <- struct{}{}:
}
}
remaining := nreqs
for remaining > 0 {
select {
case err := <-errCh:
t.Fatal(err)
case got := <-gotCh:
if got != "ABCDEF" {
t.Error("Unexpected response from PerlDoUpper: ", got)
}
remaining--
t.Logf("%d response remaining", remaining)
}
}
}
func TestClientStatus(t *testing.T) { func TestClientStatus(t *testing.T) {
if !runIntegrationTests { if !runIntegrationTests {
t.Skip("To run this test, use: go test -integration") t.Skip("To run this test, use: go test -integration")

View File

@ -9,7 +9,6 @@ import (
var ( var (
ErrWorkWarning = errors.New("Work warning") ErrWorkWarning = errors.New("Work warning")
ErrInvalidData = errors.New("Invalid data") ErrInvalidData = errors.New("Invalid data")
ErrInvalidId = errors.New("Invalid ID")
ErrWorkFail = errors.New("Work fail") ErrWorkFail = errors.New("Work fail")
ErrWorkException = errors.New("Work exeption") ErrWorkException = errors.New("Work exeption")
ErrDataType = errors.New("Invalid data type") ErrDataType = errors.New("Invalid data type")

View File

@ -32,7 +32,7 @@ func (ai *autoincId) Id() string {
return strconv.FormatInt(next, 10) return strconv.FormatInt(next, 10)
} }
// NewAutoIncId returns an autoincrement ID generator // Return an autoincrement ID generator
func NewAutoIncId() IdGenerator { func NewAutoIncId() IdGenerator {
// we'll consider the nano fraction of a second at startup unique // we'll consider the nano fraction of a second at startup unique
// and count up from there. // and count up from there.

View File

@ -57,7 +57,7 @@ type Pool struct {
mutex sync.Mutex mutex sync.Mutex
} }
// NewPool returns a new pool. // Return a new pool.
func NewPool() (pool *Pool) { func NewPool() (pool *Pool) {
return &Pool{ return &Pool{
Clients: make(map[string]*PoolClient, poolSize), Clients: make(map[string]*PoolClient, poolSize),
@ -111,7 +111,7 @@ func (pool *Pool) DoBg(funcname string, data []byte,
return return
} }
// Status gets job status from job server. // Get job status from job server.
// !!!Not fully tested.!!! // !!!Not fully tested.!!!
func (pool *Pool) Status(addr, handle string) (status *Status, err error) { func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
if client, ok := pool.Clients[addr]; ok { if client, ok := pool.Clients[addr]; ok {

View File

@ -1,33 +0,0 @@
#!/usr/bin/perl
# Runs 20 children that expose "PerlToUpper" before returning the result.
use strict; use warnings;
use constant CHILDREN => 20;
use Time::HiRes qw(usleep);
use Gearman::Worker;
$|++;
my @child_pids;
for (1 .. CHILDREN) {
if (my $pid = fork) {
push @child_pids, $pid;
next;
}
eval {
my $w = Gearman::Worker->new(job_servers => '127.0.0.1:4730');
$w->register_function(PerlToUpper => sub { print "."; uc $_[0]->arg });
$w->work while 1;
};
warn $@ if $@;
exit 0;
}
$SIG{INT} = $SIG{HUP} = sub {
kill 9, @child_pids;
print "\nChildren shut down, gracefully exiting\n";
exit 0;
};
printf "Forked %d children, serving 'PerlToUpper' function to gearman\n", CHILDREN;
sleep;

View File

@ -98,11 +98,6 @@ func (a *agent) work() {
} else { } else {
leftdata = nil leftdata = nil
inpack.a = a inpack.a = a
select {
case <-a.worker.closed:
return
default:
}
a.worker.in <- inpack a.worker.in <- inpack
if len(data) == l { if len(data) == l {
break break
@ -116,9 +111,6 @@ func (a *agent) work() {
} }
func (a *agent) disconnect_error(err error) { func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
if a.conn != nil { if a.conn != nil {
err = &WorkerDisconnectError{ err = &WorkerDisconnectError{
err: err, err: err,
@ -143,14 +135,10 @@ func (a *agent) Grab() {
a.grab() a.grab()
} }
func (a *agent) grab() bool { func (a *agent) grab() {
if a.worker.closed != nil {
return false
}
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = dtGrabJobUniq outpack.dataType = dtGrabJobUniq
a.write(outpack) a.write(outpack)
return true
} }
func (a *agent) PreSleep() { func (a *agent) PreSleep() {
@ -173,10 +161,9 @@ func (a *agent) reconnect() error {
bufio.NewWriter(a.conn)) bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a) a.worker.reRegisterFuncsForAgent(a)
if a.grab() { a.grab()
go a.work()
}
go a.work()
return nil return nil
} }

View File

@ -4,9 +4,9 @@ package worker
import ( import (
"fmt" "fmt"
"strconv"
"sync" "sync"
"time" "time"
"strconv"
) )
const ( const (
@ -18,22 +18,19 @@ const (
// It can connect to multi-server and grab jobs. // It can connect to multi-server and grab jobs.
type Worker struct { type Worker struct {
sync.Mutex sync.Mutex
agents []*agent agents []*agent
funcs jobFuncs funcs jobFuncs
in chan *inPack in chan *inPack
running bool running bool
ready bool ready bool
jobLeftNum int64
Id string Id string
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
JobHandler JobHandler JobHandler JobHandler
limit chan bool limit chan bool
closed chan struct{}
leftJobs chan struct{}
} }
// New returns a worker. // Return a worker.
// //
// If limit is set to Unlimited(=0), the worker will grab all jobs // If limit is set to Unlimited(=0), the worker will grab all jobs
// and execute them parallelly. // and execute them parallelly.
@ -59,7 +56,7 @@ func (worker *Worker) err(e error) {
} }
} }
// AddServer adds a Gearman job server. // Add a Gearman job server.
// //
// addr should be formated as 'host:port'. // addr should be formated as 'host:port'.
func (worker *Worker) AddServer(net, addr string) (err error) { func (worker *Worker) AddServer(net, addr string) (err error) {
@ -75,11 +72,11 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
// Broadcast an outpack to all Gearman server. // Broadcast an outpack to all Gearman server.
func (worker *Worker) broadcast(outpack *outPack) { func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents { for _, v := range worker.agents {
v.Write(outpack) v.write(outpack)
} }
} }
// AddFunc adds a function. // Add a function.
// Set timeout as Unlimited(=0) to disable executing timeout. // Set timeout as Unlimited(=0) to disable executing timeout.
func (worker *Worker) AddFunc(funcname string, func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) { f JobFunc, timeout uint32) (err error) {
@ -119,7 +116,7 @@ func prepFuncOutpack(funcname string, timeout uint32) *outPack {
return outpack return outpack
} }
// RemoveFunc removes a function. // Remove a function.
func (worker *Worker) RemoveFunc(funcname string) (err error) { func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.Lock() worker.Lock()
defer worker.Unlock() defer worker.Unlock()
@ -150,20 +147,14 @@ func (worker *Worker) handleInPack(inpack *inPack) {
inpack.a.Grab() inpack.a.Grab()
case dtJobAssign, dtJobAssignUniq: case dtJobAssign, dtJobAssignUniq:
go func() { go func() {
go func() { if err := worker.exec(inpack); err != nil {
worker.incrExecJobNum() worker.err(err)
defer func() {
worker.decrExecJobNum()
}()
if err := worker.exec(inpack); err != nil {
worker.err(err)
}
}()
if worker.limit != nil {
worker.limit <- true
} }
inpack.a.Grab()
}() }()
if worker.limit != nil {
worker.limit <- true
}
inpack.a.Grab()
case dtError: case dtError:
worker.err(inpack.Err()) worker.err(inpack.Err())
fallthrough fallthrough
@ -195,7 +186,7 @@ func (worker *Worker) Ready() (err error) {
return return
} }
// Work start main loop (blocking) // Main loop, block here
// Most of time, this should be evaluated in goroutine. // Most of time, this should be evaluated in goroutine.
func (worker *Worker) Work() { func (worker *Worker) Work() {
if !worker.ready { if !worker.ready {
@ -206,29 +197,14 @@ func (worker *Worker) Work() {
} }
} }
worker.Lock()
worker.running = true worker.running = true
worker.Unlock()
for _, a := range worker.agents { for _, a := range worker.agents {
a.Grab() a.Grab()
} }
// 执行任务(阻塞)
var inpack *inPack var inpack *inPack
for inpack = range worker.in { for inpack = range worker.in {
worker.handleInPack(inpack) worker.handleInPack(inpack)
} }
// 关闭Worker进程后 等待任务完成后退出
worker.Lock()
leftJobNum := int(worker.jobLeftNum)
worker.Unlock()
if worker.leftJobs != nil {
for i := 0; i < leftJobNum; i++ {
<-worker.leftJobs
}
}
worker.Reset()
worker.close()
} }
// custome handling warper // custome handling warper
@ -244,21 +220,12 @@ func (worker *Worker) customeHandler(inpack *inPack) {
func (worker *Worker) Close() { func (worker *Worker) Close() {
worker.Lock() worker.Lock()
defer worker.Unlock() defer worker.Unlock()
if worker.running == true && worker.closed == nil { if worker.running == true {
worker.closed = make(chan struct{}, 1) for _, a := range worker.agents {
worker.closed <- struct{}{} a.Close()
}
worker.running = false worker.running = false
close(worker.in) close(worker.in)
// 创建关闭后执行中的任务列表
if worker.jobLeftNum != 0 {
worker.leftJobs = make(chan struct{}, worker.jobLeftNum+int64(len(worker.in)))
}
}
}
func (worker *Worker) close() {
for _, a := range worker.agents {
a.Close()
} }
} }
@ -270,7 +237,7 @@ func (worker *Worker) Echo(data []byte) {
worker.broadcast(outpack) worker.broadcast(outpack)
} }
// Reset removes all of functions. // Remove all of functions.
// Both from the worker and job servers. // Both from the worker and job servers.
func (worker *Worker) Reset() { func (worker *Worker) Reset() {
outpack := getOutPack() outpack := getOutPack()
@ -288,23 +255,6 @@ func (worker *Worker) SetId(id string) {
worker.broadcast(outpack) worker.broadcast(outpack)
} }
func (worker *Worker) incrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum++
return worker.jobLeftNum
}
func (worker *Worker) decrExecJobNum() int64 {
worker.Lock()
defer worker.Unlock()
worker.jobLeftNum--
if worker.jobLeftNum < 0 {
worker.jobLeftNum = 0
}
return worker.jobLeftNum
}
// inner job executing // inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) { func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() { defer func() {
@ -330,25 +280,22 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
} else { } else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
} }
//if worker.running { if worker.running {
outpack := getOutPack() outpack := getOutPack()
if r.err == nil { if r.err == nil {
outpack.dataType = dtWorkComplete outpack.dataType = dtWorkComplete
} else {
if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else { } else {
outpack.dataType = dtWorkException if len(r.data) == 0 {
outpack.dataType = dtWorkFail
} else {
outpack.dataType = dtWorkException
}
err = r.err
} }
err = r.err outpack.handle = inpack.handle
outpack.data = r.data
inpack.a.Write(outpack)
} }
outpack.handle = inpack.handle
outpack.data = r.data
_ = inpack.a.Write(outpack)
if worker.leftJobs != nil {
worker.leftJobs <- struct{}{}
}
//}
return return
} }
func (worker *Worker) reRegisterFuncsForAgent(a *agent) { func (worker *Worker) reRegisterFuncsForAgent(a *agent) {

View File

@ -1,59 +0,0 @@
package worker
import (
"fmt"
"sync"
"testing"
)
func TestWorkerRace(t *testing.T) {
// from example worker
// An example of worker
w := New(Unlimited)
defer w.Close()
// Add a gearman job server
if err := w.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Fatal(err)
}
// A function for handling jobs
foobar := func(job Job) ([]byte, error) {
// Do nothing here
return nil, nil
}
// Add the function to worker
if err := w.AddFunc("foobar", foobar, 0); err != nil {
fmt.Println(err)
return
}
var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job Job) error {
if job.Err() == nil {
fmt.Println(string(job.Data()))
} else {
fmt.Println(job.Err())
}
wg.Done()
return nil
}
// An error handler for handling worker's internal errors.
w.ErrorHandler = func(e error) {
fmt.Println(e)
// Ignore the error or shutdown the worker
}
// Tell Gearman job server: I'm ready!
if err := w.Ready(); err != nil {
fmt.Println(err)
return
}
// Running main loop
go w.Work()
wg.Add(1)
// calling Echo
w.Echo([]byte("Hello"))
// Waiting results
wg.Wait()
// tear down
w.Close()
}