From c66d460235c6f5e27922a2a9f711bddfde97124e Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 6 Sep 2011 18:04:14 +0800 Subject: [PATCH] damn --- src/pkg/gearman/Makefile | 6 +- src/pkg/gearman/client.go | 394 +++++++++--------- src/pkg/gearman/client/job.go | 99 +++++ src/pkg/gearman/clientjob.go | 99 ----- src/pkg/gearman/gearman.go | 158 +++---- src/pkg/gearman/worker.go | 312 +++++++------- .../gearman/{workerjob.go => worker/job.go} | 0 .../jobclient.go} | 0 src/pkg/gearman/worker_test.go | 1 - 9 files changed, 534 insertions(+), 535 deletions(-) create mode 100644 src/pkg/gearman/client/job.go delete mode 100644 src/pkg/gearman/clientjob.go rename src/pkg/gearman/{workerjob.go => worker/job.go} (100%) rename src/pkg/gearman/{workerjobclient.go => worker/jobclient.go} (100%) diff --git a/src/pkg/gearman/Makefile b/src/pkg/gearman/Makefile index 76e5ee9..9b2abfd 100644 --- a/src/pkg/gearman/Makefile +++ b/src/pkg/gearman/Makefile @@ -7,10 +7,10 @@ include $(GOROOT)/src/Make.inc TARG=gearman GOFILES=\ gearman.go\ - workerjob.go\ - workerjobclient.go\ + worker/job.go\ + worker/jobclient.go\ worker.go\ - clientjob.go\ + client/job.go\ client.go\ CLEANFILES+=gearman_test diff --git a/src/pkg/gearman/client.go b/src/pkg/gearman/client.go index cacde9f..d506fe0 100644 --- a/src/pkg/gearman/client.go +++ b/src/pkg/gearman/client.go @@ -5,12 +5,12 @@ package gearman import ( - "os" - "net" - "sync" - // "log" - "strconv" - "bytes" + "os" + "net" + "sync" + // "log" + "strconv" + "bytes" ) /* @@ -23,97 +23,97 @@ usage: */ type Client struct { - mutex sync.Mutex - conn net.Conn - JobQueue chan *ClientJob - incoming chan []byte - UId uint32 + mutex sync.Mutex + conn net.Conn + JobQueue chan *ClientJob + incoming chan []byte + UId uint32 } // Create a new client. func NewClient() (client *Client) { - client = &Client{JobQueue: make(chan *ClientJob, QUEUE_CAP), - incoming: make(chan []byte, QUEUE_CAP), - UId: 1} - return + client = &Client{JobQueue: make(chan *ClientJob, QUEUE_CAP), + incoming: make(chan []byte, QUEUE_CAP), + UId: 1} + return } // Add a server. // In this version, one client connect to one job server. // Sample is better. Plz do the load balancing by your self. func (client *Client) AddServer(addr string) (err os.Error) { - conn, err := net.Dial(TCP, addr) - if err != nil { - return - } - client.conn = conn - return + conn, err := net.Dial(TCP, addr) + if err != nil { + return + } + client.conn = conn + return } // Internal read func (client *Client) read() (data []byte, err os.Error) { - if len(client.incoming) > 0 { - // incoming queue is not empty - data = <-client.incoming - } else { - // empty queue, read data from socket - for { - buf := make([]byte, BUFFER_SIZE) - var n int - if n, err = client.conn.Read(buf); err != nil { - if err == os.EOF && n == 0 { - break - } - return - } - data = append(data, buf[0:n]...) - if n < BUFFER_SIZE { - break - } - } - } - // split package - start, end := 0, 4 - tl := len(data) - for i := 0; i < tl; i++ { - if string(data[start:end]) == RES_STR { - l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - client.incoming <- data[total:] - data = data[:total] - return - } - } else { - start++ - end++ - } - } - err = os.NewError("Invalid data struct.") - return + if len(client.incoming) > 0 { + // incoming queue is not empty + data = <-client.incoming + } else { + // empty queue, read data from socket + for { + buf := make([]byte, BUFFER_SIZE) + var n int + if n, err = client.conn.Read(buf); err != nil { + if err == os.EOF && n == 0 { + break + } + return + } + data = append(data, buf[0:n]...) + if n < BUFFER_SIZE { + break + } + } + } + // split package + start, end := 0, 4 + tl := len(data) + for i := 0; i < tl; i++ { + if string(data[start:end]) == RES_STR { + l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) + total := l + 12 + if total == tl { + return + } else { + client.incoming <- data[total:] + data = data[:total] + return + } + } else { + start++ + end++ + } + } + err = os.NewError("Invalid data struct.") + return } // Read a job from job server. // This function will return the job, and add it to the job queue. func (client *Client) ReadJob() (job *ClientJob, err os.Error) { - var rel []byte - if rel, err = client.read(); err != nil { - return - } - if job, err = DecodeClientJob(rel); err != nil { - return - } else { - switch job.DataType { - case ERROR: - _, err = getError(job.Data) - return - case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: - client.JobQueue <- job - } - } - return + var rel []byte + if rel, err = client.read(); err != nil { + return + } + if job, err = DecodeClientJob(rel); err != nil { + return + } else { + switch job.DataType { + case ERROR: + _, err = getError(job.Data) + return + case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + client.JobQueue <- job + } + } + return } // Do the function. @@ -123,156 +123,156 @@ func (client *Client) ReadJob() (job *ClientJob, err os.Error) { // and if it is background job: JOB_BG. // JOB_LOW | JOB_BG means the job is running with low level in background. func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err os.Error) { - var datatype uint32 - if flag&JOB_LOW == JOB_LOW { - if flag&JOB_BG == JOB_BG { - datatype = SUBMIT_JOB_LOW_BG - } else { - datatype = SUBMIT_JOB_LOW - } - } else if flag&JOB_HIGH == JOB_HIGH { - if flag&JOB_BG == JOB_BG { - datatype = SUBMIT_JOB_HIGH_BG - } else { - datatype = SUBMIT_JOB_HIGH - } - } else if flag&JOB_BG == JOB_BG { - datatype = SUBMIT_JOB_BG - } else { - datatype = SUBMIT_JOB - } + var datatype uint32 + if flag&JOB_LOW == JOB_LOW { + if flag&JOB_BG == JOB_BG { + datatype = SUBMIT_JOB_LOW_BG + } else { + datatype = SUBMIT_JOB_LOW + } + } else if flag&JOB_HIGH == JOB_HIGH { + if flag&JOB_BG == JOB_BG { + datatype = SUBMIT_JOB_HIGH_BG + } else { + datatype = SUBMIT_JOB_HIGH + } + } else if flag&JOB_BG == JOB_BG { + datatype = SUBMIT_JOB_BG + } else { + datatype = SUBMIT_JOB + } - rel := make([]byte, 0, 1024*64) - rel = append(rel, []byte(funcname)...) - rel = append(rel, '\x00') - client.mutex.Lock() - uid := strconv.Itoa(int(client.UId)) - client.UId++ - rel = append(rel, []byte(uid)...) - client.mutex.Unlock() - rel = append(rel, '\x00') - rel = append(rel, data...) - if err = client.WriteJob(NewClientJob(REQ, datatype, rel)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(JOB_CREATED); err != nil { - return - } - handle = string(job.Data) - go func() { - if flag&JOB_BG != JOB_BG { - for { - if job, err = client.ReadJob(); err != nil { - return - } - switch job.DataType { - case WORK_DATA, WORK_WARNING: - case WORK_STATUS: - case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: - return - } - } - } - }() - return + rel := make([]byte, 0, 1024*64) + rel = append(rel, []byte(funcname)...) + rel = append(rel, '\x00') + client.mutex.Lock() + uid := strconv.Itoa(int(client.UId)) + client.UId++ + rel = append(rel, []byte(uid)...) + client.mutex.Unlock() + rel = append(rel, '\x00') + rel = append(rel, data...) + if err = client.WriteJob(NewClientJob(REQ, datatype, rel)); err != nil { + return + } + var job *ClientJob + if job, err = client.readLastJob(JOB_CREATED); err != nil { + return + } + handle = string(job.Data) + go func() { + if flag&JOB_BG != JOB_BG { + for { + if job, err = client.ReadJob(); err != nil { + return + } + switch job.DataType { + case WORK_DATA, WORK_WARNING: + case WORK_STATUS: + case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION: + return + } + } + } + }() + return } // Internal read last job func (client *Client) readLastJob(datatype uint32) (job *ClientJob, err os.Error) { - for { - if job, err = client.ReadJob(); err != nil { - return - } - if job.DataType == datatype { - break - } - } - if job.DataType != datatype { - err = os.NewError("No job got.") - } - return + for { + if job, err = client.ReadJob(); err != nil { + return + } + if job.DataType == datatype { + break + } + } + if job.DataType != datatype { + err = os.NewError("No job got.") + } + return } // Get job status from job server. // !!!Not fully tested.!!! func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint, err os.Error) { - if err = client.WriteJob(NewClientJob(REQ, GET_STATUS, []byte(handle))); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(STATUS_RES); err != nil { - return - } - data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) - if len(data) != 5 { - err = os.NewError("Data Error.") - return - } - if handle != string(data[0]) { - err = os.NewError("Invalid handle.") - return - } - known = data[1][0] == '1' - running = data[2][0] == '1' - if numerator, err = strconv.Atoui(string(data[3][0])); err != nil { - return - } - if denominator, err = strconv.Atoui(string(data[4][0])); err != nil { - return - } - return + if err = client.WriteJob(NewClientJob(REQ, GET_STATUS, []byte(handle))); err != nil { + return + } + var job *ClientJob + if job, err = client.readLastJob(STATUS_RES); err != nil { + return + } + data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) + if len(data) != 5 { + err = os.NewError("Data Error.") + return + } + if handle != string(data[0]) { + err = os.NewError("Invalid handle.") + return + } + known = data[1][0] == '1' + running = data[2][0] == '1' + if numerator, err = strconv.Atoui(string(data[3][0])); err != nil { + return + } + if denominator, err = strconv.Atoui(string(data[4][0])); err != nil { + return + } + return } // Send a something out, get the samething back. func (client *Client) Echo(data []byte) (echo []byte, err os.Error) { - if err = client.WriteJob(NewClientJob(REQ, ECHO_REQ, data)); err != nil { - return - } - var job *ClientJob - if job, err = client.readLastJob(ECHO_RES); err != nil { - return - } - echo = job.Data - return + if err = client.WriteJob(NewClientJob(REQ, ECHO_REQ, data)); err != nil { + return + } + var job *ClientJob + if job, err = client.readLastJob(ECHO_RES); err != nil { + return + } + echo = job.Data + return } // Get the last job. // the job means a network package. // Normally, it is the job executed result. func (client *Client) LastJob() (job *ClientJob) { - if l := len(client.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-client.JobQueue - } - } - return <-client.JobQueue + if l := len(client.JobQueue); l != 1 { + if l == 0 { + return + } + for i := 0; i < l-1; i++ { + <-client.JobQueue + } + } + return <-client.JobQueue } // Send the job to job server. func (client *Client) WriteJob(job *ClientJob) (err os.Error) { - return client.write(job.Encode()) + return client.write(job.Encode()) } // Internal write func (client *Client) write(buf []byte) (err os.Error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) - if err != nil { - return - } - } - return + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return } // Close. func (client *Client) Close() (err os.Error) { - err = client.conn.Close() - close(client.JobQueue) - return + err = client.conn.Close() + close(client.JobQueue) + return } diff --git a/src/pkg/gearman/client/job.go b/src/pkg/gearman/client/job.go new file mode 100644 index 0000000..56171c8 --- /dev/null +++ b/src/pkg/gearman/client/job.go @@ -0,0 +1,99 @@ +// Copyright 2011 Xing Xing All rights reserved. +// Use of this source code is governed by a MIT +// license that can be found in the LICENSE file. + +package gearman + +import ( + "os" + "bytes" + // "log" +) + +// Client side job +type ClientJob struct { + Data []byte + Handle, UniqueId string + magicCode, DataType uint32 +} + +// Create a new job +func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { + return &ClientJob{magicCode: magiccode, + DataType: datatype, + Data: data} +} + +// Decode a job from byte slice +func DecodeClientJob(data []byte) (job *ClientJob, err os.Error) { + if len(data) < 12 { + err = os.NewError("Data length is too small.") + return + } + datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + err = os.NewError("Invalid data length.") + return + } + data = data[12:] + job = NewClientJob(RES, datatype, data) + return +} + +// Encode a job to byte slice +func (job *ClientJob) Encode() (data []byte) { + magiccode := uint32ToByte(job.magicCode) + datatype := uint32ToByte(job.DataType) + data = make([]byte, 0, 1024*64) + data = append(data, magiccode[:]...) + data = append(data, datatype[:]...) + l := len(job.Data) + datalength := uint32ToByte(uint32(l)) + data = append(data, datalength[:]...) + data = append(data, job.Data...) + return +} + +// Extract the job's result. +func (job *ClientJob) Result() (data []byte, err os.Error) { + switch job.DataType { + case WORK_FAIL: + job.Handle = string(job.Data) + err = os.NewError("Work fail.") + return + case WORK_EXCEPTION: + err = os.NewError("Work exception.") + fallthrough + case WORK_COMPLETE: + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = os.NewError("Invalid data.") + return + } + job.Handle = string(s[0]) + data = s[1] + default: + err = os.NewError("The job is not a result.") + } + return +} + +// Extract the job's update +func (job *ClientJob) Update() (data []byte, err os.Error) { + if job.DataType != WORK_DATA && job.DataType != WORK_WARNING { + err = os.NewError("The job is not a update.") + return + } + s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) + if len(s) != 2 { + err = os.NewError("Invalid data.") + return + } + if job.DataType == WORK_WARNING { + err = os.NewError("Work warning") + } + job.Handle = string(s[0]) + data = s[1] + return +} diff --git a/src/pkg/gearman/clientjob.go b/src/pkg/gearman/clientjob.go deleted file mode 100644 index cde3bcc..0000000 --- a/src/pkg/gearman/clientjob.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2011 Xing Xing All rights reserved. -// Use of this source code is governed by a MIT -// license that can be found in the LICENSE file. - -package gearman - -import ( - "os" - "bytes" - // "log" -) - -// Client side job -type ClientJob struct { - Data []byte - Handle, UniqueId string - magicCode, DataType uint32 -} - -// Create a new job -func NewClientJob(magiccode, datatype uint32, data []byte) (job *ClientJob) { - return &ClientJob{magicCode: magiccode, - DataType: datatype, - Data: data} -} - -// Decode a job from byte slice -func DecodeClientJob(data []byte) (job *ClientJob, err os.Error) { - if len(data) < 12 { - err = os.NewError("Data length is too small.") - return - } - datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - err = os.NewError("Invalid data length.") - return - } - data = data[12:] - job = NewClientJob(RES, datatype, data) - return -} - -// Encode a job to byte slice -func (job *ClientJob) Encode() (data []byte) { - magiccode := uint32ToByte(job.magicCode) - datatype := uint32ToByte(job.DataType) - data = make([]byte, 0, 1024*64) - data = append(data, magiccode[:]...) - data = append(data, datatype[:]...) - l := len(job.Data) - datalength := uint32ToByte(uint32(l)) - data = append(data, datalength[:]...) - data = append(data, job.Data...) - return -} - -// Extract the job's result. -func (job *ClientJob) Result() (data []byte, err os.Error) { - switch job.DataType { - case WORK_FAIL: - job.Handle = string(job.Data) - err = os.NewError("Work fail.") - return - case WORK_EXCEPTION: - err = os.NewError("Work exception.") - fallthrough - case WORK_COMPLETE: - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = os.NewError("Invalid data.") - return - } - job.Handle = string(s[0]) - data = s[1] - default: - err = os.NewError("The job is not a result.") - } - return -} - -// Extract the job's update -func (job *ClientJob) Update() (data []byte, err os.Error) { - if job.DataType != WORK_DATA && job.DataType != WORK_WARNING { - err = os.NewError("The job is not a update.") - return - } - s := bytes.SplitN(job.Data, []byte{'\x00'}, 2) - if len(s) != 2 { - err = os.NewError("Invalid data.") - return - } - if job.DataType == WORK_WARNING { - err = os.NewError("Work warning") - } - job.Handle = string(s[0]) - data = s[1] - return -} diff --git a/src/pkg/gearman/gearman.go b/src/pkg/gearman/gearman.go index 5d70d26..f8ae0c6 100644 --- a/src/pkg/gearman/gearman.go +++ b/src/pkg/gearman/gearman.go @@ -10,106 +10,106 @@ The protocol was implemented by native way. package gearman import ( - "bytes" - "os" + "bytes" + "os" ) const ( - // tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6, - // or 'tcp6' only for ipv6. - TCP = "tcp4" - // the number limited for job servers. - WORKER_SERVER_CAP = 32 - // the number limited for functions. - WORKER_FUNCTION_CAP = 512 - // queue size - QUEUE_CAP = 512 - // read buffer size - BUFFER_SIZE = 1024 + // tcp4 is tested. You can modify this to 'tcp' for both ipv4 and ipv6, + // or 'tcp6' only for ipv6. + TCP = "tcp4" + // the number limited for job servers. + WORKER_SERVER_CAP = 32 + // the number limited for functions. + WORKER_FUNCTION_CAP = 512 + // queue size + QUEUE_CAP = 512 + // read buffer size + BUFFER_SIZE = 1024 - // \x00REQ - REQ = 5391697 - REQ_STR = "\x00REQ" - // \x00RES - RES = 5391699 - RES_STR = "\x00RES" + // \x00REQ + REQ = 5391697 + REQ_STR = "\x00REQ" + // \x00RES + RES = 5391699 + RES_STR = "\x00RES" - // package data type - CAN_DO = 1 - CANT_DO = 2 - RESET_ABILITIES = 3 - PRE_SLEEP = 4 - NOOP = 6 - JOB_CREATED = 8 - GRAB_JOB = 9 - NO_JOB = 10 - JOB_ASSIGN = 11 - WORK_STATUS = 12 - WORK_COMPLETE = 13 - WORK_FAIL = 14 - GET_STATUS = 15 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - STATUS_RES = 20 - SET_CLIENT_ID = 22 - CAN_DO_TIMEOUT = 23 - WORK_EXCEPTION = 25 - WORK_DATA = 28 - WORK_WARNING = 29 - GRAB_JOB_UNIQ = 30 - JOB_ASSIGN_UNIQ = 31 + // package data type + CAN_DO = 1 + CANT_DO = 2 + RESET_ABILITIES = 3 + PRE_SLEEP = 4 + NOOP = 6 + JOB_CREATED = 8 + GRAB_JOB = 9 + NO_JOB = 10 + JOB_ASSIGN = 11 + WORK_STATUS = 12 + WORK_COMPLETE = 13 + WORK_FAIL = 14 + GET_STATUS = 15 + ECHO_REQ = 16 + ECHO_RES = 17 + ERROR = 19 + STATUS_RES = 20 + SET_CLIENT_ID = 22 + CAN_DO_TIMEOUT = 23 + WORK_EXCEPTION = 25 + WORK_DATA = 28 + WORK_WARNING = 29 + GRAB_JOB_UNIQ = 30 + JOB_ASSIGN_UNIQ = 31 - SUBMIT_JOB = 7 - SUBMIT_JOB_BG = 18 - SUBMIT_JOB_HIGH = 21 - SUBMIT_JOB_HIGH_BG = 32 - SUBMIT_JOB_LOW = 33 - SUBMIT_JOB_LOW_BG = 34 + SUBMIT_JOB = 7 + SUBMIT_JOB_BG = 18 + SUBMIT_JOB_HIGH = 21 + SUBMIT_JOB_HIGH_BG = 32 + SUBMIT_JOB_LOW = 33 + SUBMIT_JOB_LOW_BG = 34 - // Job type - // JOB_NORMAL | JOB_BG means a normal level job run in background - // normal level - JOB_NORMAL = 0 - // background job - JOB_BG = 1 - // low level - JOB_LOW = 2 - // high level - JOB_HIGH = 4 + // Job type + // JOB_NORMAL | JOB_BG means a normal level job run in background + // normal level + JOB_NORMAL = 0 + // background job + JOB_BG = 1 + // low level + JOB_LOW = 2 + // high level + JOB_HIGH = 4 ) // No use type Job interface { - Encode() []byte + Encode() []byte } // Extract the error message func getError(data []byte) (eno os.Errno, err os.Error) { - rel := bytes.SplitN(data, []byte{'\x00'}, 2) - if len(rel) != 2 { - err = os.NewError("The input is not a error data.") - return - } - l := len(rel[0]) - eno = os.Errno(byteToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) - err = os.NewError(string(rel[1])) - return + rel := bytes.SplitN(data, []byte{'\x00'}, 2) + if len(rel) != 2 { + err = os.NewError("The input is not a error data.") + return + } + l := len(rel[0]) + eno = os.Errno(byteToUint32([4]byte{rel[0][l-4], rel[0][l-3], rel[0][l-2], rel[0][l-1]})) + err = os.NewError(string(rel[1])) + return } // Decode [4]byte to uint32 func byteToUint32(buf [4]byte) uint32 { - return uint32(buf[0])<<24 + - uint32(buf[1])<<16 + - uint32(buf[2])<<8 + - uint32(buf[3]) + return uint32(buf[0])<<24 + + uint32(buf[1])<<16 + + uint32(buf[2])<<8 + + uint32(buf[3]) } // Encode uint32 to [4]byte func uint32ToByte(i uint32) (data [4]byte) { - data[0] = byte((i >> 24) & 0xff) - data[1] = byte((i >> 16) & 0xff) - data[2] = byte((i >> 8) & 0xff) - data[3] = byte(i & 0xff) - return + data[0] = byte((i >> 24) & 0xff) + data[1] = byte((i >> 16) & 0xff) + data[2] = byte((i >> 8) & 0xff) + data[3] = byte(i & 0xff) + return } diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 9ec8304..e29f779 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -5,10 +5,10 @@ package gearman import ( - "os" - "sync" - "bytes" - // "log" + "os" + "sync" + "bytes" + // "log" ) // The definition of the callback function. @@ -35,51 +35,51 @@ func foobar(job *WorkerJob) (data []byte, err os.Error) { } */ type Worker struct { - clients []*jobClient - functions JobFunctionMap + clients []*jobClient + functions JobFunctionMap - running bool - incoming chan *WorkerJob - mutex sync.Mutex - JobQueue chan *WorkerJob - ErrQueue chan os.Error + running bool + incoming chan *WorkerJob + mutex sync.Mutex + JobQueue chan *WorkerJob + ErrQueue chan os.Error } // Get a new worker func NewWorker() (worker *Worker) { - worker = &Worker{ - // job server list - clients: make([]*jobClient, 0, WORKER_SERVER_CAP), - // function list - functions: make(JobFunctionMap), - incoming: make(chan *WorkerJob, QUEUE_CAP), - JobQueue: make(chan *WorkerJob, QUEUE_CAP), - ErrQueue: make(chan os.Error, QUEUE_CAP), - running: true, - } - return + worker = &Worker{ + // job server list + clients: make([]*jobClient, 0, WORKER_SERVER_CAP), + // function list + functions: make(JobFunctionMap), + incoming: make(chan *WorkerJob, QUEUE_CAP), + JobQueue: make(chan *WorkerJob, QUEUE_CAP), + ErrQueue: make(chan os.Error, QUEUE_CAP), + running: true, + } + return } // Add a server. The addr should be 'host:port' format. // The connection is established at this time. func (worker *Worker) AddServer(addr string) (err os.Error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() + worker.mutex.Lock() + defer worker.mutex.Unlock() - if len(worker.clients) == cap(worker.clients) { - return os.NewError("There were too many clients.") - } + if len(worker.clients) == cap(worker.clients) { + return os.NewError("There were too many clients.") + } - // Create a new job server's client as a agent of server - server, err := newJobClient(addr, worker) - if err != nil { - return err - } + // Create a new job server's client as a agent of server + server, err := newJobClient(addr, worker) + if err != nil { + return err + } - n := len(worker.clients) - worker.clients = worker.clients[0 : n+1] - worker.clients[n] = server - return + n := len(worker.clients) + worker.clients = worker.clients[0 : n+1] + worker.clients[n] = server + return } // Add a function. @@ -87,72 +87,72 @@ func (worker *Worker) AddServer(addr string) (err os.Error) { // The API will tell every connected job server that 'I can do this' func (worker *Worker) AddFunction(funcname string, f JobFunction, timeout uint32) (err os.Error) { - if len(worker.clients) < 1 { - return os.NewError("Did not connect to Job Server.") - } - worker.mutex.Lock() - defer worker.mutex.Unlock() - worker.functions[funcname] = f + if len(worker.clients) < 1 { + return os.NewError("Did not connect to Job Server.") + } + worker.mutex.Lock() + defer worker.mutex.Unlock() + worker.functions[funcname] = f - var datatype uint32 - var data []byte - if timeout == 0 { - datatype = CAN_DO - data = []byte(funcname) - } else { - datatype = CAN_DO_TIMEOUT - data = []byte(funcname + "\x00") - t := uint32ToByte(timeout) - data = append(data, t[:]...) - } - job := NewWorkerJob(REQ, datatype, data) - worker.WriteJob(job) - return + var datatype uint32 + var data []byte + if timeout == 0 { + datatype = CAN_DO + data = []byte(funcname) + } else { + datatype = CAN_DO_TIMEOUT + data = []byte(funcname + "\x00") + t := uint32ToByte(timeout) + data = append(data, t[:]...) + } + job := NewWorkerJob(REQ, datatype, data) + worker.WriteJob(job) + return } // Remove a function. // Tell job servers 'I can not do this now' at the same time. func (worker *Worker) RemoveFunction(funcname string) (err os.Error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() + worker.mutex.Lock() + defer worker.mutex.Unlock() - if worker.functions[funcname] == nil { - return os.NewError("No function named: " + funcname) - } - worker.functions[funcname] = nil, false - job := NewWorkerJob(REQ, CANT_DO, []byte(funcname)) - worker.WriteJob(job) - return + if worker.functions[funcname] == nil { + return os.NewError("No function named: " + funcname) + } + worker.functions[funcname] = nil, false + job := NewWorkerJob(REQ, CANT_DO, []byte(funcname)) + worker.WriteJob(job) + return } // Main loop func (worker *Worker) Work() { - for _, v := range worker.clients { - go v.Work() - } - for worker.running { - select { - case job := <-worker.incoming: - if job == nil { - break - } - switch job.DataType { - case NO_JOB: - // do nothing - case ERROR: - _, err := getError(job.Data) - worker.ErrQueue <- err - case JOB_ASSIGN, JOB_ASSIGN_UNIQ: - go func() { - if err := worker.exec(job); err != nil { - worker.ErrQueue <- err - } - }() - default: - worker.JobQueue <- job - } - } - } + for _, v := range worker.clients { + go v.Work() + } + for worker.running { + select { + case job := <-worker.incoming: + if job == nil { + break + } + switch job.DataType { + case NO_JOB: + // do nothing + case ERROR: + _, err := getError(job.Data) + worker.ErrQueue <- err + case JOB_ASSIGN, JOB_ASSIGN_UNIQ: + go func() { + if err := worker.exec(job); err != nil { + worker.ErrQueue <- err + } + }() + default: + worker.JobQueue <- job + } + } + } } // Get the last job in queue. @@ -160,98 +160,98 @@ func (worker *Worker) Work() { // the last one will be returned, // the others will be lost. func (worker *Worker) LastJob() (job *WorkerJob) { - if l := len(worker.JobQueue); l != 1 { - if l == 0 { - return - } - for i := 0; i < l-1; i++ { - <-worker.JobQueue - } - } - return <-worker.JobQueue + if l := len(worker.JobQueue); l != 1 { + if l == 0 { + return + } + for i := 0; i < l-1; i++ { + <-worker.JobQueue + } + } + return <-worker.JobQueue } // Close. func (worker *Worker) Close() (err os.Error) { - worker.running = false - for _, v := range worker.clients { - err = v.Close() - } - close(worker.incoming) - return err + worker.running = false + for _, v := range worker.clients { + err = v.Close() + } + close(worker.incoming) + return err } // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. func (worker *Worker) WriteJob(job *WorkerJob) (err os.Error) { - e := make(chan os.Error) - for _, v := range worker.clients { - go func() { - e <- v.WriteJob(job) - }() - } - return <-e + e := make(chan os.Error) + for _, v := range worker.clients { + go func() { + e <- v.WriteJob(job) + }() + } + return <-e } // Send a something out, get the samething back. func (worker *Worker) Echo(data []byte) (err os.Error) { - job := NewWorkerJob(REQ, ECHO_REQ, data) - return worker.WriteJob(job) + job := NewWorkerJob(REQ, ECHO_REQ, data) + return worker.WriteJob(job) } // Remove all of functions. // Both from the worker or job servers. func (worker *Worker) Reset() (err os.Error) { - job := NewWorkerJob(REQ, RESET_ABILITIES, nil) - err = worker.WriteJob(job) - worker.functions = make(JobFunctionMap) - return + job := NewWorkerJob(REQ, RESET_ABILITIES, nil) + err = worker.WriteJob(job) + worker.functions = make(JobFunctionMap) + return } // Set the worker's unique id. func (worker *Worker) SetId(id string) (err os.Error) { - job := NewWorkerJob(REQ, SET_CLIENT_ID, []byte(id)) - return worker.WriteJob(job) + job := NewWorkerJob(REQ, SET_CLIENT_ID, []byte(id)) + return worker.WriteJob(job) } // Execute the job. And send back the result. func (worker *Worker) exec(job *WorkerJob) (err os.Error) { - var limit int - if job.DataType == JOB_ASSIGN { - limit = 3 - } else { - limit = 4 - } - jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) - job.Handle = string(jobdata[0]) - funcname := string(jobdata[1]) - if job.DataType == JOB_ASSIGN { - job.Data = jobdata[2] - } else { - job.UniqueId = string(jobdata[2]) - job.Data = jobdata[3] - } - f := worker.functions[funcname] - if f == nil { - return os.NewError("function is nil") - } - result, err := f(job) - var datatype uint32 - if err == nil { - datatype = WORK_COMPLETE - } else { - if result == nil { - datatype = WORK_FAIL - } else { - datatype = WORK_EXCEPTION - } - } + var limit int + if job.DataType == JOB_ASSIGN { + limit = 3 + } else { + limit = 4 + } + jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit) + job.Handle = string(jobdata[0]) + funcname := string(jobdata[1]) + if job.DataType == JOB_ASSIGN { + job.Data = jobdata[2] + } else { + job.UniqueId = string(jobdata[2]) + job.Data = jobdata[3] + } + f := worker.functions[funcname] + if f == nil { + return os.NewError("function is nil") + } + result, err := f(job) + var datatype uint32 + if err == nil { + datatype = WORK_COMPLETE + } else { + if result == nil { + datatype = WORK_FAIL + } else { + datatype = WORK_EXCEPTION + } + } - job.magicCode = REQ - job.DataType = datatype - job.Data = result + job.magicCode = REQ + job.DataType = datatype + job.Data = result - worker.WriteJob(job) - return + worker.WriteJob(job) + return } diff --git a/src/pkg/gearman/workerjob.go b/src/pkg/gearman/worker/job.go similarity index 100% rename from src/pkg/gearman/workerjob.go rename to src/pkg/gearman/worker/job.go diff --git a/src/pkg/gearman/workerjobclient.go b/src/pkg/gearman/worker/jobclient.go similarity index 100% rename from src/pkg/gearman/workerjobclient.go rename to src/pkg/gearman/worker/jobclient.go diff --git a/src/pkg/gearman/worker_test.go b/src/pkg/gearman/worker_test.go index 5aa6288..9956190 100644 --- a/src/pkg/gearman/worker_test.go +++ b/src/pkg/gearman/worker_test.go @@ -27,7 +27,6 @@ func foobar(job *WorkerJob) ([]byte, os.Error) { return nil, nil } - func TestWorkerAddFunction(t *testing.T) { if err := worker.AddFunction("foobar", foobar, 0); err != nil { t.Error(err)