This commit is contained in:
mikespook 2013-04-24 16:58:06 +08:00
parent 2107c9b883
commit 8e009be940
9 changed files with 49 additions and 29 deletions

View File

@ -350,16 +350,20 @@ func (client *Client) Status(handle string, timeout time.Duration) (status *Stat
client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
select {
case status = <-client.status:
case <-time.NewTimer(timeout).C:
case <-time.After(timeout):
err = common.ErrTimeOut
}
return
}
// Send a something out, get the samething back.
func (client *Client) Echo(data []byte) (r []byte) {
func (client *Client) Echo(data []byte, timeout time.Duration) (r []byte, err error) {
client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
r = <-client.echo
select {
case r = <-client.echo:
case <-time.After(timeout):
err = common.ErrTimeOut
}
return
}

View File

@ -1,6 +1,7 @@
package client
import (
"time"
"testing"
)
@ -19,10 +20,14 @@ func TestClientAddServer(t *testing.T) {
}
func TestClientEcho(t *testing.T) {
if echo := string(client.Echo([]byte("Hello world"))); echo == "Hello world" {
t.Log(echo)
} else {
echo, err := client.Echo([]byte("Hello world"), time.Second)
if err != nil {
t.Error(err)
return
}
if string(echo) != "Hello world" {
t.Errorf("Invalid echo data: %s", echo)
return
}
}
@ -30,8 +35,6 @@ func TestClientDoBg(t *testing.T) {
if handle := client.DoBg("ToUpper", []byte("abcdef"),
JOB_LOW); handle == "" {
t.Error("Handle is empty.")
} else {
t.Log(handle)
}
}
@ -55,21 +58,33 @@ func TestClientDo(t *testing.T) {
func TestClientStatus(t *testing.T) {
s1 := client.Status("handle not exists")
s1, err := client.Status("handle not exists", time.Second)
if err != nil {
t.Error(err)
return
}
if s1.Known {
t.Errorf("The job (%s) shouldn't be known.", s1.Handle)
return
}
if s1.Running {
t.Errorf("The job (%s) shouldn't be running.", s1.Handle)
return
}
handle := client.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
s2 := client.Status(handle)
s2, err := client.Status(handle, time.Second)
if err != nil {
t.Error(err)
return
}
if !s2.Known {
t.Errorf("The job (%s) should be known.", s2.Handle)
return
}
if s2.Running {
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
return
}
}

View File

@ -85,7 +85,6 @@ func (pool *Pool) Add(addr string, rate int) (err error) {
var client *Client
client, err = New(addr)
item = &poolClient{Client: client, Rate: rate}
err = item.connect()
pool.clients[addr] = item
}
return
@ -126,7 +125,7 @@ func (pool *Pool) Status(addr, handle string, timeout time.Duration) (status *St
}
// Send a something out, get the samething back.
func (pool *Pool) Echo(addr string, data []byte) (r []byte, err error) {
func (pool *Pool) Echo(addr string, data []byte, timeout time.Duration) (r []byte, err error) {
var client *poolClient
if addr == "" {
client = pool.selectServer()
@ -137,7 +136,7 @@ func (pool *Pool) Echo(addr string, data []byte) (r []byte, err error) {
return
}
}
r = client.Echo(data)
r, err = client.Echo(data, timeout)
return
}

View File

@ -1,6 +1,7 @@
package client
import (
"time"
"testing"
)
@ -22,17 +23,17 @@ func TestPoolAdd(t *testing.T) {
}
func TestPoolEcho(t *testing.T) {
echo, err := pool.Echo("", []byte("Hello world"))
echo, err := pool.Echo("", []byte("Hello pool"), time.Second)
if err != nil {
t.Error(err)
return
}
if string(echo) != "Hello world" {
if string(echo) != "Hello pool" {
t.Errorf("Invalid echo data: %s", echo)
return
}
_, err = pool.Echo("not exists", []byte("Hello world"))
_, err = pool.Echo("not exists", []byte("Hello pool"), time.Second)
if err != ErrNotFound {
t.Errorf("ErrNotFound expected, got %s", err)
}
@ -66,7 +67,7 @@ func TestPoolDo(t *testing.T) {
}
func TestPoolStatus(t *testing.T) {
s1, err := pool.Status("127.0.0.1:4730", "handle not exists")
s1, err := pool.Status("127.0.0.1:4730", "handle not exists", time.Second)
if err != nil {
t.Error(err)
return
@ -79,7 +80,7 @@ func TestPoolStatus(t *testing.T) {
}
addr, handle := pool.Do("Delay5sec", []byte("abcdef"), JOB_LOW, nil);
s2, err := pool.Status(addr, handle)
s2, err := pool.Status(addr, handle, time.Second)
if err != nil {
t.Error(err)
return
@ -92,7 +93,7 @@ func TestPoolStatus(t *testing.T) {
t.Errorf("The job (%s) shouldn't be running.", s2.Handle)
}
_, err = pool.Status("not exists", "not exists")
_, err = pool.Status("not exists", "not exists", time.Second)
if err != ErrNotFound {
t.Error(err)
}

View File

@ -24,7 +24,11 @@ func main() {
}
echo := []byte("Hello\x00 world")
wg.Add(1)
log.Println(string(c.Echo(echo)))
echomsg, err := c.Echo(echo, time.Second)
if err != nil {
log.Fatalln(err)
}
log.Println(string(echomsg))
wg.Done()
jobHandler := func(job *client.Job) {
log.Printf("%s", job.Data)

Binary file not shown.

View File

@ -91,8 +91,11 @@ func (a *agent) inLoop() {
case common.NOOP:
a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
job.agent = a
if a.worker.running {
if a.worker.limit != nil {
a.worker.limit <- true
}
job.agent = a
a.worker.in <- job
}
}

View File

@ -110,7 +110,7 @@ func (job *Job) UpdateData(data []byte, iswarning bool) {
func (job *Job) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
result := append([]byte(job.Handle), 0)
result := append([]byte(job.Handle), '\x00')
result = append(result, n...)
result = append(result, '\x00')
result = append(result, d...)

View File

@ -73,9 +73,6 @@ func New(l int) (worker *Worker) {
}
if l != Unlimited {
worker.limit = make(chan bool, l)
for i := 0; i < l; i ++ {
worker.limit <- true
}
}
return
}
@ -165,7 +162,7 @@ func (worker *Worker) dealJob(job *Job) {
defer func() {
job.Close()
if worker.running && worker.limit != nil {
worker.limit <- true
<-worker.limit
}
}()
switch job.DataType {
@ -199,9 +196,6 @@ func (worker *Worker) Work() {
for ok {
var job *Job
if job, ok = <-worker.in; ok {
if worker.limit != nil {
<-worker.limit
}
go worker.dealJob(job)
}
}