gearman-go/client/client.go

308 lines
6.6 KiB
Go
Raw Normal View History

// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
2013-08-29 16:51:23 +08:00
"io"
"net"
"sync"
2013-08-30 11:20:51 +08:00
"github.com/mikespook/golib/idgen"
)
2013-08-29 16:51:23 +08:00
/*
The client side api for gearman
usage:
c := client.New("tcp4", "127.0.0.1:4730")
handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct {
2013-08-30 11:20:51 +08:00
net, addr, lastcall string
respHandler map[string]ResponseHandler
innerHandler map[string]ResponseHandler
in chan []byte
isConn bool
conn net.Conn
mutex sync.RWMutex
2013-08-29 16:51:23 +08:00
2013-08-30 11:20:51 +08:00
ErrorHandler ErrorHandler
2013-08-29 16:51:23 +08:00
IdGen idgen.IdGen
}
// Create a new client.
// Connect to "addr" through "network"
// Eg.
// client, err := client.New("127.0.0.1:4730")
2013-08-29 16:51:23 +08:00
func New(net, addr string) (client *Client, err error) {
client = &Client{
2013-08-30 11:20:51 +08:00
net: net,
addr: addr,
respHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
2013-08-29 18:08:05 +08:00
innerHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
2013-08-30 11:20:51 +08:00
in: make(chan []byte, QUEUE_SIZE),
IdGen: idgen.NewObjectId(),
2013-08-29 16:51:23 +08:00
}
if err = client.connect(); err != nil {
return
}
client.isConn = true
go client.readLoop()
go client.processLoop()
return
}
2013-01-14 17:59:48 +08:00
// {{{ private functions
2013-08-29 16:51:23 +08:00
//
2013-01-14 17:59:48 +08:00
func (client *Client) connect() (err error) {
2013-08-29 16:51:23 +08:00
client.conn, err = net.Dial(client.net, client.addr)
return
2013-01-14 17:59:48 +08:00
}
// Internal write
2013-08-29 16:51:23 +08:00
func (client *Client) write(req *request) (err error) {
var n int
buf := req.Encode()
for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:])
if err != nil {
return
}
}
return
2013-01-14 17:59:48 +08:00
}
// read length bytes from the socket
2013-08-29 16:51:23 +08:00
func (client *Client) read(length int) (data []byte, err error) {
n := 0
buf := getBuffer(BUFFER_SIZE)
// read until data can be unpacked
for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n {
if n, err = client.conn.Read(buf); err != nil {
if !client.isConn {
err = ErrConnClosed
return
}
if err == io.EOF && n == 0 {
if data == nil {
err = ErrConnection
}
}
return
}
data = append(data, buf[0:n]...)
if n < BUFFER_SIZE {
break
}
}
return
2013-01-14 17:59:48 +08:00
}
2013-08-29 16:51:23 +08:00
// read data from socket
func (client *Client) readLoop() {
var data []byte
var err error
for client.isConn {
if data, err = client.read(BUFFER_SIZE); err != nil {
if err == ErrConnClosed {
break
}
client.err(err)
continue
}
client.in <- data
}
close(client.in)
2013-01-14 17:59:48 +08:00
}
2013-08-29 16:51:23 +08:00
// decode data & process it
func (client *Client) processLoop() {
2013-08-29 18:08:05 +08:00
var resp *Response
2013-08-29 16:51:23 +08:00
var l int
var err error
var data, leftdata []byte
for data = range client.in {
if len(leftdata) > 0 { // some data left for processing
2013-08-29 18:08:05 +08:00
data = append(leftdata, data...)
2013-08-29 16:51:23 +08:00
}
2013-08-30 11:20:51 +08:00
l = len(data)
2013-08-29 16:51:23 +08:00
if l < MIN_PACKET_LEN { // not enough data
leftdata = data
continue
}
if resp, l, err = decodeResponse(data); err != nil {
client.err(err)
continue
}
2013-08-30 11:20:51 +08:00
leftdata = nil
2013-08-29 16:51:23 +08:00
switch resp.DataType {
2013-08-30 11:20:51 +08:00
case ERROR:
if client.lastcall != "" {
client.handleInner(client.lastcall, resp)
client.lastcall = ""
} else {
client.err(GetError(resp.Data))
}
2013-08-29 18:08:05 +08:00
case STATUS_RES:
2013-08-30 11:20:51 +08:00
client.handleInner("s"+resp.Handle, resp)
2013-08-29 18:08:05 +08:00
case JOB_CREATED:
2013-08-30 11:20:51 +08:00
client.handleInner("c", resp)
2013-08-29 18:08:05 +08:00
case ECHO_RES:
2013-08-30 11:20:51 +08:00
client.handleInner("e", resp)
2013-08-29 18:08:05 +08:00
case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE,
WORK_FAIL, WORK_EXCEPTION:
client.handleResponse(resp.Handle, resp)
2013-08-29 16:51:23 +08:00
}
if len(data) > l {
leftdata = data[l:]
}
}
}
// error handler
2013-08-29 16:51:23 +08:00
func (client *Client) err(e error) {
if client.ErrorHandler != nil {
client.ErrorHandler(e)
}
}
// job handler
2013-08-29 18:08:05 +08:00
func (client *Client) handleResponse(key string, resp *Response) {
2013-08-29 16:51:23 +08:00
client.mutex.RLock()
defer client.mutex.RUnlock()
if h, ok := client.respHandler[key]; ok {
h(resp)
2013-08-29 18:08:05 +08:00
delete(client.respHandler, key)
}
}
// job handler
func (client *Client) handleInner(key string, resp *Response) {
if h, ok := client.innerHandler[key]; ok {
h(resp)
delete(client.innerHandler, key)
2013-08-29 16:51:23 +08:00
}
2013-01-14 17:59:48 +08:00
}
// Internal do
2013-01-15 17:55:44 +08:00
func (client *Client) do(funcname string, data []byte,
2013-08-30 11:20:51 +08:00
flag uint32) (handle string, err error) {
2013-08-29 18:08:05 +08:00
id := client.IdGen.Id().(string)
2013-08-30 11:20:51 +08:00
req := getJob(id, []byte(funcname), data)
req.DataType = flag
2013-08-29 16:51:23 +08:00
client.write(req)
var wg sync.WaitGroup
wg.Add(1)
2013-08-29 18:08:05 +08:00
client.mutex.RLock()
2013-08-30 11:20:51 +08:00
client.lastcall = "c"
client.innerHandler["c"] = ResponseHandler(func(resp *Response) {
2013-08-29 16:51:23 +08:00
defer wg.Done()
2013-08-29 18:08:05 +08:00
defer client.mutex.RUnlock()
2013-08-30 11:20:51 +08:00
if resp.DataType == ERROR {
err = GetError(resp.Data)
return
}
2013-08-29 16:51:23 +08:00
handle = resp.Handle
2013-08-29 18:08:05 +08:00
})
2013-08-29 16:51:23 +08:00
wg.Wait()
return
2013-01-14 17:59:48 +08:00
}
// }}}
// Do the function.
// funcname is a string with function name.
// data is encoding to byte array.
2013-08-29 16:51:23 +08:00
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
2013-01-15 17:55:44 +08:00
func (client *Client) Do(funcname string, data []byte,
2013-08-30 11:20:51 +08:00
flag byte, h ResponseHandler) (handle string, err error) {
2013-08-29 16:51:23 +08:00
var datatype uint32
switch flag {
case JOB_LOW:
datatype = SUBMIT_JOB_LOW
case JOB_HIGH:
datatype = SUBMIT_JOB_HIGH
default:
datatype = SUBMIT_JOB
}
2013-08-30 11:20:51 +08:00
handle, err = client.do(funcname, data, datatype)
2013-08-29 16:51:23 +08:00
client.mutex.Lock()
defer client.mutex.Unlock()
if h != nil {
2013-08-29 18:08:05 +08:00
client.respHandler[handle] = h
2013-08-29 16:51:23 +08:00
}
return
2013-01-14 17:59:48 +08:00
}
2013-08-29 16:51:23 +08:00
// Do the function at background.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
2013-01-15 17:55:44 +08:00
func (client *Client) DoBg(funcname string, data []byte,
2013-08-30 11:20:51 +08:00
flag byte) (handle string, err error) {
2013-08-29 16:51:23 +08:00
var datatype uint32
switch flag {
case JOB_LOW:
datatype = SUBMIT_JOB_LOW_BG
case JOB_HIGH:
datatype = SUBMIT_JOB_HIGH_BG
default:
datatype = SUBMIT_JOB_BG
}
2013-08-30 11:20:51 +08:00
handle, err = client.do(funcname, data, datatype)
2013-08-29 16:51:23 +08:00
return
}
// Get job status from job server.
// !!!Not fully tested.!!!
2013-08-30 11:20:51 +08:00
func (client *Client) Status(handle string) (status *Status, err error) {
2013-08-29 16:51:23 +08:00
req := getRequest()
req.DataType = GET_STATUS
2013-08-29 18:08:05 +08:00
req.Data = []byte(handle)
2013-08-29 16:51:23 +08:00
client.write(req)
2013-08-29 18:08:05 +08:00
var wg sync.WaitGroup
wg.Add(1)
client.mutex.Lock()
2013-08-30 11:20:51 +08:00
client.lastcall = "s" + handle
client.innerHandler["s" + handle] = ResponseHandler(func(resp *Response) {
2013-08-29 18:08:05 +08:00
defer wg.Done()
defer client.mutex.Unlock()
var err error
status, err = resp.Status()
if err != nil {
client.err(err)
}
})
wg.Wait()
2013-08-29 16:51:23 +08:00
return
}
// Send a something out, get the samething back.
2013-08-29 18:08:05 +08:00
func (client *Client) Echo(data []byte) (echo []byte, err error) {
2013-08-29 16:51:23 +08:00
req := getRequest()
req.DataType = ECHO_REQ
req.Data = data
client.write(req)
2013-08-29 18:08:05 +08:00
var wg sync.WaitGroup
wg.Add(1)
client.mutex.Lock()
2013-08-30 11:20:51 +08:00
client.lastcall = "e"
client.innerHandler["e"] = ResponseHandler(func(resp *Response) {
2013-08-29 18:08:05 +08:00
defer wg.Done()
defer client.mutex.Unlock()
echo = resp.Data
})
wg.Wait()
2013-08-29 16:51:23 +08:00
return
}
// Close
func (client *Client) Close() (err error) {
2013-08-29 16:51:23 +08:00
client.isConn = false
return client.conn.Close()
}