diff --git a/worker/agent.go b/worker/agent.go index 7a9f70d..7ca3b44 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -72,7 +72,21 @@ func (a *agent) Work() { } func (a *agent) Close() { - a.conn.Close() + if a.conn != nil { + a.conn.Close() + } +} + +func (a *agent) Grab() { + outpack := getOutPack() + outpack.dataType = GRAB_JOB_UNIQ + a.write(outpack) +} + +func (a *agent) PreSleep() { + outpack := getOutPack() + outpack.dataType = PRE_SLEEP + a.write(outpack) } // read length bytes from the socket diff --git a/worker/worker.go b/worker/worker.go index 6b935e7..cd46657 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -154,6 +154,10 @@ func (worker *Worker) handleInPack(inpack *inPack) { } }() switch inpack.dataType { + case NO_JOB: + inpack.a.PreSleep() + case NOOP: + inpack.a.Grab() case ERROR: worker.err(GetError(inpack.data)) case JOB_ASSIGN, JOB_ASSIGN_UNIQ: @@ -165,6 +169,20 @@ func (worker *Worker) handleInPack(inpack *inPack) { } } +func (worker *Worker) Ready() (err error) { + for _, v := range worker.agents { + if err = v.Connect(); err != nil { + return + } + go v.Work() + } + worker.Reset() + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + return +} + // Main loop func (worker *Worker) Work() { defer func() { @@ -173,14 +191,6 @@ func (worker *Worker) Work() { } }() worker.running = true - for _, v := range worker.agents { - v.Connect() - go v.Work() - } - worker.Reset() - for funcname, f := range worker.funcs { - worker.addFunc(funcname, f.timeout) - } var inpack *inPack for inpack = range worker.in { go worker.handleInPack(inpack) @@ -267,6 +277,7 @@ func (worker *Worker) exec(inpack *inPack) (err error) { outpack.data = r.data if worker.running { inpack.a.write(outpack) + inpack.a.Grab() } return } diff --git a/worker/worker_test.go b/worker/worker_test.go index c875ff7..911aa9b 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,6 +1,9 @@ package worker -import "testing" +import ( +"sync" +"testing" +) var worker *Worker @@ -44,7 +47,20 @@ func TestWorkerRemoveFunc(t *testing.T) { } func TestWork(t *testing.T) { + var wg sync.WaitGroup + worker.JobHandler = func(job Job) error { + t.Logf("%s", job.Data()) + wg.Done() + return nil + } + if err := worker.Ready(); err != nil { + t.Error(err) + return + } go worker.Work() + wg.Add(1) + worker.Echo([]byte("Hello")) + wg.Wait() } func TestWorkerClose(t *testing.T) {