From 7614c2678a8fcde805257eece6a9b12729efc68d Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 22 May 2012 20:05:39 +0800 Subject: [PATCH 1/5] make a new branch, refactoring codes --HG-- branch : 0.1 rename : gearman/client/client.go => client/client.go rename : gearman/client/client_test.go => client/client_test.go rename : gearman/client/clientjob.go => client/job.go rename : gearman/gearman.go => common/gearman.go rename : gearman/worker/jobagent.go => worker/jobagent.go rename : gearman/worker/worker.go => worker/worker.go rename : gearman/worker/worker_test.go => worker/worker_test.go rename : gearman/worker/workerjob.go => worker/workerjob.go --- client/client.go | 263 ++++++++++++++++++++ client/client_test.go | 43 ++++ client/job.go | 105 ++++++++ common/error.go | 47 ++++ {gearman => common}/gearman.go | 59 +---- example/client.go | 48 ++-- gearman.go | 16 +- gearman/client/client.go | 278 ---------------------- gearman/client/client_test.go | 41 ---- gearman/client/clientjob.go | 98 -------- {gearman/worker => worker}/jobagent.go | 2 +- {gearman/worker => worker}/worker.go | 2 +- {gearman/worker => worker}/worker_test.go | 0 {gearman/worker => worker}/workerjob.go | 2 +- 14 files changed, 503 insertions(+), 501 deletions(-) create mode 100644 client/client.go create mode 100644 client/client_test.go create mode 100644 client/job.go create mode 100644 common/error.go rename {gearman => common}/gearman.go (50%) delete mode 100644 gearman/client/client.go delete mode 100644 gearman/client/client_test.go delete mode 100644 gearman/client/clientjob.go rename {gearman/worker => worker}/jobagent.go (98%) rename {gearman/worker => worker}/worker.go (99%) rename {gearman/worker => worker}/worker_test.go (100%) rename {gearman/worker => worker}/workerjob.go (98%) diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..e19f6b2 --- /dev/null +++ b/client/client.go @@ -0,0 +1,263 @@ +// Copyright 2011 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bytes" + "io" + "net" + "strconv" + "bitbucket.org/mikespook/golib/autoinc" + "bitbucket.org/mikespook/gearman-go/common" +) + +// Job handler +type JobHandler func(*Job) error +// Status handler +// handle, known, running, numerator, denominator +type StatusHandler func(string, bool, bool, uint64, uint64) + +/* +The client side api for gearman + +usage: + c := client.New("tcp4", "127.0.0.1:4730") + handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) + +*/ +type Client struct { + ErrHandler common.ErrorHandler + JobHandler JobHandler + StatusHandler StatusHandler + + in chan []byte + out chan *Job + jobCreated chan *Job + conn net.Conn + ai *autoinc.AutoInc +} + +// Create a new client. +// Connect to "addr" through "network" +// Eg. +// client, err := client.New("tcp4", "127.0.0.1:4730") +func New(network, addr string) (client *Client, err error) { + conn, err := net.Dial(network, addr) + if err != nil { + return + } + client = &Client{ + jobCreated: make(chan *Job), + in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), + conn: conn, + ai: autoinc.New(0, 1), + } + go client.outLoop() + go client.inLoop() + return +} + +// out loop +func (client *Client) outLoop() { + ok := true + for ok { + if job, ok := <-client.out; ok { + client.write(job.Encode()) + } + } +} + +// in loop +func (client *Client) inLoop() { + for { + rel, err := client.read() + if err != nil { + client.err(err) + continue + } + job, err := decodeJob(rel) + if err != nil { + client.err(err) + continue + } + switch job.DataType { + case common.ERROR: + _, err := common.GetError(job.Data) + client.err(err) + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, + common.ECHO_RES: + client.handleJob(job) + case common.JOB_CREATED: + client.jobCreated <- job + case common.STATUS_RES: + client.handleStatus(job) + } + } +} + +// inner read +func (client *Client) read() (data []byte, err error) { + if len(client.in) > 0 { + // incoming queue is not empty + data = <-client.in + } else { + // empty queue, read data from socket + for { + buf := make([]byte, common.BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + break + } + return + } + data = append(data, buf[0:n]...) + if n < common.BUFFER_SIZE { + break + } + } + } + // split package + start, end := 0, 4 + tl := len(data) + for i := 0; i < tl; i++ { + if string(data[start:end]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + client.in <- data[total:] + data = data[:total] + return + } + } else { + start++ + end++ + } + } + return nil, common.Errorf("Invalid data: %V", data) +} + +// error handler +func (client *Client) err (e error) { + if client.ErrHandler != nil { + client.ErrHandler(e) + } +} + +// job handler +func (client *Client) handleJob(job *Job) { + if client.JobHandler != nil { + if err := client.JobHandler(job); err != nil { + client.err(err) + } + } +} + +// status handler +func (client *Client) handleStatus(job *Job) { + if client.StatusHandler != nil { + data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) + if len(data) != 5 { + client.err(common.Errorf("Invalid data: %V", job.Data)) + return + } + handle := string(data[0]) + known := (data[1][0] == '1') + running := (data[2][0] == '1') + numerator, err := strconv.ParseUint(string(data[3][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[3][0])) + return + } + denominator, err := strconv.ParseUint(string(data[4][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[4][0])) + return + } + client.StatusHandler(handle, known, running, numerator, denominator) + } +} + +// Do the function. +// funcname is a string with function name. +// data is encoding to byte array. +// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, +// and if it is background job: JOB_BG. +// JOB_LOW | JOB_BG means the job is running with low level in background. +func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { + var datatype uint32 + if flag & common.JOB_LOW == common.JOB_LOW { + if flag & common.JOB_BG == common.JOB_BG { + datatype = common.SUBMIT_JOB_LOW_BG + } else { + datatype = common.SUBMIT_JOB_LOW + } + } else if flag & common.JOB_HIGH == common.JOB_HIGH { + if flag & common.JOB_BG == common.JOB_BG { + datatype = common.SUBMIT_JOB_HIGH_BG + } else { + datatype = common.SUBMIT_JOB_HIGH + } + } else if flag & common.JOB_BG == common.JOB_BG { + datatype = common.SUBMIT_JOB_BG + } else { + datatype = common.SUBMIT_JOB + } + + uid := strconv.Itoa(int(client.ai.Id())) + l := len(funcname) + len(uid) + len(data) + 2 + rel := make([]byte, l) + rel = append(rel, []byte(funcname)...) // len(funcname) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, []byte(uid)...) // len(uid) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, data...) // len(data) + client.writeJob(newJob(common.REQ, datatype, rel)) + + // Waiting for JOB_CREATED + job := <-client.jobCreated + return string(job.Data), nil +} + +// Get job status from job server. +// !!!Not fully tested.!!! +func (client *Client) Status(handle string) { + job := newJob(common.REQ, common.GET_STATUS, []byte(handle)) + client.writeJob(job) +} + +// Send a something out, get the samething back. +func (client *Client) Echo(data []byte) { + client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) +} + +// Send the job to job server. +func (client *Client) writeJob(job *Job) { + client.out <- job +} + +// Internal write +func (client *Client) write(buf []byte) (err error) { + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return +} + +// Close +func (client *Client) Close() (err error) { + close(client.jobCreated) + close(client.in) + return client.conn.Close(); +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..a85832f --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,43 @@ +package client + +import ( + "bitbucket.org/mikespook/gearman-go/common" + "testing" +) + +var client *Client + +func TestClientAddServer(t *testing.T) { + t.Log("Add local server 127.0.0.1:4730") + var err error + if client, err = New("tcp4", "127.0.0.1:4730"); err != nil { + t.Error(err) + } +} + +func TestClientEcho(t *testing.T) { + client.JobHandler = func(job *Job) error { + echo := string(job.Data) + if echo == "Hello world" { + t.Log(echo) + } else { + t.Errorf("Invalid echo data: %s", job.Data) + } + return nil + } + client.Echo([]byte("Hello world")) +} + +func TestClientDo(t *testing.T) { + if handle, err := client.Do("ToUpper", []byte("abcdef"), common.JOB_LOW|common.JOB_BG); err != nil { + t.Error(err) + } else { + t.Log(handle) + } +} + +func TestClientClose(t *testing.T) { + if err := client.Close(); err != nil { + t.Error(err) + } +} diff --git a/client/job.go b/client/job.go new file mode 100644 index 0000000..c381464 --- /dev/null +++ b/client/job.go @@ -0,0 +1,105 @@ +// Copyright 2011 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bytes" + "bitbucket.org/mikespook/gearman-go/common" +) +// An error handler +type ErrorHandler func(error) + +// Client side job +type Job struct { + Data []byte + Handle, UniqueId string + magicCode, DataType uint32 +} + +// Create a new job +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, + DataType: datatype, + Data: data} +} + +// Decode a job from byte slice +func decodeJob(data []byte) (job *Job, err error) { + if len(data) < 12 { + return nil, common.Errorf("Invalid data: %V", data) + } + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + return nil, common.Errorf("Invalid data: %V", data) + } + data = data[12:] + return newJob(common.RES, datatype, data), nil +} + +// Encode a job to byte slice +func (job *Job) Encode() (data []byte) { + l := len(job.Data) + 12 + data = make([]byte, l) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(l)) + for i := 0; i < l; i ++ { + switch { + case i < 4: + data[i] = magiccode[i] + case i < 8: + data[i] = datatype[i - 4] + case i < 12: + data[i] = datalength[i - 8] + default: + data[i] = job.Data[i - 12] + } + } + return +} + +// Extract the job's result. +func (job *Job) Result() (data []byte, err error) { + switch job.DataType { + case common.WORK_FAIL: + job.Handle = string(job.Data) + return nil, common.ErrWorkFail + case common.WORK_EXCEPTION: + err = common.ErrWorkException + fallthrough + case common.WORK_COMPLETE: + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + return nil, common.Errorf("Invalid data: %V", job.Data) + } + job.Handle = string(s[0]) + data = s[1] + default: + err = common.ErrDataType + } + return +} + +// Extract the job's update +func (job *Job) Update() (data []byte, err error) { + if job.DataType != common.WORK_DATA && job.DataType != common.WORK_WARNING { + err = common.ErrDataType + return + } + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = common.ErrInvalidData + return + } + if job.DataType == common.WORK_WARNING { + err = common.ErrWorkWarning + } + job.Handle = string(s[0]) + data = s[1] + return +} diff --git a/common/error.go b/common/error.go new file mode 100644 index 0000000..25ca67e --- /dev/null +++ b/common/error.go @@ -0,0 +1,47 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package common + +import ( + "fmt" + "bytes" + "errors" + "syscall" +) + +var ( + ErrInvalidData = errors.New("Invalid data.") + ErrWorkWarning = errors.New("Work warning.") + ErrWorkFail = errors.New("Work fail.") + ErrWorkException = errors.New("Work exeption.") + ErrDataType = errors.New("Invalid data type.") + ErrOutOfCap = errors.New("Out of the capability.") + ErrNotConn = errors.New("Did not connect to job server.") + ErrFuncNotFound = errors.New("The function was not found.") +) + +// Extract the error message +func GetError(data []byte) (eno syscall.Errno, err error) { + rel := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(rel) != 2 { + err = Errorf("Not a error data: %V", data) + return + } + l := len(rel[0]) + eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) + err = errors.New(string(rel[1])) + return +} + +// Get a formated error +func Errorf(format string, msg ... interface{}) error { + return errors.New(fmt.Sprintf(format, msg ... )) +} + +// An error handler +type ErrorHandler func(error) + + diff --git a/gearman/gearman.go b/common/gearman.go similarity index 50% rename from gearman/gearman.go rename to common/gearman.go index 636c5ff..1bdd6f7 100644 --- a/gearman/gearman.go +++ b/common/gearman.go @@ -1,30 +1,17 @@ -// Copyright 2011 Xing Xing All rights reserved. +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. -/* -This module is Gearman API for golang. -The protocol was implemented by native way. -*/ - -package gearman - -import ( - "bytes" - "errors" - "syscall" -) +package common const ( - // tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6, - // or 'tcp6' only for ipv6. - TCP = "tcp4" // the number limited for job servers. WORKER_SERVER_CAP = 32 // the number limited for functions. WORKER_FUNCTION_CAP = 512 // queue size - QUEUE_CAP = 512 + QUEUE_SIZE = 512 // read buffer size BUFFER_SIZE = 1024 @@ -80,44 +67,14 @@ const ( JOB_HIGH = 4 ) -var ( - ErrIsNotErr = errors.New("The input is not a error data.") - ErrInvalidData = errors.New("Invalid data.") - ErrWorkWarning = errors.New("Work warning.") - ErrWorkFail = errors.New("Work fail.") - ErrWorkException = errors.New("Work exeption.") - ErrDataType = errors.New("Invalid data type.") - ErrOutOfCap = errors.New("Out of the capability.") - ErrNotConn = errors.New("Did not connect to job server.") - ErrFuncNotFound = errors.New("The function was not found.") -) - -// Extract the error message -func GetError(data []byte) (eno syscall.Errno, err error) { - rel := bytes.SplitN(data, []byte{'\x00'}, 2) - if len(rel) != 2 { - err = ErrIsNotErr - return - } - l := len(rel[0]) - eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) - err = errors.New(string(rel[1])) - return -} - // Decode [4]byte to uint32 func BytesToUint32(buf [4]byte) uint32 { - return uint32(buf[0])<<24 + - uint32(buf[1])<<16 + - uint32(buf[2])<<8 + + return uint32(buf[0])<<24 + uint32(buf[1])<<16 + uint32(buf[2])<<8 + uint32(buf[3]) } // Encode uint32 to [4]byte -func Uint32ToBytes(i uint32) (data [4]byte) { - data[0] = byte((i >> 24) & 0xff) - data[1] = byte((i >> 16) & 0xff) - data[2] = byte((i >> 8) & 0xff) - data[3] = byte(i & 0xff) - return +func Uint32ToBytes(i uint32) [4]byte { + return [4]byte{byte((i >> 24) & 0xff), byte((i >> 16) & 0xff), + byte((i >> 8) & 0xff), byte(i & 0xff),} } diff --git a/example/client.go b/example/client.go index ec8bd94..d406eef 100644 --- a/example/client.go +++ b/example/client.go @@ -1,46 +1,42 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bitbucket.org/mikespook/gearman-go/gearman/client" "log" + "sync" + "bitbucket.org/mikespook/gearman-go" + "bitbucket.org/mikespook/gearman-go/client" ) func main() { - client := client.New() - defer client.Close() - if err := client.AddServer("127.0.0.1:4730"); err != nil { + var wg sync.WaitGroup + + c, err := client.New("tcp4", "127.0.0.1:4730") + if err != nil { log.Fatalln(err) } + defer c.Close() echo := []byte("Hello\x00 world") - - if data, err := client.Echo(echo); err != nil { - log.Fatalln(string(data)) + c.JobHandler = func(job *client.Job) error { + log.Printf("%V", job) + wg.Done() + return nil } - handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL) + wg.Add(1) + c.Echo(echo) + + handle, err := c.Do("ToUpper", echo, gearman.JOB_NORMAL) if err != nil { log.Fatalln(err) } else { log.Println(handle) - job := <-client.JobQueue - if data, err := job.Result(); err != nil { - log.Fatalln(err) - } else { - log.Println(string(data)) - } } - known, running, numerator, denominator, err := client.Status(handle) - if err != nil { - log.Fatalln(err) - } - if !known { - log.Println("Unknown") - } - if running { - log.Printf("%g%%\n", float32(numerator)*100/float32(denominator)) - } else { - log.Println("Not running") + c.StatusHandler = func(handle string, known, running bool, numerator, denominator uint64) { + log.Printf("%s: %b, %b, %d, %d", handle, known, running, numerator, denominator) + wg.Done() } + wg.Add(1) + c.Status(handle) + wg.Wait() } diff --git a/gearman.go b/gearman.go index 067831f..5c17914 100644 --- a/gearman.go +++ b/gearman.go @@ -1,4 +1,4 @@ -// Copyright 2012 Xing Xing All rights reserved. +// Copyright 2011 Xing Xing All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. @@ -9,7 +9,15 @@ The protocol was implemented by native way. package gearman -import ( - _ "bitbucket.org/mikespook/gearman-go/gearman/client" - _ "bitbucket.org/mikespook/gearman-go/gearman/worker" +const ( + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 ) diff --git a/gearman/client/client.go b/gearman/client/client.go deleted file mode 100644 index 156bb8f..0000000 --- a/gearman/client/client.go +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bytes" - "io" - "net" - "strconv" - "sync" -) - -/* -The client side api for gearman. - -usage: - client = NewClient() - client.AddServer("127.0.0.1:4730") - handle := client.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) - -*/ -type Client struct { - mutex sync.Mutex - conn net.Conn - incoming chan []byte - - JobQueue chan *ClientJob - UId uint32 -} - -// Create a new client. -func New() (client *Client) { - return &Client{ - JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP), - incoming: make(chan []byte, gearman.QUEUE_CAP), - UId:1} -} - -// Add a server. -// In this version, one client connect to one job server. -// Sample is better. Plz do the load balancing by your self. -func (client *Client) AddServer(addr string) (err error) { - client.conn, err = net.Dial(gearman.TCP, addr) - if err != nil { - return - } - return -} - -// Internal read -func (client *Client) read() (data []byte, err error) { - if len(client.incoming) > 0 { - // incoming queue is not empty - data = <-client.incoming - } else { - // empty queue, read data from socket - for { - buf := make([]byte, gearman.BUFFER_SIZE) - var n int - if n, err = client.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - break - } - return - } - data = append(data, buf[0:n]...) - if n < gearman.BUFFER_SIZE { - break - } - } - } - // split package - start, end := 0, 4 - tl := len(data) - for i := 0; i < tl; i++ { - if string(data[start:end]) == gearman.RES_STR { - l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - client.incoming <- data[total:] - data = data[:total] - return - } - } else { - start++ - end++ - } - } - err = gearman.ErrInvalidData - return -} - -// Read a job from job server. -// This function will return the job, and add it to the job queue. -func (client *Client) ReadJob() (job *ClientJob, err error) { - var rel []byte - if rel, err = client.read(); err != nil { - return - } - if job, err = DecodeClientJob(rel); err != nil { - return - } else { - switch job.DataType { - case gearman.ERROR: - _, err = gearman.GetError(job.Data) - return - case gearman.WORK_DATA, gearman.WORK_WARNING, gearman.WORK_STATUS, gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: - client.JobQueue <- job - } - } - return -} - -// Do the function. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, -// and if it is background job: JOB_BG. -// JOB_LOW | JOB_BG means the job is running with low level in background. -func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { - var datatype uint32 - if flag&gearman.JOB_LOW == gearman.JOB_LOW { - if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_LOW_BG - } else { - datatype = gearman.SUBMIT_JOB_LOW - } - } else if flag&gearman.JOB_HIGH == gearman.JOB_HIGH { - if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_HIGH_BG - } else { - datatype = gearman.SUBMIT_JOB_HIGH - } - } else if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_BG - } else { - datatype = gearman.SUBMIT_JOB - } - - rel := make([]byte, 0, 1024*64) - rel = append(rel, []byte(funcname)...) - rel = append(rel, '\x00') - client.mutex.Lock() - uid := strconv.Itoa(int(client.UId)) - client.UId++ - rel = append(rel, []byte(uid)...) - client.mutex.Unlock() - rel = append(rel, '\x00') - rel = append(rel, data...) - if err = client.WriteJob(NewClientJob(gearman.REQ, datatype, rel)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.JOB_CREATED); err != nil { - return - } - handle = string(job.Data) - go func() { - if flag&gearman.JOB_BG != gearman.JOB_BG { - for { - if job, err = client.ReadJob(); err != nil { - return - } - switch job.DataType { - case gearman.WORK_DATA, gearman.WORK_WARNING: - case gearman.WORK_STATUS: - case gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: - return - } - } - } - }() - return -} - -// Internal read last job -func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err error) { - for { - if job, err = client.ReadJob(); err != nil { - return - } - if job.DataType == datatype { - break - } - } - if job.DataType != datatype { - err = gearman.ErrDataType - } - return -} - -// Get job status from job server. -// !!!Not fully tested.!!! -func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint64, err error) { - - if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.GET_STATUS, []byte(handle))); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.STATUS_RES); err != nil { - return - } - data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) - if len(data) != 5 { - err = gearman.ErrInvalidData - return - } - if handle != string(data[0]) { - err = gearman.ErrInvalidData - return - } - known = data[1][0] == '1' - running = data[2][0] == '1' - if numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0); err != nil { - return - } - if denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0); err != nil { - return - } - return -} - -// Send a something out, get the samething back. -func (client *Client) Echo(data []byte) (echo []byte, err error) { - if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.ECHO_REQ, data)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.ECHO_RES); err != nil { - return - } - echo = job.Data - return -} - -// Get the last job. -// the job means a network package. -// Normally, it is the job executed result. -func (client *Client) LastJob() (job *ClientJob) { - if l := len(client.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-client.JobQueue - } - } - return <-client.JobQueue -} - -// Send the job to job server. -func (client *Client) WriteJob(job *ClientJob) (err error) { - return client.write(job.Encode()) -} - -// Internal write -func (client *Client) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) - if err != nil { - return - } - } - return -} - -// Close. -func (client *Client) Close() (err error) { - err = client.conn.Close() - close(client.JobQueue) - return -} diff --git a/gearman/client/client_test.go b/gearman/client/client_test.go deleted file mode 100644 index f2ae43f..0000000 --- a/gearman/client/client_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "testing" -) - -var client *Client - -func init() { - client = NewClient() -} - -func TestClientAddServer(t *testing.T) { - t.Log("Add local server 127.0.0.1:4730") - if err := client.AddServer("127.0.0.1:4730"); err != nil { - t.Error(err) - } -} - -func TestClientEcho(t *testing.T) { - if echo, err := client.Echo([]byte("Hello world")); err != nil { - t.Error(err) - } else { - t.Log(echo) - } -} - -func TestClientDo(t *testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), gearman.JOB_LOW|gearman.JOB_BG); err != nil { - t.Error(err) - } else { - t.Log(handle) - } -} - -func TestClientClose(t *testing.T) { - if err := client.Close(); err != nil { - t.Error(err) - } -} diff --git a/gearman/client/clientjob.go b/gearman/client/clientjob.go deleted file mode 100644 index 184b93a..0000000 --- a/gearman/client/clientjob.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bytes" -) - -// Client side job -type ClientJob struct { - Data []byte - Handle, UniqueId string - magicCode, DataType uint32 -} - -// Create a new job -func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { - return &ClientJob{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode a job from byte slice -func DecodeClientJob(data []byte) (job *ClientJob, err error) { - if len(data) < 12 { - err = gearman.ErrInvalidData - return - } - datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - err = gearman.ErrInvalidData - return - } - data = data[12:] - job = NewClientJob(gearman.RES, datatype, data) - return -} - -// Encode a job to byte slice -func (job *ClientJob) Encode() (data []byte) { - magiccode := gearman.Uint32ToBytes(job.magicCode) - datatype := gearman.Uint32ToBytes(job.DataType) - data = make([]byte, 0, 1024*64) - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - l := len(job.Data) - datalength := gearman.Uint32ToBytes(uint32(l)) - data = append(data, datalength[:]...) - data = append(data, job.Data...) - return -} - -// Extract the job's result. -func (job *ClientJob) Result() (data []byte, err error) { - switch job.DataType { - case gearman.WORK_FAIL: - job.Handle = string(job.Data) - err = gearman.ErrWorkFail - return - case gearman.WORK_EXCEPTION: - err = gearman.ErrWorkException - fallthrough - case gearman.WORK_COMPLETE: - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = gearman.ErrInvalidData - return - } - job.Handle = string(s[0]) - data = s[1] - default: - err = gearman.ErrDataType - } - return -} - -// Extract the job's update -func (job *ClientJob) Update() (data []byte, err error) { - if job.DataType != gearman.WORK_DATA && job.DataType != gearman.WORK_WARNING { - err = gearman.ErrDataType - return - } - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = gearman.ErrInvalidData - return - } - if job.DataType == gearman.WORK_WARNING { - err = gearman.ErrWorkWarning - } - job.Handle = string(s[0]) - data = s[1] - return -} diff --git a/gearman/worker/jobagent.go b/worker/jobagent.go similarity index 98% rename from gearman/worker/jobagent.go rename to worker/jobagent.go index 0aee90c..5456e94 100644 --- a/gearman/worker/jobagent.go +++ b/worker/jobagent.go @@ -5,7 +5,7 @@ package worker import ( - "bitbucket.org/mikespook/gearman-go/gearman" + gearman "bitbucket.org/mikespook/gearman-go" "io" "net" ) diff --git a/gearman/worker/worker.go b/worker/worker.go similarity index 99% rename from gearman/worker/worker.go rename to worker/worker.go index ca4589f..9a61937 100644 --- a/gearman/worker/worker.go +++ b/worker/worker.go @@ -5,7 +5,7 @@ package worker import ( - "bitbucket.org/mikespook/gearman-go/gearman" + gearman "bitbucket.org/mikespook/gearman-go" "bytes" "sync" ) diff --git a/gearman/worker/worker_test.go b/worker/worker_test.go similarity index 100% rename from gearman/worker/worker_test.go rename to worker/worker_test.go diff --git a/gearman/worker/workerjob.go b/worker/workerjob.go similarity index 98% rename from gearman/worker/workerjob.go rename to worker/workerjob.go index f4ab029..1f60e12 100644 --- a/gearman/worker/workerjob.go +++ b/worker/workerjob.go @@ -5,7 +5,7 @@ package worker import ( - "bitbucket.org/mikespook/gearman-go/gearman" + gearman "bitbucket.org/mikespook/gearman-go" "strconv" ) From 563af037cb2dd4ccbda0682dbc04e0120d55ddcc Mon Sep 17 00:00:00 2001 From: mikespook Date: Wed, 23 May 2012 15:22:29 +0800 Subject: [PATCH 2/5] fixed client:Job.Encode --HG-- branch : 0.1 --- client/client.go | 44 ++++++++++++++++++++++++------------------- client/client_test.go | 8 +++++--- client/job.go | 29 +++++++++++++++++++++++++--- common/error.go | 1 + common/gearman.go | 11 ----------- example/client.go | 14 +++++++++----- 6 files changed, 66 insertions(+), 41 deletions(-) diff --git a/client/client.go b/client/client.go index e19f6b2..0f8d00a 100644 --- a/client/client.go +++ b/client/client.go @@ -6,9 +6,9 @@ package client import ( - "bytes" "io" "net" + "bytes" "strconv" "bitbucket.org/mikespook/golib/autoinc" "bitbucket.org/mikespook/gearman-go/common" @@ -24,8 +24,8 @@ type StatusHandler func(string, bool, bool, uint64, uint64) The client side api for gearman usage: - c := client.New("tcp4", "127.0.0.1:4730") - handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) +c := client.New("tcp4", "127.0.0.1:4730") +handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { @@ -43,9 +43,9 @@ type Client struct { // Create a new client. // Connect to "addr" through "network" // Eg. -// client, err := client.New("tcp4", "127.0.0.1:4730") -func New(network, addr string) (client *Client, err error) { - conn, err := net.Dial(network, addr) +// client, err := client.New("127.0.0.1:4730") +func New(addr string) (client *Client, err error) { + conn, err := net.Dial("tcp", addr) if err != nil { return } @@ -56,8 +56,8 @@ func New(network, addr string) (client *Client, err error) { conn: conn, ai: autoinc.New(0, 1), } - go client.outLoop() go client.inLoop() + go client.outLoop() return } @@ -66,7 +66,9 @@ func (client *Client) outLoop() { ok := true for ok { if job, ok := <-client.out; ok { - client.write(job.Encode()) + if err := client.write(job.Encode()); err != nil { + client.err(err) + } } } } @@ -88,14 +90,14 @@ func (client *Client) inLoop() { case common.ERROR: _, err := common.GetError(job.Data) client.err(err) - case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, common.ECHO_RES: - client.handleJob(job) + go client.handleJob(job) case common.JOB_CREATED: client.jobCreated <- job case common.STATUS_RES: - client.handleStatus(job) + go client.handleStatus(job) } } } @@ -112,6 +114,10 @@ func (client *Client) read() (data []byte, err error) { var n int if n, err = client.conn.Read(buf); err != nil { if err == io.EOF && n == 0 { + if data == nil { + err = common.ErrEmptyReading + return + } break } return @@ -123,8 +129,8 @@ func (client *Client) read() (data []byte, err error) { } } // split package - start, end := 0, 4 tl := len(data) + start, end := 0, 4 for i := 0; i < tl; i++ { if string(data[start:end]) == common.RES_STR { l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) @@ -193,19 +199,19 @@ func (client *Client) handleStatus(job *Job) { // JOB_LOW | JOB_BG means the job is running with low level in background. func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { var datatype uint32 - if flag & common.JOB_LOW == common.JOB_LOW { - if flag & common.JOB_BG == common.JOB_BG { + if flag & JOB_LOW == JOB_LOW { + if flag & JOB_BG == JOB_BG { datatype = common.SUBMIT_JOB_LOW_BG } else { datatype = common.SUBMIT_JOB_LOW } - } else if flag & common.JOB_HIGH == common.JOB_HIGH { - if flag & common.JOB_BG == common.JOB_BG { + } else if flag & JOB_HIGH == JOB_HIGH { + if flag & JOB_BG == JOB_BG { datatype = common.SUBMIT_JOB_HIGH_BG } else { datatype = common.SUBMIT_JOB_HIGH } - } else if flag & common.JOB_BG == common.JOB_BG { + } else if flag & JOB_BG == JOB_BG { datatype = common.SUBMIT_JOB_BG } else { datatype = common.SUBMIT_JOB @@ -213,14 +219,13 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string uid := strconv.Itoa(int(client.ai.Id())) l := len(funcname) + len(uid) + len(data) + 2 - rel := make([]byte, l) + rel := make([]byte, 0, l) rel = append(rel, []byte(funcname)...) // len(funcname) rel = append(rel, '\x00') // 1 Byte rel = append(rel, []byte(uid)...) // len(uid) rel = append(rel, '\x00') // 1 Byte rel = append(rel, data...) // len(data) client.writeJob(newJob(common.REQ, datatype, rel)) - // Waiting for JOB_CREATED job := <-client.jobCreated return string(job.Data), nil @@ -259,5 +264,6 @@ func (client *Client) write(buf []byte) (err error) { func (client *Client) Close() (err error) { close(client.jobCreated) close(client.in) + close(client.out) return client.conn.Close(); } diff --git a/client/client_test.go b/client/client_test.go index a85832f..3853642 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,7 +1,6 @@ package client import ( - "bitbucket.org/mikespook/gearman-go/common" "testing" ) @@ -10,9 +9,12 @@ var client *Client func TestClientAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730") var err error - if client, err = New("tcp4", "127.0.0.1:4730"); err != nil { + if client, err = New("127.0.0.1:4730"); err != nil { t.Error(err) } + client.ErrHandler = func(e error) { + t.Error(e) + } } func TestClientEcho(t *testing.T) { @@ -29,7 +31,7 @@ func TestClientEcho(t *testing.T) { } func TestClientDo(t *testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), common.JOB_LOW|common.JOB_BG); err != nil { + if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { t.Error(err) } else { t.Log(handle) diff --git a/client/job.go b/client/job.go index c381464..6b7246a 100644 --- a/client/job.go +++ b/client/job.go @@ -9,6 +9,20 @@ import ( "bytes" "bitbucket.org/mikespook/gearman-go/common" ) + +const ( + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 +) + // An error handler type ErrorHandler func(error) @@ -42,13 +56,15 @@ func decodeJob(data []byte) (job *Job, err error) { // Encode a job to byte slice func (job *Job) Encode() (data []byte) { - l := len(job.Data) + 12 - data = make([]byte, l) + l := len(job.Data) + tl := l + 12 + data = make([]byte, tl) magiccode := common.Uint32ToBytes(job.magicCode) datatype := common.Uint32ToBytes(job.DataType) datalength := common.Uint32ToBytes(uint32(l)) - for i := 0; i < l; i ++ { + + for i := 0; i < tl; i ++ { switch { case i < 4: data[i] = magiccode[i] @@ -60,6 +76,13 @@ func (job *Job) Encode() (data []byte) { data[i] = job.Data[i - 12] } } + // Alternative + /* + data = append(data, magiccode[:] ...) + data = append(data, datatype[:] ...) + data = append(data, datalength[:] ...) + data = append(data, job.Data ...) + */ return } diff --git a/common/error.go b/common/error.go index 25ca67e..8c1e8fa 100644 --- a/common/error.go +++ b/common/error.go @@ -21,6 +21,7 @@ var ( ErrOutOfCap = errors.New("Out of the capability.") ErrNotConn = errors.New("Did not connect to job server.") ErrFuncNotFound = errors.New("The function was not found.") + ErrEmptyReading = errors.New("Empty reading.") ) // Extract the error message diff --git a/common/gearman.go b/common/gearman.go index 1bdd6f7..eebaec1 100644 --- a/common/gearman.go +++ b/common/gearman.go @@ -54,17 +54,6 @@ const ( SUBMIT_JOB_HIGH_BG = 32 SUBMIT_JOB_LOW = 33 SUBMIT_JOB_LOW_BG = 34 - - // Job type - // JOB_NORMAL | JOB_BG means a normal level job run in background - // normal level - JOB_NORMAL = 0 - // background job - JOB_BG = 1 - // low level - JOB_LOW = 2 - // high level - JOB_HIGH = 4 ) // Decode [4]byte to uint32 diff --git a/example/client.go b/example/client.go index d406eef..d1ad7a2 100644 --- a/example/client.go +++ b/example/client.go @@ -3,29 +3,32 @@ package main import ( "log" "sync" - "bitbucket.org/mikespook/gearman-go" "bitbucket.org/mikespook/gearman-go/client" ) func main() { var wg sync.WaitGroup - c, err := client.New("tcp4", "127.0.0.1:4730") + c, err := client.New("127.0.0.1:4730") if err != nil { log.Fatalln(err) } defer c.Close() echo := []byte("Hello\x00 world") c.JobHandler = func(job *client.Job) error { - log.Printf("%V", job) + log.Printf("%s", job.Data) wg.Done() return nil } + c.ErrHandler = func(e error) { + log.Println(e) + panic(e) + } wg.Add(1) c.Echo(echo) - - handle, err := c.Do("ToUpper", echo, gearman.JOB_NORMAL) + wg.Add(1) + handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL) if err != nil { log.Fatalln(err) } else { @@ -38,5 +41,6 @@ func main() { } wg.Add(1) c.Status(handle) + wg.Wait() } From 2960cb99539141d2c79b9f96fdb5fcfe1f7eac48 Mon Sep 17 00:00:00 2001 From: mikespook Date: Wed, 23 May 2012 17:45:52 +0800 Subject: [PATCH 3/5] refacotring worker's code --HG-- branch : 0.1 rename : worker/jobagent.go => worker/agent.go rename : worker/workerjob.go => worker/job.go --- client/client.go | 2 +- common/gearman.go | 5 +- example/worker.go | 48 ++--------- worker/{jobagent.go => agent.go} | 54 +++++++------ worker/{workerjob.go => job.go} | 54 +++++++------ worker/worker.go | 131 +++++++++++++------------------ 6 files changed, 122 insertions(+), 172 deletions(-) rename worker/{jobagent.go => agent.go} (63%) rename worker/{workerjob.go => job.go} (51%) diff --git a/client/client.go b/client/client.go index 0f8d00a..fe3d7fe 100644 --- a/client/client.go +++ b/client/client.go @@ -45,7 +45,7 @@ type Client struct { // Eg. // client, err := client.New("127.0.0.1:4730") func New(addr string) (client *Client, err error) { - conn, err := net.Dial("tcp", addr) + conn, err := net.Dial(common.NETWORK, addr) if err != nil { return } diff --git a/common/gearman.go b/common/gearman.go index eebaec1..cc9d41e 100644 --- a/common/gearman.go +++ b/common/gearman.go @@ -6,10 +6,7 @@ package common const ( - // the number limited for job servers. - WORKER_SERVER_CAP = 32 - // the number limited for functions. - WORKER_FUNCTION_CAP = 512 + NETWORK = "tcp" // queue size QUEUE_SIZE = 512 // read buffer size diff --git a/example/worker.go b/example/worker.go index d9b4759..bd82a2f 100644 --- a/example/worker.go +++ b/example/worker.go @@ -1,61 +1,25 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bitbucket.org/mikespook/gearman-go/gearman/worker" - "bitbucket.org/mikespook/golib/signal" - "os" - "fmt" + "bitbucket.org/mikespook/gearman-go/worker" +// "bitbucket.org/mikespook/golib/signal" +// "os" "log" "strings" ) -func ToUpper(job *worker.WorkerJob) ([]byte, error) { +func ToUpper(job *worker.Job) ([]byte, error) { data := []byte(strings.ToUpper(string(job.Data))) return data, nil } func main() { - w := worker.New(worker.Unlimit) - w.ErrFunc = func(e error) { + w := worker.New(worker.Unlimited) + w.ErrHandler = func(e error) { log.Println(e) } w.AddServer("127.0.0.1:4730") w.AddFunction("ToUpper", ToUpper, 0) w.AddFunction("ToUpperTimeOut5", ToUpper, 5) - - // Catch the interrupt to exit the working loop. - sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool { - w.Close() - return true - }) - go sh.Loop() - - go func() { - log.Println("start worker") - for { - print("cmd: ") - var str string - fmt.Scan(&str) - switch str { - case "echo": - w.Echo([]byte("Hello world!")) - var job *worker.WorkerJob - for job = <-w.JobQueue; job.DataType != gearman.ECHO_RES; job = <-w.JobQueue { - log.Println(job) - } - log.Println(string(job.Data)) - case "quit": - os.Exit(0) - return - case "result": - job := <-w.JobQueue - log.Println(string(job.Data)) - default: - log.Println("Unknown command") - } - } - }() w.Work() } diff --git a/worker/jobagent.go b/worker/agent.go similarity index 63% rename from worker/jobagent.go rename to worker/agent.go index 5456e94..9efc780 100644 --- a/worker/jobagent.go +++ b/worker/agent.go @@ -5,9 +5,9 @@ package worker import ( - gearman "bitbucket.org/mikespook/gearman-go" "io" "net" + "bitbucket.org/mikespook/gearman-go/common" ) // The agent of job server. @@ -15,27 +15,33 @@ type jobAgent struct { conn net.Conn worker *Worker running bool - incoming chan []byte + in chan []byte + out chan *Job } // Create the agent of job server. func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { - conn, err := net.Dial(gearman.TCP, addr) + conn, err := net.Dial(common.NETWORK, addr) if err != nil { return nil, err } - jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, gearman.QUEUE_CAP)} + jobagent = &jobAgent{ + conn: conn, + worker: worker, + running: true, + in: make(chan []byte, common.QUEUE_SIZE), + } return jobagent, err } // Internal read func (agent *jobAgent) read() (data []byte, err error) { - if len(agent.incoming) > 0 { - // incoming queue is not empty - data = <-agent.incoming + if len(agent.in) > 0 { + // in queue is not empty + data = <-agent.in } else { for { - buf := make([]byte, gearman.BUFFER_SIZE) + buf := make([]byte, common.BUFFER_SIZE) var n int if n, err = agent.conn.Read(buf); err != nil { if err == io.EOF && n == 0 { @@ -45,7 +51,7 @@ func (agent *jobAgent) read() (data []byte, err error) { return } data = append(data, buf[0:n]...) - if n < gearman.BUFFER_SIZE { + if n < common.BUFFER_SIZE { break } } @@ -54,13 +60,13 @@ func (agent *jobAgent) read() (data []byte, err error) { start := 0 tl := len(data) for i := 0; i < tl; i++ { - if string(data[start:start+4]) == gearman.RES_STR { - l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + if string(data[start:start+4]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) total := l + 12 if total == tl { return } else { - agent.incoming <- data[total:] + agent.in <- data[total:] data = data[:total] return } @@ -68,7 +74,7 @@ func (agent *jobAgent) read() (data []byte, err error) { start++ } } - err = gearman.ErrInvalidData + err = common.ErrInvalidData return } @@ -76,29 +82,29 @@ func (agent *jobAgent) read() (data []byte, err error) { func (agent *jobAgent) Work() { noop := true for agent.running { - // got noop msg and incoming queue is zero, grab job - if noop && len(agent.incoming) == 0 { - agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.GRAB_JOB, nil)) + // got noop msg and in queue is zero, grab job + if noop && len(agent.in) == 0 { + agent.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) } rel, err := agent.read() if err != nil { agent.worker.err(err) continue } - job, err := DecodeWorkerJob(rel) + job, err := decodeJob(rel) if err != nil { agent.worker.err(err) continue } else { switch job.DataType { - case gearman.NOOP: + case common.NOOP: noop = true - case gearman.NO_JOB: + case common.NO_JOB: noop = false - agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.PRE_SLEEP, nil)) - case gearman.ECHO_RES, gearman.JOB_ASSIGN_UNIQ, gearman.JOB_ASSIGN: + agent.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + case common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: job.agent = agent - agent.worker.incoming <- job + agent.worker.in <- job } } } @@ -106,7 +112,7 @@ func (agent *jobAgent) Work() { } // Send a job to the job server. -func (agent *jobAgent) WriteJob(job *WorkerJob) (err error) { +func (agent *jobAgent) WriteJob(job *Job) (err error) { return agent.write(job.Encode()) } @@ -125,7 +131,7 @@ func (agent *jobAgent) write(buf []byte) (err error) { // Close. func (agent *jobAgent) Close() (err error) { agent.running = false - close(agent.incoming) + close(agent.in) err = agent.conn.Close() return } diff --git a/worker/workerjob.go b/worker/job.go similarity index 51% rename from worker/workerjob.go rename to worker/job.go index 1f60e12..267ca3a 100644 --- a/worker/workerjob.go +++ b/worker/job.go @@ -5,12 +5,12 @@ package worker import ( - gearman "bitbucket.org/mikespook/gearman-go" "strconv" + "bitbucket.org/mikespook/gearman-go/common" ) // Worker side job -type WorkerJob struct { +type Job struct { Data []byte Handle, UniqueId string agent *jobAgent @@ -18,70 +18,74 @@ type WorkerJob struct { } // Create a new job -func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { - return &WorkerJob{magicCode: magiccode, +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, DataType: datatype, Data: data} } // Decode job from byte slice -func DecodeWorkerJob(data []byte) (job *WorkerJob, err error) { +func decodeJob(data []byte) (job *Job, err error) { if len(data) < 12 { - err = gearman.ErrInvalidData + err = common.ErrInvalidData return } - datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) if len(data[12:]) != int(l) { - err = gearman.ErrInvalidData + err = common.ErrInvalidData return } data = data[12:] - job = NewWorkerJob(gearman.RES, datatype, data) + job = newJob(common.RES, datatype, data) return } // Encode a job to byte slice -func (job *WorkerJob) Encode() (data []byte) { - magiccode := gearman.Uint32ToBytes(job.magicCode) - datatype := gearman.Uint32ToBytes(job.DataType) - data = make([]byte, 0, 1024*64) +func (job *Job) Encode() (data []byte) { + l := len(job.Data) + tl := l + 12 + if job.Handle != "" { + tl += len(job.Handle) + 1 + } + data = make([]byte, 0, tl) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(tl)) + data = append(data, magiccode[:]...) data = append(data, datatype[:]...) - data = append(data, []byte{0, 0, 0, 0}...) - l := len(job.Data) + data = append(data, datalength[:]...) if job.Handle != "" { data = append(data, []byte(job.Handle)...) data = append(data, 0) - l += len(job.Handle) + 1 } data = append(data, job.Data...) - datalength := gearman.Uint32ToBytes(uint32(l)) - copy(data[8:12], datalength[:]) return } // Send some datas to client. // Using this in a job's executing. -func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err error) { +func (job *Job) UpdateData(data []byte, iswaring bool) (err error) { result := append([]byte(job.Handle), 0) result = append(result, data...) var datatype uint32 if iswaring { - datatype = gearman.WORK_WARNING + datatype = common.WORK_WARNING } else { - datatype = gearman.WORK_DATA + datatype = common.WORK_DATA } - return job.agent.WriteJob(NewWorkerJob(gearman.REQ, datatype, result)) + return job.agent.WriteJob(newJob(common.REQ, datatype, result)) } // Update status. // Tall client how many percent job has been executed. -func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err error) { +func (job *Job) UpdateStatus(numerator, denominator int) (err error) { n := []byte(strconv.Itoa(numerator)) d := []byte(strconv.Itoa(denominator)) result := append([]byte(job.Handle), 0) result = append(result, n...) result = append(result, d...) - return job.agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.WORK_STATUS, result)) + return job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } diff --git a/worker/worker.go b/worker/worker.go index 9a61937..3f1c38c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,29 +5,26 @@ package worker import ( - gearman "bitbucket.org/mikespook/gearman-go" "bytes" - "sync" + "bitbucket.org/mikespook/gearman-go/common" ) const ( - Unlimit = 0 + Unlimited = 0 OneByOne = 1 ) // The definition of the callback function. -type JobFunction func(job *WorkerJob) ([]byte, error) +type JobFunc func(job *Job) ([]byte, error) // Map for added function. -type JobFunctionMap map[string]JobFunction +type JobFuncs map[string]JobFunc -// Error Function -type ErrFunc func(e error) /* -Worker side api for gearman. +Worker side api for gearman usage: - w = worker.New(worker.Unlimit) + w = worker.New(worker.Unlimited) w.AddFunction("foobar", foobar) w.AddServer("127.0.0.1:4730") w.Work() // Enter the worker's main loop @@ -35,51 +32,49 @@ usage: The definition of the callback function 'foobar' should suit for the type 'JobFunction'. It looks like this: -func foobar(job *WorkerJob) (data []byte, err os.Error) { +func foobar(job *Job) (data []byte, err os.Error) { //sth. here //plaplapla... return } */ type Worker struct { - clients []*jobAgent - functions JobFunctionMap - running bool - incoming chan *WorkerJob - mutex sync.Mutex + agents []*jobAgent + funcs JobFuncs + in chan *Job + out chan *Job + running bool limit chan bool - JobQueue chan *WorkerJob - + Id string // assign a ErrFunc to handle errors - // Must assign befor AddServer - ErrFunc ErrFunc + ErrHandler common.ErrorHandler } // Get a new worker func New(l int) (worker *Worker) { worker = &Worker{ - // job server list - clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP), - // function list + agents: make([]*jobAgent, 0), functions: make(JobFunctionMap), - incoming: make(chan *WorkerJob, gearman.QUEUE_CAP), - JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP), + + in: make(chan *Job, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), running: true, } - if l != Unlimit { + if l != Unlimited { worker.limit = make(chan bool, l) for i := 0; i < l; i ++ { worker.limit <- true } } + go worker.outLoop() return } // func (worker *Worker)err(e error) { - if worker.ErrFunc != nil { - worker.ErrFunc(e) + if worker.ErrHandler != nil { + worker.ErrHandler(e) } } @@ -90,7 +85,7 @@ func (worker *Worker) AddServer(addr string) (err error) { defer worker.mutex.Unlock() if len(worker.clients) == cap(worker.clients) { - return gearman.ErrOutOfCap + return common.ErrOutOfCap } // Create a new job server's client as a agent of server @@ -109,9 +104,9 @@ func (worker *Worker) AddServer(addr string) (err error) { // Plz added job servers first, then functions. // The API will tell every connected job server that 'I can do this' func (worker *Worker) AddFunction(funcname string, - f JobFunction, timeout uint32) (err error) { + f JobFunc, timeout uint32) (err error) { if len(worker.clients) < 1 { - return gearman.ErrNotConn + return common.ErrNotConn } worker.mutex.Lock() defer worker.mutex.Unlock() @@ -120,15 +115,15 @@ func (worker *Worker) AddFunction(funcname string, var datatype uint32 var data []byte if timeout == 0 { - datatype = gearman.CAN_DO + datatype = common.CAN_DO data = []byte(funcname) } else { - datatype = gearman.CAN_DO_TIMEOUT + datatype = common.CAN_DO_TIMEOUT data = []byte(funcname + "\x00") - t := gearman.Uint32ToBytes(timeout) + t := common.Uint32ToBytes(timeout) data = append(data, t[:]...) } - job := NewWorkerJob(gearman.REQ, datatype, data) + job := newJob(common.REQ, datatype, data) worker.WriteJob(job) return } @@ -140,10 +135,10 @@ func (worker *Worker) RemoveFunction(funcname string) (err error) { defer worker.mutex.Unlock() if worker.functions[funcname] == nil { - return gearman.ErrFuncNotFound + return common.ErrFuncNotFound } delete(worker.functions, funcname) - job := NewWorkerJob(gearman.REQ, gearman.CANT_DO, []byte(funcname)) + job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) worker.WriteJob(job) return } @@ -153,19 +148,19 @@ func (worker *Worker) Work() { for _, v := range worker.clients { go v.Work() } - for worker.running || len(worker.incoming) > 0{ + for worker.running || len(worker.in) > 0{ select { - case job := <-worker.incoming: + case job := <-worker.in: if job == nil { break } switch job.DataType { - case gearman.NO_JOB: + case common.NO_JOB: // do nothing - case gearman.ERROR: - _, err := gearman.GetError(job.Data) + case common.ERROR: + _, err := common.GetError(job.Data) worker.err(err) - case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ: + case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: go func() { if err := worker.exec(job); err != nil { worker.err(err) @@ -176,23 +171,7 @@ func (worker *Worker) Work() { } } } - close(worker.incoming) -} - -// Get the last job in queue. -// If there are more than one job in the queue, -// the last one will be returned, -// the others will be lost. -func (worker *Worker) LastJob() (job *WorkerJob) { - if l := len(worker.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-worker.JobQueue - } - } - return <-worker.JobQueue + close(worker.in) } // Close. @@ -207,26 +186,26 @@ func (worker *Worker) Close() (err error) { // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) WriteJob(job *WorkerJob) (err error) { - e := make(chan error) - for _, v := range worker.clients { +func (worker *Worker) Broadcast(job *Job) { + for _, v := range worker.agents { go func() { - e <- v.WriteJob(job) + if err := v.WriteJob(job); err != nil { + worker.err(err) + } }() } - return <-e } // Send a something out, get the samething back. func (worker *Worker) Echo(data []byte) (err error) { - job := NewWorkerJob(gearman.REQ, gearman.ECHO_REQ, data) + job := newJob(common.REQ, common.ECHO_REQ, data) return worker.WriteJob(job) } // Remove all of functions. // Both from the worker or job servers. func (worker *Worker) Reset() (err error) { - job := NewWorkerJob(gearman.REQ, gearman.RESET_ABILITIES, nil) + job := newJob(common.REQ, common.RESET_ABILITIES, nil) err = worker.WriteJob(job) worker.functions = make(JobFunctionMap) return @@ -234,20 +213,20 @@ func (worker *Worker) Reset() (err error) { // Set the worker's unique id. func (worker *Worker) SetId(id string) (err error) { - job := NewWorkerJob(gearman.REQ, gearman.SET_CLIENT_ID, []byte(id)) + job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) return worker.WriteJob(job) } // Execute the job. And send back the result. -func (worker *Worker) exec(job *WorkerJob) (err error) { +func (worker *Worker) exec(job *Job) (err error) { if worker.limit != nil { - <- worker.limit + <-worker.limit defer func() { worker.limit <- true }() } var limit int - if job.DataType == gearman.JOB_ASSIGN { + if job.DataType == common.JOB_ASSIGN { limit = 3 } else { limit = 4 @@ -255,7 +234,7 @@ func (worker *Worker) exec(job *WorkerJob) (err error) { jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) job.Handle = string(jobdata[0]) funcname := string(jobdata[1]) - if job.DataType == gearman.JOB_ASSIGN { + if job.DataType == common.JOB_ASSIGN { job.Data = jobdata[2] } else { job.UniqueId = string(jobdata[2]) @@ -263,21 +242,21 @@ func (worker *Worker) exec(job *WorkerJob) (err error) { } f, ok := worker.functions[funcname] if !ok { - return gearman.ErrFuncNotFound + return common.ErrFuncNotFound } result, err := f(job) var datatype uint32 if err == nil { - datatype = gearman.WORK_COMPLETE + datatype = common.WORK_COMPLETE } else { if result == nil { - datatype = gearman.WORK_FAIL + datatype = common.WORK_FAIL } else { - datatype = gearman.WORK_EXCEPTION + datatype = common.WORK_EXCEPTION } } - job.magicCode = gearman.REQ + job.magicCode = common.REQ job.DataType = datatype job.Data = result From d6a9025a56d301ac6bc66a28a1f4e8b3193871ca Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 24 May 2012 16:49:35 +0800 Subject: [PATCH 4/5] The worker can be working now. --HG-- branch : 0.1 --- example/worker.go | 24 ++++-- worker/agent.go | 152 +++++++++++++++++++---------------- worker/job.go | 23 +++--- worker/worker.go | 196 +++++++++++++++++++++++++--------------------- 4 files changed, 222 insertions(+), 173 deletions(-) diff --git a/example/worker.go b/example/worker.go index bd82a2f..9a19338 100644 --- a/example/worker.go +++ b/example/worker.go @@ -1,25 +1,37 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/worker" -// "bitbucket.org/mikespook/golib/signal" -// "os" + "os" "log" "strings" + "bitbucket.org/mikespook/golib/signal" + "bitbucket.org/mikespook/gearman-go/worker" ) func ToUpper(job *worker.Job) ([]byte, error) { + log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n", + job.Handle, job.UniqueId, job.Data) data := []byte(strings.ToUpper(string(job.Data))) return data, nil } func main() { + log.Println("Starting ...") + defer log.Println("Shutdown complete!") w := worker.New(worker.Unlimited) w.ErrHandler = func(e error) { log.Println(e) } + w.JobHandler = func(job *worker.Job) error { + log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle, + job.UniqueId, job.Data) + return nil + } w.AddServer("127.0.0.1:4730") - w.AddFunction("ToUpper", ToUpper, 0) - w.AddFunction("ToUpperTimeOut5", ToUpper, 5) - w.Work() + w.AddFunc("ToUpper", ToUpper, 0) + //w.AddFunc("ToUpperTimeOut5", ToUpper, 5) + go w.Work() + sh := signal.NewHandler() + sh.Bind(os.Interrupt, func() bool {return true}) + sh.Loop() } diff --git a/worker/agent.go b/worker/agent.go index 9efc780..ceef56a 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -11,42 +11,101 @@ import ( ) // The agent of job server. -type jobAgent struct { - conn net.Conn - worker *Worker - running bool - in chan []byte - out chan *Job +type agent struct { + conn net.Conn + worker *Worker + in chan []byte + out chan *Job } // Create the agent of job server. -func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { +func newAgent(addr string, worker *Worker) (a *agent, err error) { conn, err := net.Dial(common.NETWORK, addr) if err != nil { - return nil, err + return } - jobagent = &jobAgent{ + a = &agent{ conn: conn, worker: worker, - running: true, in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), } - return jobagent, err + return +} + +// outputing loop +func (a *agent) outLoop() { + ok := true + for ok { + if job, ok := <-a.out; ok { + if err := a.write(job.Encode()); err != nil { + a.worker.err(err) + } + } + } +} + +// inputing loop +func (a *agent) inLoop() { + defer func() { + a.conn.Close() + close(a.in) + close(a.out) + a.worker.removeAgent(a) + }() + noop := true + for a.worker.running { + // got noop msg and in queue is zero, grab job + if noop && len(a.in) == 0 { + a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) + } + rel, err := a.read() + if err != nil { + if err == common.ErrEmptyReading { + break + } + a.worker.err(err) + continue + } + job, err := decodeJob(rel) + if err != nil { + a.worker.err(err) + continue + } + switch job.DataType { + case common.NOOP: + noop = true + case common.NO_JOB: + noop = false + a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: + job.agent = a + a.worker.in <- job + } + } +} + +func (a *agent) Work() { + go a.outLoop() + go a.inLoop() } // Internal read -func (agent *jobAgent) read() (data []byte, err error) { - if len(agent.in) > 0 { +func (a *agent) read() (data []byte, err error) { + if len(a.in) > 0 { // in queue is not empty - data = <-agent.in + data = <-a.in } else { for { buf := make([]byte, common.BUFFER_SIZE) var n int - if n, err = agent.conn.Read(buf); err != nil { + if n, err = a.conn.Read(buf); err != nil { if err == io.EOF && n == 0 { - err = nil - return + if data == nil { + err = common.ErrEmptyReading + return + } + break } return } @@ -57,16 +116,17 @@ func (agent *jobAgent) read() (data []byte, err error) { } } // split package - start := 0 tl := len(data) + start := 0 for i := 0; i < tl; i++ { if string(data[start:start+4]) == common.RES_STR { - l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + l := int(common.BytesToUint32([4]byte{data[start+8], + data[start+9], data[start+10], data[start+11]})) total := l + 12 if total == tl { return } else { - agent.in <- data[total:] + a.in <- data[total:] data = data[:total] return } @@ -74,64 +134,22 @@ func (agent *jobAgent) read() (data []byte, err error) { start++ } } - err = common.ErrInvalidData - return -} - -// Main loop. -func (agent *jobAgent) Work() { - noop := true - for agent.running { - // got noop msg and in queue is zero, grab job - if noop && len(agent.in) == 0 { - agent.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) - } - rel, err := agent.read() - if err != nil { - agent.worker.err(err) - continue - } - job, err := decodeJob(rel) - if err != nil { - agent.worker.err(err) - continue - } else { - switch job.DataType { - case common.NOOP: - noop = true - case common.NO_JOB: - noop = false - agent.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) - case common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: - job.agent = agent - agent.worker.in <- job - } - } - } - return + return nil, common.Errorf("Invalid data: %V", data) } // Send a job to the job server. -func (agent *jobAgent) WriteJob(job *Job) (err error) { - return agent.write(job.Encode()) +func (a *agent) WriteJob(job *Job) { + a.out <- job } // Internal write the encoded job. -func (agent *jobAgent) write(buf []byte) (err error) { +func (a *agent) write(buf []byte) (err error) { var n int for i := 0; i < len(buf); i += n { - n, err = agent.conn.Write(buf[i:]) + n, err = a.conn.Write(buf[i:]) if err != nil { return err } } return } - -// Close. -func (agent *jobAgent) Close() (err error) { - agent.running = false - close(agent.in) - err = agent.conn.Close() - return -} diff --git a/worker/job.go b/worker/job.go index 267ca3a..054fd30 100644 --- a/worker/job.go +++ b/worker/job.go @@ -1,4 +1,5 @@ -// Copyright 2011 Xing Xing All rights reserved. +// Copyright 2011 Xing Xing +// All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. @@ -13,7 +14,7 @@ import ( type Job struct { Data []byte Handle, UniqueId string - agent *jobAgent + agent *agent magicCode, DataType uint32 } @@ -27,14 +28,12 @@ func newJob(magiccode, datatype uint32, data []byte) (job *Job) { // Decode job from byte slice func decodeJob(data []byte) (job *Job, err error) { if len(data) < 12 { - err = common.ErrInvalidData - return + return nil, common.Errorf("Invalid data: %V", data) } datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) if len(data[12:]) != int(l) { - err = common.ErrInvalidData - return + return nil, common.Errorf("Invalid data: %V", data) } data = data[12:] job = newJob(common.RES, datatype, data) @@ -44,11 +43,11 @@ func decodeJob(data []byte) (job *Job, err error) { // Encode a job to byte slice func (job *Job) Encode() (data []byte) { l := len(job.Data) - tl := l + 12 + tl := l if job.Handle != "" { tl += len(job.Handle) + 1 } - data = make([]byte, 0, tl) + data = make([]byte, 0, tl + 12) magiccode := common.Uint32ToBytes(job.magicCode) datatype := common.Uint32ToBytes(job.DataType) @@ -67,7 +66,7 @@ func (job *Job) Encode() (data []byte) { // Send some datas to client. // Using this in a job's executing. -func (job *Job) UpdateData(data []byte, iswaring bool) (err error) { +func (job *Job) UpdateData(data []byte, iswaring bool) { result := append([]byte(job.Handle), 0) result = append(result, data...) var datatype uint32 @@ -76,16 +75,16 @@ func (job *Job) UpdateData(data []byte, iswaring bool) (err error) { } else { datatype = common.WORK_DATA } - return job.agent.WriteJob(newJob(common.REQ, datatype, result)) + job.agent.WriteJob(newJob(common.REQ, datatype, result)) } // Update status. // Tall client how many percent job has been executed. -func (job *Job) UpdateStatus(numerator, denominator int) (err error) { +func (job *Job) UpdateStatus(numerator, denominator int) { n := []byte(strconv.Itoa(numerator)) d := []byte(strconv.Itoa(denominator)) result := append([]byte(job.Handle), 0) result = append(result, n...) result = append(result, d...) - return job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) + job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) } diff --git a/worker/worker.go b/worker/worker.go index 3f1c38c..667e16e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -13,21 +13,28 @@ const ( Unlimited = 0 OneByOne = 1 ) +// Job handler +type JobHandler func(*Job) error -// The definition of the callback function. type JobFunc func(job *Job) ([]byte, error) +// The definition of the callback function. +type jobFunc struct { + f JobFunc + timeout uint32 +} + // Map for added function. -type JobFuncs map[string]JobFunc +type JobFuncs map[string]*jobFunc /* Worker side api for gearman usage: - w = worker.New(worker.Unlimited) - w.AddFunction("foobar", foobar) - w.AddServer("127.0.0.1:4730") - w.Work() // Enter the worker's main loop +w = worker.New(worker.Unlimited) +w.AddFunction("foobar", foobar) +w.AddServer("127.0.0.1:4730") +w.Work() // Enter the worker's main loop The definition of the callback function 'foobar' should suit for the type 'JobFunction'. It looks like this: @@ -39,27 +46,24 @@ func foobar(job *Job) (data []byte, err os.Error) { } */ type Worker struct { - agents []*jobAgent + agents []*agent funcs JobFuncs in chan *Job - out chan *Job running bool limit chan bool Id string // assign a ErrFunc to handle errors ErrHandler common.ErrorHandler + JobHandler JobHandler } // Get a new worker func New(l int) (worker *Worker) { worker = &Worker{ - agents: make([]*jobAgent, 0), - functions: make(JobFunctionMap), - + agents: make([]*agent, 0), + funcs: make(JobFuncs), in: make(chan *Job, common.QUEUE_SIZE), - out: make(chan *Job, common.QUEUE_SIZE), - running: true, } if l != Unlimited { worker.limit = make(chan bool, l) @@ -67,7 +71,6 @@ func New(l int) (worker *Worker) { worker.limit <- true } } - go worker.outLoop() return } @@ -81,37 +84,42 @@ func (worker *Worker)err(e error) { // Add a server. The addr should be 'host:port' format. // The connection is established at this time. func (worker *Worker) AddServer(addr string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if len(worker.clients) == cap(worker.clients) { - return common.ErrOutOfCap - } - // Create a new job server's client as a agent of server - server, err := newJobAgent(addr, worker) + server, err := newAgent(addr, worker) if err != nil { return err } - - n := len(worker.clients) - worker.clients = worker.clients[0 : n+1] - worker.clients[n] = server + worker.agents = append(worker.agents, server) return } +// Write a job to job server. +// Here, the job's mean is not the oraginal mean. +// Just looks like a network package for job's result or tell job server, there was a fail. +func (worker *Worker) broadcast(job *Job) { + for _, v := range worker.agents { + v.WriteJob(job) + } +} + // Add a function. // Plz added job servers first, then functions. // The API will tell every connected job server that 'I can do this' -func (worker *Worker) AddFunction(funcname string, - f JobFunc, timeout uint32) (err error) { - if len(worker.clients) < 1 { - return common.ErrNotConn +func (worker *Worker) AddFunc(funcname string, +f JobFunc, timeout uint32) (err error) { + if _, ok := worker.funcs[funcname]; ok { + return common.Errorf("The function already exists: %s", funcname) } - worker.mutex.Lock() - defer worker.mutex.Unlock() - worker.functions[funcname] = f + worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} + if worker.running { + worker.addFunc(funcname, timeout) + } + return +} + +// inner add function +func (worker *Worker) addFunc(funcname string, timeout uint32) { var datatype uint32 var data []byte if timeout == 0 { @@ -124,42 +132,50 @@ func (worker *Worker) AddFunction(funcname string, data = append(data, t[:]...) } job := newJob(common.REQ, datatype, data) - worker.WriteJob(job) - return + worker.broadcast(job) + } // Remove a function. // Tell job servers 'I can not do this now' at the same time. -func (worker *Worker) RemoveFunction(funcname string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if worker.functions[funcname] == nil { - return common.ErrFuncNotFound +func (worker *Worker) RemoveFunc(funcname string) (err error) { + if _, ok := worker.funcs[funcname]; !ok { + return common.Errorf("The function does not exist: %s", funcname) + } + delete(worker.funcs, funcname) + if worker.running { + worker.removeFunc(funcname) } - delete(worker.functions, funcname) - job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) - worker.WriteJob(job) return } +// inner remove function +func (worker *Worker) removeFunc(funcname string) { + job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) + worker.broadcast(job) +} + // Main loop func (worker *Worker) Work() { - for _, v := range worker.clients { + defer func() { + worker.running = false + }() + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + worker.running = true + for _, v := range worker.agents { go v.Work() } - for worker.running || len(worker.in) > 0{ - select { - case job := <-worker.in: - if job == nil { - break - } + ok := true + for ok { + if job, ok := <-worker.in; ok { switch job.DataType { - case common.NO_JOB: - // do nothing case common.ERROR: - _, err := common.GetError(job.Data) - worker.err(err) + go func() { + _, err := common.GetError(job.Data) + worker.err(err) + }() case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: go func() { if err := worker.exec(job); err != nil { @@ -167,54 +183,48 @@ func (worker *Worker) Work() { } }() default: - worker.JobQueue <- job + go worker.handleJob(job) } } } - close(worker.in) +} + +// job handler +func (worker *Worker) handleJob(job *Job) { + if worker.JobHandler != nil { + if err := worker.JobHandler(job); err != nil { + worker.err(err) + } + } } // Close. -func (worker *Worker) Close() (err error) { - for _, v := range worker.clients { - err = v.Close() - } - worker.running = false - return err -} - -// Write a job to job server. -// Here, the job's mean is not the oraginal mean. -// Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) Broadcast(job *Job) { - for _, v := range worker.agents { - go func() { - if err := v.WriteJob(job); err != nil { - worker.err(err) - } - }() +func (worker *Worker) Close() { + close(worker.in) + if worker.limit != nil { + close(worker.limit) } } // Send a something out, get the samething back. -func (worker *Worker) Echo(data []byte) (err error) { +func (worker *Worker) Echo(data []byte) { job := newJob(common.REQ, common.ECHO_REQ, data) - return worker.WriteJob(job) + worker.broadcast(job) } // Remove all of functions. // Both from the worker or job servers. -func (worker *Worker) Reset() (err error) { +func (worker *Worker) Reset() { job := newJob(common.REQ, common.RESET_ABILITIES, nil) - err = worker.WriteJob(job) - worker.functions = make(JobFunctionMap) - return + worker.broadcast(job) + worker.funcs = make(JobFuncs) } // Set the worker's unique id. -func (worker *Worker) SetId(id string) (err error) { +func (worker *Worker) SetId(id string) { + worker.Id = id job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) - return worker.WriteJob(job) + worker.broadcast(job) } // Execute the job. And send back the result. @@ -240,11 +250,11 @@ func (worker *Worker) exec(job *Job) (err error) { job.UniqueId = string(jobdata[2]) job.Data = jobdata[3] } - f, ok := worker.functions[funcname] + f, ok := worker.funcs[funcname] if !ok { - return common.ErrFuncNotFound + return common.Errorf("The function does not exist: %s", funcname) } - result, err := f(job) + result, err := f.f(job) var datatype uint32 if err == nil { datatype = common.WORK_COMPLETE @@ -259,7 +269,17 @@ func (worker *Worker) exec(job *Job) (err error) { job.magicCode = common.REQ job.DataType = datatype job.Data = result - - worker.WriteJob(job) + job.agent.WriteJob(job) return } + +func (worker *Worker) removeAgent(a *agent) { + for k, v := range worker.agents { + if v == a { + worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...) + } + } + if len(worker.agents) == 0 { + worker.Close() + } +} From bd2478557e2f34524533ff732f54aba2fe995b48 Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 24 May 2012 16:56:36 +0800 Subject: [PATCH 5/5] fixing for 'go install' --HG-- branch : 0.1 --- README.md | 11 ++++++++--- gearman.go | 15 ++++----------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index cf930bd..23e11d5 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,11 @@ in the LICENSE file. This will install the client: -> $ go get bitbucket.org/mikespook/gearman-go/gearman/client +> $ go get bitbucket.org/mikespook/gearman-go/client This will install the worker: -> $ go get bitbucket.org/mikespook/gearman-go/gearman/worker +> $ go get bitbucket.org/mikespook/gearman-go/worker This will install the client and the worker automatically: @@ -46,8 +46,13 @@ This will install the client and the worker automatically: # Contacts -xingxing +Xing Xing http://mikespook.com http://twitter.com/mikespook + +# History + + * 0.1 Refactoring code, redesign the API. + * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. diff --git a/gearman.go b/gearman.go index 5c17914..48c134b 100644 --- a/gearman.go +++ b/gearman.go @@ -9,15 +9,8 @@ The protocol was implemented by native way. package gearman -const ( - // Job type - // JOB_NORMAL | JOB_BG means a normal level job run in background - // normal level - JOB_NORMAL = 0 - // background job - JOB_BG = 1 - // low level - JOB_LOW = 2 - // high level - JOB_HIGH = 4 +import ( + _ "bitbucket.org/mikespook/gearman-go/common" + _ "bitbucket.org/mikespook/gearman-go/client" + _ "bitbucket.org/mikespook/gearman-go/worker" )