From 7614c2678a8fcde805257eece6a9b12729efc68d Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 22 May 2012 20:05:39 +0800 Subject: [PATCH] make a new branch, refactoring codes --HG-- branch : 0.1 rename : gearman/client/client.go => client/client.go rename : gearman/client/client_test.go => client/client_test.go rename : gearman/client/clientjob.go => client/job.go rename : gearman/gearman.go => common/gearman.go rename : gearman/worker/jobagent.go => worker/jobagent.go rename : gearman/worker/worker.go => worker/worker.go rename : gearman/worker/worker_test.go => worker/worker_test.go rename : gearman/worker/workerjob.go => worker/workerjob.go --- client/client.go | 263 ++++++++++++++++++++ client/client_test.go | 43 ++++ client/job.go | 105 ++++++++ common/error.go | 47 ++++ {gearman => common}/gearman.go | 59 +---- example/client.go | 48 ++-- gearman.go | 16 +- gearman/client/client.go | 278 ---------------------- gearman/client/client_test.go | 41 ---- gearman/client/clientjob.go | 98 -------- {gearman/worker => worker}/jobagent.go | 2 +- {gearman/worker => worker}/worker.go | 2 +- {gearman/worker => worker}/worker_test.go | 0 {gearman/worker => worker}/workerjob.go | 2 +- 14 files changed, 503 insertions(+), 501 deletions(-) create mode 100644 client/client.go create mode 100644 client/client_test.go create mode 100644 client/job.go create mode 100644 common/error.go rename {gearman => common}/gearman.go (50%) delete mode 100644 gearman/client/client.go delete mode 100644 gearman/client/client_test.go delete mode 100644 gearman/client/clientjob.go rename {gearman/worker => worker}/jobagent.go (98%) rename {gearman/worker => worker}/worker.go (99%) rename {gearman/worker => worker}/worker_test.go (100%) rename {gearman/worker => worker}/workerjob.go (98%) diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..e19f6b2 --- /dev/null +++ b/client/client.go @@ -0,0 +1,263 @@ +// Copyright 2011 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bytes" + "io" + "net" + "strconv" + "bitbucket.org/mikespook/golib/autoinc" + "bitbucket.org/mikespook/gearman-go/common" +) + +// Job handler +type JobHandler func(*Job) error +// Status handler +// handle, known, running, numerator, denominator +type StatusHandler func(string, bool, bool, uint64, uint64) + +/* +The client side api for gearman + +usage: + c := client.New("tcp4", "127.0.0.1:4730") + handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) + +*/ +type Client struct { + ErrHandler common.ErrorHandler + JobHandler JobHandler + StatusHandler StatusHandler + + in chan []byte + out chan *Job + jobCreated chan *Job + conn net.Conn + ai *autoinc.AutoInc +} + +// Create a new client. +// Connect to "addr" through "network" +// Eg. +// client, err := client.New("tcp4", "127.0.0.1:4730") +func New(network, addr string) (client *Client, err error) { + conn, err := net.Dial(network, addr) + if err != nil { + return + } + client = &Client{ + jobCreated: make(chan *Job), + in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), + conn: conn, + ai: autoinc.New(0, 1), + } + go client.outLoop() + go client.inLoop() + return +} + +// out loop +func (client *Client) outLoop() { + ok := true + for ok { + if job, ok := <-client.out; ok { + client.write(job.Encode()) + } + } +} + +// in loop +func (client *Client) inLoop() { + for { + rel, err := client.read() + if err != nil { + client.err(err) + continue + } + job, err := decodeJob(rel) + if err != nil { + client.err(err) + continue + } + switch job.DataType { + case common.ERROR: + _, err := common.GetError(job.Data) + client.err(err) + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, + common.ECHO_RES: + client.handleJob(job) + case common.JOB_CREATED: + client.jobCreated <- job + case common.STATUS_RES: + client.handleStatus(job) + } + } +} + +// inner read +func (client *Client) read() (data []byte, err error) { + if len(client.in) > 0 { + // incoming queue is not empty + data = <-client.in + } else { + // empty queue, read data from socket + for { + buf := make([]byte, common.BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + break + } + return + } + data = append(data, buf[0:n]...) + if n < common.BUFFER_SIZE { + break + } + } + } + // split package + start, end := 0, 4 + tl := len(data) + for i := 0; i < tl; i++ { + if string(data[start:end]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + client.in <- data[total:] + data = data[:total] + return + } + } else { + start++ + end++ + } + } + return nil, common.Errorf("Invalid data: %V", data) +} + +// error handler +func (client *Client) err (e error) { + if client.ErrHandler != nil { + client.ErrHandler(e) + } +} + +// job handler +func (client *Client) handleJob(job *Job) { + if client.JobHandler != nil { + if err := client.JobHandler(job); err != nil { + client.err(err) + } + } +} + +// status handler +func (client *Client) handleStatus(job *Job) { + if client.StatusHandler != nil { + data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) + if len(data) != 5 { + client.err(common.Errorf("Invalid data: %V", job.Data)) + return + } + handle := string(data[0]) + known := (data[1][0] == '1') + running := (data[2][0] == '1') + numerator, err := strconv.ParseUint(string(data[3][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[3][0])) + return + } + denominator, err := strconv.ParseUint(string(data[4][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[4][0])) + return + } + client.StatusHandler(handle, known, running, numerator, denominator) + } +} + +// Do the function. +// funcname is a string with function name. +// data is encoding to byte array. +// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, +// and if it is background job: JOB_BG. +// JOB_LOW | JOB_BG means the job is running with low level in background. +func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { + var datatype uint32 + if flag & common.JOB_LOW == common.JOB_LOW { + if flag & common.JOB_BG == common.JOB_BG { + datatype = common.SUBMIT_JOB_LOW_BG + } else { + datatype = common.SUBMIT_JOB_LOW + } + } else if flag & common.JOB_HIGH == common.JOB_HIGH { + if flag & common.JOB_BG == common.JOB_BG { + datatype = common.SUBMIT_JOB_HIGH_BG + } else { + datatype = common.SUBMIT_JOB_HIGH + } + } else if flag & common.JOB_BG == common.JOB_BG { + datatype = common.SUBMIT_JOB_BG + } else { + datatype = common.SUBMIT_JOB + } + + uid := strconv.Itoa(int(client.ai.Id())) + l := len(funcname) + len(uid) + len(data) + 2 + rel := make([]byte, l) + rel = append(rel, []byte(funcname)...) // len(funcname) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, []byte(uid)...) // len(uid) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, data...) // len(data) + client.writeJob(newJob(common.REQ, datatype, rel)) + + // Waiting for JOB_CREATED + job := <-client.jobCreated + return string(job.Data), nil +} + +// Get job status from job server. +// !!!Not fully tested.!!! +func (client *Client) Status(handle string) { + job := newJob(common.REQ, common.GET_STATUS, []byte(handle)) + client.writeJob(job) +} + +// Send a something out, get the samething back. +func (client *Client) Echo(data []byte) { + client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) +} + +// Send the job to job server. +func (client *Client) writeJob(job *Job) { + client.out <- job +} + +// Internal write +func (client *Client) write(buf []byte) (err error) { + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return +} + +// Close +func (client *Client) Close() (err error) { + close(client.jobCreated) + close(client.in) + return client.conn.Close(); +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..a85832f --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,43 @@ +package client + +import ( + "bitbucket.org/mikespook/gearman-go/common" + "testing" +) + +var client *Client + +func TestClientAddServer(t *testing.T) { + t.Log("Add local server 127.0.0.1:4730") + var err error + if client, err = New("tcp4", "127.0.0.1:4730"); err != nil { + t.Error(err) + } +} + +func TestClientEcho(t *testing.T) { + client.JobHandler = func(job *Job) error { + echo := string(job.Data) + if echo == "Hello world" { + t.Log(echo) + } else { + t.Errorf("Invalid echo data: %s", job.Data) + } + return nil + } + client.Echo([]byte("Hello world")) +} + +func TestClientDo(t *testing.T) { + if handle, err := client.Do("ToUpper", []byte("abcdef"), common.JOB_LOW|common.JOB_BG); err != nil { + t.Error(err) + } else { + t.Log(handle) + } +} + +func TestClientClose(t *testing.T) { + if err := client.Close(); err != nil { + t.Error(err) + } +} diff --git a/client/job.go b/client/job.go new file mode 100644 index 0000000..c381464 --- /dev/null +++ b/client/job.go @@ -0,0 +1,105 @@ +// Copyright 2011 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bytes" + "bitbucket.org/mikespook/gearman-go/common" +) +// An error handler +type ErrorHandler func(error) + +// Client side job +type Job struct { + Data []byte + Handle, UniqueId string + magicCode, DataType uint32 +} + +// Create a new job +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, + DataType: datatype, + Data: data} +} + +// Decode a job from byte slice +func decodeJob(data []byte) (job *Job, err error) { + if len(data) < 12 { + return nil, common.Errorf("Invalid data: %V", data) + } + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + return nil, common.Errorf("Invalid data: %V", data) + } + data = data[12:] + return newJob(common.RES, datatype, data), nil +} + +// Encode a job to byte slice +func (job *Job) Encode() (data []byte) { + l := len(job.Data) + 12 + data = make([]byte, l) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(l)) + for i := 0; i < l; i ++ { + switch { + case i < 4: + data[i] = magiccode[i] + case i < 8: + data[i] = datatype[i - 4] + case i < 12: + data[i] = datalength[i - 8] + default: + data[i] = job.Data[i - 12] + } + } + return +} + +// Extract the job's result. +func (job *Job) Result() (data []byte, err error) { + switch job.DataType { + case common.WORK_FAIL: + job.Handle = string(job.Data) + return nil, common.ErrWorkFail + case common.WORK_EXCEPTION: + err = common.ErrWorkException + fallthrough + case common.WORK_COMPLETE: + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + return nil, common.Errorf("Invalid data: %V", job.Data) + } + job.Handle = string(s[0]) + data = s[1] + default: + err = common.ErrDataType + } + return +} + +// Extract the job's update +func (job *Job) Update() (data []byte, err error) { + if job.DataType != common.WORK_DATA && job.DataType != common.WORK_WARNING { + err = common.ErrDataType + return + } + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = common.ErrInvalidData + return + } + if job.DataType == common.WORK_WARNING { + err = common.ErrWorkWarning + } + job.Handle = string(s[0]) + data = s[1] + return +} diff --git a/common/error.go b/common/error.go new file mode 100644 index 0000000..25ca67e --- /dev/null +++ b/common/error.go @@ -0,0 +1,47 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package common + +import ( + "fmt" + "bytes" + "errors" + "syscall" +) + +var ( + ErrInvalidData = errors.New("Invalid data.") + ErrWorkWarning = errors.New("Work warning.") + ErrWorkFail = errors.New("Work fail.") + ErrWorkException = errors.New("Work exeption.") + ErrDataType = errors.New("Invalid data type.") + ErrOutOfCap = errors.New("Out of the capability.") + ErrNotConn = errors.New("Did not connect to job server.") + ErrFuncNotFound = errors.New("The function was not found.") +) + +// Extract the error message +func GetError(data []byte) (eno syscall.Errno, err error) { + rel := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(rel) != 2 { + err = Errorf("Not a error data: %V", data) + return + } + l := len(rel[0]) + eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) + err = errors.New(string(rel[1])) + return +} + +// Get a formated error +func Errorf(format string, msg ... interface{}) error { + return errors.New(fmt.Sprintf(format, msg ... )) +} + +// An error handler +type ErrorHandler func(error) + + diff --git a/gearman/gearman.go b/common/gearman.go similarity index 50% rename from gearman/gearman.go rename to common/gearman.go index 636c5ff..1bdd6f7 100644 --- a/gearman/gearman.go +++ b/common/gearman.go @@ -1,30 +1,17 @@ -// Copyright 2011 Xing Xing All rights reserved. +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. -/* -This module is Gearman API for golang. -The protocol was implemented by native way. -*/ - -package gearman - -import ( - "bytes" - "errors" - "syscall" -) +package common const ( - // tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6, - // or 'tcp6' only for ipv6. - TCP = "tcp4" // the number limited for job servers. WORKER_SERVER_CAP = 32 // the number limited for functions. WORKER_FUNCTION_CAP = 512 // queue size - QUEUE_CAP = 512 + QUEUE_SIZE = 512 // read buffer size BUFFER_SIZE = 1024 @@ -80,44 +67,14 @@ const ( JOB_HIGH = 4 ) -var ( - ErrIsNotErr = errors.New("The input is not a error data.") - ErrInvalidData = errors.New("Invalid data.") - ErrWorkWarning = errors.New("Work warning.") - ErrWorkFail = errors.New("Work fail.") - ErrWorkException = errors.New("Work exeption.") - ErrDataType = errors.New("Invalid data type.") - ErrOutOfCap = errors.New("Out of the capability.") - ErrNotConn = errors.New("Did not connect to job server.") - ErrFuncNotFound = errors.New("The function was not found.") -) - -// Extract the error message -func GetError(data []byte) (eno syscall.Errno, err error) { - rel := bytes.SplitN(data, []byte{'\x00'}, 2) - if len(rel) != 2 { - err = ErrIsNotErr - return - } - l := len(rel[0]) - eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) - err = errors.New(string(rel[1])) - return -} - // Decode [4]byte to uint32 func BytesToUint32(buf [4]byte) uint32 { - return uint32(buf[0])<<24 + - uint32(buf[1])<<16 + - uint32(buf[2])<<8 + + return uint32(buf[0])<<24 + uint32(buf[1])<<16 + uint32(buf[2])<<8 + uint32(buf[3]) } // Encode uint32 to [4]byte -func Uint32ToBytes(i uint32) (data [4]byte) { - data[0] = byte((i >> 24) & 0xff) - data[1] = byte((i >> 16) & 0xff) - data[2] = byte((i >> 8) & 0xff) - data[3] = byte(i & 0xff) - return +func Uint32ToBytes(i uint32) [4]byte { + return [4]byte{byte((i >> 24) & 0xff), byte((i >> 16) & 0xff), + byte((i >> 8) & 0xff), byte(i & 0xff),} } diff --git a/example/client.go b/example/client.go index ec8bd94..d406eef 100644 --- a/example/client.go +++ b/example/client.go @@ -1,46 +1,42 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bitbucket.org/mikespook/gearman-go/gearman/client" "log" + "sync" + "bitbucket.org/mikespook/gearman-go" + "bitbucket.org/mikespook/gearman-go/client" ) func main() { - client := client.New() - defer client.Close() - if err := client.AddServer("127.0.0.1:4730"); err != nil { + var wg sync.WaitGroup + + c, err := client.New("tcp4", "127.0.0.1:4730") + if err != nil { log.Fatalln(err) } + defer c.Close() echo := []byte("Hello\x00 world") - - if data, err := client.Echo(echo); err != nil { - log.Fatalln(string(data)) + c.JobHandler = func(job *client.Job) error { + log.Printf("%V", job) + wg.Done() + return nil } - handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL) + wg.Add(1) + c.Echo(echo) + + handle, err := c.Do("ToUpper", echo, gearman.JOB_NORMAL) if err != nil { log.Fatalln(err) } else { log.Println(handle) - job := <-client.JobQueue - if data, err := job.Result(); err != nil { - log.Fatalln(err) - } else { - log.Println(string(data)) - } } - known, running, numerator, denominator, err := client.Status(handle) - if err != nil { - log.Fatalln(err) - } - if !known { - log.Println("Unknown") - } - if running { - log.Printf("%g%%\n", float32(numerator)*100/float32(denominator)) - } else { - log.Println("Not running") + c.StatusHandler = func(handle string, known, running bool, numerator, denominator uint64) { + log.Printf("%s: %b, %b, %d, %d", handle, known, running, numerator, denominator) + wg.Done() } + wg.Add(1) + c.Status(handle) + wg.Wait() } diff --git a/gearman.go b/gearman.go index 067831f..5c17914 100644 --- a/gearman.go +++ b/gearman.go @@ -1,4 +1,4 @@ -// Copyright 2012 Xing Xing All rights reserved. +// Copyright 2011 Xing Xing All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. @@ -9,7 +9,15 @@ The protocol was implemented by native way. package gearman -import ( - _ "bitbucket.org/mikespook/gearman-go/gearman/client" - _ "bitbucket.org/mikespook/gearman-go/gearman/worker" +const ( + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 ) diff --git a/gearman/client/client.go b/gearman/client/client.go deleted file mode 100644 index 156bb8f..0000000 --- a/gearman/client/client.go +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bytes" - "io" - "net" - "strconv" - "sync" -) - -/* -The client side api for gearman. - -usage: - client = NewClient() - client.AddServer("127.0.0.1:4730") - handle := client.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) - -*/ -type Client struct { - mutex sync.Mutex - conn net.Conn - incoming chan []byte - - JobQueue chan *ClientJob - UId uint32 -} - -// Create a new client. -func New() (client *Client) { - return &Client{ - JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP), - incoming: make(chan []byte, gearman.QUEUE_CAP), - UId:1} -} - -// Add a server. -// In this version, one client connect to one job server. -// Sample is better. Plz do the load balancing by your self. -func (client *Client) AddServer(addr string) (err error) { - client.conn, err = net.Dial(gearman.TCP, addr) - if err != nil { - return - } - return -} - -// Internal read -func (client *Client) read() (data []byte, err error) { - if len(client.incoming) > 0 { - // incoming queue is not empty - data = <-client.incoming - } else { - // empty queue, read data from socket - for { - buf := make([]byte, gearman.BUFFER_SIZE) - var n int - if n, err = client.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - break - } - return - } - data = append(data, buf[0:n]...) - if n < gearman.BUFFER_SIZE { - break - } - } - } - // split package - start, end := 0, 4 - tl := len(data) - for i := 0; i < tl; i++ { - if string(data[start:end]) == gearman.RES_STR { - l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - client.incoming <- data[total:] - data = data[:total] - return - } - } else { - start++ - end++ - } - } - err = gearman.ErrInvalidData - return -} - -// Read a job from job server. -// This function will return the job, and add it to the job queue. -func (client *Client) ReadJob() (job *ClientJob, err error) { - var rel []byte - if rel, err = client.read(); err != nil { - return - } - if job, err = DecodeClientJob(rel); err != nil { - return - } else { - switch job.DataType { - case gearman.ERROR: - _, err = gearman.GetError(job.Data) - return - case gearman.WORK_DATA, gearman.WORK_WARNING, gearman.WORK_STATUS, gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: - client.JobQueue <- job - } - } - return -} - -// Do the function. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, -// and if it is background job: JOB_BG. -// JOB_LOW | JOB_BG means the job is running with low level in background. -func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { - var datatype uint32 - if flag&gearman.JOB_LOW == gearman.JOB_LOW { - if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_LOW_BG - } else { - datatype = gearman.SUBMIT_JOB_LOW - } - } else if flag&gearman.JOB_HIGH == gearman.JOB_HIGH { - if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_HIGH_BG - } else { - datatype = gearman.SUBMIT_JOB_HIGH - } - } else if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_BG - } else { - datatype = gearman.SUBMIT_JOB - } - - rel := make([]byte, 0, 1024*64) - rel = append(rel, []byte(funcname)...) - rel = append(rel, '\x00') - client.mutex.Lock() - uid := strconv.Itoa(int(client.UId)) - client.UId++ - rel = append(rel, []byte(uid)...) - client.mutex.Unlock() - rel = append(rel, '\x00') - rel = append(rel, data...) - if err = client.WriteJob(NewClientJob(gearman.REQ, datatype, rel)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.JOB_CREATED); err != nil { - return - } - handle = string(job.Data) - go func() { - if flag&gearman.JOB_BG != gearman.JOB_BG { - for { - if job, err = client.ReadJob(); err != nil { - return - } - switch job.DataType { - case gearman.WORK_DATA, gearman.WORK_WARNING: - case gearman.WORK_STATUS: - case gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: - return - } - } - } - }() - return -} - -// Internal read last job -func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err error) { - for { - if job, err = client.ReadJob(); err != nil { - return - } - if job.DataType == datatype { - break - } - } - if job.DataType != datatype { - err = gearman.ErrDataType - } - return -} - -// Get job status from job server. -// !!!Not fully tested.!!! -func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint64, err error) { - - if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.GET_STATUS, []byte(handle))); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.STATUS_RES); err != nil { - return - } - data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) - if len(data) != 5 { - err = gearman.ErrInvalidData - return - } - if handle != string(data[0]) { - err = gearman.ErrInvalidData - return - } - known = data[1][0] == '1' - running = data[2][0] == '1' - if numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0); err != nil { - return - } - if denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0); err != nil { - return - } - return -} - -// Send a something out, get the samething back. -func (client *Client) Echo(data []byte) (echo []byte, err error) { - if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.ECHO_REQ, data)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.ECHO_RES); err != nil { - return - } - echo = job.Data - return -} - -// Get the last job. -// the job means a network package. -// Normally, it is the job executed result. -func (client *Client) LastJob() (job *ClientJob) { - if l := len(client.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-client.JobQueue - } - } - return <-client.JobQueue -} - -// Send the job to job server. -func (client *Client) WriteJob(job *ClientJob) (err error) { - return client.write(job.Encode()) -} - -// Internal write -func (client *Client) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) - if err != nil { - return - } - } - return -} - -// Close. -func (client *Client) Close() (err error) { - err = client.conn.Close() - close(client.JobQueue) - return -} diff --git a/gearman/client/client_test.go b/gearman/client/client_test.go deleted file mode 100644 index f2ae43f..0000000 --- a/gearman/client/client_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "testing" -) - -var client *Client - -func init() { - client = NewClient() -} - -func TestClientAddServer(t *testing.T) { - t.Log("Add local server 127.0.0.1:4730") - if err := client.AddServer("127.0.0.1:4730"); err != nil { - t.Error(err) - } -} - -func TestClientEcho(t *testing.T) { - if echo, err := client.Echo([]byte("Hello world")); err != nil { - t.Error(err) - } else { - t.Log(echo) - } -} - -func TestClientDo(t *testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), gearman.JOB_LOW|gearman.JOB_BG); err != nil { - t.Error(err) - } else { - t.Log(handle) - } -} - -func TestClientClose(t *testing.T) { - if err := client.Close(); err != nil { - t.Error(err) - } -} diff --git a/gearman/client/clientjob.go b/gearman/client/clientjob.go deleted file mode 100644 index 184b93a..0000000 --- a/gearman/client/clientjob.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bytes" -) - -// Client side job -type ClientJob struct { - Data []byte - Handle, UniqueId string - magicCode, DataType uint32 -} - -// Create a new job -func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { - return &ClientJob{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode a job from byte slice -func DecodeClientJob(data []byte) (job *ClientJob, err error) { - if len(data) < 12 { - err = gearman.ErrInvalidData - return - } - datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - err = gearman.ErrInvalidData - return - } - data = data[12:] - job = NewClientJob(gearman.RES, datatype, data) - return -} - -// Encode a job to byte slice -func (job *ClientJob) Encode() (data []byte) { - magiccode := gearman.Uint32ToBytes(job.magicCode) - datatype := gearman.Uint32ToBytes(job.DataType) - data = make([]byte, 0, 1024*64) - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - l := len(job.Data) - datalength := gearman.Uint32ToBytes(uint32(l)) - data = append(data, datalength[:]...) - data = append(data, job.Data...) - return -} - -// Extract the job's result. -func (job *ClientJob) Result() (data []byte, err error) { - switch job.DataType { - case gearman.WORK_FAIL: - job.Handle = string(job.Data) - err = gearman.ErrWorkFail - return - case gearman.WORK_EXCEPTION: - err = gearman.ErrWorkException - fallthrough - case gearman.WORK_COMPLETE: - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = gearman.ErrInvalidData - return - } - job.Handle = string(s[0]) - data = s[1] - default: - err = gearman.ErrDataType - } - return -} - -// Extract the job's update -func (job *ClientJob) Update() (data []byte, err error) { - if job.DataType != gearman.WORK_DATA && job.DataType != gearman.WORK_WARNING { - err = gearman.ErrDataType - return - } - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = gearman.ErrInvalidData - return - } - if job.DataType == gearman.WORK_WARNING { - err = gearman.ErrWorkWarning - } - job.Handle = string(s[0]) - data = s[1] - return -} diff --git a/gearman/worker/jobagent.go b/worker/jobagent.go similarity index 98% rename from gearman/worker/jobagent.go rename to worker/jobagent.go index 0aee90c..5456e94 100644 --- a/gearman/worker/jobagent.go +++ b/worker/jobagent.go @@ -5,7 +5,7 @@ package worker import ( - "bitbucket.org/mikespook/gearman-go/gearman" + gearman "bitbucket.org/mikespook/gearman-go" "io" "net" ) diff --git a/gearman/worker/worker.go b/worker/worker.go similarity index 99% rename from gearman/worker/worker.go rename to worker/worker.go index ca4589f..9a61937 100644 --- a/gearman/worker/worker.go +++ b/worker/worker.go @@ -5,7 +5,7 @@ package worker import ( - "bitbucket.org/mikespook/gearman-go/gearman" + gearman "bitbucket.org/mikespook/gearman-go" "bytes" "sync" ) diff --git a/gearman/worker/worker_test.go b/worker/worker_test.go similarity index 100% rename from gearman/worker/worker_test.go rename to worker/worker_test.go diff --git a/gearman/worker/workerjob.go b/worker/workerjob.go similarity index 98% rename from gearman/worker/workerjob.go rename to worker/workerjob.go index f4ab029..1f60e12 100644 --- a/gearman/worker/workerjob.go +++ b/worker/workerjob.go @@ -5,7 +5,7 @@ package worker import ( - "bitbucket.org/mikespook/gearman-go/gearman" + gearman "bitbucket.org/mikespook/gearman-go" "strconv" )