gearman-go/client/client.go

369 lines
9.5 KiB
Go
Raw Normal View History

// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
"io"
"net"
2013-01-14 17:59:48 +08:00
"sync"
2012-05-24 19:21:30 +08:00
"time"
"bytes"
"strconv"
2012-12-21 11:11:37 +08:00
"github.com/mikespook/gearman-go/common"
)
2013-02-03 21:34:13 +08:00
var (
IdGen IdGenerator
2013-02-03 21:34:13 +08:00
)
func init() {
IdGen = NewObjectId()
2013-02-03 21:34:13 +08:00
}
// Status handler
// handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64)
/*
The client side api for gearman
usage:
c := client.New("tcp4", "127.0.0.1:4730")
handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/
type Client struct {
ErrHandler common.ErrorHandler
2012-05-24 19:21:30 +08:00
TimeOut time.Duration
in chan []byte
out chan *Job
2013-01-15 17:55:44 +08:00
created chan string
echo chan []byte
status chan *Status
jobhandlers map[string]JobHandler
2013-01-14 17:59:48 +08:00
conn net.Conn
addr string
mutex sync.RWMutex
}
// Create a new client.
// Connect to "addr" through "network"
// Eg.
// client, err := client.New("127.0.0.1:4730")
func New(addr string) (client *Client, err error) {
client = &Client{
2013-01-15 17:55:44 +08:00
created: make(chan string, common.QUEUE_SIZE),
echo: make(chan []byte, common.QUEUE_SIZE),
status: make(chan *Status, common.QUEUE_SIZE),
jobhandlers: make(map[string]JobHandler, common.QUEUE_SIZE),
in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE),
2013-01-14 17:59:48 +08:00
addr: addr,
2012-05-24 19:21:30 +08:00
TimeOut: time.Second,
}
2013-01-14 17:59:48 +08:00
if err = client.connect(); err != nil {
return
}
go client.inLoop()
go client.outLoop()
return
}
2013-01-14 17:59:48 +08:00
// {{{ private functions
//
func (client *Client) connect() (err error) {
2013-01-15 17:55:44 +08:00
client.conn, err = net.Dial(common.NETWORK, client.addr)
2013-01-14 17:59:48 +08:00
return
}
// Internal write
func (client *Client) write(buf []byte) (err error) {
var n int
for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:])
if err != nil {
return
}
}
return
}
// read length bytes from the socket
func (client *Client) readData(length int) (data []byte, err error) {
n := 0
buf := make([]byte, common.BUFFER_SIZE)
// read until data can be unpacked
for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n {
if n, err = client.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrConnection
return
}
return data, nil
}
return
}
data = append(data, buf[0:n]...)
if n < common.BUFFER_SIZE {
break
}
}
return
}
// unpack data
func (client *Client) unpack(data []byte) ([]byte, int, bool) {
tl := len(data)
start := 0
for i := 0; i < tl+1-common.PACKET_LEN; i++ {
if start+common.PACKET_LEN > tl { // too few data to unpack, read more
return nil, common.PACKET_LEN, false
}
if string(data[start:start+4]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8],
2013-01-15 17:55:44 +08:00
data[start+9], data[start+10], data[start+11]}))
2013-01-14 17:59:48 +08:00
total := l + common.PACKET_LEN
if total == tl { // data is what we want
return data, common.PACKET_LEN, true
} else if total < tl { // data[:total] is what we want, data[total:] is the more
client.in <- data[total:]
data = data[start:total]
return data, common.PACKET_LEN, true
} else { // ops! It won't be possible.
return nil, total - tl, false
}
} else { // flag was not found, move to next step
start++
}
}
return nil, common.PACKET_LEN, false
}
// Internal read
func (client *Client) read() (rel []byte, err error) {
var data []byte
ok := false
l := common.PACKET_LEN
for !ok {
inlen := len(client.in)
if inlen > 0 {
// in queue is not empty
for i := 0; i < inlen; i++ {
data = append(data, <-client.in...)
}
} else {
var d []byte
d, err = client.readData(l)
if err != nil {
return
}
data = append(data, d...)
}
rel, l, ok = client.unpack(data)
}
return
}
// out loop
func (client *Client) outLoop() {
2013-01-23 17:25:38 +08:00
for job := range client.out {
if err := client.write(job.Encode()); err != nil {
client.err(err)
}
}
}
// in loop
func (client *Client) inLoop() {
2012-05-24 19:21:30 +08:00
defer common.DisablePanic()
for {
rel, err := client.read()
if err != nil {
if err == common.ErrConnection {
2012-05-24 19:21:30 +08:00
client.Close()
}
client.err(err)
2013-01-24 18:13:02 +08:00
break
}
job, err := decodeJob(rel)
if err != nil {
client.err(err)
continue
2013-01-24 18:13:02 +08:00
//break
}
switch job.DataType {
case common.ERROR:
_, err := common.GetError(job.Data)
client.err(err)
2013-01-24 18:13:02 +08:00
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
2013-01-15 17:55:44 +08:00
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
2013-01-14 17:59:48 +08:00
client.handleJob(job)
case common.ECHO_RES:
client.handleEcho(job)
case common.JOB_CREATED:
2013-01-14 17:59:48 +08:00
client.handleCreated(job)
case common.STATUS_RES:
2013-01-14 17:59:48 +08:00
client.handleStatus(job)
2013-01-24 18:13:02 +08:00
default:
break
}
}
}
// error handler
func (client *Client) err (e error) {
if client.ErrHandler != nil {
client.ErrHandler(e)
}
}
// job handler
func (client *Client) handleJob(job *Job) {
2013-04-23 16:58:06 +08:00
client.mutex.RLock()
defer client.mutex.RUnlock()
if h, ok := client.jobhandlers[job.Handle]; ok {
2013-01-15 17:55:44 +08:00
h(job)
2013-04-23 16:58:06 +08:00
delete(client.jobhandlers, job.Handle)
2013-01-15 17:55:44 +08:00
}
}
2013-01-14 17:59:48 +08:00
2013-01-15 17:55:44 +08:00
func (client *Client) handleEcho(job *Job) {
client.echo <- job.Data
}
func (client *Client) handleCreated(job *Job) {
client.created <- string(job.Data)
}
// status handler
func (client *Client) handleStatus(job *Job) {
2013-01-15 17:55:44 +08:00
data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
if len(data) != 5 {
client.err(common.Errorf("Invalid data: %V", job.Data))
return
}
status := &Status{}
status.Handle = string(data[0])
status.Known = (data[1][0] == '1')
status.Running = (data[2][0] == '1')
var err error
status.Numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0)
if err != nil {
client.err(common.Errorf("Invalid handle: %s", data[3][0]))
return
}
status.Denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0)
if err != nil {
client.err(common.Errorf("Invalid handle: %s", data[4][0]))
return
}
client.status <- status
}
2013-01-14 17:59:48 +08:00
// Send the job to job server.
func (client *Client) writeJob(job *Job) {
client.out <- job
}
// Internal do
2013-01-15 17:55:44 +08:00
func (client *Client) do(funcname string, data []byte,
flag uint32, id string) (handle string) {
2013-01-14 17:59:48 +08:00
l := len(funcname) + len(id) + len(data) + 2
rel := make([]byte, 0, l)
rel = append(rel, []byte(funcname)...) // len(funcname)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, []byte(id)...) // len(uid)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, flag, rel))
2013-01-15 17:55:44 +08:00
// Waiting for JOB_CREATED
handle = <-client.created
2013-01-14 17:59:48 +08:00
return
}
// }}}
// Do the function.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
// and if it is background job: JOB_BG.
// JOB_LOW | JOB_BG means the job is running with low level in background.
2013-01-15 17:55:44 +08:00
func (client *Client) Do(funcname string, data []byte,
flag byte, jobhandler JobHandler) (handle string) {
var datatype uint32
2013-01-14 17:59:48 +08:00
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH
default:
datatype = common.SUBMIT_JOB
}
id := IdGen.Id()
2013-04-23 16:58:06 +08:00
client.mutex.Lock()
defer client.mutex.Unlock()
handle = client.do(funcname, data, datatype, id)
2013-01-15 17:55:44 +08:00
if jobhandler != nil {
2013-04-23 16:58:06 +08:00
client.jobhandlers[handle] = jobhandler
2013-01-15 17:55:44 +08:00
}
return
2013-01-14 17:59:48 +08:00
}
2013-01-15 17:55:44 +08:00
func (client *Client) DoBg(funcname string, data []byte,
flag byte) (handle string) {
2013-01-14 17:59:48 +08:00
var datatype uint32
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW_BG
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH_BG
default:
datatype = common.SUBMIT_JOB_BG
2012-05-24 19:21:30 +08:00
}
handle = client.do(funcname, data, datatype, "")
2013-01-15 17:55:44 +08:00
return
}
2013-01-14 17:59:48 +08:00
// Get job status from job server.
// !!!Not fully tested.!!!
2013-04-23 16:58:06 +08:00
func (client *Client) Status(handle string, timeout time.Duration) (status *Status, err error) {
2013-01-14 17:59:48 +08:00
client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
2013-04-23 16:58:06 +08:00
select {
case status = <-client.status:
case <-time.NewTimer(timeout).C:
err = common.ErrTimeOut
}
2013-01-15 17:55:44 +08:00
return
}
// Send a something out, get the samething back.
2013-01-15 17:55:44 +08:00
func (client *Client) Echo(data []byte) (r []byte) {
client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
2013-01-15 17:55:44 +08:00
r = <-client.echo
return
}
// Close
func (client *Client) Close() (err error) {
2013-01-24 18:13:02 +08:00
close(client.in)
close(client.out)
2013-01-15 17:55:44 +08:00
close(client.echo)
close(client.created)
close(client.status)
return client.conn.Close();
}