Xing Xing пре 9 година
родитељ
комит
ad9b3cb988
5 измењених фајлова са 56 додато и 57 уклоњено
  1. +1
    -1
      example/client/client.go
  2. +2
    -2
      example/worker/worker.go
  3. +13
    -13
      worker/agent.go
  4. +16
    -17
      worker/worker_disconnect_test.go
  5. +24
    -24
      worker/worker_test.go

+ 1
- 1
example/client/client.go Прегледај датотеку

@@ -3,8 +3,8 @@ package main
import (
"github.com/mikespook/gearman-go/client"
"log"
"sync"
"os"
"sync"
)

func main() {


+ 2
- 2
example/worker/worker.go Прегледај датотеку

@@ -4,10 +4,10 @@ import (
"github.com/mikespook/gearman-go/worker"
"github.com/mikespook/golib/signal"
"log"
"net"
"os"
"strings"
"time"
"net"
)

func ToUpper(job worker.Job) ([]byte, error) {
@@ -41,7 +41,7 @@ func main() {
w.ErrorHandler = func(e error) {
log.Println(e)
if opErr, ok := e.(*net.OpError); ok {
if ! opErr.Temporary() {
if !opErr.Temporary() {
proc, err := os.FindProcess(os.Getpid())
if err != nil {
log.Println(err)


+ 13
- 13
worker/agent.go Прегледај датотеку

@@ -4,9 +4,9 @@ import (
"bufio"
"bytes"
"encoding/binary"
"io"
"net"
"sync"
"io"
)

// The agent of job server.
@@ -59,14 +59,14 @@ func (a *agent) work() {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Temporary() {
continue
}else{
} else {
a.disconnect_error(err)
// else - we're probably dc'ing due to a Close()

break
}
} else if( err == io.EOF ){
} else if err == io.EOF {
a.disconnect_error(err)
break
}
@@ -104,11 +104,11 @@ func (a *agent) work() {
}
}

func (a * agent) disconnect_error( err error ){
if( a.conn != nil ){
func (a *agent) disconnect_error(err error) {
if a.conn != nil {
err = &WorkerDisconnectError{
err : err,
agent : a,
err: err,
agent: a,
}
a.worker.err(err)
}
@@ -129,7 +129,7 @@ func (a *agent) Grab() {
a.grab()
}

func (a *agent) grab(){
func (a *agent) grab() {
outpack := getOutPack()
outpack.dataType = dtGrabJobUniq
a.write(outpack)
@@ -143,16 +143,16 @@ func (a *agent) PreSleep() {
a.write(outpack)
}

func (a *agent) reconnect() (error){
a.Lock()
func (a *agent) reconnect() error {
a.Lock()
defer a.Unlock()
conn, err := net.Dial(a.net, a.addr)
if err != nil {
return err;
return err
}
a.conn = conn
a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
bufio.NewWriter(a.conn))
bufio.NewWriter(a.conn))
a.grab()
a.worker.reRegisterFuncsForAgent(a)



+ 16
- 17
worker/worker_disconnect_test.go Прегледај датотеку

@@ -50,8 +50,8 @@ func run_gearman() {
panic(`Unable to start gearman aborting test`)
}
gearman_ready <- true
<- kill_gearman
<-kill_gearman
}

func check_gearman_present() bool {
@@ -134,14 +134,14 @@ func TestBasicDisconnect(t *testing.T) {
send_client_request()

select {
case <- done:
case <-done:
t.Error("Client request handled (somehow), did we magically reconnect?")
case <-timeout:
t.Error("Test timed out waiting for the error handler")
case <-c_error:
// error was handled!
if ! was_dc_err {
t.Error( "Disconnect didn't manifest as a net.OpError?")
if !was_dc_err {
t.Error("Disconnect didn't manifest as a net.OpError?")
}
}
worker.Close()
@@ -175,24 +175,23 @@ func TestDcRc(t *testing.T) {

worker.ErrorHandler = func(e error) {
wdc, wdcok := e.(*WorkerDisconnectError)
if( wdcok){
if wdcok {
log.Println("Reconnecting!")
reconnected := false
for tries := 20 ; ! reconnected && tries > 0; tries -- {
for tries := 20; !reconnected && tries > 0; tries-- {
rcerr := wdc.Reconnect()
if rcerr != nil{
if rcerr != nil {
time.Sleep(250 * time.Millisecond)
} else{
reconnected = true;
} else {
reconnected = true
}
}
} else{

} else {
panic("Some other kind of error " + e.Error())
}
}

go func() {
@@ -219,9 +218,9 @@ func TestDcRc(t *testing.T) {
}

send_client_request()
select {
case <- done:
case <-done:
case <-timeout:
t.Error("Test timed out")
}


+ 24
- 24
worker/worker_test.go Прегледај датотеку

@@ -139,7 +139,7 @@ func TestWorkerClose(t *testing.T) {
worker.Close()
}

func TestWorkWithoutReady(t * testing.T){
func TestWorkWithoutReady(t *testing.T) {
other_worker := New(Unlimited)

if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
@@ -148,15 +148,15 @@ func TestWorkWithoutReady(t * testing.T){
if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil {
t.Error(err)
}
timeout := make(chan bool, 1)
done := make( chan bool, 1)
done := make(chan bool, 1)

other_worker.JobHandler = func( j Job ) error {
if( ! other_worker.ready ){
t.Error("Worker not ready as expected");
other_worker.JobHandler = func(j Job) error {
if !other_worker.ready {
t.Error("Worker not ready as expected")
}
done <-true
done <- true
return nil
}
go func() {
@@ -164,15 +164,15 @@ func TestWorkWithoutReady(t * testing.T){
timeout <- true
}()

go func(){
other_worker.Work();
go func() {
other_worker.Work()
}()

// With the all-in-one Work() we don't know if the
// With the all-in-one Work() we don't know if the
// worker is ready at this stage so we may have to wait a sec:
go func(){
go func() {
tries := 5
for( tries > 0 ){
for tries > 0 {
if other_worker.ready {
other_worker.Echo([]byte("Hello"))
break
@@ -183,24 +183,24 @@ func TestWorkWithoutReady(t * testing.T){
tries--
}
}()
// determine if we've finished or timed out:
select{
case <- timeout:
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <- done:
case <-done:
}
}

func TestWorkWithoutReadyWithPanic(t * testing.T){
func TestWorkWithoutReadyWithPanic(t *testing.T) {
other_worker := New(Unlimited)
timeout := make(chan bool, 1)
done := make( chan bool, 1)
done := make(chan bool, 1)

// Going to work with no worker setup.
// when Work (hopefully) calls Ready it will get an error which should cause it to panic()
go func(){
go func() {
defer func() {
if err := recover(); err != nil {
done <- true
@@ -209,17 +209,17 @@ func TestWorkWithoutReadyWithPanic(t * testing.T){
t.Error("Work should raise a panic.")
done <- true
}()
other_worker.Work();
other_worker.Work()
}()
go func() {
time.Sleep(2 * time.Second)
timeout <- true
}()

select{
case <- timeout:
select {
case <-timeout:
t.Error("Test timed out waiting for the worker")
case <- done:
case <-done:
}

}

Loading…
Откажи
Сачувај