grab, no_job, pre_sleep, noop

This commit is contained in:
Xing Xing 2013-12-23 17:01:01 +08:00
parent a33a6cde32
commit 2b4cc002d1
3 changed files with 51 additions and 10 deletions

View File

@ -72,7 +72,21 @@ func (a *agent) Work() {
} }
func (a *agent) Close() { func (a *agent) Close() {
a.conn.Close() if a.conn != nil {
a.conn.Close()
}
}
func (a *agent) Grab() {
outpack := getOutPack()
outpack.dataType = GRAB_JOB_UNIQ
a.write(outpack)
}
func (a *agent) PreSleep() {
outpack := getOutPack()
outpack.dataType = PRE_SLEEP
a.write(outpack)
} }
// read length bytes from the socket // read length bytes from the socket

View File

@ -154,6 +154,10 @@ func (worker *Worker) handleInPack(inpack *inPack) {
} }
}() }()
switch inpack.dataType { switch inpack.dataType {
case NO_JOB:
inpack.a.PreSleep()
case NOOP:
inpack.a.Grab()
case ERROR: case ERROR:
worker.err(GetError(inpack.data)) worker.err(GetError(inpack.data))
case JOB_ASSIGN, JOB_ASSIGN_UNIQ: case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
@ -165,6 +169,20 @@ func (worker *Worker) handleInPack(inpack *inPack) {
} }
} }
func (worker *Worker) Ready() (err error) {
for _, v := range worker.agents {
if err = v.Connect(); err != nil {
return
}
go v.Work()
}
worker.Reset()
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
return
}
// Main loop // Main loop
func (worker *Worker) Work() { func (worker *Worker) Work() {
defer func() { defer func() {
@ -173,14 +191,6 @@ func (worker *Worker) Work() {
} }
}() }()
worker.running = true worker.running = true
for _, v := range worker.agents {
v.Connect()
go v.Work()
}
worker.Reset()
for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout)
}
var inpack *inPack var inpack *inPack
for inpack = range worker.in { for inpack = range worker.in {
go worker.handleInPack(inpack) go worker.handleInPack(inpack)
@ -267,6 +277,7 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
outpack.data = r.data outpack.data = r.data
if worker.running { if worker.running {
inpack.a.write(outpack) inpack.a.write(outpack)
inpack.a.Grab()
} }
return return
} }

View File

@ -1,6 +1,9 @@
package worker package worker
import "testing" import (
"sync"
"testing"
)
var worker *Worker var worker *Worker
@ -44,7 +47,20 @@ func TestWorkerRemoveFunc(t *testing.T) {
} }
func TestWork(t *testing.T) { func TestWork(t *testing.T) {
var wg sync.WaitGroup
worker.JobHandler = func(job Job) error {
t.Logf("%s", job.Data())
wg.Done()
return nil
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work() go worker.Work()
wg.Add(1)
worker.Echo([]byte("Hello"))
wg.Wait()
} }
func TestWorkerClose(t *testing.T) { func TestWorkerClose(t *testing.T) {