@@ -1,4 +1,7 @@ | |||||
language: go | language: go | ||||
go: | |||||
- 1.2 | - 1.2 | ||||
before_install: | before_install: | ||||
- sudo apt-get install -qq gearman-job-server | |||||
- sudo apt-get remove -y gearman-job-server | |||||
- sudo apt-get install -y gearman-job-server |
@@ -3,10 +3,12 @@ package worker | |||||
import ( | import ( | ||||
"io" | "io" | ||||
"net" | "net" | ||||
"sync" | |||||
) | ) | ||||
// The agent of job server. | // The agent of job server. | ||||
type agent struct { | type agent struct { | ||||
sync.Mutex | |||||
conn net.Conn | conn net.Conn | ||||
worker *Worker | worker *Worker | ||||
in chan []byte | in chan []byte | ||||
@@ -25,6 +27,8 @@ func newAgent(net, addr string, worker *Worker) (a *agent, err error) { | |||||
} | } | ||||
func (a *agent) Connect() (err error) { | func (a *agent) Connect() (err error) { | ||||
a.Lock() | |||||
defer a.Unlock() | |||||
a.conn, err = net.Dial(a.net, a.addr) | a.conn, err = net.Dial(a.net, a.addr) | ||||
if err != nil { | if err != nil { | ||||
return | return | ||||
@@ -40,11 +44,18 @@ func (a *agent) work() { | |||||
var data, leftdata []byte | var data, leftdata []byte | ||||
for { | for { | ||||
if data, err = a.read(BUFFER_SIZE); err != nil { | if data, err = a.read(BUFFER_SIZE); err != nil { | ||||
if err == ErrConnClosed { | |||||
a.worker.err(err) | |||||
if err == ErrLostConn { | |||||
break | |||||
} | |||||
// If it is unexpected error and the connection wasn't | |||||
// closed by Gearmand, the agent should colse the conection | |||||
// and reconnect to job server. | |||||
a.conn, err = net.Dial(a.net, a.addr) | |||||
if err != nil { | |||||
a.worker.err(err) | |||||
break | break | ||||
} | } | ||||
a.worker.err(err) | |||||
continue | |||||
} | } | ||||
if len(leftdata) > 0 { // some data left for processing | if len(leftdata) > 0 { // some data left for processing | ||||
data = append(leftdata, data...) | data = append(leftdata, data...) | ||||
@@ -67,16 +78,25 @@ func (a *agent) work() { | |||||
} | } | ||||
func (a *agent) Close() { | func (a *agent) Close() { | ||||
a.conn.Close() | |||||
a.Lock() | |||||
defer a.Unlock() | |||||
if a.conn != nil { | |||||
a.conn.Close() | |||||
a.conn = nil | |||||
} | |||||
} | } | ||||
func (a *agent) Grab() { | func (a *agent) Grab() { | ||||
a.Lock() | |||||
defer a.Unlock() | |||||
outpack := getOutPack() | outpack := getOutPack() | ||||
outpack.dataType = GRAB_JOB_UNIQ | outpack.dataType = GRAB_JOB_UNIQ | ||||
a.write(outpack) | a.write(outpack) | ||||
} | } | ||||
func (a *agent) PreSleep() { | func (a *agent) PreSleep() { | ||||
a.Lock() | |||||
defer a.Unlock() | |||||
outpack := getOutPack() | outpack := getOutPack() | ||||
outpack.dataType = PRE_SLEEP | outpack.dataType = PRE_SLEEP | ||||
a.write(outpack) | a.write(outpack) | ||||
@@ -89,12 +109,8 @@ func (a *agent) read(length int) (data []byte, err error) { | |||||
// read until data can be unpacked | // read until data can be unpacked | ||||
for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { | for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { | ||||
if n, err = a.conn.Read(buf); err != nil { | if n, err = a.conn.Read(buf); err != nil { | ||||
if err == io.EOF && n == 0 { | |||||
if data == nil { | |||||
err = ErrConnection | |||||
} else { | |||||
err = ErrConnClosed | |||||
} | |||||
if err == io.EOF { | |||||
err = ErrLostConn | |||||
} | } | ||||
return | return | ||||
} | } | ||||
@@ -7,20 +7,11 @@ import ( | |||||
) | ) | ||||
var ( | var ( | ||||
ErrJobTimeOut = errors.New("Do a job time out") | |||||
ErrInvalidData = errors.New("Invalid data") | |||||
ErrWorkWarning = errors.New("Work warning") | |||||
ErrWorkFail = errors.New("Work fail") | |||||
ErrWorkException = errors.New("Work exeption") | |||||
ErrDataType = errors.New("Invalid data type") | |||||
ErrOutOfCap = errors.New("Out of the capability") | |||||
ErrNotConn = errors.New("Did not connect to job server") | |||||
ErrFuncNotFound = errors.New("The function was not found") | |||||
ErrConnection = errors.New("Connection error") | |||||
ErrNoActiveAgent = errors.New("No active agent") | |||||
ErrTimeOut = errors.New("Executing time out") | |||||
ErrUnknown = errors.New("Unknown error") | |||||
ErrConnClosed = errors.New("Connection closed") | |||||
ErrNoneAgents = errors.New("None active agents") | |||||
ErrNoneFuncs = errors.New("None functions") | |||||
ErrTimeOut = errors.New("Executing time out") | |||||
ErrUnknown = errors.New("Unknown error") | |||||
ErrLostConn = errors.New("Lost connection with Gearmand") | |||||
) | ) | ||||
// Extract the error message | // Extract the error message | ||||
@@ -0,0 +1,57 @@ | |||||
package worker_test | |||||
import ( | |||||
"fmt" | |||||
"github.com/mikespook/gearman-go/worker" | |||||
"sync" | |||||
) | |||||
func ExampleWorker() { | |||||
// An example of worker | |||||
w := worker.New(worker.Unlimited) | |||||
defer w.Close() | |||||
// Add a gearman job server | |||||
if err := w.AddServer("tcp4", "127.0.0.1:4730"); err != nil { | |||||
fmt.Println(err) | |||||
return | |||||
} | |||||
// A function for handling jobs | |||||
foobar := func(job worker.Job) ([]byte, error) { | |||||
// Do nothing here | |||||
return nil, nil | |||||
} | |||||
// Add the function to worker | |||||
if err := w.AddFunc("foobar", foobar, 0); err != nil { | |||||
fmt.Println(err) | |||||
return | |||||
} | |||||
var wg sync.WaitGroup | |||||
// A custome handler, for handling other results, eg. ECHO, ERROR. | |||||
w.JobHandler = func(job worker.Job) error { | |||||
if job.Err() == nil { | |||||
fmt.Println(string(job.Data())) | |||||
} else { | |||||
fmt.Println(job.Err()) | |||||
} | |||||
wg.Done() | |||||
return nil | |||||
} | |||||
// An error handler for handling worker's internal errors. | |||||
w.ErrorHandler = func(e error) { | |||||
fmt.Println(e) | |||||
// Ignore the error or shutdown the worker | |||||
} | |||||
// Tell Gearman job server: I'm ready! | |||||
if err := w.Ready(); err != nil { | |||||
fmt.Println(err) | |||||
return | |||||
} | |||||
// Running main loop | |||||
go w.Work() | |||||
wg.Add(1) | |||||
// calling Echo | |||||
w.Echo([]byte("Hello")) | |||||
// Waiting results | |||||
wg.Wait() | |||||
// Output: Hello | |||||
} |
@@ -24,6 +24,13 @@ func (inpack *inPack) Data() []byte { | |||||
return inpack.data | return inpack.data | ||||
} | } | ||||
func (inpack *inPack) Err() error { | |||||
if inpack.dataType == ERROR { | |||||
return GetError(inpack.data) | |||||
} | |||||
return nil | |||||
} | |||||
// Send some datas to client. | // Send some datas to client. | ||||
// Using this in a job's executing. | // Using this in a job's executing. | ||||
func (inpack *inPack) SendData(data []byte) { | func (inpack *inPack) SendData(data []byte) { | ||||
@@ -1,6 +1,7 @@ | |||||
package worker | package worker | ||||
type Job interface { | type Job interface { | ||||
Err() error | |||||
Data() []byte | Data() []byte | ||||
SendWarning(data []byte) | SendWarning(data []byte) | ||||
SendData(data []byte) | SendData(data []byte) | ||||
@@ -10,10 +10,10 @@ import ( | |||||
) | ) | ||||
const ( | const ( | ||||
Unlimited = 0 | |||||
OneByOne = 1 | |||||
Unlimited = iota | |||||
OneByOne | |||||
Immediately = 0 | |||||
Immediately = iota | |||||
) | ) | ||||
// Worker is the only structure needed by worker side developing. | // Worker is the only structure needed by worker side developing. | ||||
@@ -139,8 +139,6 @@ func (worker *Worker) handleInPack(inpack *inPack) { | |||||
inpack.a.PreSleep() | inpack.a.PreSleep() | ||||
case NOOP: | case NOOP: | ||||
inpack.a.Grab() | inpack.a.Grab() | ||||
case ERROR: | |||||
worker.err(GetError(inpack.data)) | |||||
case JOB_ASSIGN, JOB_ASSIGN_UNIQ: | case JOB_ASSIGN, JOB_ASSIGN_UNIQ: | ||||
go func() { | go func() { | ||||
if err := worker.exec(inpack); err != nil { | if err := worker.exec(inpack); err != nil { | ||||
@@ -151,6 +149,9 @@ func (worker *Worker) handleInPack(inpack *inPack) { | |||||
worker.limit <- true | worker.limit <- true | ||||
} | } | ||||
inpack.a.Grab() | inpack.a.Grab() | ||||
case ERROR: | |||||
worker.err(inpack.Err()) | |||||
fallthrough | |||||
case ECHO_RES: | case ECHO_RES: | ||||
fallthrough | fallthrough | ||||
default: | default: | ||||
@@ -161,6 +162,12 @@ func (worker *Worker) handleInPack(inpack *inPack) { | |||||
// Connect to Gearman server and tell every server | // Connect to Gearman server and tell every server | ||||
// what can this worker do. | // what can this worker do. | ||||
func (worker *Worker) Ready() (err error) { | func (worker *Worker) Ready() (err error) { | ||||
if len(worker.agents) == 0 { | |||||
return ErrNoneAgents | |||||
} | |||||
if len(worker.funcs) == 0 { | |||||
return ErrNoneFuncs | |||||
} | |||||
for _, v := range worker.agents { | for _, v := range worker.agents { | ||||
if err = v.Connect(); err != nil { | if err = v.Connect(); err != nil { | ||||
return | return | ||||
@@ -201,8 +208,12 @@ func (worker *Worker) customeHandler(inpack *inPack) { | |||||
// Close connection and exit main loop | // Close connection and exit main loop | ||||
func (worker *Worker) Close() { | func (worker *Worker) Close() { | ||||
worker.running = false | |||||
close(worker.in) | |||||
worker.Lock() | |||||
worker.Unlock() | |||||
if worker.running == true { | |||||
worker.running = false | |||||
close(worker.in) | |||||
} | |||||
} | } | ||||
// Echo | // Echo | ||||
@@ -11,6 +11,13 @@ func init() { | |||||
worker = New(Unlimited) | worker = New(Unlimited) | ||||
} | } | ||||
func TestWorkerErrNoneAgents(t *testing.T) { | |||||
err := worker.Ready() | |||||
if err != ErrNoneAgents { | |||||
t.Error("ErrNoneAgents expected.") | |||||
} | |||||
} | |||||
func TestWorkerAddServer(t *testing.T) { | func TestWorkerAddServer(t *testing.T) { | ||||
t.Log("Add local server 127.0.0.1:4730.") | t.Log("Add local server 127.0.0.1:4730.") | ||||
if err := worker.AddServer("tcp4", "127.0.0.1:4730"); err != nil { | if err := worker.AddServer("tcp4", "127.0.0.1:4730"); err != nil { | ||||
@@ -23,6 +30,13 @@ func TestWorkerAddServer(t *testing.T) { | |||||
} | } | ||||
} | } | ||||
func TestWorkerErrNoneFuncs(t *testing.T) { | |||||
err := worker.Ready() | |||||
if err != ErrNoneFuncs { | |||||
t.Error("ErrNoneFuncs expected.") | |||||
} | |||||
} | |||||
func foobar(job Job) ([]byte, error) { | func foobar(job Job) ([]byte, error) { | ||||
return nil, nil | return nil, nil | ||||
} | } | ||||