Something that should not be displayed in documents; disabled.

This commit is contained in:
Xing Xing 2013-12-26 12:32:27 +08:00
parent 18791d0490
commit 59941371fb
10 changed files with 93 additions and 100 deletions

View File

@ -21,7 +21,7 @@ func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
net: net, net: net,
addr: addr, addr: addr,
worker: worker, worker: worker,
in: make(chan []byte, QUEUE_SIZE), in: make(chan []byte, queueSize),
} }
return return
} }
@ -43,7 +43,7 @@ func (a *agent) work() {
var err error var err error
var data, leftdata []byte var data, leftdata []byte
for { for {
if data, err = a.read(BUFFER_SIZE); err != nil { if data, err = a.read(bufferSize); err != nil {
a.worker.err(err) a.worker.err(err)
if err == ErrLostConn { if err == ErrLostConn {
break break
@ -60,7 +60,7 @@ func (a *agent) work() {
if len(leftdata) > 0 { // some data left for processing if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...) data = append(leftdata, data...)
} }
if len(data) < MIN_PACKET_LEN { // not enough data if len(data) < minPacketLength { // not enough data
leftdata = data leftdata = data
continue continue
} }
@ -90,7 +90,7 @@ func (a *agent) Grab() {
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = GRAB_JOB_UNIQ outpack.dataType = dtGrabJobUniq
a.write(outpack) a.write(outpack)
} }
@ -98,16 +98,16 @@ func (a *agent) PreSleep() {
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = PRE_SLEEP outpack.dataType = dtPreSleep
a.write(outpack) a.write(outpack)
} }
// read length bytes from the socket // read length bytes from the socket
func (a *agent) read(length int) (data []byte, err error) { func (a *agent) read(length int) (data []byte, err error) {
n := 0 n := 0
buf := getBuffer(BUFFER_SIZE) buf := getBuffer(bufferSize)
// read until data can be unpacked // read until data can be unpacked
for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { for i := length; i > 0 || len(data) < minPacketLength; i -= n {
if n, err = a.conn.Read(buf); err != nil { if n, err = a.conn.Read(buf); err != nil {
if err == io.EOF { if err == io.EOF {
err = ErrLostConn err = ErrLostConn
@ -115,7 +115,7 @@ func (a *agent) read(length int) (data []byte, err error) {
return return
} }
data = append(data, buf[0:n]...) data = append(data, buf[0:n]...)
if n < BUFFER_SIZE { if n < bufferSize {
break break
} }
} }

View File

@ -1,54 +1,47 @@
package worker package worker
const ( const (
NETWORK = "tcp" Network = "tcp"
// queue size // queue size
QUEUE_SIZE = 8 queueSize = 8
// read buffer size // read buffer size
BUFFER_SIZE = 1024 bufferSize = 1024
// min packet length // min packet length
MIN_PACKET_LEN = 12 minPacketLength = 12
// \x00REQ // \x00REQ
REQ = 5391697 req = 5391697
REQ_STR = "\x00REQ" reqStr = "\x00REQ"
// \x00RES // \x00RES
RES = 5391699 res = 5391699
RES_STR = "\x00RES" resStr = "\x00RES"
// package data type // package data type
CAN_DO = 1 dtCanDo = 1
CANT_DO = 2 dtCantDo = 2
RESET_ABILITIES = 3 dtResetAbilities = 3
PRE_SLEEP = 4 dtPreSleep = 4
NOOP = 6 dtNoop = 6
JOB_CREATED = 8 dtJobCreated = 8
GRAB_JOB = 9 dtGrabJob = 9
NO_JOB = 10 dtNoJob = 10
JOB_ASSIGN = 11 dtJobAssign = 11
WORK_STATUS = 12 dtWorkStatus = 12
WORK_COMPLETE = 13 dtWorkComplete = 13
WORK_FAIL = 14 dtWorkFail = 14
GET_STATUS = 15 dtGetStatus = 15
ECHO_REQ = 16 dtEchoReq = 16
ECHO_RES = 17 dtEchoRes = 17
ERROR = 19 dtError = 19
STATUS_RES = 20 dtStatusRes = 20
SET_CLIENT_ID = 22 dtSetClientId = 22
CAN_DO_TIMEOUT = 23 dtCanDoTimeout = 23
ALL_YOURS = 24 dtAllYours = 24
WORK_EXCEPTION = 25 dtWorkException = 25
WORK_DATA = 28 dtWorkData = 28
WORK_WARNING = 29 dtWorkWarning = 29
GRAB_JOB_UNIQ = 30 dtGrabJobUniq = 30
JOB_ASSIGN_UNIQ = 31 dtJobAssignUniq = 31
SUBMIT_JOB = 7
SUBMIT_JOB_BG = 18
SUBMIT_JOB_HIGH = 21
SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34
) )
func getBuffer(l int) (buf []byte) { func getBuffer(l int) (buf []byte) {

View File

@ -15,7 +15,7 @@ var (
) )
// Extract the error message // Extract the error message
func GetError(data []byte) (err error) { func getError(data []byte) (err error) {
rel := bytes.SplitN(data, []byte{'\x00'}, 2) rel := bytes.SplitN(data, []byte{'\x00'}, 2)
if len(rel) != 2 { if len(rel) != 2 {
err = fmt.Errorf("Not a error data: %V", data) err = fmt.Errorf("Not a error data: %V", data)

View File

@ -11,7 +11,7 @@ func ExampleWorker() {
w := worker.New(worker.Unlimited) w := worker.New(worker.Unlimited)
defer w.Close() defer w.Close()
// Add a gearman job server // Add a gearman job server
if err := w.AddServer("tcp4", "127.0.0.1:4730"); err != nil { if err := w.AddServer(worker.NETWORK, "127.0.0.1:4730"); err != nil {
fmt.Println(err) fmt.Println(err)
return return
} }
@ -26,7 +26,7 @@ func ExampleWorker() {
return return
} }
var wg sync.WaitGroup var wg sync.WaitGroup
// A custome handler, for handling other results, eg. ECHO, ERROR. // A custome handler, for handling other results, eg. ECHO, dtError.
w.JobHandler = func(job worker.Job) error { w.JobHandler = func(job worker.Job) error {
if job.Err() == nil { if job.Err() == nil {
fmt.Println(string(job.Data())) fmt.Println(string(job.Data()))

View File

@ -17,7 +17,7 @@ type jobFunc struct {
} }
// Map for added function. // Map for added function.
type JobFuncs map[string]*jobFunc type jobFuncs map[string]*jobFunc
type systemInfo struct { type systemInfo struct {
GOOS, GOARCH, GOROOT, Version string GOOS, GOARCH, GOROOT, Version string

View File

@ -25,8 +25,8 @@ func (inpack *inPack) Data() []byte {
} }
func (inpack *inPack) Err() error { func (inpack *inPack) Err() error {
if inpack.dataType == ERROR { if inpack.dataType == dtError {
return GetError(inpack.data) return getError(inpack.data)
} }
return nil return nil
} }
@ -35,7 +35,7 @@ func (inpack *inPack) Err() error {
// Using this in a job's executing. // Using this in a job's executing.
func (inpack *inPack) SendData(data []byte) { func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = WORK_DATA outpack.dataType = dtWorkData
hl := len(inpack.handle) hl := len(inpack.handle)
l := hl + len(data) + 1 l := hl + len(data) + 1
outpack.data = getBuffer(l) outpack.data = getBuffer(l)
@ -46,7 +46,7 @@ func (inpack *inPack) SendData(data []byte) {
func (inpack *inPack) SendWarning(data []byte) { func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = WORK_WARNING outpack.dataType = dtWorkWarning
hl := len(inpack.handle) hl := len(inpack.handle)
l := hl + len(data) + 1 l := hl + len(data) + 1
outpack.data = getBuffer(l) outpack.data = getBuffer(l)
@ -61,7 +61,7 @@ func (inpack *inPack) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator)) n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator)) d := []byte(strconv.Itoa(denominator))
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = WORK_STATUS outpack.dataType = dtWorkStatus
hl := len(inpack.handle) hl := len(inpack.handle)
nl := len(n) nl := len(n)
dl := len(d) dl := len(d)
@ -74,12 +74,12 @@ func (inpack *inPack) UpdateStatus(numerator, denominator int) {
// Decode job from byte slice // Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) { func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes if len(data) < minPacketLength { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %V", data) err = fmt.Errorf("Invalid data: %V", data)
return return
} }
dl := int(binary.BigEndian.Uint32(data[8:12])) dl := int(binary.BigEndian.Uint32(data[8:12]))
dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN] dt := data[minPacketLength : dl+minPacketLength]
if len(dt) != int(dl) { // length not equal if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %V", data) err = fmt.Errorf("Invalid data: %V", data)
return return
@ -87,14 +87,14 @@ func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
inpack = getInPack() inpack = getInPack()
inpack.dataType = binary.BigEndian.Uint32(data[4:8]) inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch inpack.dataType { switch inpack.dataType {
case JOB_ASSIGN: case dtJobAssign:
s := bytes.SplitN(dt, []byte{'\x00'}, 3) s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 { if len(s) == 3 {
inpack.handle = string(s[0]) inpack.handle = string(s[0])
inpack.fn = string(s[1]) inpack.fn = string(s[1])
inpack.data = s[2] inpack.data = s[2]
} }
case JOB_ASSIGN_UNIQ: case dtJobAssignUniq:
s := bytes.SplitN(dt, []byte{'\x00'}, 4) s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 { if len(s) == 4 {
inpack.handle = string(s[0]) inpack.handle = string(s[0])
@ -105,6 +105,6 @@ func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
default: default:
inpack.data = dt inpack.data = dt
} }
l = dl + MIN_PACKET_LEN l = dl + minPacketLength
return return
} }

View File

@ -7,19 +7,19 @@ import (
var ( var (
inpackcases = map[uint32]map[string]string{ inpackcases = map[uint32]map[string]string{
NOOP: map[string]string{ noop: map[string]string{
"src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00", "src": "\x00RES\x00\x00\x00\x06\x00\x00\x00\x00",
}, },
NO_JOB: map[string]string{ noJob: map[string]string{
"src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00", "src": "\x00RES\x00\x00\x00\x0a\x00\x00\x00\x00",
}, },
JOB_ASSIGN: map[string]string{ jobAssign: map[string]string{
"src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz", "src": "\x00RES\x00\x00\x00\x0b\x00\x00\x00\x07a\x00b\x00xyz",
"handle": "a", "handle": "a",
"fn": "b", "fn": "b",
"data": "xyz", "data": "xyz",
}, },
JOB_ASSIGN_UNIQ: map[string]string{ jobAssign_UNIQ: map[string]string{
"src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz", "src": "\x00RES\x00\x00\x00\x1F\x00\x00\x00\x09a\x00b\x00c\x00xyz",
"handle": "a", "handle": "a",
"fn": "b", "fn": "b",

View File

@ -19,7 +19,7 @@ func getOutPack() (outpack *outPack) {
// Encode a job to byte slice // Encode a job to byte slice
func (outpack *outPack) Encode() (data []byte) { func (outpack *outPack) Encode() (data []byte) {
var l int var l int
if outpack.dataType == WORK_FAIL { if outpack.dataType == dtWorkFail {
l = len(outpack.handle) l = len(outpack.handle)
} else { } else {
l = len(outpack.data) l = len(outpack.data)
@ -27,20 +27,20 @@ func (outpack *outPack) Encode() (data []byte) {
l += len(outpack.handle) + 1 l += len(outpack.handle) + 1
} }
} }
data = getBuffer(l + MIN_PACKET_LEN) data = getBuffer(l + minPacketLength)
binary.BigEndian.PutUint32(data[:4], REQ) binary.BigEndian.PutUint32(data[:4], req)
binary.BigEndian.PutUint32(data[4:8], outpack.dataType) binary.BigEndian.PutUint32(data[4:8], outpack.dataType)
binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l)) binary.BigEndian.PutUint32(data[8:minPacketLength], uint32(l))
i := MIN_PACKET_LEN i := minPacketLength
if outpack.handle != "" { if outpack.handle != "" {
hi := len(outpack.handle) + i hi := len(outpack.handle) + i
copy(data[i:hi], []byte(outpack.handle)) copy(data[i:hi], []byte(outpack.handle))
if outpack.dataType != WORK_FAIL { if outpack.dataType != dtWorkFail {
data[hi] = '\x00' data[hi] = '\x00'
} }
i = hi + 1 i = hi + 1
} }
if outpack.dataType != WORK_FAIL { if outpack.dataType != dtWorkFail {
copy(data[i:], outpack.data) copy(data[i:], outpack.data)
} }
return return

View File

@ -7,22 +7,22 @@ import (
var ( var (
outpackcases = map[uint32]map[string]string{ outpackcases = map[uint32]map[string]string{
CAN_DO: map[string]string{ canDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x01\x00\x00\x00\x01a",
"data": "a", "data": "a",
}, },
CAN_DO_TIMEOUT: map[string]string{ canDo_TIMEOUT: map[string]string{
"src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01", "src": "\x00REQ\x00\x00\x00\x17\x00\x00\x00\x06a\x00\x00\x00\x00\x01",
"data": "a\x00\x00\x00\x00\x01", "data": "a\x00\x00\x00\x00\x01",
}, },
CANT_DO: map[string]string{ cantDo: map[string]string{
"src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x02\x00\x00\x00\x01a",
"data": "a", "data": "a",
}, },
RESET_ABILITIES: map[string]string{ resetAbilities: map[string]string{
"src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00", "src": "\x00REQ\x00\x00\x00\x03\x00\x00\x00\x00",
}, },
PRE_SLEEP: map[string]string{ preSleep: map[string]string{
"src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00", "src": "\x00REQ\x00\x00\x00\x04\x00\x00\x00\x00",
}, },
GRAB_JOB: map[string]string{ GRAB_JOB: map[string]string{
@ -39,15 +39,15 @@ var (
"src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b", "src": "\x00REQ\x00\x00\x00\x1D\x00\x00\x00\x03a\x00b",
"data": "a\x00b", "data": "a\x00b",
}, },
WORK_STATUS: map[string]string{ workStatus: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100", "src": "\x00REQ\x00\x00\x00\x0C\x00\x00\x00\x08a\x0050\x00100",
"data": "a\x0050\x00100", "data": "a\x0050\x00100",
}, },
WORK_COMPLETE: map[string]string{ workComplete: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b", "src": "\x00REQ\x00\x00\x00\x0D\x00\x00\x00\x03a\x00b",
"data": "a\x00b", "data": "a\x00b",
}, },
WORK_FAIL: map[string]string{ workFail: map[string]string{
"src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x0E\x00\x00\x00\x01a",
"handle": "a", "handle": "a",
}, },
@ -55,7 +55,7 @@ var (
"src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b", "src": "\x00REQ\x00\x00\x00\x19\x00\x00\x00\x03a\x00b",
"data": "a\x00b", "data": "a\x00b",
}, },
SET_CLIENT_ID: map[string]string{ dtSetClientId: map[string]string{
"src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a", "src": "\x00REQ\x00\x00\x00\x16\x00\x00\x00\x01a",
"data": "a", "data": "a",
}, },

View File

@ -21,7 +21,7 @@ const (
type Worker struct { type Worker struct {
sync.Mutex sync.Mutex
agents []*agent agents []*agent
funcs JobFuncs funcs jobFuncs
in chan *inPack in chan *inPack
running bool running bool
@ -41,8 +41,8 @@ type Worker struct {
func New(limit int) (worker *Worker) { func New(limit int) (worker *Worker) {
worker = &Worker{ worker = &Worker{
agents: make([]*agent, 0, limit), agents: make([]*agent, 0, limit),
funcs: make(JobFuncs), funcs: make(jobFuncs),
in: make(chan *inPack, QUEUE_SIZE), in: make(chan *inPack, queueSize),
} }
if limit != Unlimited { if limit != Unlimited {
worker.limit = make(chan bool, limit-1) worker.limit = make(chan bool, limit-1)
@ -97,10 +97,10 @@ func (worker *Worker) AddFunc(funcname string,
func (worker *Worker) addFunc(funcname string, timeout uint32) { func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack := getOutPack() outpack := getOutPack()
if timeout == 0 { if timeout == 0 {
outpack.dataType = CAN_DO outpack.dataType = dtCanDo
outpack.data = []byte(funcname) outpack.data = []byte(funcname)
} else { } else {
outpack.dataType = CAN_DO_TIMEOUT outpack.dataType = dtCanDoTimeout
l := len(funcname) l := len(funcname)
outpack.data = getBuffer(l + 5) outpack.data = getBuffer(l + 5)
copy(outpack.data, []byte(funcname)) copy(outpack.data, []byte(funcname))
@ -127,7 +127,7 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) {
// inner remove // inner remove
func (worker *Worker) removeFunc(funcname string) { func (worker *Worker) removeFunc(funcname string) {
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = CANT_DO outpack.dataType = dtCantDo
outpack.data = []byte(funcname) outpack.data = []byte(funcname)
worker.broadcast(outpack) worker.broadcast(outpack)
} }
@ -135,11 +135,11 @@ func (worker *Worker) removeFunc(funcname string) {
// inner package handling // inner package handling
func (worker *Worker) handleInPack(inpack *inPack) { func (worker *Worker) handleInPack(inpack *inPack) {
switch inpack.dataType { switch inpack.dataType {
case NO_JOB: case dtNoJob:
inpack.a.PreSleep() inpack.a.PreSleep()
case NOOP: case dtNoop:
inpack.a.Grab() inpack.a.Grab()
case JOB_ASSIGN, JOB_ASSIGN_UNIQ: case dtJobAssign, dtJobAssignUniq:
go func() { go func() {
if err := worker.exec(inpack); err != nil { if err := worker.exec(inpack); err != nil {
worker.err(err) worker.err(err)
@ -149,10 +149,10 @@ func (worker *Worker) handleInPack(inpack *inPack) {
worker.limit <- true worker.limit <- true
} }
inpack.a.Grab() inpack.a.Grab()
case ERROR: case dtError:
worker.err(inpack.Err()) worker.err(inpack.Err())
fallthrough fallthrough
case ECHO_RES: case dtEchoRes:
fallthrough fallthrough
default: default:
worker.customeHandler(inpack) worker.customeHandler(inpack)
@ -219,7 +219,7 @@ func (worker *Worker) Close() {
// Echo // Echo
func (worker *Worker) Echo(data []byte) { func (worker *Worker) Echo(data []byte) {
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = ECHO_REQ outpack.dataType = dtEchoReq
outpack.data = data outpack.data = data
worker.broadcast(outpack) worker.broadcast(outpack)
} }
@ -228,16 +228,16 @@ func (worker *Worker) Echo(data []byte) {
// Both from the worker and job servers. // Both from the worker and job servers.
func (worker *Worker) Reset() { func (worker *Worker) Reset() {
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = RESET_ABILITIES outpack.dataType = dtResetAbilities
worker.broadcast(outpack) worker.broadcast(outpack)
worker.funcs = make(JobFuncs) worker.funcs = make(jobFuncs)
} }
// Set the worker's unique id. // Set the worker's unique id.
func (worker *Worker) SetId(id string) { func (worker *Worker) SetId(id string) {
worker.Id = id worker.Id = id
outpack := getOutPack() outpack := getOutPack()
outpack.dataType = SET_CLIENT_ID outpack.dataType = dtSetClientId
outpack.data = []byte(id) outpack.data = []byte(id)
worker.broadcast(outpack) worker.broadcast(outpack)
} }
@ -270,12 +270,12 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
if worker.running { if worker.running {
outpack := getOutPack() outpack := getOutPack()
if r.err == nil { if r.err == nil {
outpack.dataType = WORK_COMPLETE outpack.dataType = dtWorkComplete
} else { } else {
if len(r.data) == 0 { if len(r.data) == 0 {
outpack.dataType = WORK_FAIL outpack.dataType = dtWorkFail
} else { } else {
outpack.dataType = WORK_EXCEPTION outpack.dataType = dtWorkException
} }
err = r.err err = r.err
} }