|
|
@@ -5,18 +5,18 @@ |
|
|
|
package worker |
|
|
|
|
|
|
|
import ( |
|
|
|
"github.com/miraclesu/gearman-go/common" |
|
|
|
"io" |
|
|
|
"net" |
|
|
|
"github.com/mikespook/gearman-go/common" |
|
|
|
) |
|
|
|
|
|
|
|
// The agent of job server. |
|
|
|
type agent struct { |
|
|
|
conn net.Conn |
|
|
|
worker *Worker |
|
|
|
in chan []byte |
|
|
|
out chan *Job |
|
|
|
addr string |
|
|
|
conn net.Conn |
|
|
|
worker *Worker |
|
|
|
in chan []byte |
|
|
|
out chan *Job |
|
|
|
addr string |
|
|
|
} |
|
|
|
|
|
|
|
// Create the agent of job server. |
|
|
@@ -26,11 +26,11 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) { |
|
|
|
return |
|
|
|
} |
|
|
|
a = &agent{ |
|
|
|
conn: conn, |
|
|
|
conn: conn, |
|
|
|
worker: worker, |
|
|
|
addr: addr, |
|
|
|
in: make(chan []byte, common.QUEUE_SIZE), |
|
|
|
out: make(chan *Job, common.QUEUE_SIZE), |
|
|
|
addr: addr, |
|
|
|
in: make(chan []byte, common.QUEUE_SIZE), |
|
|
|
out: make(chan *Job, common.QUEUE_SIZE), |
|
|
|
} |
|
|
|
// reset abilities |
|
|
|
a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil)) |
|
|
@@ -62,12 +62,12 @@ func (a *agent) inLoop() { |
|
|
|
}() |
|
|
|
for a.worker.running { |
|
|
|
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil)) |
|
|
|
RESTART: |
|
|
|
RESTART: |
|
|
|
// got noop msg and in queue is zero, grab job |
|
|
|
rel, err := a.read() |
|
|
|
if err != nil { |
|
|
|
if err == common.ErrConnection { |
|
|
|
for i:= 0; i < 3 && a.worker.running; i++ { |
|
|
|
for i := 0; i < 3 && a.worker.running; i++ { |
|
|
|
if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { |
|
|
|
a.worker.err(common.Errorf("Reconnection: %d faild", i)) |
|
|
|
continue |
|
|
@@ -106,64 +106,78 @@ func (a *agent) Work() { |
|
|
|
go a.inLoop() |
|
|
|
} |
|
|
|
|
|
|
|
// Internal read |
|
|
|
func (a *agent) read() (data []byte, err error) { |
|
|
|
BEGIN: |
|
|
|
inlen := len(a.in) |
|
|
|
if inlen > 0 { |
|
|
|
// in queue is not empty |
|
|
|
for i := 0; i < inlen; i ++ { |
|
|
|
data = append(data, <-a.in...) |
|
|
|
} |
|
|
|
} else { |
|
|
|
for i := 0; i < 10; i ++ { |
|
|
|
buf := make([]byte, common.BUFFER_SIZE) |
|
|
|
var n int |
|
|
|
if n, err = a.conn.Read(buf); err != nil { |
|
|
|
if err == io.EOF && n == 0 { |
|
|
|
if data == nil { |
|
|
|
err = common.ErrConnection |
|
|
|
return |
|
|
|
} |
|
|
|
break |
|
|
|
func (a *agent) readData(length int) (data []byte, err error) { |
|
|
|
n := 0 |
|
|
|
buf := make([]byte, common.BUFFER_SIZE) |
|
|
|
// read until data can be unpacked |
|
|
|
for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { |
|
|
|
if n, err = a.conn.Read(buf); err != nil { |
|
|
|
if err == io.EOF && n == 0 { |
|
|
|
if data == nil { |
|
|
|
err = common.ErrConnection |
|
|
|
return |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
data = append(data, buf[0:n]...) |
|
|
|
if n < common.BUFFER_SIZE { |
|
|
|
break |
|
|
|
return data, nil |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
data = append(data, buf[0:n]...) |
|
|
|
if n < common.BUFFER_SIZE { |
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
// split package |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func (a *agent) unpack(data []byte) ([]byte, int, bool) { |
|
|
|
tl := len(data) |
|
|
|
if tl < 12 { // too few data to unpack, read more |
|
|
|
goto BEGIN |
|
|
|
} |
|
|
|
start := 0 |
|
|
|
for i := 0; i < tl - 11; i++ { |
|
|
|
if start + 12 > tl { // too few data to unpack, read more |
|
|
|
goto BEGIN |
|
|
|
for i := 0; i < tl+1-common.PACKET_LEN; i++ { |
|
|
|
if start+common.PACKET_LEN > tl { // too few data to unpack, read more |
|
|
|
return nil, common.PACKET_LEN, false |
|
|
|
} |
|
|
|
if string(data[start:start+4]) == common.RES_STR { |
|
|
|
l := int(common.BytesToUint32([4]byte{data[start+8], |
|
|
|
data[start+9], data[start+10], data[start+11]})) |
|
|
|
total := l + 12 |
|
|
|
data[start+9], data[start+10], data[start+11]})) |
|
|
|
total := l + common.PACKET_LEN |
|
|
|
if total == tl { // data is what we want |
|
|
|
return |
|
|
|
} else if total < tl{ // data[:total] is what we want, data[total:] is the more |
|
|
|
return data, common.PACKET_LEN, true |
|
|
|
} else if total < tl { // data[:total] is what we want, data[total:] is the more |
|
|
|
a.in <- data[total:] |
|
|
|
data = data[:total] |
|
|
|
return |
|
|
|
data = data[start:total] |
|
|
|
return data, common.PACKET_LEN, true |
|
|
|
} else { // ops! It won't be possible. |
|
|
|
goto BEGIN |
|
|
|
return nil, total - tl, false |
|
|
|
} |
|
|
|
} else { // flag was not found, move to next step |
|
|
|
start++ |
|
|
|
} |
|
|
|
} |
|
|
|
goto BEGIN |
|
|
|
return nil, common.Errorf("Invalid data: %V", data) |
|
|
|
return nil, common.PACKET_LEN, false |
|
|
|
} |
|
|
|
|
|
|
|
func (a *agent) read() (rel []byte, err error) { |
|
|
|
var data []byte |
|
|
|
ok := false |
|
|
|
l := common.PACKET_LEN |
|
|
|
for !ok { |
|
|
|
inlen := len(a.in) |
|
|
|
if inlen > 0 { |
|
|
|
// in queue is not empty |
|
|
|
for i := 0; i < inlen; i++ { |
|
|
|
data = append(data, <-a.in...) |
|
|
|
} |
|
|
|
} else { |
|
|
|
var d []byte |
|
|
|
d, err = a.readData(l) |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
data = append(data, d...) |
|
|
|
} |
|
|
|
rel, l, ok = a.unpack(data) |
|
|
|
} |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// Send a job to the job server. |
|
|
|