diff --git a/client/client.go b/client/client.go index 0f8d00a..fe3d7fe 100644 --- a/client/client.go +++ b/client/client.go @@ -45,7 +45,7 @@ type Client struct { // Eg. // client, err := client.New("127.0.0.1:4730") func New(addr string) (client *Client, err error) { - conn, err := net.Dial("tcp", addr) + conn, err := net.Dial(common.NETWORK, addr) if err != nil { return } diff --git a/common/gearman.go b/common/gearman.go index eebaec1..cc9d41e 100644 --- a/common/gearman.go +++ b/common/gearman.go @@ -6,10 +6,7 @@ package common const ( - // the number limited for job servers. - WORKER_SERVER_CAP = 32 - // the number limited for functions. - WORKER_FUNCTION_CAP = 512 + NETWORK = "tcp" // queue size QUEUE_SIZE = 512 // read buffer size diff --git a/example/worker.go b/example/worker.go index d9b4759..bd82a2f 100644 --- a/example/worker.go +++ b/example/worker.go @@ -1,61 +1,25 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bitbucket.org/mikespook/gearman-go/gearman/worker" - "bitbucket.org/mikespook/golib/signal" - "os" - "fmt" + "bitbucket.org/mikespook/gearman-go/worker" +// "bitbucket.org/mikespook/golib/signal" +// "os" "log" "strings" ) -func ToUpper(job *worker.WorkerJob) ([]byte, error) { +func ToUpper(job *worker.Job) ([]byte, error) { data := []byte(strings.ToUpper(string(job.Data))) return data, nil } func main() { - w := worker.New(worker.Unlimit) - w.ErrFunc = func(e error) { + w := worker.New(worker.Unlimited) + w.ErrHandler = func(e error) { log.Println(e) } w.AddServer("127.0.0.1:4730") w.AddFunction("ToUpper", ToUpper, 0) w.AddFunction("ToUpperTimeOut5", ToUpper, 5) - - // Catch the interrupt to exit the working loop. - sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool { - w.Close() - return true - }) - go sh.Loop() - - go func() { - log.Println("start worker") - for { - print("cmd: ") - var str string - fmt.Scan(&str) - switch str { - case "echo": - w.Echo([]byte("Hello world!")) - var job *worker.WorkerJob - for job = <-w.JobQueue; job.DataType != gearman.ECHO_RES; job = <-w.JobQueue { - log.Println(job) - } - log.Println(string(job.Data)) - case "quit": - os.Exit(0) - return - case "result": - job := <-w.JobQueue - log.Println(string(job.Data)) - default: - log.Println("Unknown command") - } - } - }() w.Work() } diff --git a/worker/jobagent.go b/worker/agent.go similarity index 63% rename from worker/jobagent.go rename to worker/agent.go index 5456e94..9efc780 100644 --- a/worker/jobagent.go +++ b/worker/agent.go @@ -5,9 +5,9 @@ package worker import ( - gearman "bitbucket.org/mikespook/gearman-go" "io" "net" + "bitbucket.org/mikespook/gearman-go/common" ) // The agent of job server. @@ -15,27 +15,33 @@ type jobAgent struct { conn net.Conn worker *Worker running bool - incoming chan []byte + in chan []byte + out chan *Job } // Create the agent of job server. func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { - conn, err := net.Dial(gearman.TCP, addr) + conn, err := net.Dial(common.NETWORK, addr) if err != nil { return nil, err } - jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, gearman.QUEUE_CAP)} + jobagent = &jobAgent{ + conn: conn, + worker: worker, + running: true, + in: make(chan []byte, common.QUEUE_SIZE), + } return jobagent, err } // Internal read func (agent *jobAgent) read() (data []byte, err error) { - if len(agent.incoming) > 0 { - // incoming queue is not empty - data = <-agent.incoming + if len(agent.in) > 0 { + // in queue is not empty + data = <-agent.in } else { for { - buf := make([]byte, gearman.BUFFER_SIZE) + buf := make([]byte, common.BUFFER_SIZE) var n int if n, err = agent.conn.Read(buf); err != nil { if err == io.EOF && n == 0 { @@ -45,7 +51,7 @@ func (agent *jobAgent) read() (data []byte, err error) { return } data = append(data, buf[0:n]...) - if n < gearman.BUFFER_SIZE { + if n < common.BUFFER_SIZE { break } } @@ -54,13 +60,13 @@ func (agent *jobAgent) read() (data []byte, err error) { start := 0 tl := len(data) for i := 0; i < tl; i++ { - if string(data[start:start+4]) == gearman.RES_STR { - l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + if string(data[start:start+4]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) total := l + 12 if total == tl { return } else { - agent.incoming <- data[total:] + agent.in <- data[total:] data = data[:total] return } @@ -68,7 +74,7 @@ func (agent *jobAgent) read() (data []byte, err error) { start++ } } - err = gearman.ErrInvalidData + err = common.ErrInvalidData return } @@ -76,29 +82,29 @@ func (agent *jobAgent) read() (data []byte, err error) { func (agent *jobAgent) Work() { noop := true for agent.running { - // got noop msg and incoming queue is zero, grab job - if noop && len(agent.incoming) == 0 { - agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.GRAB_JOB, nil)) + // got noop msg and in queue is zero, grab job + if noop && len(agent.in) == 0 { + agent.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) } rel, err := agent.read() if err != nil { agent.worker.err(err) continue } - job, err := DecodeWorkerJob(rel) + job, err := decodeJob(rel) if err != nil { agent.worker.err(err) continue } else { switch job.DataType { - case gearman.NOOP: + case common.NOOP: noop = true - case gearman.NO_JOB: + case common.NO_JOB: noop = false - agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.PRE_SLEEP, nil)) - case gearman.ECHO_RES, gearman.JOB_ASSIGN_UNIQ, gearman.JOB_ASSIGN: + agent.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + case common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: job.agent = agent - agent.worker.incoming <- job + agent.worker.in <- job } } } @@ -106,7 +112,7 @@ func (agent *jobAgent) Work() { } // Send a job to the job server. -func (agent *jobAgent) WriteJob(job *WorkerJob) (err error) { +func (agent *jobAgent) WriteJob(job *Job) (err error) { return agent.write(job.Encode()) } @@ -125,7 +131,7 @@ func (agent *jobAgent) write(buf []byte) (err error) { // Close. func (agent *jobAgent) Close() (err error) { agent.running = false - close(agent.incoming) + close(agent.in) err = agent.conn.Close() return } diff --git a/worker/workerjob.go b/worker/job.go similarity index 51% rename from worker/workerjob.go rename to worker/job.go index 1f60e12..267ca3a 100644 --- a/worker/workerjob.go +++ b/worker/job.go @@ -5,12 +5,12 @@ package worker import ( - gearman "bitbucket.org/mikespook/gearman-go" "strconv" + "bitbucket.org/mikespook/gearman-go/common" ) // Worker side job -type WorkerJob struct { +type Job struct { Data []byte Handle, UniqueId string agent *jobAgent @@ -18,70 +18,74 @@ type WorkerJob struct { } // Create a new job -func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { - return &WorkerJob{magicCode: magiccode, +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, DataType: datatype, Data: data} } // Decode job from byte slice -func DecodeWorkerJob(data []byte) (job *WorkerJob, err error) { +func decodeJob(data []byte) (job *Job, err error) { if len(data) < 12 { - err = gearman.ErrInvalidData + err = common.ErrInvalidData return } - datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) if len(data[12:]) != int(l) { - err = gearman.ErrInvalidData + err = common.ErrInvalidData return } data = data[12:] - job = NewWorkerJob(gearman.RES, datatype, data) + job = newJob(common.RES, datatype, data) return } // Encode a job to byte slice -func (job *WorkerJob) Encode() (data []byte) { - magiccode := gearman.Uint32ToBytes(job.magicCode) - datatype := gearman.Uint32ToBytes(job.DataType) - data = make([]byte, 0, 1024*64) +func (job *Job) Encode() (data []byte) { + l := len(job.Data) + tl := l + 12 + if job.Handle != "" { + tl += len(job.Handle) + 1 + } + data = make([]byte, 0, tl) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(tl)) + data = append(data, magiccode[:]...) data = append(data, datatype[:]...) - data = append(data, []byte{0, 0, 0, 0}...) - l := len(job.Data) + data = append(data, datalength[:]...) if job.Handle != "" { data = append(data, []byte(job.Handle)...) data = append(data, 0) - l += len(job.Handle) + 1 } data = append(data, job.Data...) - datalength := gearman.Uint32ToBytes(uint32(l)) - copy(data[8:12], datalength[:]) return } // Send some datas to client. // Using this in a job's executing. -func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err error) { +func (job *Job) UpdateData(data []byte, iswaring bool) (err error) { result := append([]byte(job.Handle), 0) result = append(result, data...) var datatype uint32 if iswaring { - datatype = gearman.WORK_WARNING + datatype = common.WORK_WARNING } else { - datatype = gearman.WORK_DATA + datatype = common.WORK_DATA } - return job.agent.WriteJob(NewWorkerJob(gearman.REQ, datatype, result)) + return job.agent.WriteJob(newJob(common.REQ, datatype, result)) } // Update status. // Tall client how many percent job has been executed. -func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err error) { +func (job *Job) UpdateStatus(numerator, denominator int) (err error) { n := []byte(strconv.Itoa(numerator)) d := []byte(strconv.Itoa(denominator)) result := append([]byte(job.Handle), 0) result = append(result, n...) result = append(result, d...) - return job.agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.WORK_STATUS, result)) + return job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } diff --git a/worker/worker.go b/worker/worker.go index 9a61937..3f1c38c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,29 +5,26 @@ package worker import ( - gearman "bitbucket.org/mikespook/gearman-go" "bytes" - "sync" + "bitbucket.org/mikespook/gearman-go/common" ) const ( - Unlimit = 0 + Unlimited = 0 OneByOne = 1 ) // The definition of the callback function. -type JobFunction func(job *WorkerJob) ([]byte, error) +type JobFunc func(job *Job) ([]byte, error) // Map for added function. -type JobFunctionMap map[string]JobFunction +type JobFuncs map[string]JobFunc -// Error Function -type ErrFunc func(e error) /* -Worker side api for gearman. +Worker side api for gearman usage: - w = worker.New(worker.Unlimit) + w = worker.New(worker.Unlimited) w.AddFunction("foobar", foobar) w.AddServer("127.0.0.1:4730") w.Work() // Enter the worker's main loop @@ -35,51 +32,49 @@ usage: The definition of the callback function 'foobar' should suit for the type 'JobFunction'. It looks like this: -func foobar(job *WorkerJob) (data []byte, err os.Error) { +func foobar(job *Job) (data []byte, err os.Error) { //sth. here //plaplapla... return } */ type Worker struct { - clients []*jobAgent - functions JobFunctionMap - running bool - incoming chan *WorkerJob - mutex sync.Mutex + agents []*jobAgent + funcs JobFuncs + in chan *Job + out chan *Job + running bool limit chan bool - JobQueue chan *WorkerJob - + Id string // assign a ErrFunc to handle errors - // Must assign befor AddServer - ErrFunc ErrFunc + ErrHandler common.ErrorHandler } // Get a new worker func New(l int) (worker *Worker) { worker = &Worker{ - // job server list - clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP), - // function list + agents: make([]*jobAgent, 0), functions: make(JobFunctionMap), - incoming: make(chan *WorkerJob, gearman.QUEUE_CAP), - JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP), + + in: make(chan *Job, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), running: true, } - if l != Unlimit { + if l != Unlimited { worker.limit = make(chan bool, l) for i := 0; i < l; i ++ { worker.limit <- true } } + go worker.outLoop() return } // func (worker *Worker)err(e error) { - if worker.ErrFunc != nil { - worker.ErrFunc(e) + if worker.ErrHandler != nil { + worker.ErrHandler(e) } } @@ -90,7 +85,7 @@ func (worker *Worker) AddServer(addr string) (err error) { defer worker.mutex.Unlock() if len(worker.clients) == cap(worker.clients) { - return gearman.ErrOutOfCap + return common.ErrOutOfCap } // Create a new job server's client as a agent of server @@ -109,9 +104,9 @@ func (worker *Worker) AddServer(addr string) (err error) { // Plz added job servers first, then functions. // The API will tell every connected job server that 'I can do this' func (worker *Worker) AddFunction(funcname string, - f JobFunction, timeout uint32) (err error) { + f JobFunc, timeout uint32) (err error) { if len(worker.clients) < 1 { - return gearman.ErrNotConn + return common.ErrNotConn } worker.mutex.Lock() defer worker.mutex.Unlock() @@ -120,15 +115,15 @@ func (worker *Worker) AddFunction(funcname string, var datatype uint32 var data []byte if timeout == 0 { - datatype = gearman.CAN_DO + datatype = common.CAN_DO data = []byte(funcname) } else { - datatype = gearman.CAN_DO_TIMEOUT + datatype = common.CAN_DO_TIMEOUT data = []byte(funcname + "\x00") - t := gearman.Uint32ToBytes(timeout) + t := common.Uint32ToBytes(timeout) data = append(data, t[:]...) } - job := NewWorkerJob(gearman.REQ, datatype, data) + job := newJob(common.REQ, datatype, data) worker.WriteJob(job) return } @@ -140,10 +135,10 @@ func (worker *Worker) RemoveFunction(funcname string) (err error) { defer worker.mutex.Unlock() if worker.functions[funcname] == nil { - return gearman.ErrFuncNotFound + return common.ErrFuncNotFound } delete(worker.functions, funcname) - job := NewWorkerJob(gearman.REQ, gearman.CANT_DO, []byte(funcname)) + job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) worker.WriteJob(job) return } @@ -153,19 +148,19 @@ func (worker *Worker) Work() { for _, v := range worker.clients { go v.Work() } - for worker.running || len(worker.incoming) > 0{ + for worker.running || len(worker.in) > 0{ select { - case job := <-worker.incoming: + case job := <-worker.in: if job == nil { break } switch job.DataType { - case gearman.NO_JOB: + case common.NO_JOB: // do nothing - case gearman.ERROR: - _, err := gearman.GetError(job.Data) + case common.ERROR: + _, err := common.GetError(job.Data) worker.err(err) - case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ: + case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: go func() { if err := worker.exec(job); err != nil { worker.err(err) @@ -176,23 +171,7 @@ func (worker *Worker) Work() { } } } - close(worker.incoming) -} - -// Get the last job in queue. -// If there are more than one job in the queue, -// the last one will be returned, -// the others will be lost. -func (worker *Worker) LastJob() (job *WorkerJob) { - if l := len(worker.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-worker.JobQueue - } - } - return <-worker.JobQueue + close(worker.in) } // Close. @@ -207,26 +186,26 @@ func (worker *Worker) Close() (err error) { // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) WriteJob(job *WorkerJob) (err error) { - e := make(chan error) - for _, v := range worker.clients { +func (worker *Worker) Broadcast(job *Job) { + for _, v := range worker.agents { go func() { - e <- v.WriteJob(job) + if err := v.WriteJob(job); err != nil { + worker.err(err) + } }() } - return <-e } // Send a something out, get the samething back. func (worker *Worker) Echo(data []byte) (err error) { - job := NewWorkerJob(gearman.REQ, gearman.ECHO_REQ, data) + job := newJob(common.REQ, common.ECHO_REQ, data) return worker.WriteJob(job) } // Remove all of functions. // Both from the worker or job servers. func (worker *Worker) Reset() (err error) { - job := NewWorkerJob(gearman.REQ, gearman.RESET_ABILITIES, nil) + job := newJob(common.REQ, common.RESET_ABILITIES, nil) err = worker.WriteJob(job) worker.functions = make(JobFunctionMap) return @@ -234,20 +213,20 @@ func (worker *Worker) Reset() (err error) { // Set the worker's unique id. func (worker *Worker) SetId(id string) (err error) { - job := NewWorkerJob(gearman.REQ, gearman.SET_CLIENT_ID, []byte(id)) + job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) return worker.WriteJob(job) } // Execute the job. And send back the result. -func (worker *Worker) exec(job *WorkerJob) (err error) { +func (worker *Worker) exec(job *Job) (err error) { if worker.limit != nil { - <- worker.limit + <-worker.limit defer func() { worker.limit <- true }() } var limit int - if job.DataType == gearman.JOB_ASSIGN { + if job.DataType == common.JOB_ASSIGN { limit = 3 } else { limit = 4 @@ -255,7 +234,7 @@ func (worker *Worker) exec(job *WorkerJob) (err error) { jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) job.Handle = string(jobdata[0]) funcname := string(jobdata[1]) - if job.DataType == gearman.JOB_ASSIGN { + if job.DataType == common.JOB_ASSIGN { job.Data = jobdata[2] } else { job.UniqueId = string(jobdata[2]) @@ -263,21 +242,21 @@ func (worker *Worker) exec(job *WorkerJob) (err error) { } f, ok := worker.functions[funcname] if !ok { - return gearman.ErrFuncNotFound + return common.ErrFuncNotFound } result, err := f(job) var datatype uint32 if err == nil { - datatype = gearman.WORK_COMPLETE + datatype = common.WORK_COMPLETE } else { if result == nil { - datatype = gearman.WORK_FAIL + datatype = common.WORK_FAIL } else { - datatype = gearman.WORK_EXCEPTION + datatype = common.WORK_EXCEPTION } } - job.magicCode = gearman.REQ + job.magicCode = common.REQ job.DataType = datatype job.Data = result