From bf0c48b87fc5121c8f6d521ef233b4aa264ae6cb Mon Sep 17 00:00:00 2001 From: mikespook Date: Thu, 19 May 2011 14:39:31 +0800 Subject: [PATCH] Fixed a bug, that encoding the job but without the handle infomation. --- src/pkg/gearman/worker.go | 22 ++++++++++++++++------ src/pkg/gearman/worker/job.go | 21 +++++++++++++-------- src/pkg/gearman/worker/jobclient.go | 4 +++- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 541e515..9feb994 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -112,12 +112,11 @@ func (worker * Worker) Work() { switch job.dataType { case NO_JOB: // do nothing - log.Println(job) case ERROR: log.Println(string(job.Data)) case JOB_ASSIGN, JOB_ASSIGN_UNIQ: if err := worker.exec(job); err != nil { - log.Panicln(err) + log.Println(err) } continue default: @@ -154,6 +153,7 @@ func (worker * Worker) WriteJob(job *Job) (err os.Error) { e := make(chan os.Error) for _, v := range worker.clients { go func() { + log.Println(v) e <- v.WriteJob(job) }() } @@ -189,7 +189,13 @@ func (worker * Worker) exec(job *Job) (err os.Error) { job.UniqueId = string(jobdata[2]) job.Data = jobdata[3] } - result, err := worker.functions[funcname](job) + f := worker.functions[funcname] + if f == nil { + return os.NewError("function is nil") + } + result, err := f(job) + log.Println(result) + log.Println(err) var datatype uint32 if err == nil { datatype = WORK_COMPLETE @@ -200,23 +206,27 @@ func (worker * Worker) exec(job *Job) (err os.Error) { datatype = WORK_EXCEPTION } } - worker.WriteJob(NewJob(REQ, datatype, result)) + job.magicCode = REQ + job.dataType = datatype + job.Data = result + + worker.WriteJob(job) return } func splitByteArray(slice []byte, spot byte) (data [][]byte){ data = make([][]byte, 0, 10) - log.Println(data) start, end := 0, 0 for i, v := range slice { if v == spot { if start != end { data = append(data, slice[start:end]) } - start, end = i, i + start, end = i + 1, i + 1 } else { end ++ } } + data = append(data, slice[start:]) return } diff --git a/src/pkg/gearman/worker/job.go b/src/pkg/gearman/worker/job.go index 88d0e6f..9b0f0a6 100644 --- a/src/pkg/gearman/worker/job.go +++ b/src/pkg/gearman/worker/job.go @@ -2,6 +2,7 @@ package gearman import ( "os" + "log" ) type Job struct { @@ -42,23 +43,27 @@ func DecodeJob(data []byte) (job *Job, err os.Error) { if len(data[12:]) != int(l) { return nil, os.NewError("Invalid data length.") } - switch(byteToUint32([4]byte{data[4], data[5], data[6], data[7]})) { - case ECHO_RES: - data = data[12:] - } + data = data[12:] return NewJob(REQ, datatype, data), err } func (job *Job) Encode() (data []byte) { magiccode := uint32ToByte(job.magicCode) datatype := uint32ToByte(job.dataType) + data = make([]byte, 0, 1024 * 64) + data = append(data, magiccode[:] ...) + data = append(data, datatype[:] ...) + data = append(data, []byte{0, 0, 0, 0} ...) l := len(job.Data) + if job.Handle != "" { + data = append(data, []byte(job.Handle) ...) + data = append(data, 0) + l += len(job.Handle) + 1 + } + data = append(data, job.Data ...) datalength := uint32ToByte(uint32(l)) - data = make([]byte, 12 + l) - copy(data[:4], magiccode[:]) - copy(data[4:8], datatype[:]) copy(data[8:12], datalength[:]) - copy(data[12:], job.Data) + log.Println(data) return } diff --git a/src/pkg/gearman/worker/jobclient.go b/src/pkg/gearman/worker/jobclient.go index 453dc46..f391e34 100644 --- a/src/pkg/gearman/worker/jobclient.go +++ b/src/pkg/gearman/worker/jobclient.go @@ -3,7 +3,7 @@ package gearman import ( "net" "os" -// "log" + "log" ) type jobClient struct { @@ -22,6 +22,7 @@ func newJobClient(addr string, incoming chan *Job) (jobclient *jobClient, err os } func (client *jobClient) Work() (err os.Error) { + log.Println("Job client work().") noop := true for client.running { // grab job @@ -65,6 +66,7 @@ func (client *jobClient) WriteJob(job * Job) (err os.Error) { } func (client *jobClient) Write(buf []byte) (err os.Error) { + log.Println(buf) var n int for i := 0; i < len(buf); i += n { n, err = client.conn.Write(buf[i:])