// Copyright 2011 Xing Xing All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. package worker import ( "bytes" "bitbucket.org/mikespook/gearman-go/common" ) const ( Unlimited = 0 OneByOne = 1 ) // Job handler type JobHandler func(*Job) error type JobFunc func(job *Job) ([]byte, error) // The definition of the callback function. type jobFunc struct { f JobFunc timeout uint32 } // Map for added function. type JobFuncs map[string]*jobFunc /* Worker side api for gearman usage: w = worker.New(worker.Unlimited) w.AddFunction("foobar", foobar) w.AddServer("127.0.0.1:4730") w.Work() // Enter the worker's main loop The definition of the callback function 'foobar' should suit for the type 'JobFunction'. It looks like this: func foobar(job *Job) (data []byte, err os.Error) { //sth. here //plaplapla... return } */ type Worker struct { agents []*agent funcs JobFuncs in chan *Job running bool limit chan bool Id string // assign a ErrFunc to handle errors ErrHandler common.ErrorHandler JobHandler JobHandler } // Get a new worker func New(l int) (worker *Worker) { worker = &Worker{ agents: make([]*agent, 0), funcs: make(JobFuncs), in: make(chan *Job, common.QUEUE_SIZE), } if l != Unlimited { worker.limit = make(chan bool, l) for i := 0; i < l; i ++ { worker.limit <- true } } return } // func (worker *Worker)err(e error) { if worker.ErrHandler != nil { worker.ErrHandler(e) } } // Add a server. The addr should be 'host:port' format. // The connection is established at this time. func (worker *Worker) AddServer(addr string) (err error) { // Create a new job server's client as a agent of server server, err := newAgent(addr, worker) if err != nil { return err } worker.agents = append(worker.agents, server) return } // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. func (worker *Worker) broadcast(job *Job) { for _, v := range worker.agents { v.WriteJob(job) } } // Add a function. // Plz added job servers first, then functions. // The API will tell every connected job server that 'I can do this' func (worker *Worker) AddFunc(funcname string, f JobFunc, timeout uint32) (err error) { if _, ok := worker.funcs[funcname]; ok { return common.Errorf("The function already exists: %s", funcname) } worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} if worker.running { worker.addFunc(funcname, timeout) } return } // inner add function func (worker *Worker) addFunc(funcname string, timeout uint32) { var datatype uint32 var data []byte if timeout == 0 { datatype = common.CAN_DO data = []byte(funcname) } else { datatype = common.CAN_DO_TIMEOUT data = []byte(funcname + "\x00") t := common.Uint32ToBytes(timeout) data = append(data, t[:]...) } job := newJob(common.REQ, datatype, data) worker.broadcast(job) } // Remove a function. // Tell job servers 'I can not do this now' at the same time. func (worker *Worker) RemoveFunc(funcname string) (err error) { if _, ok := worker.funcs[funcname]; !ok { return common.Errorf("The function does not exist: %s", funcname) } delete(worker.funcs, funcname) if worker.running { worker.removeFunc(funcname) } return } // inner remove function func (worker *Worker) removeFunc(funcname string) { job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) worker.broadcast(job) } // Main loop func (worker *Worker) Work() { defer func() { worker.running = false for _, v := range worker.agents { v.Close() } }() for funcname, f := range worker.funcs { worker.addFunc(funcname, f.timeout) } worker.running = true for _, v := range worker.agents { go v.Work() } ok := true var job *Job for ok { if job, ok = <-worker.in; ok { switch job.DataType { case common.ERROR: go func() { _, err := common.GetError(job.Data) worker.err(err) }() case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: go func() { if err := worker.exec(job); err != nil { worker.err(err) } }() default: go worker.handleJob(job) } } } } // job handler func (worker *Worker) handleJob(job *Job) { if worker.JobHandler != nil { if err := worker.JobHandler(job); err != nil { worker.err(err) } } } // Close. func (worker *Worker) Close() { close(worker.in) if worker.limit != nil { close(worker.limit) } } // Send a something out, get the samething back. func (worker *Worker) Echo(data []byte) { job := newJob(common.REQ, common.ECHO_REQ, data) worker.broadcast(job) } // Remove all of functions. // Both from the worker or job servers. func (worker *Worker) Reset() { job := newJob(common.REQ, common.RESET_ABILITIES, nil) worker.broadcast(job) worker.funcs = make(JobFuncs) } // Set the worker's unique id. func (worker *Worker) SetId(id string) { worker.Id = id job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) worker.broadcast(job) } // Execute the job. And send back the result. func (worker *Worker) exec(job *Job) (err error) { if worker.limit != nil { <-worker.limit defer func() { worker.limit <- true }() } var limit int if job.DataType == common.JOB_ASSIGN { limit = 3 } else { limit = 4 } jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) job.Handle = string(jobdata[0]) funcname := string(jobdata[1]) if job.DataType == common.JOB_ASSIGN { job.Data = jobdata[2] } else { job.UniqueId = string(jobdata[2]) job.Data = jobdata[3] } f, ok := worker.funcs[funcname] if !ok { return common.Errorf("The function does not exist: %s", funcname) } result, err := f.f(job) var datatype uint32 if err == nil { datatype = common.WORK_COMPLETE } else { if result == nil { datatype = common.WORK_FAIL } else { datatype = common.WORK_EXCEPTION } } job.magicCode = common.REQ job.DataType = datatype job.Data = result job.agent.WriteJob(job) return } func (worker *Worker) removeAgent(a *agent) { for k, v := range worker.agents { if v == a { worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...) } } if len(worker.agents) == 0 { worker.err(common.ErrNoActiveAgent) } }