forked from yuxh/gearman-go
d82da8fd71
When receiving a response, what was happening 1. Read bufferSize and it gets assigned to leftdata 2. Read another bufferSize 3. 2 buffers get appended, but leftdata still points to first buffer 4. Process data buffer which contains only complete responses 5. Back to ReadLoop, but leftdata still points to first incomplete buffer causing corrupt data to be processed Solution is to make leftdata nil once we have merged it with the second buffer
296 lines
6.5 KiB
Go
296 lines
6.5 KiB
Go
// The client package helps developers connect to Gearmand, send
|
|
// jobs and fetch result.
|
|
package client
|
|
|
|
import (
|
|
"bufio"
|
|
"net"
|
|
"sync"
|
|
)
|
|
|
|
// One client connect to one server.
|
|
// Use Pool for multi-connections.
|
|
type Client struct {
|
|
sync.Mutex
|
|
|
|
net, addr, lastcall string
|
|
respHandler map[string]ResponseHandler
|
|
innerHandler map[string]ResponseHandler
|
|
in chan *Response
|
|
conn net.Conn
|
|
rw *bufio.ReadWriter
|
|
|
|
ErrorHandler ErrorHandler
|
|
}
|
|
|
|
// Return a client.
|
|
func New(network, addr string) (client *Client, err error) {
|
|
client = &Client{
|
|
net: network,
|
|
addr: addr,
|
|
respHandler: make(map[string]ResponseHandler, queueSize),
|
|
innerHandler: make(map[string]ResponseHandler, queueSize),
|
|
in: make(chan *Response, queueSize),
|
|
}
|
|
client.conn, err = net.Dial(client.net, client.addr)
|
|
if err != nil {
|
|
return
|
|
}
|
|
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
|
|
bufio.NewWriter(client.conn))
|
|
go client.readLoop()
|
|
go client.processLoop()
|
|
return
|
|
}
|
|
|
|
func (client *Client) write(req *request) (err error) {
|
|
var n int
|
|
buf := req.Encode()
|
|
for i := 0; i < len(buf); i += n {
|
|
n, err = client.rw.Write(buf[i:])
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
return client.rw.Flush()
|
|
}
|
|
|
|
func (client *Client) read(length int) (data []byte, err error) {
|
|
n := 0
|
|
buf := getBuffer(bufferSize)
|
|
// read until data can be unpacked
|
|
for i := length; i > 0 || len(data) < minPacketLength; i -= n {
|
|
if n, err = client.rw.Read(buf); err != nil {
|
|
return
|
|
}
|
|
data = append(data, buf[0:n]...)
|
|
if n < bufferSize {
|
|
break
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (client *Client) readLoop() {
|
|
defer close(client.in)
|
|
var data, leftdata []byte
|
|
var err error
|
|
var resp *Response
|
|
ReadLoop:
|
|
for client.conn != nil {
|
|
if data, err = client.read(bufferSize); err != nil {
|
|
if opErr, ok := err.(*net.OpError); ok {
|
|
if opErr.Timeout() {
|
|
client.err(err)
|
|
}
|
|
if opErr.Temporary() {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
client.err(err)
|
|
// If it is unexpected error and the connection wasn't
|
|
// closed by Gearmand, the client should close the conection
|
|
// and reconnect to job server.
|
|
client.Close()
|
|
client.conn, err = net.Dial(client.net, client.addr)
|
|
if err != nil {
|
|
client.err(err)
|
|
break
|
|
}
|
|
client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
|
|
bufio.NewWriter(client.conn))
|
|
continue
|
|
}
|
|
if len(leftdata) > 0 { // some data left for processing
|
|
data = append(leftdata, data...)
|
|
leftdata = nil
|
|
}
|
|
for {
|
|
l := len(data)
|
|
if l < minPacketLength { // not enough data
|
|
leftdata = data
|
|
continue ReadLoop
|
|
}
|
|
if resp, l, err = decodeResponse(data); err != nil {
|
|
leftdata = data[l:]
|
|
continue ReadLoop
|
|
} else {
|
|
client.in <- resp
|
|
}
|
|
data = data[l:]
|
|
if len(data) > 0 {
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *Client) processLoop() {
|
|
for resp := range client.in {
|
|
switch resp.DataType {
|
|
case dtError:
|
|
if client.lastcall != "" {
|
|
resp = client.handleInner(client.lastcall, resp)
|
|
client.lastcall = ""
|
|
} else {
|
|
client.err(getError(resp.Data))
|
|
}
|
|
case dtStatusRes:
|
|
resp = client.handleInner("s"+resp.Handle, resp)
|
|
case dtJobCreated:
|
|
resp = client.handleInner("c", resp)
|
|
case dtEchoRes:
|
|
resp = client.handleInner("e", resp)
|
|
case dtWorkData, dtWorkWarning, dtWorkStatus:
|
|
resp = client.handleResponse(resp.Handle, resp)
|
|
case dtWorkComplete, dtWorkFail, dtWorkException:
|
|
client.handleResponse(resp.Handle, resp)
|
|
delete(client.respHandler, resp.Handle)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *Client) err(e error) {
|
|
if client.ErrorHandler != nil {
|
|
client.ErrorHandler(e)
|
|
}
|
|
}
|
|
|
|
func (client *Client) handleResponse(key string, resp *Response) *Response {
|
|
if h, ok := client.respHandler[key]; ok {
|
|
h(resp)
|
|
return nil
|
|
}
|
|
return resp
|
|
}
|
|
|
|
func (client *Client) handleInner(key string, resp *Response) *Response {
|
|
if h, ok := client.innerHandler[key]; ok {
|
|
h(resp)
|
|
delete(client.innerHandler, key)
|
|
return nil
|
|
}
|
|
return resp
|
|
}
|
|
|
|
func (client *Client) do(funcname string, data []byte,
|
|
flag uint32) (handle string, err error) {
|
|
if client.conn == nil {
|
|
return "", ErrLostConn
|
|
}
|
|
var mutex sync.Mutex
|
|
mutex.Lock()
|
|
client.lastcall = "c"
|
|
client.innerHandler["c"] = func(resp *Response) {
|
|
defer mutex.Unlock()
|
|
if resp.DataType == dtError {
|
|
err = getError(resp.Data)
|
|
return
|
|
}
|
|
handle = resp.Handle
|
|
}
|
|
id := IdGen.Id()
|
|
req := getJob(id, []byte(funcname), data)
|
|
req.DataType = flag
|
|
client.write(req)
|
|
mutex.Lock()
|
|
return
|
|
}
|
|
|
|
// Call the function and get a response.
|
|
// flag can be set to: JobLow, JobNormal and JobHigh
|
|
func (client *Client) Do(funcname string, data []byte,
|
|
flag byte, h ResponseHandler) (handle string, err error) {
|
|
var datatype uint32
|
|
switch flag {
|
|
case JobLow:
|
|
datatype = dtSubmitJobLow
|
|
case JobHigh:
|
|
datatype = dtSubmitJobHigh
|
|
default:
|
|
datatype = dtSubmitJob
|
|
}
|
|
handle, err = client.do(funcname, data, datatype)
|
|
if err == nil && h != nil {
|
|
client.respHandler[handle] = h
|
|
}
|
|
return
|
|
}
|
|
|
|
// Call the function in background, no response needed.
|
|
// flag can be set to: JobLow, JobNormal and JobHigh
|
|
func (client *Client) DoBg(funcname string, data []byte,
|
|
flag byte) (handle string, err error) {
|
|
if client.conn == nil {
|
|
return "", ErrLostConn
|
|
}
|
|
var datatype uint32
|
|
switch flag {
|
|
case JobLow:
|
|
datatype = dtSubmitJobLowBg
|
|
case JobHigh:
|
|
datatype = dtSubmitJobHighBg
|
|
default:
|
|
datatype = dtSubmitJobBg
|
|
}
|
|
handle, err = client.do(funcname, data, datatype)
|
|
return
|
|
}
|
|
|
|
// Get job status from job server.
|
|
func (client *Client) Status(handle string) (status *Status, err error) {
|
|
if client.conn == nil {
|
|
return nil, ErrLostConn
|
|
}
|
|
var mutex sync.Mutex
|
|
mutex.Lock()
|
|
client.lastcall = "s" + handle
|
|
client.innerHandler["s"+handle] = func(resp *Response) {
|
|
defer mutex.Unlock()
|
|
var err error
|
|
status, err = resp._status()
|
|
if err != nil {
|
|
client.err(err)
|
|
}
|
|
}
|
|
req := getRequest()
|
|
req.DataType = dtGetStatus
|
|
req.Data = []byte(handle)
|
|
client.write(req)
|
|
mutex.Lock()
|
|
return
|
|
}
|
|
|
|
// Echo.
|
|
func (client *Client) Echo(data []byte) (echo []byte, err error) {
|
|
if client.conn == nil {
|
|
return nil, ErrLostConn
|
|
}
|
|
var mutex sync.Mutex
|
|
mutex.Lock()
|
|
client.innerHandler["e"] = func(resp *Response) {
|
|
echo = resp.Data
|
|
mutex.Unlock()
|
|
}
|
|
req := getRequest()
|
|
req.DataType = dtEchoReq
|
|
req.Data = data
|
|
client.lastcall = "e"
|
|
client.write(req)
|
|
mutex.Lock()
|
|
return
|
|
}
|
|
|
|
// Close connection
|
|
func (client *Client) Close() (err error) {
|
|
client.Lock()
|
|
defer client.Unlock()
|
|
if client.conn != nil {
|
|
err = client.conn.Close()
|
|
client.conn = nil
|
|
}
|
|
return
|
|
}
|