From f6d2a1668557dd8a88db7d092eea80c2069d7e22 Mon Sep 17 00:00:00 2001 From: mikespook Date: Sat, 21 May 2011 21:11:29 +0800 Subject: [PATCH] Great. We got it. The client do the job correctly. --- example/client.go | 20 ++++++++++------ src/pkg/gearman/client.go | 50 ++++++++++++++++++++++++++++----------- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/example/client.go b/example/client.go index 011e9b4..c138790 100644 --- a/example/client.go +++ b/example/client.go @@ -10,18 +10,24 @@ func main() { defer client.Close() client.AddServer("127.0.0.1:4730") echo := []byte("Hello world") -/* - log.Println(echo) - log.Println(client.Echo(echo)) -*/ + + if data, err := client.Echo(echo); err != nil { + log.Println(string(data)) + } + handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL) if err != nil { log.Println(err) } else { log.Println(handle) - log.Println(<-client.JobQueue) + job := <-client.JobQueue + if data, err := job.Result(); err != nil { + log.Println(err) + } else { + log.Println(string(data)) + } } - /* + known, running, numerator, denominator, err := client.Status(handle) if err != nil { log.Println(err) @@ -42,5 +48,5 @@ func main() { log.Println(err) log.Println(data) } - */ + } diff --git a/src/pkg/gearman/client.go b/src/pkg/gearman/client.go index 1fbbe53..25c50f1 100644 --- a/src/pkg/gearman/client.go +++ b/src/pkg/gearman/client.go @@ -4,7 +4,7 @@ import ( "os" "net" "sync" - "log" +// "log" "strconv" ) @@ -33,21 +33,43 @@ func (client *Client) AddServer(addr string) (err os.Error) { } func (client *Client) read() (data []byte, err os.Error) { - var rel []byte - for { - buf := make([]byte, BUFFER_SIZE) - var n int - if n, err = client.conn.Read(buf); err != nil { - if (err == os.EOF && n == 0) { + if len(client.incoming) > 0 { + data = <-client.incoming + } else { + for { + buf := make([]byte, BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if (err == os.EOF && n == 0) { + break + } + return + } + data = append(data, buf[0: n] ...) + if n < BUFFER_SIZE { break } - return - } - rel = append(rel, buf[0: n] ...) - if n < BUFFER_SIZE { - break } } + start, end := 0, 4 + for i := 0; i < len(data); i ++{ + if string(data[start:end]) == RES_STR { + l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == len(data) { + return + } else { + client.incoming <- data[total:] + data = data[:total] + return + } + } else { + start++ + end++ + } + } + err = os.NewError("Invalid data struct.") + return } func (client *Client) ReadJob() (job *ClientJob, err os.Error) { @@ -107,7 +129,6 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string return } handle = string(job.Data) - log.Println(handle) go func() { if flag & JOB_BG != JOB_BG { for { @@ -174,7 +195,8 @@ func (client *Client) Echo(data []byte) (echo []byte, err os.Error) { if job, err = client.readLastJob(ECHO_RES); err != nil { return } - return job.Data, err + echo, err = job.Result() + return } func (client *Client) LastResult() (job *ClientJob) {