From 82f0ab2cd5e53404bbcd0117b50450a0a2c27c39 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 24 May 2011 12:21:49 +0800 Subject: [PATCH] Could not trust os.EOF as the end of msg's mark. --- example/py/client.py | 20 +++++--- example/py/worker.py | 14 +++--- src/pkg/gearman/client.go | 5 +- src/pkg/gearman/worker/jobclient.go | 73 +++++++++++++++++++++-------- 4 files changed, 77 insertions(+), 35 deletions(-) diff --git a/example/py/client.py b/example/py/client.py index 18c9d38..592f8ee 100755 --- a/example/py/client.py +++ b/example/py/client.py @@ -1,13 +1,21 @@ #!/usr/bin/python -from gearman import libgearman +import gearman + +def check_request_status(job_request): + if job_request.complete: + print "Job %s finished! Result: %s - %s" % (job_request.job.unique, job_request.state, job_request.result) + elif job_request.timed_out: + print "Job %s timed out!" % job_request.unique + elif job_request.state == JOB_UNKNOWN: + print "Job %s connection failed!" % job_request.unique def main(): - client = libgearman.Client() - client.add_server("127.0.0.1", 4730) - r = client.do("ToUpper", "arbitrary binary data") - print r + client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730']) + completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") + check_request_status(completed_job_request) if __name__ == '__main__': - main() + for i in range(100): + main() diff --git a/example/py/worker.py b/example/py/worker.py index d6cb7b8..1e779b3 100755 --- a/example/py/worker.py +++ b/example/py/worker.py @@ -1,20 +1,18 @@ #!/usr/bin/python import time -from gearman import libgearman +import gearman -def toUpper(job): - r = job.get_workload().upper() +def toUpper(worker, job): + r = job.data.upper() print r return r def main(): - worker = libgearman.Worker() - worker.add_server("127.0.0.1", 4730) - worker.add_function("ToUpper", toUpper) - while True: - worker.work() + worker = gearman.GearmanWorker(['localhost:4730']) + worker.register_task('ToUpper', toUpper) + worker.work() if __name__ == '__main__': main() diff --git a/src/pkg/gearman/client.go b/src/pkg/gearman/client.go index d34f5da..ca06b97 100644 --- a/src/pkg/gearman/client.go +++ b/src/pkg/gearman/client.go @@ -73,11 +73,12 @@ func (client *Client) read() (data []byte, err os.Error) { } // split package start, end := 0, 4 - for i := 0; i < len(data); i ++{ + tl := len(data) + for i := 0; i < tl; i ++{ if string(data[start:end]) == RES_STR { l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) total := l + 12 - if total == len(data) { + if total == tl { return } else { client.incoming <- data[total:] diff --git a/src/pkg/gearman/worker/jobclient.go b/src/pkg/gearman/worker/jobclient.go index 31ca16d..8d3c57b 100644 --- a/src/pkg/gearman/worker/jobclient.go +++ b/src/pkg/gearman/worker/jobclient.go @@ -15,6 +15,7 @@ type jobClient struct { conn net.Conn worker *Worker running bool + incoming chan []byte } // Create the client of job server. @@ -23,33 +24,66 @@ func newJobClient(addr string, worker *Worker) (jobclient *jobClient, err os.Err if err != nil { return nil, err } - jobclient = &jobClient{conn:conn, worker:worker, running:true} + jobclient = &jobClient{conn:conn, worker:worker, running:true, incoming: make(chan []byte, QUEUE_CAP)} return jobclient, err } +// Internal read +func (client *jobClient) read() (data []byte, err os.Error){ + if len(client.incoming) > 0 { + // incoming queue is not empty + data = <-client.incoming + } else { + for { + buf := make([]byte, BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if err == os.EOF && n == 0 { + err = nil + return + } + return + } + data = append(data, buf[0: n] ...) + if n < BUFFER_SIZE { + break + } + } + } + // split package + start := 0 + tl := len(data) + for i := 0; i < tl; i ++{ + if string(data[start:start+4]) == RES_STR { + l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + client.incoming <- data[total:] + data = data[:total] + return + } + } else { + start++ + } + } + err = os.NewError("Invalid data struct.") + return +} + // Main loop. func (client *jobClient) Work() { noop := true - OUT: for client.running { - // grab job - if noop { + for client.running { + // got noop msg and incoming queue is zero, grab job + if noop && len(client.incoming) == 0 { client.WriteJob(NewWorkerJob(REQ, GRAB_JOB, nil)) } - var rel []byte - for { - buf := make([]byte, BUFFER_SIZE) - n, err := client.conn.Read(buf) - if err != nil { - if err == os.EOF && n == 0 { - break - } - client.worker.ErrQueue <- err - continue OUT - } - rel = append(rel, buf[0: n] ...) - if n < BUFFER_SIZE { - break - } + rel, err := client.read() + if err != nil { + client.worker.ErrQueue <- err + continue } job, err := DecodeWorkerJob(rel) if err != nil { @@ -91,6 +125,7 @@ func (client *jobClient) Write(buf []byte) (err os.Error) { // Close. func (client *jobClient) Close() (err os.Error) { client.running = false + close(client.incoming) err = client.conn.Close() return }