From 2b4cc002d12ca4bcfd5c825418c9b2b90c6f4da6 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Mon, 23 Dec 2013 17:01:01 +0800 Subject: [PATCH] grab, no_job, pre_sleep, noop --- worker/agent.go | 16 +++++++++++++++- worker/worker.go | 27 +++++++++++++++++++-------- worker/worker_test.go | 18 +++++++++++++++++- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 7a9f70d..7ca3b44 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -72,7 +72,21 @@ func (a *agent) Work() { } func (a *agent) Close() { - a.conn.Close() + if a.conn != nil { + a.conn.Close() + } +} + +func (a *agent) Grab() { + outpack := getOutPack() + outpack.dataType = GRAB_JOB_UNIQ + a.write(outpack) +} + +func (a *agent) PreSleep() { + outpack := getOutPack() + outpack.dataType = PRE_SLEEP + a.write(outpack) } // read length bytes from the socket diff --git a/worker/worker.go b/worker/worker.go index 6b935e7..cd46657 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -154,6 +154,10 @@ func (worker *Worker) handleInPack(inpack *inPack) { } }() switch inpack.dataType { + case NO_JOB: + inpack.a.PreSleep() + case NOOP: + inpack.a.Grab() case ERROR: worker.err(GetError(inpack.data)) case JOB_ASSIGN, JOB_ASSIGN_UNIQ: @@ -165,6 +169,20 @@ func (worker *Worker) handleInPack(inpack *inPack) { } } +func (worker *Worker) Ready() (err error) { + for _, v := range worker.agents { + if err = v.Connect(); err != nil { + return + } + go v.Work() + } + worker.Reset() + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + return +} + // Main loop func (worker *Worker) Work() { defer func() { @@ -173,14 +191,6 @@ func (worker *Worker) Work() { } }() worker.running = true - for _, v := range worker.agents { - v.Connect() - go v.Work() - } - worker.Reset() - for funcname, f := range worker.funcs { - worker.addFunc(funcname, f.timeout) - } var inpack *inPack for inpack = range worker.in { go worker.handleInPack(inpack) @@ -267,6 +277,7 @@ func (worker *Worker) exec(inpack *inPack) (err error) { outpack.data = r.data if worker.running { inpack.a.write(outpack) + inpack.a.Grab() } return } diff --git a/worker/worker_test.go b/worker/worker_test.go index c875ff7..911aa9b 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,6 +1,9 @@ package worker -import "testing" +import ( +"sync" +"testing" +) var worker *Worker @@ -44,7 +47,20 @@ func TestWorkerRemoveFunc(t *testing.T) { } func TestWork(t *testing.T) { + var wg sync.WaitGroup + worker.JobHandler = func(job Job) error { + t.Logf("%s", job.Data()) + wg.Done() + return nil + } + if err := worker.Ready(); err != nil { + t.Error(err) + return + } go worker.Work() + wg.Add(1) + worker.Echo([]byte("Hello")) + wg.Wait() } func TestWorkerClose(t *testing.T) {