gearman-go/worker/agent.go

154 lines
2.7 KiB
Go
Raw Normal View History

package worker
import (
2013-08-30 12:36:57 +08:00
"io"
"net"
2013-12-26 15:55:16 +08:00
"strings"
2013-12-26 12:06:47 +08:00
"sync"
)
// The agent of job server.
type agent struct {
2013-12-26 12:06:47 +08:00
sync.Mutex
2013-12-23 17:05:42 +08:00
conn net.Conn
worker *Worker
in chan []byte
net, addr string
}
// Create the agent of job server.
2013-08-30 18:01:10 +08:00
func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
2013-08-30 12:36:57 +08:00
a = &agent{
2013-12-23 17:05:42 +08:00
net: net,
2013-08-30 12:36:57 +08:00
addr: addr,
2013-08-30 18:01:10 +08:00
worker: worker,
in: make(chan []byte, queueSize),
2013-08-30 12:36:57 +08:00
}
return
}
2013-08-30 18:01:10 +08:00
func (a *agent) Connect() (err error) {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
2013-08-30 18:01:10 +08:00
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
return
2013-08-30 12:36:57 +08:00
}
2013-12-24 14:35:33 +08:00
go a.work()
2013-08-30 18:01:10 +08:00
return
}
2013-12-24 14:35:33 +08:00
func (a *agent) work() {
2013-12-29 17:09:06 +08:00
defer func() {
if err := recover(); err != nil {
a.worker.err(err.(error))
}
}()
var inpack *inPack
2013-08-30 18:01:10 +08:00
var l int
var err error
var data, leftdata []byte
2013-12-24 14:35:33 +08:00
for {
if data, err = a.read(bufferSize); err != nil {
2013-12-26 12:06:47 +08:00
if err == ErrLostConn {
break
}
2013-12-26 15:55:16 +08:00
a.worker.err(err)
2013-12-26 12:06:47 +08:00
// If it is unexpected error and the connection wasn't
// closed by Gearmand, the agent should close the conection
2013-12-26 12:06:47 +08:00
// and reconnect to job server.
a.Close()
2013-12-26 12:06:47 +08:00
a.conn, err = net.Dial(a.net, a.addr)
if err != nil {
a.worker.err(err)
break
}
}
2013-08-30 18:01:10 +08:00
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
2013-08-30 12:36:57 +08:00
}
if len(data) < minPacketLength { // not enough data
2013-08-30 18:01:10 +08:00
leftdata = data
2013-08-30 12:36:57 +08:00
continue
}
if inpack, l, err = decodeInPack(data); err != nil {
2013-08-30 12:36:57 +08:00
a.worker.err(err)
continue
}
2013-08-30 18:01:10 +08:00
leftdata = nil
inpack.a = a
a.worker.in <- inpack
2013-08-30 18:01:10 +08:00
if len(data) > l {
leftdata = data[l:]
}
}
}
2012-05-28 10:34:16 +08:00
func (a *agent) Close() {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
if a.conn != nil {
a.conn.Close()
a.conn = nil
}
2013-12-23 17:01:01 +08:00
}
func (a *agent) Grab() {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
2013-12-23 17:01:01 +08:00
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
2013-12-23 17:01:01 +08:00
a.write(outpack)
}
func (a *agent) PreSleep() {
2013-12-26 12:06:47 +08:00
a.Lock()
defer a.Unlock()
2013-12-23 17:01:01 +08:00
outpack := getOutPack()
outpack.dataType = dtPreSleep
2013-12-23 17:01:01 +08:00
a.write(outpack)
2012-05-28 10:34:16 +08:00
}
2013-12-26 15:55:16 +08:00
func isClosed(err error) bool {
switch {
case err == io.EOF:
fallthrough
case strings.Contains(err.Error(), "use of closed network connection"):
return true
}
return false
}
2013-08-30 18:01:10 +08:00
// read length bytes from the socket
func (a *agent) read(length int) (data []byte, err error) {
2013-08-30 12:36:57 +08:00
n := 0
buf := getBuffer(bufferSize)
2013-08-30 12:36:57 +08:00
// read until data can be unpacked
for i := length; i > 0 || len(data) < minPacketLength; i -= n {
2013-08-30 12:36:57 +08:00
if n, err = a.conn.Read(buf); err != nil {
2013-12-26 15:55:16 +08:00
if isClosed(err) {
2013-12-26 12:06:47 +08:00
err = ErrLostConn
2013-08-30 12:36:57 +08:00
}
return
}
data = append(data, buf[0:n]...)
if n < bufferSize {
2013-08-30 12:36:57 +08:00
break
}
}
return
2012-12-28 21:19:58 +08:00
}
// Internal write the encoded job.
func (a *agent) write(outpack *outPack) (err error) {
2013-08-30 12:36:57 +08:00
var n int
buf := outpack.Encode()
2013-08-30 12:36:57 +08:00
for i := 0; i < len(buf); i += n {
n, err = a.conn.Write(buf[i:])
if err != nil {
return err
}
}
return
}