diff --git a/common/gearman.go b/common/gearman.go index 969c457..8ddad56 100644 --- a/common/gearman.go +++ b/common/gearman.go @@ -16,6 +16,8 @@ const ( QUEUE_SIZE = 8 // read buffer size BUFFER_SIZE = 1024 + // min packet length + PACKET_LEN = 12 // \x00REQ REQ = 5391697 diff --git a/worker/agent.go b/worker/agent.go index 708cf50..d1ff750 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -5,18 +5,18 @@ package worker import ( + "github.com/miraclesu/gearman-go/common" "io" "net" - "github.com/mikespook/gearman-go/common" ) // The agent of job server. type agent struct { - conn net.Conn - worker *Worker - in chan []byte - out chan *Job - addr string + conn net.Conn + worker *Worker + in chan []byte + out chan *Job + addr string } // Create the agent of job server. @@ -26,11 +26,11 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) { return } a = &agent{ - conn: conn, + conn: conn, worker: worker, - addr: addr, - in: make(chan []byte, common.QUEUE_SIZE), - out: make(chan *Job, common.QUEUE_SIZE), + addr: addr, + in: make(chan []byte, common.QUEUE_SIZE), + out: make(chan *Job, common.QUEUE_SIZE), } // reset abilities a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) @@ -62,12 +62,12 @@ func (a *agent) inLoop() { }() for a.worker.running { a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) - RESTART: + RESTART: // got noop msg and in queue is zero, grab job rel, err := a.read() if err != nil { if err == common.ErrConnection { - for i:= 0; i < 3 && a.worker.running; i++ { + for i := 0; i < 3 && a.worker.running; i++ { if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { a.worker.err(common.Errorf("Reconnection: %d faild", i)) continue @@ -106,64 +106,78 @@ func (a *agent) Work() { go a.inLoop() } -// Internal read -func (a *agent) read() (data []byte, err error) { -BEGIN: - inlen := len(a.in) - if inlen > 0 { - // in queue is not empty - for i := 0; i < inlen; i ++ { - data = append(data, <-a.in...) - } - } else { - for i := 0; i < 10; i ++ { - buf := make([]byte, common.BUFFER_SIZE) - var n int - if n, err = a.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - if data == nil { - err = common.ErrConnection - return - } - break +func (a *agent) readData(length int) (data []byte, err error) { + n := 0 + buf := make([]byte, common.BUFFER_SIZE) + // read until data can be unpacked + for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { + if n, err = a.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + if data == nil { + err = common.ErrConnection + return } - return - } - data = append(data, buf[0:n]...) - if n < common.BUFFER_SIZE { - break + return data, nil } + return + } + data = append(data, buf[0:n]...) + if n < common.BUFFER_SIZE { + break } } - // split package + return +} + +func (a *agent) unpack(data []byte) ([]byte, int, bool) { tl := len(data) - if tl < 12 { // too few data to unpack, read more - goto BEGIN - } start := 0 - for i := 0; i < tl - 11; i++ { - if start + 12 > tl { // too few data to unpack, read more - goto BEGIN + for i := 0; i < tl+1-common.PACKET_LEN; i++ { + if start+common.PACKET_LEN > tl { // too few data to unpack, read more + return nil, common.PACKET_LEN, false } if string(data[start:start+4]) == common.RES_STR { l := int(common.BytesToUint32([4]byte{data[start+8], - data[start+9], data[start+10], data[start+11]})) - total := l + 12 + data[start+9], data[start+10], data[start+11]})) + total := l + common.PACKET_LEN if total == tl { // data is what we want - return - } else if total < tl{ // data[:total] is what we want, data[total:] is the more + return data, common.PACKET_LEN, true + } else if total < tl { // data[:total] is what we want, data[total:] is the more a.in <- data[total:] - data = data[:total] - return + data = data[start:total] + return data, common.PACKET_LEN, true } else { // ops! It won't be possible. - goto BEGIN + return nil, total - tl, false } } else { // flag was not found, move to next step start++ } } - goto BEGIN - return nil, common.Errorf("Invalid data: %V", data) + return nil, common.PACKET_LEN, false +} + +func (a *agent) read() (rel []byte, err error) { + var data []byte + ok := false + l := common.PACKET_LEN + for !ok { + inlen := len(a.in) + if inlen > 0 { + // in queue is not empty + for i := 0; i < inlen; i++ { + data = append(data, <-a.in...) + } + } else { + var d []byte + d, err = a.readData(l) + if err != nil { + return + } + data = append(data, d...) + } + rel, l, ok = a.unpack(data) + } + return } // Send a job to the job server.