start refactoring...

This commit is contained in:
mikespook 2013-01-14 17:59:48 +08:00
parent aa9b5e9598
commit dbc06bf540
3 changed files with 242 additions and 150 deletions

View File

@ -8,6 +8,7 @@ package client
import ( import (
"io" "io"
"net" "net"
"sync"
"time" "time"
"bytes" "bytes"
"strconv" "strconv"
@ -15,8 +16,6 @@ import (
"github.com/mikespook/gearman-go/common" "github.com/mikespook/gearman-go/common"
) )
// Job handler
type JobHandler func(*Job) error
// Status handler // Status handler
// handle, known, running, numerator, denominator // handle, known, running, numerator, denominator
type StatusHandler func(string, bool, bool, uint64, uint64) type StatusHandler func(string, bool, bool, uint64, uint64)
@ -31,15 +30,14 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
*/ */
type Client struct { type Client struct {
ErrHandler common.ErrorHandler ErrHandler common.ErrorHandler
JobHandler JobHandler
StatusHandler StatusHandler
TimeOut time.Duration TimeOut time.Duration
in chan []byte in chan []byte
out chan *Job out chan *Job
jobCreated chan *Job conn net.Conn
conn net.Conn addr string
ai *autoinc.AutoInc ai *autoinc.AutoInc
mutex sync.RWMutex
} }
// Create a new client. // Create a new client.
@ -47,23 +45,125 @@ type Client struct {
// Eg. // Eg.
// client, err := client.New("127.0.0.1:4730") // client, err := client.New("127.0.0.1:4730")
func New(addr string) (client *Client, err error) { func New(addr string) (client *Client, err error) {
conn, err := net.Dial(common.NETWORK, addr)
if err != nil {
return
}
client = &Client{ client = &Client{
jobCreated: make(chan *Job), jobCreated: make(chan *Job),
in: make(chan []byte, common.QUEUE_SIZE), in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE),
conn: conn, addr: addr,
ai: autoinc.New(0, 1), ai: autoinc.New(0, 1),
TimeOut: time.Second, TimeOut: time.Second,
} }
if err = client.connect(); err != nil {
return
}
go client.inLoop() go client.inLoop()
go client.outLoop() go client.outLoop()
return return
} }
// {{{ private functions
//
func (client *Client) connect() (err error) {
client.mutex.Lock()
defer client.mutex.Unlock()
client.conn, err = net.Dial(common.NETWORK, addr)
return
}
// Internal write
func (client *Client) write(buf []byte) (err error) {
client.mutex.RLock()
defer client.mutex.RUnlock()
var n int
for i := 0; i < len(buf); i += n {
n, err = client.conn.Write(buf[i:])
if err != nil {
return
}
}
return
}
// read length bytes from the socket
func (client *Client) readData(length int) (data []byte, err error) {
client.mutex.RLock()
defer client.mutex.RUnlock()
n := 0
buf := make([]byte, common.BUFFER_SIZE)
// read until data can be unpacked
for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n {
if n, err = client.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrConnection
return
}
return data, nil
}
return
}
data = append(data, buf[0:n]...)
if n < common.BUFFER_SIZE {
break
}
}
return
}
// unpack data
func (client *Client) unpack(data []byte) ([]byte, int, bool) {
tl := len(data)
start := 0
for i := 0; i < tl+1-common.PACKET_LEN; i++ {
if start+common.PACKET_LEN > tl { // too few data to unpack, read more
return nil, common.PACKET_LEN, false
}
if string(data[start:start+4]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8],
data[start+9], data[start+10], data[start+11]}))
total := l + common.PACKET_LEN
if total == tl { // data is what we want
return data, common.PACKET_LEN, true
} else if total < tl { // data[:total] is what we want, data[total:] is the more
client.in <- data[total:]
data = data[start:total]
return data, common.PACKET_LEN, true
} else { // ops! It won't be possible.
return nil, total - tl, false
}
} else { // flag was not found, move to next step
start++
}
}
return nil, common.PACKET_LEN, false
}
// Internal read
func (client *Client) read() (rel []byte, err error) {
var data []byte
ok := false
l := common.PACKET_LEN
for !ok {
inlen := len(client.in)
if inlen > 0 {
// in queue is not empty
for i := 0; i < inlen; i++ {
data = append(data, <-client.in...)
}
} else {
var d []byte
d, err = client.readData(l)
if err != nil {
return
}
data = append(data, d...)
}
rel, l, ok = client.unpack(data)
}
return
}
// out loop // out loop
func (client *Client) outLoop() { func (client *Client) outLoop() {
ok := true ok := true
@ -98,66 +198,19 @@ func (client *Client) inLoop() {
case common.ERROR: case common.ERROR:
_, err := common.GetError(job.Data) _, err := common.GetError(job.Data)
client.err(err) client.err(err)
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
common.ECHO_RES: client.handleJob(job)
go client.handleJob(job) case common.ECHO_RES:
client.handleEcho(job)
case common.JOB_CREATED: case common.JOB_CREATED:
client.jobCreated <- job client.handleCreated(job)
case common.STATUS_RES: case common.STATUS_RES:
go client.handleStatus(job) client.handleStatus(job)
} }
} }
} }
// inner read
func (client *Client) read() (data []byte, err error) {
if len(client.in) > 0 {
// incoming queue is not empty
data = <-client.in
} else {
// empty queue, read data from socket
for {
buf := make([]byte, common.BUFFER_SIZE)
var n int
if n, err = client.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrConnection
return
}
break
}
return
}
data = append(data, buf[0:n]...)
if n < common.BUFFER_SIZE {
break
}
}
}
// split package
tl := len(data)
start, end := 0, 4
for i := 0; i < tl; i++ {
if string(data[start:end]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == tl {
return
} else {
client.in <- data[total:]
data = data[:total]
return
}
} else {
start++
end++
}
}
return nil, common.Errorf("Invalid data: %V", data)
}
// error handler // error handler
func (client *Client) err (e error) { func (client *Client) err (e error) {
if client.ErrHandler != nil { if client.ErrHandler != nil {
@ -167,16 +220,11 @@ func (client *Client) err (e error) {
// job handler // job handler
func (client *Client) handleJob(job *Job) { func (client *Client) handleJob(job *Job) {
if client.JobHandler != nil {
if err := client.JobHandler(job); err != nil {
client.err(err)
}
}
} }
// status handler // status handler
func (client *Client) handleStatus(job *Job) { func (client *Client) handleStatus(job *Job) {
if client.StatusHandler != nil {
data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
if len(data) != 5 { if len(data) != 5 {
client.err(common.Errorf("Invalid data: %V", job.Data)) client.err(common.Errorf("Invalid data: %V", job.Data))
@ -195,65 +243,6 @@ func (client *Client) handleStatus(job *Job) {
client.err(common.Errorf("Invalid handle: %s", data[4][0])) client.err(common.Errorf("Invalid handle: %s", data[4][0]))
return return
} }
client.StatusHandler(handle, known, running, numerator, denominator)
}
}
// Do the function.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
// and if it is background job: JOB_BG.
// JOB_LOW | JOB_BG means the job is running with low level in background.
func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) {
var datatype uint32
if flag & JOB_LOW == JOB_LOW {
if flag & JOB_BG == JOB_BG {
datatype = common.SUBMIT_JOB_LOW_BG
} else {
datatype = common.SUBMIT_JOB_LOW
}
} else if flag & JOB_HIGH == JOB_HIGH {
if flag & JOB_BG == JOB_BG {
datatype = common.SUBMIT_JOB_HIGH_BG
} else {
datatype = common.SUBMIT_JOB_HIGH
}
} else if flag & JOB_BG == JOB_BG {
datatype = common.SUBMIT_JOB_BG
} else {
datatype = common.SUBMIT_JOB
}
uid := strconv.Itoa(int(client.ai.Id()))
l := len(funcname) + len(uid) + len(data) + 2
rel := make([]byte, 0, l)
rel = append(rel, []byte(funcname)...) // len(funcname)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, []byte(uid)...) // len(uid)
rel = append(rel, '\x00') // 1 Byte
rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, datatype, rel))
// Waiting for JOB_CREATED
select {
case job := <-client.jobCreated:
return string(job.Data), nil
case <-time.After(client.TimeOut):
return "", common.ErrJobTimeOut
}
return
}
// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string) {
job := newJob(common.REQ, common.GET_STATUS, []byte(handle))
client.writeJob(job)
}
// Send a something out, get the samething back.
func (client *Client) Echo(data []byte) {
client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
} }
// Send the job to job server. // Send the job to job server.
@ -261,21 +250,71 @@ func (client *Client) writeJob(job *Job) {
client.out <- job client.out <- job
} }
// Internal write // Internal do
func (client *Client) write(buf []byte) (err error) { func (client *Client) do(funcname string, data []byte, flag uint32) (id int) {
var n int id = strconv.Itoa(int(client.ai.Id()))
for i := 0; i < len(buf); i += n { l := len(funcname) + len(id) + len(data) + 2
n, err = client.conn.Write(buf[i:]) rel := make([]byte, 0, l)
if err != nil { rel = append(rel, []byte(funcname)...) // len(funcname)
return rel = append(rel, '\x00') // 1 Byte
} rel = append(rel, []byte(id)...) // len(uid)
} rel = append(rel, '\x00') // 1 Byte
rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, flag, rel))
return return
} }
// }}}
// Do the function.
// funcname is a string with function name.
// data is encoding to byte array.
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
// and if it is background job: JOB_BG.
// JOB_LOW | JOB_BG means the job is running with low level in background.
func (client *Client) Do(funcname string, data []byte, flag byte)
(handle string, err error) {
var datatype uint32
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH
default:
datatype = common.SUBMIT_JOB
}
client.do(funcname, data, datatype)
}
func (client *Client) DoBg(funcname string, data []byte, flag byte)
(handle string, err error) {
var datatype uint32
switch flag {
case JOB_LOW :
datatype = common.SUBMIT_JOB_LOW_BG
case JOB_HIGH :
datatype = common.SUBMIT_JOB_HIGH_BG
default:
datatype = common.SUBMIT_JOB_BG
}
client.do(funcname, data, datatype)
}
// Get job status from job server.
// !!!Not fully tested.!!!
func (client *Client) Status(handle string, handler StatusHandler) {
client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
}
// Send a something out, get the samething back.
func (client *Client) Echo(data []byte, handler JobHandler) (echo []byte) {
client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
client.echo
}
// Close // Close
func (client *Client) Close() (err error) { func (client *Client) Close() (err error) {
close(client.jobCreated)
close(client.in) close(client.in)
close(client.out) close(client.out)
return client.conn.Close(); return client.conn.Close();

View File

@ -13,31 +13,80 @@ func TestClientAddServer(t *testing.T) {
t.Error(err) t.Error(err)
} }
} }
/*
func TestClientEcho(t *testing.T) { func TestClientEcho(t *testing.T) {
client.JobHandler = func(job *Job) error { echoHandler = func(job *Job) {
echo := string(job.Data) echo := string(job.Data)
if echo == "Hello world" { if echo == "Hello world" {
t.Log(echo) t.Log(echo)
} else { } else {
t.Errorf("Invalid echo data: %s", job.Data) t.Errorf("Invalid echo data: %s", job.Data)
} }
return nil return
} }
client.Echo([]byte("Hello world")) client.Echo([]byte("Hello world"), echoHandler)
} }
*/
/* func TestClientDoBg(t *testing.T) {
func TestClientDo(t *testing.T) { if handle, err := client.DoBg("ToUpper", []byte("abcdef"),
if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { JOB_LOW); err != nil {
t.Error(err) t.Error(err)
} else { } else {
t.Log(handle) t.Log(handle)
} }
} }
*/
func TestClientDo(t *testing.T) {
jobHandler := func(job *Job) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
}
return
}
if handle, err := client.Do("ToUpper", []byte("abcdef"),
JOB_LOW, jobHandler); err != nil {
t.Error(err)
} else {
t.Log(handle)
}
}
func TestClientStatus(t *testing.T) {
statusHandler = func(handler string, known bool,
running bool, numerator uint64, denominator uint64) {
if known {
t.Errorf("The job (%s) shouldn't be known.", handler)
}
if running {
t.Errorf("The job (%s) shouldn't be running.", handler)
}
}
client.Status("handle not exists", statusHandler)
if handle, err := client.Do("Delay5sec", []byte("abcdef"),
JOB_LOW, nil); err != nil {
t.Error(err)
} else {
t.Log(handle)
statusHandler = func(handler string, known bool,
running bool, numerator uint64, denominator uint64) {
if !known {
t.Errorf("The job (%s) shouldn be known.", handler)
}
if !running {
t.Errorf("The job (%s) shouldn be running.", handler)
}
}
client.Status(handle, statusHandler)
}
}
func TestClientClose(t *testing.T) { func TestClientClose(t *testing.T) {
return
if err := client.Close(); err != nil { if err := client.Close(); err != nil {
t.Error(err) t.Error(err)
} }

4
client/handler.go Normal file
View File

@ -0,0 +1,4 @@
package client
// Job handler
type JobHandler func(*Job)