add reconnection to worker

This commit is contained in:
mikespook 2012-05-30 11:20:29 +08:00
parent 624c61519f
commit b7c858553c

View File

@ -16,6 +16,7 @@ type agent struct {
worker *Worker worker *Worker
in chan []byte in chan []byte
out chan *Job out chan *Job
addr string
} }
// Create the agent of job server. // Create the agent of job server.
@ -27,6 +28,7 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) {
a = &agent{ a = &agent{
conn: conn, conn: conn,
worker: worker, worker: worker,
addr: addr,
in: make(chan []byte, common.QUEUE_SIZE), in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE),
} }
@ -56,6 +58,7 @@ func (a *agent) inLoop() {
}() }()
noop := true noop := true
for a.worker.running { for a.worker.running {
RESTART:
// got noop msg and in queue is zero, grab job // got noop msg and in queue is zero, grab job
if noop && len(a.in) == 0 { if noop && len(a.in) == 0 {
a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil)) a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil))
@ -64,6 +67,16 @@ func (a *agent) inLoop() {
if err != nil { if err != nil {
if err == common.ErrConnection { if err == common.ErrConnection {
// TODO: reconnection // TODO: reconnection
for i:= 0; i < 3 && a.worker.running; i++ {
if conn, err := net.Dial(common.NETWORK, a.addr); err != nil {
a.worker.err(common.Errorf("Reconnection: %d faild", i))
continue
} else {
a.conn = conn
goto RESTART
}
}
a.worker.err(err)
break break
} }
a.worker.err(err) a.worker.err(err)