fixed the closing method

This commit is contained in:
Xing Xing 2015-01-06 11:34:39 +08:00
parent d32eb195e1
commit a003eac543

View File

@ -22,7 +22,7 @@ type Worker struct {
funcs jobFuncs funcs jobFuncs
in chan *inPack in chan *inPack
running bool running bool
ready bool ready bool
Id string Id string
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
@ -30,7 +30,6 @@ type Worker struct {
limit chan bool limit chan bool
} }
// Return a worker. // Return a worker.
// //
// If limit is set to Unlimited(=0), the worker will grab all jobs // If limit is set to Unlimited(=0), the worker will grab all jobs
@ -95,11 +94,11 @@ func (worker *Worker) AddFunc(funcname string,
// inner add // inner add
func (worker *Worker) addFunc(funcname string, timeout uint32) { func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack := prepFuncOutpack( funcname, timeout) outpack := prepFuncOutpack(funcname, timeout)
worker.broadcast(outpack) worker.broadcast(outpack)
} }
func prepFuncOutpack(funcname string, timeout uint32) (*outPack){ func prepFuncOutpack(funcname string, timeout uint32) *outPack {
outpack := getOutPack() outpack := getOutPack()
if timeout == 0 { if timeout == 0 {
outpack.dataType = dtCanDo outpack.dataType = dtCanDo
@ -188,19 +187,14 @@ func (worker *Worker) Ready() (err error) {
// Main loop, block here // Main loop, block here
// Most of time, this should be evaluated in goroutine. // Most of time, this should be evaluated in goroutine.
func (worker *Worker) Work() { func (worker *Worker) Work() {
if ! worker.ready { if !worker.ready {
// didn't run Ready beforehand, so we'll have to do it: // didn't run Ready beforehand, so we'll have to do it:
err := worker.Ready() err := worker.Ready()
if err != nil { if err != nil {
panic( err ) panic(err)
} }
} }
defer func() {
for _, a := range worker.agents {
a.Close()
}
}()
worker.running = true worker.running = true
for _, a := range worker.agents { for _, a := range worker.agents {
a.Grab() a.Grab()
@ -223,8 +217,11 @@ func (worker *Worker) customeHandler(inpack *inPack) {
// Close connection and exit main loop // Close connection and exit main loop
func (worker *Worker) Close() { func (worker *Worker) Close() {
worker.Lock() worker.Lock()
worker.Unlock() defer worker.Unlock()
if worker.running == true { if worker.running == true {
for _, a := range worker.agents {
a.Close()
}
worker.running = false worker.running = false
close(worker.in) close(worker.in)
} }
@ -299,11 +296,11 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
} }
return return
} }
func (worker *Worker)reRegisterFuncsForAgent( a * agent ){ func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
worker.Lock() worker.Lock()
defer worker.Unlock() defer worker.Unlock()
for funcname, f := range worker.funcs { for funcname, f := range worker.funcs {
outpack := prepFuncOutpack( funcname, f.timeout) outpack := prepFuncOutpack(funcname, f.timeout)
a.write(outpack) a.write(outpack)
} }
@ -333,19 +330,21 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
} }
// Error type passed when a worker connection disconnects // Error type passed when a worker connection disconnects
type WorkerDisconnectError struct{ type WorkerDisconnectError struct {
err error err error
agent * agent agent *agent
} }
func (e *WorkerDisconnectError) Error() ( string){
return e.err.Error(); func (e *WorkerDisconnectError) Error() string {
return e.err.Error()
} }
// Responds to the error by asking the worker to reconnect // Responds to the error by asking the worker to reconnect
func (e *WorkerDisconnectError) Reconnect() ( err error ){ func (e *WorkerDisconnectError) Reconnect() (err error) {
return e.agent.reconnect() return e.agent.reconnect()
} }
// Which server was this for? // Which server was this for?
func(e *WorkerDisconnectError) Server() ( net string, addr string ){ func (e *WorkerDisconnectError) Server() (net string, addr string) {
return e.agent.net, e.agent.addr return e.agent.net, e.agent.addr
} }