replease ErrQueue to ErrFunc, add a limitation to executing job

This commit is contained in:
mikespook 2012-05-10 21:25:33 +08:00
parent f7896651a4
commit e4172dec11
4 changed files with 52 additions and 19 deletions

View File

@ -16,7 +16,10 @@ func ToUpper(job *worker.WorkerJob) ([]byte, error) {
} }
func main() { func main() {
w := worker.New() w := worker.New(worker.Unlimit)
w.ErrFunc = func(e error) {
log.Println(e)
}
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.AddFunction("ToUpper", ToUpper, 0) w.AddFunction("ToUpper", ToUpper, 0)
w.AddFunction("ToUpperTimeOut5", ToUpper, 5) w.AddFunction("ToUpperTimeOut5", ToUpper, 5)
@ -44,6 +47,7 @@ func main() {
} }
log.Println(string(job.Data)) log.Println(string(job.Data))
case "quit": case "quit":
os.Exit(0)
return return
case "result": case "result":
job := <-w.JobQueue job := <-w.JobQueue

View File

@ -25,28 +25,28 @@ usage:
type Client struct { type Client struct {
mutex sync.Mutex mutex sync.Mutex
conn net.Conn conn net.Conn
JobQueue chan *ClientJob
incoming chan []byte incoming chan []byte
JobQueue chan *ClientJob
UId uint32 UId uint32
} }
// Create a new client. // Create a new client.
func New() (client *Client) { func New() (client *Client) {
client = &Client{JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP), return &Client{
JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP),
incoming: make(chan []byte, gearman.QUEUE_CAP), incoming: make(chan []byte, gearman.QUEUE_CAP),
UId:1} UId:1}
return
} }
// Add a server. // Add a server.
// In this version, one client connect to one job server. // In this version, one client connect to one job server.
// Sample is better. Plz do the load balancing by your self. // Sample is better. Plz do the load balancing by your self.
func (client *Client) AddServer(addr string) (err error) { func (client *Client) AddServer(addr string) (err error) {
conn, err := net.Dial(gearman.TCP, addr) client.conn, err = net.Dial(gearman.TCP, addr)
if err != nil { if err != nil {
return return
} }
client.conn = conn
return return
} }

View File

@ -82,12 +82,12 @@ func (agent *jobAgent) Work() {
} }
rel, err := agent.read() rel, err := agent.read()
if err != nil { if err != nil {
agent.worker.ErrQueue <- err agent.worker.err(err)
continue continue
} }
job, err := DecodeWorkerJob(rel) job, err := DecodeWorkerJob(rel)
if err != nil { if err != nil {
agent.worker.ErrQueue <- err agent.worker.err(err)
continue continue
} else { } else {
switch job.DataType { switch job.DataType {

View File

@ -10,20 +10,27 @@ import (
"sync" "sync"
) )
const (
Unlimit = 0
OneByOne = 1
)
// The definition of the callback function. // The definition of the callback function.
type JobFunction func(job *WorkerJob) ([]byte, error) type JobFunction func(job *WorkerJob) ([]byte, error)
// Map for added function. // Map for added function.
type JobFunctionMap map[string]JobFunction type JobFunctionMap map[string]JobFunction
// Error Function
type ErrFunc func(e error)
/* /*
Worker side api for gearman. Worker side api for gearman.
usage: usage:
worker = NewWorker() w = worker.New(worker.Unlimit)
worker.AddFunction("foobar", foobar) w.AddFunction("foobar", foobar)
worker.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
worker.Work() // Enter the worker's main loop w.Work() // Enter the worker's main loop
The definition of the callback function 'foobar' should suit for the type 'JobFunction'. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
It looks like this: It looks like this:
@ -37,16 +44,20 @@ func foobar(job *WorkerJob) (data []byte, err os.Error) {
type Worker struct { type Worker struct {
clients []*jobAgent clients []*jobAgent
functions JobFunctionMap functions JobFunctionMap
running bool running bool
incoming chan *WorkerJob incoming chan *WorkerJob
mutex sync.Mutex mutex sync.Mutex
limit chan bool
JobQueue chan *WorkerJob JobQueue chan *WorkerJob
ErrQueue chan error
// assign a ErrFunc to handle errors
// Must assign befor AddServer
ErrFunc ErrFunc
} }
// Get a new worker // Get a new worker
func New() (worker *Worker) { func New(l int) (worker *Worker) {
worker = &Worker{ worker = &Worker{
// job server list // job server list
clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP), clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP),
@ -54,12 +65,24 @@ func New() (worker *Worker) {
functions: make(JobFunctionMap), functions: make(JobFunctionMap),
incoming: make(chan *WorkerJob, gearman.QUEUE_CAP), incoming: make(chan *WorkerJob, gearman.QUEUE_CAP),
JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP), JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP),
ErrQueue: make(chan error, gearman.QUEUE_CAP),
running: true, running: true,
} }
if l != Unlimit {
worker.limit = make(chan bool, l)
for i := 0; i < l; i ++ {
worker.limit <- true
}
}
return return
} }
//
func (worker *Worker)err(e error) {
if worker.ErrFunc != nil {
worker.ErrFunc(e)
}
}
// Add a server. The addr should be 'host:port' format. // Add a server. The addr should be 'host:port' format.
// The connection is established at this time. // The connection is established at this time.
func (worker *Worker) AddServer(addr string) (err error) { func (worker *Worker) AddServer(addr string) (err error) {
@ -141,11 +164,11 @@ func (worker *Worker) Work() {
// do nothing // do nothing
case gearman.ERROR: case gearman.ERROR:
_, err := gearman.GetError(job.Data) _, err := gearman.GetError(job.Data)
worker.ErrQueue <- err worker.err(err)
case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ: case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ:
go func() { go func() {
if err := worker.exec(job); err != nil { if err := worker.exec(job); err != nil {
worker.ErrQueue <- err worker.err(err)
} }
}() }()
default: default:
@ -217,6 +240,12 @@ func (worker *Worker) SetId(id string) (err error) {
// Execute the job. And send back the result. // Execute the job. And send back the result.
func (worker *Worker) exec(job *WorkerJob) (err error) { func (worker *Worker) exec(job *WorkerJob) (err error) {
if worker.limit != nil {
<- worker.limit
defer func() {
worker.limit <- true
}()
}
var limit int var limit int
if job.DataType == gearman.JOB_ASSIGN { if job.DataType == gearman.JOB_ASSIGN {
limit = 3 limit = 3