gearman-go/worker/agent.go

225 lines
3.9 KiB
Go
Raw Normal View History

package worker
import (
2014-01-09 17:58:02 +08:00
"bufio"
2014-03-09 08:22:14 +08:00
"bytes"
"encoding/binary"
2015-01-06 11:45:18 +08:00
"io"
2013-08-30 12:36:57 +08:00
"net"
2013-12-26 12:06:47 +08:00
"sync"
)
// The agent of job server.
type agent struct {
2013-12-26 12:06:47 +08:00
sync.Mutex
2013-12-23 17:05:42 +08:00
conn net.Conn
2014-01-09 17:58:02 +08:00
rw *bufio.ReadWriter
2013-12-23 17:05:42 +08:00
worker *Worker
in chan []byte
net, addr string
}
// Create the agent of job server.
2013-08-30 18:01:10 +08:00
func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
2013-08-30 12:36:57 +08:00
a = &agent{
2013-12-23 17:05:42 +08:00
net: net,
2013-08-30 12:36:57 +08:00
addr: addr,
2013-08-30 18:01:10 +08:00
worker: worker,
in: make(chan []byte, queueSize),
2013-08-30 12:36:57 +08:00
}
return
}
2013-08-30 18:01:10 +08:00
func (a *agent) Connect() (err error) {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
2013-08-30 18:01:10 +08:00
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
return
2013-08-30 12:36:57 +08:00
}
2014-01-09 16:16:34 +08:00
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
2013-12-24 14:35:33 +08:00
go a.work()
2013-08-30 18:01:10 +08:00
return
}
2013-12-24 14:35:33 +08:00
func (a *agent) work() {
2013-12-29 17:09:06 +08:00
defer func() {
if err := recover(); err != nil {
a.worker.err(err.(error))
}
}()
var inpack *inPack
2013-08-30 18:01:10 +08:00
var l int
var err error
var data, leftdata []byte
2013-12-24 14:35:33 +08:00
for {
2014-03-09 08:22:14 +08:00
if data, err = a.read(); err != nil {
2014-03-03 14:45:35 +08:00
if opErr, ok := err.(*net.OpError); ok {
if opErr.Temporary() {
continue
2015-01-06 11:45:18 +08:00
} else {
2014-06-10 10:46:21 +08:00
a.disconnect_error(err)
// else - we're probably dc'ing due to a Close()
break
2014-03-03 14:45:35 +08:00
}
2015-01-06 11:45:18 +08:00
} else if err == io.EOF {
2014-06-10 10:46:21 +08:00
a.disconnect_error(err)
break
2013-12-26 12:06:47 +08:00
}
2013-12-26 15:55:16 +08:00
a.worker.err(err)
2013-12-26 12:06:47 +08:00
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the agent should close the conection
2013-12-26 12:06:47 +08:00
// and reconnect to job server.
a.Close()
2013-12-26 12:06:47 +08:00
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
a.worker.err(err)
break
}
2014-01-09 16:16:34 +08:00
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
}
2013-08-30 18:01:10 +08:00
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
2013-08-30 12:36:57 +08:00
}
if len(data) < minPacketLength { // not enough data
2013-08-30 18:01:10 +08:00
leftdata = data
2013-08-30 12:36:57 +08:00
continue
}
2016-04-20 12:54:21 +08:00
for {
if inpack, l, err = decodeInPack(data); err != nil {
a.worker.err(err)
leftdata = data
break
} else {
leftdata = nil
inpack.a = a
select {
case <-a.worker.closed:
return
default:
}
2016-04-20 12:54:21 +08:00
a.worker.in <- inpack
if len(data) == l {
break
}
if len(data) > l {
data = data[l:]
}
}
2013-08-30 18:01:10 +08:00
}
}
}
2015-01-06 11:45:18 +08:00
func (a *agent) disconnect_error(err error) {
a.Lock()
defer a.Unlock()
2015-01-06 11:45:18 +08:00
if a.conn != nil {
2014-06-10 10:46:21 +08:00
err = &WorkerDisconnectError{
2015-01-06 11:45:18 +08:00
err: err,
agent: a,
2014-06-10 10:46:21 +08:00
}
a.worker.err(err)
}
}
2012-05-28 10:34:16 +08:00
func (a *agent) Close() {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
if a.conn != nil {
a.conn.Close()
a.conn = nil
}
2013-12-23 17:01:01 +08:00
}
func (a *agent) Grab() {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
a.grab()
}
2015-01-06 11:45:18 +08:00
func (a *agent) grab() {
2013-12-23 17:01:01 +08:00
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
2013-12-23 17:01:01 +08:00
a.write(outpack)
}
func (a *agent) PreSleep() {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
2013-12-23 17:01:01 +08:00
outpack := getOutPack()
outpack.dataType = dtPreSleep
2013-12-23 17:01:01 +08:00
a.write(outpack)
2012-05-28 10:34:16 +08:00
}
2015-01-06 11:45:18 +08:00
func (a *agent) reconnect() error {
a.Lock()
defer a.Unlock()
conn, err := net.Dial(a.net, a.addr)
if err != nil {
2015-01-06 11:45:18 +08:00
return err
}
a.conn = conn
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
2015-01-06 11:45:18 +08:00
bufio.NewWriter(a.conn))
a.worker.reRegisterFuncsForAgent(a)
a.grab()
go a.work()
return nil
}
2013-08-30 18:01:10 +08:00
// read length bytes from the socket
2014-03-09 08:22:14 +08:00
func (a *agent) read() (data []byte, err error) {
2013-08-30 12:36:57 +08:00
n := 0
2014-03-09 08:22:14 +08:00
tmp := getBuffer(bufferSize)
var buf bytes.Buffer
// read the header so we can get the length of the data
if n, err = a.rw.Read(tmp); err != nil {
return
}
dl := int(binary.BigEndian.Uint32(tmp[8:12]))
// write what we read so far
buf.Write(tmp[:n])
// read until we receive all the data
for buf.Len() < dl+minPacketLength {
if n, err = a.rw.Read(tmp); err != nil {
return buf.Bytes(), err
2013-08-30 12:36:57 +08:00
}
2014-03-09 08:22:14 +08:00
buf.Write(tmp[:n])
2013-08-30 12:36:57 +08:00
}
2014-03-09 08:22:14 +08:00
return buf.Bytes(), err
2012-12-28 21:19:58 +08:00
}
// Internal write the encoded job.
func (a *agent) write(outpack *outPack) (err error) {
2013-08-30 12:36:57 +08:00
var n int
buf := outpack.Encode()
2013-08-30 12:36:57 +08:00
for i := 0; i < len(buf); i += n {
2014-01-09 16:16:34 +08:00
n, err = a.rw.Write(buf[i:])
2013-08-30 12:36:57 +08:00
if err != nil {
return err
}
}
2014-01-09 16:16:34 +08:00
return a.rw.Flush()
}
// Write with lock
func (a *agent) Write(outpack *outPack) (err error) {
a.Lock()
defer a.Unlock()
return a.write(outpack)
}