From b7c858553c42493ca8023c1b5fdb0a8dfd90b652 Mon Sep 17 00:00:00 2001 From: mikespook Date: Wed, 30 May 2012 11:20:29 +0800 Subject: [PATCH] add reconnection to worker --- worker/agent.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/worker/agent.go b/worker/agent.go index c997878..74d62e4 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -16,6 +16,7 @@ type agent struct { worker *Worker in chan []byte out chan *Job + addr string } // Create the agent of job server. @@ -27,6 +28,7 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) { a = &agent{ conn: conn, worker: worker, + addr: addr, in: make(chan []byte, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE), } @@ -56,6 +58,7 @@ func (a *agent) inLoop() { }() noop := true for a.worker.running { + RESTART: // got noop msg and in queue is zero, grab job if noop && len(a.in) == 0 { a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) @@ -64,6 +67,16 @@ func (a *agent) inLoop() { if err != nil { if err == common.ErrConnection { // TODO: reconnection + for i:= 0; i < 3 && a.worker.running; i++ { + if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { + a.worker.err(common.Errorf("Reconnection: %d faild", i)) + continue + } else { + a.conn = conn + goto RESTART + } + } + a.worker.err(err) break } a.worker.err(err)