diff --git a/README b/README index 4795023..3704d43 100644 --- a/README +++ b/README @@ -16,6 +16,9 @@ It was implemented a native protocol for both worker and client API. $ make client $ ./client +- Code format + gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false $(DIR) + ---- xingxing http://mikespook.com diff --git a/example/Makefile b/example/Makefile index fb941db..6ef2081 100644 --- a/example/Makefile +++ b/example/Makefile @@ -16,3 +16,6 @@ include $(GOROOT)/src/Make.pkg %: %.go $(GC) $*.go $(LD) -o $@ $*.$O + +fmt: + gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false ./ diff --git a/example/client.go b/example/client.go index c41d24a..48e868d 100644 --- a/example/client.go +++ b/example/client.go @@ -27,7 +27,7 @@ func main() { log.Println(string(data)) } } - + known, running, numerator, denominator, err := client.Status(handle) if err != nil { log.Println(err) @@ -36,7 +36,7 @@ func main() { log.Println("Unknown") } if running { - log.Printf("%g%%\n", float32(numerator) * 100 / float32(denominator)) + log.Printf("%g%%\n", float32(numerator)*100/float32(denominator)) } else { log.Println("Not running") } diff --git a/example/worker.go b/example/worker.go index 55971ec..97b35a9 100644 --- a/example/worker.go +++ b/example/worker.go @@ -26,21 +26,21 @@ func main() { var str string fmt.Scan(&str) switch str { - case "echo": - worker.Echo([]byte("Hello world!")) - var job *gearman.WorkerJob - for job = <-worker.JobQueue; job.DataType != gearman.ECHO_RES; job = <-worker.JobQueue { - log.Println(job) - } - log.Println(string(job.Data)) - case "quit": - worker.Close() - return - case "result": - job := <-worker.JobQueue - log.Println(string(job.Data)) - default: - log.Println("Unknown command") + case "echo": + worker.Echo([]byte("Hello world!")) + var job *gearman.WorkerJob + for job = <-worker.JobQueue; job.DataType != gearman.ECHO_RES; job = <-worker.JobQueue { + log.Println(job) + } + log.Println(string(job.Data)) + case "quit": + worker.Close() + return + case "result": + job := <-worker.JobQueue + log.Println(string(job.Data)) + default: + log.Println("Unknown command") } } }() diff --git a/src/pkg/gearman/Makefile b/src/pkg/gearman/Makefile index 14a92f2..9b2abfd 100644 --- a/src/pkg/gearman/Makefile +++ b/src/pkg/gearman/Makefile @@ -20,3 +20,6 @@ include $(GOROOT)/src/Make.pkg %: install %.go $(GC) $*.go $(LD) -o $@ $*.$O + +fmt: + gofmt -spaces=true -tabwidth=4 -w=true -tabindent=false ./ diff --git a/src/pkg/gearman/client.go b/src/pkg/gearman/client.go index ca06b97..006946e 100644 --- a/src/pkg/gearman/client.go +++ b/src/pkg/gearman/client.go @@ -8,7 +8,7 @@ import ( "os" "net" "sync" -// "log" + // "log" "strconv" ) @@ -22,18 +22,18 @@ usage: */ type Client struct { - mutex sync.Mutex - conn net.Conn + mutex sync.Mutex + conn net.Conn JobQueue chan *ClientJob incoming chan []byte - UId uint32 + UId uint32 } // Create a new client. -func NewClient() (client * Client){ - client = &Client{JobQueue:make(chan *ClientJob, QUEUE_CAP), - incoming:make(chan []byte, QUEUE_CAP), - UId:1} +func NewClient() (client *Client) { + client = &Client{JobQueue: make(chan *ClientJob, QUEUE_CAP), + incoming: make(chan []byte, QUEUE_CAP), + UId: 1} return } @@ -60,12 +60,12 @@ func (client *Client) read() (data []byte, err os.Error) { buf := make([]byte, BUFFER_SIZE) var n int if n, err = client.conn.Read(buf); err != nil { - if (err == os.EOF && n == 0) { + if err == os.EOF && n == 0 { break } return } - data = append(data, buf[0: n] ...) + data = append(data, buf[0:n]...) if n < BUFFER_SIZE { break } @@ -74,7 +74,7 @@ func (client *Client) read() (data []byte, err os.Error) { // split package start, end := 0, 4 tl := len(data) - for i := 0; i < tl; i ++{ + for i := 0; i < tl; i++ { if string(data[start:end]) == RES_STR { l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) total := l + 12 @@ -104,12 +104,12 @@ func (client *Client) ReadJob() (job *ClientJob, err os.Error) { if job, err = DecodeClientJob(rel); err != nil { return } else { - switch(job.DataType) { - case ERROR: - _, err = getError(job.Data) - return - case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: - client.JobQueue <- job + switch job.DataType { + case ERROR: + _, err = getError(job.Data) + return + case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + client.JobQueue <- job } } return @@ -123,34 +123,34 @@ func (client *Client) ReadJob() (job *ClientJob, err os.Error) { // JOB_LOW | JOB_BG means the job is running with low level in background. func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err os.Error) { var datatype uint32 - if flag & JOB_LOW == JOB_LOW { - if flag & JOB_BG == JOB_BG { + if flag&JOB_LOW == JOB_LOW { + if flag&JOB_BG == JOB_BG { datatype = SUBMIT_JOB_LOW_BG } else { datatype = SUBMIT_JOB_LOW } - } else if flag & JOB_HIGH == JOB_HIGH { - if flag & JOB_BG == JOB_BG { + } else if flag&JOB_HIGH == JOB_HIGH { + if flag&JOB_BG == JOB_BG { datatype = SUBMIT_JOB_HIGH_BG } else { datatype = SUBMIT_JOB_HIGH } - } else if flag & JOB_BG == JOB_BG { + } else if flag&JOB_BG == JOB_BG { datatype = SUBMIT_JOB_BG } else { datatype = SUBMIT_JOB } - rel := make([]byte, 0, 1024 * 64) - rel = append(rel, []byte(funcname) ...) + rel := make([]byte, 0, 1024*64) + rel = append(rel, []byte(funcname)...) rel = append(rel, '\x00') client.mutex.Lock() uid := strconv.Itoa(int(client.UId)) - client.UId ++ - rel = append(rel, []byte(uid) ...) + client.UId++ + rel = append(rel, []byte(uid)...) client.mutex.Unlock() rel = append(rel, '\x00') - rel = append(rel, data ...) + rel = append(rel, data...) if err = client.WriteJob(NewClientJob(REQ, datatype, rel)); err != nil { return } @@ -160,16 +160,16 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string } handle = string(job.Data) go func() { - if flag & JOB_BG != JOB_BG { + if flag&JOB_BG != JOB_BG { for { if job, err = client.ReadJob(); err != nil { return } switch job.DataType { - case WORK_DATA, WORK_WARNING: - case WORK_STATUS: - case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: - return + case WORK_DATA, WORK_WARNING: + case WORK_STATUS: + case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + return } } } @@ -178,7 +178,7 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string } // Internal read last job -func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err os.Error){ +func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err os.Error) { for { if job, err = client.ReadJob(); err != nil { return @@ -200,7 +200,7 @@ func (client *Client) Status(handle string) (known, running bool, numerator, den if err = client.WriteJob(NewClientJob(REQ, GET_STATUS, []byte(handle))); err != nil { return } - var job * ClientJob + var job *ClientJob if job, err = client.readLastJob(STATUS_RES); err != nil { return } @@ -245,7 +245,7 @@ func (client *Client) LastJob() (job *ClientJob) { if l == 0 { return } - for i := 0; i < l - 1; i ++ { + for i := 0; i < l-1; i++ { <-client.JobQueue } } diff --git a/src/pkg/gearman/client/job.go b/src/pkg/gearman/client/job.go index 0f33293..d1062b8 100644 --- a/src/pkg/gearman/client/job.go +++ b/src/pkg/gearman/client/job.go @@ -6,25 +6,25 @@ package gearman import ( "os" -// "log" + // "log" ) // Client side job type ClientJob struct { - Data []byte - Handle, UniqueId string + Data []byte + Handle, UniqueId string magicCode, DataType uint32 } // Create a new job func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { - return &ClientJob{magicCode:magiccode, - DataType:datatype, - Data:data} + return &ClientJob{magicCode: magiccode, + DataType: datatype, + Data: data} } // Decode a job from byte slice -func DecodeClientJob(data []byte) (job * ClientJob, err os.Error) { +func DecodeClientJob(data []byte) (job *ClientJob, err os.Error) { if len(data) < 12 { err = os.NewError("Data length is too small.") return @@ -32,8 +32,8 @@ func DecodeClientJob(data []byte) (job * ClientJob, err os.Error) { datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) if len(data[12:]) != int(l) { - err = os.NewError("Invalid data length.") - return + err = os.NewError("Invalid data length.") + return } data = data[12:] job = NewClientJob(RES, datatype, data) @@ -44,36 +44,36 @@ func DecodeClientJob(data []byte) (job * ClientJob, err os.Error) { func (job *ClientJob) Encode() (data []byte) { magiccode := uint32ToByte(job.magicCode) datatype := uint32ToByte(job.DataType) - data = make([]byte, 0, 1024 * 64) - data = append(data, magiccode[:] ...) - data = append(data, datatype[:] ...) + data = make([]byte, 0, 1024*64) + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) l := len(job.Data) datalength := uint32ToByte(uint32(l)) - data = append(data, datalength[:] ...) - data = append(data, job.Data ...) + data = append(data, datalength[:]...) + data = append(data, job.Data...) return } // Extract the job's result. -func (job * ClientJob) Result() (data []byte, err os.Error){ +func (job *ClientJob) Result() (data []byte, err os.Error) { switch job.DataType { - case WORK_FAIL: - job.Handle = string(job.Data) - err = os.NewError("Work fail.") + case WORK_FAIL: + job.Handle = string(job.Data) + err = os.NewError("Work fail.") + return + case WORK_EXCEPTION: + err = os.NewError("Work exception.") + fallthrough + case WORK_COMPLETE: + s := splitByteArray(job.Data, '\x00') + if len(s) != 2 { + err = os.NewError("Invalid data.") return - case WORK_EXCEPTION: - err = os.NewError("Work exception.") - fallthrough - case WORK_COMPLETE: - s := splitByteArray(job.Data, '\x00') - if len(s) != 2 { - err = os.NewError("Invalid data.") - return - } - job.Handle = string(s[0]) - data = s[1] - default: - err = os.NewError("The job is not a result.") + } + job.Handle = string(s[0]) + data = s[1] + default: + err = os.NewError("The job is not a result.") } return } diff --git a/src/pkg/gearman/client_test.go b/src/pkg/gearman/client_test.go index fead96f..9c9db84 100644 --- a/src/pkg/gearman/client_test.go +++ b/src/pkg/gearman/client_test.go @@ -2,7 +2,7 @@ package gearman import ( "testing" -// "os" + // "os" ) var client *Client @@ -11,14 +11,14 @@ func init() { client = NewClient() } -func TestClientAddServer(t * testing.T) { +func TestClientAddServer(t *testing.T) { t.Log("Add local server 127.0.0.1:4730") if err := client.AddServer("127.0.0.1:4730"); err != nil { t.Error(err) } } -func TestClientEcho(t * testing.T) { +func TestClientEcho(t *testing.T) { if echo, err := client.Echo([]byte("Hello world")); err != nil { t.Error(err) } else { @@ -26,17 +26,16 @@ func TestClientEcho(t * testing.T) { } } -func TestClientDo(t * testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW | JOB_BG); err != nil { +func TestClientDo(t *testing.T) { + if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { t.Error(err) } else { t.Log(handle) } } -func TestClientClose(t * testing.T) { +func TestClientClose(t *testing.T) { if err := client.Close(); err != nil { t.Error(err) } } - diff --git a/src/pkg/gearman/gearman.go b/src/pkg/gearman/gearman.go index d1da4ad..81a0a54 100644 --- a/src/pkg/gearman/gearman.go +++ b/src/pkg/gearman/gearman.go @@ -28,44 +28,44 @@ const ( // \x00REQ - REQ = 5391697 + REQ = 5391697 REQ_STR = "\x00REQ" // \x00RES - RES = 5391699 + RES = 5391699 RES_STR = "\x00RES" // package data type - CAN_DO = 1 - CANT_DO = 2 + CAN_DO = 1 + CANT_DO = 2 RESET_ABILITIES = 3 - PRE_SLEEP = 4 - NOOP = 6 - JOB_CREATED = 8 - GRAB_JOB = 9 - NO_JOB = 10 - JOB_ASSIGN = 11 - WORK_STATUS = 12 - WORK_COMPLETE = 13 - WORK_FAIL = 14 - GET_STATUS = 15 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - STATUS_RES = 20 - SET_CLIENT_ID = 22 - CAN_DO_TIMEOUT = 23 - WORK_EXCEPTION = 25 - WORK_DATA = 28 - WORK_WARNING = 29 - GRAB_JOB_UNIQ = 30 + PRE_SLEEP = 4 + NOOP = 6 + JOB_CREATED = 8 + GRAB_JOB = 9 + NO_JOB = 10 + JOB_ASSIGN = 11 + WORK_STATUS = 12 + WORK_COMPLETE = 13 + WORK_FAIL = 14 + GET_STATUS = 15 + ECHO_REQ = 16 + ECHO_RES = 17 + ERROR = 19 + STATUS_RES = 20 + SET_CLIENT_ID = 22 + CAN_DO_TIMEOUT = 23 + WORK_EXCEPTION = 25 + WORK_DATA = 28 + WORK_WARNING = 29 + GRAB_JOB_UNIQ = 30 JOB_ASSIGN_UNIQ = 31 - SUBMIT_JOB = 7 - SUBMIT_JOB_BG = 18 - SUBMIT_JOB_HIGH = 21 + SUBMIT_JOB = 7 + SUBMIT_JOB_BG = 18 + SUBMIT_JOB_HIGH = 21 SUBMIT_JOB_HIGH_BG = 32 - SUBMIT_JOB_LOW = 33 - SUBMIT_JOB_LOW_BG = 34 + SUBMIT_JOB_LOW = 33 + SUBMIT_JOB_LOW_BG = 34 // Job type // JOB_NORMAL | JOB_BG means a normal level job run in background @@ -85,7 +85,7 @@ type Job interface { } // Splite the byte array by a byte -func splitByteArray(slice []byte, spot byte) (data [][]byte){ +func splitByteArray(slice []byte, spot byte) (data [][]byte) { data = make([][]byte, 0, 10) start, end := 0, 0 for i, v := range slice { @@ -93,9 +93,9 @@ func splitByteArray(slice []byte, spot byte) (data [][]byte){ if start != end { data = append(data, slice[start:end]) } - start, end = i + 1, i + 1 + start, end = i+1, i+1 } else { - end ++ + end++ } } data = append(data, slice[start:]) @@ -110,16 +110,16 @@ func getError(data []byte) (eno os.Errno, err os.Error) { return } l := len(rel[0]) - eno = os.Errno(byteToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l - 1]})) + eno = os.Errno(byteToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) err = os.NewError(string(rel[1])) return } // Decode [4]byte to uint32 func byteToUint32(buf [4]byte) uint32 { - return uint32(buf[0]) << 24 + - uint32(buf[1]) << 16 + - uint32(buf[2]) << 8 + + return uint32(buf[0])<<24 + + uint32(buf[1])<<16 + + uint32(buf[2])<<8 + uint32(buf[3]) } diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 2b90000..16c2171 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -4,10 +4,10 @@ package gearman -import( +import ( "os" "sync" -// "log" + // "log" ) // The definition of the callback function. @@ -32,14 +32,14 @@ func foobar(job *WorkerJob) (data []byte, err os.Error) { //plaplapla... return } -*/ +*/ type Worker struct { - clients []*jobClient + clients []*jobClient functions JobFunctionMap - running bool + running bool incoming chan *WorkerJob - mutex sync.Mutex + mutex sync.Mutex JobQueue chan *WorkerJob ErrQueue chan os.Error } @@ -48,20 +48,20 @@ type Worker struct { func NewWorker() (worker *Worker) { worker = &Worker{ // job server list - clients:make([]*jobClient, 0, WORKER_SERVER_CAP), + clients: make([]*jobClient, 0, WORKER_SERVER_CAP), // function list functions: make(JobFunctionMap), - incoming: make(chan *WorkerJob, QUEUE_CAP), - JobQueue: make(chan *WorkerJob, QUEUE_CAP), - ErrQueue: make(chan os.Error, QUEUE_CAP), - running: true, + incoming: make(chan *WorkerJob, QUEUE_CAP), + JobQueue: make(chan *WorkerJob, QUEUE_CAP), + ErrQueue: make(chan os.Error, QUEUE_CAP), + running: true, } return } // Add a server. The addr should be 'host:port' format. // The connection is established at this time. -func (worker * Worker) AddServer(addr string) (err os.Error) { +func (worker *Worker) AddServer(addr string) (err os.Error) { worker.mutex.Lock() defer worker.mutex.Unlock() @@ -76,7 +76,7 @@ func (worker * Worker) AddServer(addr string) (err os.Error) { } n := len(worker.clients) - worker.clients = worker.clients[0: n + 1] + worker.clients = worker.clients[0 : n+1] worker.clients[n] = server return } @@ -85,8 +85,8 @@ func (worker * Worker) AddServer(addr string) (err os.Error) { // Add a function. // Plz added job servers first, then functions. // The API will tell every connected job server that 'I can do this' -func (worker * Worker) AddFunction(funcname string, - f JobFunction, timeout uint32) (err os.Error) { +func (worker *Worker) AddFunction(funcname string, +f JobFunction, timeout uint32) (err os.Error) { if len(worker.clients) < 1 { return os.NewError("Did not connect to Job Server.") } @@ -103,7 +103,7 @@ func (worker * Worker) AddFunction(funcname string, datatype = CAN_DO_TIMEOUT data = []byte(funcname + "\x00") t := uint32ToByte(timeout) - data = append(data, t[:] ...) + data = append(data, t[:]...) } job := NewWorkerJob(REQ, datatype, data) worker.WriteJob(job) @@ -112,7 +112,7 @@ func (worker * Worker) AddFunction(funcname string, // Remove a function. // Tell job servers 'I can not do this now' at the same time. -func (worker * Worker) RemoveFunction(funcname string) (err os.Error) { +func (worker *Worker) RemoveFunction(funcname string) (err os.Error) { worker.mutex.Lock() defer worker.mutex.Unlock() @@ -126,31 +126,31 @@ func (worker * Worker) RemoveFunction(funcname string) (err os.Error) { } // Main loop -func (worker * Worker) Work() { +func (worker *Worker) Work() { for _, v := range worker.clients { go v.Work() } for worker.running { select { - case job := <-worker.incoming: - if job == nil { - break - } - switch job.DataType { - case NO_JOB: - // do nothing - case ERROR: - _, err := getError(job.Data) + case job := <-worker.incoming: + if job == nil { + break + } + switch job.DataType { + case NO_JOB: + // do nothing + case ERROR: + _, err := getError(job.Data) + worker.ErrQueue <- err + case JOB_ASSIGN, JOB_ASSIGN_UNIQ: + go func() { + if err := worker.exec(job); err != nil { worker.ErrQueue <- err - case JOB_ASSIGN, JOB_ASSIGN_UNIQ: - go func() { - if err := worker.exec(job); err != nil { - worker.ErrQueue <- err - } - }() - default: - worker.JobQueue <- job - } + } + }() + default: + worker.JobQueue <- job + } } } } @@ -159,12 +159,12 @@ func (worker * Worker) Work() { // If there are more than one job in the queue, // the last one will be returned, // the others will be lost. -func (worker * Worker) LastJob() (job *WorkerJob) { +func (worker *Worker) LastJob() (job *WorkerJob) { if l := len(worker.JobQueue); l != 1 { if l == 0 { return } - for i := 0; i < l - 1; i ++ { + for i := 0; i < l-1; i++ { <-worker.JobQueue } } @@ -172,7 +172,7 @@ func (worker * Worker) LastJob() (job *WorkerJob) { } // Close. -func (worker * Worker) Close() (err os.Error){ +func (worker *Worker) Close() (err os.Error) { worker.running = false for _, v := range worker.clients { err = v.Close() @@ -184,25 +184,25 @@ func (worker * Worker) Close() (err os.Error){ // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. -func (worker * Worker) WriteJob(job *WorkerJob) (err os.Error) { +func (worker *Worker) WriteJob(job *WorkerJob) (err os.Error) { e := make(chan os.Error) for _, v := range worker.clients { go func() { e <- v.WriteJob(job) }() } - return <- e + return <-e } // Send a something out, get the samething back. -func (worker * Worker) Echo(data []byte) (err os.Error) { +func (worker *Worker) Echo(data []byte) (err os.Error) { job := NewWorkerJob(REQ, ECHO_REQ, data) return worker.WriteJob(job) } // Remove all of functions. // Both from the worker or job servers. -func (worker * Worker) Reset() (err os.Error){ +func (worker *Worker) Reset() (err os.Error) { job := NewWorkerJob(REQ, RESET_ABILITIES, nil) err = worker.WriteJob(job) worker.functions = make(JobFunctionMap) @@ -210,13 +210,13 @@ func (worker * Worker) Reset() (err os.Error){ } // Set the worker's unique id. -func (worker * Worker) SetId(id string) (err os.Error) { +func (worker *Worker) SetId(id string) (err os.Error) { job := NewWorkerJob(REQ, SET_CLIENT_ID, []byte(id)) return worker.WriteJob(job) } // Execute the job. And send back the result. -func (worker * Worker) exec(job *WorkerJob) (err os.Error) { +func (worker *Worker) exec(job *WorkerJob) (err os.Error) { jobdata := splitByteArray(job.Data, '\x00') job.Handle = string(jobdata[0]) funcname := string(jobdata[1]) @@ -234,7 +234,7 @@ func (worker * Worker) exec(job *WorkerJob) (err os.Error) { var datatype uint32 if err == nil { datatype = WORK_COMPLETE - } else{ + } else { if result == nil { datatype = WORK_FAIL } else { diff --git a/src/pkg/gearman/worker/job.go b/src/pkg/gearman/worker/job.go index 8215463..ec2bbb3 100644 --- a/src/pkg/gearman/worker/job.go +++ b/src/pkg/gearman/worker/job.go @@ -7,23 +7,23 @@ package gearman import ( "os" "strconv" -// "log" + // "log" ) // Worker side job type WorkerJob struct { - Data []byte - Handle, UniqueId string - client *jobClient + Data []byte + Handle, UniqueId string + client *jobClient magicCode, DataType uint32 Job } // Create a new job func NewWorkerJob(magiccode, datatype uint32, data []byte) (job *WorkerJob) { - return &WorkerJob{magicCode:magiccode, + return &WorkerJob{magicCode: magiccode, DataType: datatype, - Data:data} + Data: data} } // Decode job from byte slice @@ -35,8 +35,8 @@ func DecodeWorkerJob(data []byte) (job *WorkerJob, err os.Error) { datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) if len(data[12:]) != int(l) { - err = os.NewError("Invalid data length.") - return + err = os.NewError("Invalid data length.") + return } data = data[12:] job = NewWorkerJob(RES, datatype, data) @@ -47,17 +47,17 @@ func DecodeWorkerJob(data []byte) (job *WorkerJob, err os.Error) { func (job *WorkerJob) Encode() (data []byte) { magiccode := uint32ToByte(job.magicCode) datatype := uint32ToByte(job.DataType) - data = make([]byte, 0, 1024 * 64) - data = append(data, magiccode[:] ...) - data = append(data, datatype[:] ...) - data = append(data, []byte{0, 0, 0, 0} ...) + data = make([]byte, 0, 1024*64) + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) + data = append(data, []byte{0, 0, 0, 0}...) l := len(job.Data) if job.Handle != "" { - data = append(data, []byte(job.Handle) ...) + data = append(data, []byte(job.Handle)...) data = append(data, 0) l += len(job.Handle) + 1 } - data = append(data, job.Data ...) + data = append(data, job.Data...) datalength := uint32ToByte(uint32(l)) copy(data[8:12], datalength[:]) return @@ -65,9 +65,9 @@ func (job *WorkerJob) Encode() (data []byte) { // Send some datas to client. // Using this in a job's executing. -func (job * WorkerJob) UpdateData(data []byte, iswaring bool) (err os.Error) { +func (job *WorkerJob) UpdateData(data []byte, iswaring bool) (err os.Error) { result := append([]byte(job.Handle), 0) - result = append(result, data ...) + result = append(result, data...) var datatype uint32 if iswaring { datatype = WORK_WARNING @@ -79,13 +79,11 @@ func (job * WorkerJob) UpdateData(data []byte, iswaring bool) (err os.Error) { // Update status. // Tall client how many percent job has been executed. -func (job * WorkerJob) UpdateStatus(numerator, denominator int) (err os.Error) { +func (job *WorkerJob) UpdateStatus(numerator, denominator int) (err os.Error) { n := []byte(strconv.Itoa(numerator)) d := []byte(strconv.Itoa(denominator)) result := append([]byte(job.Handle), 0) - result = append(result, n ...) - result = append(result, d ...) + result = append(result, n...) + result = append(result, d...) return job.client.WriteJob(NewWorkerJob(REQ, WORK_STATUS, result)) } - - diff --git a/src/pkg/gearman/worker/jobclient.go b/src/pkg/gearman/worker/jobclient.go index 8d3c57b..01b7ed3 100644 --- a/src/pkg/gearman/worker/jobclient.go +++ b/src/pkg/gearman/worker/jobclient.go @@ -7,14 +7,14 @@ package gearman import ( "net" "os" -// "log" + // "log" ) // The client of job server. type jobClient struct { - conn net.Conn - worker *Worker - running bool + conn net.Conn + worker *Worker + running bool incoming chan []byte } @@ -24,12 +24,12 @@ func newJobClient(addr string, worker *Worker) (jobclient *jobClient, err os.Err if err != nil { return nil, err } - jobclient = &jobClient{conn:conn, worker:worker, running:true, incoming: make(chan []byte, QUEUE_CAP)} + jobclient = &jobClient{conn: conn, worker: worker, running: true, incoming: make(chan []byte, QUEUE_CAP)} return jobclient, err } // Internal read -func (client *jobClient) read() (data []byte, err os.Error){ +func (client *jobClient) read() (data []byte, err os.Error) { if len(client.incoming) > 0 { // incoming queue is not empty data = <-client.incoming @@ -44,7 +44,7 @@ func (client *jobClient) read() (data []byte, err os.Error){ } return } - data = append(data, buf[0: n] ...) + data = append(data, buf[0:n]...) if n < BUFFER_SIZE { break } @@ -53,7 +53,7 @@ func (client *jobClient) read() (data []byte, err os.Error){ // split package start := 0 tl := len(data) - for i := 0; i < tl; i ++{ + for i := 0; i < tl; i++ { if string(data[start:start+4]) == RES_STR { l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) total := l + 12 @@ -90,15 +90,15 @@ func (client *jobClient) Work() { client.worker.ErrQueue <- err continue } else { - switch(job.DataType) { - case NOOP: - noop = true - case NO_JOB: - noop = false - client.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) - case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: - job.client = client - client.worker.incoming <- job + switch job.DataType { + case NOOP: + noop = true + case NO_JOB: + noop = false + client.WriteJob(NewWorkerJob(REQ, PRE_SLEEP, nil)) + case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: + job.client = client + client.worker.incoming <- job } } } diff --git a/src/pkg/gearman/worker_test.go b/src/pkg/gearman/worker_test.go index 8feae31..5aa6288 100644 --- a/src/pkg/gearman/worker_test.go +++ b/src/pkg/gearman/worker_test.go @@ -41,7 +41,7 @@ func TestWorkerAddFunction(t *testing.T) { } } -func TestWorkerEcho(t * testing.T) { +func TestWorkerEcho(t *testing.T) { if err := worker.Echo([]byte("Hello World")); err != nil { t.Error(err) } @@ -56,13 +56,13 @@ func TestWorkerResult(t *testing.T) { } */ -func TestWorkerRemoveFunction(t * testing.T) { +func TestWorkerRemoveFunction(t *testing.T) { if err := worker.RemoveFunction("foobar"); err != nil { t.Error(err) } } -func TestWorkerReset(t * testing.T) { +func TestWorkerReset(t *testing.T) { if err := worker.Reset(); err != nil { t.Error(err) }