Fixed a bug, that encoding the job but without the handle infomation.

This commit is contained in:
mikespook 2011-05-19 14:39:31 +08:00
parent a6fab88b22
commit bf0c48b87f
3 changed files with 32 additions and 15 deletions

View File

@ -112,12 +112,11 @@ func (worker * Worker) Work() {
switch job.dataType { switch job.dataType {
case NO_JOB: case NO_JOB:
// do nothing // do nothing
log.Println(job)
case ERROR: case ERROR:
log.Println(string(job.Data)) log.Println(string(job.Data))
case JOB_ASSIGN, JOB_ASSIGN_UNIQ: case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
if err := worker.exec(job); err != nil { if err := worker.exec(job); err != nil {
log.Panicln(err) log.Println(err)
} }
continue continue
default: default:
@ -154,6 +153,7 @@ func (worker * Worker) WriteJob(job *Job) (err os.Error) {
e := make(chan os.Error) e := make(chan os.Error)
for _, v := range worker.clients { for _, v := range worker.clients {
go func() { go func() {
log.Println(v)
e <- v.WriteJob(job) e <- v.WriteJob(job)
}() }()
} }
@ -189,7 +189,13 @@ func (worker * Worker) exec(job *Job) (err os.Error) {
job.UniqueId = string(jobdata[2]) job.UniqueId = string(jobdata[2])
job.Data = jobdata[3] job.Data = jobdata[3]
} }
result, err := worker.functions[funcname](job) f := worker.functions[funcname]
if f == nil {
return os.NewError("function is nil")
}
result, err := f(job)
log.Println(result)
log.Println(err)
var datatype uint32 var datatype uint32
if err == nil { if err == nil {
datatype = WORK_COMPLETE datatype = WORK_COMPLETE
@ -200,23 +206,27 @@ func (worker * Worker) exec(job *Job) (err os.Error) {
datatype = WORK_EXCEPTION datatype = WORK_EXCEPTION
} }
} }
worker.WriteJob(NewJob(REQ, datatype, result)) job.magicCode = REQ
job.dataType = datatype
job.Data = result
worker.WriteJob(job)
return return
} }
func splitByteArray(slice []byte, spot byte) (data [][]byte){ func splitByteArray(slice []byte, spot byte) (data [][]byte){
data = make([][]byte, 0, 10) data = make([][]byte, 0, 10)
log.Println(data)
start, end := 0, 0 start, end := 0, 0
for i, v := range slice { for i, v := range slice {
if v == spot { if v == spot {
if start != end { if start != end {
data = append(data, slice[start:end]) data = append(data, slice[start:end])
} }
start, end = i, i start, end = i + 1, i + 1
} else { } else {
end ++ end ++
} }
} }
data = append(data, slice[start:])
return return
} }

View File

@ -2,6 +2,7 @@ package gearman
import ( import (
"os" "os"
"log"
) )
type Job struct { type Job struct {
@ -42,23 +43,27 @@ func DecodeJob(data []byte) (job *Job, err os.Error) {
if len(data[12:]) != int(l) { if len(data[12:]) != int(l) {
return nil, os.NewError("Invalid data length.") return nil, os.NewError("Invalid data length.")
} }
switch(byteToUint32([4]byte{data[4], data[5], data[6], data[7]})) {
case ECHO_RES:
data = data[12:] data = data[12:]
}
return NewJob(REQ, datatype, data), err return NewJob(REQ, datatype, data), err
} }
func (job *Job) Encode() (data []byte) { func (job *Job) Encode() (data []byte) {
magiccode := uint32ToByte(job.magicCode) magiccode := uint32ToByte(job.magicCode)
datatype := uint32ToByte(job.dataType) datatype := uint32ToByte(job.dataType)
data = make([]byte, 0, 1024 * 64)
data = append(data, magiccode[:] ...)
data = append(data, datatype[:] ...)
data = append(data, []byte{0, 0, 0, 0} ...)
l := len(job.Data) l := len(job.Data)
if job.Handle != "" {
data = append(data, []byte(job.Handle) ...)
data = append(data, 0)
l += len(job.Handle) + 1
}
data = append(data, job.Data ...)
datalength := uint32ToByte(uint32(l)) datalength := uint32ToByte(uint32(l))
data = make([]byte, 12 + l)
copy(data[:4], magiccode[:])
copy(data[4:8], datatype[:])
copy(data[8:12], datalength[:]) copy(data[8:12], datalength[:])
copy(data[12:], job.Data) log.Println(data)
return return
} }

View File

@ -3,7 +3,7 @@ package gearman
import ( import (
"net" "net"
"os" "os"
// "log" "log"
) )
type jobClient struct { type jobClient struct {
@ -22,6 +22,7 @@ func newJobClient(addr string, incoming chan *Job) (jobclient *jobClient, err os
} }
func (client *jobClient) Work() (err os.Error) { func (client *jobClient) Work() (err os.Error) {
log.Println("Job client work().")
noop := true noop := true
for client.running { for client.running {
// grab job // grab job
@ -65,6 +66,7 @@ func (client *jobClient) WriteJob(job * Job) (err os.Error) {
} }
func (client *jobClient) Write(buf []byte) (err os.Error) { func (client *jobClient) Write(buf []byte) (err os.Error) {
log.Println(buf)
var n int var n int
for i := 0; i < len(buf); i += n { for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:]) n, err = client.conn.Write(buf[i:])