From e0c1537e8ece7c608250ca33f35d1adaed30f450 Mon Sep 17 00:00:00 2001 From: mikespook Date: Mon, 26 Mar 2012 13:32:59 +0800 Subject: [PATCH] code refactoring, upgrade to GO1 RC2 --HG-- rename : README => README.md rename : src/gearman/client.go => gearman/client/client.go rename : src/gearman/client_test.go => gearman/client/client_test.go rename : src/gearman/clientjob.go => gearman/client/clientjob.go rename : src/gearman/gearman.go => gearman/gearman.go rename : src/gearman/jobagent.go => gearman/worker/jobagent.go rename : src/gearman/worker.go => gearman/worker/worker.go rename : src/gearman/worker_test.go => gearman/worker/worker_test.go rename : src/gearman/workerjob.go => gearman/worker/workerjob.go --- README | 25 -- README.md | 46 +++ README.zh | 24 -- example/Makefile | 21 -- example/client.go | 5 +- example/worker.go | 75 ++--- gearman.go | 15 + gearman/client/client.go | 278 +++++++++++++++++ .../gearman => gearman/client}/client_test.go | 6 +- gearman/client/clientjob.go | 98 ++++++ gearman/gearman.go | 123 ++++++++ gearman/worker/jobagent.go | 131 ++++++++ gearman/worker/worker.go | 257 ++++++++++++++++ gearman/worker/worker_test.go | 72 +++++ gearman/worker/workerjob.go | 87 ++++++ src/gearman/Makefile | 21 -- src/gearman/client.go | 279 ------------------ src/gearman/clientjob.go | 99 ------- src/gearman/gearman.go | 111 ------- src/gearman/jobagent.go | 133 --------- src/gearman/worker.go | 256 ---------------- src/gearman/worker_test.go | 71 ----- src/gearman/workerjob.go | 87 ------ 23 files changed, 1151 insertions(+), 1169 deletions(-) delete mode 100644 README create mode 100644 README.md delete mode 100644 README.zh delete mode 100644 example/Makefile create mode 100644 gearman.go create mode 100644 gearman/client/client.go rename {src/gearman => gearman/client}/client_test.go (79%) create mode 100644 gearman/client/clientjob.go create mode 100644 gearman/gearman.go create mode 100644 gearman/worker/jobagent.go create mode 100644 gearman/worker/worker.go create mode 100644 gearman/worker/worker_test.go create mode 100644 gearman/worker/workerjob.go delete mode 100644 src/gearman/Makefile delete mode 100644 src/gearman/client.go delete mode 100644 src/gearman/clientjob.go delete mode 100644 src/gearman/gearman.go delete mode 100644 src/gearman/jobagent.go delete mode 100644 src/gearman/worker.go delete mode 100644 src/gearman/worker_test.go delete mode 100644 src/gearman/workerjob.go diff --git a/README b/README deleted file mode 100644 index 970f54b..0000000 --- a/README +++ /dev/null @@ -1,25 +0,0 @@ -Gearman API for golang - -This module is Gearman API for golang. -It was implemented a native protocol for both worker and client API. - -- INSTALL - $ cd ./src/ - $ make install - -- SAMPLE OF USAGE - # example/worker.go - $ make worker - $ ./worker - - # example/client.go - $ make client - $ ./client - -- Code format - gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false $(DIR) - ----- -xingxing -http://mikespook.com -http://twitter.com/mikespook diff --git a/README.md b/README.md new file mode 100644 index 0000000..6d5db19 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# Gearman API for golang + +This module is Gearman API for golang. +It was implemented a native protocol for both worker and client API. + +Copyright 2012 Xing Xing All rights reserved. +Use of this source code is governed by a MIT license that can be found in the LICENSE file. + +# INSTALL + +This will install the client: + +> $ go get bitbucket.org/mikespook/gearman-go/gearman/client + +This will install the worker: + +> $ go get bitbucket.org/mikespook/gearman-go/gearman/worker + +This will install the client and the worker automatically: + +> $ go get bitbucket.org/mikespook/gearman-go + + +# SAMPLE OF USAGE + +## Worker + +> $ cd example +> $ go build worker +> $ ./worker + +## Client + +> $ cd example +> $ go build client +> $ ./client + +# Code format + +> $ gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false $(DIR) + +# Contacts + +xingxing +http://mikespook.com +http://twitter.com/mikespook diff --git a/README.zh b/README.zh deleted file mode 100644 index 113d17a..0000000 --- a/README.zh +++ /dev/null @@ -1,24 +0,0 @@ -Go 语言的 Gearman API - -这是 Go 语言的 Gearman API。 -它使用 Go 语言原生实现了 Gearman 的 Worker 端和 Client 端 API 协议。 - -- 安装 - $ cd ./src - $ make install - -- 例程 - # example/worker.go - $ make worker - $ ./worker - - # example/client.go - $ make client - $ ./client - -- 代码格式 - gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false $(DIR) ----- -xingxing -http://mikespook.com -http://twitter.com/mikespook diff --git a/example/Makefile b/example/Makefile deleted file mode 100644 index 6ef2081..0000000 --- a/example/Makefile +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2009 The Go Authors. All rights reserved. -# Use of this source code is governed by a BSD-style -# license that can be found in the LICENSE file. - -include $(GOROOT)/src/Make.inc - -TARG=gearman - -# GOFILES=\ -# worker.go\ - -CLEANFILES+= worker client - -include $(GOROOT)/src/Make.pkg - -%: %.go - $(GC) $*.go - $(LD) -o $@ $*.$O - -fmt: - gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false ./ diff --git a/example/client.go b/example/client.go index 4bcb9d1..7ead4c5 100644 --- a/example/client.go +++ b/example/client.go @@ -1,12 +1,13 @@ package main import ( - "gearman" + "bitbucket.org/mikespook/gearman-go/gearman" + "bitbucket.org/mikespook/gearman-go/gearman/client" "log" ) func main() { - client := gearman.NewClient() + client := client.NewClient() defer client.Close() if err := client.AddServer("127.0.0.1:4730"); err != nil { log.Fatalln(err) diff --git a/example/worker.go b/example/worker.go index 7920a98..4ddd377 100644 --- a/example/worker.go +++ b/example/worker.go @@ -1,47 +1,48 @@ package main import ( - "fmt" - "gearman" - "log" - "strings" + "bitbucket.org/mikespook/gearman-go/gearman" + "bitbucket.org/mikespook/gearman-go/gearman/worker" + "fmt" + "log" + "strings" ) -func ToUpper(job *gearman.WorkerJob) ([]byte, error) { - data := []byte(strings.ToUpper(string(job.Data))) - return data, nil +func ToUpper(job *worker.WorkerJob) ([]byte, error) { + data := []byte(strings.ToUpper(string(job.Data))) + return data, nil } func main() { - worker := gearman.NewWorker() - worker.AddServer("127.0.0.1:4730") - worker.AddFunction("ToUpper", ToUpper, 0) - worker.AddFunction("ToUpperTimeOut5", ToUpper, 5) + w := worker.NewWorker() + defer w.Close() + w.AddServer("127.0.0.1:4730") + w.AddFunction("ToUpper", ToUpper, 0) + w.AddFunction("ToUpperTimeOut5", ToUpper, 5) - go func() { - log.Println("start worker") - for { - print("cmd: ") - var str string - fmt.Scan(&str) - switch str { - case "echo": - worker.Echo([]byte("Hello world!")) - var job *gearman.WorkerJob - for job = <-worker.JobQueue; job.DataType != gearman.ECHO_RES; job = <-worker.JobQueue { - log.Println(job) - } - log.Println(string(job.Data)) - case "quit": - worker.Close() - return - case "result": - job := <-worker.JobQueue - log.Println(string(job.Data)) - default: - log.Println("Unknown command") - } - } - }() - worker.Work() + go func() { + log.Println("start worker") + for { + print("cmd: ") + var str string + fmt.Scan(&str) + switch str { + case "echo": + w.Echo([]byte("Hello world!")) + var job *worker.WorkerJob + for job = <-w.JobQueue; job.DataType != gearman.ECHO_RES; job = <-w.JobQueue { + log.Println(job) + } + log.Println(string(job.Data)) + case "quit": + return + case "result": + job := <-w.JobQueue + log.Println(string(job.Data)) + default: + log.Println("Unknown command") + } + } + }() + w.Work() } diff --git a/gearman.go b/gearman.go new file mode 100644 index 0000000..067831f --- /dev/null +++ b/gearman.go @@ -0,0 +1,15 @@ +// Copyright 2012 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +/* +This module is Gearman API for golang. +The protocol was implemented by native way. +*/ + +package gearman + +import ( + _ "bitbucket.org/mikespook/gearman-go/gearman/client" + _ "bitbucket.org/mikespook/gearman-go/gearman/worker" +) diff --git a/gearman/client/client.go b/gearman/client/client.go new file mode 100644 index 0000000..a5808fc --- /dev/null +++ b/gearman/client/client.go @@ -0,0 +1,278 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bitbucket.org/mikespook/gearman-go/gearman" + "bytes" + "io" + "net" + "strconv" + "sync" +) + +/* +The client side api for gearman. + +usage: + client = NewClient() + client.AddServer("127.0.0.1:4730") + handle := client.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) + +*/ +type Client struct { + mutex sync.Mutex + conn net.Conn + JobQueue chan *ClientJob + incoming chan []byte + UId uint32 +} + +// Create a new client. +func NewClient() (client *Client) { + client = &Client{JobQueue: make(chan *ClientJob, gearman.QUEUE_CAP), + incoming: make(chan []byte, gearman.QUEUE_CAP), + UId: 1} + return +} + +// Add a server. +// In this version, one client connect to one job server. +// Sample is better. Plz do the load balancing by your self. +func (client *Client) AddServer(addr string) (err error) { + conn, err := net.Dial(gearman.TCP, addr) + if err != nil { + return + } + client.conn = conn + return +} + +// Internal read +func (client *Client) read() (data []byte, err error) { + if len(client.incoming) > 0 { + // incoming queue is not empty + data = <-client.incoming + } else { + // empty queue, read data from socket + for { + buf := make([]byte, gearman.BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + break + } + return + } + data = append(data, buf[0:n]...) + if n < gearman.BUFFER_SIZE { + break + } + } + } + // split package + start, end := 0, 4 + tl := len(data) + for i := 0; i < tl; i++ { + if string(data[start:end]) == gearman.RES_STR { + l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + client.incoming <- data[total:] + data = data[:total] + return + } + } else { + start++ + end++ + } + } + err = gearman.ErrInvalidData + return +} + +// Read a job from job server. +// This function will return the job, and add it to the job queue. +func (client *Client) ReadJob() (job *ClientJob, err error) { + var rel []byte + if rel, err = client.read(); err != nil { + return + } + if job, err = DecodeClientJob(rel); err != nil { + return + } else { + switch job.DataType { + case gearman.ERROR: + _, err = gearman.GetError(job.Data) + return + case gearman.WORK_DATA, gearman.WORK_WARNING, gearman.WORK_STATUS, gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: + client.JobQueue <- job + } + } + return +} + +// Do the function. +// funcname is a string with function name. +// data is encoding to byte array. +// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, +// and if it is background job: JOB_BG. +// JOB_LOW | JOB_BG means the job is running with low level in background. +func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { + var datatype uint32 + if flag&gearman.JOB_LOW == gearman.JOB_LOW { + if flag&gearman.JOB_BG == gearman.JOB_BG { + datatype = gearman.SUBMIT_JOB_LOW_BG + } else { + datatype = gearman.SUBMIT_JOB_LOW + } + } else if flag&gearman.JOB_HIGH == gearman.JOB_HIGH { + if flag&gearman.JOB_BG == gearman.JOB_BG { + datatype = gearman.SUBMIT_JOB_HIGH_BG + } else { + datatype = gearman.SUBMIT_JOB_HIGH + } + } else if flag&gearman.JOB_BG == gearman.JOB_BG { + datatype = gearman.SUBMIT_JOB_BG + } else { + datatype = gearman.SUBMIT_JOB + } + + rel := make([]byte, 0, 1024*64) + rel = append(rel, []byte(funcname)...) + rel = append(rel, '\x00') + client.mutex.Lock() + uid := strconv.Itoa(int(client.UId)) + client.UId++ + rel = append(rel, []byte(uid)...) + client.mutex.Unlock() + rel = append(rel, '\x00') + rel = append(rel, data...) + if err = client.WriteJob(NewClientJob(gearman.REQ, datatype, rel)); err != nil { + return + } + var job *ClientJob + if job, err = client.readLastJob(gearman.JOB_CREATED); err != nil { + return + } + handle = string(job.Data) + go func() { + if flag&gearman.JOB_BG != gearman.JOB_BG { + for { + if job, err = client.ReadJob(); err != nil { + return + } + switch job.DataType { + case gearman.WORK_DATA, gearman.WORK_WARNING: + case gearman.WORK_STATUS: + case gearman.WORK_COMPLETE, gearman.WORK_FAIL, gearman.WORK_EXCEPTION: + return + } + } + } + }() + return +} + +// Internal read last job +func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err error) { + for { + if job, err = client.ReadJob(); err != nil { + return + } + if job.DataType == datatype { + break + } + } + if job.DataType != datatype { + err = gearman.ErrDataType + } + return +} + +// Get job status from job server. +// !!!Not fully tested.!!! +func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint64, err error) { + + if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.GET_STATUS, []byte(handle))); err != nil { + return + } + var job *ClientJob + if job, err = client.readLastJob(gearman.STATUS_RES); err != nil { + return + } + data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) + if len(data) != 5 { + err = gearman.ErrInvalidData + return + } + if handle != string(data[0]) { + err = gearman.ErrInvalidData + return + } + known = data[1][0] == '1' + running = data[2][0] == '1' + if numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0); err != nil { + return + } + if denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0); err != nil { + return + } + return +} + +// Send a something out, get the samething back. +func (client *Client) Echo(data []byte) (echo []byte, err error) { + if err = client.WriteJob(NewClientJob(gearman.REQ, gearman.ECHO_REQ, data)); err != nil { + return + } + var job *ClientJob + if job, err = client.readLastJob(gearman.ECHO_RES); err != nil { + return + } + echo = job.Data + return +} + +// Get the last job. +// the job means a network package. +// Normally, it is the job executed result. +func (client *Client) LastJob() (job *ClientJob) { + if l := len(client.JobQueue); l != 1 { + if l == 0 { + return + } + for i := 0; i < l-1; i++ { + <-client.JobQueue + } + } + return <-client.JobQueue +} + +// Send the job to job server. +func (client *Client) WriteJob(job *ClientJob) (err error) { + return client.write(job.Encode()) +} + +// Internal write +func (client *Client) write(buf []byte) (err error) { + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return +} + +// Close. +func (client *Client) Close() (err error) { + err = client.conn.Close() + close(client.JobQueue) + return +} diff --git a/src/gearman/client_test.go b/gearman/client/client_test.go similarity index 79% rename from src/gearman/client_test.go rename to gearman/client/client_test.go index 9c9db84..f2ae43f 100644 --- a/src/gearman/client_test.go +++ b/gearman/client/client_test.go @@ -1,8 +1,8 @@ -package gearman +package client import ( + "bitbucket.org/mikespook/gearman-go/gearman" "testing" - // "os" ) var client *Client @@ -27,7 +27,7 @@ func TestClientEcho(t *testing.T) { } func TestClientDo(t *testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { + if handle, err := client.Do("ToUpper", []byte("abcdef"), gearman.JOB_LOW|gearman.JOB_BG); err != nil { t.Error(err) } else { t.Log(handle) diff --git a/gearman/client/clientjob.go b/gearman/client/clientjob.go new file mode 100644 index 0000000..184b93a --- /dev/null +++ b/gearman/client/clientjob.go @@ -0,0 +1,98 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package client + +import ( + "bitbucket.org/mikespook/gearman-go/gearman" + "bytes" +) + +// Client side job +type ClientJob struct { + Data []byte + Handle, UniqueId string + magicCode, DataType uint32 +} + +// Create a new job +func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { + return &ClientJob{magicCode: magiccode, + DataType: datatype, + Data: data} +} + +// Decode a job from byte slice +func DecodeClientJob(data []byte) (job *ClientJob, err error) { + if len(data) < 12 { + err = gearman.ErrInvalidData + return + } + datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + err = gearman.ErrInvalidData + return + } + data = data[12:] + job = NewClientJob(gearman.RES, datatype, data) + return +} + +// Encode a job to byte slice +func (job *ClientJob) Encode() (data []byte) { + magiccode := gearman.Uint32ToBytes(job.magicCode) + datatype := gearman.Uint32ToBytes(job.DataType) + data = make([]byte, 0, 1024*64) + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) + l := len(job.Data) + datalength := gearman.Uint32ToBytes(uint32(l)) + data = append(data, datalength[:]...) + data = append(data, job.Data...) + return +} + +// Extract the job's result. +func (job *ClientJob) Result() (data []byte, err error) { + switch job.DataType { + case gearman.WORK_FAIL: + job.Handle = string(job.Data) + err = gearman.ErrWorkFail + return + case gearman.WORK_EXCEPTION: + err = gearman.ErrWorkException + fallthrough + case gearman.WORK_COMPLETE: + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = gearman.ErrInvalidData + return + } + job.Handle = string(s[0]) + data = s[1] + default: + err = gearman.ErrDataType + } + return +} + +// Extract the job's update +func (job *ClientJob) Update() (data []byte, err error) { + if job.DataType != gearman.WORK_DATA && job.DataType != gearman.WORK_WARNING { + err = gearman.ErrDataType + return + } + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = gearman.ErrInvalidData + return + } + if job.DataType == gearman.WORK_WARNING { + err = gearman.ErrWorkWarning + } + job.Handle = string(s[0]) + data = s[1] + return +} diff --git a/gearman/gearman.go b/gearman/gearman.go new file mode 100644 index 0000000..636c5ff --- /dev/null +++ b/gearman/gearman.go @@ -0,0 +1,123 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +/* +This module is Gearman API for golang. +The protocol was implemented by native way. +*/ + +package gearman + +import ( + "bytes" + "errors" + "syscall" +) + +const ( + // tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6, + // or 'tcp6' only for ipv6. + TCP = "tcp4" + // the number limited for job servers. + WORKER_SERVER_CAP = 32 + // the number limited for functions. + WORKER_FUNCTION_CAP = 512 + // queue size + QUEUE_CAP = 512 + // read buffer size + BUFFER_SIZE = 1024 + + // \x00REQ + REQ = 5391697 + REQ_STR = "\x00REQ" + // \x00RES + RES = 5391699 + RES_STR = "\x00RES" + + // package data type + CAN_DO = 1 + CANT_DO = 2 + RESET_ABILITIES = 3 + PRE_SLEEP = 4 + NOOP = 6 + JOB_CREATED = 8 + GRAB_JOB = 9 + NO_JOB = 10 + JOB_ASSIGN = 11 + WORK_STATUS = 12 + WORK_COMPLETE = 13 + WORK_FAIL = 14 + GET_STATUS = 15 + ECHO_REQ = 16 + ECHO_RES = 17 + ERROR = 19 + STATUS_RES = 20 + SET_CLIENT_ID = 22 + CAN_DO_TIMEOUT = 23 + WORK_EXCEPTION = 25 + WORK_DATA = 28 + WORK_WARNING = 29 + GRAB_JOB_UNIQ = 30 + JOB_ASSIGN_UNIQ = 31 + + SUBMIT_JOB = 7 + SUBMIT_JOB_BG = 18 + SUBMIT_JOB_HIGH = 21 + SUBMIT_JOB_HIGH_BG = 32 + SUBMIT_JOB_LOW = 33 + SUBMIT_JOB_LOW_BG = 34 + + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 +) + +var ( + ErrIsNotErr = errors.New("The input is not a error data.") + ErrInvalidData = errors.New("Invalid data.") + ErrWorkWarning = errors.New("Work warning.") + ErrWorkFail = errors.New("Work fail.") + ErrWorkException = errors.New("Work exeption.") + ErrDataType = errors.New("Invalid data type.") + ErrOutOfCap = errors.New("Out of the capability.") + ErrNotConn = errors.New("Did not connect to job server.") + ErrFuncNotFound = errors.New("The function was not found.") +) + +// Extract the error message +func GetError(data []byte) (eno syscall.Errno, err error) { + rel := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(rel) != 2 { + err = ErrIsNotErr + return + } + l := len(rel[0]) + eno = syscall.Errno(BytesToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) + err = errors.New(string(rel[1])) + return +} + +// Decode [4]byte to uint32 +func BytesToUint32(buf [4]byte) uint32 { + return uint32(buf[0])<<24 + + uint32(buf[1])<<16 + + uint32(buf[2])<<8 + + uint32(buf[3]) +} + +// Encode uint32 to [4]byte +func Uint32ToBytes(i uint32) (data [4]byte) { + data[0] = byte((i >> 24) & 0xff) + data[1] = byte((i >> 16) & 0xff) + data[2] = byte((i >> 8) & 0xff) + data[3] = byte(i & 0xff) + return +} diff --git a/gearman/worker/jobagent.go b/gearman/worker/jobagent.go new file mode 100644 index 0000000..118c1a5 --- /dev/null +++ b/gearman/worker/jobagent.go @@ -0,0 +1,131 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bitbucket.org/mikespook/gearman-go/gearman" + "io" + "net" +) + +// The agent of job server. +type jobAgent struct { + conn net.Conn + worker *Worker + running bool + incoming chan []byte +} + +// Create the agent of job server. +func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { + conn, err := net.Dial(gearman.TCP, addr) + if err != nil { + return nil, err + } + jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, gearman.QUEUE_CAP)} + return jobagent, err +} + +// Internal read +func (agent *jobAgent) read() (data []byte, err error) { + if len(agent.incoming) > 0 { + // incoming queue is not empty + data = <-agent.incoming + } else { + for { + buf := make([]byte, gearman.BUFFER_SIZE) + var n int + if n, err = agent.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + err = nil + return + } + return + } + data = append(data, buf[0:n]...) + if n < gearman.BUFFER_SIZE { + break + } + } + } + // split package + start := 0 + tl := len(data) + for i := 0; i < tl; i++ { + if string(data[start:start+4]) == gearman.RES_STR { + l := int(gearman.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + agent.incoming <- data[total:] + data = data[:total] + return + } + } else { + start++ + } + } + err = gearman.ErrInvalidData + return +} + +// Main loop. +func (agent *jobAgent) Work() { + noop := true + for agent.running { + // got noop msg and incoming queue is zero, grab job + if noop && len(agent.incoming) == 0 { + agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.GRAB_JOB, nil)) + } + rel, err := agent.read() + if err != nil { + agent.worker.ErrQueue <- err + continue + } + job, err := DecodeWorkerJob(rel) + if err != nil { + agent.worker.ErrQueue <- err + continue + } else { + switch job.DataType { + case gearman.NOOP: + noop = true + case gearman.NO_JOB: + noop = false + agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.PRE_SLEEP, nil)) + case gearman.ECHO_RES, gearman.JOB_ASSIGN_UNIQ, gearman.JOB_ASSIGN: + job.agent = agent + agent.worker.incoming <- job + } + } + } + return +} + +// Send a job to the job server. +func (agent *jobAgent) WriteJob(job *WorkerJob) (err error) { + return agent.write(job.Encode()) +} + +// Internal write the encoded job. +func (agent *jobAgent) write(buf []byte) (err error) { + var n int + for i := 0; i < len(buf); i += n { + n, err = agent.conn.Write(buf[i:]) + if err != nil { + return err + } + } + return +} + +// Close. +func (agent *jobAgent) Close() (err error) { + agent.running = false + close(agent.incoming) + err = agent.conn.Close() + return +} diff --git a/gearman/worker/worker.go b/gearman/worker/worker.go new file mode 100644 index 0000000..9e23c2c --- /dev/null +++ b/gearman/worker/worker.go @@ -0,0 +1,257 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bitbucket.org/mikespook/gearman-go/gearman" + "bytes" + "sync" +) + +// The definition of the callback function. +type JobFunction func(job *WorkerJob) ([]byte, error) + +// Map for added function. +type JobFunctionMap map[string]JobFunction + +/* +Worker side api for gearman. + +usage: + worker = NewWorker() + worker.AddFunction("foobar", foobar) + worker.AddServer("127.0.0.1:4730") + worker.Work() // Enter the worker's main loop + +The definition of the callback function 'foobar' should suit for the type 'JobFunction'. +It looks like this: + +func foobar(job *WorkerJob) (data []byte, err os.Error) { + //sth. here + //plaplapla... + return +} +*/ +type Worker struct { + clients []*jobAgent + functions JobFunctionMap + + running bool + incoming chan *WorkerJob + mutex sync.Mutex + JobQueue chan *WorkerJob + ErrQueue chan error +} + +// Get a new worker +func NewWorker() (worker *Worker) { + worker = &Worker{ + // job server list + clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP), + // function list + functions: make(JobFunctionMap), + incoming: make(chan *WorkerJob, gearman.QUEUE_CAP), + JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP), + ErrQueue: make(chan error, gearman.QUEUE_CAP), + running: true, + } + return +} + +// Add a server. The addr should be 'host:port' format. +// The connection is established at this time. +func (worker *Worker) AddServer(addr string) (err error) { + worker.mutex.Lock() + defer worker.mutex.Unlock() + + if len(worker.clients) == cap(worker.clients) { + return gearman.ErrOutOfCap + } + + // Create a new job server's client as a agent of server + server, err := newJobAgent(addr, worker) + if err != nil { + return err + } + + n := len(worker.clients) + worker.clients = worker.clients[0 : n+1] + worker.clients[n] = server + return +} + +// Add a function. +// Plz added job servers first, then functions. +// The API will tell every connected job server that 'I can do this' +func (worker *Worker) AddFunction(funcname string, + f JobFunction, timeout uint32) (err error) { + if len(worker.clients) < 1 { + return gearman.ErrNotConn + } + worker.mutex.Lock() + defer worker.mutex.Unlock() + worker.functions[funcname] = f + + var datatype uint32 + var data []byte + if timeout == 0 { + datatype = gearman.CAN_DO + data = []byte(funcname) + } else { + datatype = gearman.CAN_DO_TIMEOUT + data = []byte(funcname + "\x00") + t := gearman.Uint32ToBytes(timeout) + data = append(data, t[:]...) + } + job := NewWorkerJob(gearman.REQ, datatype, data) + worker.WriteJob(job) + return +} + +// Remove a function. +// Tell job servers 'I can not do this now' at the same time. +func (worker *Worker) RemoveFunction(funcname string) (err error) { + worker.mutex.Lock() + defer worker.mutex.Unlock() + + if worker.functions[funcname] == nil { + return gearman.ErrFuncNotFound + } + delete(worker.functions, funcname) + job := NewWorkerJob(gearman.REQ, gearman.CANT_DO, []byte(funcname)) + worker.WriteJob(job) + return +} + +// Main loop +func (worker *Worker) Work() { + for _, v := range worker.clients { + go v.Work() + } + for worker.running { + select { + case job := <-worker.incoming: + if job == nil { + break + } + switch job.DataType { + case gearman.NO_JOB: + // do nothing + case gearman.ERROR: + _, err := gearman.GetError(job.Data) + worker.ErrQueue <- err + case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ: + go func() { + if err := worker.exec(job); err != nil { + worker.ErrQueue <- err + } + }() + default: + worker.JobQueue <- job + } + } + } +} + +// Get the last job in queue. +// If there are more than one job in the queue, +// the last one will be returned, +// the others will be lost. +func (worker *Worker) LastJob() (job *WorkerJob) { + if l := len(worker.JobQueue); l != 1 { + if l == 0 { + return + } + for i := 0; i < l-1; i++ { + <-worker.JobQueue + } + } + return <-worker.JobQueue +} + +// Close. +func (worker *Worker) Close() (err error) { + worker.running = false + for _, v := range worker.clients { + err = v.Close() + } + close(worker.incoming) + return err +} + +// Write a job to job server. +// Here, the job's mean is not the oraginal mean. +// Just looks like a network package for job's result or tell job server, there was a fail. +func (worker *Worker) WriteJob(job *WorkerJob) (err error) { + e := make(chan error) + for _, v := range worker.clients { + go func() { + e <- v.WriteJob(job) + }() + } + return <-e +} + +// Send a something out, get the samething back. +func (worker *Worker) Echo(data []byte) (err error) { + job := NewWorkerJob(gearman.REQ, gearman.ECHO_REQ, data) + return worker.WriteJob(job) +} + +// Remove all of functions. +// Both from the worker or job servers. +func (worker *Worker) Reset() (err error) { + job := NewWorkerJob(gearman.REQ, gearman.RESET_ABILITIES, nil) + err = worker.WriteJob(job) + worker.functions = make(JobFunctionMap) + return +} + +// Set the worker's unique id. +func (worker *Worker) SetId(id string) (err error) { + job := NewWorkerJob(gearman.REQ, gearman.SET_CLIENT_ID, []byte(id)) + return worker.WriteJob(job) +} + +// Execute the job. And send back the result. +func (worker *Worker) exec(job *WorkerJob) (err error) { + var limit int + if job.DataType == gearman.JOB_ASSIGN { + limit = 3 + } else { + limit = 4 + } + jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) + job.Handle = string(jobdata[0]) + funcname := string(jobdata[1]) + if job.DataType == gearman.JOB_ASSIGN { + job.Data = jobdata[2] + } else { + job.UniqueId = string(jobdata[2]) + job.Data = jobdata[3] + } + f, ok := worker.functions[funcname] + if !ok { + return gearman.ErrFuncNotFound + } + result, err := f(job) + var datatype uint32 + if err == nil { + datatype = gearman.WORK_COMPLETE + } else { + if result == nil { + datatype = gearman.WORK_FAIL + } else { + datatype = gearman.WORK_EXCEPTION + } + } + + job.magicCode = gearman.REQ + job.DataType = datatype + job.Data = result + + worker.WriteJob(job) + return +} diff --git a/gearman/worker/worker_test.go b/gearman/worker/worker_test.go new file mode 100644 index 0000000..0f3fdf7 --- /dev/null +++ b/gearman/worker/worker_test.go @@ -0,0 +1,72 @@ +package worker + +import "testing" + +var worker *Worker + +func init() { + worker = NewWorker() +} + +func TestWorkerAddServer(t *testing.T) { + t.Log("Add local server 127.0.0.1:4730.") + if err := worker.AddServer("127.0.0.1:4730"); err != nil { + t.Error(err) + } + + if l := len(worker.clients); l != 1 { + t.Log(worker.clients) + t.Error("The length of server list should be 1.") + } +} + +func foobar(job *WorkerJob) ([]byte, error) { + return nil, nil +} + +func TestWorkerAddFunction(t *testing.T) { + if err := worker.AddFunction("foobar", foobar, 0); err != nil { + t.Error(err) + } + if err := worker.AddFunction("timeout", foobar, 5); err != nil { + t.Error(err) + } + if l := len(worker.functions); l != 2 { + t.Log(worker.functions) + t.Errorf("The length of function map should be %d.", 2) + } +} + +func TestWorkerEcho(t *testing.T) { + if err := worker.Echo([]byte("Hello World")); err != nil { + t.Error(err) + } +} + +/* +func TestWorkerResult(t *testing.T) { + if job := worker.LastResult(); job == nil { + t.Error("Nothing in result.") + } else { + t.Log(job) + } +} +*/ + +func TestWorkerRemoveFunction(t *testing.T) { + if err := worker.RemoveFunction("foobar"); err != nil { + t.Error(err) + } +} + +func TestWorkerReset(t *testing.T) { + if err := worker.Reset(); err != nil { + t.Error(err) + } +} + +func TestWorkerClose(t *testing.T) { + if err := worker.Close(); err != nil { + t.Error(err) + } +} diff --git a/gearman/worker/workerjob.go b/gearman/worker/workerjob.go new file mode 100644 index 0000000..f4ab029 --- /dev/null +++ b/gearman/worker/workerjob.go @@ -0,0 +1,87 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package worker + +import ( + "bitbucket.org/mikespook/gearman-go/gearman" + "strconv" +) + +// Worker side job +type WorkerJob struct { + Data []byte + Handle, UniqueId string + agent *jobAgent + magicCode, DataType uint32 +} + +// Create a new job +func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { + return &WorkerJob{magicCode: magiccode, + DataType: datatype, + Data: data} +} + +// Decode job from byte slice +func DecodeWorkerJob(data []byte) (job *WorkerJob, err error) { + if len(data) < 12 { + err = gearman.ErrInvalidData + return + } + datatype := gearman.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := gearman.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + err = gearman.ErrInvalidData + return + } + data = data[12:] + job = NewWorkerJob(gearman.RES, datatype, data) + return +} + +// Encode a job to byte slice +func (job *WorkerJob) Encode() (data []byte) { + magiccode := gearman.Uint32ToBytes(job.magicCode) + datatype := gearman.Uint32ToBytes(job.DataType) + data = make([]byte, 0, 1024*64) + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) + data = append(data, []byte{0, 0, 0, 0}...) + l := len(job.Data) + if job.Handle != "" { + data = append(data, []byte(job.Handle)...) + data = append(data, 0) + l += len(job.Handle) + 1 + } + data = append(data, job.Data...) + datalength := gearman.Uint32ToBytes(uint32(l)) + copy(data[8:12], datalength[:]) + return +} + +// Send some datas to client. +// Using this in a job's executing. +func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err error) { + result := append([]byte(job.Handle), 0) + result = append(result, data...) + var datatype uint32 + if iswaring { + datatype = gearman.WORK_WARNING + } else { + datatype = gearman.WORK_DATA + } + return job.agent.WriteJob(NewWorkerJob(gearman.REQ, datatype, result)) +} + +// Update status. +// Tall client how many percent job has been executed. +func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err error) { + n := []byte(strconv.Itoa(numerator)) + d := []byte(strconv.Itoa(denominator)) + result := append([]byte(job.Handle), 0) + result = append(result, n...) + result = append(result, d...) + return job.agent.WriteJob(NewWorkerJob(gearman.REQ, gearman.WORK_STATUS, result)) +} diff --git a/src/gearman/Makefile b/src/gearman/Makefile deleted file mode 100644 index cbead97..0000000 --- a/src/gearman/Makefile +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2009 The Go Authors. All rights reserved. -# Use of this source code is governed by a BSD-style -# license that can be found in the LICENSE file. - -include $(GOROOT)/src/Make.inc - -TARG=gearman -GOFILES=\ - gearman.go\ - workerjob.go\ - jobagent.go\ - worker.go\ - clientjob.go\ - client.go\ - -CLEANFILES+=gearman_test - -include $(GOROOT)/src/Make.pkg - -fmt: - gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false ./ diff --git a/src/gearman/client.go b/src/gearman/client.go deleted file mode 100644 index e4b75c2..0000000 --- a/src/gearman/client.go +++ /dev/null @@ -1,279 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package gearman - -import ( - "errors" - "io" - "net" - "sync" - // "log" - "bytes" - "strconv" -) - -/* -The client side api for gearman. - -usage: - client = NewClient() - client.AddServer("127.0.0.1:4730") - handle := client.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) - -*/ -type Client struct { - mutex sync.Mutex - conn net.Conn - JobQueue chan *ClientJob - incoming chan []byte - UId uint32 -} - -// Create a new client. -func NewClient() (client *Client) { - client = &Client{JobQueue: make(chan *ClientJob, QUEUE_CAP), - incoming: make(chan []byte, QUEUE_CAP), - UId: 1} - return -} - -// Add a server. -// In this version, one client connect to one job server. -// Sample is better. Plz do the load balancing by your self. -func (client *Client) AddServer(addr string) (err error) { - conn, err := net.Dial(TCP, addr) - if err != nil { - return - } - client.conn = conn - return -} - -// Internal read -func (client *Client) read() (data []byte, err error) { - if len(client.incoming) > 0 { - // incoming queue is not empty - data = <-client.incoming - } else { - // empty queue, read data from socket - for { - buf := make([]byte, BUFFER_SIZE) - var n int - if n, err = client.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - break - } - return - } - data = append(data, buf[0:n]...) - if n < BUFFER_SIZE { - break - } - } - } - // split package - start, end := 0, 4 - tl := len(data) - for i := 0; i < tl; i++ { - if string(data[start:end]) == RES_STR { - l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - client.incoming <- data[total:] - data = data[:total] - return - } - } else { - start++ - end++ - } - } - err = errors.New("Invalid data struct.") - return -} - -// Read a job from job server. -// This function will return the job, and add it to the job queue. -func (client *Client) ReadJob() (job *ClientJob, err error) { - var rel []byte - if rel, err = client.read(); err != nil { - return - } - if job, err = DecodeClientJob(rel); err != nil { - return - } else { - switch job.DataType { - case ERROR: - _, err = getError(job.Data) - return - case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: - client.JobQueue <- job - } - } - return -} - -// Do the function. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, -// and if it is background job: JOB_BG. -// JOB_LOW | JOB_BG means the job is running with low level in background. -func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { - var datatype uint32 - if flag&JOB_LOW == JOB_LOW { - if flag&JOB_BG == JOB_BG { - datatype = SUBMIT_JOB_LOW_BG - } else { - datatype = SUBMIT_JOB_LOW - } - } else if flag&JOB_HIGH == JOB_HIGH { - if flag&JOB_BG == JOB_BG { - datatype = SUBMIT_JOB_HIGH_BG - } else { - datatype = SUBMIT_JOB_HIGH - } - } else if flag&JOB_BG == JOB_BG { - datatype = SUBMIT_JOB_BG - } else { - datatype = SUBMIT_JOB - } - - rel := make([]byte, 0, 1024*64) - rel = append(rel, []byte(funcname)...) - rel = append(rel, '\x00') - client.mutex.Lock() - uid := strconv.Itoa(int(client.UId)) - client.UId++ - rel = append(rel, []byte(uid)...) - client.mutex.Unlock() - rel = append(rel, '\x00') - rel = append(rel, data...) - if err = client.WriteJob(NewClientJob(REQ, datatype, rel)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(JOB_CREATED); err != nil { - return - } - handle = string(job.Data) - go func() { - if flag&JOB_BG != JOB_BG { - for { - if job, err = client.ReadJob(); err != nil { - return - } - switch job.DataType { - case WORK_DATA, WORK_WARNING: - case WORK_STATUS: - case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: - return - } - } - } - }() - return -} - -// Internal read last job -func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err error) { - for { - if job, err = client.ReadJob(); err != nil { - return - } - if job.DataType == datatype { - break - } - } - if job.DataType != datatype { - err = errors.New("No job got.") - } - return -} - -// Get job status from job server. -// !!!Not fully tested.!!! -func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint64, err error) { - - if err = client.WriteJob(NewClientJob(REQ, GET_STATUS, []byte(handle))); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(STATUS_RES); err != nil { - return - } - data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) - if len(data) != 5 { - err = errors.New("Data Error.") - return - } - if handle != string(data[0]) { - err = errors.New("Invalid handle.") - return - } - known = data[1][0] == '1' - running = data[2][0] == '1' - if numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0); err != nil { - return - } - if denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0); err != nil { - return - } - return -} - -// Send a something out, get the samething back. -func (client *Client) Echo(data []byte) (echo []byte, err error) { - if err = client.WriteJob(NewClientJob(REQ, ECHO_REQ, data)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(ECHO_RES); err != nil { - return - } - echo = job.Data - return -} - -// Get the last job. -// the job means a network package. -// Normally, it is the job executed result. -func (client *Client) LastJob() (job *ClientJob) { - if l := len(client.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-client.JobQueue - } - } - return <-client.JobQueue -} - -// Send the job to job server. -func (client *Client) WriteJob(job *ClientJob) (err error) { - return client.write(job.Encode()) -} - -// Internal write -func (client *Client) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) - if err != nil { - return - } - } - return -} - -// Close. -func (client *Client) Close() (err error) { - err = client.conn.Close() - close(client.JobQueue) - return -} diff --git a/src/gearman/clientjob.go b/src/gearman/clientjob.go deleted file mode 100644 index 6bb1350..0000000 --- a/src/gearman/clientjob.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package gearman - -import ( - "bytes" - "errors" - // "log" -) - -// Client side job -type ClientJob struct { - Data []byte - Handle, UniqueId string - magicCode, DataType uint32 -} - -// Create a new job -func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { - return &ClientJob{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode a job from byte slice -func DecodeClientJob(data []byte) (job *ClientJob, err error) { - if len(data) < 12 { - err = errors.New("Data length is too small.") - return - } - datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - err = errors.New("Invalid data length.") - return - } - data = data[12:] - job = NewClientJob(RES, datatype, data) - return -} - -// Encode a job to byte slice -func (job *ClientJob) Encode() (data []byte) { - magiccode := uint32ToByte(job.magicCode) - datatype := uint32ToByte(job.DataType) - data = make([]byte, 0, 1024*64) - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - l := len(job.Data) - datalength := uint32ToByte(uint32(l)) - data = append(data, datalength[:]...) - data = append(data, job.Data...) - return -} - -// Extract the job's result. -func (job *ClientJob) Result() (data []byte, err error) { - switch job.DataType { - case WORK_FAIL: - job.Handle = string(job.Data) - err = errors.New("Work fail.") - return - case WORK_EXCEPTION: - err = errors.New("Work exception.") - fallthrough - case WORK_COMPLETE: - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = errors.New("Invalid data.") - return - } - job.Handle = string(s[0]) - data = s[1] - default: - err = errors.New("The job is not a result.") - } - return -} - -// Extract the job's update -func (job *ClientJob) Update() (data []byte, err error) { - if job.DataType != WORK_DATA && job.DataType != WORK_WARNING { - err = errors.New("The job is not a update.") - return - } - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = errors.New("Invalid data.") - return - } - if job.DataType == WORK_WARNING { - err = errors.New("Work warning") - } - job.Handle = string(s[0]) - data = s[1] - return -} diff --git a/src/gearman/gearman.go b/src/gearman/gearman.go deleted file mode 100644 index 6fc1fac..0000000 --- a/src/gearman/gearman.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -/* -This module is Gearman API for golang. -The protocol was implemented by native way. -*/ - -package gearman - -import ( - "bytes" - "errors" - "syscall" -) - -const ( - // tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6, - // or 'tcp6' only for ipv6. - TCP = "tcp4" - // the number limited for job servers. - WORKER_SERVER_CAP = 32 - // the number limited for functions. - WORKER_FUNCTION_CAP = 512 - // queue size - QUEUE_CAP = 512 - // read buffer size - BUFFER_SIZE = 1024 - - // \x00REQ - REQ = 5391697 - REQ_STR = "\x00REQ" - // \x00RES - RES = 5391699 - RES_STR = "\x00RES" - - // package data type - CAN_DO = 1 - CANT_DO = 2 - RESET_ABILITIES = 3 - PRE_SLEEP = 4 - NOOP = 6 - JOB_CREATED = 8 - GRAB_JOB = 9 - NO_JOB = 10 - JOB_ASSIGN = 11 - WORK_STATUS = 12 - WORK_COMPLETE = 13 - WORK_FAIL = 14 - GET_STATUS = 15 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - STATUS_RES = 20 - SET_CLIENT_ID = 22 - CAN_DO_TIMEOUT = 23 - WORK_EXCEPTION = 25 - WORK_DATA = 28 - WORK_WARNING = 29 - GRAB_JOB_UNIQ = 30 - JOB_ASSIGN_UNIQ = 31 - - SUBMIT_JOB = 7 - SUBMIT_JOB_BG = 18 - SUBMIT_JOB_HIGH = 21 - SUBMIT_JOB_HIGH_BG = 32 - SUBMIT_JOB_LOW = 33 - SUBMIT_JOB_LOW_BG = 34 - - // Job type - // JOB_NORMAL | JOB_BG means a normal level job run in background - // normal level - JOB_NORMAL = 0 - // background job - JOB_BG = 1 - // low level - JOB_LOW = 2 - // high level - JOB_HIGH = 4 -) - -// Extract the error message -func getError(data []byte) (eno syscall.Errno, err error) { - rel := bytes.SplitN(data, []byte{'\x00'}, 2) - if len(rel) != 2 { - err = errors.New("The input is not a error data.") - return - } - l := len(rel[0]) - eno = syscall.Errno(byteToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) - err = errors.New(string(rel[1])) - return -} - -// Decode [4]byte to uint32 -func byteToUint32(buf [4]byte) uint32 { - return uint32(buf[0])<<24 + - uint32(buf[1])<<16 + - uint32(buf[2])<<8 + - uint32(buf[3]) -} - -// Encode uint32 to [4]byte -func uint32ToByte(i uint32) (data [4]byte) { - data[0] = byte((i >> 24) & 0xff) - data[1] = byte((i >> 16) & 0xff) - data[2] = byte((i >> 8) & 0xff) - data[3] = byte(i & 0xff) - return -} diff --git a/src/gearman/jobagent.go b/src/gearman/jobagent.go deleted file mode 100644 index 31d726b..0000000 --- a/src/gearman/jobagent.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package gearman - -import ( - "errors" - "io" - "net" - - // "log" -) - -// The agent of job server. -type jobAgent struct { - conn net.Conn - worker *Worker - running bool - incoming chan []byte -} - -// Create the agent of job server. -func newJobAgent(addr string, worker *Worker) (jobagent *jobAgent, err error) { - conn, err := net.Dial(TCP, addr) - if err != nil { - return nil, err - } - jobagent = &jobAgent{conn: conn, worker: worker, running: true, incoming: make(chan []byte, QUEUE_CAP)} - return jobagent, err -} - -// Internal read -func (agent *jobAgent) read() (data []byte, err error) { - if len(agent.incoming) > 0 { - // incoming queue is not empty - data = <-agent.incoming - } else { - for { - buf := make([]byte, BUFFER_SIZE) - var n int - if n, err = agent.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - err = nil - return - } - return - } - data = append(data, buf[0:n]...) - if n < BUFFER_SIZE { - break - } - } - } - // split package - start := 0 - tl := len(data) - for i := 0; i < tl; i++ { - if string(data[start:start+4]) == RES_STR { - l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - agent.incoming <- data[total:] - data = data[:total] - return - } - } else { - start++ - } - } - err = errors.New("Invalid data struct.") - return -} - -// Main loop. -func (agent *jobAgent) Work() { - noop := true - for agent.running { - // got noop msg and incoming queue is zero, grab job - if noop && len(agent.incoming) == 0 { - agent.WriteJob(NewWorkerJob(REQ, GRAB_JOB, nil)) - } - rel, err := agent.read() - if err != nil { - agent.worker.ErrQueue <- err - continue - } - job, err := DecodeWorkerJob(rel) - if err != nil { - agent.worker.ErrQueue <- err - continue - } else { - switch job.DataType { - case NOOP: - noop = true - case NO_JOB: - noop = false - agent.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) - case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: - job.agent = agent - agent.worker.incoming <- job - } - } - } - return -} - -// Send a job to the job server. -func (agent *jobAgent) WriteJob(job *WorkerJob) (err error) { - return agent.write(job.Encode()) -} - -// Internal write the encoded job. -func (agent *jobAgent) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = agent.conn.Write(buf[i:]) - if err != nil { - return err - } - } - return -} - -// Close. -func (agent *jobAgent) Close() (err error) { - agent.running = false - close(agent.incoming) - err = agent.conn.Close() - return -} diff --git a/src/gearman/worker.go b/src/gearman/worker.go deleted file mode 100644 index 1a0bc5e..0000000 --- a/src/gearman/worker.go +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package gearman - -import ( - "bytes" - "errors" - "sync" -) - -// The definition of the callback function. -type JobFunction func(job *WorkerJob) ([]byte, error) -// Map for added function. -type JobFunctionMap map[string]JobFunction - -/* -Worker side api for gearman. - -usage: - worker = NewWorker() - worker.AddFunction("foobar", foobar) - worker.AddServer("127.0.0.1:4730") - worker.Work() // Enter the worker's main loop - -The definition of the callback function 'foobar' should suit for the type 'JobFunction'. -It looks like this: - -func foobar(job *WorkerJob) (data []byte, err os.Error) { - //sth. here - //plaplapla... - return -} -*/ -type Worker struct { - clients []*jobAgent - functions JobFunctionMap - - running bool - incoming chan *WorkerJob - mutex sync.Mutex - JobQueue chan *WorkerJob - ErrQueue chan error -} - -// Get a new worker -func NewWorker() (worker *Worker) { - worker = &Worker{ - // job server list - clients: make([]*jobAgent, 0, WORKER_SERVER_CAP), - // function list - functions: make(JobFunctionMap), - incoming: make(chan *WorkerJob, QUEUE_CAP), - JobQueue: make(chan *WorkerJob, QUEUE_CAP), - ErrQueue: make(chan error, QUEUE_CAP), - running: true, - } - return -} - -// Add a server. The addr should be 'host:port' format. -// The connection is established at this time. -func (worker *Worker) AddServer(addr string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if len(worker.clients) == cap(worker.clients) { - return errors.New("To many servers added.") - } - - // Create a new job server's client as a agent of server - server, err := newJobAgent(addr, worker) - if err != nil { - return err - } - - n := len(worker.clients) - worker.clients = worker.clients[0 : n+1] - worker.clients[n] = server - return -} - -// Add a function. -// Plz added job servers first, then functions. -// The API will tell every connected job server that 'I can do this' -func (worker *Worker) AddFunction(funcname string, - f JobFunction, timeout uint32) (err error) { - if len(worker.clients) < 1 { - return errors.New("Did not connect to Job Server.") - } - worker.mutex.Lock() - defer worker.mutex.Unlock() - worker.functions[funcname] = f - - var datatype uint32 - var data []byte - if timeout == 0 { - datatype = CAN_DO - data = []byte(funcname) - } else { - datatype = CAN_DO_TIMEOUT - data = []byte(funcname + "\x00") - t := uint32ToByte(timeout) - data = append(data, t[:]...) - } - job := NewWorkerJob(REQ, datatype, data) - worker.WriteJob(job) - return -} - -// Remove a function. -// Tell job servers 'I can not do this now' at the same time. -func (worker *Worker) RemoveFunction(funcname string) (err error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - - if worker.functions[funcname] == nil { - return errors.New("No function named: " + funcname) - } - delete(worker.functions, funcname) - job := NewWorkerJob(REQ, CANT_DO, []byte(funcname)) - worker.WriteJob(job) - return -} - -// Main loop -func (worker *Worker) Work() { - for _, v := range worker.clients { - go v.Work() - } - for worker.running { - select { - case job := <-worker.incoming: - if job == nil { - break - } - switch job.DataType { - case NO_JOB: - // do nothing - case ERROR: - _, err := getError(job.Data) - worker.ErrQueue <- err - case JOB_ASSIGN, JOB_ASSIGN_UNIQ: - go func() { - if err := worker.exec(job); err != nil { - worker.ErrQueue <- err - } - }() - default: - worker.JobQueue <- job - } - } - } -} - -// Get the last job in queue. -// If there are more than one job in the queue, -// the last one will be returned, -// the others will be lost. -func (worker *Worker) LastJob() (job *WorkerJob) { - if l := len(worker.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-worker.JobQueue - } - } - return <-worker.JobQueue -} - -// Close. -func (worker *Worker) Close() (err error) { - worker.running = false - for _, v := range worker.clients { - err = v.Close() - } - close(worker.incoming) - return err -} - -// Write a job to job server. -// Here, the job's mean is not the oraginal mean. -// Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) WriteJob(job *WorkerJob) (err error) { - e := make(chan error) - for _, v := range worker.clients { - go func() { - e <- v.WriteJob(job) - }() - } - return <-e -} - -// Send a something out, get the samething back. -func (worker *Worker) Echo(data []byte) (err error) { - job := NewWorkerJob(REQ, ECHO_REQ, data) - return worker.WriteJob(job) -} - -// Remove all of functions. -// Both from the worker or job servers. -func (worker *Worker) Reset() (err error) { - job := NewWorkerJob(REQ, RESET_ABILITIES, nil) - err = worker.WriteJob(job) - worker.functions = make(JobFunctionMap) - return -} - -// Set the worker's unique id. -func (worker *Worker) SetId(id string) (err error) { - job := NewWorkerJob(REQ, SET_CLIENT_ID, []byte(id)) - return worker.WriteJob(job) -} - -// Execute the job. And send back the result. -func (worker *Worker) exec(job *WorkerJob) (err error) { - var limit int - if job.DataType == JOB_ASSIGN { - limit = 3 - } else { - limit = 4 - } - jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) - job.Handle = string(jobdata[0]) - funcname := string(jobdata[1]) - if job.DataType == JOB_ASSIGN { - job.Data = jobdata[2] - } else { - job.UniqueId = string(jobdata[2]) - job.Data = jobdata[3] - } - f := worker.functions[funcname] - if f == nil { - return errors.New("function is nil") - } - result, err := f(job) - var datatype uint32 - if err == nil { - datatype = WORK_COMPLETE - } else { - if result == nil { - datatype = WORK_FAIL - } else { - datatype = WORK_EXCEPTION - } - } - - job.magicCode = REQ - job.DataType = datatype - job.Data = result - - worker.WriteJob(job) - return -} diff --git a/src/gearman/worker_test.go b/src/gearman/worker_test.go deleted file mode 100644 index 8492c8b..0000000 --- a/src/gearman/worker_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package gearman - -import "testing" - -var worker *Worker - -func init() { - worker = NewWorker() -} - -func TestWorkerAddServer(t *testing.T) { - t.Log("Add local server 127.0.0.1:4730.") - if err := worker.AddServer("127.0.0.1:4730"); err != nil { - t.Error(err) - } - - if l := len(worker.clients); l != 1 { - t.Log(worker.clients) - t.Error("The length of server list should be 1.") - } -} - -func foobar(job *WorkerJob) ([]byte, error) { - return nil, nil -} - -func TestWorkerAddFunction(t *testing.T) { - if err := worker.AddFunction("foobar", foobar, 0); err != nil { - t.Error(err) - } - if err := worker.AddFunction("timeout", foobar, 5); err != nil { - t.Error(err) - } - if l := len(worker.functions); l != 2 { - t.Log(worker.functions) - t.Errorf("The length of function map should be %d.", 2) - } -} - -func TestWorkerEcho(t *testing.T) { - if err := worker.Echo([]byte("Hello World")); err != nil { - t.Error(err) - } -} -/* -func TestWorkerResult(t *testing.T) { - if job := worker.LastResult(); job == nil { - t.Error("Nothing in result.") - } else { - t.Log(job) - } -} -*/ - -func TestWorkerRemoveFunction(t *testing.T) { - if err := worker.RemoveFunction("foobar"); err != nil { - t.Error(err) - } -} - -func TestWorkerReset(t *testing.T) { - if err := worker.Reset(); err != nil { - t.Error(err) - } -} - -func TestWorkerClose(t *testing.T) { - if err := worker.Close(); err != nil { - t.Error(err) - } -} diff --git a/src/gearman/workerjob.go b/src/gearman/workerjob.go deleted file mode 100644 index 417c2af..0000000 --- a/src/gearman/workerjob.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package gearman - -import ( - "errors" - "strconv" -) - -// Worker side job -type WorkerJob struct { - Data []byte - Handle, UniqueId string - agent *jobAgent - magicCode, DataType uint32 -} - -// Create a new job -func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { - return &WorkerJob{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode job from byte slice -func DecodeWorkerJob(data []byte) (job *WorkerJob, err error) { - if len(data) < 12 { - err = errors.New("Data length is too small.") - return - } - datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - err = errors.New("Invalid data length.") - return - } - data = data[12:] - job = NewWorkerJob(RES, datatype, data) - return -} - -// Encode a job to byte slice -func (job *WorkerJob) Encode() (data []byte) { - magiccode := uint32ToByte(job.magicCode) - datatype := uint32ToByte(job.DataType) - data = make([]byte, 0, 1024*64) - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - data = append(data, []byte{0, 0, 0, 0}...) - l := len(job.Data) - if job.Handle != "" { - data = append(data, []byte(job.Handle)...) - data = append(data, 0) - l += len(job.Handle) + 1 - } - data = append(data, job.Data...) - datalength := uint32ToByte(uint32(l)) - copy(data[8:12], datalength[:]) - return -} - -// Send some datas to client. -// Using this in a job's executing. -func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err error) { - result := append([]byte(job.Handle), 0) - result = append(result, data...) - var datatype uint32 - if iswaring { - datatype = WORK_WARNING - } else { - datatype = WORK_DATA - } - return job.agent.WriteJob(NewWorkerJob(REQ, datatype, result)) -} - -// Update status. -// Tall client how many percent job has been executed. -func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err error) { - n := []byte(strconv.Itoa(numerator)) - d := []byte(strconv.Itoa(denominator)) - result := append([]byte(job.Handle), 0) - result = append(result, n...) - result = append(result, d...) - return job.agent.WriteJob(NewWorkerJob(REQ, WORK_STATUS, result)) -}