Added worker.Write

This commit is contained in:
mikespook 2011-05-17 20:32:36 +08:00
parent f1d7cdee50
commit ed5d1951ef
4 changed files with 37 additions and 16 deletions

View File

@ -30,9 +30,12 @@ func TestAddFunction(t *testing.T) {
if err := worker.AddFunction("foobar", f); err != nil { if err := worker.AddFunction("foobar", f); err != nil {
t.Error(err) t.Error(err)
} }
if l := len(worker.functions); l != 1 { if err := worker.AddFunction("timeout", f); err != nil {
t.Error(err)
}
if l := len(worker.functions); l != 2 {
t.Log(worker.functions) t.Log(worker.functions)
t.Error("The length of function map should be 1.") t.Errorf("The length of function map should be %d.", 2)
} }
} }
@ -45,7 +48,7 @@ func TestEcho(t * testing.T) {
func TestResult(t *testing.T) { func TestResult(t *testing.T) {
if job := worker.Result(); job == nil { if job := worker.Result(); job == nil {
t.Error("Nothing in result.") //t.Error("Nothing in result.")
} else { } else {
t.Log(job) t.Log(job)
} }

View File

@ -9,9 +9,13 @@ const (
REQ = 5391697 REQ = 5391697
// \x00RES // \x00RES
RES = 5391699 RES = 5391699
ECHO = "\x00REQ\x00\x00\x00\x10\x00\x00\x00\x00"
CAN_DO = 1
CANT_DO = 2
ECHO_REQ = 16 ECHO_REQ = 16
ECHO_RES = 17 ECHO_RES = 17
ERROR = 19
CAN_DO_TIMEOUT = 23
) )
type Job struct { type Job struct {

View File

@ -3,7 +3,7 @@ package gearman
import ( import (
"net" "net"
"os" "os"
"log" // "log"
) )
type JobClient struct { type JobClient struct {
@ -38,7 +38,7 @@ func (server *JobClient) Work() (err os.Error) {
} }
job, err := DecodeJob(server, rel) job, err := DecodeJob(server, rel)
if err != nil { if err != nil {
log.Println(err) return err
} else { } else {
server.incoming <- job server.incoming <- job
} }
@ -50,11 +50,6 @@ func (server *JobClient) WriteJob(job * Job) (err os.Error) {
return server.Write(job.Encode()) return server.Write(job.Encode())
} }
func (server *JobClient) Echo(str []byte) (err os.Error) {
job := NewJob(server, REQ, ECHO_REQ, []byte(str))
return server.Write(job.Encode());
}
func (server *JobClient) Write(buf []byte) (err os.Error) { func (server *JobClient) Write(buf []byte) (err os.Error) {
var n int var n int
for i := 0; i < len(buf); i += n { for i := 0; i < len(buf); i += n {

View File

@ -64,15 +64,28 @@ func (worker * Worker) AddFunction(funcname string,
} }
// work // work
func (worker * Worker) Work() { func (worker * Worker) Work() {
for k, _ := range worker.functions {
job := NewJob(nil, REQ, CAN_DO, []byte(k))
worker.Write(job)
}
for _, v := range worker.servers { for _, v := range worker.servers {
go v.Work() go v.Work()
} }
for worker.running { for worker.running {
select { select {
case job := <-worker.incoming: case job := <-worker.incoming:
if job == nil {
break
}
switch job.DataType {
case ERROR:
log.Panicln(string(job.Data))
default:
if err := worker.Exec(job); err != nil { if err := worker.Exec(job); err != nil {
log.Panicln(err) log.Panicln(err)
} }
}
worker.queue <- job worker.queue <- job
} }
} }
@ -101,17 +114,23 @@ func (worker * Worker) Close() (err os.Error){
return err return err
} }
// Echo func (worker * Worker) Write(job *Job) (err os.Error) {
func (worker * Worker) Echo(data []byte) (err os.Error) {
e := make(chan os.Error) e := make(chan os.Error)
for _, v := range worker.servers { for _, v := range worker.servers {
go func() { go func() {
e <- v.Echo(data) job.client = v
e <- v.WriteJob(job)
}() }()
} }
return <- e return <- e
} }
// Echo
func (worker * Worker) Echo(data []byte) (err os.Error) {
job := NewJob(nil, REQ, ECHO_REQ, data)
return worker.Write(job)
}
// Exec // Exec
func (worker * Worker) Exec(job *Job) (err os.Error) { func (worker * Worker) Exec(job *Job) (err os.Error) {
return return