From e5c30068cd759fc6d79c605505e49c4d36eca8a0 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Thu, 29 Aug 2013 16:51:23 +0800 Subject: [PATCH] 0.2 refactoring begining --- client/client.go | 492 ++++++++++++++++-------------------------- client/client_test.go | 139 ++++++------ client/common.go | 75 +++++++ client/common_test.go | 48 +++++ client/error.go | 49 +++++ client/handler.go | 10 +- client/id.go | 20 +- client/job.go | 142 ------------ client/pool.go | 197 +++++++++-------- client/pool_test.go | 156 +++++++------- client/request.go | 47 ++++ client/response.go | 129 +++++++++++ client/status.go | 6 +- 13 files changed, 798 insertions(+), 712 deletions(-) create mode 100644 client/common.go create mode 100644 client/common_test.go create mode 100644 client/error.go delete mode 100644 client/job.go create mode 100644 client/request.go create mode 100644 client/response.go diff --git a/client/client.go b/client/client.go index cdfdd5f..2bd209f 100644 --- a/client/client.go +++ b/client/client.go @@ -6,28 +6,13 @@ package client import ( - "io" - "net" - "sync" - "time" - "bytes" - "strconv" - "github.com/mikespook/gearman-go/common" + "io" + "net" + "sync" + "github.com/mikespook/golib/idgen" ) -var ( - IdGen IdGenerator -) - -func init() { - IdGen = NewObjectId() -} - -// Status handler -// handle, known, running, numerator, denominator -type StatusHandler func(string, bool, bool, uint64, uint64) - -/* +/* The client side api for gearman usage: @@ -36,267 +21,164 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { - ErrHandler common.ErrorHandler - TimeOut time.Duration + net, addr string + respHandler map[string]ResponseHandler + createdHandler ResponseHandler + in chan []byte + isConn bool + conn net.Conn + mutex sync.RWMutex + ErrorHandler ErrorHandler - in chan []byte - out chan *Job - - created chan string - echo chan []byte - status chan *Status - - jobhandlers map[string]JobHandler - - isConn bool - conn net.Conn - addr string - mutex sync.RWMutex + IdGen idgen.IdGen } // Create a new client. // Connect to "addr" through "network" // Eg. // client, err := client.New("127.0.0.1:4730") -func New(addr string) (client *Client, err error) { - client = &Client{ - created: make(chan string, common.QUEUE_SIZE), - echo: make(chan []byte, common.QUEUE_SIZE), - status: make(chan *Status, common.QUEUE_SIZE), - - jobhandlers: make(map[string]JobHandler, common.QUEUE_SIZE), - - in: make(chan []byte, common.QUEUE_SIZE), - out: make(chan *Job, common.QUEUE_SIZE), - addr: addr, - TimeOut: time.Second, - } - if err = client.connect(); err != nil { - return - } - client.isConn = true - go client.inLoop() - go client.outLoop() - return +func New(net, addr string) (client *Client, err error) { + client = &Client{ + net: net, + addr: addr, + respHandler: make(map[string]ResponseHandler, QUEUE_SIZE), + in: make(chan []byte, QUEUE_SIZE), + } + if err = client.connect(); err != nil { + return + } + client.isConn = true + go client.readLoop() + go client.processLoop() + return } // {{{ private functions -// +// func (client *Client) connect() (err error) { - client.conn, err = net.Dial(common.NETWORK, client.addr) - return + client.conn, err = net.Dial(client.net, client.addr) + return } // Internal write -func (client *Client) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) - if err != nil { - return - } - } - return +func (client *Client) write(req *request) (err error) { + var n int + buf := req.Encode() + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return } // read length bytes from the socket -func (client *Client) readData(length int) (data []byte, err error) { - n := 0 - buf := make([]byte, common.BUFFER_SIZE) - // read until data can be unpacked - for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { - if n, err = client.conn.Read(buf); err != nil { - if !client.isConn { - return nil, common.ErrConnClosed - } - if err == io.EOF && n == 0 { - if data == nil { - err = common.ErrConnection - return - } - return data, nil - } - return - } - data = append(data, buf[0:n]...) - if n < common.BUFFER_SIZE { - break - } - } - return +func (client *Client) read(length int) (data []byte, err error) { + n := 0 + buf := getBuffer(BUFFER_SIZE) + // read until data can be unpacked + for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { + if n, err = client.conn.Read(buf); err != nil { + if !client.isConn { + err = ErrConnClosed + return + } + if err == io.EOF && n == 0 { + if data == nil { + err = ErrConnection + } + } + return + } + data = append(data, buf[0:n]...) + if n < BUFFER_SIZE { + break + } + } + return } -// unpack data -func (client *Client) unpack(data []byte) ([]byte, int, bool) { - tl := len(data) - start := 0 - for i := 0; i < tl+1-common.PACKET_LEN; i++ { - if start+common.PACKET_LEN > tl { // too few data to unpack, read more - return nil, common.PACKET_LEN, false - } - if string(data[start:start+4]) == common.RES_STR { - l := int(common.BytesToUint32([4]byte{data[start+8], - data[start+9], data[start+10], data[start+11]})) - total := l + common.PACKET_LEN - if total == tl { // data is what we want - return data, common.PACKET_LEN, true - } else if total < tl { // data[:total] is what we want, data[total:] is the more - client.in <- data[total:] - data = data[start:total] - return data, common.PACKET_LEN, true - } else { // ops! It won't be possible. - return nil, total - tl, false - } - } else { // flag was not found, move to next step - start++ - } - } - return nil, common.PACKET_LEN, false +// read data from socket +func (client *Client) readLoop() { + var data []byte + var err error + for client.isConn { + if data, err = client.read(BUFFER_SIZE); err != nil { + if err == ErrConnClosed { + break + } + client.err(err) + continue + } + client.in <- data + } + close(client.in) } -// Internal read -func (client *Client) read() (rel []byte, err error) { - var data []byte - ok := false - l := common.PACKET_LEN - for !ok { - inlen := len(client.in) - if inlen > 0 { - // in queue is not empty - for i := 0; i < inlen; i++ { - data = append(data, <-client.in...) - } - } else { - var d []byte - d, err = client.readData(l) - if err != nil { - return - } - data = append(data, d...) - } - rel, l, ok = client.unpack(data) - } - return -} - -// out loop -func (client *Client) outLoop() { - for job := range client.out { - if err := client.write(job.Encode()); err != nil { - client.err(err) - } - } -} - -// in loop -func (client *Client) inLoop() { - defer common.DisablePanic() - for { - rel, err := client.read() - if err != nil { - if err == common.ErrConnection { - client.Close() - } - if err != common.ErrConnClosed { - client.err(err) - } - break - } - job, err := decodeJob(rel) - if err != nil { - client.err(err) - continue - //break - } - switch job.DataType { - case common.ERROR: - _, err := common.GetError(job.Data) - client.err(err) - case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, - common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION: - client.handleJob(job) - case common.ECHO_RES: - client.handleEcho(job) - case common.JOB_CREATED: - client.handleCreated(job) - case common.STATUS_RES: - client.handleStatus(job) - default: - break - } - } +// decode data & process it +func (client *Client) processLoop() { + var resp *response + var l int + var err error + var data, leftdata []byte + for data = range client.in { + l = len(data) + if len(leftdata) > 0 { // some data left for processing + data = append(leftdata, data ...) + } + if l < MIN_PACKET_LEN { // not enough data + leftdata = data + continue + } + if resp, l, err = decodeResponse(data); err != nil { + client.err(err) + continue + } + switch resp.DataType { + case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, +WORK_FAIL, WORK_EXCEPTION: + client.handleResponse(string(resp.Handle), resp) + } + if len(data) > l { + leftdata = data[l:] + } + } } // error handler -func (client *Client) err (e error) { - if client.ErrHandler != nil { - client.ErrHandler(e) - } +func (client *Client) err(e error) { + if client.ErrorHandler != nil { + client.ErrorHandler(e) + } } // job handler -func (client *Client) handleJob(job *Job) { - client.mutex.RLock() - defer client.mutex.RUnlock() - if h, ok := client.jobhandlers[job.Handle]; ok { - h(job) - delete(client.jobhandlers, job.Handle) - } -} - -func (client *Client) handleEcho(job *Job) { - client.echo <- job.Data -} - -func (client *Client) handleCreated(job *Job) { - client.created <- string(job.Data) -} - -// status handler -func (client *Client) handleStatus(job *Job) { - data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) - if len(data) != 5 { - client.err(common.Errorf("Invalid data: %V", job.Data)) - return - } - status := &Status{} - status.Handle = string(data[0]) - status.Known = (data[1][0] == '1') - status.Running = (data[2][0] == '1') - var err error - status.Numerator, err = strconv.ParseUint(string(data[3]), 10, 0) - if err != nil { - client.err(common.Errorf("Invalid Integer: %s", data[3])) - return - } - status.Denominator, err = strconv.ParseUint(string(data[4]), 10, 0) - if err != nil { - client.err(common.Errorf("Invalid Integer: %s", data[4])) - return - } - client.status <- status -} - -// Send the job to job server. -func (client *Client) writeJob(job *Job) { - client.out <- job +func (client *Client) handleResponse(key string, resp *response) { + client.mutex.RLock() + defer client.mutex.RUnlock() + if h, ok := client.respHandler[key]; ok { + h(resp) + delete(client.respHandler, string(resp.Handle)) + } } // Internal do func (client *Client) do(funcname string, data []byte, -flag uint32, id string) (handle string) { - l := len(funcname) + len(id) + len(data) + 2 - rel := make([]byte, 0, l) - rel = append(rel, []byte(funcname)...) // len(funcname) - rel = append(rel, '\x00') // 1 Byte - rel = append(rel, []byte(id)...) // len(uid) - rel = append(rel, '\x00') // 1 Byte - rel = append(rel, data...) // len(data) - client.writeJob(newJob(common.REQ, flag, rel)) - // Waiting for JOB_CREATED - handle = <-client.created - return + flag uint32) (handle []byte) { + req := getJob(funcname, client.IdGen.Id().(string), data) + client.mutex.Lock() + defer client.mutex.Unlock() + client.write(req) + var wg sync.WaitGroup + wg.Add(1) + client.createdHandler = func(resp *response) { + defer wg.Done() + handle = resp.Handle + } + wg.Wait() + return } // }}} @@ -304,77 +186,73 @@ flag uint32, id string) (handle string) { // Do the function. // funcname is a string with function name. // data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, -// and if it is background job: JOB_BG. -// JOB_LOW | JOB_BG means the job is running with low level in background. +// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH func (client *Client) Do(funcname string, data []byte, -flag byte, jobhandler JobHandler) (handle string) { - var datatype uint32 - switch flag { - case JOB_LOW : - datatype = common.SUBMIT_JOB_LOW - case JOB_HIGH : - datatype = common.SUBMIT_JOB_HIGH - default: - datatype = common.SUBMIT_JOB - } - id := IdGen.Id() - client.mutex.Lock() - defer client.mutex.Unlock() - handle = client.do(funcname, data, datatype, id) - if jobhandler != nil { - client.jobhandlers[handle] = jobhandler - } - return + flag byte, h ResponseHandler) (handle []byte) { + var datatype uint32 + switch flag { + case JOB_LOW: + datatype = SUBMIT_JOB_LOW + case JOB_HIGH: + datatype = SUBMIT_JOB_HIGH + default: + datatype = SUBMIT_JOB + } + handle = client.do(funcname, data, datatype) + client.mutex.Lock() + defer client.mutex.Unlock() + if h != nil { + client.respHandler[string(handle)] = h + } + return } +// Do the function at background. +// funcname is a string with function name. +// data is encoding to byte array. +// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH func (client *Client) DoBg(funcname string, data []byte, -flag byte) (handle string) { - var datatype uint32 - switch flag { - case JOB_LOW : - datatype = common.SUBMIT_JOB_LOW_BG - case JOB_HIGH : - datatype = common.SUBMIT_JOB_HIGH_BG - default: - datatype = common.SUBMIT_JOB_BG - } - handle = client.do(funcname, data, datatype, "") - return + flag byte) (handle []byte) { + var datatype uint32 + switch flag { + case JOB_LOW: + datatype = SUBMIT_JOB_LOW_BG + case JOB_HIGH: + datatype = SUBMIT_JOB_HIGH_BG + default: + datatype = SUBMIT_JOB_BG + } + handle = client.do(funcname, data, datatype) + return } - // Get job status from job server. // !!!Not fully tested.!!! -func (client *Client) Status(handle string, timeout time.Duration) (status *Status, err error) { - client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle))) - select { - case status = <-client.status: - case <-time.After(timeout): - err = common.ErrTimeOut - } - return +func (client *Client) Status(handle []byte, h ResponseHandler) (err error) { + req := getRequest() + req.DataType = GET_STATUS + req.Data = handle + client.write(req) + if h != nil { + client.respHandler["status-" + string(handle)] = h + } + return } // Send a something out, get the samething back. -func (client *Client) Echo(data []byte, timeout time.Duration) (r []byte, err error) { - client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) - select { - case r = <-client.echo: - case <-time.After(timeout): - err = common.ErrTimeOut - } - return +func (client *Client) Echo(data []byte, h ResponseHandler) (err error) { + req := getRequest() + req.DataType = ECHO_REQ + req.Data = data + client.write(req) + if h != nil { + client.respHandler["echo"] = h + } + return } // Close func (client *Client) Close() (err error) { - client.isConn = false - close(client.in) - close(client.out) - - close(client.echo) - close(client.created) - close(client.status) - return client.conn.Close(); + client.isConn = false + return client.conn.Close() } diff --git a/client/client_test.go b/client/client_test.go index 11269d7..4fb1910 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,96 +1,95 @@ package client import ( - "time" - "testing" + "testing" + "time" ) var client *Client func TestClientAddServer(t *testing.T) { - t.Log("Add local server 127.0.0.1:4730") - var err error - if client, err = New("127.0.0.1:4730"); err != nil { - t.Error(err) - return - } - client.ErrHandler = func(e error) { - t.Log(e) - } + t.Log("Add local server 127.0.0.1:4730") + var err error + if client, err = New("127.0.0.1:4730"); err != nil { + t.Error(err) + return + } + client.ErrHandler = func(e error) { + t.Log(e) + } } func TestClientEcho(t *testing.T) { - echo, err := client.Echo([]byte("Hello world"), time.Second) - if err != nil { - t.Error(err) - return - } - if string(echo) != "Hello world" { - t.Errorf("Invalid echo data: %s", echo) - return - } + echo, err := client.Echo([]byte("Hello world"), time.Second) + if err != nil { + t.Error(err) + return + } + if string(echo) != "Hello world" { + t.Errorf("Invalid echo data: %s", echo) + return + } } func TestClientDoBg(t *testing.T) { - if handle := client.DoBg("ToUpper", []byte("abcdef"), - JOB_LOW); handle == "" { - t.Error("Handle is empty.") - } + if handle := client.DoBg("ToUpper", []byte("abcdef"), + JOB_LOW); handle == "" { + t.Error("Handle is empty.") + } } func TestClientDo(t *testing.T) { - jobHandler := func(job *Job) { - str := string(job.Data) - if str == "ABCDEF" { - t.Log(str) - } else { - t.Errorf("Invalid data: %s", job.Data) - } - return - } - if handle := client.Do("ToUpper", []byte("abcdef"), - JOB_LOW, jobHandler); handle == "" { - t.Error("Handle is empty.") - } else { - t.Log(handle) - } + jobHandler := func(job *Job) { + str := string(job.Data) + if str == "ABCDEF" { + t.Log(str) + } else { + t.Errorf("Invalid data: %s", job.Data) + } + return + } + if handle := client.Do("ToUpper", []byte("abcdef"), + JOB_LOW, jobHandler); handle == "" { + t.Error("Handle is empty.") + } else { + t.Log(handle) + } } func TestClientStatus(t *testing.T) { - s1, err := client.Status("handle not exists", time.Second) - if err != nil { - t.Error(err) - return - } - if s1.Known { - t.Errorf("The job (%s) shouldn't be known.", s1.Handle) - return - } - if s1.Running { - t.Errorf("The job (%s) shouldn't be running.", s1.Handle) - return - } + s1, err := client.Status("handle not exists", time.Second) + if err != nil { + t.Error(err) + return + } + if s1.Known { + t.Errorf("The job (%s) shouldn't be known.", s1.Handle) + return + } + if s1.Running { + t.Errorf("The job (%s) shouldn't be running.", s1.Handle) + return + } - handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil); - s2, err := client.Status(handle, time.Second) - if err != nil { - t.Error(err) - return - } - if !s2.Known { - t.Errorf("The job (%s) should be known.", s2.Handle) - return - } - if s2.Running { - t.Errorf("The job (%s) shouldn't be running.", s2.Handle) - return - } + handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil) + s2, err := client.Status(handle, time.Second) + if err != nil { + t.Error(err) + return + } + if !s2.Known { + t.Errorf("The job (%s) should be known.", s2.Handle) + return + } + if s2.Running { + t.Errorf("The job (%s) shouldn't be running.", s2.Handle) + return + } } - func TestClientClose(t *testing.T) { - if err := client.Close(); err != nil { - t.Error(err) - } + if err := client.Close(); err != nil { + t.Error(err) + } } diff --git a/client/common.go b/client/common.go new file mode 100644 index 0000000..4636bfb --- /dev/null +++ b/client/common.go @@ -0,0 +1,75 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +const ( + NETWORK = "tcp" + // queue size + QUEUE_SIZE = 8 + // read buffer size + BUFFER_SIZE = 1024 + // min packet length + MIN_PACKET_LEN = 12 + + // \x00REQ + REQ = 5391697 + REQ_STR = "\x00REQ" + // \x00RES + RES = 5391699 + RES_STR = "\x00RES" + + // package data type + CAN_DO = 1 + CANT_DO = 2 + RESET_ABILITIES = 3 + PRE_SLEEP = 4 + NOOP = 6 + JOB_CREATED = 8 + GRAB_JOB = 9 + NO_JOB = 10 + JOB_ASSIGN = 11 + WORK_STATUS = 12 + WORK_COMPLETE = 13 + WORK_FAIL = 14 + GET_STATUS = 15 + ECHO_REQ = 16 + ECHO_RES = 17 + ERROR = 19 + STATUS_RES = 20 + SET_CLIENT_ID = 22 + CAN_DO_TIMEOUT = 23 + WORK_EXCEPTION = 25 + WORK_DATA = 28 + WORK_WARNING = 29 + GRAB_JOB_UNIQ = 30 + JOB_ASSIGN_UNIQ = 31 + + SUBMIT_JOB = 7 + SUBMIT_JOB_BG = 18 + SUBMIT_JOB_HIGH = 21 + SUBMIT_JOB_HIGH_BG = 32 + SUBMIT_JOB_LOW = 33 + SUBMIT_JOB_LOW_BG = 34 +) + +const ( + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 +) + +func getBuffer(l int) (buf []byte) { + // TODO add byte buffer pool + buf = make([]byte, l) + return +} diff --git a/client/common_test.go b/client/common_test.go new file mode 100644 index 0000000..fa8ded9 --- /dev/null +++ b/client/common_test.go @@ -0,0 +1,48 @@ +package client + +import ( + "bytes" + "testing" +) + +var ( + testCase = map[uint32][4]byte{ + 0: [...]byte{0, 0, 0, 0}, + 1: [...]byte{0, 0, 0, 1}, + 256: [...]byte{0, 0, 1, 0}, + 256 * 256: [...]byte{0, 1, 0, 0}, + 256 * 256 * 256: [...]byte{1, 0, 0, 0}, + 256*256*256 + 256*256 + 256 + 1: [...]byte{1, 1, 1, 1}, + 4294967295: [...]byte{0xFF, 0xFF, 0xFF, 0xFF}, + } +) + +func TestUint32ToBytes(t *testing.T) { + for k, v := range testCase { + b := Uint32ToBytes(k) + if bytes.Compare(b[:], v[:]) != 0 { + t.Errorf("%v was expected, but %v was got", v, b) + } + } +} + +func TestBytesToUint32s(t *testing.T) { + for k, v := range testCase { + u := BytesToUint32([4]byte(v)) + if u != k { + t.Errorf("%v was expected, but %v was got", k, u) + } + } +} + +func BenchmarkByteToUnit32(b *testing.B) { + for i := 0; i < b.N; i++ { + BytesToUint32([4]byte{0xF, 0xF, 0xF, 0xF}) + } +} + +func BenchmarkUint32ToByte(b *testing.B) { + for i := 0; i < b.N; i++ { + Uint32ToBytes(123456) + } +} diff --git a/client/error.go b/client/error.go new file mode 100644 index 0000000..8c77616 --- /dev/null +++ b/client/error.go @@ -0,0 +1,49 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bytes" + "errors" + "fmt" + "syscall" + "strconv" +) + +var ( + ErrJobTimeOut = errors.New("Do a job time out") + ErrInvalidData = errors.New("Invalid data") + ErrWorkWarning = errors.New("Work warning") + ErrWorkFail = errors.New("Work fail") + ErrWorkException = errors.New("Work exeption") + ErrDataType = errors.New("Invalid data type") + ErrOutOfCap = errors.New("Out of the capability") + ErrNotConn = errors.New("Did not connect to job server") + ErrFuncNotFound = errors.New("The function was not found") + ErrConnection = errors.New("Connection error") + ErrNoActiveAgent = errors.New("No active agent") + ErrTimeOut = errors.New("Executing time out") + ErrUnknown = errors.New("Unknown error") + ErrConnClosed = errors.New("Connection closed") +) + +func DisablePanic() { recover() } + +// Extract the error message +func GetError(data []byte) (eno syscall.Errno, err error) { + rel := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(rel) != 2 { + err = fmt.Errorf("Not a error data: %V", data) + return + } + var n uint64 + if n, err = strconv.ParseUint(string(rel[0]), 10, 0); err != nil { + return + } + eno = syscall.Errno(n) + err = errors.New(string(rel[1])) + return +} diff --git a/client/handler.go b/client/handler.go index 9074d4a..cee4cdd 100644 --- a/client/handler.go +++ b/client/handler.go @@ -1,4 +1,10 @@ package client -// Job handler -type JobHandler func(*Job) +// Response handler +type ResponseHandler func(*response) +// Error handler +type ErrorHandler func(error) + +// Status handler +// handle, known, running, numerator, denominator +type StatusHandler func(string, bool, bool, uint64, uint64) diff --git a/client/id.go b/client/id.go index d597168..0fe8ab7 100644 --- a/client/id.go +++ b/client/id.go @@ -1,35 +1,35 @@ package client import ( - "strconv" - "labix.org/v2/mgo/bson" - "github.com/mikespook/golib/autoinc" + "github.com/mikespook/golib/autoinc" + "labix.org/v2/mgo/bson" + "strconv" ) type IdGenerator interface { - Id() string + Id() string } // ObjectId -type objectId struct {} +type objectId struct{} func (id *objectId) Id() string { - return bson.NewObjectId().Hex() + return bson.NewObjectId().Hex() } func NewObjectId() IdGenerator { - return &objectId{} + return &objectId{} } // AutoIncId type autoincId struct { - *autoinc.AutoInc + *autoinc.AutoInc } func (id *autoincId) Id() string { - return strconv.Itoa(id.AutoInc.Id()) + return strconv.Itoa(id.AutoInc.Id()) } func NewAutoIncId() IdGenerator { - return &autoincId{autoinc.New(1, 1)} + return &autoincId{autoinc.New(1, 1)} } diff --git a/client/job.go b/client/job.go deleted file mode 100644 index 2402a58..0000000 --- a/client/job.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2011 Xing Xing . -// All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package client - -import ( - "bytes" - "github.com/mikespook/gearman-go/common" -) - -const ( - // Job type - // JOB_NORMAL | JOB_BG means a normal level job run in background - // normal level - JOB_NORMAL = 0 - // background job - JOB_BG = 1 - // low level - JOB_LOW = 2 - // high level - JOB_HIGH = 4 -) - -// An error handler -type ErrorHandler func(error) - -// Client side job -type Job struct { - Data []byte - Handle, UniqueId string - magicCode, DataType uint32 -} - -// Create a new job -func newJob(magiccode, datatype uint32, data []byte) (job *Job) { - return &Job{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode a job from byte slice -func decodeJob(data []byte) (job *Job, err error) { - if len(data) < 12 { - return nil, common.Errorf("Invalid data: %V", data) - } - datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - return nil, common.Errorf("Invalid data: %V", data) - } - data = data[12:] - var handle string - switch datatype { - case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, - common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION: - i := bytes.IndexByte(data, '\x00') - if i != -1 { - handle = string(data[:i]) - data = data[i + 1:] - } - } - - return &Job{magicCode: common.RES, - DataType: datatype, - Data: data, - Handle: handle}, nil -} - -// Encode a job to byte slice -func (job *Job) Encode() (data []byte) { - l := len(job.Data) - tl := l + 12 - data = make([]byte, tl) - - magiccode := common.Uint32ToBytes(job.magicCode) - datatype := common.Uint32ToBytes(job.DataType) - datalength := common.Uint32ToBytes(uint32(l)) - - for i := 0; i < tl; i ++ { - switch { - case i < 4: - data[i] = magiccode[i] - case i < 8: - data[i] = datatype[i - 4] - case i < 12: - data[i] = datalength[i - 8] - default: - data[i] = job.Data[i - 12] - } - } - // Alternative - /* - data = append(data, magiccode[:] ...) - data = append(data, datatype[:] ...) - data = append(data, datalength[:] ...) - data = append(data, job.Data ...) - */ - return -} - -// Extract the job's result. -func (job *Job) Result() (data []byte, err error) { - switch job.DataType { - case common.WORK_FAIL: - job.Handle = string(job.Data) - return nil, common.ErrWorkFail - case common.WORK_EXCEPTION: - err = common.ErrWorkException - fallthrough - case common.WORK_COMPLETE: - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - return nil, common.Errorf("Invalid data: %V", job.Data) - } - job.Handle = string(s[0]) - data = s[1] - default: - err = common.ErrDataType - } - return -} - -// Extract the job's update -func (job *Job) Update() (data []byte, err error) { - if job.DataType != common.WORK_DATA && job.DataType != common.WORK_WARNING { - err = common.ErrDataType - return - } - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = common.ErrInvalidData - return - } - if job.DataType == common.WORK_WARNING { - err = common.ErrWorkWarning - } - job.Handle = string(s[0]) - data = s[1] - return -} diff --git a/client/pool.go b/client/pool.go index ed48734..8bcab3e 100644 --- a/client/pool.go +++ b/client/pool.go @@ -6,157 +6,154 @@ package client import ( - "sync" - "time" - "errors" - "math/rand" - "github.com/mikespook/gearman-go/common" + "errors" + "math/rand" + "sync" ) const ( - PoolSize = 10 + PoolSize = 10 ) var ( - ErrNotFound = errors.New("Server Not Found") + ErrNotFound = errors.New("Server Not Found") ) type poolClient struct { - *Client - Rate int + *Client + Rate int } type SelectionHandler func(map[string]*poolClient, string) string func SelectWithRate(pool map[string]*poolClient, -last string) (addr string) { - total := 0 - for _, item := range pool { - total += item.Rate - if rand.Intn(total) < item.Rate { - return item.addr - } - } - return last + last string) (addr string) { + total := 0 + for _, item := range pool { + total += item.Rate + if rand.Intn(total) < item.Rate { + return item.addr + } + } + return last } func SelectRandom(pool map[string]*poolClient, -last string) (addr string) { - r := rand.Intn(len(pool)) - i := 0 - for k, _ := range pool { - if r == i { - return k - } - i ++ - } - return last + last string) (addr string) { + r := rand.Intn(len(pool)) + i := 0 + for k, _ := range pool { + if r == i { + return k + } + i++ + } + return last } - - type Pool struct { - SelectionHandler SelectionHandler - ErrHandler common.ErrorHandler + SelectionHandler SelectionHandler + ErrorHandler ErrorHandler - clients map[string]*poolClient - last string + clients map[string]*poolClient + last string - mutex sync.Mutex + mutex sync.Mutex } // Create a new pool. func NewPool() (pool *Pool) { - return &Pool{ - clients: make(map[string]*poolClient, PoolSize), - SelectionHandler: SelectWithRate, - } + return &Pool{ + clients: make(map[string]*poolClient, PoolSize), + SelectionHandler: SelectWithRate, + } } // Add a server with rate. -func (pool *Pool) Add(addr string, rate int) (err error) { - pool.mutex.Lock() - defer pool.mutex.Unlock() - var item *poolClient - var ok bool - if item, ok = pool.clients[addr]; ok { - item.Rate = rate - } else { - var client *Client - client, err = New(addr) - item = &poolClient{Client: client, Rate: rate} - pool.clients[addr] = item - } - return +func (pool *Pool) Add(net, addr string, rate int) (err error) { + pool.mutex.Lock() + defer pool.mutex.Unlock() + var item *poolClient + var ok bool + if item, ok = pool.clients[addr]; ok { + item.Rate = rate + } else { + var client *Client + client, err = New(net, addr) + item = &poolClient{Client: client, Rate: rate} + pool.clients[addr] = item + } + return } // Remove a server. func (pool *Pool) Remove(addr string) { - pool.mutex.Lock() - defer pool.mutex.Unlock() - delete(pool.clients, addr) + pool.mutex.Lock() + defer pool.mutex.Unlock() + delete(pool.clients, addr) } func (pool *Pool) Do(funcname string, data []byte, -flag byte, h JobHandler) (addr, handle string) { - client := pool.selectServer() - handle = client.Do(funcname, data, flag, h) - addr = client.addr - return + flag byte, h ResponseHandler) (addr string, handle []byte) { + client := pool.selectServer() + handle = client.Do(funcname, data, flag, h) + addr = client.addr + return } func (pool *Pool) DoBg(funcname string, data []byte, -flag byte) (addr, handle string) { - client := pool.selectServer() - handle = client.DoBg(funcname, data, flag) - addr = client.addr - return + flag byte) (addr string, handle []byte) { + client := pool.selectServer() + handle = client.DoBg(funcname, data, flag) + addr = client.addr + return } // Get job status from job server. // !!!Not fully tested.!!! -func (pool *Pool) Status(addr, handle string, timeout time.Duration) (status *Status, err error) { - if client, ok := pool.clients[addr]; ok { - status, err = client.Status(handle, timeout) - } else { - err = ErrNotFound - } - return +func (pool *Pool) Status(addr string, handle []byte, h ResponseHandler) (err error) { + if client, ok := pool.clients[addr]; ok { + err = client.Status(handle, h) + } else { + err = ErrNotFound + } + return } // Send a something out, get the samething back. -func (pool *Pool) Echo(addr string, data []byte, timeout time.Duration) (r []byte, err error) { - var client *poolClient - if addr == "" { - client = pool.selectServer() - } else { - var ok bool - if client, ok = pool.clients[addr]; !ok { - err = ErrNotFound - return - } - } - r, err = client.Echo(data, timeout) - return +func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (r []byte, err error) { + var client *poolClient + if addr == "" { + client = pool.selectServer() + } else { + var ok bool + if client, ok = pool.clients[addr]; !ok { + err = ErrNotFound + return + } + } + err = client.Echo(data, h) + return } // Close func (pool *Pool) Close() (err map[string]error) { - err = make(map[string]error) - for _, c := range pool.clients { - err[c.addr] = c.Close() - } - return + err = make(map[string]error) + for _, c := range pool.clients { + err[c.addr] = c.Close() + } + return } +// selecting server func (pool *Pool) selectServer() (client *poolClient) { - for client == nil { - addr := pool.SelectionHandler(pool.clients, pool.last) - var ok bool - if client, ok = pool.clients[addr]; ok { - pool.last = addr - break - } - } - return + for client == nil { + addr := pool.SelectionHandler(pool.clients, pool.last) + var ok bool + if client, ok = pool.clients[addr]; ok { + pool.last = addr + break + } + } + return } diff --git a/client/pool_test.go b/client/pool_test.go index 2e5c284..778c43c 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -1,107 +1,107 @@ package client import ( - "time" - "testing" + "testing" + "time" ) var ( - pool = NewPool() + pool = NewPool() ) func TestPoolAdd(t *testing.T) { - t.Log("Add servers") - if err := pool.Add("127.0.0.1:4730", 1); err != nil { - t.Error(err) - } - if err := pool.Add("127.0.0.1:4730", 1); err != nil { - t.Error(err) - } - if len(pool.clients) != 2 { - t.Errorf("2 servers expected, %d got.", len(pool.clients)) - } + t.Log("Add servers") + if err := pool.Add("127.0.0.1:4730", 1); err != nil { + t.Error(err) + } + if err := pool.Add("127.0.0.1:4730", 1); err != nil { + t.Error(err) + } + if len(pool.clients) != 2 { + t.Errorf("2 servers expected, %d got.", len(pool.clients)) + } } func TestPoolEcho(t *testing.T) { - echo, err := pool.Echo("", []byte("Hello pool"), time.Second) - if err != nil { - t.Error(err) - return - } - if string(echo) != "Hello pool" { - t.Errorf("Invalid echo data: %s", echo) - return - } + echo, err := pool.Echo("", []byte("Hello pool"), time.Second) + if err != nil { + t.Error(err) + return + } + if string(echo) != "Hello pool" { + t.Errorf("Invalid echo data: %s", echo) + return + } - _, err = pool.Echo("not exists", []byte("Hello pool"), time.Second) - if err != ErrNotFound { - t.Errorf("ErrNotFound expected, got %s", err) - } + _, err = pool.Echo("not exists", []byte("Hello pool"), time.Second) + if err != ErrNotFound { + t.Errorf("ErrNotFound expected, got %s", err) + } } func TestPoolDoBg(t *testing.T) { - if addr, handle := pool.DoBg("ToUpper", []byte("abcdef"), - JOB_LOW); handle == "" { - t.Error("Handle is empty.") - } else { - t.Log(addr, handle) - } + if addr, handle := pool.DoBg("ToUpper", []byte("abcdef"), + JOB_LOW); handle == "" { + t.Error("Handle is empty.") + } else { + t.Log(addr, handle) + } } func TestPoolDo(t *testing.T) { - jobHandler := func(job *Job) { - str := string(job.Data) - if str == "ABCDEF" { - t.Log(str) - } else { - t.Errorf("Invalid data: %s", job.Data) - } - return - } - if addr, handle := pool.Do("ToUpper", []byte("abcdef"), - JOB_LOW, jobHandler); handle == "" { - t.Error("Handle is empty.") - } else { - t.Log(addr, handle) - } + jobHandler := func(job *Job) { + str := string(job.Data) + if str == "ABCDEF" { + t.Log(str) + } else { + t.Errorf("Invalid data: %s", job.Data) + } + return + } + if addr, handle := pool.Do("ToUpper", []byte("abcdef"), + JOB_LOW, jobHandler); handle == "" { + t.Error("Handle is empty.") + } else { + t.Log(addr, handle) + } } func TestPoolStatus(t *testing.T) { - s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second) - if err != nil { - t.Error(err) - return - } - if s1.Known { - t.Errorf("The job (%s) shouldn't be known.", s1.Handle) - } - if s1.Running { - t.Errorf("The job (%s) shouldn't be running.", s1.Handle) - } + s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second) + if err != nil { + t.Error(err) + return + } + if s1.Known { + t.Errorf("The job (%s) shouldn't be known.", s1.Handle) + } + if s1.Running { + t.Errorf("The job (%s) shouldn't be running.", s1.Handle) + } - addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil); - s2, err := pool.Status(addr, handle, time.Second) - if err != nil { - t.Error(err) - return - } + addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil) + s2, err := pool.Status(addr, handle, time.Second) + if err != nil { + t.Error(err) + return + } - if !s2.Known { - t.Errorf("The job (%s) should be known.", s2.Handle) - } - if s2.Running { - t.Errorf("The job (%s) shouldn't be running.", s2.Handle) - } + if !s2.Known { + t.Errorf("The job (%s) should be known.", s2.Handle) + } + if s2.Running { + t.Errorf("The job (%s) shouldn't be running.", s2.Handle) + } - _, err = pool.Status("not exists", "not exists", time.Second) - if err != ErrNotFound { - t.Error(err) - } + _, err = pool.Status("not exists", "not exists", time.Second) + if err != ErrNotFound { + t.Error(err) + } } func TestPoolClose(t *testing.T) { - return - if err := pool.Close(); err != nil { - t.Error(err) - } + return + if err := pool.Close(); err != nil { + t.Error(err) + } } diff --git a/client/request.go b/client/request.go new file mode 100644 index 0000000..28f925c --- /dev/null +++ b/client/request.go @@ -0,0 +1,47 @@ +// Copyright 2013 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "encoding/binary" +) + +// request +type request struct { + DataType uint32 + Data []byte +} + +// Encode a Request to byte slice +func (req *request) Encode() (data []byte) { + l := len(req.Data) // length of data + tl := l + 12 // add 12 bytes head + data = getBuffer(tl) + copy(data[:4], REQ_STR) + binary.BigEndian.PutUint32(data[4:8], req.DataType) + binary.BigEndian.PutUint32(data[8:12], uint32(l)) + copy(data[12:], req.Data) + return +} + +func getRequest() (req *request) { + // TODO add a pool + req = &request{} + return +} + +func getJob(funcname, id string, data []byte) (req *request) { + req = getRequest() + a := len(funcname) + b := len(id) + c := len(data) + l := a + b + c + 2 + req.Data = getBuffer(l) + copy(req.Data[0:a], funcname) + copy(req.Data[a+1:a+b+1], []byte(id)) + copy(req.Data[a+b+1:a+b+c+1], data) + return +} diff --git a/client/response.go b/client/response.go new file mode 100644 index 0000000..b98ec81 --- /dev/null +++ b/client/response.go @@ -0,0 +1,129 @@ +// Copyright 2011 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "fmt" + "bytes" + "strconv" + "encoding/binary" +) + +// response +type response struct { + DataType uint32 + Data, Handle []byte + UID string +} + +// Extract the Response's result. +// if data == nil, err != nil, then worker failing to execute job +// if data != nil, err != nil, then worker has a exception +// if data != nil, err == nil, then worker complate job +// after calling this method, the Response.Handle will be filled +func (resp *response) Result() (data []byte, err error) { + switch resp.DataType { + case WORK_FAIL: + resp.Handle = resp.Data + err = ErrWorkFail + return + case WORK_EXCEPTION: + err = ErrWorkException + fallthrough + case WORK_COMPLETE: + s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = fmt.Errorf("Invalid data: %V", resp.Data) + return + } + resp.Handle = s[0] + data = s[1] + default: + err = ErrDataType + } + return +} + +// Extract the job's update +func (resp *response) Update() (data []byte, err error) { + if resp.DataType != WORK_DATA && + resp.DataType != WORK_WARNING { + err = ErrDataType + return + } + s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = ErrInvalidData + return + } + if resp.DataType == WORK_WARNING { + err = ErrWorkWarning + } + resp.Handle = s[0] + data = s[1] + return +} + +// Decode a job from byte slice +func decodeResponse(data []byte) (resp *response, l int, err error) { + if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes + err = fmt.Errorf("Invalid data: %V", data) + return + } + dl := int(binary.BigEndian.Uint32(data[8:12])) + dt := data[MIN_PACKET_LEN:dl+MIN_PACKET_LEN] + if len(dt) != int(dl) { // length not equal + err = fmt.Errorf("Invalid data: %V", data) + return + } + resp = getResponse() + resp.DataType = binary.BigEndian.Uint32(data[4:8]) + switch resp.DataType { + case WORK_DATA, WORK_WARNING, WORK_STATUS, + WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + s := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(s) >= 2 { + resp.Handle = s[0] + resp.Data = s[1] + } else { + err = fmt.Errorf("Invalid data: %V", data) + return + } + } + l = len(resp.Data) + MIN_PACKET_LEN + return +} + +// status handler +func (resp *response) Status() (status *Status, err error) { + data := bytes.SplitN(resp.Data, []byte{'\x00'}, 5) + if len(data) != 5 { + err = fmt.Errorf("Invalid data: %V", resp.Data) + return + } + status = &Status{} + status.Handle = data[0] + status.Known = (data[1][0] == '1') + status.Running = (data[2][0] == '1') + status.Numerator, err = strconv.ParseUint(string(data[3]), 10, 0) + if err != nil { + err = fmt.Errorf("Invalid Integer: %s", data[3]) + return + } + status.Denominator, err = strconv.ParseUint(string(data[4]), 10, 0) + if err != nil { + err = fmt.Errorf("Invalid Integer: %s", data[4]) + return + } + return +} + + +func getResponse() (resp *response) { + // TODO add a pool + resp = &response{} + return +} diff --git a/client/status.go b/client/status.go index 5671c60..f656203 100644 --- a/client/status.go +++ b/client/status.go @@ -1,7 +1,7 @@ package client type Status struct { - Handle string - Known, Running bool - Numerator, Denominator uint64 + Handle []byte + Known, Running bool + Numerator, Denominator uint64 }