diff --git a/README.md b/README.md index 23e11d5..3356685 100644 --- a/README.md +++ b/README.md @@ -56,3 +56,8 @@ http://twitter.com/mikespook * 0.1 Refactoring code, redesign the API. * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. + +# TODO + + * Can not grab a job correctly. + * The worker should can reconnect to the job server. diff --git a/client/client.go b/client/client.go index fe3d7fe..6de4212 100644 --- a/client/client.go +++ b/client/client.go @@ -8,6 +8,7 @@ package client import ( "io" "net" + "time" "bytes" "strconv" "bitbucket.org/mikespook/golib/autoinc" @@ -32,6 +33,7 @@ type Client struct { ErrHandler common.ErrorHandler JobHandler JobHandler StatusHandler StatusHandler + TimeOut time.Duration in chan []byte out chan *Job @@ -55,6 +57,7 @@ func New(addr string) (client *Client, err error) { out: make(chan *Job, common.QUEUE_SIZE), conn: conn, ai: autoinc.New(0, 1), + TimeOut: time.Second, } go client.inLoop() go client.outLoop() @@ -75,9 +78,14 @@ func (client *Client) outLoop() { // in loop func (client *Client) inLoop() { + defer common.DisablePanic() for { rel, err := client.read() if err != nil { + if err == common.ErrEmptyReading { + client.Close() + break + } client.err(err) continue } @@ -227,8 +235,20 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string rel = append(rel, data...) // len(data) client.writeJob(newJob(common.REQ, datatype, rel)) // Waiting for JOB_CREATED - job := <-client.jobCreated - return string(job.Data), nil + timeout := make(chan bool) + defer close(timeout) + go func() { + defer common.DisablePanic() + time.Sleep(client.TimeOut) + timeout <- true + }() + select { + case job := <-client.jobCreated: + return string(job.Data), nil + case <-timeout: + return "", common.ErrJobTimeOut + } + return } // Get job status from job server. diff --git a/client/client_test.go b/client/client_test.go index 3853642..1c8db92 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -12,11 +12,8 @@ func TestClientAddServer(t *testing.T) { if client, err = New("127.0.0.1:4730"); err != nil { t.Error(err) } - client.ErrHandler = func(e error) { - t.Error(e) - } } - +/* func TestClientEcho(t *testing.T) { client.JobHandler = func(job *Job) error { echo := string(job.Data) @@ -29,7 +26,8 @@ func TestClientEcho(t *testing.T) { } client.Echo([]byte("Hello world")) } - +*/ +/* func TestClientDo(t *testing.T) { if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { t.Error(err) @@ -37,7 +35,7 @@ func TestClientDo(t *testing.T) { t.Log(handle) } } - +*/ func TestClientClose(t *testing.T) { if err := client.Close(); err != nil { t.Error(err) diff --git a/common/error.go b/common/error.go index 8c1e8fa..463af2d 100644 --- a/common/error.go +++ b/common/error.go @@ -13,16 +13,18 @@ import ( ) var ( - ErrInvalidData = errors.New("Invalid data.") - ErrWorkWarning = errors.New("Work warning.") - ErrWorkFail = errors.New("Work fail.") - ErrWorkException = errors.New("Work exeption.") - ErrDataType = errors.New("Invalid data type.") - ErrOutOfCap = errors.New("Out of the capability.") - ErrNotConn = errors.New("Did not connect to job server.") - ErrFuncNotFound = errors.New("The function was not found.") - ErrEmptyReading = errors.New("Empty reading.") + ErrJobTimeOut = errors.New("Do a job time out.") + ErrInvalidData = errors.New("Invalid data.") + ErrWorkWarning = errors.New("Work warning.") + ErrWorkFail = errors.New("Work fail.") + ErrWorkException = errors.New("Work exeption.") + ErrDataType = errors.New("Invalid data type.") + ErrOutOfCap = errors.New("Out of the capability.") + ErrNotConn = errors.New("Did not connect to job server.") + ErrFuncNotFound = errors.New("The function was not found.") + ErrEmptyReading = errors.New("Empty reading.") ) +func DisablePanic() {recover()} // Extract the error message func GetError(data []byte) (eno syscall.Errno, err error) { diff --git a/worker/worker_test.go b/worker/worker_test.go index 0f3fdf7..25f5836 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -5,7 +5,7 @@ import "testing" var worker *Worker func init() { - worker = NewWorker() + worker = New(Unlimited) } func TestWorkerAddServer(t *testing.T) { @@ -14,59 +14,31 @@ func TestWorkerAddServer(t *testing.T) { t.Error(err) } - if l := len(worker.clients); l != 1 { - t.Log(worker.clients) + if l := len(worker.agents); l != 1 { + t.Log(worker.agents) t.Error("The length of server list should be 1.") } } -func foobar(job *WorkerJob) ([]byte, error) { +func foobar(job *Job) ([]byte, error) { return nil, nil } func TestWorkerAddFunction(t *testing.T) { - if err := worker.AddFunction("foobar", foobar, 0); err != nil { + if err := worker.AddFunc("foobar", foobar, 0); err != nil { t.Error(err) } - if err := worker.AddFunction("timeout", foobar, 5); err != nil { + if err := worker.AddFunc("timeout", foobar, 5); err != nil { t.Error(err) } - if l := len(worker.functions); l != 2 { - t.Log(worker.functions) + if l := len(worker.funcs); l != 2 { + t.Log(worker.funcs) t.Errorf("The length of function map should be %d.", 2) } } -func TestWorkerEcho(t *testing.T) { - if err := worker.Echo([]byte("Hello World")); err != nil { - t.Error(err) - } -} - -/* -func TestWorkerResult(t *testing.T) { - if job := worker.LastResult(); job == nil { - t.Error("Nothing in result.") - } else { - t.Log(job) - } -} -*/ - -func TestWorkerRemoveFunction(t *testing.T) { - if err := worker.RemoveFunction("foobar"); err != nil { - t.Error(err) - } -} - -func TestWorkerReset(t *testing.T) { - if err := worker.Reset(); err != nil { - t.Error(err) - } -} - -func TestWorkerClose(t *testing.T) { - if err := worker.Close(); err != nil { +func TestWorkerRemoveFunc(t *testing.T) { + if err := worker.RemoveFunc("foobar"); err != nil { t.Error(err) } }