diff --git a/README.md b/README.md index cf930bd..23e11d5 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,11 @@ in the LICENSE file. This will install the client: -> $ go get bitbucket.org/mikespook/gearman-go/gearman/client +> $ go get bitbucket.org/mikespook/gearman-go/client This will install the worker: -> $ go get bitbucket.org/mikespook/gearman-go/gearman/worker +> $ go get bitbucket.org/mikespook/gearman-go/worker This will install the client and the worker automatically: @@ -46,8 +46,13 @@ This will install the client and the worker automatically: # Contacts -xingxing +Xing Xing http://mikespook.com http://twitter.com/mikespook + +# History + + * 0.1 Refactoring code, redesign the API. + * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..fe3d7fe --- /dev/null +++ b/client/client.go @@ -0,0 +1,269 @@ +// Copyright 2011 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "io" + "net" + "bytes" + "strconv" + "bitbucket.org/mikespook/golib/autoinc" + "bitbucket.org/mikespook/gearman-go/common" +) + +// Job handler +type JobHandler func(*Job) error +// Status handler +// handle, known, running, numerator, denominator +type StatusHandler func(string, bool, bool, uint64, uint64) + +/* +The client side api for gearman + +usage: +c := client.New("tcp4", "127.0.0.1:4730") +handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) + +*/ +type Client struct { + ErrHandler common.ErrorHandler + JobHandler JobHandler + StatusHandler StatusHandler + + in chan []byte + out chan *Job + jobCreated chan *Job + conn net.Conn + ai *autoinc.AutoInc +} + +// Create a new client. +// Connect to "addr" through "network" +// Eg. +// client, err := client.New("127.0.0.1:4730") +func New(addr string) (client *Client, err error) { + conn, err := net.Dial(common.NETWORK, addr) + if err != nil { + return + } + client = &Client{ + jobCreated: make(chan *Job), + in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), + conn: conn, + ai: autoinc.New(0, 1), + } + go client.inLoop() + go client.outLoop() + return +} + +// out loop +func (client *Client) outLoop() { + ok := true + for ok { + if job, ok := <-client.out; ok { + if err := client.write(job.Encode()); err != nil { + client.err(err) + } + } + } +} + +// in loop +func (client *Client) inLoop() { + for { + rel, err := client.read() + if err != nil { + client.err(err) + continue + } + job, err := decodeJob(rel) + if err != nil { + client.err(err) + continue + } + switch job.DataType { + case common.ERROR: + _, err := common.GetError(job.Data) + client.err(err) + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, + common.ECHO_RES: + go client.handleJob(job) + case common.JOB_CREATED: + client.jobCreated <- job + case common.STATUS_RES: + go client.handleStatus(job) + } + } +} + +// inner read +func (client *Client) read() (data []byte, err error) { + if len(client.in) > 0 { + // incoming queue is not empty + data = <-client.in + } else { + // empty queue, read data from socket + for { + buf := make([]byte, common.BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + if data == nil { + err = common.ErrEmptyReading + return + } + break + } + return + } + data = append(data, buf[0:n]...) + if n < common.BUFFER_SIZE { + break + } + } + } + // split package + tl := len(data) + start, end := 0, 4 + for i := 0; i < tl; i++ { + if string(data[start:end]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + client.in <- data[total:] + data = data[:total] + return + } + } else { + start++ + end++ + } + } + return nil, common.Errorf("Invalid data: %V", data) +} + +// error handler +func (client *Client) err (e error) { + if client.ErrHandler != nil { + client.ErrHandler(e) + } +} + +// job handler +func (client *Client) handleJob(job *Job) { + if client.JobHandler != nil { + if err := client.JobHandler(job); err != nil { + client.err(err) + } + } +} + +// status handler +func (client *Client) handleStatus(job *Job) { + if client.StatusHandler != nil { + data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) + if len(data) != 5 { + client.err(common.Errorf("Invalid data: %V", job.Data)) + return + } + handle := string(data[0]) + known := (data[1][0] == '1') + running := (data[2][0] == '1') + numerator, err := strconv.ParseUint(string(data[3][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[3][0])) + return + } + denominator, err := strconv.ParseUint(string(data[4][0]), 10, 0) + if err != nil { + client.err(common.Errorf("Invalid handle: %s", data[4][0])) + return + } + client.StatusHandler(handle, known, running, numerator, denominator) + } +} + +// Do the function. +// funcname is a string with function name. +// data is encoding to byte array. +// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, +// and if it is background job: JOB_BG. +// JOB_LOW | JOB_BG means the job is running with low level in background. +func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { + var datatype uint32 + if flag & JOB_LOW == JOB_LOW { + if flag & JOB_BG == JOB_BG { + datatype = common.SUBMIT_JOB_LOW_BG + } else { + datatype = common.SUBMIT_JOB_LOW + } + } else if flag & JOB_HIGH == JOB_HIGH { + if flag & JOB_BG == JOB_BG { + datatype = common.SUBMIT_JOB_HIGH_BG + } else { + datatype = common.SUBMIT_JOB_HIGH + } + } else if flag & JOB_BG == JOB_BG { + datatype = common.SUBMIT_JOB_BG + } else { + datatype = common.SUBMIT_JOB + } + + uid := strconv.Itoa(int(client.ai.Id())) + l := len(funcname) + len(uid) + len(data) + 2 + rel := make([]byte, 0, l) + rel = append(rel, []byte(funcname)...) // len(funcname) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, []byte(uid)...) // len(uid) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, data...) // len(data) + client.writeJob(newJob(common.REQ, datatype, rel)) + // Waiting for JOB_CREATED + job := <-client.jobCreated + return string(job.Data), nil +} + +// Get job status from job server. +// !!!Not fully tested.!!! +func (client *Client) Status(handle string) { + job := newJob(common.REQ, common.GET_STATUS, []byte(handle)) + client.writeJob(job) +} + +// Send a something out, get the samething back. +func (client *Client) Echo(data []byte) { + client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) +} + +// Send the job to job server. +func (client *Client) writeJob(job *Job) { + client.out <- job +} + +// Internal write +func (client *Client) write(buf []byte) (err error) { + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return +} + +// Close +func (client *Client) Close() (err error) { + close(client.jobCreated) + close(client.in) + close(client.out) + return client.conn.Close(); +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..3853642 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,45 @@ +package client + +import ( + "testing" +) + +var client *Client + +func TestClientAddServer(t *testing.T) { + t.Log("Add local server 127.0.0.1:4730") + var err error + if client, err = New("127.0.0.1:4730"); err != nil { + t.Error(err) + } + client.ErrHandler = func(e error) { + t.Error(e) + } +} + +func TestClientEcho(t *testing.T) { + client.JobHandler = func(job *Job) error { + echo := string(job.Data) + if echo == "Hello world" { + t.Log(echo) + } else { + t.Errorf("Invalid echo data: %s", job.Data) + } + return nil + } + client.Echo([]byte("Hello world")) +} + +func TestClientDo(t *testing.T) { + if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { + t.Error(err) + } else { + t.Log(handle) + } +} + +func TestClientClose(t *testing.T) { + if err := client.Close(); err != nil { + t.Error(err) + } +} diff --git a/client/job.go b/client/job.go new file mode 100644 index 0000000..6b7246a --- /dev/null +++ b/client/job.go @@ -0,0 +1,128 @@ +// Copyright 2011 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bytes" + "bitbucket.org/mikespook/gearman-go/common" +) + +const ( + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 +) + +// An error handler +type ErrorHandler func(error) + +// Client side job +type Job struct { + Data []byte + Handle, UniqueId string + magicCode, DataType uint32 +} + +// Create a new job +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, + DataType: datatype, + Data: data} +} + +// Decode a job from byte slice +func decodeJob(data []byte) (job *Job, err error) { + if len(data) < 12 { + return nil, common.Errorf("Invalid data: %V", data) + } + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + return nil, common.Errorf("Invalid data: %V", data) + } + data = data[12:] + return newJob(common.RES, datatype, data), nil +} + +// Encode a job to byte slice +func (job *Job) Encode() (data []byte) { + l := len(job.Data) + tl := l + 12 + data = make([]byte, tl) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(l)) + + for i := 0; i < tl; i ++ { + switch { + case i < 4: + data[i] = magiccode[i] + case i < 8: + data[i] = datatype[i - 4] + case i < 12: + data[i] = datalength[i - 8] + default: + data[i] = job.Data[i - 12] + } + } + // Alternative + /* + data = append(data, magiccode[:] ...) + data = append(data, datatype[:] ...) + data = append(data, datalength[:] ...) + data = append(data, job.Data ...) + */ + return +} + +// Extract the job's result. +func (job *Job) Result() (data []byte, err error) { + switch job.DataType { + case common.WORK_FAIL: + job.Handle = string(job.Data) + return nil, common.ErrWorkFail + case common.WORK_EXCEPTION: + err = common.ErrWorkException + fallthrough + case common.WORK_COMPLETE: + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + return nil, common.Errorf("Invalid data: %V", job.Data) + } + job.Handle = string(s[0]) + data = s[1] + default: + err = common.ErrDataType + } + return +} + +// Extract the job's update +func (job *Job) Update() (data []byte, err error) { + if job.DataType != common.WORK_DATA && job.DataType != common.WORK_WARNING { + err = common.ErrDataType + return + } + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = common.ErrInvalidData + return + } + if job.DataType == common.WORK_WARNING { + err = common.ErrWorkWarning + } + job.Handle = string(s[0]) + data = s[1] + return +} diff --git a/common/error.go b/common/error.go new file mode 100644 index 0000000..8c1e8fa --- /dev/null +++ b/common/error.go @@ -0,0 +1,48 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package common + +import ( + "fmt" + "bytes" + "errors" + "syscall" +) + +var ( + ErrInvalidData = errors.New("Invalid data.") + ErrWorkWarning = errors.New("Work warning.") + ErrWorkFail = errors.New("Work fail.") + ErrWorkException = errors.New("Work exeption.") + ErrDataType = errors.New("Invalid data type.") + ErrOutOfCap = errors.New("Out of the capability.") + ErrNotConn = errors.New("Did not connect to job server.") + ErrFuncNotFound = errors.New("The function was not found.") + ErrEmptyReading = errors.New("Empty reading.") +) + +// Extract the error message +func GetError(data []byte) (eno syscall.Errno, err error) { + rel := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(rel) != 2 { + err = Errorf("Not a error data: %V", data) + return + } + l := len(rel[0]) + eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) + err = errors.New(string(rel[1])) + return +} + +// Get a formated error +func Errorf(format string, msg ... interface{}) error { + return errors.New(fmt.Sprintf(format, msg ... )) +} + +// An error handler +type ErrorHandler func(error) + + diff --git a/common/gearman.go b/common/gearman.go new file mode 100644 index 0000000..cc9d41e --- /dev/null +++ b/common/gearman.go @@ -0,0 +1,66 @@ +// Copyright 2011 - 2012 Xing Xing . +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package common + +const ( + NETWORK = "tcp" + // queue size + QUEUE_SIZE = 512 + // read buffer size + BUFFER_SIZE = 1024 + + // \x00REQ + REQ = 5391697 + REQ_STR = "\x00REQ" + // \x00RES + RES = 5391699 + RES_STR = "\x00RES" + + // package data type + CAN_DO = 1 + CANT_DO = 2 + RESET_ABILITIES = 3 + PRE_SLEEP = 4 + NOOP = 6 + JOB_CREATED = 8 + GRAB_JOB = 9 + NO_JOB = 10 + JOB_ASSIGN = 11 + WORK_STATUS = 12 + WORK_COMPLETE = 13 + WORK_FAIL = 14 + GET_STATUS = 15 + ECHO_REQ = 16 + ECHO_RES = 17 + ERROR = 19 + STATUS_RES = 20 + SET_CLIENT_ID = 22 + CAN_DO_TIMEOUT = 23 + WORK_EXCEPTION = 25 + WORK_DATA = 28 + WORK_WARNING = 29 + GRAB_JOB_UNIQ = 30 + JOB_ASSIGN_UNIQ = 31 + + SUBMIT_JOB = 7 + SUBMIT_JOB_BG = 18 + SUBMIT_JOB_HIGH = 21 + SUBMIT_JOB_HIGH_BG = 32 + SUBMIT_JOB_LOW = 33 + SUBMIT_JOB_LOW_BG = 34 +) + +// Decode [4]byte to uint32 +func BytesToUint32(buf [4]byte) uint32 { + return uint32(buf[0])<<24 + uint32(buf[1])<<16 + uint32(buf[2])<<8 + + uint32(buf[3]) +} + +// Encode uint32 to [4]byte +func Uint32ToBytes(i uint32) [4]byte { + return [4]byte{byte((i >> 24) & 0xff), byte((i >> 16) & 0xff), + byte((i >> 8) & 0xff), byte(i & 0xff),} +} diff --git a/example/client.go b/example/client.go index ec8bd94..d1ad7a2 100644 --- a/example/client.go +++ b/example/client.go @@ -1,46 +1,46 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bitbucket.org/mikespook/gearman-go/gearman/client" "log" + "sync" + "bitbucket.org/mikespook/gearman-go/client" ) func main() { - client := client.New() - defer client.Close() - if err := client.AddServer("127.0.0.1:4730"); err != nil { + var wg sync.WaitGroup + + c, err := client.New("127.0.0.1:4730") + if err != nil { log.Fatalln(err) } + defer c.Close() echo := []byte("Hello\x00 world") - - if data, err := client.Echo(echo); err != nil { - log.Fatalln(string(data)) + c.JobHandler = func(job *client.Job) error { + log.Printf("%s", job.Data) + wg.Done() + return nil } - handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL) + c.ErrHandler = func(e error) { + log.Println(e) + panic(e) + } + wg.Add(1) + c.Echo(echo) + wg.Add(1) + handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL) if err != nil { log.Fatalln(err) } else { log.Println(handle) - job := <-client.JobQueue - if data, err := job.Result(); err != nil { - log.Fatalln(err) - } else { - log.Println(string(data)) - } } - known, running, numerator, denominator, err := client.Status(handle) - if err != nil { - log.Fatalln(err) - } - if !known { - log.Println("Unknown") - } - if running { - log.Printf("%g%%\n", float32(numerator)*100/float32(denominator)) - } else { - log.Println("Not running") + c.StatusHandler = func(handle string, known, running bool, numerator, denominator uint64) { + log.Printf("%s: %b, %b, %d, %d", handle, known, running, numerator, denominator) + wg.Done() } + wg.Add(1) + c.Status(handle) + + wg.Wait() } diff --git a/example/worker.go b/example/worker.go index d9b4759..9a19338 100644 --- a/example/worker.go +++ b/example/worker.go @@ -1,61 +1,37 @@ package main import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bitbucket.org/mikespook/gearman-go/gearman/worker" - "bitbucket.org/mikespook/golib/signal" "os" - "fmt" "log" "strings" + "bitbucket.org/mikespook/golib/signal" + "bitbucket.org/mikespook/gearman-go/worker" ) -func ToUpper(job *worker.WorkerJob) ([]byte, error) { +func ToUpper(job *worker.Job) ([]byte, error) { + log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n", + job.Handle, job.UniqueId, job.Data) data := []byte(strings.ToUpper(string(job.Data))) return data, nil } func main() { - w := worker.New(worker.Unlimit) - w.ErrFunc = func(e error) { + log.Println("Starting ...") + defer log.Println("Shutdown complete!") + w := worker.New(worker.Unlimited) + w.ErrHandler = func(e error) { log.Println(e) } + w.JobHandler = func(job *worker.Job) error { + log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle, + job.UniqueId, job.Data) + return nil + } w.AddServer("127.0.0.1:4730") - w.AddFunction("ToUpper", ToUpper, 0) - w.AddFunction("ToUpperTimeOut5", ToUpper, 5) - - // Catch the interrupt to exit the working loop. + w.AddFunc("ToUpper", ToUpper, 0) + //w.AddFunc("ToUpperTimeOut5", ToUpper, 5) + go w.Work() sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool { - w.Close() - return true - }) - go sh.Loop() - - go func() { - log.Println("start worker") - for { - print("cmd: ") - var str string - fmt.Scan(&str) - switch str { - case "echo": - w.Echo([]byte("Hello world!")) - var job *worker.WorkerJob - for job = <-w.JobQueue; job.DataType != gearman.ECHO_RES; job = <-w.JobQueue { - log.Println(job) - } - log.Println(string(job.Data)) - case "quit": - os.Exit(0) - return - case "result": - job := <-w.JobQueue - log.Println(string(job.Data)) - default: - log.Println("Unknown command") - } - } - }() - w.Work() + sh.Bind(os.Interrupt, func() bool {return true}) + sh.Loop() } diff --git a/gearman.go b/gearman.go index 067831f..48c134b 100644 --- a/gearman.go +++ b/gearman.go @@ -1,4 +1,4 @@ -// Copyright 2012 Xing Xing All rights reserved. +// Copyright 2011 Xing Xing All rights reserved. // Use of this source code is governed by a MIT // license that can be found in the LICENSE file. @@ -10,6 +10,7 @@ The protocol was implemented by native way. package gearman import ( - _ "bitbucket.org/mikespook/gearman-go/gearman/client" - _ "bitbucket.org/mikespook/gearman-go/gearman/worker" + _ "bitbucket.org/mikespook/gearman-go/common" + _ "bitbucket.org/mikespook/gearman-go/client" + _ "bitbucket.org/mikespook/gearman-go/worker" ) diff --git a/gearman/client/client.go b/gearman/client/client.go deleted file mode 100644 index 156bb8f..0000000 --- a/gearman/client/client.go +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bytes" - "io" - "net" - "strconv" - "sync" -) - -/* -The client side api for gearman. - -usage: - client = NewClient() - client.AddServer("127.0.0.1:4730") - handle := client.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) - -*/ -type Client struct { - mutex sync.Mutex - conn net.Conn - incoming chan []byte - - JobQueue chan *ClientJob - UId uint32 -} - -// Create a new client. -func New() (client *Client) { - return &Client{ - JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP), - incoming: make(chan []byte, gearman.QUEUE_CAP), - UId:1} -} - -// Add a server. -// In this version, one client connect to one job server. -// Sample is better. Plz do the load balancing by your self. -func (client *Client) AddServer(addr string) (err error) { - client.conn, err = net.Dial(gearman.TCP, addr) - if err != nil { - return - } - return -} - -// Internal read -func (client *Client) read() (data []byte, err error) { - if len(client.incoming) > 0 { - // incoming queue is not empty - data = <-client.incoming - } else { - // empty queue, read data from socket - for { - buf := make([]byte, gearman.BUFFER_SIZE) - var n int - if n, err = client.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - break - } - return - } - data = append(data, buf[0:n]...) - if n < gearman.BUFFER_SIZE { - break - } - } - } - // split package - start, end := 0, 4 - tl := len(data) - for i := 0; i < tl; i++ { - if string(data[start:end]) == gearman.RES_STR { - l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - client.incoming <- data[total:] - data = data[:total] - return - } - } else { - start++ - end++ - } - } - err = gearman.ErrInvalidData - return -} - -// Read a job from job server. -// This function will return the job, and add it to the job queue. -func (client *Client) ReadJob() (job *ClientJob, err error) { - var rel []byte - if rel, err = client.read(); err != nil { - return - } - if job, err = DecodeClientJob(rel); err != nil { - return - } else { - switch job.DataType { - case gearman.ERROR: - _, err = gearman.GetError(job.Data) - return - case gearman.WORK_DATA, gearman.WORK_WARNING, gearman.WORK_STATUS, gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: - client.JobQueue <- job - } - } - return -} - -// Do the function. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, -// and if it is background job: JOB_BG. -// JOB_LOW | JOB_BG means the job is running with low level in background. -func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { - var datatype uint32 - if flag&gearman.JOB_LOW == gearman.JOB_LOW { - if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_LOW_BG - } else { - datatype = gearman.SUBMIT_JOB_LOW - } - } else if flag&gearman.JOB_HIGH == gearman.JOB_HIGH { - if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_HIGH_BG - } else { - datatype = gearman.SUBMIT_JOB_HIGH - } - } else if flag&gearman.JOB_BG == gearman.JOB_BG { - datatype = gearman.SUBMIT_JOB_BG - } else { - datatype = gearman.SUBMIT_JOB - } - - rel := make([]byte, 0, 1024*64) - rel = append(rel, []byte(funcname)...) - rel = append(rel, '\x00') - client.mutex.Lock() - uid := strconv.Itoa(int(client.UId)) - client.UId++ - rel = append(rel, []byte(uid)...) - client.mutex.Unlock() - rel = append(rel, '\x00') - rel = append(rel, data...) - if err = client.WriteJob(NewClientJob(gearman.REQ, datatype, rel)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.JOB_CREATED); err != nil { - return - } - handle = string(job.Data) - go func() { - if flag&gearman.JOB_BG != gearman.JOB_BG { - for { - if job, err = client.ReadJob(); err != nil { - return - } - switch job.DataType { - case gearman.WORK_DATA, gearman.WORK_WARNING: - case gearman.WORK_STATUS: - case gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: - return - } - } - } - }() - return -} - -// Internal read last job -func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err error) { - for { - if job, err = client.ReadJob(); err != nil { - return - } - if job.DataType == datatype { - break - } - } - if job.DataType != datatype { - err = gearman.ErrDataType - } - return -} - -// Get job status from job server. -// !!!Not fully tested.!!! -func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint64, err error) { - - if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.GET_STATUS, []byte(handle))); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.STATUS_RES); err != nil { - return - } - data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) - if len(data) != 5 { - err = gearman.ErrInvalidData - return - } - if handle != string(data[0]) { - err = gearman.ErrInvalidData - return - } - known = data[1][0] == '1' - running = data[2][0] == '1' - if numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0); err != nil { - return - } - if denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0); err != nil { - return - } - return -} - -// Send a something out, get the samething back. -func (client *Client) Echo(data []byte) (echo []byte, err error) { - if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.ECHO_REQ, data)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(gearman.ECHO_RES); err != nil { - return - } - echo = job.Data - return -} - -// Get the last job. -// the job means a network package. -// Normally, it is the job executed result. -func (client *Client) LastJob() (job *ClientJob) { - if l := len(client.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-client.JobQueue - } - } - return <-client.JobQueue -} - -// Send the job to job server. -func (client *Client) WriteJob(job *ClientJob) (err error) { - return client.write(job.Encode()) -} - -// Internal write -func (client *Client) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) - if err != nil { - return - } - } - return -} - -// Close. -func (client *Client) Close() (err error) { - err = client.conn.Close() - close(client.JobQueue) - return -} diff --git a/gearman/client/client_test.go b/gearman/client/client_test.go deleted file mode 100644 index f2ae43f..0000000 --- a/gearman/client/client_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "testing" -) - -var client *Client - -func init() { - client = NewClient() -} - -func TestClientAddServer(t *testing.T) { - t.Log("Add local server 127.0.0.1:4730") - if err := client.AddServer("127.0.0.1:4730"); err != nil { - t.Error(err) - } -} - -func TestClientEcho(t *testing.T) { - if echo, err := client.Echo([]byte("Hello world")); err != nil { - t.Error(err) - } else { - t.Log(echo) - } -} - -func TestClientDo(t *testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), gearman.JOB_LOW|gearman.JOB_BG); err != nil { - t.Error(err) - } else { - t.Log(handle) - } -} - -func TestClientClose(t *testing.T) { - if err := client.Close(); err != nil { - t.Error(err) - } -} diff --git a/gearman/client/clientjob.go b/gearman/client/clientjob.go deleted file mode 100644 index 184b93a..0000000 --- a/gearman/client/clientjob.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package client - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bytes" -) - -// Client side job -type ClientJob struct { - Data []byte - Handle, UniqueId string - magicCode, DataType uint32 -} - -// Create a new job -func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { - return &ClientJob{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode a job from byte slice -func DecodeClientJob(data []byte) (job *ClientJob, err error) { - if len(data) < 12 { - err = gearman.ErrInvalidData - return - } - datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - err = gearman.ErrInvalidData - return - } - data = data[12:] - job = NewClientJob(gearman.RES, datatype, data) - return -} - -// Encode a job to byte slice -func (job *ClientJob) Encode() (data []byte) { - magiccode := gearman.Uint32ToBytes(job.magicCode) - datatype := gearman.Uint32ToBytes(job.DataType) - data = make([]byte, 0, 1024*64) - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - l := len(job.Data) - datalength := gearman.Uint32ToBytes(uint32(l)) - data = append(data, datalength[:]...) - data = append(data, job.Data...) - return -} - -// Extract the job's result. -func (job *ClientJob) Result() (data []byte, err error) { - switch job.DataType { - case gearman.WORK_FAIL: - job.Handle = string(job.Data) - err = gearman.ErrWorkFail - return - case gearman.WORK_EXCEPTION: - err = gearman.ErrWorkException - fallthrough - case gearman.WORK_COMPLETE: - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = gearman.ErrInvalidData - return - } - job.Handle = string(s[0]) - data = s[1] - default: - err = gearman.ErrDataType - } - return -} - -// Extract the job's update -func (job *ClientJob) Update() (data []byte, err error) { - if job.DataType != gearman.WORK_DATA && job.DataType != gearman.WORK_WARNING { - err = gearman.ErrDataType - return - } - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = gearman.ErrInvalidData - return - } - if job.DataType == gearman.WORK_WARNING { - err = gearman.ErrWorkWarning - } - job.Handle = string(s[0]) - data = s[1] - return -} diff --git a/gearman/gearman.go b/gearman/gearman.go deleted file mode 100644 index 636c5ff..0000000 --- a/gearman/gearman.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -/* -This module is Gearman API for golang. -The protocol was implemented by native way. -*/ - -package gearman - -import ( - "bytes" - "errors" - "syscall" -) - -const ( - // tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6, - // or 'tcp6' only for ipv6. - TCP = "tcp4" - // the number limited for job servers. - WORKER_SERVER_CAP = 32 - // the number limited for functions. - WORKER_FUNCTION_CAP = 512 - // queue size - QUEUE_CAP = 512 - // read buffer size - BUFFER_SIZE = 1024 - - // \x00REQ - REQ = 5391697 - REQ_STR = "\x00REQ" - // \x00RES - RES = 5391699 - RES_STR = "\x00RES" - - // package data type - CAN_DO = 1 - CANT_DO = 2 - RESET_ABILITIES = 3 - PRE_SLEEP = 4 - NOOP = 6 - JOB_CREATED = 8 - GRAB_JOB = 9 - NO_JOB = 10 - JOB_ASSIGN = 11 - WORK_STATUS = 12 - WORK_COMPLETE = 13 - WORK_FAIL = 14 - GET_STATUS = 15 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - STATUS_RES = 20 - SET_CLIENT_ID = 22 - CAN_DO_TIMEOUT = 23 - WORK_EXCEPTION = 25 - WORK_DATA = 28 - WORK_WARNING = 29 - GRAB_JOB_UNIQ = 30 - JOB_ASSIGN_UNIQ = 31 - - SUBMIT_JOB = 7 - SUBMIT_JOB_BG = 18 - SUBMIT_JOB_HIGH = 21 - SUBMIT_JOB_HIGH_BG = 32 - SUBMIT_JOB_LOW = 33 - SUBMIT_JOB_LOW_BG = 34 - - // Job type - // JOB_NORMAL | JOB_BG means a normal level job run in background - // normal level - JOB_NORMAL = 0 - // background job - JOB_BG = 1 - // low level - JOB_LOW = 2 - // high level - JOB_HIGH = 4 -) - -var ( - ErrIsNotErr = errors.New("The input is not a error data.") - ErrInvalidData = errors.New("Invalid data.") - ErrWorkWarning = errors.New("Work warning.") - ErrWorkFail = errors.New("Work fail.") - ErrWorkException = errors.New("Work exeption.") - ErrDataType = errors.New("Invalid data type.") - ErrOutOfCap = errors.New("Out of the capability.") - ErrNotConn = errors.New("Did not connect to job server.") - ErrFuncNotFound = errors.New("The function was not found.") -) - -// Extract the error message -func GetError(data []byte) (eno syscall.Errno, err error) { - rel := bytes.SplitN(data, []byte{'\x00'}, 2) - if len(rel) != 2 { - err = ErrIsNotErr - return - } - l := len(rel[0]) - eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) - err = errors.New(string(rel[1])) - return -} - -// Decode [4]byte to uint32 -func BytesToUint32(buf [4]byte) uint32 { - return uint32(buf[0])<<24 + - uint32(buf[1])<<16 + - uint32(buf[2])<<8 + - uint32(buf[3]) -} - -// Encode uint32 to [4]byte -func Uint32ToBytes(i uint32) (data [4]byte) { - data[0] = byte((i >> 24) & 0xff) - data[1] = byte((i >> 16) & 0xff) - data[2] = byte((i >> 8) & 0xff) - data[3] = byte(i & 0xff) - return -} diff --git a/gearman/worker/jobagent.go b/gearman/worker/jobagent.go deleted file mode 100644 index 0aee90c..0000000 --- a/gearman/worker/jobagent.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package worker - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "io" - "net" -) - -// The agent of job server. -type jobAgent struct { - conn net.Conn - worker *Worker - running bool - incoming chan []byte -} - -// Create the agent of job server. -func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { - conn, err := net.Dial(gearman.TCP, addr) - if err != nil { - return nil, err - } - jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, gearman.QUEUE_CAP)} - return jobagent, err -} - -// Internal read -func (agent *jobAgent) read() (data []byte, err error) { - if len(agent.incoming) > 0 { - // incoming queue is not empty - data = <-agent.incoming - } else { - for { - buf := make([]byte, gearman.BUFFER_SIZE) - var n int - if n, err = agent.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - err = nil - return - } - return - } - data = append(data, buf[0:n]...) - if n < gearman.BUFFER_SIZE { - break - } - } - } - // split package - start := 0 - tl := len(data) - for i := 0; i < tl; i++ { - if string(data[start:start+4]) == gearman.RES_STR { - l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - agent.incoming <- data[total:] - data = data[:total] - return - } - } else { - start++ - } - } - err = gearman.ErrInvalidData - return -} - -// Main loop. -func (agent *jobAgent) Work() { - noop := true - for agent.running { - // got noop msg and incoming queue is zero, grab job - if noop && len(agent.incoming) == 0 { - agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.GRAB_JOB, nil)) - } - rel, err := agent.read() - if err != nil { - agent.worker.err(err) - continue - } - job, err := DecodeWorkerJob(rel) - if err != nil { - agent.worker.err(err) - continue - } else { - switch job.DataType { - case gearman.NOOP: - noop = true - case gearman.NO_JOB: - noop = false - agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.PRE_SLEEP, nil)) - case gearman.ECHO_RES, gearman.JOB_ASSIGN_UNIQ, gearman.JOB_ASSIGN: - job.agent = agent - agent.worker.incoming <- job - } - } - } - return -} - -// Send a job to the job server. -func (agent *jobAgent) WriteJob(job *WorkerJob) (err error) { - return agent.write(job.Encode()) -} - -// Internal write the encoded job. -func (agent *jobAgent) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = agent.conn.Write(buf[i:]) - if err != nil { - return err - } - } - return -} - -// Close. -func (agent *jobAgent) Close() (err error) { - agent.running = false - close(agent.incoming) - err = agent.conn.Close() - return -} diff --git a/gearman/worker/worker.go b/gearman/worker/worker.go deleted file mode 100644 index ca4589f..0000000 --- a/gearman/worker/worker.go +++ /dev/null @@ -1,286 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package worker - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "bytes" - "sync" -) - -const ( - Unlimit = 0 - OneByOne = 1 -) - -// The definition of the callback function. -type JobFunction func(job *WorkerJob) ([]byte, error) - -// Map for added function. -type JobFunctionMap map[string]JobFunction - -// Error Function -type ErrFunc func(e error) -/* -Worker side api for gearman. - -usage: - w = worker.New(worker.Unlimit) - w.AddFunction("foobar", foobar) - w.AddServer("127.0.0.1:4730") - w.Work() // Enter the worker's main loop - -The definition of the callback function 'foobar' should suit for the type 'JobFunction'. -It looks like this: - -func foobar(job *WorkerJob) (data []byte, err os.Error) { - //sth. here - //plaplapla... - return -} -*/ -type Worker struct { - clients []*jobAgent - functions JobFunctionMap - running bool - incoming chan *WorkerJob - mutex sync.Mutex - limit chan bool - - JobQueue chan *WorkerJob - - // assign a ErrFunc to handle errors - // Must assign befor AddServer - ErrFunc ErrFunc -} - -// Get a new worker -func New(l int) (worker *Worker) { - worker = &Worker{ - // job server list - clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP), - // function list - functions: make(JobFunctionMap), - incoming: make(chan *WorkerJob, gearman.QUEUE_CAP), - JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP), - running: true, - } - if l != Unlimit { - worker.limit = make(chan bool, l) - for i := 0; i < l; i ++ { - worker.limit <- true - } - } - return -} - -// -func (worker *Worker)err(e error) { - if worker.ErrFunc != nil { - worker.ErrFunc(e) - } -} - -// Add a server. The addr should be 'host:port' format. -// The connection is established at this time. -func (worker *Worker) AddServer(addr string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if len(worker.clients) == cap(worker.clients) { - return gearman.ErrOutOfCap - } - - // Create a new job server's client as a agent of server - server, err := newJobAgent(addr, worker) - if err != nil { - return err - } - - n := len(worker.clients) - worker.clients = worker.clients[0 : n+1] - worker.clients[n] = server - return -} - -// Add a function. -// Plz added job servers first, then functions. -// The API will tell every connected job server that 'I can do this' -func (worker *Worker) AddFunction(funcname string, - f JobFunction, timeout uint32) (err error) { - if len(worker.clients) < 1 { - return gearman.ErrNotConn - } - worker.mutex.Lock() - defer worker.mutex.Unlock() - worker.functions[funcname] = f - - var datatype uint32 - var data []byte - if timeout == 0 { - datatype = gearman.CAN_DO - data = []byte(funcname) - } else { - datatype = gearman.CAN_DO_TIMEOUT - data = []byte(funcname + "\x00") - t := gearman.Uint32ToBytes(timeout) - data = append(data, t[:]...) - } - job := NewWorkerJob(gearman.REQ, datatype, data) - worker.WriteJob(job) - return -} - -// Remove a function. -// Tell job servers 'I can not do this now' at the same time. -func (worker *Worker) RemoveFunction(funcname string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if worker.functions[funcname] == nil { - return gearman.ErrFuncNotFound - } - delete(worker.functions, funcname) - job := NewWorkerJob(gearman.REQ, gearman.CANT_DO, []byte(funcname)) - worker.WriteJob(job) - return -} - -// Main loop -func (worker *Worker) Work() { - for _, v := range worker.clients { - go v.Work() - } - for worker.running || len(worker.incoming) > 0{ - select { - case job := <-worker.incoming: - if job == nil { - break - } - switch job.DataType { - case gearman.NO_JOB: - // do nothing - case gearman.ERROR: - _, err := gearman.GetError(job.Data) - worker.err(err) - case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ: - go func() { - if err := worker.exec(job); err != nil { - worker.err(err) - } - }() - default: - worker.JobQueue <- job - } - } - } - close(worker.incoming) -} - -// Get the last job in queue. -// If there are more than one job in the queue, -// the last one will be returned, -// the others will be lost. -func (worker *Worker) LastJob() (job *WorkerJob) { - if l := len(worker.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-worker.JobQueue - } - } - return <-worker.JobQueue -} - -// Close. -func (worker *Worker) Close() (err error) { - for _, v := range worker.clients { - err = v.Close() - } - worker.running = false - return err -} - -// Write a job to job server. -// Here, the job's mean is not the oraginal mean. -// Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) WriteJob(job *WorkerJob) (err error) { - e := make(chan error) - for _, v := range worker.clients { - go func() { - e <- v.WriteJob(job) - }() - } - return <-e -} - -// Send a something out, get the samething back. -func (worker *Worker) Echo(data []byte) (err error) { - job := NewWorkerJob(gearman.REQ, gearman.ECHO_REQ, data) - return worker.WriteJob(job) -} - -// Remove all of functions. -// Both from the worker or job servers. -func (worker *Worker) Reset() (err error) { - job := NewWorkerJob(gearman.REQ, gearman.RESET_ABILITIES, nil) - err = worker.WriteJob(job) - worker.functions = make(JobFunctionMap) - return -} - -// Set the worker's unique id. -func (worker *Worker) SetId(id string) (err error) { - job := NewWorkerJob(gearman.REQ, gearman.SET_CLIENT_ID, []byte(id)) - return worker.WriteJob(job) -} - -// Execute the job. And send back the result. -func (worker *Worker) exec(job *WorkerJob) (err error) { - if worker.limit != nil { - <- worker.limit - defer func() { - worker.limit <- true - }() - } - var limit int - if job.DataType == gearman.JOB_ASSIGN { - limit = 3 - } else { - limit = 4 - } - jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) - job.Handle = string(jobdata[0]) - funcname := string(jobdata[1]) - if job.DataType == gearman.JOB_ASSIGN { - job.Data = jobdata[2] - } else { - job.UniqueId = string(jobdata[2]) - job.Data = jobdata[3] - } - f, ok := worker.functions[funcname] - if !ok { - return gearman.ErrFuncNotFound - } - result, err := f(job) - var datatype uint32 - if err == nil { - datatype = gearman.WORK_COMPLETE - } else { - if result == nil { - datatype = gearman.WORK_FAIL - } else { - datatype = gearman.WORK_EXCEPTION - } - } - - job.magicCode = gearman.REQ - job.DataType = datatype - job.Data = result - - worker.WriteJob(job) - return -} diff --git a/gearman/worker/workerjob.go b/gearman/worker/workerjob.go deleted file mode 100644 index f4ab029..0000000 --- a/gearman/worker/workerjob.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package worker - -import ( - "bitbucket.org/mikespook/gearman-go/gearman" - "strconv" -) - -// Worker side job -type WorkerJob struct { - Data []byte - Handle, UniqueId string - agent *jobAgent - magicCode, DataType uint32 -} - -// Create a new job -func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { - return &WorkerJob{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode job from byte slice -func DecodeWorkerJob(data []byte) (job *WorkerJob, err error) { - if len(data) < 12 { - err = gearman.ErrInvalidData - return - } - datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - err = gearman.ErrInvalidData - return - } - data = data[12:] - job = NewWorkerJob(gearman.RES, datatype, data) - return -} - -// Encode a job to byte slice -func (job *WorkerJob) Encode() (data []byte) { - magiccode := gearman.Uint32ToBytes(job.magicCode) - datatype := gearman.Uint32ToBytes(job.DataType) - data = make([]byte, 0, 1024*64) - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - data = append(data, []byte{0, 0, 0, 0}...) - l := len(job.Data) - if job.Handle != "" { - data = append(data, []byte(job.Handle)...) - data = append(data, 0) - l += len(job.Handle) + 1 - } - data = append(data, job.Data...) - datalength := gearman.Uint32ToBytes(uint32(l)) - copy(data[8:12], datalength[:]) - return -} - -// Send some datas to client. -// Using this in a job's executing. -func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err error) { - result := append([]byte(job.Handle), 0) - result = append(result, data...) - var datatype uint32 - if iswaring { - datatype = gearman.WORK_WARNING - } else { - datatype = gearman.WORK_DATA - } - return job.agent.WriteJob(NewWorkerJob(gearman.REQ, datatype, result)) -} - -// Update status. -// Tall client how many percent job has been executed. -func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err error) { - n := []byte(strconv.Itoa(numerator)) - d := []byte(strconv.Itoa(denominator)) - result := append([]byte(job.Handle), 0) - result = append(result, n...) - result = append(result, d...) - return job.agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.WORK_STATUS, result)) -} diff --git a/worker/agent.go b/worker/agent.go new file mode 100644 index 0000000..ceef56a --- /dev/null +++ b/worker/agent.go @@ -0,0 +1,155 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "io" + "net" + "bitbucket.org/mikespook/gearman-go/common" +) + +// The agent of job server. +type agent struct { + conn net.Conn + worker *Worker + in chan []byte + out chan *Job +} + +// Create the agent of job server. +func newAgent(addr string, worker *Worker) (a *agent, err error) { + conn, err := net.Dial(common.NETWORK, addr) + if err != nil { + return + } + a = &agent{ + conn: conn, + worker: worker, + in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), + } + return +} + +// outputing loop +func (a *agent) outLoop() { + ok := true + for ok { + if job, ok := <-a.out; ok { + if err := a.write(job.Encode()); err != nil { + a.worker.err(err) + } + } + } +} + +// inputing loop +func (a *agent) inLoop() { + defer func() { + a.conn.Close() + close(a.in) + close(a.out) + a.worker.removeAgent(a) + }() + noop := true + for a.worker.running { + // got noop msg and in queue is zero, grab job + if noop && len(a.in) == 0 { + a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) + } + rel, err := a.read() + if err != nil { + if err == common.ErrEmptyReading { + break + } + a.worker.err(err) + continue + } + job, err := decodeJob(rel) + if err != nil { + a.worker.err(err) + continue + } + switch job.DataType { + case common.NOOP: + noop = true + case common.NO_JOB: + noop = false + a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) + case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: + job.agent = a + a.worker.in <- job + } + } +} + +func (a *agent) Work() { + go a.outLoop() + go a.inLoop() +} + +// Internal read +func (a *agent) read() (data []byte, err error) { + if len(a.in) > 0 { + // in queue is not empty + data = <-a.in + } else { + for { + buf := make([]byte, common.BUFFER_SIZE) + var n int + if n, err = a.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + if data == nil { + err = common.ErrEmptyReading + return + } + break + } + return + } + data = append(data, buf[0:n]...) + if n < common.BUFFER_SIZE { + break + } + } + } + // split package + tl := len(data) + start := 0 + for i := 0; i < tl; i++ { + if string(data[start:start+4]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], + data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + a.in <- data[total:] + data = data[:total] + return + } + } else { + start++ + } + } + return nil, common.Errorf("Invalid data: %V", data) +} + +// Send a job to the job server. +func (a *agent) WriteJob(job *Job) { + a.out <- job +} + +// Internal write the encoded job. +func (a *agent) write(buf []byte) (err error) { + var n int + for i := 0; i < len(buf); i += n { + n, err = a.conn.Write(buf[i:]) + if err != nil { + return err + } + } + return +} diff --git a/worker/job.go b/worker/job.go new file mode 100644 index 0000000..054fd30 --- /dev/null +++ b/worker/job.go @@ -0,0 +1,90 @@ +// Copyright 2011 Xing Xing +// All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "strconv" + "bitbucket.org/mikespook/gearman-go/common" +) + +// Worker side job +type Job struct { + Data []byte + Handle, UniqueId string + agent *agent + magicCode, DataType uint32 +} + +// Create a new job +func newJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode: magiccode, + DataType: datatype, + Data: data} +} + +// Decode job from byte slice +func decodeJob(data []byte) (job *Job, err error) { + if len(data) < 12 { + return nil, common.Errorf("Invalid data: %V", data) + } + datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + return nil, common.Errorf("Invalid data: %V", data) + } + data = data[12:] + job = newJob(common.RES, datatype, data) + return +} + +// Encode a job to byte slice +func (job *Job) Encode() (data []byte) { + l := len(job.Data) + tl := l + if job.Handle != "" { + tl += len(job.Handle) + 1 + } + data = make([]byte, 0, tl + 12) + + magiccode := common.Uint32ToBytes(job.magicCode) + datatype := common.Uint32ToBytes(job.DataType) + datalength := common.Uint32ToBytes(uint32(tl)) + + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) + data = append(data, datalength[:]...) + if job.Handle != "" { + data = append(data, []byte(job.Handle)...) + data = append(data, 0) + } + data = append(data, job.Data...) + return +} + +// Send some datas to client. +// Using this in a job's executing. +func (job *Job) UpdateData(data []byte, iswaring bool) { + result := append([]byte(job.Handle), 0) + result = append(result, data...) + var datatype uint32 + if iswaring { + datatype = common.WORK_WARNING + } else { + datatype = common.WORK_DATA + } + job.agent.WriteJob(newJob(common.REQ, datatype, result)) +} + +// Update status. +// Tall client how many percent job has been executed. +func (job *Job) UpdateStatus(numerator, denominator int) { + n := []byte(strconv.Itoa(numerator)) + d := []byte(strconv.Itoa(denominator)) + result := append([]byte(job.Handle), 0) + result = append(result, n...) + result = append(result, d...) + job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..667e16e --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,285 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bytes" + "bitbucket.org/mikespook/gearman-go/common" +) + +const ( + Unlimited = 0 + OneByOne = 1 +) +// Job handler +type JobHandler func(*Job) error + +type JobFunc func(job *Job) ([]byte, error) + +// The definition of the callback function. +type jobFunc struct { + f JobFunc + timeout uint32 +} + +// Map for added function. +type JobFuncs map[string]*jobFunc + +/* +Worker side api for gearman + +usage: +w = worker.New(worker.Unlimited) +w.AddFunction("foobar", foobar) +w.AddServer("127.0.0.1:4730") +w.Work() // Enter the worker's main loop + +The definition of the callback function 'foobar' should suit for the type 'JobFunction'. +It looks like this: + +func foobar(job *Job) (data []byte, err os.Error) { + //sth. here + //plaplapla... + return +} +*/ +type Worker struct { + agents []*agent + funcs JobFuncs + in chan *Job + running bool + limit chan bool + + Id string + // assign a ErrFunc to handle errors + ErrHandler common.ErrorHandler + JobHandler JobHandler +} + +// Get a new worker +func New(l int) (worker *Worker) { + worker = &Worker{ + agents: make([]*agent, 0), + funcs: make(JobFuncs), + in: make(chan *Job, common.QUEUE_SIZE), + } + if l != Unlimited { + worker.limit = make(chan bool, l) + for i := 0; i < l; i ++ { + worker.limit <- true + } + } + return +} + +// +func (worker *Worker)err(e error) { + if worker.ErrHandler != nil { + worker.ErrHandler(e) + } +} + +// Add a server. The addr should be 'host:port' format. +// The connection is established at this time. +func (worker *Worker) AddServer(addr string) (err error) { + // Create a new job server's client as a agent of server + server, err := newAgent(addr, worker) + if err != nil { + return err + } + worker.agents = append(worker.agents, server) + return +} + +// Write a job to job server. +// Here, the job's mean is not the oraginal mean. +// Just looks like a network package for job's result or tell job server, there was a fail. +func (worker *Worker) broadcast(job *Job) { + for _, v := range worker.agents { + v.WriteJob(job) + } +} + +// Add a function. +// Plz added job servers first, then functions. +// The API will tell every connected job server that 'I can do this' +func (worker *Worker) AddFunc(funcname string, +f JobFunc, timeout uint32) (err error) { + if _, ok := worker.funcs[funcname]; ok { + return common.Errorf("The function already exists: %s", funcname) + } + worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout} + + if worker.running { + worker.addFunc(funcname, timeout) + } + return +} + +// inner add function +func (worker *Worker) addFunc(funcname string, timeout uint32) { + var datatype uint32 + var data []byte + if timeout == 0 { + datatype = common.CAN_DO + data = []byte(funcname) + } else { + datatype = common.CAN_DO_TIMEOUT + data = []byte(funcname + "\x00") + t := common.Uint32ToBytes(timeout) + data = append(data, t[:]...) + } + job := newJob(common.REQ, datatype, data) + worker.broadcast(job) + +} + +// Remove a function. +// Tell job servers 'I can not do this now' at the same time. +func (worker *Worker) RemoveFunc(funcname string) (err error) { + if _, ok := worker.funcs[funcname]; !ok { + return common.Errorf("The function does not exist: %s", funcname) + } + delete(worker.funcs, funcname) + if worker.running { + worker.removeFunc(funcname) + } + return +} + +// inner remove function +func (worker *Worker) removeFunc(funcname string) { + job := newJob(common.REQ, common.CANT_DO, []byte(funcname)) + worker.broadcast(job) +} + +// Main loop +func (worker *Worker) Work() { + defer func() { + worker.running = false + }() + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + worker.running = true + for _, v := range worker.agents { + go v.Work() + } + ok := true + for ok { + if job, ok := <-worker.in; ok { + switch job.DataType { + case common.ERROR: + go func() { + _, err := common.GetError(job.Data) + worker.err(err) + }() + case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: + go func() { + if err := worker.exec(job); err != nil { + worker.err(err) + } + }() + default: + go worker.handleJob(job) + } + } + } +} + +// job handler +func (worker *Worker) handleJob(job *Job) { + if worker.JobHandler != nil { + if err := worker.JobHandler(job); err != nil { + worker.err(err) + } + } +} + +// Close. +func (worker *Worker) Close() { + close(worker.in) + if worker.limit != nil { + close(worker.limit) + } +} + +// Send a something out, get the samething back. +func (worker *Worker) Echo(data []byte) { + job := newJob(common.REQ, common.ECHO_REQ, data) + worker.broadcast(job) +} + +// Remove all of functions. +// Both from the worker or job servers. +func (worker *Worker) Reset() { + job := newJob(common.REQ, common.RESET_ABILITIES, nil) + worker.broadcast(job) + worker.funcs = make(JobFuncs) +} + +// Set the worker's unique id. +func (worker *Worker) SetId(id string) { + worker.Id = id + job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id)) + worker.broadcast(job) +} + +// Execute the job. And send back the result. +func (worker *Worker) exec(job *Job) (err error) { + if worker.limit != nil { + <-worker.limit + defer func() { + worker.limit <- true + }() + } + var limit int + if job.DataType == common.JOB_ASSIGN { + limit = 3 + } else { + limit = 4 + } + jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) + job.Handle = string(jobdata[0]) + funcname := string(jobdata[1]) + if job.DataType == common.JOB_ASSIGN { + job.Data = jobdata[2] + } else { + job.UniqueId = string(jobdata[2]) + job.Data = jobdata[3] + } + f, ok := worker.funcs[funcname] + if !ok { + return common.Errorf("The function does not exist: %s", funcname) + } + result, err := f.f(job) + var datatype uint32 + if err == nil { + datatype = common.WORK_COMPLETE + } else { + if result == nil { + datatype = common.WORK_FAIL + } else { + datatype = common.WORK_EXCEPTION + } + } + + job.magicCode = common.REQ + job.DataType = datatype + job.Data = result + job.agent.WriteJob(job) + return +} + +func (worker *Worker) removeAgent(a *agent) { + for k, v := range worker.agents { + if v == a { + worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...) + } + } + if len(worker.agents) == 0 { + worker.Close() + } +} diff --git a/gearman/worker/worker_test.go b/worker/worker_test.go similarity index 100% rename from gearman/worker/worker_test.go rename to worker/worker_test.go