The worker can be working now.

--HG--
branch : 0.1
This commit is contained in:
mikespook 2012-05-24 16:49:35 +08:00
parent 2960cb9953
commit d6a9025a56
4 changed files with 222 additions and 173 deletions

View File

@ -1,25 +1,37 @@
package main package main
import ( import (
"bitbucket.org/mikespook/gearman-go/worker" "os"
// "bitbucket.org/mikespook/golib/signal"
// "os"
"log" "log"
"strings" "strings"
"bitbucket.org/mikespook/golib/signal"
"bitbucket.org/mikespook/gearman-go/worker"
) )
func ToUpper(job *worker.Job) ([]byte, error) { func ToUpper(job *worker.Job) ([]byte, error) {
log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
data := []byte(strings.ToUpper(string(job.Data))) data := []byte(strings.ToUpper(string(job.Data)))
return data, nil return data, nil
} }
func main() { func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited) w := worker.New(worker.Unlimited)
w.ErrHandler = func(e error) { w.ErrHandler = func(e error) {
log.Println(e) log.Println(e)
} }
w.JobHandler = func(job *worker.Job) error {
log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle,
job.UniqueId, job.Data)
return nil
}
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.AddFunction("ToUpper", ToUpper, 0) w.AddFunc("ToUpper", ToUpper, 0)
w.AddFunction("ToUpperTimeOut5", ToUpper, 5) //w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
w.Work() go w.Work()
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {return true})
sh.Loop()
} }

View File

@ -11,43 +11,102 @@ import (
) )
// The agent of job server. // The agent of job server.
type jobAgent struct { type agent struct {
conn net.Conn conn net.Conn
worker *Worker worker *Worker
running bool
in chan []byte in chan []byte
out chan *Job out chan *Job
} }
// Create the agent of job server. // Create the agent of job server.
func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { func newAgent(addr string, worker *Worker) (a *agent, err error) {
conn, err := net.Dial(common.NETWORK, addr) conn, err := net.Dial(common.NETWORK, addr)
if err != nil { if err != nil {
return nil, err return
} }
jobagent = &jobAgent{ a = &agent{
conn: conn, conn: conn,
worker: worker, worker: worker,
running: true,
in: make(chan []byte, common.QUEUE_SIZE), in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
} }
return jobagent, err return
}
// outputing loop
func (a *agent) outLoop() {
ok := true
for ok {
if job, ok := <-a.out; ok {
if err := a.write(job.Encode()); err != nil {
a.worker.err(err)
}
}
}
}
// inputing loop
func (a *agent) inLoop() {
defer func() {
a.conn.Close()
close(a.in)
close(a.out)
a.worker.removeAgent(a)
}()
noop := true
for a.worker.running {
// got noop msg and in queue is zero, grab job
if noop && len(a.in) == 0 {
a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil))
}
rel, err := a.read()
if err != nil {
if err == common.ErrEmptyReading {
break
}
a.worker.err(err)
continue
}
job, err := decodeJob(rel)
if err != nil {
a.worker.err(err)
continue
}
switch job.DataType {
case common.NOOP:
noop = true
case common.NO_JOB:
noop = false
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
job.agent = a
a.worker.in <- job
}
}
}
func (a *agent) Work() {
go a.outLoop()
go a.inLoop()
} }
// Internal read // Internal read
func (agent *jobAgent) read() (data []byte, err error) { func (a *agent) read() (data []byte, err error) {
if len(agent.in) > 0 { if len(a.in) > 0 {
// in queue is not empty // in queue is not empty
data = <-agent.in data = <-a.in
} else { } else {
for { for {
buf := make([]byte, common.BUFFER_SIZE) buf := make([]byte, common.BUFFER_SIZE)
var n int var n int
if n, err = agent.conn.Read(buf); err != nil { if n, err = a.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 { if err == io.EOF && n == 0 {
err = nil if data == nil {
err = common.ErrEmptyReading
return return
} }
break
}
return return
} }
data = append(data, buf[0:n]...) data = append(data, buf[0:n]...)
@ -57,16 +116,17 @@ func (agent *jobAgent) read() (data []byte, err error) {
} }
} }
// split package // split package
start := 0
tl := len(data) tl := len(data)
start := 0
for i := 0; i < tl; i++ { for i := 0; i < tl; i++ {
if string(data[start:start+4]) == common.RES_STR { if string(data[start:start+4]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) l := int(common.BytesToUint32([4]byte{data[start+8],
data[start+9], data[start+10], data[start+11]}))
total := l + 12 total := l + 12
if total == tl { if total == tl {
return return
} else { } else {
agent.in <- data[total:] a.in <- data[total:]
data = data[:total] data = data[:total]
return return
} }
@ -74,64 +134,22 @@ func (agent *jobAgent) read() (data []byte, err error) {
start++ start++
} }
} }
err = common.ErrInvalidData return nil, common.Errorf("Invalid data: %V", data)
return
}
// Main loop.
func (agent *jobAgent) Work() {
noop := true
for agent.running {
// got noop msg and in queue is zero, grab job
if noop && len(agent.in) == 0 {
agent.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil))
}
rel, err := agent.read()
if err != nil {
agent.worker.err(err)
continue
}
job, err := decodeJob(rel)
if err != nil {
agent.worker.err(err)
continue
} else {
switch job.DataType {
case common.NOOP:
noop = true
case common.NO_JOB:
noop = false
agent.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
case common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
job.agent = agent
agent.worker.in <- job
}
}
}
return
} }
// Send a job to the job server. // Send a job to the job server.
func (agent *jobAgent) WriteJob(job *Job) (err error) { func (a *agent) WriteJob(job *Job) {
return agent.write(job.Encode()) a.out <- job
} }
// Internal write the encoded job. // Internal write the encoded job.
func (agent *jobAgent) write(buf []byte) (err error) { func (a *agent) write(buf []byte) (err error) {
var n int var n int
for i := 0; i < len(buf); i += n { for i := 0; i < len(buf); i += n {
n, err = agent.conn.Write(buf[i:]) n, err = a.conn.Write(buf[i:])
if err != nil { if err != nil {
return err return err
} }
} }
return return
} }
// Close.
func (agent *jobAgent) Close() (err error) {
agent.running = false
close(agent.in)
err = agent.conn.Close()
return
}

View File

@ -1,4 +1,5 @@
// Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved. // Copyright 2011 Xing Xing <mikespook@gmail.com>
// All rights reserved.
// Use of this source code is governed by a MIT // Use of this source code is governed by a MIT
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
@ -13,7 +14,7 @@ import (
type Job struct { type Job struct {
Data []byte Data []byte
Handle, UniqueId string Handle, UniqueId string
agent *jobAgent agent *agent
magicCode, DataType uint32 magicCode, DataType uint32
} }
@ -27,14 +28,12 @@ func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
// Decode job from byte slice // Decode job from byte slice
func decodeJob(data []byte) (job *Job, err error) { func decodeJob(data []byte) (job *Job, err error) {
if len(data) < 12 { if len(data) < 12 {
err = common.ErrInvalidData return nil, common.Errorf("Invalid data: %V", data)
return
} }
datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
if len(data[12:]) != int(l) { if len(data[12:]) != int(l) {
err = common.ErrInvalidData return nil, common.Errorf("Invalid data: %V", data)
return
} }
data = data[12:] data = data[12:]
job = newJob(common.RES, datatype, data) job = newJob(common.RES, datatype, data)
@ -44,11 +43,11 @@ func decodeJob(data []byte) (job *Job, err error) {
// Encode a job to byte slice // Encode a job to byte slice
func (job *Job) Encode() (data []byte) { func (job *Job) Encode() (data []byte) {
l := len(job.Data) l := len(job.Data)
tl := l + 12 tl := l
if job.Handle != "" { if job.Handle != "" {
tl += len(job.Handle) + 1 tl += len(job.Handle) + 1
} }
data = make([]byte, 0, tl) data = make([]byte, 0, tl + 12)
magiccode := common.Uint32ToBytes(job.magicCode) magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType) datatype := common.Uint32ToBytes(job.DataType)
@ -67,7 +66,7 @@ func (job *Job) Encode() (data []byte) {
// Send some datas to client. // Send some datas to client.
// Using this in a job's executing. // Using this in a job's executing.
func (job *Job) UpdateData(data []byte, iswaring bool) (err error) { func (job *Job) UpdateData(data []byte, iswaring bool) {
result := append([]byte(job.Handle), 0) result := append([]byte(job.Handle), 0)
result = append(result, data...) result = append(result, data...)
var datatype uint32 var datatype uint32
@ -76,16 +75,16 @@ func (job *Job) UpdateData(data []byte, iswaring bool) (err error) {
} else { } else {
datatype = common.WORK_DATA datatype = common.WORK_DATA
} }
return job.agent.WriteJob(newJob(common.REQ, datatype, result)) job.agent.WriteJob(newJob(common.REQ, datatype, result))
} }
// Update status. // Update status.
// Tall client how many percent job has been executed. // Tall client how many percent job has been executed.
func (job *Job) UpdateStatus(numerator, denominator int) (err error) { func (job *Job) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator)) n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator)) d := []byte(strconv.Itoa(denominator))
result := append([]byte(job.Handle), 0) result := append([]byte(job.Handle), 0)
result = append(result, n...) result = append(result, n...)
result = append(result, d...) result = append(result, d...)
return job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
} }

View File

@ -13,21 +13,28 @@ const (
Unlimited = 0 Unlimited = 0
OneByOne = 1 OneByOne = 1
) )
// Job handler
type JobHandler func(*Job) error
// The definition of the callback function.
type JobFunc func(job *Job) ([]byte, error) type JobFunc func(job *Job) ([]byte, error)
// The definition of the callback function.
type jobFunc struct {
f JobFunc
timeout uint32
}
// Map for added function. // Map for added function.
type JobFuncs map[string]JobFunc type JobFuncs map[string]*jobFunc
/* /*
Worker side api for gearman Worker side api for gearman
usage: usage:
w = worker.New(worker.Unlimited) w = worker.New(worker.Unlimited)
w.AddFunction("foobar", foobar) w.AddFunction("foobar", foobar)
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.Work() // Enter the worker's main loop w.Work() // Enter the worker's main loop
The definition of the callback function 'foobar' should suit for the type 'JobFunction'. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
It looks like this: It looks like this:
@ -39,27 +46,24 @@ func foobar(job *Job) (data []byte, err os.Error) {
} }
*/ */
type Worker struct { type Worker struct {
agents []*jobAgent agents []*agent
funcs JobFuncs funcs JobFuncs
in chan *Job in chan *Job
out chan *Job
running bool running bool
limit chan bool limit chan bool
Id string Id string
// assign a ErrFunc to handle errors // assign a ErrFunc to handle errors
ErrHandler common.ErrorHandler ErrHandler common.ErrorHandler
JobHandler JobHandler
} }
// Get a new worker // Get a new worker
func New(l int) (worker *Worker) { func New(l int) (worker *Worker) {
worker = &Worker{ worker = &Worker{
agents: make([]*jobAgent, 0), agents: make([]*agent, 0),
functions: make(JobFunctionMap), funcs: make(JobFuncs),
in: make(chan *Job, common.QUEUE_SIZE), in: make(chan *Job, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
running: true,
} }
if l != Unlimited { if l != Unlimited {
worker.limit = make(chan bool, l) worker.limit = make(chan bool, l)
@ -67,7 +71,6 @@ func New(l int) (worker *Worker) {
worker.limit <- true worker.limit <- true
} }
} }
go worker.outLoop()
return return
} }
@ -81,37 +84,42 @@ func (worker *Worker)err(e error) {
// Add a server. The addr should be 'host:port' format. // Add a server. The addr should be 'host:port' format.
// The connection is established at this time. // The connection is established at this time.
func (worker *Worker) AddServer(addr string) (err error) { func (worker *Worker) AddServer(addr string) (err error) {
worker.mutex.Lock()
defer worker.mutex.Unlock()
if len(worker.clients) == cap(worker.clients) {
return common.ErrOutOfCap
}
// Create a new job server's client as a agent of server // Create a new job server's client as a agent of server
server, err := newJobAgent(addr, worker) server, err := newAgent(addr, worker)
if err != nil { if err != nil {
return err return err
} }
worker.agents = append(worker.agents, server)
n := len(worker.clients)
worker.clients = worker.clients[0 : n+1]
worker.clients[n] = server
return return
} }
// Write a job to job server.
// Here, the job's mean is not the oraginal mean.
// Just looks like a network package for job's result or tell job server, there was a fail.
func (worker *Worker) broadcast(job *Job) {
for _, v := range worker.agents {
v.WriteJob(job)
}
}
// Add a function. // Add a function.
// Plz added job servers first, then functions. // Plz added job servers first, then functions.
// The API will tell every connected job server that 'I can do this' // The API will tell every connected job server that 'I can do this'
func (worker *Worker) AddFunction(funcname string, func (worker *Worker) AddFunc(funcname string,
f JobFunc, timeout uint32) (err error) { f JobFunc, timeout uint32) (err error) {
if len(worker.clients) < 1 { if _, ok := worker.funcs[funcname]; ok {
return common.ErrNotConn return common.Errorf("The function already exists: %s", funcname)
} }
worker.mutex.Lock() worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
defer worker.mutex.Unlock()
worker.functions[funcname] = f
if worker.running {
worker.addFunc(funcname, timeout)
}
return
}
// inner add function
func (worker *Worker) addFunc(funcname string, timeout uint32) {
var datatype uint32 var datatype uint32
var data []byte var data []byte
if timeout == 0 { if timeout == 0 {
@ -124,42 +132,50 @@ func (worker *Worker) AddFunction(funcname string,
data = append(data, t[:]...) data = append(data, t[:]...)
} }
job := newJob(common.REQ, datatype, data) job := newJob(common.REQ, datatype, data)
worker.WriteJob(job) worker.broadcast(job)
return
} }
// Remove a function. // Remove a function.
// Tell job servers 'I can not do this now' at the same time. // Tell job servers 'I can not do this now' at the same time.
func (worker *Worker) RemoveFunction(funcname string) (err error) { func (worker *Worker) RemoveFunc(funcname string) (err error) {
worker.mutex.Lock() if _, ok := worker.funcs[funcname]; !ok {
defer worker.mutex.Unlock() return common.Errorf("The function does not exist: %s", funcname)
}
if worker.functions[funcname] == nil { delete(worker.funcs, funcname)
return common.ErrFuncNotFound if worker.running {
worker.removeFunc(funcname)
} }
delete(worker.functions, funcname)
job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
worker.WriteJob(job)
return return
} }
// inner remove function
func (worker *Worker) removeFunc(funcname string) {
job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
worker.broadcast(job)
}
// Main loop // Main loop
func (worker *Worker) Work() { func (worker *Worker) Work() {
for _, v := range worker.clients { defer func() {
worker.running = false
}()
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
worker.running = true
for _, v := range worker.agents {
go v.Work() go v.Work()
} }
for worker.running || len(worker.in) > 0{ ok := true
select { for ok {
case job := <-worker.in: if job, ok := <-worker.in; ok {
if job == nil {
break
}
switch job.DataType { switch job.DataType {
case common.NO_JOB:
// do nothing
case common.ERROR: case common.ERROR:
go func() {
_, err := common.GetError(job.Data) _, err := common.GetError(job.Data)
worker.err(err) worker.err(err)
}()
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
go func() { go func() {
if err := worker.exec(job); err != nil { if err := worker.exec(job); err != nil {
@ -167,54 +183,48 @@ func (worker *Worker) Work() {
} }
}() }()
default: default:
worker.JobQueue <- job go worker.handleJob(job)
} }
} }
} }
close(worker.in) }
// job handler
func (worker *Worker) handleJob(job *Job) {
if worker.JobHandler != nil {
if err := worker.JobHandler(job); err != nil {
worker.err(err)
}
}
} }
// Close. // Close.
func (worker *Worker) Close() (err error) { func (worker *Worker) Close() {
for _, v := range worker.clients { close(worker.in)
err = v.Close() if worker.limit != nil {
} close(worker.limit)
worker.running = false
return err
}
// Write a job to job server.
// Here, the job's mean is not the oraginal mean.
// Just looks like a network package for job's result or tell job server, there was a fail.
func (worker *Worker) Broadcast(job *Job) {
for _, v := range worker.agents {
go func() {
if err := v.WriteJob(job); err != nil {
worker.err(err)
}
}()
} }
} }
// Send a something out, get the samething back. // Send a something out, get the samething back.
func (worker *Worker) Echo(data []byte) (err error) { func (worker *Worker) Echo(data []byte) {
job := newJob(common.REQ, common.ECHO_REQ, data) job := newJob(common.REQ, common.ECHO_REQ, data)
return worker.WriteJob(job) worker.broadcast(job)
} }
// Remove all of functions. // Remove all of functions.
// Both from the worker or job servers. // Both from the worker or job servers.
func (worker *Worker) Reset() (err error) { func (worker *Worker) Reset() {
job := newJob(common.REQ, common.RESET_ABILITIES, nil) job := newJob(common.REQ, common.RESET_ABILITIES, nil)
err = worker.WriteJob(job) worker.broadcast(job)
worker.functions = make(JobFunctionMap) worker.funcs = make(JobFuncs)
return
} }
// Set the worker's unique id. // Set the worker's unique id.
func (worker *Worker) SetId(id string) (err error) { func (worker *Worker) SetId(id string) {
worker.Id = id
job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
return worker.WriteJob(job) worker.broadcast(job)
} }
// Execute the job. And send back the result. // Execute the job. And send back the result.
@ -240,11 +250,11 @@ func (worker *Worker) exec(job *Job) (err error) {
job.UniqueId = string(jobdata[2]) job.UniqueId = string(jobdata[2])
job.Data = jobdata[3] job.Data = jobdata[3]
} }
f, ok := worker.functions[funcname] f, ok := worker.funcs[funcname]
if !ok { if !ok {
return common.ErrFuncNotFound return common.Errorf("The function does not exist: %s", funcname)
} }
result, err := f(job) result, err := f.f(job)
var datatype uint32 var datatype uint32
if err == nil { if err == nil {
datatype = common.WORK_COMPLETE datatype = common.WORK_COMPLETE
@ -259,7 +269,17 @@ func (worker *Worker) exec(job *Job) (err error) {
job.magicCode = common.REQ job.magicCode = common.REQ
job.DataType = datatype job.DataType = datatype
job.Data = result job.Data = result
job.agent.WriteJob(job)
worker.WriteJob(job)
return return
} }
func (worker *Worker) removeAgent(a *agent) {
for k, v := range worker.agents {
if v == a {
worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
}
}
if len(worker.agents) == 0 {
worker.Close()
}
}