diff --git a/example/worker.go b/example/worker.go index 3247195..d9b4759 100644 --- a/example/worker.go +++ b/example/worker.go @@ -16,7 +16,10 @@ func ToUpper(job *worker.WorkerJob) ([]byte, error) { } func main() { - w := worker.New() + w := worker.New(worker.Unlimit) + w.ErrFunc = func(e error) { + log.Println(e) + } w.AddServer("127.0.0.1:4730") w.AddFunction("ToUpper", ToUpper, 0) w.AddFunction("ToUpperTimeOut5", ToUpper, 5) @@ -44,6 +47,7 @@ func main() { } log.Println(string(job.Data)) case "quit": + os.Exit(0) return case "result": job := <-w.JobQueue diff --git a/gearman/client/client.go b/gearman/client/client.go index 2ed069a..156bb8f 100644 --- a/gearman/client/client.go +++ b/gearman/client/client.go @@ -25,28 +25,28 @@ usage: type Client struct { mutex sync.Mutex conn net.Conn - JobQueue chan *ClientJob incoming chan []byte + + JobQueue chan *ClientJob UId uint32 } // Create a new client. func New() (client *Client) { - client = &Client{JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP), + return &Client{ + JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP), incoming: make(chan []byte, gearman.QUEUE_CAP), - UId: 1} - return + UId:1} } // Add a server. // In this version, one client connect to one job server. // Sample is better. Plz do the load balancing by your self. func (client *Client) AddServer(addr string) (err error) { - conn, err := net.Dial(gearman.TCP, addr) + client.conn, err = net.Dial(gearman.TCP, addr) if err != nil { return } - client.conn = conn return } diff --git a/gearman/worker/jobagent.go b/gearman/worker/jobagent.go index 118c1a5..0aee90c 100644 --- a/gearman/worker/jobagent.go +++ b/gearman/worker/jobagent.go @@ -82,12 +82,12 @@ func (agent *jobAgent) Work() { } rel, err := agent.read() if err != nil { - agent.worker.ErrQueue <- err + agent.worker.err(err) continue } job, err := DecodeWorkerJob(rel) if err != nil { - agent.worker.ErrQueue <- err + agent.worker.err(err) continue } else { switch job.DataType { diff --git a/gearman/worker/worker.go b/gearman/worker/worker.go index aa5e03b..ca4589f 100644 --- a/gearman/worker/worker.go +++ b/gearman/worker/worker.go @@ -10,20 +10,27 @@ import ( "sync" ) +const ( + Unlimit = 0 + OneByOne = 1 +) + // The definition of the callback function. type JobFunction func(job *WorkerJob) ([]byte, error) // Map for added function. type JobFunctionMap map[string]JobFunction +// Error Function +type ErrFunc func(e error) /* Worker side api for gearman. usage: - worker = NewWorker() - worker.AddFunction("foobar", foobar) - worker.AddServer("127.0.0.1:4730") - worker.Work() // Enter the worker's main loop + w = worker.New(worker.Unlimit) + w.AddFunction("foobar", foobar) + w.AddServer("127.0.0.1:4730") + w.Work() // Enter the worker's main loop The definition of the callback function 'foobar' should suit for the type 'JobFunction'. It looks like this: @@ -37,16 +44,20 @@ func foobar(job *WorkerJob) (data []byte, err os.Error) { type Worker struct { clients []*jobAgent functions JobFunctionMap - running bool incoming chan *WorkerJob mutex sync.Mutex + limit chan bool + JobQueue chan *WorkerJob - ErrQueue chan error + + // assign a ErrFunc to handle errors + // Must assign befor AddServer + ErrFunc ErrFunc } // Get a new worker -func New() (worker *Worker) { +func New(l int) (worker *Worker) { worker = &Worker{ // job server list clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP), @@ -54,12 +65,24 @@ func New() (worker *Worker) { functions: make(JobFunctionMap), incoming: make(chan *WorkerJob, gearman.QUEUE_CAP), JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP), - ErrQueue: make(chan error, gearman.QUEUE_CAP), running: true, } + if l != Unlimit { + worker.limit = make(chan bool, l) + for i := 0; i < l; i ++ { + worker.limit <- true + } + } return } +// +func (worker *Worker)err(e error) { + if worker.ErrFunc != nil { + worker.ErrFunc(e) + } +} + // Add a server. The addr should be 'host:port' format. // The connection is established at this time. func (worker *Worker) AddServer(addr string) (err error) { @@ -141,11 +164,11 @@ func (worker *Worker) Work() { // do nothing case gearman.ERROR: _, err := gearman.GetError(job.Data) - worker.ErrQueue <- err + worker.err(err) case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ: go func() { if err := worker.exec(job); err != nil { - worker.ErrQueue <- err + worker.err(err) } }() default: @@ -217,6 +240,12 @@ func (worker *Worker) SetId(id string) (err error) { // Execute the job. And send back the result. func (worker *Worker) exec(job *WorkerJob) (err error) { + if worker.limit != nil { + <- worker.limit + defer func() { + worker.limit <- true + }() + } var limit int if job.DataType == gearman.JOB_ASSIGN { limit = 3