From 016dd669aaba24eade8b452b75c9aac62faa34a5 Mon Sep 17 00:00:00 2001 From: mikespook Date: Fri, 20 May 2011 17:38:10 +0800 Subject: [PATCH] The client can work now...with some known bugs. :( --- example/Makefile | 2 +- example/client.go | 40 +++++++ src/pkg/gearman/client.go | 173 ++++++++++++++++++---------- src/pkg/gearman/client/job.go | 41 +++++++ src/pkg/gearman/client_test.go | 13 ++- src/pkg/gearman/gearman.go | 6 + src/pkg/gearman/worker.go | 2 +- src/pkg/gearman/worker/jobclient.go | 5 +- 8 files changed, 216 insertions(+), 66 deletions(-) create mode 100644 example/client.go diff --git a/example/Makefile b/example/Makefile index 5bf1f39..fb941db 100644 --- a/example/Makefile +++ b/example/Makefile @@ -9,7 +9,7 @@ TARG=gearman # GOFILES=\ # worker.go\ -CLEANFILES+= worker +CLEANFILES+= worker client include $(GOROOT)/src/Make.pkg diff --git a/example/client.go b/example/client.go new file mode 100644 index 0000000..6f6e029 --- /dev/null +++ b/example/client.go @@ -0,0 +1,40 @@ +package main + +import ( + "gearman" + "log" +) + +func main() { + client := gearman.NewClient() + defer client.Close() + client.AddServer("127.0.0.1:4730") + echo := []byte("Hello world") + log.Println(echo) + log.Println(client.Echo(echo)) + + handle, err := client.Do("ToUpper", echo, gearman.JOB_HIGH) + if err != nil { + log.Println(err) + } + known, running, numerator, denominator, err := client.Status(handle) + if err != nil { + log.Println(err) + } + if !known { + log.Println("Unknown") + } + if running { + log.Printf("%g%%\n", float32(numerator) * 100 / float32(denominator)) + } else { + log.Println("Not running") + } + log.Println("read") + if job, err := client.ReadJob(); err != nil { + log.Println(err) + } else { + data, err := job.Result(); + log.Println(err) + log.Println(data) + } +} diff --git a/src/pkg/gearman/client.go b/src/pkg/gearman/client.go index 1e561f6..f58e635 100644 --- a/src/pkg/gearman/client.go +++ b/src/pkg/gearman/client.go @@ -3,20 +3,19 @@ package gearman import ( "os" "net" -// "log" + "sync" ) type Client struct { + mutex sync.Mutex conn net.Conn - running bool JobQueue chan *ClientJob - ErrQueue chan os.Error + UId uint32 } func NewClient() (client * Client){ - client = &Client{running:false, - JobQueue:make(chan *ClientJob, QUEUE_CAP), - ErrQueue:make(chan os.Error, QUEUE_CAP),} + client = &Client{JobQueue:make(chan *ClientJob, QUEUE_CAP), + UId:1} return } @@ -26,49 +25,42 @@ func (client *Client) AddServer(addr string) (err os.Error) { return } client.conn = conn - go client.work() return } -func (client *Client) work() { - OUT: for client.running { - var rel []byte - for { - buf := make([]byte, 2048) - n, err := client.conn.Read(buf) - if err != nil { - if err == os.EOF && n == 0 { - break - } - client.ErrQueue <- err - continue OUT +func (client *Client) ReadJob() (job *ClientJob, err os.Error) { + var rel []byte + for { + buf := make([]byte, BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if (err == os.EOF && n == 0) { + break } - rel = append(rel, buf[0: n] ...) + return } - job, err := DecodeClientJob(rel) - if err != nil { - client.ErrQueue <- err - } else { - switch(job.dataType) { - case ERROR: - _, err := getError(job.Data) - client.ErrQueue <- err - case ECHO_RES: - client.JobQueue <- job - } + rel = append(rel, buf[0: n] ...) + if n < BUFFER_SIZE { + break } } + if job, err = DecodeClientJob(rel); err != nil { + return + } else { + switch(job.dataType) { + case ERROR: + _, err = getError(job.Data) + return + case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + client.JobQueue <- job + } + } + return } -func (client *Client) Do(funcname string, data []byte, flag byte) (err os.Error) { +func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err os.Error) { var datatype uint32 - if flag & JOB_NORMAL == JOB_NORMAL { - if flag & JOB_BG == JOB_BG { - datatype = SUBMIT_JOB_BG - } else { - datatype = SUBMIT_JOB - } - } else if flag & JOB_LOW == JOB_LOW { + if flag & JOB_LOW == JOB_LOW { if flag & JOB_BG == JOB_BG { datatype = SUBMIT_JOB_LOW_BG } else { @@ -80,20 +72,97 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (err os.Error) } else { datatype = SUBMIT_JOB_HIGH } + } else if flag & JOB_BG == JOB_BG { + datatype = SUBMIT_JOB_BG + } else { + datatype = SUBMIT_JOB } + rel := make([]byte, 0, 1024 * 64) rel = append(rel, []byte(funcname) ...) rel = append(rel, '\x00') - rel = append(rel, '\xFF') + client.mutex.Lock() + uid := uint32ToByte(client.UId) + client.UId ++ + rel = append(rel, uid[:] ...) + client.mutex.Unlock() rel = append(rel, '\x00') rel = append(rel, data ...) - job := NewClientJob(REQ, datatype, data) - return client.WriteJob(job) + if err = client.WriteJob(NewClientJob(REQ, datatype, rel)); err != nil { + return + } + var job *ClientJob + if job, err = client.ReadLastJob(JOB_CREATED); err != nil { + return + } + handle = string(job.Data) + go func() { + if flag & JOB_BG != JOB_BG { + for { + if job, err = client.ReadJob(); err != nil { + return + } + switch job.dataType { + case WORK_DATA, WORK_WARNING: + case WORK_STATUS: + case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + return + } + } + } + }() + return } -func (client *Client) Echo(data []byte) (err os.Error) { - job := NewClientJob(REQ, ECHO_REQ, data) - return client.WriteJob(job) +func (client *Client) ReadLastJob(datatype uint32) (job *ClientJob, err os.Error){ + for { + if job, err = client.ReadJob(); err != nil { + return + } + if job.dataType == datatype { + break + } + } + if job.dataType != datatype { + err = os.NewError("No job got.") + } + return +} + +func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint, err os.Error) { + + if err = client.WriteJob(NewClientJob(REQ, GET_STATUS, []byte(handle))); err != nil { + return + } + var job * ClientJob + if job, err = client.ReadLastJob(STATUS_RES); err != nil { + return + } + data := splitByteArray(job.Data, '\x00') + if len(data) != 5 { + err = os.NewError("Data Error.") + return + } + if handle != string(data[0]) { + err = os.NewError("Invalid handle.") + return + } + known = data[1][0] == '1' + running = data[2][0] == '1' + numerator = uint(data[3][0]) + denominator = uint(data[4][0]) + return +} + +func (client *Client) Echo(data []byte) (echo []byte, err os.Error) { + if err = client.WriteJob(NewClientJob(REQ, ECHO_REQ, data)); err != nil { + return + } + var job *ClientJob + if job, err = client.ReadLastJob(ECHO_RES); err != nil { + return + } + return job.Data, err } func (client *Client) LastResult() (job *ClientJob) { @@ -108,18 +177,6 @@ func (client *Client) LastResult() (job *ClientJob) { return <-client.JobQueue } -func (client *Client) LastError() (err os.Error) { - if l := len(client.ErrQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l - 1; i ++ { - <-client.ErrQueue - } - } - return <-client.ErrQueue -} - func (client *Client) WriteJob(job *ClientJob) (err os.Error) { return client.Write(job.Encode()) } @@ -136,9 +193,7 @@ func (client *Client) Write(buf []byte) (err os.Error) { } func (client *Client) Close() (err os.Error) { - client.running = false err = client.conn.Close() close(client.JobQueue) - close(client.ErrQueue) return } diff --git a/src/pkg/gearman/client/job.go b/src/pkg/gearman/client/job.go index 264b152..cb7b1ce 100644 --- a/src/pkg/gearman/client/job.go +++ b/src/pkg/gearman/client/job.go @@ -2,6 +2,7 @@ package gearman import ( "os" +// "log" ) type ClientJob struct { @@ -47,3 +48,43 @@ func (job *ClientJob) Encode() (data []byte) { return } +func (job * ClientJob) Result() (data []byte, err os.Error){ + switch job.dataType { + case WORK_FAIL: + job.Handle = string(job.Data) + err = os.NewError("Work fail.") + return + case WORK_EXCEPTION: + err = os.NewError("Work exception.") + fallthrough + case WORK_COMPLETE: + s := splitByteArray(job.Data, '\x00') + if len(s) != 2 { + err = os.NewError("Invalid data.") + return + } + job.Handle = string(s[0]) + data = s[1] + default: + err = os.NewError("The job is not a result.") + } + return +} + +func (job *ClientJob) Update() (data []byte, err os.Error) { + if job.dataType != WORK_DATA && job.dataType != WORK_WARNING { + err = os.NewError("The job is not a update.") + return + } + s := splitByteArray(job.Data, '\x00') + if len(s) != 2 { + err = os.NewError("Invalid data.") + return + } + if job.dataType == WORK_WARNING { + err = os.NewError("Work warning") + } + job.Handle = string(s[0]) + data = s[1] + return +} diff --git a/src/pkg/gearman/client_test.go b/src/pkg/gearman/client_test.go index 3c6fbc6..065807d 100644 --- a/src/pkg/gearman/client_test.go +++ b/src/pkg/gearman/client_test.go @@ -19,17 +19,21 @@ func TestClientAddServer(t * testing.T) { } func TestClientEcho(t * testing.T) { - if err := client.Echo([]byte("Hello world")); err != nil { + if echo, err := client.Echo([]byte("Hello world")); err != nil { t.Error(err) + } else { + t.Log(echo) } } - +/* func TestClientDo(t * testing.T) { - if err := client.Do("ToUpper", []byte("abcdef"), JOB_NORMAL); err != nil { + if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW | JOB_BG); err != nil { t.Error(err) + } else { + t.Log(handle) } } - +*/ /* func TestClientLastResult(t * testing.T) { job := client.LastResult() @@ -46,3 +50,4 @@ func TestClientClose(t * testing.T) { t.Error(err) } } + diff --git a/src/pkg/gearman/gearman.go b/src/pkg/gearman/gearman.go index a59e931..8e8605e 100644 --- a/src/pkg/gearman/gearman.go +++ b/src/pkg/gearman/gearman.go @@ -9,27 +9,33 @@ const ( WORKER_SERVER_CAP = 32 WORKER_FUNCTION_CAP = 512 QUEUE_CAP = 512 + BUFFER_SIZE = 1024 // \x00REQ REQ = 5391697 + REQ_STR = "\x00REQ" // \x00RES RES = 5391699 + RES_STR = "\x00RES" CAN_DO = 1 CANT_DO = 2 RESET_ABILITIES = 3 PRE_SLEEP = 4 NOOP = 6 + JOB_CREATED = 8 GRAB_JOB = 9 NO_JOB = 10 JOB_ASSIGN = 11 WORK_STATUS = 12 WORK_COMPLETE = 13 WORK_FAIL = 14 + GET_STATUS = 15 ECHO_REQ = 16 ECHO_RES = 17 ERROR = 19 + STATUS_RES = 20 SET_CLIENT_ID = 22 CAN_DO_TIMEOUT = 23 WORK_EXCEPTION = 25 diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 8502750..9e38e9d 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -121,7 +121,7 @@ func (worker * Worker) Work() { if err := worker.exec(job); err != nil { worker.ErrQueue <- err } - continue + fallthrough default: worker.JobQueue <- job } diff --git a/src/pkg/gearman/worker/jobclient.go b/src/pkg/gearman/worker/jobclient.go index a71ec62..638e0f9 100644 --- a/src/pkg/gearman/worker/jobclient.go +++ b/src/pkg/gearman/worker/jobclient.go @@ -30,7 +30,7 @@ func (client *jobClient) Work() { } var rel []byte for { - buf := make([]byte, 2048) + buf := make([]byte, BUFFER_SIZE) n, err := client.conn.Read(buf) if err != nil { if err == os.EOF && n == 0 { @@ -40,6 +40,9 @@ func (client *jobClient) Work() { continue OUT } rel = append(rel, buf[0: n] ...) + if n < BUFFER_SIZE { + break + } } job, err := DecodeWorkerJob(rel) if err != nil {