This commit is contained in:
mikespook 2012-05-28 10:36:08 +08:00
commit 3e7b5d6cf3
4 changed files with 14 additions and 4 deletions

View File

@ -22,7 +22,8 @@ var (
ErrOutOfCap = errors.New("Out of the capability.") ErrOutOfCap = errors.New("Out of the capability.")
ErrNotConn = errors.New("Did not connect to job server.") ErrNotConn = errors.New("Did not connect to job server.")
ErrFuncNotFound = errors.New("The function was not found.") ErrFuncNotFound = errors.New("The function was not found.")
ErrConnection = errors.New("Connection error.") ErrEmptyReading = errors.New("Empty reading.")
ErrNoActiveAgent = errors.New("No active agent.")
) )
func DisablePanic() {recover()} func DisablePanic() {recover()}

View File

@ -19,6 +19,7 @@ func main() {
log.Println("Starting ...") log.Println("Starting ...")
defer log.Println("Shutdown complete!") defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited) w := worker.New(worker.Unlimited)
defer w.Close()
w.ErrHandler = func(e error) { w.ErrHandler = func(e error) {
log.Println(e) log.Println(e)
} }

View File

@ -48,7 +48,7 @@ func (a *agent) outLoop() {
// inputing loop // inputing loop
func (a *agent) inLoop() { func (a *agent) inLoop() {
defer func() { defer func() {
a.conn.Close() recover()
close(a.in) close(a.in)
close(a.out) close(a.out)
a.worker.removeAgent(a) a.worker.removeAgent(a)
@ -86,6 +86,10 @@ func (a *agent) inLoop() {
} }
} }
func (a *agent) Close() {
a.conn.Close()
}
func (a *agent) Work() { func (a *agent) Work() {
go a.outLoop() go a.outLoop()
go a.inLoop() go a.inLoop()

View File

@ -159,6 +159,9 @@ func (worker *Worker) removeFunc(funcname string) {
func (worker *Worker) Work() { func (worker *Worker) Work() {
defer func() { defer func() {
worker.running = false worker.running = false
for _, v := range worker.agents {
v.Close()
}
}() }()
for funcname, f := range worker.funcs { for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout) worker.addFunc(funcname, f.timeout)
@ -168,8 +171,9 @@ func (worker *Worker) Work() {
go v.Work() go v.Work()
} }
ok := true ok := true
var job *Job
for ok { for ok {
if job, ok := <-worker.in; ok { if job, ok = <-worker.in; ok {
switch job.DataType { switch job.DataType {
case common.ERROR: case common.ERROR:
go func() { go func() {
@ -280,6 +284,6 @@ func (worker *Worker) removeAgent(a *agent) {
} }
} }
if len(worker.agents) == 0 { if len(worker.agents) == 0 {
worker.Close() worker.err(common.ErrNoActiveAgent)
} }
} }