runabled 0.2
This commit is contained in:
parent
e5c30068cd
commit
4997e30a77
@ -6,10 +6,10 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/mikespook/golib/idgen"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"github.com/mikespook/golib/idgen"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -23,7 +23,7 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
|
|||||||
type Client struct {
|
type Client struct {
|
||||||
net, addr string
|
net, addr string
|
||||||
respHandler map[string]ResponseHandler
|
respHandler map[string]ResponseHandler
|
||||||
createdHandler ResponseHandler
|
innerHandler map[string]ResponseHandler
|
||||||
in chan []byte
|
in chan []byte
|
||||||
isConn bool
|
isConn bool
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
@ -42,7 +42,9 @@ func New(net, addr string) (client *Client, err error) {
|
|||||||
net: net,
|
net: net,
|
||||||
addr: addr,
|
addr: addr,
|
||||||
respHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
|
respHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
|
||||||
|
innerHandler: make(map[string]ResponseHandler, QUEUE_SIZE),
|
||||||
in: make(chan []byte, QUEUE_SIZE),
|
in: make(chan []byte, QUEUE_SIZE),
|
||||||
|
IdGen: idgen.NewObjectId(),
|
||||||
}
|
}
|
||||||
if err = client.connect(); err != nil {
|
if err = client.connect(); err != nil {
|
||||||
return
|
return
|
||||||
@ -119,14 +121,14 @@ func (client *Client) readLoop() {
|
|||||||
|
|
||||||
// decode data & process it
|
// decode data & process it
|
||||||
func (client *Client) processLoop() {
|
func (client *Client) processLoop() {
|
||||||
var resp *response
|
var resp *Response
|
||||||
var l int
|
var l int
|
||||||
var err error
|
var err error
|
||||||
var data, leftdata []byte
|
var data, leftdata []byte
|
||||||
for data = range client.in {
|
for data = range client.in {
|
||||||
l = len(data)
|
l = len(data)
|
||||||
if len(leftdata) > 0 { // some data left for processing
|
if len(leftdata) > 0 { // some data left for processing
|
||||||
data = append(leftdata, data ...)
|
data = append(leftdata, data...)
|
||||||
}
|
}
|
||||||
if l < MIN_PACKET_LEN { // not enough data
|
if l < MIN_PACKET_LEN { // not enough data
|
||||||
leftdata = data
|
leftdata = data
|
||||||
@ -137,9 +139,15 @@ func (client *Client) processLoop() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
switch resp.DataType {
|
switch resp.DataType {
|
||||||
|
case STATUS_RES:
|
||||||
|
client.handleInner("status-" + resp.Handle, resp)
|
||||||
|
case JOB_CREATED:
|
||||||
|
client.handleInner("created", resp)
|
||||||
|
case ECHO_RES:
|
||||||
|
client.handleInner("echo", resp)
|
||||||
case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE,
|
case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE,
|
||||||
WORK_FAIL, WORK_EXCEPTION:
|
WORK_FAIL, WORK_EXCEPTION:
|
||||||
client.handleResponse(string(resp.Handle), resp)
|
client.handleResponse(resp.Handle, resp)
|
||||||
}
|
}
|
||||||
if len(data) > l {
|
if len(data) > l {
|
||||||
leftdata = data[l:]
|
leftdata = data[l:]
|
||||||
@ -155,28 +163,37 @@ func (client *Client) err(e error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// job handler
|
// job handler
|
||||||
func (client *Client) handleResponse(key string, resp *response) {
|
func (client *Client) handleResponse(key string, resp *Response) {
|
||||||
client.mutex.RLock()
|
client.mutex.RLock()
|
||||||
defer client.mutex.RUnlock()
|
defer client.mutex.RUnlock()
|
||||||
if h, ok := client.respHandler[key]; ok {
|
if h, ok := client.respHandler[key]; ok {
|
||||||
h(resp)
|
h(resp)
|
||||||
delete(client.respHandler, string(resp.Handle))
|
delete(client.respHandler, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// job handler
|
||||||
|
func (client *Client) handleInner(key string, resp *Response) {
|
||||||
|
if h, ok := client.innerHandler[key]; ok {
|
||||||
|
h(resp)
|
||||||
|
delete(client.innerHandler, key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Internal do
|
// Internal do
|
||||||
func (client *Client) do(funcname string, data []byte,
|
func (client *Client) do(funcname string, data []byte,
|
||||||
flag uint32) (handle []byte) {
|
flag uint32) (handle string) {
|
||||||
req := getJob(funcname, client.IdGen.Id().(string), data)
|
id := client.IdGen.Id().(string)
|
||||||
client.mutex.Lock()
|
req := getJob(funcname, id, data)
|
||||||
defer client.mutex.Unlock()
|
|
||||||
client.write(req)
|
client.write(req)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
client.createdHandler = func(resp *response) {
|
client.mutex.RLock()
|
||||||
|
client.innerHandler["created"] = ResponseHandler(func(resp *Response) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
defer client.mutex.RUnlock()
|
||||||
handle = resp.Handle
|
handle = resp.Handle
|
||||||
}
|
})
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -188,7 +205,7 @@ func (client *Client) do(funcname string, data []byte,
|
|||||||
// data is encoding to byte array.
|
// data is encoding to byte array.
|
||||||
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
|
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
|
||||||
func (client *Client) Do(funcname string, data []byte,
|
func (client *Client) Do(funcname string, data []byte,
|
||||||
flag byte, h ResponseHandler) (handle []byte) {
|
flag byte, h ResponseHandler) (handle string) {
|
||||||
var datatype uint32
|
var datatype uint32
|
||||||
switch flag {
|
switch flag {
|
||||||
case JOB_LOW:
|
case JOB_LOW:
|
||||||
@ -202,7 +219,7 @@ func (client *Client) Do(funcname string, data []byte,
|
|||||||
client.mutex.Lock()
|
client.mutex.Lock()
|
||||||
defer client.mutex.Unlock()
|
defer client.mutex.Unlock()
|
||||||
if h != nil {
|
if h != nil {
|
||||||
client.respHandler[string(handle)] = h
|
client.respHandler[handle] = h
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -212,7 +229,7 @@ func (client *Client) Do(funcname string, data []byte,
|
|||||||
// data is encoding to byte array.
|
// data is encoding to byte array.
|
||||||
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
|
// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH
|
||||||
func (client *Client) DoBg(funcname string, data []byte,
|
func (client *Client) DoBg(funcname string, data []byte,
|
||||||
flag byte) (handle []byte) {
|
flag byte) (handle string) {
|
||||||
var datatype uint32
|
var datatype uint32
|
||||||
switch flag {
|
switch flag {
|
||||||
case JOB_LOW:
|
case JOB_LOW:
|
||||||
@ -228,26 +245,42 @@ func (client *Client) DoBg(funcname string, data []byte,
|
|||||||
|
|
||||||
// Get job status from job server.
|
// Get job status from job server.
|
||||||
// !!!Not fully tested.!!!
|
// !!!Not fully tested.!!!
|
||||||
func (client *Client) Status(handle []byte, h ResponseHandler) (err error) {
|
func (client *Client) Status(handle string, h ResponseHandler) (status *Status, err error) {
|
||||||
req := getRequest()
|
req := getRequest()
|
||||||
req.DataType = GET_STATUS
|
req.DataType = GET_STATUS
|
||||||
req.Data = handle
|
req.Data = []byte(handle)
|
||||||
client.write(req)
|
client.write(req)
|
||||||
if h != nil {
|
var wg sync.WaitGroup
|
||||||
client.respHandler["status-" + string(handle)] = h
|
wg.Add(1)
|
||||||
|
client.mutex.Lock()
|
||||||
|
client.innerHandler["status-" + handle] = ResponseHandler(func(resp *Response) {
|
||||||
|
defer wg.Done()
|
||||||
|
defer client.mutex.Unlock()
|
||||||
|
var err error
|
||||||
|
status, err = resp.Status()
|
||||||
|
if err != nil {
|
||||||
|
client.err(err)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
wg.Wait()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a something out, get the samething back.
|
// Send a something out, get the samething back.
|
||||||
func (client *Client) Echo(data []byte, h ResponseHandler) (err error) {
|
func (client *Client) Echo(data []byte) (echo []byte, err error) {
|
||||||
req := getRequest()
|
req := getRequest()
|
||||||
req.DataType = ECHO_REQ
|
req.DataType = ECHO_REQ
|
||||||
req.Data = data
|
req.Data = data
|
||||||
client.write(req)
|
client.write(req)
|
||||||
if h != nil {
|
var wg sync.WaitGroup
|
||||||
client.respHandler["echo"] = h
|
wg.Add(1)
|
||||||
}
|
client.mutex.Lock()
|
||||||
|
client.innerHandler["echo"] = ResponseHandler(func(resp *Response) {
|
||||||
|
defer wg.Done()
|
||||||
|
defer client.mutex.Unlock()
|
||||||
|
echo = resp.Data
|
||||||
|
})
|
||||||
|
wg.Wait()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,34 +1,34 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var client *Client
|
var client *Client
|
||||||
|
|
||||||
|
func printHandle(resp *Response) {
|
||||||
|
fmt.Printf("%V", resp)
|
||||||
|
}
|
||||||
|
|
||||||
func TestClientAddServer(t *testing.T) {
|
func TestClientAddServer(t *testing.T) {
|
||||||
t.Log("Add local server 127.0.0.1:4730")
|
t.Log("Add local server 127.0.0.1:4730")
|
||||||
var err error
|
var err error
|
||||||
if client, err = New("127.0.0.1:4730"); err != nil {
|
if client, err = New("tcp4", "127.0.0.1:4730"); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
client.ErrHandler = func(e error) {
|
client.ErrorHandler = func(e error) {
|
||||||
t.Log(e)
|
t.Log(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClientEcho(t *testing.T) {
|
func TestClientEcho(t *testing.T) {
|
||||||
echo, err := client.Echo([]byte("Hello world"), time.Second)
|
err := client.Echo([]byte("Hello world"), printHandle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if string(echo) != "Hello world" {
|
|
||||||
t.Errorf("Invalid echo data: %s", echo)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClientDoBg(t *testing.T) {
|
func TestClientDoBg(t *testing.T) {
|
||||||
@ -39,7 +39,7 @@ func TestClientDoBg(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestClientDo(t *testing.T) {
|
func TestClientDo(t *testing.T) {
|
||||||
jobHandler := func(job *Job) {
|
jobHandler := func(job *Response) {
|
||||||
str := string(job.Data)
|
str := string(job.Data)
|
||||||
if str == "ABCDEF" {
|
if str == "ABCDEF" {
|
||||||
t.Log(str)
|
t.Log(str)
|
||||||
@ -58,11 +58,12 @@ func TestClientDo(t *testing.T) {
|
|||||||
|
|
||||||
func TestClientStatus(t *testing.T) {
|
func TestClientStatus(t *testing.T) {
|
||||||
|
|
||||||
s1, err := client.Status("handle not exists", time.Second)
|
err := client.Status("handle not exists", printHandle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
if s1.Known {
|
if s1.Known {
|
||||||
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
|
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
|
||||||
return
|
return
|
||||||
@ -70,10 +71,11 @@ func TestClientStatus(t *testing.T) {
|
|||||||
if s1.Running {
|
if s1.Running {
|
||||||
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
|
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
|
||||||
return
|
return
|
||||||
}
|
}*/
|
||||||
|
|
||||||
handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil)
|
handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil)
|
||||||
s2, err := client.Status(handle, time.Second)
|
err = client.Status(handle, printHandle)
|
||||||
|
/*
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -86,6 +88,7 @@ func TestClientStatus(t *testing.T) {
|
|||||||
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
|
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClientClose(t *testing.T) {
|
func TestClientClose(t *testing.T) {
|
||||||
|
@ -1,48 +0,0 @@
|
|||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
testCase = map[uint32][4]byte{
|
|
||||||
0: [...]byte{0, 0, 0, 0},
|
|
||||||
1: [...]byte{0, 0, 0, 1},
|
|
||||||
256: [...]byte{0, 0, 1, 0},
|
|
||||||
256 * 256: [...]byte{0, 1, 0, 0},
|
|
||||||
256 * 256 * 256: [...]byte{1, 0, 0, 0},
|
|
||||||
256*256*256 + 256*256 + 256 + 1: [...]byte{1, 1, 1, 1},
|
|
||||||
4294967295: [...]byte{0xFF, 0xFF, 0xFF, 0xFF},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestUint32ToBytes(t *testing.T) {
|
|
||||||
for k, v := range testCase {
|
|
||||||
b := Uint32ToBytes(k)
|
|
||||||
if bytes.Compare(b[:], v[:]) != 0 {
|
|
||||||
t.Errorf("%v was expected, but %v was got", v, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBytesToUint32s(t *testing.T) {
|
|
||||||
for k, v := range testCase {
|
|
||||||
u := BytesToUint32([4]byte(v))
|
|
||||||
if u != k {
|
|
||||||
t.Errorf("%v was expected, but %v was got", k, u)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkByteToUnit32(b *testing.B) {
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
BytesToUint32([4]byte{0xF, 0xF, 0xF, 0xF})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkUint32ToByte(b *testing.B) {
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
Uint32ToBytes(123456)
|
|
||||||
}
|
|
||||||
}
|
|
@ -9,8 +9,8 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"syscall"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
// Response handler
|
// Response handler
|
||||||
type ResponseHandler func(*response)
|
type ResponseHandler func(*Response)
|
||||||
|
|
||||||
// Error handler
|
// Error handler
|
||||||
type ErrorHandler func(error)
|
type ErrorHandler func(error)
|
||||||
|
|
||||||
|
35
client/id.go
35
client/id.go
@ -1,35 +0,0 @@
|
|||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/mikespook/golib/autoinc"
|
|
||||||
"labix.org/v2/mgo/bson"
|
|
||||||
"strconv"
|
|
||||||
)
|
|
||||||
|
|
||||||
type IdGenerator interface {
|
|
||||||
Id() string
|
|
||||||
}
|
|
||||||
|
|
||||||
// ObjectId
|
|
||||||
type objectId struct{}
|
|
||||||
|
|
||||||
func (id *objectId) Id() string {
|
|
||||||
return bson.NewObjectId().Hex()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewObjectId() IdGenerator {
|
|
||||||
return &objectId{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AutoIncId
|
|
||||||
type autoincId struct {
|
|
||||||
*autoinc.AutoInc
|
|
||||||
}
|
|
||||||
|
|
||||||
func (id *autoincId) Id() string {
|
|
||||||
return strconv.Itoa(id.AutoInc.Id())
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAutoIncId() IdGenerator {
|
|
||||||
return &autoincId{autoinc.New(1, 1)}
|
|
||||||
}
|
|
@ -94,7 +94,7 @@ func (pool *Pool) Remove(addr string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pool *Pool) Do(funcname string, data []byte,
|
func (pool *Pool) Do(funcname string, data []byte,
|
||||||
flag byte, h ResponseHandler) (addr string, handle []byte) {
|
flag byte, h ResponseHandler) (addr, handle string) {
|
||||||
client := pool.selectServer()
|
client := pool.selectServer()
|
||||||
handle = client.Do(funcname, data, flag, h)
|
handle = client.Do(funcname, data, flag, h)
|
||||||
addr = client.addr
|
addr = client.addr
|
||||||
@ -102,7 +102,7 @@ func (pool *Pool) Do(funcname string, data []byte,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pool *Pool) DoBg(funcname string, data []byte,
|
func (pool *Pool) DoBg(funcname string, data []byte,
|
||||||
flag byte) (addr string, handle []byte) {
|
flag byte) (addr, handle string) {
|
||||||
client := pool.selectServer()
|
client := pool.selectServer()
|
||||||
handle = client.DoBg(funcname, data, flag)
|
handle = client.DoBg(funcname, data, flag)
|
||||||
addr = client.addr
|
addr = client.addr
|
||||||
@ -111,9 +111,9 @@ func (pool *Pool) DoBg(funcname string, data []byte,
|
|||||||
|
|
||||||
// Get job status from job server.
|
// Get job status from job server.
|
||||||
// !!!Not fully tested.!!!
|
// !!!Not fully tested.!!!
|
||||||
func (pool *Pool) Status(addr string, handle []byte, h ResponseHandler) (err error) {
|
func (pool *Pool) Status(addr, handle string, h ResponseHandler) (status *Status, err error) {
|
||||||
if client, ok := pool.clients[addr]; ok {
|
if client, ok := pool.clients[addr]; ok {
|
||||||
err = client.Status(handle, h)
|
status, err = client.Status(handle, h)
|
||||||
} else {
|
} else {
|
||||||
err = ErrNotFound
|
err = ErrNotFound
|
||||||
}
|
}
|
||||||
@ -121,7 +121,7 @@ func (pool *Pool) Status(addr string, handle []byte, h ResponseHandler) (err err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send a something out, get the samething back.
|
// Send a something out, get the samething back.
|
||||||
func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (r []byte, err error) {
|
func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (echo []byte, err error) {
|
||||||
var client *poolClient
|
var client *poolClient
|
||||||
if addr == "" {
|
if addr == "" {
|
||||||
client = pool.selectServer()
|
client = pool.selectServer()
|
||||||
@ -132,7 +132,7 @@ func (pool *Pool) Echo(addr string, data []byte, h ResponseHandler) (r []byte, e
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = client.Echo(data, h)
|
echo, err = client.Echo(data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,7 +2,6 @@ package client
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -11,10 +10,10 @@ var (
|
|||||||
|
|
||||||
func TestPoolAdd(t *testing.T) {
|
func TestPoolAdd(t *testing.T) {
|
||||||
t.Log("Add servers")
|
t.Log("Add servers")
|
||||||
if err := pool.Add("127.0.0.1:4730", 1); err != nil {
|
if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
if err := pool.Add("127.0.0.1:4730", 1); err != nil {
|
if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
if len(pool.clients) != 2 {
|
if len(pool.clients) != 2 {
|
||||||
@ -23,7 +22,7 @@ func TestPoolAdd(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolEcho(t *testing.T) {
|
func TestPoolEcho(t *testing.T) {
|
||||||
echo, err := pool.Echo("", []byte("Hello pool"), time.Second)
|
echo, err := pool.Echo("", []byte("Hello pool"), printHandle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -33,7 +32,7 @@ func TestPoolEcho(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = pool.Echo("not exists", []byte("Hello pool"), time.Second)
|
_, err = pool.Echo("not exists", []byte("Hello pool"), printHandle)
|
||||||
if err != ErrNotFound {
|
if err != ErrNotFound {
|
||||||
t.Errorf("ErrNotFound expected, got %s", err)
|
t.Errorf("ErrNotFound expected, got %s", err)
|
||||||
}
|
}
|
||||||
@ -49,7 +48,7 @@ func TestPoolDoBg(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolDo(t *testing.T) {
|
func TestPoolDo(t *testing.T) {
|
||||||
jobHandler := func(job *Job) {
|
jobHandler := func(job *Response) {
|
||||||
str := string(job.Data)
|
str := string(job.Data)
|
||||||
if str == "ABCDEF" {
|
if str == "ABCDEF" {
|
||||||
t.Log(str)
|
t.Log(str)
|
||||||
@ -67,33 +66,34 @@ func TestPoolDo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPoolStatus(t *testing.T) {
|
func TestPoolStatus(t *testing.T) {
|
||||||
s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second)
|
err := pool.Status("127.0.0.1:4730", "handle not exists", printHandle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
if s1.Known {
|
if s1.Known {
|
||||||
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
|
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
|
||||||
}
|
}
|
||||||
if s1.Running {
|
if s1.Running {
|
||||||
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
|
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil)
|
addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil)
|
||||||
s2, err := pool.Status(addr, handle, time.Second)
|
err = pool.Status(addr, handle, printHandle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
if !s2.Known {
|
if !s2.Known {
|
||||||
t.Errorf("The job (%s) should be known.", s2.Handle)
|
t.Errorf("The job (%s) should be known.", s2.Handle)
|
||||||
}
|
}
|
||||||
if s2.Running {
|
if s2.Running {
|
||||||
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
|
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
_, err = pool.Status("not exists", "not exists", time.Second)
|
err = pool.Status("not exists", "not exists", printHandle)
|
||||||
if err != ErrNotFound {
|
if err != ErrNotFound {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -18,12 +18,12 @@ type request struct {
|
|||||||
// Encode a Request to byte slice
|
// Encode a Request to byte slice
|
||||||
func (req *request) Encode() (data []byte) {
|
func (req *request) Encode() (data []byte) {
|
||||||
l := len(req.Data) // length of data
|
l := len(req.Data) // length of data
|
||||||
tl := l + 12 // add 12 bytes head
|
tl := l + MIN_PACKET_LEN // add 12 bytes head
|
||||||
data = getBuffer(tl)
|
data = getBuffer(tl)
|
||||||
copy(data[:4], REQ_STR)
|
copy(data[:4], REQ_STR)
|
||||||
binary.BigEndian.PutUint32(data[4:8], req.DataType)
|
binary.BigEndian.PutUint32(data[4:8], req.DataType)
|
||||||
binary.BigEndian.PutUint32(data[8:12], uint32(l))
|
binary.BigEndian.PutUint32(data[8:12], uint32(l))
|
||||||
copy(data[12:], req.Data)
|
copy(data[MIN_PACKET_LEN:], req.Data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,17 +6,17 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"bytes"
|
"bytes"
|
||||||
"strconv"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
// response
|
// response
|
||||||
type response struct {
|
type Response struct {
|
||||||
DataType uint32
|
DataType uint32
|
||||||
Data, Handle []byte
|
Data []byte
|
||||||
UID string
|
UID, Handle string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract the Response's result.
|
// Extract the Response's result.
|
||||||
@ -24,10 +24,10 @@ type response struct {
|
|||||||
// if data != nil, err != nil, then worker has a exception
|
// if data != nil, err != nil, then worker has a exception
|
||||||
// if data != nil, err == nil, then worker complate job
|
// if data != nil, err == nil, then worker complate job
|
||||||
// after calling this method, the Response.Handle will be filled
|
// after calling this method, the Response.Handle will be filled
|
||||||
func (resp *response) Result() (data []byte, err error) {
|
func (resp *Response) Result() (data []byte, err error) {
|
||||||
switch resp.DataType {
|
switch resp.DataType {
|
||||||
case WORK_FAIL:
|
case WORK_FAIL:
|
||||||
resp.Handle = resp.Data
|
resp.Handle = string(resp.Data)
|
||||||
err = ErrWorkFail
|
err = ErrWorkFail
|
||||||
return
|
return
|
||||||
case WORK_EXCEPTION:
|
case WORK_EXCEPTION:
|
||||||
@ -39,7 +39,7 @@ func (resp *response) Result() (data []byte, err error) {
|
|||||||
err = fmt.Errorf("Invalid data: %V", resp.Data)
|
err = fmt.Errorf("Invalid data: %V", resp.Data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp.Handle = s[0]
|
resp.Handle = string(s[0])
|
||||||
data = s[1]
|
data = s[1]
|
||||||
default:
|
default:
|
||||||
err = ErrDataType
|
err = ErrDataType
|
||||||
@ -48,7 +48,7 @@ func (resp *response) Result() (data []byte, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Extract the job's update
|
// Extract the job's update
|
||||||
func (resp *response) Update() (data []byte, err error) {
|
func (resp *Response) Update() (data []byte, err error) {
|
||||||
if resp.DataType != WORK_DATA &&
|
if resp.DataType != WORK_DATA &&
|
||||||
resp.DataType != WORK_WARNING {
|
resp.DataType != WORK_WARNING {
|
||||||
err = ErrDataType
|
err = ErrDataType
|
||||||
@ -62,19 +62,19 @@ func (resp *response) Update() (data []byte, err error) {
|
|||||||
if resp.DataType == WORK_WARNING {
|
if resp.DataType == WORK_WARNING {
|
||||||
err = ErrWorkWarning
|
err = ErrWorkWarning
|
||||||
}
|
}
|
||||||
resp.Handle = s[0]
|
resp.Handle = string(s[0])
|
||||||
data = s[1]
|
data = s[1]
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode a job from byte slice
|
// Decode a job from byte slice
|
||||||
func decodeResponse(data []byte) (resp *response, l int, err error) {
|
func decodeResponse(data []byte) (resp *Response, l int, err error) {
|
||||||
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
|
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
|
||||||
err = fmt.Errorf("Invalid data: %V", data)
|
err = fmt.Errorf("Invalid data: %V", data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dl := int(binary.BigEndian.Uint32(data[8:12]))
|
dl := int(binary.BigEndian.Uint32(data[8:12]))
|
||||||
dt := data[MIN_PACKET_LEN:dl+MIN_PACKET_LEN]
|
dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN]
|
||||||
if len(dt) != int(dl) { // length not equal
|
if len(dt) != int(dl) { // length not equal
|
||||||
err = fmt.Errorf("Invalid data: %V", data)
|
err = fmt.Errorf("Invalid data: %V", data)
|
||||||
return
|
return
|
||||||
@ -82,11 +82,13 @@ func decodeResponse(data []byte) (resp *response, l int, err error) {
|
|||||||
resp = getResponse()
|
resp = getResponse()
|
||||||
resp.DataType = binary.BigEndian.Uint32(data[4:8])
|
resp.DataType = binary.BigEndian.Uint32(data[4:8])
|
||||||
switch resp.DataType {
|
switch resp.DataType {
|
||||||
|
case ECHO_RES:
|
||||||
|
resp.Data = dt
|
||||||
case WORK_DATA, WORK_WARNING, WORK_STATUS,
|
case WORK_DATA, WORK_WARNING, WORK_STATUS,
|
||||||
WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION:
|
WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION:
|
||||||
s := bytes.SplitN(data, []byte{'\x00'}, 2)
|
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
|
||||||
if len(s) >= 2 {
|
if len(s) >= 2 {
|
||||||
resp.Handle = s[0]
|
resp.Handle = string(s[0])
|
||||||
resp.Data = s[1]
|
resp.Data = s[1]
|
||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("Invalid data: %V", data)
|
err = fmt.Errorf("Invalid data: %V", data)
|
||||||
@ -97,8 +99,16 @@ func decodeResponse(data []byte) (resp *response, l int, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (resp *Response) IsEcho() bool {
|
||||||
|
return resp.DataType == ECHO_RES
|
||||||
|
}
|
||||||
|
|
||||||
|
func (resp *Response) IsStatus() bool {
|
||||||
|
return resp.DataType == STATUS_RES
|
||||||
|
}
|
||||||
|
|
||||||
// status handler
|
// status handler
|
||||||
func (resp *response) Status() (status *Status, err error) {
|
func (resp *Response) Status() (status *Status, err error) {
|
||||||
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 5)
|
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 5)
|
||||||
if len(data) != 5 {
|
if len(data) != 5 {
|
||||||
err = fmt.Errorf("Invalid data: %V", resp.Data)
|
err = fmt.Errorf("Invalid data: %V", resp.Data)
|
||||||
@ -121,9 +131,8 @@ func (resp *response) Status() (status *Status, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getResponse() (resp *Response) {
|
||||||
func getResponse() (resp *response) {
|
|
||||||
// TODO add a pool
|
// TODO add a pool
|
||||||
resp = &response{}
|
resp = &Response{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user