From d8882938f7dc5994f5482edadec6a04c8bf2599b Mon Sep 17 00:00:00 2001 From: mikespook Date: Sat, 21 May 2011 12:32:44 +0800 Subject: [PATCH] no eof got invalid package, would modify the depack method --- example/client.go | 10 ++++++-- example/worker.go | 5 +++- src/pkg/gearman/client.go | 38 +++++++++++++++++++---------- src/pkg/gearman/client/job.go | 20 +++++++-------- src/pkg/gearman/worker.go | 8 +++--- src/pkg/gearman/worker/job.go | 9 +++---- src/pkg/gearman/worker/jobclient.go | 5 ++-- 7 files changed, 57 insertions(+), 38 deletions(-) diff --git a/example/client.go b/example/client.go index 6f6e029..011e9b4 100644 --- a/example/client.go +++ b/example/client.go @@ -10,13 +10,18 @@ func main() { defer client.Close() client.AddServer("127.0.0.1:4730") echo := []byte("Hello world") +/* log.Println(echo) log.Println(client.Echo(echo)) - - handle, err := client.Do("ToUpper", echo, gearman.JOB_HIGH) +*/ + handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL) if err != nil { log.Println(err) + } else { + log.Println(handle) + log.Println(<-client.JobQueue) } + /* known, running, numerator, denominator, err := client.Status(handle) if err != nil { log.Println(err) @@ -37,4 +42,5 @@ func main() { log.Println(err) log.Println(data) } + */ } diff --git a/example/worker.go b/example/worker.go index 26831c2..55971ec 100644 --- a/example/worker.go +++ b/example/worker.go @@ -28,7 +28,10 @@ func main() { switch str { case "echo": worker.Echo([]byte("Hello world!")) - job := <-worker.JobQueue + var job *gearman.WorkerJob + for job = <-worker.JobQueue; job.DataType != gearman.ECHO_RES; job = <-worker.JobQueue { + log.Println(job) + } log.Println(string(job.Data)) case "quit": worker.Close() diff --git a/src/pkg/gearman/client.go b/src/pkg/gearman/client.go index f58e635..1fbbe53 100644 --- a/src/pkg/gearman/client.go +++ b/src/pkg/gearman/client.go @@ -4,17 +4,21 @@ import ( "os" "net" "sync" + "log" + "strconv" ) type Client struct { mutex sync.Mutex conn net.Conn JobQueue chan *ClientJob + incoming chan []byte UId uint32 } func NewClient() (client * Client){ client = &Client{JobQueue:make(chan *ClientJob, QUEUE_CAP), + incoming:make(chan []byte, QUEUE_CAP), UId:1} return } @@ -28,7 +32,7 @@ func (client *Client) AddServer(addr string) (err os.Error) { return } -func (client *Client) ReadJob() (job *ClientJob, err os.Error) { +func (client *Client) read() (data []byte, err os.Error) { var rel []byte for { buf := make([]byte, BUFFER_SIZE) @@ -44,10 +48,17 @@ func (client *Client) ReadJob() (job *ClientJob, err os.Error) { break } } +} + +func (client *Client) ReadJob() (job *ClientJob, err os.Error) { + var rel []byte + if rel, err = client.read(); err != nil { + return + } if job, err = DecodeClientJob(rel); err != nil { return } else { - switch(job.dataType) { + switch(job.DataType) { case ERROR: _, err = getError(job.Data) return @@ -82,9 +93,9 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string rel = append(rel, []byte(funcname) ...) rel = append(rel, '\x00') client.mutex.Lock() - uid := uint32ToByte(client.UId) + uid := strconv.Itoa(int(client.UId)) client.UId ++ - rel = append(rel, uid[:] ...) + rel = append(rel, []byte(uid) ...) client.mutex.Unlock() rel = append(rel, '\x00') rel = append(rel, data ...) @@ -92,17 +103,18 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string return } var job *ClientJob - if job, err = client.ReadLastJob(JOB_CREATED); err != nil { + if job, err = client.readLastJob(JOB_CREATED); err != nil { return } handle = string(job.Data) + log.Println(handle) go func() { if flag & JOB_BG != JOB_BG { for { if job, err = client.ReadJob(); err != nil { return } - switch job.dataType { + switch job.DataType { case WORK_DATA, WORK_WARNING: case WORK_STATUS: case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: @@ -114,16 +126,16 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string return } -func (client *Client) ReadLastJob(datatype uint32) (job *ClientJob, err os.Error){ +func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err os.Error){ for { if job, err = client.ReadJob(); err != nil { return } - if job.dataType == datatype { + if job.DataType == datatype { break } } - if job.dataType != datatype { + if job.DataType != datatype { err = os.NewError("No job got.") } return @@ -135,7 +147,7 @@ func (client *Client) Status(handle string) (known, running bool, numerator, den return } var job * ClientJob - if job, err = client.ReadLastJob(STATUS_RES); err != nil { + if job, err = client.readLastJob(STATUS_RES); err != nil { return } data := splitByteArray(job.Data, '\x00') @@ -159,7 +171,7 @@ func (client *Client) Echo(data []byte) (echo []byte, err os.Error) { return } var job *ClientJob - if job, err = client.ReadLastJob(ECHO_RES); err != nil { + if job, err = client.readLastJob(ECHO_RES); err != nil { return } return job.Data, err @@ -178,10 +190,10 @@ func (client *Client) LastResult() (job *ClientJob) { } func (client *Client) WriteJob(job *ClientJob) (err os.Error) { - return client.Write(job.Encode()) + return client.write(job.Encode()) } -func (client *Client) Write(buf []byte) (err os.Error) { +func (client *Client) write(buf []byte) (err os.Error) { var n int for i := 0; i < len(buf); i += n { n, err = client.conn.Write(buf[i:]) diff --git a/src/pkg/gearman/client/job.go b/src/pkg/gearman/client/job.go index cb7b1ce..f35ce39 100644 --- a/src/pkg/gearman/client/job.go +++ b/src/pkg/gearman/client/job.go @@ -7,14 +7,13 @@ import ( type ClientJob struct { Data []byte - Handle string - UniqueId string - magicCode, dataType uint32 + Handle, UniqueId string + magicCode, DataType uint32 } func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { return &ClientJob{magicCode:magiccode, - dataType:datatype, + DataType:datatype, Data:data} } @@ -36,20 +35,19 @@ func DecodeClientJob(data []byte) (job * ClientJob, err os.Error) { func (job *ClientJob) Encode() (data []byte) { magiccode := uint32ToByte(job.magicCode) - datatype := uint32ToByte(job.dataType) + datatype := uint32ToByte(job.DataType) data = make([]byte, 0, 1024 * 64) data = append(data, magiccode[:] ...) data = append(data, datatype[:] ...) - data = append(data, []byte{0, 0, 0, 0} ...) l := len(job.Data) - data = append(data, job.Data ...) datalength := uint32ToByte(uint32(l)) - copy(data[8:12], datalength[:]) + data = append(data, datalength[:] ...) + data = append(data, job.Data ...) return } func (job * ClientJob) Result() (data []byte, err os.Error){ - switch job.dataType { + switch job.DataType { case WORK_FAIL: job.Handle = string(job.Data) err = os.NewError("Work fail.") @@ -72,7 +70,7 @@ func (job * ClientJob) Result() (data []byte, err os.Error){ } func (job *ClientJob) Update() (data []byte, err os.Error) { - if job.dataType != WORK_DATA && job.dataType != WORK_WARNING { + if job.DataType != WORK_DATA && job.DataType != WORK_WARNING { err = os.NewError("The job is not a update.") return } @@ -81,7 +79,7 @@ func (job *ClientJob) Update() (data []byte, err os.Error) { err = os.NewError("Invalid data.") return } - if job.dataType == WORK_WARNING { + if job.DataType == WORK_WARNING { err = os.NewError("Work warning") } job.Handle = string(s[0]) diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 9e38e9d..79fed33 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -111,7 +111,7 @@ func (worker * Worker) Work() { if job == nil { break } - switch job.dataType { + switch job.DataType { case NO_JOB: // do nothing case ERROR: @@ -120,8 +120,8 @@ func (worker * Worker) Work() { case JOB_ASSIGN, JOB_ASSIGN_UNIQ: if err := worker.exec(job); err != nil { worker.ErrQueue <- err + continue } - fallthrough default: worker.JobQueue <- job } @@ -185,7 +185,7 @@ func (worker * Worker) exec(job *WorkerJob) (err os.Error) { jobdata := splitByteArray(job.Data, '\x00') job.Handle = string(jobdata[0]) funcname := string(jobdata[1]) - if job.dataType == JOB_ASSIGN { + if job.DataType == JOB_ASSIGN { job.Data = jobdata[2] } else { job.UniqueId = string(jobdata[2]) @@ -208,7 +208,7 @@ func (worker * Worker) exec(job *WorkerJob) (err os.Error) { } job.magicCode = REQ - job.dataType = datatype + job.DataType = datatype job.Data = result worker.WriteJob(job) diff --git a/src/pkg/gearman/worker/job.go b/src/pkg/gearman/worker/job.go index da68cc1..ec6b456 100644 --- a/src/pkg/gearman/worker/job.go +++ b/src/pkg/gearman/worker/job.go @@ -7,16 +7,15 @@ import ( type WorkerJob struct { Data []byte - Handle string - UniqueId string + Handle, UniqueId string client *jobClient - magicCode, dataType uint32 + magicCode, DataType uint32 Job } func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { return &WorkerJob{magicCode:magiccode, - dataType: datatype, + DataType: datatype, Data:data} } @@ -38,7 +37,7 @@ func DecodeWorkerJob(data []byte) (job *WorkerJob, err os.Error) { func (job *WorkerJob) Encode() (data []byte) { magiccode := uint32ToByte(job.magicCode) - datatype := uint32ToByte(job.dataType) + datatype := uint32ToByte(job.DataType) data = make([]byte, 0, 1024 * 64) data = append(data, magiccode[:] ...) data = append(data, datatype[:] ...) diff --git a/src/pkg/gearman/worker/jobclient.go b/src/pkg/gearman/worker/jobclient.go index 638e0f9..5ab0546 100644 --- a/src/pkg/gearman/worker/jobclient.go +++ b/src/pkg/gearman/worker/jobclient.go @@ -3,7 +3,7 @@ package gearman import ( "net" "os" -// "log" + "log" ) type jobClient struct { @@ -44,12 +44,13 @@ func (client *jobClient) Work() { break } } + log.Println(string(rel)) job, err := DecodeWorkerJob(rel) if err != nil { client.worker.ErrQueue <- err continue } else { - switch(job.dataType) { + switch(job.DataType) { case NOOP: noop = true case NO_JOB: