@@ -18,7 +18,8 @@ var ( | |||
type poolClient struct { | |||
*Client | |||
Rate int | |||
Rate int | |||
mutex sync.Mutex | |||
} | |||
type SelectionHandler func(map[string]*poolClient, string) string | |||
@@ -95,6 +96,8 @@ func (pool *Pool) Remove(addr string) { | |||
func (pool *Pool) Do(funcname string, data []byte, | |||
flag byte, h ResponseHandler) (addr, handle string, err error) { | |||
client := pool.selectServer() | |||
client.Lock() | |||
defer client.Unlock() | |||
handle, err = client.Do(funcname, data, flag, h) | |||
addr = client.addr | |||
return | |||
@@ -103,6 +106,8 @@ func (pool *Pool) Do(funcname string, data []byte, | |||
func (pool *Pool) DoBg(funcname string, data []byte, | |||
flag byte) (addr, handle string, err error) { | |||
client := pool.selectServer() | |||
client.Lock() | |||
defer client.Unlock() | |||
handle, err = client.DoBg(funcname, data, flag) | |||
addr = client.addr | |||
return | |||
@@ -112,6 +117,8 @@ func (pool *Pool) DoBg(funcname string, data []byte, | |||
// !!!Not fully tested.!!! | |||
func (pool *Pool) Status(addr, handle string) (status *Status, err error) { | |||
if client, ok := pool.clients[addr]; ok { | |||
client.Lock() | |||
defer client.Unlock() | |||
status, err = client.Status(handle) | |||
} else { | |||
err = ErrNotFound | |||
@@ -131,6 +138,8 @@ func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { | |||
return | |||
} | |||
} | |||
client.Lock() | |||
defer client.Unlock() | |||
echo, err = client.Echo(data) | |||
return | |||
} | |||
@@ -53,13 +53,13 @@ func (a *agent) work() { | |||
for { | |||
if data, err = a.read(bufferSize); err != nil { | |||
if opErr, ok := err.(*net.OpError); ok { | |||
if opErr.Timeout() { | |||
a.worker.err(err) | |||
} | |||
if opErr.Temporary() { | |||
continue | |||
}else{ | |||
a.worker.err(err) | |||
break | |||
} | |||
break | |||
} | |||
a.worker.err(err) | |||
// If it is unexpected error and the connection wasn't | |||
@@ -22,6 +22,7 @@ type Worker struct { | |||
funcs jobFuncs | |||
in chan *inPack | |||
running bool | |||
ready bool | |||
Id string | |||
ErrorHandler ErrorHandler | |||
@@ -174,12 +175,21 @@ func (worker *Worker) Ready() (err error) { | |||
for funcname, f := range worker.funcs { | |||
worker.addFunc(funcname, f.timeout) | |||
} | |||
worker.ready = true | |||
return | |||
} | |||
// Main loop, block here | |||
// Most of time, this should be evaluated in goroutine. | |||
func (worker *Worker) Work() { | |||
if ! worker.ready { | |||
// didn't run Ready beforehand, so we'll have to do it: | |||
err := worker.Ready() | |||
if err != nil { | |||
panic( err ) | |||
} | |||
} | |||
defer func() { | |||
for _, a := range worker.agents { | |||
a.Close() | |||
@@ -0,0 +1,170 @@ | |||
package worker | |||
import ( | |||
"../client" | |||
"log" | |||
"net" | |||
"os/exec" | |||
"testing" | |||
"time" | |||
) | |||
const port = `3700` | |||
var gearman_ready chan bool | |||
var kill_gearman chan bool | |||
var bye chan bool | |||
func init() { | |||
if check_gearman_present() { | |||
panic(`Something already listening on our testing port. Chickening out of testing with it!`) | |||
} | |||
gearman_ready = make( chan bool ) | |||
kill_gearman = make( chan bool ) | |||
// TODO: verify port is clear | |||
go run_gearman() | |||
} | |||
func run_gearman() { | |||
gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port) | |||
start_err := gm_cmd.Start() | |||
if start_err != nil { | |||
panic(`could not start gearman, aborting test :` + start_err.Error()) | |||
} | |||
// Make sure we clear up our gearman: | |||
defer func() { | |||
log.Println("killing gearmand") | |||
gm_cmd.Process.Kill() | |||
}() | |||
for tries := 10; tries > 0; tries-- { | |||
if check_gearman_present() { | |||
break | |||
} | |||
time.Sleep(250 * time.Millisecond) | |||
} | |||
if !check_gearman_present() { | |||
panic(`Unable to start gearman aborting test`) | |||
} | |||
gearman_ready <- true | |||
<- kill_gearman | |||
} | |||
func check_gearman_present() bool { | |||
con, err := net.Dial(`tcp`, `127.0.0.1:`+port) | |||
if err != nil { | |||
log.Println("gearman not ready " + err.Error()) | |||
return false | |||
} | |||
log.Println("gearman ready") | |||
con.Close() | |||
return true | |||
} | |||
func check_gearman_is_dead() bool { | |||
for tries := 10; tries > 0; tries-- { | |||
if !check_gearman_present() { | |||
return true | |||
} | |||
time.Sleep(250 * time.Millisecond) | |||
} | |||
return false | |||
} | |||
/* | |||
Checks for a disconnect whilst not working | |||
*/ | |||
func TestBasicDisconnect(t *testing.T) { | |||
<- gearman_ready | |||
worker := New(Unlimited) | |||
timeout := make(chan bool, 1) | |||
done := make( chan bool, 1) | |||
if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { | |||
t.Error(err) | |||
} | |||
work_done := false; | |||
if err := worker.AddFunc("gearman-go-workertest", | |||
func(j Job)(b []byte, e error){ | |||
work_done = true; | |||
done <- true | |||
return}, 0); | |||
err != nil { | |||
t.Error(err) | |||
} | |||
worker.JobHandler = func( j Job ) error { | |||
if( ! worker.ready ){ | |||
t.Error("Worker not ready as expected"); | |||
} | |||
done <-true | |||
return nil | |||
} | |||
handled_errors := false | |||
c_error := make( chan bool) | |||
worker.ErrorHandler = func( e error ){ | |||
log.Println( e ) | |||
handled_errors = true | |||
c_error <- true | |||
} | |||
go func() { | |||
time.Sleep(5 * time.Second) | |||
timeout <- true | |||
}() | |||
err := worker.Ready() | |||
if err != nil { | |||
t.Error(err) | |||
} | |||
go worker.Work() | |||
kill_gearman <- true | |||
check_gearman_is_dead() | |||
go run_gearman() | |||
select { | |||
case <-gearman_ready: | |||
case <-timeout: | |||
} | |||
send_client_request() | |||
select { | |||
case <- done: | |||
t.Error("Client request handled (somehow), did we magically reconnect?") | |||
case <-timeout: | |||
t.Error("Test timed out waiting for the error handler") | |||
case <-c_error: | |||
// error was handled! | |||
} | |||
kill_gearman <- true | |||
} | |||
func send_client_request(){ | |||
log.Println("sending client request"); | |||
c, err := client.New( Network, "127.0.0.1:" + port ) | |||
if err == nil { | |||
_, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) | |||
if err != nil { | |||
log.Println( "error sending client request " + err.Error() ) | |||
} | |||
}else{ | |||
log.Println( "error with client " + err.Error() ) | |||
} | |||
} |
@@ -3,6 +3,7 @@ package worker | |||
import ( | |||
"sync" | |||
"testing" | |||
"time" | |||
) | |||
var worker *Worker | |||
@@ -77,6 +78,92 @@ func TestWork(t *testing.T) { | |||
wg.Wait() | |||
} | |||
func TestWorkerClose(t *testing.T) { | |||
worker.Close() | |||
} | |||
func TestWorkWithoutReady(t * testing.T){ | |||
other_worker := New(Unlimited) | |||
if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil { | |||
t.Error(err) | |||
} | |||
if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { | |||
t.Error(err) | |||
} | |||
timeout := make(chan bool, 1) | |||
done := make( chan bool, 1) | |||
other_worker.JobHandler = func( j Job ) error { | |||
if( ! other_worker.ready ){ | |||
t.Error("Worker not ready as expected"); | |||
} | |||
done <-true | |||
return nil | |||
} | |||
go func() { | |||
time.Sleep(5 * time.Second) | |||
timeout <- true | |||
}() | |||
go func(){ | |||
other_worker.Work(); | |||
}() | |||
// With the all-in-one Work() we don't know if the | |||
// worker is ready at this stage so we may have to wait a sec: | |||
go func(){ | |||
tries := 5 | |||
for( tries > 0 ){ | |||
if other_worker.ready { | |||
other_worker.Echo([]byte("Hello")) | |||
break | |||
} | |||
// still waiting for it to be ready.. | |||
time.Sleep(250 * time.Millisecond) | |||
tries-- | |||
} | |||
}() | |||
// determine if we've finished or timed out: | |||
select{ | |||
case <- timeout: | |||
t.Error("Test timed out waiting for the worker") | |||
case <- done: | |||
} | |||
} | |||
func TestWorkWithoutReadyWithPanic(t * testing.T){ | |||
other_worker := New(Unlimited) | |||
timeout := make(chan bool, 1) | |||
done := make( chan bool, 1) | |||
// Going to work with no worker setup. | |||
// when Work (hopefully) calls Ready it will get an error which should cause it to panic() | |||
go func(){ | |||
defer func() { | |||
if err := recover(); err != nil { | |||
done <- true | |||
return | |||
} | |||
t.Error("Work should raise a panic.") | |||
done <- true | |||
}() | |||
other_worker.Work(); | |||
}() | |||
go func() { | |||
time.Sleep(2 * time.Second) | |||
timeout <- true | |||
}() | |||
select{ | |||
case <- timeout: | |||
t.Error("Test timed out waiting for the worker") | |||
case <- done: | |||
} | |||
} |