diff --git a/worker/agent.go b/worker/agent.go index 22f8781..8dd0ba6 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -21,7 +21,7 @@ func newAgent(net, addr string, worker *Worker) (a *agent, err error) { net: net, addr: addr, worker: worker, - in: make(chan []byte, QUEUE_SIZE), + in: make(chan []byte, queueSize), } return } @@ -43,7 +43,7 @@ func (a *agent) work() { var err error var data, leftdata []byte for { - if data, err = a.read(BUFFER_SIZE); err != nil { + if data, err = a.read(bufferSize); err != nil { a.worker.err(err) if err == ErrLostConn { break @@ -60,7 +60,7 @@ func (a *agent) work() { if len(leftdata) > 0 { // some data left for processing data = append(leftdata, data...) } - if len(data) < MIN_PACKET_LEN { // not enough data + if len(data) < minPacketLength { // not enough data leftdata = data continue } @@ -90,7 +90,7 @@ func (a *agent) Grab() { a.Lock() defer a.Unlock() outpack := getOutPack() - outpack.dataType = GRAB_JOB_UNIQ + outpack.dataType = dtGrabJobUniq a.write(outpack) } @@ -98,16 +98,16 @@ func (a *agent) PreSleep() { a.Lock() defer a.Unlock() outpack := getOutPack() - outpack.dataType = PRE_SLEEP + outpack.dataType = dtPreSleep a.write(outpack) } // read length bytes from the socket func (a *agent) read(length int) (data []byte, err error) { n := 0 - buf := getBuffer(BUFFER_SIZE) + buf := getBuffer(bufferSize) // read until data can be unpacked - for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { + for i := length; i > 0 || len(data) < minPacketLength; i -= n { if n, err = a.conn.Read(buf); err != nil { if err == io.EOF { err = ErrLostConn @@ -115,7 +115,7 @@ func (a *agent) read(length int) (data []byte, err error) { return } data = append(data, buf[0:n]...) - if n < BUFFER_SIZE { + if n < bufferSize { break } } diff --git a/worker/common.go b/worker/common.go index 47d69d3..99c0e6d 100644 --- a/worker/common.go +++ b/worker/common.go @@ -1,54 +1,47 @@ package worker const ( - NETWORK = "tcp" + Network = "tcp" // queue size - QUEUE_SIZE = 8 + queueSize = 8 // read buffer size - BUFFER_SIZE = 1024 + bufferSize = 1024 // min packet length - MIN_PACKET_LEN = 12 + minPacketLength = 12 // \x00REQ - REQ = 5391697 - REQ_STR = "\x00REQ" + req = 5391697 + reqStr = "\x00REQ" // \x00RES - RES = 5391699 - RES_STR = "\x00RES" + res = 5391699 + resStr = "\x00RES" // package data type - CAN_DO = 1 - CANT_DO = 2 - RESET_ABILITIES = 3 - PRE_SLEEP = 4 - NOOP = 6 - JOB_CREATED = 8 - GRAB_JOB = 9 - NO_JOB = 10 - JOB_ASSIGN = 11 - WORK_STATUS = 12 - WORK_COMPLETE = 13 - WORK_FAIL = 14 - GET_STATUS = 15 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - STATUS_RES = 20 - SET_CLIENT_ID = 22 - CAN_DO_TIMEOUT = 23 - ALL_YOURS = 24 - WORK_EXCEPTION = 25 - WORK_DATA = 28 - WORK_WARNING = 29 - GRAB_JOB_UNIQ = 30 - JOB_ASSIGN_UNIQ = 31 - - SUBMIT_JOB = 7 - SUBMIT_JOB_BG = 18 - SUBMIT_JOB_HIGH = 21 - SUBMIT_JOB_HIGH_BG = 32 - SUBMIT_JOB_LOW = 33 - SUBMIT_JOB_LOW_BG = 34 + dtCanDo = 1 + dtCantDo = 2 + dtResetAbilities = 3 + dtPreSleep = 4 + dtNoop = 6 + dtJobCreated = 8 + dtGrabJob = 9 + dtNoJob = 10 + dtJobAssign = 11 + dtWorkStatus = 12 + dtWorkComplete = 13 + dtWorkFail = 14 + dtGetStatus = 15 + dtEchoReq = 16 + dtEchoRes = 17 + dtError = 19 + dtStatusRes = 20 + dtSetClientId = 22 + dtCanDoTimeout = 23 + dtAllYours = 24 + dtWorkException = 25 + dtWorkData = 28 + dtWorkWarning = 29 + dtGrabJobUniq = 30 + dtJobAssignUniq = 31 ) func getBuffer(l int) (buf []byte) { diff --git a/worker/error.go b/worker/error.go index ceade3e..c25a6d3 100644 --- a/worker/error.go +++ b/worker/error.go @@ -15,7 +15,7 @@ var ( ) // Extract the error message -func GetError(data []byte) (err error) { +func getError(data []byte) (err error) { rel := bytes.SplitN(data, []byte{'\x00'}, 2) if len(rel) != 2 { err = fmt.Errorf("Not a error data: %V", data) diff --git a/worker/example_test.go b/worker/example_test.go index f317783..52032e4 100644 --- a/worker/example_test.go +++ b/worker/example_test.go @@ -11,7 +11,7 @@ func ExampleWorker() { w := worker.New(worker.Unlimited) defer w.Close() // Add a gearman job server - if err := w.AddServer("tcp4", "127.0.0.1:4730"); err != nil { + if err := w.AddServer(worker.NETWORK, "127.0.0.1:4730"); err != nil { fmt.Println(err) return } @@ -26,7 +26,7 @@ func ExampleWorker() { return } var wg sync.WaitGroup - // A custome handler, for handling other results, eg. ECHO, ERROR. + // A custome handler, for handling other results, eg. ECHO, dtError. w.JobHandler = func(job worker.Job) error { if job.Err() == nil { fmt.Println(string(job.Data())) diff --git a/worker/func.go b/worker/func.go index 20ec910..22ad9f5 100644 --- a/worker/func.go +++ b/worker/func.go @@ -17,7 +17,7 @@ type jobFunc struct { } // Map for added function. -type JobFuncs map[string]*jobFunc +type jobFuncs map[string]*jobFunc type systemInfo struct { GOOS, GOARCH, GOROOT, Version string diff --git a/worker/inpack.go b/worker/inpack.go index c6edc9c..68d9928 100644 --- a/worker/inpack.go +++ b/worker/inpack.go @@ -25,8 +25,8 @@ func (inpack *inPack) Data() []byte { } func (inpack *inPack) Err() error { - if inpack.dataType == ERROR { - return GetError(inpack.data) + if inpack.dataType == dtError { + return getError(inpack.data) } return nil } @@ -35,7 +35,7 @@ func (inpack *inPack) Err() error { // Using this in a job's executing. func (inpack *inPack) SendData(data []byte) { outpack := getOutPack() - outpack.dataType = WORK_DATA + outpack.dataType = dtWorkData hl := len(inpack.handle) l := hl + len(data) + 1 outpack.data = getBuffer(l) @@ -46,7 +46,7 @@ func (inpack *inPack) SendData(data []byte) { func (inpack *inPack) SendWarning(data []byte) { outpack := getOutPack() - outpack.dataType = WORK_WARNING + outpack.dataType = dtWorkWarning hl := len(inpack.handle) l := hl + len(data) + 1 outpack.data = getBuffer(l) @@ -61,7 +61,7 @@ func (inpack *inPack) UpdateStatus(numerator, denominator int) { n := []byte(strconv.Itoa(numerator)) d := []byte(strconv.Itoa(denominator)) outpack := getOutPack() - outpack.dataType = WORK_STATUS + outpack.dataType = dtWorkStatus hl := len(inpack.handle) nl := len(n) dl := len(d) @@ -74,12 +74,12 @@ func (inpack *inPack) UpdateStatus(numerator, denominator int) { // Decode job from byte slice func decodeInPack(data []byte) (inpack *inPack, l int, err error) { - if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes + if len(data) < minPacketLength { // valid package should not less 12 bytes err = fmt.Errorf("Invalid data: %V", data) return } dl := int(binary.BigEndian.Uint32(data[8:12])) - dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN] + dt := data[minPacketLength : dl+minPacketLength] if len(dt) != int(dl) { // length not equal err = fmt.Errorf("Invalid data: %V", data) return @@ -87,14 +87,14 @@ func decodeInPack(data []byte) (inpack *inPack, l int, err error) { inpack = getInPack() inpack.dataType = binary.BigEndian.Uint32(data[4:8]) switch inpack.dataType { - case JOB_ASSIGN: + case dtJobAssign: s := bytes.SplitN(dt, []byte{'\x00'}, 3) if len(s) == 3 { inpack.handle = string(s[0]) inpack.fn = string(s[1]) inpack.data = s[2] } - case JOB_ASSIGN_UNIQ: + case dtJobAssignUniq: s := bytes.SplitN(dt, []byte{'\x00'}, 4) if len(s) == 4 { inpack.handle = string(s[0]) @@ -105,6 +105,6 @@ func decodeInPack(data []byte) (inpack *inPack, l int, err error) { default: inpack.data = dt } - l = dl + MIN_PACKET_LEN + l = dl + minPacketLength return } diff --git a/worker/inpack_test.go b/worker/inpack_test.go index 75b9ba5..4ae978e 100644 --- a/worker/inpack_test.go +++ b/worker/inpack_test.go @@ -7,19 +7,19 @@ import ( var ( inpackcases = map[uint32]map[string]string{ - NOOP: map[string]string{ + noop: map[string]string{ "src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00", }, - NO_JOB: map[string]string{ + noJob: map[string]string{ "src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00", }, - JOB_ASSIGN: map[string]string{ + jobAssign: map[string]string{ "src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz", "handle": "a", "fn": "b", "data": "xyz", }, - JOB_ASSIGN_UNIQ: map[string]string{ + jobAssign_UNIQ: map[string]string{ "src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz", "handle": "a", "fn": "b", diff --git a/worker/outpack.go b/worker/outpack.go index 0a9ba6e..1022cda 100644 --- a/worker/outpack.go +++ b/worker/outpack.go @@ -19,7 +19,7 @@ func getOutPack() (outpack *outPack) { // Encode a job to byte slice func (outpack *outPack) Encode() (data []byte) { var l int - if outpack.dataType == WORK_FAIL { + if outpack.dataType == dtWorkFail { l = len(outpack.handle) } else { l = len(outpack.data) @@ -27,20 +27,20 @@ func (outpack *outPack) Encode() (data []byte) { l += len(outpack.handle) + 1 } } - data = getBuffer(l + MIN_PACKET_LEN) - binary.BigEndian.PutUint32(data[:4], REQ) + data = getBuffer(l + minPacketLength) + binary.BigEndian.PutUint32(data[:4], req) binary.BigEndian.PutUint32(data[4:8], outpack.dataType) - binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l)) - i := MIN_PACKET_LEN + binary.BigEndian.PutUint32(data[8:minPacketLength], uint32(l)) + i := minPacketLength if outpack.handle != "" { hi := len(outpack.handle) + i copy(data[i:hi], []byte(outpack.handle)) - if outpack.dataType != WORK_FAIL { + if outpack.dataType != dtWorkFail { data[hi] = '\x00' } i = hi + 1 } - if outpack.dataType != WORK_FAIL { + if outpack.dataType != dtWorkFail { copy(data[i:], outpack.data) } return diff --git a/worker/outpack_test.go b/worker/outpack_test.go index dca1d8e..65ba330 100644 --- a/worker/outpack_test.go +++ b/worker/outpack_test.go @@ -7,22 +7,22 @@ import ( var ( outpackcases = map[uint32]map[string]string{ - CAN_DO: map[string]string{ + canDo: map[string]string{ "src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a", "data": "a", }, - CAN_DO_TIMEOUT: map[string]string{ + canDo_TIMEOUT: map[string]string{ "src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01", "data": "a\x00\x00\x00\x00\x01", }, - CANT_DO: map[string]string{ + cantDo: map[string]string{ "src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a", "data": "a", }, - RESET_ABILITIES: map[string]string{ + resetAbilities: map[string]string{ "src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00", }, - PRE_SLEEP: map[string]string{ + preSleep: map[string]string{ "src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00", }, GRAB_JOB: map[string]string{ @@ -39,15 +39,15 @@ var ( "src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b", "data": "a\x00b", }, - WORK_STATUS: map[string]string{ + workStatus: map[string]string{ "src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100", "data": "a\x0050\x00100", }, - WORK_COMPLETE: map[string]string{ + workComplete: map[string]string{ "src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b", "data": "a\x00b", }, - WORK_FAIL: map[string]string{ + workFail: map[string]string{ "src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a", "handle": "a", }, @@ -55,7 +55,7 @@ var ( "src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b", "data": "a\x00b", }, - SET_CLIENT_ID: map[string]string{ + dtSetClientId: map[string]string{ "src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a", "data": "a", }, diff --git a/worker/worker.go b/worker/worker.go index 478f00f..f6cdb13 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -21,7 +21,7 @@ const ( type Worker struct { sync.Mutex agents []*agent - funcs JobFuncs + funcs jobFuncs in chan *inPack running bool @@ -41,8 +41,8 @@ type Worker struct { func New(limit int) (worker *Worker) { worker = &Worker{ agents: make([]*agent, 0, limit), - funcs: make(JobFuncs), - in: make(chan *inPack, QUEUE_SIZE), + funcs: make(jobFuncs), + in: make(chan *inPack, queueSize), } if limit != Unlimited { worker.limit = make(chan bool, limit-1) @@ -97,10 +97,10 @@ func (worker *Worker) AddFunc(funcname string, func (worker *Worker) addFunc(funcname string, timeout uint32) { outpack := getOutPack() if timeout == 0 { - outpack.dataType = CAN_DO + outpack.dataType = dtCanDo outpack.data = []byte(funcname) } else { - outpack.dataType = CAN_DO_TIMEOUT + outpack.dataType = dtCanDoTimeout l := len(funcname) outpack.data = getBuffer(l + 5) copy(outpack.data, []byte(funcname)) @@ -127,7 +127,7 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) { // inner remove func (worker *Worker) removeFunc(funcname string) { outpack := getOutPack() - outpack.dataType = CANT_DO + outpack.dataType = dtCantDo outpack.data = []byte(funcname) worker.broadcast(outpack) } @@ -135,11 +135,11 @@ func (worker *Worker) removeFunc(funcname string) { // inner package handling func (worker *Worker) handleInPack(inpack *inPack) { switch inpack.dataType { - case NO_JOB: + case dtNoJob: inpack.a.PreSleep() - case NOOP: + case dtNoop: inpack.a.Grab() - case JOB_ASSIGN, JOB_ASSIGN_UNIQ: + case dtJobAssign, dtJobAssignUniq: go func() { if err := worker.exec(inpack); err != nil { worker.err(err) @@ -149,10 +149,10 @@ func (worker *Worker) handleInPack(inpack *inPack) { worker.limit <- true } inpack.a.Grab() - case ERROR: + case dtError: worker.err(inpack.Err()) fallthrough - case ECHO_RES: + case dtEchoRes: fallthrough default: worker.customeHandler(inpack) @@ -219,7 +219,7 @@ func (worker *Worker) Close() { // Echo func (worker *Worker) Echo(data []byte) { outpack := getOutPack() - outpack.dataType = ECHO_REQ + outpack.dataType = dtEchoReq outpack.data = data worker.broadcast(outpack) } @@ -228,16 +228,16 @@ func (worker *Worker) Echo(data []byte) { // Both from the worker and job servers. func (worker *Worker) Reset() { outpack := getOutPack() - outpack.dataType = RESET_ABILITIES + outpack.dataType = dtResetAbilities worker.broadcast(outpack) - worker.funcs = make(JobFuncs) + worker.funcs = make(jobFuncs) } // Set the worker's unique id. func (worker *Worker) SetId(id string) { worker.Id = id outpack := getOutPack() - outpack.dataType = SET_CLIENT_ID + outpack.dataType = dtSetClientId outpack.data = []byte(id) worker.broadcast(outpack) } @@ -270,12 +270,12 @@ func (worker *Worker) exec(inpack *inPack) (err error) { if worker.running { outpack := getOutPack() if r.err == nil { - outpack.dataType = WORK_COMPLETE + outpack.dataType = dtWorkComplete } else { if len(r.data) == 0 { - outpack.dataType = WORK_FAIL + outpack.dataType = dtWorkFail } else { - outpack.dataType = WORK_EXCEPTION + outpack.dataType = dtWorkException } err = r.err }