|
|
@@ -1,3 +1,5 @@ |
|
|
|
// The worker package helps developers to develop Gearman's worker |
|
|
|
// in an easy way. |
|
|
|
package worker |
|
|
|
|
|
|
|
import ( |
|
|
@@ -14,24 +16,8 @@ const ( |
|
|
|
Immediately = 0 |
|
|
|
) |
|
|
|
|
|
|
|
/* |
|
|
|
Worker side api for gearman |
|
|
|
|
|
|
|
usage: |
|
|
|
w = worker.New(worker.Unlimited) |
|
|
|
w.AddFunction("foobar", foobar) |
|
|
|
w.AddServer("127.0.0.1:4730") |
|
|
|
w.Work() // Enter the worker's main loop |
|
|
|
|
|
|
|
The definition of the callback function 'foobar' should suit for the type 'JobFunction'. |
|
|
|
It looks like this: |
|
|
|
|
|
|
|
func foobar(job *Job) (data []byte, err os.Error) { |
|
|
|
//sth. here |
|
|
|
//plaplapla... |
|
|
|
return |
|
|
|
} |
|
|
|
*/ |
|
|
|
// Worker is the only structure needed by worker side developing. |
|
|
|
// It can connect to multi-server and grab jobs. |
|
|
|
type Worker struct { |
|
|
|
sync.Mutex |
|
|
|
agents []*agent |
|
|
@@ -39,14 +25,19 @@ type Worker struct { |
|
|
|
in chan *inPack |
|
|
|
running bool |
|
|
|
|
|
|
|
Id string |
|
|
|
// assign a ErrFunc to handle errors |
|
|
|
Id string |
|
|
|
ErrorHandler ErrorHandler |
|
|
|
JobHandler JobHandler |
|
|
|
limit chan bool |
|
|
|
} |
|
|
|
|
|
|
|
// Get a new worker |
|
|
|
// Return a worker. |
|
|
|
// |
|
|
|
// If limit is set to Unlimited(=0), the worker will grab all jobs |
|
|
|
// and execute them parallelly. |
|
|
|
// If limit is greater than zero, the number of paralled executing |
|
|
|
// jobs are limited under the number. If limit is assgined to |
|
|
|
// OneByOne(=1), there will be only one job executed in a time. |
|
|
|
func New(limit int) (worker *Worker) { |
|
|
|
worker = &Worker{ |
|
|
|
agents: make([]*agent, 0, limit), |
|
|
@@ -59,15 +50,16 @@ func New(limit int) (worker *Worker) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// |
|
|
|
// inner error handling |
|
|
|
func (worker *Worker) err(e error) { |
|
|
|
if worker.ErrorHandler != nil { |
|
|
|
worker.ErrorHandler(e) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Add a server. The addr should be 'host:port' format. |
|
|
|
// The connection is established at this time. |
|
|
|
// Add a Gearman job server. |
|
|
|
// |
|
|
|
// addr should be formated as 'host:port'. |
|
|
|
func (worker *Worker) AddServer(net, addr string) (err error) { |
|
|
|
// Create a new job server's client as a agent of server |
|
|
|
a, err := newAgent(net, addr, worker) |
|
|
@@ -78,9 +70,7 @@ func (worker *Worker) AddServer(net, addr string) (err error) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// Write a job to job server. |
|
|
|
// Here, the job's mean is not the oraginal mean. |
|
|
|
// Just looks like a network package for job's result or tell job server, there was a fail. |
|
|
|
// Broadcast an outpack to all Gearman server. |
|
|
|
func (worker *Worker) broadcast(outpack *outPack) { |
|
|
|
for _, v := range worker.agents { |
|
|
|
v.write(outpack) |
|
|
@@ -88,8 +78,7 @@ func (worker *Worker) broadcast(outpack *outPack) { |
|
|
|
} |
|
|
|
|
|
|
|
// Add a function. |
|
|
|
// Plz added job servers first, then functions. |
|
|
|
// The API will tell every connected job server that 'I can do this' |
|
|
|
// Set timeout as Unlimited(=0) to disable executing timeout. |
|
|
|
func (worker *Worker) AddFunc(funcname string, |
|
|
|
f JobFunc, timeout uint32) (err error) { |
|
|
|
worker.Lock() |
|
|
@@ -104,7 +93,7 @@ func (worker *Worker) AddFunc(funcname string, |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// inner add function |
|
|
|
// inner add |
|
|
|
func (worker *Worker) addFunc(funcname string, timeout uint32) { |
|
|
|
outpack := getOutPack() |
|
|
|
if timeout == 0 { |
|
|
@@ -135,7 +124,7 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// inner remove function |
|
|
|
// inner remove |
|
|
|
func (worker *Worker) removeFunc(funcname string) { |
|
|
|
outpack := getOutPack() |
|
|
|
outpack.dataType = CANT_DO |
|
|
@@ -143,6 +132,7 @@ func (worker *Worker) removeFunc(funcname string) { |
|
|
|
worker.broadcast(outpack) |
|
|
|
} |
|
|
|
|
|
|
|
// inner package handling |
|
|
|
func (worker *Worker) handleInPack(inpack *inPack) { |
|
|
|
switch inpack.dataType { |
|
|
|
case NO_JOB: |
|
|
@@ -168,6 +158,8 @@ func (worker *Worker) handleInPack(inpack *inPack) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Connect to Gearman server and tell every server |
|
|
|
// what can this worker do. |
|
|
|
func (worker *Worker) Ready() (err error) { |
|
|
|
for _, v := range worker.agents { |
|
|
|
if err = v.Connect(); err != nil { |
|
|
@@ -180,11 +172,17 @@ func (worker *Worker) Ready() (err error) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// Main loop |
|
|
|
// Main loop, block here |
|
|
|
// Most of time, this should be evaluated in goroutine. |
|
|
|
func (worker *Worker) Work() { |
|
|
|
defer func() { |
|
|
|
for _, a := range worker.agents { |
|
|
|
a.Close() |
|
|
|
} |
|
|
|
}() |
|
|
|
worker.running = true |
|
|
|
for _, v := range worker.agents { |
|
|
|
v.Grab() |
|
|
|
for _, a := range worker.agents { |
|
|
|
a.Grab() |
|
|
|
} |
|
|
|
var inpack *inPack |
|
|
|
for inpack = range worker.in { |
|
|
@@ -192,7 +190,7 @@ func (worker *Worker) Work() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// job handler |
|
|
|
// custome handling warper |
|
|
|
func (worker *Worker) customeHandler(inpack *inPack) { |
|
|
|
if worker.JobHandler != nil { |
|
|
|
if err := worker.JobHandler(inpack); err != nil { |
|
|
@@ -201,13 +199,13 @@ func (worker *Worker) customeHandler(inpack *inPack) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Close. |
|
|
|
// Close connection and exit main loop |
|
|
|
func (worker *Worker) Close() { |
|
|
|
worker.running = false |
|
|
|
close(worker.in) |
|
|
|
} |
|
|
|
|
|
|
|
// Send a something out, get the samething back. |
|
|
|
// Echo |
|
|
|
func (worker *Worker) Echo(data []byte) { |
|
|
|
outpack := getOutPack() |
|
|
|
outpack.dataType = ECHO_REQ |
|
|
@@ -216,7 +214,7 @@ func (worker *Worker) Echo(data []byte) { |
|
|
|
} |
|
|
|
|
|
|
|
// Remove all of functions. |
|
|
|
// Both from the worker or job servers. |
|
|
|
// Both from the worker and job servers. |
|
|
|
func (worker *Worker) Reset() { |
|
|
|
outpack := getOutPack() |
|
|
|
outpack.dataType = RESET_ABILITIES |
|
|
@@ -233,7 +231,7 @@ func (worker *Worker) SetId(id string) { |
|
|
|
worker.broadcast(outpack) |
|
|
|
} |
|
|
|
|
|
|
|
// Execute the job. And send back the result. |
|
|
|
// inner job executing |
|
|
|
func (worker *Worker) exec(inpack *inPack) (err error) { |
|
|
|
defer func() { |
|
|
|
if worker.limit != nil { |
|
|
@@ -277,11 +275,13 @@ func (worker *Worker) exec(inpack *inPack) (err error) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// inner result |
|
|
|
type result struct { |
|
|
|
data []byte |
|
|
|
err error |
|
|
|
} |
|
|
|
|
|
|
|
// executing timer |
|
|
|
func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { |
|
|
|
rslt := make(chan *result) |
|
|
|
defer close(rslt) |
|
|
|