add timeout to client.Do

This commit is contained in:
mikespook 2012-05-24 19:21:30 +08:00
parent ba2c3721c8
commit f14964bc81
5 changed files with 52 additions and 55 deletions

View File

@ -56,3 +56,8 @@ http://twitter.com/mikespook
* 0.1 Refactoring code, redesign the API. * 0.1 Refactoring code, redesign the API.
* 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API.
# TODO
* Can not grab a job correctly.
* The worker should can reconnect to the job server.

View File

@ -8,6 +8,7 @@ package client
import ( import (
"io" "io"
"net" "net"
"time"
"bytes" "bytes"
"strconv" "strconv"
"bitbucket.org/mikespook/golib/autoinc" "bitbucket.org/mikespook/golib/autoinc"
@ -32,6 +33,7 @@ type Client struct {
ErrHandler common.ErrorHandler ErrHandler common.ErrorHandler
JobHandler JobHandler JobHandler JobHandler
StatusHandler StatusHandler StatusHandler StatusHandler
TimeOut time.Duration
in chan []byte in chan []byte
out chan *Job out chan *Job
@ -55,6 +57,7 @@ func New(addr string) (client *Client, err error) {
out: make(chan *Job, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE),
conn: conn, conn: conn,
ai: autoinc.New(0, 1), ai: autoinc.New(0, 1),
TimeOut: time.Second,
} }
go client.inLoop() go client.inLoop()
go client.outLoop() go client.outLoop()
@ -75,9 +78,14 @@ func (client *Client) outLoop() {
// in loop // in loop
func (client *Client) inLoop() { func (client *Client) inLoop() {
defer common.DisablePanic()
for { for {
rel, err := client.read() rel, err := client.read()
if err != nil { if err != nil {
if err == common.ErrEmptyReading {
client.Close()
break
}
client.err(err) client.err(err)
continue continue
} }
@ -227,8 +235,20 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string
rel = append(rel, data...) // len(data) rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, datatype, rel)) client.writeJob(newJob(common.REQ, datatype, rel))
// Waiting for JOB_CREATED // Waiting for JOB_CREATED
job := <-client.jobCreated timeout := make(chan bool)
defer close(timeout)
go func() {
defer common.DisablePanic()
time.Sleep(client.TimeOut)
timeout <- true
}()
select {
case job := <-client.jobCreated:
return string(job.Data), nil return string(job.Data), nil
case <-timeout:
return "", common.ErrJobTimeOut
}
return
} }
// Get job status from job server. // Get job status from job server.

View File

@ -12,11 +12,8 @@ func TestClientAddServer(t *testing.T) {
if client, err = New("127.0.0.1:4730"); err != nil { if client, err = New("127.0.0.1:4730"); err != nil {
t.Error(err) t.Error(err)
} }
client.ErrHandler = func(e error) {
t.Error(e)
} }
} /*
func TestClientEcho(t *testing.T) { func TestClientEcho(t *testing.T) {
client.JobHandler = func(job *Job) error { client.JobHandler = func(job *Job) error {
echo := string(job.Data) echo := string(job.Data)
@ -29,7 +26,8 @@ func TestClientEcho(t *testing.T) {
} }
client.Echo([]byte("Hello world")) client.Echo([]byte("Hello world"))
} }
*/
/*
func TestClientDo(t *testing.T) { func TestClientDo(t *testing.T) {
if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil {
t.Error(err) t.Error(err)
@ -37,7 +35,7 @@ func TestClientDo(t *testing.T) {
t.Log(handle) t.Log(handle)
} }
} }
*/
func TestClientClose(t *testing.T) { func TestClientClose(t *testing.T) {
if err := client.Close(); err != nil { if err := client.Close(); err != nil {
t.Error(err) t.Error(err)

View File

@ -13,6 +13,7 @@ import (
) )
var ( var (
ErrJobTimeOut = errors.New("Do a job time out.")
ErrInvalidData = errors.New("Invalid data.") ErrInvalidData = errors.New("Invalid data.")
ErrWorkWarning = errors.New("Work warning.") ErrWorkWarning = errors.New("Work warning.")
ErrWorkFail = errors.New("Work fail.") ErrWorkFail = errors.New("Work fail.")
@ -23,6 +24,7 @@ var (
ErrFuncNotFound = errors.New("The function was not found.") ErrFuncNotFound = errors.New("The function was not found.")
ErrEmptyReading = errors.New("Empty reading.") ErrEmptyReading = errors.New("Empty reading.")
) )
func DisablePanic() {recover()}
// Extract the error message // Extract the error message
func GetError(data []byte) (eno syscall.Errno, err error) { func GetError(data []byte) (eno syscall.Errno, err error) {

View File

@ -5,7 +5,7 @@ import "testing"
var worker *Worker var worker *Worker
func init() { func init() {
worker = NewWorker() worker = New(Unlimited)
} }
func TestWorkerAddServer(t *testing.T) { func TestWorkerAddServer(t *testing.T) {
@ -14,59 +14,31 @@ func TestWorkerAddServer(t *testing.T) {
t.Error(err) t.Error(err)
} }
if l := len(worker.clients); l != 1 { if l := len(worker.agents); l != 1 {
t.Log(worker.clients) t.Log(worker.agents)
t.Error("The length of server list should be 1.") t.Error("The length of server list should be 1.")
} }
} }
func foobar(job *WorkerJob) ([]byte, error) { func foobar(job *Job) ([]byte, error) {
return nil, nil return nil, nil
} }
func TestWorkerAddFunction(t *testing.T) { func TestWorkerAddFunction(t *testing.T) {
if err := worker.AddFunction("foobar", foobar, 0); err != nil { if err := worker.AddFunc("foobar", foobar, 0); err != nil {
t.Error(err) t.Error(err)
} }
if err := worker.AddFunction("timeout", foobar, 5); err != nil { if err := worker.AddFunc("timeout", foobar, 5); err != nil {
t.Error(err) t.Error(err)
} }
if l := len(worker.functions); l != 2 { if l := len(worker.funcs); l != 2 {
t.Log(worker.functions) t.Log(worker.funcs)
t.Errorf("The length of function map should be %d.", 2) t.Errorf("The length of function map should be %d.", 2)
} }
} }
func TestWorkerEcho(t *testing.T) { func TestWorkerRemoveFunc(t *testing.T) {
if err := worker.Echo([]byte("Hello World")); err != nil { if err := worker.RemoveFunc("foobar"); err != nil {
t.Error(err)
}
}
/*
func TestWorkerResult(t *testing.T) {
if job := worker.LastResult(); job == nil {
t.Error("Nothing in result.")
} else {
t.Log(job)
}
}
*/
func TestWorkerRemoveFunction(t *testing.T) {
if err := worker.RemoveFunction("foobar"); err != nil {
t.Error(err)
}
}
func TestWorkerReset(t *testing.T) {
if err := worker.Reset(); err != nil {
t.Error(err)
}
}
func TestWorkerClose(t *testing.T) {
if err := worker.Close(); err != nil {
t.Error(err) t.Error(err)
} }
} }