diff --git a/example/worker.go b/example/worker.go index bd82a2f..9a19338 100644 --- a/example/worker.go +++ b/example/worker.go @@ -1,25 +1,37 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/worker" -// "bitbucket.org/mikespook/golib/signal" -// "os" + "os" "log" "strings" + "bitbucket.org/mikespook/golib/signal" + "bitbucket.org/mikespook/gearman-go/worker" ) func ToUpper(job *worker.Job) ([]byte, error) { + log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n", + job.Handle, job.UniqueId, job.Data) data := []byte(strings.ToUpper(string(job.Data))) return data, nil } func main() { + log.Println("Starting ...") + defer log.Println("Shutdown complete!") w := worker.New(worker.Unlimited) w.ErrHandler = func(e error) { log.Println(e) } + w.JobHandler = func(job *worker.Job) error { + log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle, + job.UniqueId, job.Data) + return nil + } w.AddServer("127.0.0.1:4730") - w.AddFunction("ToUpper", ToUpper, 0) - w.AddFunction("ToUpperTimeOut5", ToUpper, 5) - w.Work() + w.AddFunc("ToUpper", ToUpper, 0) + //w.AddFunc("ToUpperTimeOut5", ToUpper, 5) + go w.Work() + sh := signal.NewHandler() + sh.Bind(os.Interrupt, func() bool {return true}) + sh.Loop() } diff --git a/worker/agent.go b/worker/agent.go index 9efc780..ceef56a 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -11,42 +11,101 @@ import ( ) // The agent of job server. -type jobAgent struct { - conn net.Conn - worker *Worker - running bool - in chan []byte - out chan *Job +type agent struct { + conn net.Conn + worker *Worker + in chan []byte + out chan *Job } // Create the agent of job server. -func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { +func newAgent(addr string, worker *Worker) (a *agent, err error) { conn, err := net.Dial(common.NETWORK, addr) if err != nil { - return nil, err + return } - jobagent = &jobAgent{ + a = &agent{ conn: conn, worker: worker, - running: true, in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), } - return jobagent, err + return +} + +// outputing loop +func (a *agent) outLoop() { + ok := true + for ok { + if job, ok := <-a.out; ok { + if err := a.write(job.Encode()); err != nil { + a.worker.err(err) + } + } + } +} + +// inputing loop +func (a *agent) inLoop() { + defer func() { + a.conn.Close() + close(a.in) + close(a.out) + a.worker.removeAgent(a) + }() + noop := true + for a.worker.running { + // got noop msg and in queue is zero, grab job + if noop && len(a.in) == 0 { + a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) + } + rel, err := a.read() + if err != nil { + if err == common.ErrEmptyReading { + break + } + a.worker.err(err) + continue + } + job, err := decodeJob(rel) + if err != nil { + a.worker.err(err) + continue + } + switch job.DataType { + case common.NOOP: + noop = true + case common.NO_JOB: + noop = false + a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: + job.agent = a + a.worker.in <- job + } + } +} + +func (a *agent) Work() { + go a.outLoop() + go a.inLoop() } // Internal read -func (agent *jobAgent) read() (data []byte, err error) { - if len(agent.in) > 0 { +func (a *agent) read() (data []byte, err error) { + if len(a.in) > 0 { // in queue is not empty - data = <-agent.in + data = <-a.in } else { for { buf := make([]byte, common.BUFFER_SIZE) var n int - if n, err = agent.conn.Read(buf); err != nil { + if n, err = a.conn.Read(buf); err != nil { if err == io.EOF && n == 0 { - err = nil - return + if data == nil { + err = common.ErrEmptyReading + return + } + break } return } @@ -57,16 +116,17 @@ func (agent *jobAgent) read() (data []byte, err error) { } } // split package - start := 0 tl := len(data) + start := 0 for i := 0; i < tl; i++ { if string(data[start:start+4]) == common.RES_STR { - l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + l := int(common.BytesToUint32([4]byte{data[start+8], + data[start+9], data[start+10], data[start+11]})) total := l + 12 if total == tl { return } else { - agent.in <- data[total:] + a.in <- data[total:] data = data[:total] return } @@ -74,64 +134,22 @@ func (agent *jobAgent) read() (data []byte, err error) { start++ } } - err = common.ErrInvalidData - return -} - -// Main loop. -func (agent *jobAgent) Work() { - noop := true - for agent.running { - // got noop msg and in queue is zero, grab job - if noop && len(agent.in) == 0 { - agent.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) - } - rel, err := agent.read() - if err != nil { - agent.worker.err(err) - continue - } - job, err := decodeJob(rel) - if err != nil { - agent.worker.err(err) - continue - } else { - switch job.DataType { - case common.NOOP: - noop = true - case common.NO_JOB: - noop = false - agent.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) - case common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: - job.agent = agent - agent.worker.in <- job - } - } - } - return + return nil, common.Errorf("Invalid data: %V", data) } // Send a job to the job server. -func (agent *jobAgent) WriteJob(job *Job) (err error) { - return agent.write(job.Encode()) +func (a *agent) WriteJob(job *Job) { + a.out <- job } // Internal write the encoded job. -func (agent *jobAgent) write(buf []byte) (err error) { +func (a *agent) write(buf []byte) (err error) { var n int for i := 0; i < len(buf); i += n { - n, err = agent.conn.Write(buf[i:]) + n, err = a.conn.Write(buf[i:]) if err != nil { return err } } return } - -// Close. -func (agent *jobAgent) Close() (err error) { - agent.running = false - close(agent.in) - err = agent.conn.Close() - return -} diff --git a/worker/job.go b/worker/job.go index 267ca3a..054fd30 100644 --- a/worker/job.go +++ b/worker/job.go @@ -1,4 +1,5 @@ -// Copyright 2011 Xing Xing All rights reserved. +// Copyright 2011 Xing Xing +// All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. @@ -13,7 +14,7 @@ import ( type Job struct { Data []byte Handle, UniqueId string - agent *jobAgent + agent *agent magicCode, DataType uint32 } @@ -27,14 +28,12 @@ func newJob(magiccode, datatype uint32, data []byte) (job *Job) { // Decode job from byte slice func decodeJob(data []byte) (job *Job, err error) { if len(data) < 12 { - err = common.ErrInvalidData - return + return nil, common.Errorf("Invalid data: %V", data) } datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) if len(data[12:]) != int(l) { - err = common.ErrInvalidData - return + return nil, common.Errorf("Invalid data: %V", data) } data = data[12:] job = newJob(common.RES, datatype, data) @@ -44,11 +43,11 @@ func decodeJob(data []byte) (job *Job, err error) { // Encode a job to byte slice func (job *Job) Encode() (data []byte) { l := len(job.Data) - tl := l + 12 + tl := l if job.Handle != "" { tl += len(job.Handle) + 1 } - data = make([]byte, 0, tl) + data = make([]byte, 0, tl + 12) magiccode := common.Uint32ToBytes(job.magicCode) datatype := common.Uint32ToBytes(job.DataType) @@ -67,7 +66,7 @@ func (job *Job) Encode() (data []byte) { // Send some datas to client. // Using this in a job's executing. -func (job *Job) UpdateData(data []byte, iswaring bool) (err error) { +func (job *Job) UpdateData(data []byte, iswaring bool) { result := append([]byte(job.Handle), 0) result = append(result, data...) var datatype uint32 @@ -76,16 +75,16 @@ func (job *Job) UpdateData(data []byte, iswaring bool) (err error) { } else { datatype = common.WORK_DATA } - return job.agent.WriteJob(newJob(common.REQ, datatype, result)) + job.agent.WriteJob(newJob(common.REQ, datatype, result)) } // Update status. // Tall client how many percent job has been executed. -func (job *Job) UpdateStatus(numerator, denominator int) (err error) { +func (job *Job) UpdateStatus(numerator, denominator int) { n := []byte(strconv.Itoa(numerator)) d := []byte(strconv.Itoa(denominator)) result := append([]byte(job.Handle), 0) result = append(result, n...) result = append(result, d...) - return job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) + job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } diff --git a/worker/worker.go b/worker/worker.go index 3f1c38c..667e16e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -13,21 +13,28 @@ const ( Unlimited = 0 OneByOne = 1 ) +// Job handler +type JobHandler func(*Job) error -// The definition of the callback function. type JobFunc func(job *Job) ([]byte, error) +// The definition of the callback function. +type jobFunc struct { + f JobFunc + timeout uint32 +} + // Map for added function. -type JobFuncs map[string]JobFunc +type JobFuncs map[string]*jobFunc /* Worker side api for gearman usage: - w = worker.New(worker.Unlimited) - w.AddFunction("foobar", foobar) - w.AddServer("127.0.0.1:4730") - w.Work() // Enter the worker's main loop +w = worker.New(worker.Unlimited) +w.AddFunction("foobar", foobar) +w.AddServer("127.0.0.1:4730") +w.Work() // Enter the worker's main loop The definition of the callback function 'foobar' should suit for the type 'JobFunction'. It looks like this: @@ -39,27 +46,24 @@ func foobar(job *Job) (data []byte, err os.Error) { } */ type Worker struct { - agents []*jobAgent + agents []*agent funcs JobFuncs in chan *Job - out chan *Job running bool limit chan bool Id string // assign a ErrFunc to handle errors ErrHandler common.ErrorHandler + JobHandler JobHandler } // Get a new worker func New(l int) (worker *Worker) { worker = &Worker{ - agents: make([]*jobAgent, 0), - functions: make(JobFunctionMap), - + agents: make([]*agent, 0), + funcs: make(JobFuncs), in: make(chan *Job, common.QUEUE_SIZE), - out: make(chan *Job, common.QUEUE_SIZE), - running: true, } if l != Unlimited { worker.limit = make(chan bool, l) @@ -67,7 +71,6 @@ func New(l int) (worker *Worker) { worker.limit <- true } } - go worker.outLoop() return } @@ -81,37 +84,42 @@ func (worker *Worker)err(e error) { // Add a server. The addr should be 'host:port' format. // The connection is established at this time. func (worker *Worker) AddServer(addr string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if len(worker.clients) == cap(worker.clients) { - return common.ErrOutOfCap - } - // Create a new job server's client as a agent of server - server, err := newJobAgent(addr, worker) + server, err := newAgent(addr, worker) if err != nil { return err } - - n := len(worker.clients) - worker.clients = worker.clients[0 : n+1] - worker.clients[n] = server + worker.agents = append(worker.agents, server) return } +// Write a job to job server. +// Here, the job's mean is not the oraginal mean. +// Just looks like a network package for job's result or tell job server, there was a fail. +func (worker *Worker) broadcast(job *Job) { + for _, v := range worker.agents { + v.WriteJob(job) + } +} + // Add a function. // Plz added job servers first, then functions. // The API will tell every connected job server that 'I can do this' -func (worker *Worker) AddFunction(funcname string, - f JobFunc, timeout uint32) (err error) { - if len(worker.clients) < 1 { - return common.ErrNotConn +func (worker *Worker) AddFunc(funcname string, +f JobFunc, timeout uint32) (err error) { + if _, ok := worker.funcs[funcname]; ok { + return common.Errorf("The function already exists: %s", funcname) } - worker.mutex.Lock() - defer worker.mutex.Unlock() - worker.functions[funcname] = f + worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} + if worker.running { + worker.addFunc(funcname, timeout) + } + return +} + +// inner add function +func (worker *Worker) addFunc(funcname string, timeout uint32) { var datatype uint32 var data []byte if timeout == 0 { @@ -124,42 +132,50 @@ func (worker *Worker) AddFunction(funcname string, data = append(data, t[:]...) } job := newJob(common.REQ, datatype, data) - worker.WriteJob(job) - return + worker.broadcast(job) + } // Remove a function. // Tell job servers 'I can not do this now' at the same time. -func (worker *Worker) RemoveFunction(funcname string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if worker.functions[funcname] == nil { - return common.ErrFuncNotFound +func (worker *Worker) RemoveFunc(funcname string) (err error) { + if _, ok := worker.funcs[funcname]; !ok { + return common.Errorf("The function does not exist: %s", funcname) + } + delete(worker.funcs, funcname) + if worker.running { + worker.removeFunc(funcname) } - delete(worker.functions, funcname) - job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) - worker.WriteJob(job) return } +// inner remove function +func (worker *Worker) removeFunc(funcname string) { + job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) + worker.broadcast(job) +} + // Main loop func (worker *Worker) Work() { - for _, v := range worker.clients { + defer func() { + worker.running = false + }() + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + worker.running = true + for _, v := range worker.agents { go v.Work() } - for worker.running || len(worker.in) > 0{ - select { - case job := <-worker.in: - if job == nil { - break - } + ok := true + for ok { + if job, ok := <-worker.in; ok { switch job.DataType { - case common.NO_JOB: - // do nothing case common.ERROR: - _, err := common.GetError(job.Data) - worker.err(err) + go func() { + _, err := common.GetError(job.Data) + worker.err(err) + }() case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: go func() { if err := worker.exec(job); err != nil { @@ -167,54 +183,48 @@ func (worker *Worker) Work() { } }() default: - worker.JobQueue <- job + go worker.handleJob(job) } } } - close(worker.in) +} + +// job handler +func (worker *Worker) handleJob(job *Job) { + if worker.JobHandler != nil { + if err := worker.JobHandler(job); err != nil { + worker.err(err) + } + } } // Close. -func (worker *Worker) Close() (err error) { - for _, v := range worker.clients { - err = v.Close() - } - worker.running = false - return err -} - -// Write a job to job server. -// Here, the job's mean is not the oraginal mean. -// Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) Broadcast(job *Job) { - for _, v := range worker.agents { - go func() { - if err := v.WriteJob(job); err != nil { - worker.err(err) - } - }() +func (worker *Worker) Close() { + close(worker.in) + if worker.limit != nil { + close(worker.limit) } } // Send a something out, get the samething back. -func (worker *Worker) Echo(data []byte) (err error) { +func (worker *Worker) Echo(data []byte) { job := newJob(common.REQ, common.ECHO_REQ, data) - return worker.WriteJob(job) + worker.broadcast(job) } // Remove all of functions. // Both from the worker or job servers. -func (worker *Worker) Reset() (err error) { +func (worker *Worker) Reset() { job := newJob(common.REQ, common.RESET_ABILITIES, nil) - err = worker.WriteJob(job) - worker.functions = make(JobFunctionMap) - return + worker.broadcast(job) + worker.funcs = make(JobFuncs) } // Set the worker's unique id. -func (worker *Worker) SetId(id string) (err error) { +func (worker *Worker) SetId(id string) { + worker.Id = id job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) - return worker.WriteJob(job) + worker.broadcast(job) } // Execute the job. And send back the result. @@ -240,11 +250,11 @@ func (worker *Worker) exec(job *Job) (err error) { job.UniqueId = string(jobdata[2]) job.Data = jobdata[3] } - f, ok := worker.functions[funcname] + f, ok := worker.funcs[funcname] if !ok { - return common.ErrFuncNotFound + return common.Errorf("The function does not exist: %s", funcname) } - result, err := f(job) + result, err := f.f(job) var datatype uint32 if err == nil { datatype = common.WORK_COMPLETE @@ -259,7 +269,17 @@ func (worker *Worker) exec(job *Job) (err error) { job.magicCode = common.REQ job.DataType = datatype job.Data = result - - worker.WriteJob(job) + job.agent.WriteJob(job) return } + +func (worker *Worker) removeAgent(a *agent) { + for k, v := range worker.agents { + if v == a { + worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...) + } + } + if len(worker.agents) == 0 { + worker.Close() + } +}