Cope with io.EOF as a disconnect error

This commit is contained in:
Joe Higton 2014-06-10 03:46:21 +01:00
parent 1ebb3d5fcc
commit 09c626f488
3 changed files with 19 additions and 14 deletions

View File

@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"net" "net"
"sync" "sync"
"io"
) )
// The agent of job server. // The agent of job server.
@ -57,20 +58,14 @@ func (a *agent) work() {
if opErr.Temporary() { if opErr.Temporary() {
continue continue
}else{ }else{
a.Lock() a.disconnect_error(err)
if( a.conn != nil ){
a.Unlock()
err = &WorkerDisconnectError{
OpError : opErr,
agent : a,
}
a.worker.err(err)
}
// else - we're probably dc'ing due to a Close() // else - we're probably dc'ing due to a Close()
break break
} }
} else if( err == io.EOF ){
a.disconnect_error(err)
} }
a.worker.err(err) a.worker.err(err)
// If it is unexpected error and the connection wasn't // If it is unexpected error and the connection wasn't
@ -106,6 +101,16 @@ func (a *agent) work() {
} }
} }
func (a * agent) disconnect_error( err error ){
if( a.conn != nil ){
err = &WorkerDisconnectError{
err : err,
agent : a,
}
a.worker.err(err)
}
}
func (a *agent) Close() { func (a *agent) Close() {
a.Lock() a.Lock()
defer a.Unlock() defer a.Unlock()

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"sync" "sync"
"time" "time"
"net"
) )
const ( const (
@ -335,9 +334,13 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
// Error type passed when a worker connection disconnects // Error type passed when a worker connection disconnects
type WorkerDisconnectError struct{ type WorkerDisconnectError struct{
*net.OpError err error
agent * agent agent * agent
} }
func (e *WorkerDisconnectError) Error() ( string){
return e.err.Error();
}
// Responds to the error by asking the worker to reconnect // Responds to the error by asking the worker to reconnect
func (e *WorkerDisconnectError) Reconnect() ( err error ){ func (e *WorkerDisconnectError) Reconnect() ( err error ){
return e.agent.reconnect() return e.agent.reconnect()

View File

@ -36,7 +36,6 @@ func run_gearman() {
// Make sure we clear up our gearman: // Make sure we clear up our gearman:
defer func() { defer func() {
log.Println("killing gearmand")
gm_cmd.Process.Kill() gm_cmd.Process.Kill()
}() }()
@ -58,7 +57,6 @@ func run_gearman() {
func check_gearman_present() bool { func check_gearman_present() bool {
con, err := net.Dial(`tcp`, `127.0.0.1:`+port) con, err := net.Dial(`tcp`, `127.0.0.1:`+port)
if err != nil { if err != nil {
log.Println("gearman not ready " + err.Error())
return false return false
} }
con.Close() con.Close()
@ -233,7 +231,6 @@ func TestDcRc(t *testing.T) {
} }
func send_client_request() { func send_client_request() {
log.Println("sending client request")
c, err := client.New(Network, "127.0.0.1:"+port) c, err := client.New(Network, "127.0.0.1:"+port)
if err == nil { if err == nil {
_, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh)