diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dd15b7 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (C) 2011 by Xing Xing + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README b/README new file mode 100644 index 0000000..fdea0a1 --- /dev/null +++ b/README @@ -0,0 +1,23 @@ +Gearman API for golang + +This module is Gearman API for golang. +It was implemented a native protocol for both worker and client API. + +- INSTALL + $ cd ./src/pkg/gearman/ + $ make install + +- SAMPLE OF USAGE + # example/worker.go + $ make worker + $ ./worker + + # example/client.go + $ make client + $ ./client + +---- +xingxing +http://mikespook.com + + diff --git a/example/worker.go b/example/worker.go index b5a9853..26831c2 100644 --- a/example/worker.go +++ b/example/worker.go @@ -8,7 +8,7 @@ import ( "strings" ) -func ToUpper(job *gearman.Job) ([]byte, os.Error) { +func ToUpper(job *gearman.WorkerJob) ([]byte, os.Error) { data := []byte(strings.ToUpper(string(job.Data))) return data, nil } @@ -28,13 +28,13 @@ func main() { switch str { case "echo": worker.Echo([]byte("Hello world!")) - job := <-worker.Queue + job := <-worker.JobQueue log.Println(string(job.Data)) case "quit": worker.Close() return case "result": - job := <-worker.Queue + job := <-worker.JobQueue log.Println(string(job.Data)) default: log.Println("Unknown command") diff --git a/src/pkg/gearman/Makefile b/src/pkg/gearman/Makefile index 19ac949..14a92f2 100644 --- a/src/pkg/gearman/Makefile +++ b/src/pkg/gearman/Makefile @@ -10,7 +10,8 @@ GOFILES=\ worker/job.go\ worker/jobclient.go\ worker.go\ -# client.go\ + client/job.go\ + client.go\ CLEANFILES+=gearman_test diff --git a/src/pkg/gearman/client.go b/src/pkg/gearman/client.go index 6731e83..094d0bf 100644 --- a/src/pkg/gearman/client.go +++ b/src/pkg/gearman/client.go @@ -1,6 +1,118 @@ package gearman -// #cgo: LDFLAGS: -lgearman -// #include +import ( + "os" + "net" + "log" +) -import "C" +type Client struct { + conn net.Conn + running bool + JobQueue chan *ClientJob + ErrQueue chan os.Error +} + +func NewClient() (client * Client){ + client = &Client{running:false, + JobQueue:make(chan *ClientJob, QUEUE_CAP), + ErrQueue:make(chan os.Error, QUEUE_CAP),} + return +} + +func (client *Client) AddServer(addr string) (err os.Error) { + conn, err := net.Dial(TCP, addr) + if err != nil { + return + } + client.conn = conn + go client.work() + return +} + +func (client *Client) work() { + OUT: for client.running { + var rel []byte + for { + buf := make([]byte, 2048) + n, err := client.conn.Read(buf) + if err != nil { + if err == os.EOF && n == 0 { + break + } + client.ErrQueue <- err + continue OUT + } + rel = append(rel, buf[0: n] ...) + } + job, err := DecodeClientJob(rel) + if err != nil { + client.ErrQueue <- err + } else { + switch(job.dataType) { + case ERROR: + _, err := getError(job.Data) + client.ErrQueue <- err + case ECHO_RES: + client.JobQueue <- job + } + } + } +} + +func (client *Client) Do(funcname string, data []byte, flag byte) (err os.Error) { + return +} + +func (client *Client) Echo(data []byte) (err os.Error) { + job := NewClientJob(REQ, ECHO_REQ, data) + return client.WriteJob(job) +} + +func (client *Client) LastResult() (job *ClientJob) { + if l := len(client.JobQueue); l != 1 { + if l == 0 { + return + } + for i := 0; i < l - 1; i ++ { + <-client.JobQueue + } + } + return <-client.JobQueue +} + +func (client *Client) LastError() (err os.Error) { + if l := len(client.ErrQueue); l != 1 { + if l == 0 { + return + } + for i := 0; i < l - 1; i ++ { + <-client.ErrQueue + } + } + return <-client.ErrQueue +} + +func (client *Client) WriteJob(job *ClientJob) (err os.Error) { + return client.Write(job.Encode()) +} + +func (client *Client) Write(buf []byte) (err os.Error) { + log.Println(buf) + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return +} + +func (client *Client) Close() (err os.Error) { + client.running = false + err = client.conn.Close() + close(client.JobQueue) + close(client.ErrQueue) + return +} diff --git a/src/pkg/gearman/client/job.go b/src/pkg/gearman/client/job.go new file mode 100644 index 0000000..264b152 --- /dev/null +++ b/src/pkg/gearman/client/job.go @@ -0,0 +1,49 @@ +package gearman + +import ( + "os" +) + +type ClientJob struct { + Data []byte + Handle string + UniqueId string + magicCode, dataType uint32 +} + +func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { + return &ClientJob{magicCode:magiccode, + dataType:datatype, + Data:data} +} + +func DecodeClientJob(data []byte) (job * ClientJob, err os.Error) { + if len(data) < 12 { + err = os.NewError("Data length is too small.") + return + } + datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + err = os.NewError("Invalid data length.") + return + } + data = data[12:] + job = NewClientJob(RES, datatype, data) + return +} + +func (job *ClientJob) Encode() (data []byte) { + magiccode := uint32ToByte(job.magicCode) + datatype := uint32ToByte(job.dataType) + data = make([]byte, 0, 1024 * 64) + data = append(data, magiccode[:] ...) + data = append(data, datatype[:] ...) + data = append(data, []byte{0, 0, 0, 0} ...) + l := len(job.Data) + data = append(data, job.Data ...) + datalength := uint32ToByte(uint32(l)) + copy(data[8:12], datalength[:]) + return +} + diff --git a/src/pkg/gearman/client_test.go b/src/pkg/gearman/client_test.go new file mode 100644 index 0000000..02554e5 --- /dev/null +++ b/src/pkg/gearman/client_test.go @@ -0,0 +1,42 @@ +package gearman + +import ( + "testing" + "os" +) + +var client *Client + +func init() { + client = NewClient() +} + +func TestClientAddServer(t * testing.T) { + t.Log("Add local server 127.0.0.1:4730") + if err := client.AddServer("127.0.0.1:4730"); err != nil { + t.Error(err) + } +} + +func TestClientEcho(t * testing.T) { + if err := client.Echo([]byte("Hello world")); err != nil { + t.Error(err) + } +} + +/* +func TestClientLastResult(t * testing.T) { + job := client.LastResult() + if job == nil { + t.Error(os.NewError("job shuold be the echo.")) + } else { + t.Log(job) + } +} +*/ + +func TestClientClose(t * testing.T) { + if err := client.Close(); err != nil { + t.Error(err) + } +} diff --git a/src/pkg/gearman/gearman.go b/src/pkg/gearman/gearman.go index dc8f3ca..8650484 100644 --- a/src/pkg/gearman/gearman.go +++ b/src/pkg/gearman/gearman.go @@ -1,9 +1,14 @@ package gearman +import ( + "os" +) + const ( TCP = "tcp4" WORKER_SERVER_CAP = 32 WORKER_FUNCTION_CAP = 512 + QUEUE_CAP = 512 // \x00REQ @@ -32,6 +37,62 @@ const ( WORK_WARNING = 29 GRAB_JOB_UNIQ = 30 JOB_ASSIGN_UNIQ = 31 + + SUBMIT_JOB = 7 + SUBMIT_JOB_BG = 18 + SUBMIT_JOB_HIGH = 21 + SUBMIT_JOB_HIGH_BG = 32 + SUBMIT_JOB_LOW = 33 + SUBMIT_JOB_LOW_BG = 34 + + JOB_NORMAL = 0 + JOB_BG = 1 + JOB_LOW = 2 + JOB_HIGH = 4 ) +type Job interface { + Encode() []byte +} + +func splitByteArray(slice []byte, spot byte) (data [][]byte){ + data = make([][]byte, 0, 10) + start, end := 0, 0 + for i, v := range slice { + if v == spot { + if start != end { + data = append(data, slice[start:end]) + } + start, end = i + 1, i + 1 + } else { + end ++ + } + } + data = append(data, slice[start:]) + return +} + +func getError(data []byte) (eno os.Errno, err os.Error) { + rel := splitByteArray(data, '\x00') + if len(rel) != 2 { + err = os.NewError("The input is not a error data.") + return + } + eno, err = rel[0], rel[1] + return +} + +func byteToUint32(buf [4]byte) uint32 { + return uint32(buf[0]) << 24 + + uint32(buf[1]) << 16 + + uint32(buf[2]) << 8 + + uint32(buf[3]) +} +func uint32ToByte(i uint32) (data [4]byte) { + data[0] = byte((i >> 24) & 0xff) + data[1] = byte((i >> 16) & 0xff) + data[2] = byte((i >> 8) & 0xff) + data[3] = byte(i & 0xff) + return +} diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 9feb994..8502750 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -3,10 +3,10 @@ package gearman import( "os" "sync" - "log" +// "log" ) -type JobFunction func(job *Job) ([]byte, os.Error) +type JobFunction func(job *WorkerJob) ([]byte, os.Error) type JobFunctionMap map[string]JobFunction type Worker struct { @@ -14,9 +14,10 @@ type Worker struct { functions JobFunctionMap running bool - incoming chan *Job + incoming chan *WorkerJob mutex sync.Mutex - Queue chan *Job + JobQueue chan *WorkerJob + ErrQueue chan os.Error } func NewWorker() (worker *Worker) { @@ -25,11 +26,12 @@ func NewWorker() (worker *Worker) { clients:make([]*jobClient, 0, WORKER_SERVER_CAP), // function list functions: make(JobFunctionMap), - incoming: make(chan *Job, 512), - Queue: make(chan *Job, 512), + incoming: make(chan *WorkerJob, QUEUE_CAP), + JobQueue: make(chan *WorkerJob, QUEUE_CAP), + ErrQueue: make(chan os.Error, QUEUE_CAP), running: true, } - return worker + return } // add server @@ -43,7 +45,7 @@ func (worker * Worker) AddServer(addr string) (err os.Error) { } // Create a new job server's client as a agent of server - server, err := newJobClient(addr, worker.incoming) + server, err := newJobClient(addr, worker) if err != nil { return err } @@ -79,7 +81,7 @@ func (worker * Worker) AddFunction(funcname string, t := uint32ToByte(timeout) data = append(data, t[:] ...) } - job := NewJob(REQ, datatype, data) + job := NewWorkerJob(REQ, datatype, data) worker.WriteJob(job) return } @@ -93,7 +95,7 @@ func (worker * Worker) RemoveFunction(funcname string) (err os.Error) { return os.NewError("No function named: " + funcname) } worker.functions[funcname] = nil, false - job := NewJob(REQ, CANT_DO, []byte(funcname)) + job := NewWorkerJob(REQ, CANT_DO, []byte(funcname)) worker.WriteJob(job) return } @@ -113,29 +115,30 @@ func (worker * Worker) Work() { case NO_JOB: // do nothing case ERROR: - log.Println(string(job.Data)) + _, err := getError(job.Data) + worker.ErrQueue <- err case JOB_ASSIGN, JOB_ASSIGN_UNIQ: if err := worker.exec(job); err != nil { - log.Println(err) + worker.ErrQueue <- err } continue default: - worker.Queue <- job + worker.JobQueue <- job } } } } -func (worker * Worker) Result() (job *Job) { - if l := len(worker.Queue); l != 1 { +func (worker * Worker) LastResult() (job *WorkerJob) { + if l := len(worker.JobQueue); l != 1 { if l == 0 { return } for i := 0; i < l - 1; i ++ { - <-worker.Queue + <-worker.JobQueue } } - return <-worker.Queue + return <-worker.JobQueue } // Close @@ -149,11 +152,10 @@ func (worker * Worker) Close() (err os.Error){ return err } -func (worker * Worker) WriteJob(job *Job) (err os.Error) { +func (worker * Worker) WriteJob(job *WorkerJob) (err os.Error) { e := make(chan os.Error) for _, v := range worker.clients { go func() { - log.Println(v) e <- v.WriteJob(job) }() } @@ -162,24 +164,24 @@ func (worker * Worker) WriteJob(job *Job) (err os.Error) { // Echo func (worker * Worker) Echo(data []byte) (err os.Error) { - job := NewJob(REQ, ECHO_REQ, data) + job := NewWorkerJob(REQ, ECHO_REQ, data) return worker.WriteJob(job) } // Reset func (worker * Worker) Reset() (err os.Error){ - job := NewJob(REQ, RESET_ABILITIES, nil) + job := NewWorkerJob(REQ, RESET_ABILITIES, nil) return worker.WriteJob(job) } // SetId func (worker * Worker) SetId(id string) (err os.Error) { - job := NewJob(REQ, SET_CLIENT_ID, []byte(id)) + job := NewWorkerJob(REQ, SET_CLIENT_ID, []byte(id)) return worker.WriteJob(job) } // Exec -func (worker * Worker) exec(job *Job) (err os.Error) { +func (worker * Worker) exec(job *WorkerJob) (err os.Error) { jobdata := splitByteArray(job.Data, '\x00') job.Handle = string(jobdata[0]) funcname := string(jobdata[1]) @@ -194,8 +196,6 @@ func (worker * Worker) exec(job *Job) (err os.Error) { return os.NewError("function is nil") } result, err := f(job) - log.Println(result) - log.Println(err) var datatype uint32 if err == nil { datatype = WORK_COMPLETE @@ -206,27 +206,11 @@ func (worker * Worker) exec(job *Job) (err os.Error) { datatype = WORK_EXCEPTION } } + job.magicCode = REQ job.dataType = datatype job.Data = result - - worker.WriteJob(job) - return -} -func splitByteArray(slice []byte, spot byte) (data [][]byte){ - data = make([][]byte, 0, 10) - start, end := 0, 0 - for i, v := range slice { - if v == spot { - if start != end { - data = append(data, slice[start:end]) - } - start, end = i + 1, i + 1 - } else { - end ++ - } - } - data = append(data, slice[start:]) + worker.WriteJob(job) return } diff --git a/src/pkg/gearman/worker/job.go b/src/pkg/gearman/worker/job.go index 9b0f0a6..da68cc1 100644 --- a/src/pkg/gearman/worker/job.go +++ b/src/pkg/gearman/worker/job.go @@ -2,52 +2,41 @@ package gearman import ( "os" - "log" +// "log" ) -type Job struct { +type WorkerJob struct { Data []byte Handle string UniqueId string client *jobClient magicCode, dataType uint32 + Job } -func byteToUint32(buf [4]byte) uint32 { - return uint32(buf[0]) << 24 + - uint32(buf[1]) << 16 + - uint32(buf[2]) << 8 + - uint32(buf[3]) -} - -func uint32ToByte(i uint32) (data [4]byte) { - data[0] = byte((i >> 24) & 0xff) - data[1] = byte((i >> 16) & 0xff) - data[2] = byte((i >> 8) & 0xff) - data[3] = byte(i & 0xff) - return -} - -func NewJob(magiccode, datatype uint32, data []byte) (job *Job) { - return &Job{magicCode:magiccode, +func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { + return &WorkerJob{magicCode:magiccode, dataType: datatype, Data:data} } -func DecodeJob(data []byte) (job *Job, err os.Error) { +func DecodeWorkerJob(data []byte) (job *WorkerJob, err os.Error) { if len(data) < 12 { - return nil, os.NewError("Data length is too small.") + err = os.NewError("Data length is too small.") + return } datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) if len(data[12:]) != int(l) { - return nil, os.NewError("Invalid data length.") + err = os.NewError("Invalid data length.") + return } data = data[12:] - return NewJob(REQ, datatype, data), err + job = NewWorkerJob(RES, datatype, data) + return } -func (job *Job) Encode() (data []byte) { +func (job *WorkerJob) Encode() (data []byte) { magiccode := uint32ToByte(job.magicCode) datatype := uint32ToByte(job.dataType) data = make([]byte, 0, 1024 * 64) @@ -63,12 +52,11 @@ func (job *Job) Encode() (data []byte) { data = append(data, job.Data ...) datalength := uint32ToByte(uint32(l)) copy(data[8:12], datalength[:]) - log.Println(data) return } // update data -func (job * Job) UpdateData(data []byte, iswaring bool) (err os.Error) { +func (job * WorkerJob) UpdateData(data []byte, iswaring bool) (err os.Error) { result := append([]byte(job.Handle), 0) result = append(result, data ...) var datatype uint32 @@ -77,17 +65,17 @@ func (job * Job) UpdateData(data []byte, iswaring bool) (err os.Error) { } else { datatype = WORK_DATA } - return job.client.WriteJob(NewJob(REQ, datatype, result)) + return job.client.WriteJob(NewWorkerJob(REQ, datatype, result)) } // update status -func (job * Job) UpdateStatus(numerator, denominator uint32) (err os.Error) { +func (job * WorkerJob) UpdateStatus(numerator, denominator uint32) (err os.Error) { n := uint32ToByte(numerator) d := uint32ToByte(denominator) result := append([]byte(job.Handle), 0) result = append(result, n[:] ...) result = append(result, d[:] ...) - return job.client.WriteJob(NewJob(REQ, WORK_STATUS, result)) + return job.client.WriteJob(NewWorkerJob(REQ, WORK_STATUS, result)) } diff --git a/src/pkg/gearman/worker/jobclient.go b/src/pkg/gearman/worker/jobclient.go index f391e34..a71ec62 100644 --- a/src/pkg/gearman/worker/jobclient.go +++ b/src/pkg/gearman/worker/jobclient.go @@ -3,31 +3,30 @@ package gearman import ( "net" "os" - "log" +// "log" ) type jobClient struct { conn net.Conn - incoming chan *Job + worker *Worker running bool } -func newJobClient(addr string, incoming chan *Job) (jobclient *jobClient, err os.Error) { +func newJobClient(addr string, worker *Worker) (jobclient *jobClient, err os.Error) { conn, err := net.Dial(TCP, addr) if err != nil { return nil, err } - jobclient = &jobClient{conn:conn, incoming: incoming, running:true} + jobclient = &jobClient{conn:conn, worker:worker, running:true} return jobclient, err } -func (client *jobClient) Work() (err os.Error) { - log.Println("Job client work().") +func (client *jobClient) Work() { noop := true - for client.running { + OUT: for client.running { // grab job if noop { - client.WriteJob(NewJob(REQ, GRAB_JOB, nil)) + client.WriteJob(NewWorkerJob(REQ, GRAB_JOB, nil)) } var rel []byte for { @@ -37,36 +36,36 @@ func (client *jobClient) Work() (err os.Error) { if err == os.EOF && n == 0 { break } - return err + client.worker.ErrQueue <- err + continue OUT } rel = append(rel, buf[0: n] ...) - break } - job, err := DecodeJob(rel) + job, err := DecodeWorkerJob(rel) if err != nil { - return err + client.worker.ErrQueue <- err + continue } else { switch(job.dataType) { case NOOP: noop = true case NO_JOB: noop = false - client.WriteJob(NewJob(REQ, PRE_SLEEP, nil)) + client.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: job.client = client - client.incoming <- job + client.worker.incoming <- job } } } return } -func (client *jobClient) WriteJob(job * Job) (err os.Error) { +func (client *jobClient) WriteJob(job *WorkerJob) (err os.Error) { return client.Write(job.Encode()) } func (client *jobClient) Write(buf []byte) (err os.Error) { - log.Println(buf) var n int for i := 0; i < len(buf); i += n { n, err = client.conn.Write(buf[i:]) diff --git a/src/pkg/gearman/gearman_test.go b/src/pkg/gearman/worker_test.go similarity index 75% rename from src/pkg/gearman/gearman_test.go rename to src/pkg/gearman/worker_test.go index 84aae20..8feae31 100644 --- a/src/pkg/gearman/gearman_test.go +++ b/src/pkg/gearman/worker_test.go @@ -11,7 +11,7 @@ func init() { worker = NewWorker() } -func TestAddServer(t *testing.T) { +func TestWorkerAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730.") if err := worker.AddServer("127.0.0.1:4730"); err != nil { t.Error(err) @@ -23,12 +23,12 @@ func TestAddServer(t *testing.T) { } } -func foobar(job *Job) ([]byte, os.Error) { +func foobar(job *WorkerJob) ([]byte, os.Error) { return nil, nil } -func TestAddFunction(t *testing.T) { +func TestWorkerAddFunction(t *testing.T) { if err := worker.AddFunction("foobar", foobar, 0); err != nil { t.Error(err) } @@ -41,14 +41,14 @@ func TestAddFunction(t *testing.T) { } } -func TestEcho(t * testing.T) { +func TestWorkerEcho(t * testing.T) { if err := worker.Echo([]byte("Hello World")); err != nil { t.Error(err) } } /* -func TestResult(t *testing.T) { - if job := worker.Result(); job == nil { +func TestWorkerResult(t *testing.T) { + if job := worker.LastResult(); job == nil { t.Error("Nothing in result.") } else { t.Log(job) @@ -56,19 +56,19 @@ func TestResult(t *testing.T) { } */ -func TestRemoveFunction(t * testing.T) { +func TestWorkerRemoveFunction(t * testing.T) { if err := worker.RemoveFunction("foobar"); err != nil { t.Error(err) } } -func TestReset(t * testing.T) { +func TestWorkerReset(t * testing.T) { if err := worker.Reset(); err != nil { t.Error(err) } } -func TestClose(t *testing.T) { +func TestWorkerClose(t *testing.T) { if err := worker.Close(); err != nil { t.Error(err) }