From 124e686699691d0cd6a8e94b4ebcb3686810798e Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Fri, 30 Aug 2013 12:36:57 +0800 Subject: [PATCH] go fmt --- client/client.go | 16 +- client/error.go | 5 +- client/handler.go | 11 -- client/id.go | 2 +- client/pool_test.go | 2 +- client/response.go | 13 +- client/status.go | 4 + common/error.go | 54 ------ common/gearman.go | 86 ---------- common/gearman_test.go | 48 ------ gearman.go | 8 +- gearman_test.go | 193 +++++++++++---------- worker/agent.go | 311 +++++++++++++++++---------------- worker/common.go | 56 ++++++ worker/error.go | 45 +++++ worker/func.go | 32 ++-- worker/job.go | 134 --------------- worker/request.go | 133 +++++++++++++++ worker/response.go | 133 +++++++++++++++ worker/worker.go | 378 ++++++++++++++++++++--------------------- worker/worker_test.go | 46 ++--- 21 files changed, 876 insertions(+), 834 deletions(-) delete mode 100644 client/handler.go delete mode 100644 common/error.go delete mode 100644 common/gearman.go delete mode 100644 common/gearman_test.go create mode 100644 worker/common.go create mode 100644 worker/error.go delete mode 100644 worker/job.go create mode 100644 worker/request.go create mode 100644 worker/response.go diff --git a/client/client.go b/client/client.go index 8fde1dc..6722c20 100644 --- a/client/client.go +++ b/client/client.go @@ -20,13 +20,13 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { - net, addr, lastcall string - respHandler map[string]ResponseHandler - innerHandler map[string]ResponseHandler - in chan []byte - isConn bool - conn net.Conn - mutex sync.RWMutex + net, addr, lastcall string + respHandler map[string]ResponseHandler + innerHandler map[string]ResponseHandler + in chan []byte + isConn bool + conn net.Conn + mutex sync.RWMutex ErrorHandler ErrorHandler } @@ -265,7 +265,7 @@ func (client *Client) Status(handle string) (status *Status, err error) { wg.Add(1) client.mutex.Lock() client.lastcall = "s" + handle - client.innerHandler["s" + handle] = ResponseHandler(func(resp *Response) { + client.innerHandler["s"+handle] = ResponseHandler(func(resp *Response) { defer wg.Done() defer client.mutex.Unlock() var err error diff --git a/client/error.go b/client/error.go index 5600158..154e248 100644 --- a/client/error.go +++ b/client/error.go @@ -6,9 +6,9 @@ package client import ( - "fmt" "bytes" "errors" + "fmt" ) var ( @@ -40,3 +40,6 @@ func GetError(data []byte) (err error) { err = errors.New(fmt.Sprintf("%s: %s", rel[0], rel[1])) return } + +// Error handler +type ErrorHandler func(error) diff --git a/client/handler.go b/client/handler.go deleted file mode 100644 index 6e5fbaf..0000000 --- a/client/handler.go +++ /dev/null @@ -1,11 +0,0 @@ -package client - -// Response handler -type ResponseHandler func(*Response) - -// Error handler -type ErrorHandler func(error) - -// Status handler -// handle, known, running, numerator, denominator -type StatusHandler func(string, bool, bool, uint64, uint64) diff --git a/client/id.go b/client/id.go index b3f878b..eb9992a 100644 --- a/client/id.go +++ b/client/id.go @@ -1,9 +1,9 @@ package client import ( - "time" "strconv" "sync/atomic" + "time" ) var ( diff --git a/client/pool_test.go b/client/pool_test.go index d74e5a5..51b00cf 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -40,7 +40,7 @@ func TestPoolEcho(t *testing.T) { func TestPoolDoBg(t *testing.T) { addr, handle, err := pool.DoBg("ToUpper", - []byte("abcdef"), JOB_LOW); + []byte("abcdef"), JOB_LOW) if err != nil { t.Error(err) return diff --git a/client/response.go b/client/response.go index 89076f0..b142baf 100644 --- a/client/response.go +++ b/client/response.go @@ -6,17 +6,20 @@ package client import ( - "fmt" "bytes" - "strconv" "encoding/binary" + "fmt" + "strconv" ) +// Response handler +type ResponseHandler func(*Response) + // response type Response struct { - DataType uint32 - Data, UID []byte - Handle string + DataType uint32 + Data, UID []byte + Handle string } // Extract the Response's result. diff --git a/client/status.go b/client/status.go index 4370c33..b1f6e95 100644 --- a/client/status.go +++ b/client/status.go @@ -1,5 +1,9 @@ package client +// Status handler +// handle, known, running, numerator, denominator +type StatusHandler func(string, bool, bool, uint64, uint64) + type Status struct { Handle string Known, Running bool diff --git a/common/error.go b/common/error.go deleted file mode 100644 index fa52530..0000000 --- a/common/error.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2011 - 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package common - -import ( - "fmt" - "bytes" - "errors" - "syscall" -) - -var ( - ErrJobTimeOut = errors.New("Do a job time out") - ErrInvalidData = errors.New("Invalid data") - ErrWorkWarning = errors.New("Work warning") - ErrWorkFail = errors.New("Work fail") - ErrWorkException = errors.New("Work exeption") - ErrDataType = errors.New("Invalid data type") - ErrOutOfCap = errors.New("Out of the capability") - ErrNotConn = errors.New("Did not connect to job server") - ErrFuncNotFound = errors.New("The function was not found") - ErrConnection = errors.New("Connection error") - ErrNoActiveAgent = errors.New("No active agent") - ErrTimeOut = errors.New("Executing time out") - ErrUnknown = errors.New("Unknown error") - ErrConnClosed = errors.New("Connection closed") -) -func DisablePanic() {recover()} - -// Extract the error message -func GetError(data []byte) (eno syscall.Errno, err error) { - rel := bytes.SplitN(data, []byte{'\x00'}, 2) - if len(rel) != 2 { - err = Errorf("Not a error data: %V", data) - return - } - l := len(rel[0]) - eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) - err = errors.New(string(rel[1])) - return -} - -// Get a formated error -func Errorf(format string, msg ... interface{}) error { - return errors.New(fmt.Sprintf(format, msg ... )) -} - -// An error handler -type ErrorHandler func(error) - - diff --git a/common/gearman.go b/common/gearman.go deleted file mode 100644 index 8ddad56..0000000 --- a/common/gearman.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2011 - 2012 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package common - -import ( - "bytes" - "encoding/binary" -) - -const ( - NETWORK = "tcp" - // queue size - QUEUE_SIZE = 8 - // read buffer size - BUFFER_SIZE = 1024 - // min packet length - PACKET_LEN = 12 - - // \x00REQ - REQ = 5391697 - REQ_STR = "\x00REQ" - // \x00RES - RES = 5391699 - RES_STR = "\x00RES" - - // package data type - CAN_DO = 1 - CANT_DO = 2 - RESET_ABILITIES = 3 - PRE_SLEEP = 4 - NOOP = 6 - JOB_CREATED = 8 - GRAB_JOB = 9 - NO_JOB = 10 - JOB_ASSIGN = 11 - WORK_STATUS = 12 - WORK_COMPLETE = 13 - WORK_FAIL = 14 - GET_STATUS = 15 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - STATUS_RES = 20 - SET_CLIENT_ID = 22 - CAN_DO_TIMEOUT = 23 - WORK_EXCEPTION = 25 - WORK_DATA = 28 - WORK_WARNING = 29 - GRAB_JOB_UNIQ = 30 - JOB_ASSIGN_UNIQ = 31 - - SUBMIT_JOB = 7 - SUBMIT_JOB_BG = 18 - SUBMIT_JOB_HIGH = 21 - SUBMIT_JOB_HIGH_BG = 32 - SUBMIT_JOB_LOW = 33 - SUBMIT_JOB_LOW_BG = 34 -) - -// Decode [4]byte to uint32 -func BytesToUint32(buf [4]byte) uint32 { - var r uint32 - b := bytes.NewBuffer(buf[:]) - err := binary.Read(b, binary.BigEndian, &r) - if err != nil { - return 0 - } - return r -} - -// Encode uint32 to [4]byte -func Uint32ToBytes(i uint32) [4]byte { - buf := new(bytes.Buffer) - err := binary.Write(buf, binary.BigEndian, i) - if err != nil { - return [4]byte{0, 0, 0, 0} - } - var r [4]byte - for k, v := range buf.Bytes() { - r[k] = v - } - return r -} diff --git a/common/gearman_test.go b/common/gearman_test.go deleted file mode 100644 index 098076c..0000000 --- a/common/gearman_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package common - -import ( - "bytes" - "testing" -) - -var ( - testCase = map[uint32][4]byte { - 0: [...]byte{0, 0, 0, 0}, - 1: [...]byte{0, 0, 0, 1}, - 256: [...]byte{0, 0, 1, 0}, - 256 * 256: [...]byte{0, 1, 0, 0}, - 256 * 256 * 256: [...]byte{1, 0, 0, 0}, - 256 * 256 * 256 + 256 * 256 + 256 + 1: [...]byte{1, 1, 1, 1}, - 4294967295 : [...]byte{0xFF, 0xFF, 0xFF, 0xFF}, - } -) - -func TestUint32ToBytes(t *testing.T) { - for k, v := range testCase { - b := Uint32ToBytes(k) - if bytes.Compare(b[:], v[:]) != 0 { - t.Errorf("%v was expected, but %v was got", v, b) - } - } -} - -func TestBytesToUint32s(t *testing.T) { - for k, v := range testCase { - u := BytesToUint32([4]byte(v)) - if u != k { - t.Errorf("%v was expected, but %v was got", k, u) - } - } -} - -func BenchmarkByteToUnit32(b * testing.B) { - for i := 0; i < b.N; i++ { - BytesToUint32([4]byte{0xF, 0xF, 0xF, 0xF}); - } -} - -func BenchmarkUint32ToByte(b *testing.B) { - for i := 0; i < b.N; i++ { - Uint32ToBytes(123456); - } -} diff --git a/gearman.go b/gearman.go index 5dc77a7..b134cd6 100644 --- a/gearman.go +++ b/gearman.go @@ -3,14 +3,14 @@ // license that can be found in the LICENSE file. /* -This module is Gearman API for golang. +This module is Gearman API for golang. The protocol was implemented by native way. */ package gearman import ( - _ "github.com/mikespook/gearman-go/common" - _ "github.com/mikespook/gearman-go/client" - _ "github.com/mikespook/gearman-go/worker" + _ "github.com/mikespook/gearman-go/client" + _ "github.com/mikespook/gearman-go/common" + _ "github.com/mikespook/gearman-go/worker" ) diff --git a/gearman_test.go b/gearman_test.go index 58d0303..2e81af8 100644 --- a/gearman_test.go +++ b/gearman_test.go @@ -3,138 +3,137 @@ // license that can be found in the LICENSE file. /* -This module is Gearman API for golang. +This module is Gearman API for golang. The protocol was implemented by native way. */ package gearman import ( - "time" - "sync" - "testing" - "strings" - "github.com/mikespook/gearman-go/client" - "github.com/mikespook/gearman-go/worker" + "github.com/mikespook/gearman-go/client" + "github.com/mikespook/gearman-go/worker" + "strings" + "sync" + "testing" + "time" ) -const( - STR = "The gearman-go is a pure go implemented library." - GEARMAND = "127.0.0.1:4730" +const ( + STR = "The gearman-go is a pure go implemented library." + GEARMAND = "127.0.0.1:4730" ) func ToUpper(job *worker.Job) ([]byte, error) { - data := []byte(strings.ToUpper(string(job.Data))) - return data, nil + data := []byte(strings.ToUpper(string(job.Data))) + return data, nil } func Sleep(job *worker.Job) ([]byte, error) { - time.Sleep(time.Second * 5) - return nil, nil + time.Sleep(time.Second * 5) + return nil, nil } - func TestJobs(t *testing.T) { - w := worker.New(worker.Unlimited) - if err := w.AddServer(GEARMAND); err != nil { - t.Error(err) - return - } - defer w.Close() + w := worker.New(worker.Unlimited) + if err := w.AddServer(GEARMAND); err != nil { + t.Error(err) + return + } + defer w.Close() t.Log("Servers added...") - if err := w.AddFunc("ToUpper", ToUpper, 0); err != nil { - t.Error(err) - return - } - if err := w.AddFunc("Sleep", Sleep, 0); err != nil { - t.Error(err) - return - } + if err := w.AddFunc("ToUpper", ToUpper, 0); err != nil { + t.Error(err) + return + } + if err := w.AddFunc("Sleep", Sleep, 0); err != nil { + t.Error(err) + return + } t.Log("Functions added...") - w.ErrHandler = func(e error) { - t.Error(e) - } - go w.Work() + w.ErrHandler = func(e error) { + t.Error(e) + } + go w.Work() t.Log("Worker is running...") c, err := client.New("tcp4", GEARMAND) - if err != nil { - t.Error(err) - return - } - defer c.Close() + if err != nil { + t.Error(err) + return + } + defer c.Close() - c.ErrorHandler = func(e error) { - t.Log(e) - } + c.ErrorHandler = func(e error) { + t.Log(e) + } - { - var w sync.WaitGroup - jobHandler := func(job *client.Response) { - upper := strings.ToUpper(STR) - if (string(job.Data) != upper) { - t.Errorf("%s expected, got %s", upper, job.Data) - } - w.Done() - } + { + var w sync.WaitGroup + jobHandler := func(job *client.Response) { + upper := strings.ToUpper(STR) + if string(job.Data) != upper { + t.Errorf("%s expected, got %s", upper, job.Data) + } + w.Done() + } - w.Add(1) - handle, err := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler) + w.Add(1) + handle, err := c.Do("ToUpper", []byte(STR), client.JOB_NORMAL, jobHandler) if err != nil { t.Error(err) return } - w.Wait() - status, err := c.Status(handle) - if err != nil { - t.Error(err) - return - } - if status.Known { - t.Errorf("%s shouldn't be known", status.Handle) - return - } - - if status.Running { - t.Errorf("%s shouldn't be running", status.Handle) - } - } - { - handle, err := c.DoBg("Sleep", nil, client.JOB_NORMAL) + w.Wait() + status, err := c.Status(handle) if err != nil { t.Error(err) return } - time.Sleep(time.Second) - status, err := c.Status(handle) - if err != nil { - t.Error(err) - return - } + if status.Known { + t.Errorf("%s shouldn't be known", status.Handle) + return + } - if !status.Known { - t.Errorf("%s should be known", status.Handle) - return - } + if status.Running { + t.Errorf("%s shouldn't be running", status.Handle) + } + } + { + handle, err := c.DoBg("Sleep", nil, client.JOB_NORMAL) + if err != nil { + t.Error(err) + return + } + time.Sleep(time.Second) + status, err := c.Status(handle) + if err != nil { + t.Error(err) + return + } - if !status.Running { - t.Errorf("%s should be running", status.Handle) - } - } - { - status, err := c.Status("not exists handle") - if err != nil { - t.Error(err) - return - } + if !status.Known { + t.Errorf("%s should be known", status.Handle) + return + } - if status.Known { - t.Errorf("%s shouldn't be known", status.Handle) - return - } + if !status.Running { + t.Errorf("%s should be running", status.Handle) + } + } + { + status, err := c.Status("not exists handle") + if err != nil { + t.Error(err) + return + } - if status.Running { - t.Errorf("%s shouldn't be running", status.Handle) - } - } + if status.Known { + t.Errorf("%s shouldn't be known", status.Handle) + return + } + + if status.Running { + t.Errorf("%s shouldn't be running", status.Handle) + } + } } diff --git a/worker/agent.go b/worker/agent.go index fbc8c76..f19b5f2 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -5,199 +5,198 @@ package worker import ( - "io" - "net" - "github.com/mikespook/gearman-go/common" + "io" + "net" ) // The agent of job server. type agent struct { - conn net.Conn - worker *Worker - in chan []byte - out chan *Job - addr string + conn net.Conn + worker *Worker + in chan []byte + out chan *Job + addr string } // Create the agent of job server. func newAgent(addr string, worker *Worker) (a *agent, err error) { - conn, err := net.Dial(common.NETWORK, addr) - if err != nil { - return - } - a = &agent{ - conn: conn, - worker: worker, - addr: addr, - in: make(chan []byte, common.QUEUE_SIZE), - out: make(chan *Job, common.QUEUE_SIZE), - } - // reset abilities - a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) - return + conn, err := net.Dial(common.NETWORK, addr) + if err != nil { + return + } + a = &agent{ + conn: conn, + worker: worker, + addr: addr, + in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), + } + // reset abilities + a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) + return } // outputing loop func (a *agent) outLoop() { - ok := true - var job *Job - for a.worker.running && ok { - if job, ok = <-a.out; ok { - if err := a.write(job.Encode()); err != nil { - a.worker.err(err) - } - } - } + ok := true + var job *Job + for a.worker.running && ok { + if job, ok = <-a.out; ok { + if err := a.write(job.Encode()); err != nil { + a.worker.err(err) + } + } + } } // inputing loop func (a *agent) inLoop() { - defer func() { - if r := recover(); r != nil { - a.worker.err(common.Errorf("Exiting: %s", r)) - } - close(a.in) - close(a.out) - a.worker.removeAgent(a) - }() - for a.worker.running { - a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) - RESTART: - // got noop msg and in queue is zero, grab job - rel, err := a.read() - if err != nil { - if err == common.ErrConnection { - for i := 0; i < 3 && a.worker.running; i++ { - if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { - a.worker.err(common.Errorf("Reconnection: %d faild", i)) - continue - } else { - a.conn = conn - goto RESTART - } - } - a.worker.err(err) - break - } - a.worker.err(err) - continue - } - job, err := decodeJob(rel) - if err != nil { - a.worker.err(err) - continue - } - switch job.DataType { - case common.NOOP: - a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil)) - case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: - if a.worker.running { - if a.worker.limit != nil { - a.worker.limit <- true - } - job.agent = a - a.worker.in <- job - } - } - } + defer func() { + if r := recover(); r != nil { + a.worker.err(common.Errorf("Exiting: %s", r)) + } + close(a.in) + close(a.out) + a.worker.removeAgent(a) + }() + for a.worker.running { + a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + RESTART: + // got noop msg and in queue is zero, grab job + rel, err := a.read() + if err != nil { + if err == common.ErrConnection { + for i := 0; i < 3 && a.worker.running; i++ { + if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { + a.worker.err(common.Errorf("Reconnection: %d faild", i)) + continue + } else { + a.conn = conn + goto RESTART + } + } + a.worker.err(err) + break + } + a.worker.err(err) + continue + } + job, err := decodeJob(rel) + if err != nil { + a.worker.err(err) + continue + } + switch job.DataType { + case common.NOOP: + a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil)) + case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: + if a.worker.running { + if a.worker.limit != nil { + a.worker.limit <- true + } + job.agent = a + a.worker.in <- job + } + } + } } func (a *agent) Close() { - a.conn.Close() + a.conn.Close() } func (a *agent) Work() { - go a.outLoop() - go a.inLoop() + go a.outLoop() + go a.inLoop() } func (a *agent) readData(length int) (data []byte, err error) { - n := 0 - buf := make([]byte, common.BUFFER_SIZE) - // read until data can be unpacked - for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { - if n, err = a.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - if data == nil { - err = common.ErrConnection - return - } - return data, nil - } - return - } - data = append(data, buf[0:n]...) - if n < common.BUFFER_SIZE { - break - } - } - return + n := 0 + buf := make([]byte, common.BUFFER_SIZE) + // read until data can be unpacked + for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { + if n, err = a.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + if data == nil { + err = common.ErrConnection + return + } + return data, nil + } + return + } + data = append(data, buf[0:n]...) + if n < common.BUFFER_SIZE { + break + } + } + return } func (a *agent) unpack(data []byte) ([]byte, int, bool) { - tl := len(data) - start := 0 - for i := 0; i < tl+1-common.PACKET_LEN; i++ { - if start+common.PACKET_LEN > tl { // too few data to unpack, read more - return nil, common.PACKET_LEN, false - } - if string(data[start:start+4]) == common.RES_STR { - l := int(common.BytesToUint32([4]byte{data[start+8], - data[start+9], data[start+10], data[start+11]})) - total := l + common.PACKET_LEN - if total == tl { // data is what we want - return data, common.PACKET_LEN, true - } else if total < tl { // data[:total] is what we want, data[total:] is the more - a.in <- data[total:] - data = data[start:total] - return data, common.PACKET_LEN, true - } else { // ops! It won't be possible. - return nil, total - tl, false - } - } else { // flag was not found, move to next step - start++ - } - } - return nil, common.PACKET_LEN, false + tl := len(data) + start := 0 + for i := 0; i < tl+1-common.PACKET_LEN; i++ { + if start+common.PACKET_LEN > tl { // too few data to unpack, read more + return nil, common.PACKET_LEN, false + } + if string(data[start:start+4]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], + data[start+9], data[start+10], data[start+11]})) + total := l + common.PACKET_LEN + if total == tl { // data is what we want + return data, common.PACKET_LEN, true + } else if total < tl { // data[:total] is what we want, data[total:] is the more + a.in <- data[total:] + data = data[start:total] + return data, common.PACKET_LEN, true + } else { // ops! It won't be possible. + return nil, total - tl, false + } + } else { // flag was not found, move to next step + start++ + } + } + return nil, common.PACKET_LEN, false } func (a *agent) read() (rel []byte, err error) { - var data []byte - ok := false - l := common.PACKET_LEN - for !ok { - inlen := len(a.in) - if inlen > 0 { - // in queue is not empty - for i := 0; i < inlen; i++ { - data = append(data, <-a.in...) - } - } else { - var d []byte - d, err = a.readData(l) - if err != nil { - return - } - data = append(data, d...) - } - rel, l, ok = a.unpack(data) - } - return + var data []byte + ok := false + l := common.PACKET_LEN + for !ok { + inlen := len(a.in) + if inlen > 0 { + // in queue is not empty + for i := 0; i < inlen; i++ { + data = append(data, <-a.in...) + } + } else { + var d []byte + d, err = a.readData(l) + if err != nil { + return + } + data = append(data, d...) + } + rel, l, ok = a.unpack(data) + } + return } // Send a job to the job server. func (a *agent) WriteJob(job *Job) { - a.out <- job + a.out <- job } // Internal write the encoded job. func (a *agent) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = a.conn.Write(buf[i:]) - if err != nil { - return err - } - } - return + var n int + for i := 0; i < len(buf); i += n { + n, err = a.conn.Write(buf[i:]) + if err != nil { + return err + } + } + return } diff --git a/worker/common.go b/worker/common.go new file mode 100644 index 0000000..86e8ac7 --- /dev/null +++ b/worker/common.go @@ -0,0 +1,56 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +const ( + NETWORK = "tcp" + // queue size + QUEUE_SIZE = 8 + // read buffer size + BUFFER_SIZE = 1024 + // min packet length + MIN_PACKET_LEN = 12 + + // \x00REQ + REQ = 5391697 + REQ_STR = "\x00REQ" + // \x00RES + RES = 5391699 + RES_STR = "\x00RES" + + // package data type + CAN_DO = 1 + CANT_DO = 2 + RESET_ABILITIES = 3 + PRE_SLEEP = 4 + NOOP = 6 + JOB_CREATED = 8 + GRAB_JOB = 9 + NO_JOB = 10 + JOB_ASSIGN = 11 + WORK_STATUS = 12 + WORK_COMPLETE = 13 + WORK_FAIL = 14 + GET_STATUS = 15 + ECHO_REQ = 16 + ECHO_RES = 17 + ERROR = 19 + STATUS_RES = 20 + SET_CLIENT_ID = 22 + CAN_DO_TIMEOUT = 23 + WORK_EXCEPTION = 25 + WORK_DATA = 28 + WORK_WARNING = 29 + GRAB_JOB_UNIQ = 30 + JOB_ASSIGN_UNIQ = 31 + + SUBMIT_JOB = 7 + SUBMIT_JOB_BG = 18 + SUBMIT_JOB_HIGH = 21 + SUBMIT_JOB_HIGH_BG = 32 + SUBMIT_JOB_LOW = 33 + SUBMIT_JOB_LOW_BG = 34 +) diff --git a/worker/error.go b/worker/error.go new file mode 100644 index 0000000..a1108fa --- /dev/null +++ b/worker/error.go @@ -0,0 +1,45 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bytes" + "errors" + "fmt" +) + +var ( + ErrJobTimeOut = errors.New("Do a job time out") + ErrInvalidData = errors.New("Invalid data") + ErrWorkWarning = errors.New("Work warning") + ErrWorkFail = errors.New("Work fail") + ErrWorkException = errors.New("Work exeption") + ErrDataType = errors.New("Invalid data type") + ErrOutOfCap = errors.New("Out of the capability") + ErrNotConn = errors.New("Did not connect to job server") + ErrFuncNotFound = errors.New("The function was not found") + ErrConnection = errors.New("Connection error") + ErrNoActiveAgent = errors.New("No active agent") + ErrTimeOut = errors.New("Executing time out") + ErrUnknown = errors.New("Unknown error") + ErrConnClosed = errors.New("Connection closed") +) + +func DisablePanic() { recover() } + +// Extract the error message +func GetError(data []byte) (err error) { + rel := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(rel) != 2 { + err = fmt.Errorf("Not a error data: %V", data) + return + } + err = errors.New(fmt.Sprintf("%s: %s", rel[0], rel[1])) + return +} + +// An error handler +type ErrorHandler func(error) diff --git a/worker/func.go b/worker/func.go index 374b7dd..84f82c2 100644 --- a/worker/func.go +++ b/worker/func.go @@ -1,31 +1,31 @@ package worker import ( - "runtime" - "encoding/json" + "encoding/json" + "runtime" ) type systemInfo struct { - GOOS, GOARCH, GOROOT, Version string - NumCPU, NumGoroutine int - NumCgoCall int64 + GOOS, GOARCH, GOROOT, Version string + NumCPU, NumGoroutine int + NumCgoCall int64 } func SysInfo(job *Job) ([]byte, error) { - return json.Marshal(&systemInfo{ - GOOS: runtime.GOOS, - GOARCH: runtime.GOARCH, - GOROOT: runtime.GOROOT(), - Version: runtime.Version(), - NumCPU: runtime.NumCPU(), - NumGoroutine: runtime.NumGoroutine(), - NumCgoCall: runtime.NumCgoCall(), - }) + return json.Marshal(&systemInfo{ + GOOS: runtime.GOOS, + GOARCH: runtime.GOARCH, + GOROOT: runtime.GOROOT(), + Version: runtime.Version(), + NumCPU: runtime.NumCPU(), + NumGoroutine: runtime.NumGoroutine(), + NumCgoCall: runtime.NumCgoCall(), + }) } var memState runtime.MemStats func MemInfo(job *Job) ([]byte, error) { - runtime.ReadMemStats(&memState) - return json.Marshal(&memState) + runtime.ReadMemStats(&memState) + return json.Marshal(&memState) } diff --git a/worker/job.go b/worker/job.go deleted file mode 100644 index 6f86989..0000000 --- a/worker/job.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2011 Xing Xing -// All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package worker - -import ( - "bytes" - "strconv" - "github.com/mikespook/gearman-go/common" -) - -// Worker side job -type Job struct { - Data []byte - Handle, UniqueId, Fn string - agent *agent - magicCode, DataType uint32 - c chan bool -} - -// Create a new job -func newJob(magiccode, datatype uint32, data []byte) (job *Job) { - return &Job{magicCode: magiccode, - DataType: datatype, - Data: data, - c: make(chan bool),} -} - -// Decode job from byte slice -func decodeJob(data []byte) (job *Job, err error) { - if len(data) < 12 { - return nil, common.Errorf("Invalid data: %V", data) - } - datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - return nil, common.Errorf("Invalid data: %V", data) - } - data = data[12:] - job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool),} - switch datatype { - case common.JOB_ASSIGN: - s := bytes.SplitN(data, []byte{'\x00'}, 3) - if len(s) == 3 { - job.Handle = string(s[0]) - job.Fn = string(s[1]) - data = s[2] - } - case common.JOB_ASSIGN_UNIQ: - s := bytes.SplitN(data, []byte{'\x00'}, 4) - if len(s) == 4 { - job.Handle = string(s[0]) - job.Fn = string(s[1]) - job.UniqueId = string(s[2]) - data = s[3] - } - } - job.Data = data - return -} - -// Encode a job to byte slice -func (job *Job) Encode() (data []byte) { - var l int - if job.DataType == common.WORK_FAIL { - l = len(job.Handle) - } else { - l = len(job.Data) - if job.Handle != "" { - l += len(job.Handle) + 1 - } - } - data = make([]byte, 0, l + 12) - - magiccode := common.Uint32ToBytes(job.magicCode) - datatype := common.Uint32ToBytes(job.DataType) - datalength := common.Uint32ToBytes(uint32(l)) - - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - data = append(data, datalength[:]...) - if job.Handle != "" { - data = append(data, []byte(job.Handle)...) - if job.DataType != common.WORK_FAIL { - data = append(data, 0) - } - } - data = append(data, job.Data...) - return -} - -// Send some datas to client. -// Using this in a job's executing. -func (job *Job) UpdateData(data []byte, iswarning bool) { - result := append([]byte(job.Handle), 0) - result = append(result, data...) - var datatype uint32 - if iswarning { - datatype = common.WORK_WARNING - } else { - datatype = common.WORK_DATA - } - job.agent.WriteJob(newJob(common.REQ, datatype, result)) -} - -// Update status. -// Tall client how many percent job has been executed. -func (job *Job) UpdateStatus(numerator, denominator int) { - n := []byte(strconv.Itoa(numerator)) - d := []byte(strconv.Itoa(denominator)) - result := append([]byte(job.Handle), '\x00') - result = append(result, n...) - result = append(result, '\x00') - result = append(result, d...) - job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) -} - -// close the job -func (job *Job) Close() { - close(job.c) -} - -// cancel the job executing -func (job *Job) cancel() { - defer func() {recover()}() - job.c <- true -} - -// When a job was canceled, return a true form a channel -func (job *Job) Canceled() <-chan bool { - return job.c -} diff --git a/worker/request.go b/worker/request.go new file mode 100644 index 0000000..cec1b2a --- /dev/null +++ b/worker/request.go @@ -0,0 +1,133 @@ +// Copyright 2011 Xing Xing +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bytes" + "strconv" +) + +// Worker side job +type Job struct { + Data []byte + Handle, UniqueId, Fn string + agent *agent + magicCode, DataType uint32 + c chan bool +} + +// Create a new job +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, + DataType: datatype, + Data: data, + c: make(chan bool)} +} + +// Decode job from byte slice +func decodeJob(data []byte) (job *Job, err error) { + if len(data) < 12 { + return nil, common.Errorf("Invalid data: %V", data) + } + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + return nil, common.Errorf("Invalid data: %V", data) + } + data = data[12:] + job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool)} + switch datatype { + case common.JOB_ASSIGN: + s := bytes.SplitN(data, []byte{'\x00'}, 3) + if len(s) == 3 { + job.Handle = string(s[0]) + job.Fn = string(s[1]) + data = s[2] + } + case common.JOB_ASSIGN_UNIQ: + s := bytes.SplitN(data, []byte{'\x00'}, 4) + if len(s) == 4 { + job.Handle = string(s[0]) + job.Fn = string(s[1]) + job.UniqueId = string(s[2]) + data = s[3] + } + } + job.Data = data + return +} + +// Encode a job to byte slice +func (job *Job) Encode() (data []byte) { + var l int + if job.DataType == common.WORK_FAIL { + l = len(job.Handle) + } else { + l = len(job.Data) + if job.Handle != "" { + l += len(job.Handle) + 1 + } + } + data = make([]byte, 0, l+12) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(l)) + + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) + data = append(data, datalength[:]...) + if job.Handle != "" { + data = append(data, []byte(job.Handle)...) + if job.DataType != common.WORK_FAIL { + data = append(data, 0) + } + } + data = append(data, job.Data...) + return +} + +// Send some datas to client. +// Using this in a job's executing. +func (job *Job) UpdateData(data []byte, iswarning bool) { + result := append([]byte(job.Handle), 0) + result = append(result, data...) + var datatype uint32 + if iswarning { + datatype = common.WORK_WARNING + } else { + datatype = common.WORK_DATA + } + job.agent.WriteJob(newJob(common.REQ, datatype, result)) +} + +// Update status. +// Tall client how many percent job has been executed. +func (job *Job) UpdateStatus(numerator, denominator int) { + n := []byte(strconv.Itoa(numerator)) + d := []byte(strconv.Itoa(denominator)) + result := append([]byte(job.Handle), '\x00') + result = append(result, n...) + result = append(result, '\x00') + result = append(result, d...) + job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) +} + +// close the job +func (job *Job) Close() { + close(job.c) +} + +// cancel the job executing +func (job *Job) cancel() { + defer func() { recover() }() + job.c <- true +} + +// When a job was canceled, return a true form a channel +func (job *Job) Canceled() <-chan bool { + return job.c +} diff --git a/worker/response.go b/worker/response.go new file mode 100644 index 0000000..cec1b2a --- /dev/null +++ b/worker/response.go @@ -0,0 +1,133 @@ +// Copyright 2011 Xing Xing +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bytes" + "strconv" +) + +// Worker side job +type Job struct { + Data []byte + Handle, UniqueId, Fn string + agent *agent + magicCode, DataType uint32 + c chan bool +} + +// Create a new job +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, + DataType: datatype, + Data: data, + c: make(chan bool)} +} + +// Decode job from byte slice +func decodeJob(data []byte) (job *Job, err error) { + if len(data) < 12 { + return nil, common.Errorf("Invalid data: %V", data) + } + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + return nil, common.Errorf("Invalid data: %V", data) + } + data = data[12:] + job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool)} + switch datatype { + case common.JOB_ASSIGN: + s := bytes.SplitN(data, []byte{'\x00'}, 3) + if len(s) == 3 { + job.Handle = string(s[0]) + job.Fn = string(s[1]) + data = s[2] + } + case common.JOB_ASSIGN_UNIQ: + s := bytes.SplitN(data, []byte{'\x00'}, 4) + if len(s) == 4 { + job.Handle = string(s[0]) + job.Fn = string(s[1]) + job.UniqueId = string(s[2]) + data = s[3] + } + } + job.Data = data + return +} + +// Encode a job to byte slice +func (job *Job) Encode() (data []byte) { + var l int + if job.DataType == common.WORK_FAIL { + l = len(job.Handle) + } else { + l = len(job.Data) + if job.Handle != "" { + l += len(job.Handle) + 1 + } + } + data = make([]byte, 0, l+12) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(l)) + + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) + data = append(data, datalength[:]...) + if job.Handle != "" { + data = append(data, []byte(job.Handle)...) + if job.DataType != common.WORK_FAIL { + data = append(data, 0) + } + } + data = append(data, job.Data...) + return +} + +// Send some datas to client. +// Using this in a job's executing. +func (job *Job) UpdateData(data []byte, iswarning bool) { + result := append([]byte(job.Handle), 0) + result = append(result, data...) + var datatype uint32 + if iswarning { + datatype = common.WORK_WARNING + } else { + datatype = common.WORK_DATA + } + job.agent.WriteJob(newJob(common.REQ, datatype, result)) +} + +// Update status. +// Tall client how many percent job has been executed. +func (job *Job) UpdateStatus(numerator, denominator int) { + n := []byte(strconv.Itoa(numerator)) + d := []byte(strconv.Itoa(denominator)) + result := append([]byte(job.Handle), '\x00') + result = append(result, n...) + result = append(result, '\x00') + result = append(result, d...) + job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) +} + +// close the job +func (job *Job) Close() { + close(job.c) +} + +// cancel the job executing +func (job *Job) cancel() { + defer func() { recover() }() + job.c <- true +} + +// When a job was canceled, return a true form a channel +func (job *Job) Canceled() <-chan bool { + return job.c +} diff --git a/worker/worker.go b/worker/worker.go index e96118c..416e942 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,20 +5,20 @@ package worker import ( - "time" - "github.com/mikespook/gearman-go/common" + "time" ) const ( - Unlimited = 0 - OneByOne = 1 + Unlimited = 0 + OneByOne = 1 - Immediately = 0 + Immediately = 0 ) var ( - ErrConnection = common.ErrConnection + ErrConnection = common.ErrConnection ) + // Job handler type JobHandler func(*Job) error @@ -26,8 +26,8 @@ type JobFunc func(*Job) ([]byte, error) // The definition of the callback function. type jobFunc struct { - f JobFunc - timeout uint32 + f JobFunc + timeout uint32 } // Map for added function. @@ -52,266 +52,266 @@ func foobar(job *Job) (data []byte, err os.Error) { } */ type Worker struct { - agents []*agent - funcs JobFuncs - in chan *Job - running bool - limit chan bool + agents []*agent + funcs JobFuncs + in chan *Job + running bool + limit chan bool - Id string - // assign a ErrFunc to handle errors - ErrHandler common.ErrorHandler - JobHandler JobHandler + Id string + // assign a ErrFunc to handle errors + ErrHandler common.ErrorHandler + JobHandler JobHandler } // Get a new worker func New(l int) (worker *Worker) { - worker = &Worker{ - agents: make([]*agent, 0), - funcs: make(JobFuncs), - in: make(chan *Job, common.QUEUE_SIZE), - } - if l != Unlimited { - worker.limit = make(chan bool, l) - } - return + worker = &Worker{ + agents: make([]*agent, 0), + funcs: make(JobFuncs), + in: make(chan *Job, common.QUEUE_SIZE), + } + if l != Unlimited { + worker.limit = make(chan bool, l) + } + return } -// -func (worker *Worker)err(e error) { - if worker.ErrHandler != nil { - worker.ErrHandler(e) - } +// +func (worker *Worker) err(e error) { + if worker.ErrHandler != nil { + worker.ErrHandler(e) + } } // Add a server. The addr should be 'host:port' format. // The connection is established at this time. func (worker *Worker) AddServer(addr string) (err error) { - // Create a new job server's client as a agent of server - server, err := newAgent(addr, worker) - if err != nil { - return err - } - worker.agents = append(worker.agents, server) - return + // Create a new job server's client as a agent of server + server, err := newAgent(addr, worker) + if err != nil { + return err + } + worker.agents = append(worker.agents, server) + return } // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. func (worker *Worker) broadcast(job *Job) { - for _, v := range worker.agents { - v.WriteJob(job) - } + for _, v := range worker.agents { + v.WriteJob(job) + } } // Add a function. // Plz added job servers first, then functions. // The API will tell every connected job server that 'I can do this' func (worker *Worker) AddFunc(funcname string, -f JobFunc, timeout uint32) (err error) { - if _, ok := worker.funcs[funcname]; ok { - return common.Errorf("The function already exists: %s", funcname) - } - worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} + f JobFunc, timeout uint32) (err error) { + if _, ok := worker.funcs[funcname]; ok { + return common.Errorf("The function already exists: %s", funcname) + } + worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} - if worker.running { - worker.addFunc(funcname, timeout) - } - return + if worker.running { + worker.addFunc(funcname, timeout) + } + return } // inner add function func (worker *Worker) addFunc(funcname string, timeout uint32) { - var datatype uint32 - var data []byte - if timeout == 0 { - datatype = common.CAN_DO - data = []byte(funcname) - } else { - datatype = common.CAN_DO_TIMEOUT - data = []byte(funcname + "\x00") - t := common.Uint32ToBytes(timeout) - data = append(data, t[:]...) - } - job := newJob(common.REQ, datatype, data) - worker.broadcast(job) + var datatype uint32 + var data []byte + if timeout == 0 { + datatype = common.CAN_DO + data = []byte(funcname) + } else { + datatype = common.CAN_DO_TIMEOUT + data = []byte(funcname + "\x00") + t := common.Uint32ToBytes(timeout) + data = append(data, t[:]...) + } + job := newJob(common.REQ, datatype, data) + worker.broadcast(job) } // Remove a function. // Tell job servers 'I can not do this now' at the same time. func (worker *Worker) RemoveFunc(funcname string) (err error) { - if _, ok := worker.funcs[funcname]; !ok { - return common.Errorf("The function does not exist: %s", funcname) - } - delete(worker.funcs, funcname) - if worker.running { - worker.removeFunc(funcname) - } - return + if _, ok := worker.funcs[funcname]; !ok { + return common.Errorf("The function does not exist: %s", funcname) + } + delete(worker.funcs, funcname) + if worker.running { + worker.removeFunc(funcname) + } + return } // inner remove function func (worker *Worker) removeFunc(funcname string) { - job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) - worker.broadcast(job) + job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) + worker.broadcast(job) } func (worker *Worker) dealJob(job *Job) { - defer func() { - job.Close() - if worker.running && worker.limit != nil { - <-worker.limit - } - }() - switch job.DataType { - case common.ERROR: - _, err := common.GetError(job.Data) - worker.err(err) - case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: - if err := worker.exec(job); err != nil { - worker.err(err) - } - default: - worker.handleJob(job) - } + defer func() { + job.Close() + if worker.running && worker.limit != nil { + <-worker.limit + } + }() + switch job.DataType { + case common.ERROR: + _, err := common.GetError(job.Data) + worker.err(err) + case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: + if err := worker.exec(job); err != nil { + worker.err(err) + } + default: + worker.handleJob(job) + } } // Main loop func (worker *Worker) Work() { - defer func() { - for _, v := range worker.agents { - v.Close() - } - }() - worker.running = true - for _, v := range worker.agents { - go v.Work() - } - for funcname, f := range worker.funcs { - worker.addFunc(funcname, f.timeout) - } - ok := true - for ok { - var job *Job - if job, ok = <-worker.in; ok { - go worker.dealJob(job) - } - } + defer func() { + for _, v := range worker.agents { + v.Close() + } + }() + worker.running = true + for _, v := range worker.agents { + go v.Work() + } + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + ok := true + for ok { + var job *Job + if job, ok = <-worker.in; ok { + go worker.dealJob(job) + } + } } // job handler func (worker *Worker) handleJob(job *Job) { - if worker.JobHandler != nil { - if err := worker.JobHandler(job); err != nil { - worker.err(err) - } - } + if worker.JobHandler != nil { + if err := worker.JobHandler(job); err != nil { + worker.err(err) + } + } } // Close. func (worker *Worker) Close() { - worker.running = false - close(worker.in) - if worker.limit != nil { - close(worker.limit) - } + worker.running = false + close(worker.in) + if worker.limit != nil { + close(worker.limit) + } } // Send a something out, get the samething back. func (worker *Worker) Echo(data []byte) { - job := newJob(common.REQ, common.ECHO_REQ, data) - worker.broadcast(job) + job := newJob(common.REQ, common.ECHO_REQ, data) + worker.broadcast(job) } // Remove all of functions. // Both from the worker or job servers. func (worker *Worker) Reset() { - job := newJob(common.REQ, common.RESET_ABILITIES, nil) - worker.broadcast(job) - worker.funcs = make(JobFuncs) + job := newJob(common.REQ, common.RESET_ABILITIES, nil) + worker.broadcast(job) + worker.funcs = make(JobFuncs) } // Set the worker's unique id. func (worker *Worker) SetId(id string) { - worker.Id = id - job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) - worker.broadcast(job) + worker.Id = id + job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) + worker.broadcast(job) } // Execute the job. And send back the result. func (worker *Worker) exec(job *Job) (err error) { - defer func() { - if r := recover(); r != nil { - if e, ok := r.(error); ok { - err = e - } else { - err = common.ErrUnknown - } - } - } () - f, ok := worker.funcs[job.Fn] - if !ok { - return common.Errorf("The function does not exist: %s", job.Fn) - } - var r *result - if f.timeout == 0 { - d, e := f.f(job) - r = &result{data:d, err: e} - } else { - r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second) - } - var datatype uint32 - if r.err == nil { - datatype = common.WORK_COMPLETE - } else { - if r.data == nil { - datatype = common.WORK_FAIL - } else { - datatype = common.WORK_EXCEPTION - } - err = r.err - } + defer func() { + if r := recover(); r != nil { + if e, ok := r.(error); ok { + err = e + } else { + err = common.ErrUnknown + } + } + }() + f, ok := worker.funcs[job.Fn] + if !ok { + return common.Errorf("The function does not exist: %s", job.Fn) + } + var r *result + if f.timeout == 0 { + d, e := f.f(job) + r = &result{data: d, err: e} + } else { + r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second) + } + var datatype uint32 + if r.err == nil { + datatype = common.WORK_COMPLETE + } else { + if r.data == nil { + datatype = common.WORK_FAIL + } else { + datatype = common.WORK_EXCEPTION + } + err = r.err + } - job.magicCode = common.REQ - job.DataType = datatype - job.Data = r.data - if worker.running { - job.agent.WriteJob(job) - } - return + job.magicCode = common.REQ + job.DataType = datatype + job.Data = r.data + if worker.running { + job.agent.WriteJob(job) + } + return } func (worker *Worker) removeAgent(a *agent) { - for k, v := range worker.agents { - if v == a { - worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...) - } - } - if len(worker.agents) == 0 { - worker.err(common.ErrNoActiveAgent) - } + for k, v := range worker.agents { + if v == a { + worker.agents = append(worker.agents[:k], worker.agents[k+1:]...) + } + } + if len(worker.agents) == 0 { + worker.err(common.ErrNoActiveAgent) + } } type result struct { - data []byte - err error + data []byte + err error } func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) { - rslt := make(chan *result) - defer close(rslt) - go func() { - defer func() {recover()}() - d, e := f(job) - rslt <- &result{data: d, err: e} - }() - select { - case r = <-rslt: - case <-time.After(timeout): - go job.cancel() - return &result{err:common.ErrTimeOut} - } - return r + rslt := make(chan *result) + defer close(rslt) + go func() { + defer func() { recover() }() + d, e := f(job) + rslt <- &result{data: d, err: e} + }() + select { + case r = <-rslt: + case <-time.After(timeout): + go job.cancel() + return &result{err: common.ErrTimeOut} + } + return r } diff --git a/worker/worker_test.go b/worker/worker_test.go index 25f5836..99b8c0f 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -5,40 +5,40 @@ import "testing" var worker *Worker func init() { - worker = New(Unlimited) + worker = New(Unlimited) } func TestWorkerAddServer(t *testing.T) { - t.Log("Add local server 127.0.0.1:4730.") - if err := worker.AddServer("127.0.0.1:4730"); err != nil { - t.Error(err) - } + t.Log("Add local server 127.0.0.1:4730.") + if err := worker.AddServer("127.0.0.1:4730"); err != nil { + t.Error(err) + } - if l := len(worker.agents); l != 1 { - t.Log(worker.agents) - t.Error("The length of server list should be 1.") - } + if l := len(worker.agents); l != 1 { + t.Log(worker.agents) + t.Error("The length of server list should be 1.") + } } func foobar(job *Job) ([]byte, error) { - return nil, nil + return nil, nil } func TestWorkerAddFunction(t *testing.T) { - if err := worker.AddFunc("foobar", foobar, 0); err != nil { - t.Error(err) - } - if err := worker.AddFunc("timeout", foobar, 5); err != nil { - t.Error(err) - } - if l := len(worker.funcs); l != 2 { - t.Log(worker.funcs) - t.Errorf("The length of function map should be %d.", 2) - } + if err := worker.AddFunc("foobar", foobar, 0); err != nil { + t.Error(err) + } + if err := worker.AddFunc("timeout", foobar, 5); err != nil { + t.Error(err) + } + if l := len(worker.funcs); l != 2 { + t.Log(worker.funcs) + t.Errorf("The length of function map should be %d.", 2) + } } func TestWorkerRemoveFunc(t *testing.T) { - if err := worker.RemoveFunc("foobar"); err != nil { - t.Error(err) - } + if err := worker.RemoveFunc("foobar"); err != nil { + t.Error(err) + } }