diff --git a/client/client.go b/client/client.go index b0265c6..cdfdd5f 100644 --- a/client/client.go +++ b/client/client.go @@ -350,16 +350,20 @@ func (client *Client) Status(handle string, timeout time.Duration) (status *Stat client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle))) select { case status = <-client.status: - case <-time.NewTimer(timeout).C: + case <-time.After(timeout): err = common.ErrTimeOut } return } // Send a something out, get the samething back. -func (client *Client) Echo(data []byte) (r []byte) { +func (client *Client) Echo(data []byte, timeout time.Duration) (r []byte, err error) { client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) - r = <-client.echo + select { + case r = <-client.echo: + case <-time.After(timeout): + err = common.ErrTimeOut + } return } diff --git a/client/client_test.go b/client/client_test.go index e58a71d..11269d7 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,6 +1,7 @@ package client import ( + "time" "testing" ) @@ -19,10 +20,14 @@ func TestClientAddServer(t *testing.T) { } func TestClientEcho(t *testing.T) { - if echo := string(client.Echo([]byte("Hello world"))); echo == "Hello world" { - t.Log(echo) - } else { + echo, err := client.Echo([]byte("Hello world"), time.Second) + if err != nil { + t.Error(err) + return + } + if string(echo) != "Hello world" { t.Errorf("Invalid echo data: %s", echo) + return } } @@ -30,8 +35,6 @@ func TestClientDoBg(t *testing.T) { if handle := client.DoBg("ToUpper", []byte("abcdef"), JOB_LOW); handle == "" { t.Error("Handle is empty.") - } else { - t.Log(handle) } } @@ -55,21 +58,33 @@ func TestClientDo(t *testing.T) { func TestClientStatus(t *testing.T) { - s1 := client.Status("handle not exists") + s1, err := client.Status("handle not exists", time.Second) + if err != nil { + t.Error(err) + return + } if s1.Known { t.Errorf("The job (%s) shouldn't be known.", s1.Handle) + return } if s1.Running { t.Errorf("The job (%s) shouldn't be running.", s1.Handle) + return } handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil); - s2 := client.Status(handle) + s2, err := client.Status(handle, time.Second) + if err != nil { + t.Error(err) + return + } if !s2.Known { t.Errorf("The job (%s) should be known.", s2.Handle) + return } if s2.Running { t.Errorf("The job (%s) shouldn't be running.", s2.Handle) + return } } diff --git a/client/pool.go b/client/pool.go index 894cf56..ed48734 100644 --- a/client/pool.go +++ b/client/pool.go @@ -85,7 +85,6 @@ func (pool *Pool) Add(addr string, rate int) (err error) { var client *Client client, err = New(addr) item = &poolClient{Client: client, Rate: rate} - err = item.connect() pool.clients[addr] = item } return @@ -126,7 +125,7 @@ func (pool *Pool) Status(addr, handle string, timeout time.Duration) (status *St } // Send a something out, get the samething back. -func (pool *Pool) Echo(addr string, data []byte) (r []byte, err error) { +func (pool *Pool) Echo(addr string, data []byte, timeout time.Duration) (r []byte, err error) { var client *poolClient if addr == "" { client = pool.selectServer() @@ -137,7 +136,7 @@ func (pool *Pool) Echo(addr string, data []byte) (r []byte, err error) { return } } - r = client.Echo(data) + r, err = client.Echo(data, timeout) return } diff --git a/client/pool_test.go b/client/pool_test.go index bf21f0c..227c1f0 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -1,6 +1,7 @@ package client import ( + "time" "testing" ) @@ -22,17 +23,17 @@ func TestPoolAdd(t *testing.T) { } func TestPoolEcho(t *testing.T) { - echo, err := pool.Echo("", []byte("Hello world")) + echo, err := pool.Echo("", []byte("Hello pool"), time.Second) if err != nil { t.Error(err) return } - if string(echo) != "Hello world" { + if string(echo) != "Hello pool" { t.Errorf("Invalid echo data: %s", echo) return } - _, err = pool.Echo("not exists", []byte("Hello world")) + _, err = pool.Echo("not exists", []byte("Hello pool"), time.Second) if err != ErrNotFound { t.Errorf("ErrNotFound expected, got %s", err) } @@ -66,7 +67,7 @@ func TestPoolDo(t *testing.T) { } func TestPoolStatus(t *testing.T) { - s1, err := pool.Status("127.0.0.1:4730", "handle not exists") + s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second) if err != nil { t.Error(err) return @@ -79,7 +80,7 @@ func TestPoolStatus(t *testing.T) { } addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil); - s2, err := pool.Status(addr, handle) + s2, err := pool.Status(addr, handle, time.Second) if err != nil { t.Error(err) return @@ -92,7 +93,7 @@ func TestPoolStatus(t *testing.T) { t.Errorf("The job (%s) shouldn't be running.", s2.Handle) } - _, err = pool.Status("not exists", "not exists") + _, err = pool.Status("not exists", "not exists", time.Second) if err != ErrNotFound { t.Error(err) } diff --git a/example/client.go b/example/client.go index d377312..cce04dc 100644 --- a/example/client.go +++ b/example/client.go @@ -24,7 +24,11 @@ func main() { } echo := []byte("Hello\x00 world") wg.Add(1) - log.Println(string(c.Echo(echo))) + echomsg, err := c.Echo(echo, time.Second) + if err != nil { + log.Fatalln(err) + } + log.Println(string(echomsg)) wg.Done() jobHandler := func(job *client.Job) { log.Printf("%s", job.Data) diff --git a/example/worker b/example/worker deleted file mode 100755 index 0558732..0000000 Binary files a/example/worker and /dev/null differ diff --git a/worker/agent.go b/worker/agent.go index 1e17584..fbc8c76 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -91,8 +91,11 @@ func (a *agent) inLoop() { case common.NOOP: a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil)) case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: - job.agent = a if a.worker.running { + if a.worker.limit != nil { + a.worker.limit <- true + } + job.agent = a a.worker.in <- job } } diff --git a/worker/job.go b/worker/job.go index 361fdce..6f86989 100644 --- a/worker/job.go +++ b/worker/job.go @@ -110,7 +110,7 @@ func (job *Job) UpdateData(data []byte, iswarning bool) { func (job *Job) UpdateStatus(numerator, denominator int) { n := []byte(strconv.Itoa(numerator)) d := []byte(strconv.Itoa(denominator)) - result := append([]byte(job.Handle), 0) + result := append([]byte(job.Handle), '\x00') result = append(result, n...) result = append(result, '\x00') result = append(result, d...) diff --git a/worker/worker.go b/worker/worker.go index 7ad830d..e96118c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -73,9 +73,6 @@ func New(l int) (worker *Worker) { } if l != Unlimited { worker.limit = make(chan bool, l) - for i := 0; i < l; i ++ { - worker.limit <- true - } } return } @@ -165,7 +162,7 @@ func (worker *Worker) dealJob(job *Job) { defer func() { job.Close() if worker.running && worker.limit != nil { - worker.limit <- true + <-worker.limit } }() switch job.DataType { @@ -199,9 +196,6 @@ func (worker *Worker) Work() { for ok { var job *Job if job, ok = <-worker.in; ok { - if worker.limit != nil { - <-worker.limit - } go worker.dealJob(job) } }