From 298edadfa677f0e5daa641cb14bebb633325a51f Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Wed, 25 Dec 2013 18:11:01 +0800 Subject: [PATCH] a better doc --- gearman.go | 4 +-- worker/worker.go | 78 ++++++++++++++++++++++++------------------------ 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/gearman.go b/gearman.go index 03b4ac0..44262e4 100644 --- a/gearman.go +++ b/gearman.go @@ -11,8 +11,8 @@ and getting responses from the server. import "github.com/mikespook/gearman-go/client" -The worker package will help developers in developing Gearman worker -service easily. +The worker package will help developers to develop Gearman's worker +in an easy way. import "github.com/mikespook/gearman-go/worker" */ diff --git a/worker/worker.go b/worker/worker.go index ae47fc6..bbc7c5c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,3 +1,5 @@ +// The worker package helps developers to develop Gearman's worker +// in an easy way. package worker import ( @@ -14,24 +16,8 @@ const ( Immediately = 0 ) -/* -Worker side api for gearman - -usage: -w = worker.New(worker.Unlimited) -w.AddFunction("foobar", foobar) -w.AddServer("127.0.0.1:4730") -w.Work() // Enter the worker's main loop - -The definition of the callback function 'foobar' should suit for the type 'JobFunction'. -It looks like this: - -func foobar(job *Job) (data []byte, err os.Error) { - //sth. here - //plaplapla... - return -} -*/ +// Worker is the only structure needed by worker side developing. +// It can connect to multi-server and grab jobs. type Worker struct { sync.Mutex agents []*agent @@ -39,14 +25,19 @@ type Worker struct { in chan *inPack running bool - Id string - // assign a ErrFunc to handle errors + Id string ErrorHandler ErrorHandler JobHandler JobHandler limit chan bool } -// Get a new worker +// Return a worker. +// +// If limit is set to Unlimited(=0), the worker will grab all jobs +// and execute them parallelly. +// If limit is greater than zero, the number of paralled executing +// jobs are limited under the number. If limit is assgined to +// OneByOne(=1), there will be only one job executed in a time. func New(limit int) (worker *Worker) { worker = &Worker{ agents: make([]*agent, 0, limit), @@ -59,15 +50,16 @@ func New(limit int) (worker *Worker) { return } -// +// inner error handling func (worker *Worker) err(e error) { if worker.ErrorHandler != nil { worker.ErrorHandler(e) } } -// Add a server. The addr should be 'host:port' format. -// The connection is established at this time. +// Add a Gearman job server. +// +// addr should be formated as 'host:port'. func (worker *Worker) AddServer(net, addr string) (err error) { // Create a new job server's client as a agent of server a, err := newAgent(net, addr, worker) @@ -78,9 +70,7 @@ func (worker *Worker) AddServer(net, addr string) (err error) { return } -// Write a job to job server. -// Here, the job's mean is not the oraginal mean. -// Just looks like a network package for job's result or tell job server, there was a fail. +// Broadcast an outpack to all Gearman server. func (worker *Worker) broadcast(outpack *outPack) { for _, v := range worker.agents { v.write(outpack) @@ -88,8 +78,7 @@ func (worker *Worker) broadcast(outpack *outPack) { } // Add a function. -// Plz added job servers first, then functions. -// The API will tell every connected job server that 'I can do this' +// Set timeout as Unlimited(=0) to disable executing timeout. func (worker *Worker) AddFunc(funcname string, f JobFunc, timeout uint32) (err error) { worker.Lock() @@ -104,7 +93,7 @@ func (worker *Worker) AddFunc(funcname string, return } -// inner add function +// inner add func (worker *Worker) addFunc(funcname string, timeout uint32) { outpack := getOutPack() if timeout == 0 { @@ -135,7 +124,7 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) { return } -// inner remove function +// inner remove func (worker *Worker) removeFunc(funcname string) { outpack := getOutPack() outpack.dataType = CANT_DO @@ -143,6 +132,7 @@ func (worker *Worker) removeFunc(funcname string) { worker.broadcast(outpack) } +// inner package handling func (worker *Worker) handleInPack(inpack *inPack) { switch inpack.dataType { case NO_JOB: @@ -168,6 +158,8 @@ func (worker *Worker) handleInPack(inpack *inPack) { } } +// Connect to Gearman server and tell every server +// what can this worker do. func (worker *Worker) Ready() (err error) { for _, v := range worker.agents { if err = v.Connect(); err != nil { @@ -180,11 +172,17 @@ func (worker *Worker) Ready() (err error) { return } -// Main loop +// Main loop, block here +// Most of time, this should be evaluated in goroutine. func (worker *Worker) Work() { + defer func() { + for _, a := range worker.agents { + a.Close() + } + }() worker.running = true - for _, v := range worker.agents { - v.Grab() + for _, a := range worker.agents { + a.Grab() } var inpack *inPack for inpack = range worker.in { @@ -192,7 +190,7 @@ func (worker *Worker) Work() { } } -// job handler +// custome handling warper func (worker *Worker) customeHandler(inpack *inPack) { if worker.JobHandler != nil { if err := worker.JobHandler(inpack); err != nil { @@ -201,13 +199,13 @@ func (worker *Worker) customeHandler(inpack *inPack) { } } -// Close. +// Close connection and exit main loop func (worker *Worker) Close() { worker.running = false close(worker.in) } -// Send a something out, get the samething back. +// Echo func (worker *Worker) Echo(data []byte) { outpack := getOutPack() outpack.dataType = ECHO_REQ @@ -216,7 +214,7 @@ func (worker *Worker) Echo(data []byte) { } // Remove all of functions. -// Both from the worker or job servers. +// Both from the worker and job servers. func (worker *Worker) Reset() { outpack := getOutPack() outpack.dataType = RESET_ABILITIES @@ -233,7 +231,7 @@ func (worker *Worker) SetId(id string) { worker.broadcast(outpack) } -// Execute the job. And send back the result. +// inner job executing func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { if worker.limit != nil { @@ -277,11 +275,13 @@ func (worker *Worker) exec(inpack *inPack) (err error) { return } +// inner result type result struct { data []byte err error } +// executing timer func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { rslt := make(chan *result) defer close(rslt)