gearman-go/client/client.go
Graham Barr d82da8fd71 Avoid read channel corruption when response size > bufferSize
When receiving a response, this is what was happening:

1. Read bufferSize and it gets assigned to leftdata
2. Read another bufferSize
3. 2 buffers get appended, but leftdata still points to first buffer
4. Process data buffer which contains only complete responses
5. Back to ReadLoop, but leftdata still points to first incomplete buffer
    causing corrupt data to be processed

Solution is to make leftdata nil once we have merged it with the second buffer
2014-08-18 12:35:31 -05:00

296 lines
6.5 KiB
Go

// Package client helps developers connect to Gearmand, submit
// jobs and fetch their results.
package client
import (
"bufio"
"net"
"sync"
)
// Client holds one connection to one job server.
// Use Pool for multi-connections.
type Client struct {
// Embedded mutex; Close holds it while closing and clearing conn.
sync.Mutex
// net and addr are the network and address handed to net.Dial.
// lastcall is the innerHandler key of the most recently issued request;
// processLoop uses it to route a dtError response to that request's handler.
net, addr, lastcall string
// respHandler maps a job handle to the callback registered through Do.
respHandler map[string]ResponseHandler
// innerHandler maps an internal key ("c", "e" or "s"+handle) to a one-shot
// callback; handleInner deletes the entry after invoking it.
innerHandler map[string]ResponseHandler
// in carries decoded responses from readLoop to processLoop.
in chan *Response
// conn is the connection to the job server; Close sets it to nil.
conn net.Conn
// rw is the buffered reader/writer wrapped around conn.
rw *bufio.ReadWriter
// ErrorHandler, when non-nil, receives errors reported through err.
ErrorHandler ErrorHandler
}
// New dials the job server at addr over the given network and returns a
// Client with its read and process loops already running.
//
// If the dial fails, the error is returned together with the partially
// initialised client (no connection, no goroutines started).
func New(network, addr string) (client *Client, err error) {
	client = new(Client)
	client.net = network
	client.addr = addr
	client.respHandler = make(map[string]ResponseHandler, queueSize)
	client.innerHandler = make(map[string]ResponseHandler, queueSize)
	client.in = make(chan *Response, queueSize)

	if client.conn, err = net.Dial(client.net, client.addr); err != nil {
		return client, err
	}

	reader := bufio.NewReader(client.conn)
	writer := bufio.NewWriter(client.conn)
	client.rw = bufio.NewReadWriter(reader, writer)

	go client.readLoop()
	go client.processLoop()
	return client, nil
}
// write encodes req, pushes every byte of it into the buffered writer and
// then flushes the writer. The first error from Write or Flush is returned.
func (client *Client) write(req *request) (err error) {
	payload := req.Encode()
	for sent := 0; sent < len(payload); {
		var n int
		if n, err = client.rw.Write(payload[sent:]); err != nil {
			return err
		}
		sent += n
	}
	return client.rw.Flush()
}
// read collects bytes from the connection into data.
//
// It keeps reading while fewer than length bytes have been consumed or
// while data is still shorter than minPacketLength, and stops early as soon
// as a single Read returns fewer than bufferSize bytes. On a read error the
// bytes gathered so far are returned together with the error.
func (client *Client) read(length int) (data []byte, err error) {
	chunk := getBuffer(bufferSize)
	remaining := length
	for remaining > 0 || len(data) < minPacketLength {
		var got int
		got, err = client.rw.Read(chunk)
		if err != nil {
			return data, err
		}
		data = append(data, chunk[:got]...)
		// A short read is treated as "nothing more is pending right now".
		if got < bufferSize {
			break
		}
		remaining -= got
	}
	return data, err
}
// readLoop is started as a goroutine by New. It reads raw bytes from the
// connection, splits them into packets with decodeResponse and sends each
// decoded Response on client.in. client.in is closed when the loop exits,
// which ends processLoop's range over it.
//
// Handling of a failed read:
//   - *net.OpError: a timeout is reported to the ErrorHandler; a temporary
//     error retries the read; any other OpError ends the loop.
//   - any other error: it is reported, the connection is closed and
//     re-dialed. If the re-dial fails the loop ends.
//
// Bytes that do not yet form a complete packet are kept in leftdata and
// prepended to the next read.
func (client *Client) readLoop() {
	defer close(client.in)
	var data, leftdata []byte
	var err error
	var resp *Response
ReadLoop:
	for client.conn != nil {
		if data, err = client.read(bufferSize); err != nil {
			if opErr, ok := err.(*net.OpError); ok {
				if opErr.Timeout() {
					client.err(err)
				}
				if opErr.Temporary() {
					continue
				}
				break
			}
			client.err(err)
			// If it is an unexpected error and the connection wasn't
			// closed by Gearmand, the client should close the connection
			// and reconnect to the job server.
			client.Close()
			client.conn, err = net.Dial(client.net, client.addr)
			if err != nil {
				client.err(err)
				break
			}
			client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
				bufio.NewWriter(client.conn))
			// A partial packet buffered from the old connection belongs to
			// a different byte stream; prepending it to data read from the
			// new connection would corrupt packet framing.
			leftdata = nil
			continue
		}
		if len(leftdata) > 0 { // some data left for processing
			data = append(leftdata, data...)
			// Must be cleared once merged, otherwise the stale prefix
			// would be prepended again on the next pass.
			leftdata = nil
		}
		for {
			l := len(data)
			if l < minPacketLength { // not enough data for a header
				leftdata = data
				continue ReadLoop
			}
			if resp, l, err = decodeResponse(data); err != nil {
				// Presumably the packet is incomplete: keep the bytes from
				// the offset decodeResponse reported and wait for more.
				leftdata = data[l:]
				continue ReadLoop
			}
			client.in <- resp
			data = data[l:]
			if len(data) == 0 {
				break
			}
		}
	}
}
// processLoop consumes decoded responses from client.in until the channel
// is closed, and dispatches each one by its DataType:
//   - dtError goes to the inner handler named by lastcall (which is then
//     cleared), or to the ErrorHandler when lastcall is empty;
//   - status, job-created and echo replies go to their inner handlers;
//   - work data/warning/status go to the job's response handler;
//   - work complete/fail/exception go to the job's response handler, which
//     is then unregistered.
func (client *Client) processLoop() {
	for resp := range client.in {
		switch resp.DataType {
		case dtError:
			if client.lastcall == "" {
				client.err(getError(resp.Data))
				continue
			}
			client.handleInner(client.lastcall, resp)
			client.lastcall = ""
		case dtStatusRes:
			client.handleInner("s"+resp.Handle, resp)
		case dtJobCreated:
			client.handleInner("c", resp)
		case dtEchoRes:
			client.handleInner("e", resp)
		case dtWorkData, dtWorkWarning, dtWorkStatus:
			client.handleResponse(resp.Handle, resp)
		case dtWorkComplete, dtWorkFail, dtWorkException:
			client.handleResponse(resp.Handle, resp)
			// Terminal packet for this job: no further callbacks expected.
			delete(client.respHandler, resp.Handle)
		}
	}
}
// err forwards e to the client's ErrorHandler; it does nothing when no
// handler has been set.
func (client *Client) err(e error) {
	handler := client.ErrorHandler
	if handler == nil {
		return
	}
	handler(e)
}
// handleResponse invokes the response handler registered under key, if any.
// It returns nil when a handler consumed resp, and resp itself otherwise.
func (client *Client) handleResponse(key string, resp *Response) *Response {
	handler, found := client.respHandler[key]
	if !found {
		return resp
	}
	handler(resp)
	return nil
}
// handleInner invokes the inner handler registered under key, if any, and
// then removes it, so each inner handler runs at most once. It returns nil
// when a handler consumed resp, and resp itself otherwise.
func (client *Client) handleInner(key string, resp *Response) *Response {
	handler, found := client.innerHandler[key]
	if !found {
		return resp
	}
	handler(resp)
	delete(client.innerHandler, key)
	return nil
}
// do submits a job for funcname with the given payload, using flag as the
// request's packet type, and blocks until the server's reply has been
// delivered to the "c" inner handler by processLoop.
//
// It returns the job handle from the reply, or the error carried by a
// dtError reply. ErrLostConn is returned when there is no connection. If
// the request cannot be written, the handler is unregistered and the write
// error is returned immediately; without that, the call would wait forever
// for a reply that can never arrive.
//
// NOTE: if the connection is lost after a successful write, no reply is
// dispatched and this call keeps waiting.
func (client *Client) do(funcname string, data []byte,
	flag uint32) (handle string, err error) {
	if client.conn == nil {
		return "", ErrLostConn
	}
	// done is closed by the handler once handle/err have been filled in.
	done := make(chan struct{})
	client.lastcall = "c"
	client.innerHandler["c"] = func(resp *Response) {
		defer close(done)
		if resp.DataType == dtError {
			err = getError(resp.Data)
			return
		}
		handle = resp.Handle
	}
	id := IdGen.Id()
	req := getJob(id, []byte(funcname), data)
	req.DataType = flag
	if err = client.write(req); err != nil {
		// Nothing was sent, so nothing will answer: undo the registration.
		delete(client.innerHandler, "c")
		client.lastcall = ""
		return "", err
	}
	<-done
	return handle, err
}
// Do submits a foreground job for funcname and returns its job handle.
// flag selects the priority: JobLow or JobHigh; any other value (for
// example JobNormal) submits at normal priority. When the submission
// succeeds and h is non-nil, h is registered as the response handler for
// the returned handle.
func (client *Client) Do(funcname string, data []byte,
	flag byte, h ResponseHandler) (handle string, err error) {
	var datatype uint32 = dtSubmitJob
	switch flag {
	case JobLow:
		datatype = dtSubmitJobLow
	case JobHigh:
		datatype = dtSubmitJobHigh
	}

	handle, err = client.do(funcname, data, datatype)
	if err != nil || h == nil {
		return handle, err
	}
	client.respHandler[handle] = h
	return handle, err
}
// DoBg submits a background job for funcname and returns its job handle;
// no response handler is registered. flag selects the priority: JobLow or
// JobHigh; any other value (for example JobNormal) submits at normal
// priority. ErrLostConn is returned when there is no connection.
func (client *Client) DoBg(funcname string, data []byte,
	flag byte) (handle string, err error) {
	if client.conn == nil {
		return "", ErrLostConn
	}

	var datatype uint32 = dtSubmitJobBg
	switch flag {
	case JobLow:
		datatype = dtSubmitJobLowBg
	case JobHigh:
		datatype = dtSubmitJobHighBg
	}
	return client.do(funcname, data, datatype)
}
// Status asks the job server for the status of the job identified by
// handle and blocks until the reply has been delivered to the "s"+handle
// inner handler by processLoop.
//
// ErrLostConn is returned when there is no connection. If the request
// cannot be written, the handler is unregistered and the write error is
// returned immediately; without that, the call would wait forever for a
// reply that can never arrive. A failure to parse the reply is reported to
// the ErrorHandler rather than returned.
//
// NOTE: if the connection is lost after a successful write, no reply is
// dispatched and this call keeps waiting.
func (client *Client) Status(handle string) (status *Status, err error) {
	if client.conn == nil {
		return nil, ErrLostConn
	}
	// done is closed by the handler once status has been filled in.
	done := make(chan struct{})
	key := "s" + handle
	client.lastcall = key
	client.innerHandler[key] = func(resp *Response) {
		defer close(done)
		// Kept separate from the named result on purpose: parse errors go
		// to the ErrorHandler, not to the caller.
		var parseErr error
		status, parseErr = resp._status()
		if parseErr != nil {
			client.err(parseErr)
		}
	}
	req := getRequest()
	req.DataType = dtGetStatus
	req.Data = []byte(handle)
	if err = client.write(req); err != nil {
		// Nothing was sent, so nothing will answer: undo the registration.
		delete(client.innerHandler, key)
		client.lastcall = ""
		return nil, err
	}
	<-done
	return status, nil
}
// Echo sends data to the job server in an echo request and blocks until
// the reply has been delivered to the "e" inner handler by processLoop,
// returning the reply's payload.
//
// ErrLostConn is returned when there is no connection. If the request
// cannot be written, the handler is unregistered and the write error is
// returned immediately; without that, the call would wait forever for a
// reply that can never arrive.
//
// NOTE: if the connection is lost after a successful write, no reply is
// dispatched and this call keeps waiting.
func (client *Client) Echo(data []byte) (echo []byte, err error) {
	if client.conn == nil {
		return nil, ErrLostConn
	}
	// done is closed by the handler once echo has been filled in.
	done := make(chan struct{})
	client.innerHandler["e"] = func(resp *Response) {
		echo = resp.Data
		close(done)
	}
	req := getRequest()
	req.DataType = dtEchoReq
	req.Data = data
	client.lastcall = "e"
	if err = client.write(req); err != nil {
		// Nothing was sent, so nothing will answer: undo the registration.
		delete(client.innerHandler, "e")
		client.lastcall = ""
		return nil, err
	}
	<-done
	return echo, nil
}
// Close closes the connection to the job server, if one is open, and clears
// it so later calls see the client as disconnected. It holds the client's
// mutex for the duration and returns the error from closing the connection.
// Calling it on an already closed client is a no-op.
func (client *Client) Close() (err error) {
	client.Lock()
	defer client.Unlock()

	if client.conn == nil {
		return nil
	}
	err = client.conn.Close()
	client.conn = nil
	return err
}