diff --git a/src/pkg/gearman/gearman_test.go b/src/pkg/gearman/gearman_test.go index caa7f30..a6bc3a7 100644 --- a/src/pkg/gearman/gearman_test.go +++ b/src/pkg/gearman/gearman_test.go @@ -30,9 +30,12 @@ func TestAddFunction(t *testing.T) { if err := worker.AddFunction("foobar", f); err != nil { t.Error(err) } - if l := len(worker.functions); l != 1 { + if err := worker.AddFunction("timeout", f); err != nil { + t.Error(err) + } + if l := len(worker.functions); l != 2 { t.Log(worker.functions) - t.Error("The length of function map should be 1.") + t.Errorf("The length of function map should be %d.", 2) } } @@ -45,7 +48,7 @@ func TestEcho(t * testing.T) { func TestResult(t *testing.T) { if job := worker.Result(); job == nil { - t.Error("Nothing in result.") + //t.Error("Nothing in result.") } else { t.Log(job) } diff --git a/src/pkg/gearman/job.go b/src/pkg/gearman/job.go index 54d2088..f63bcf6 100644 --- a/src/pkg/gearman/job.go +++ b/src/pkg/gearman/job.go @@ -9,9 +9,13 @@ const ( REQ = 5391697 // \x00RES RES = 5391699 - ECHO = "\x00REQ\x00\x00\x00\x10\x00\x00\x00\x00" + + CAN_DO = 1 + CANT_DO = 2 ECHO_REQ = 16 ECHO_RES = 17 + ERROR = 19 + CAN_DO_TIMEOUT = 23 ) type Job struct { diff --git a/src/pkg/gearman/jobclient.go b/src/pkg/gearman/jobclient.go index 50e71e0..fb10ed1 100644 --- a/src/pkg/gearman/jobclient.go +++ b/src/pkg/gearman/jobclient.go @@ -3,7 +3,7 @@ package gearman import ( "net" "os" - "log" +// "log" ) type JobClient struct { @@ -38,7 +38,7 @@ func (server *JobClient) Work() (err os.Error) { } job, err := DecodeJob(server, rel) if err != nil { - log.Println(err) + return err } else { server.incoming <- job } @@ -50,11 +50,6 @@ func (server *JobClient) WriteJob(job * Job) (err os.Error) { return server.Write(job.Encode()) } -func (server *JobClient) Echo(str []byte) (err os.Error) { - job := NewJob(server, REQ, ECHO_REQ, []byte(str)) - return server.Write(job.Encode()); -} - func (server *JobClient) Write(buf []byte) (err os.Error) { var n int for i := 0; i < len(buf); i += n { diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index ba9adcf..974cb91 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -64,14 +64,27 @@ func (worker * Worker) AddFunction(funcname string, } // work func (worker * Worker) Work() { + for k, _ := range worker.functions { + job := NewJob(nil, REQ, CAN_DO, []byte(k)) + worker.Write(job) + } + for _, v := range worker.servers { go v.Work() } for worker.running { select { case job := <-worker.incoming: - if err := worker.Exec(job); err != nil { - log.Panicln(err) + if job == nil { + break + } + switch job.DataType { + case ERROR: + log.Panicln(string(job.Data)) + default: + if err := worker.Exec(job); err != nil { + log.Panicln(err) + } } worker.queue <- job } @@ -101,17 +114,23 @@ func (worker * Worker) Close() (err os.Error){ return err } -// Echo -func (worker * Worker) Echo(data []byte) (err os.Error) { +func (worker * Worker) Write(job *Job) (err os.Error) { e := make(chan os.Error) for _, v := range worker.servers { go func() { - e <- v.Echo(data) + job.client = v + e <- v.WriteJob(job) }() } return <- e } +// Echo +func (worker * Worker) Echo(data []byte) (err os.Error) { + job := NewJob(nil, REQ, ECHO_REQ, data) + return worker.Write(job) +} + // Exec func (worker * Worker) Exec(job *Job) (err os.Error) { return