From dcd42b5c91deecdae8bccd39cb5af0fbf5305ac6 Mon Sep 17 00:00:00 2001 From: mikespook Date: Tue, 17 May 2011 18:12:40 +0800 Subject: [PATCH] Job server can echo sth. to worker now. --- src/pkg/gearman/Makefile | 2 + src/pkg/gearman/gearman.go | 3 + src/pkg/gearman/gearman_test.go | 34 ++++++++++- src/pkg/gearman/worker.go | 100 ++++++++++++++++++++++++++------ 4 files changed, 120 insertions(+), 19 deletions(-) diff --git a/src/pkg/gearman/Makefile b/src/pkg/gearman/Makefile index c6b451d..988fe7b 100644 --- a/src/pkg/gearman/Makefile +++ b/src/pkg/gearman/Makefile @@ -7,6 +7,8 @@ include $(GOROOT)/src/Make.inc TARG=gearman GOFILES=\ gearman.go\ + job.go\ + jobclient.go\ worker.go\ # client.go\ diff --git a/src/pkg/gearman/gearman.go b/src/pkg/gearman/gearman.go index 1740203..18a7cfe 100644 --- a/src/pkg/gearman/gearman.go +++ b/src/pkg/gearman/gearman.go @@ -3,4 +3,7 @@ package gearman const ( TCP = "tcp4" WORKER_SERVER_CAP = 32 + WORKER_FUNCTION_CAP = 512 ) + + diff --git a/src/pkg/gearman/gearman_test.go b/src/pkg/gearman/gearman_test.go index de669cb..e00f7df 100644 --- a/src/pkg/gearman/gearman_test.go +++ b/src/pkg/gearman/gearman_test.go @@ -4,8 +4,13 @@ import ( "testing" ) +var worker *Worker + +func init() { + worker = NewWorker() +} + func TestAddServer(t *testing.T) { - worker := NewWorker() t.Log("Add local server 127.0.0.1:4730.") if err := worker.AddServer("127.0.0.1:4730"); err != nil { t.Error(err) @@ -16,3 +21,30 @@ func TestAddServer(t *testing.T) { t.Error("The length of server list should be 1.") } } + +func TestAddFunction(t *testing.T) { + f := func(job *Job) []byte { + return nil + } + + if err := worker.AddFunction("foobar", f); err != nil { + t.Error(err) + } + if l := len(worker.functions); l != 1 { + t.Log(worker.functions) + t.Error("The length of function map should be 1.") + } +} + +func TestEcho(t * testing.T) { + go worker.Work() + if err := worker.Echo([]byte("Hello World")); err != nil { + t.Error(err) + } +} + +func TestClose(t *testing.T) { + if err := worker.Close(); err != nil { + t.Error(err) + } +} diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 810c2b8..ba9adcf 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -1,54 +1,118 @@ package gearman import( - "net" "os" + "sync" + "log" ) -type Worker struct { +type JobFunction func(job *Job) []byte - servers []net.Conn +type Worker struct { + servers []*JobClient + functions map[string]JobFunction + + running bool + incoming chan *Job + mutex sync.Mutex + queue chan *Job } func NewWorker() (worker *Worker) { - worker = &Worker{servers:make([]net.Conn, 0, WORKER_SERVER_CAP)} + worker = &Worker{servers:make([]*JobClient, 0, WORKER_SERVER_CAP), + functions: make(map[string]JobFunction), + incoming: make(chan *Job, 512), + queue: make(chan *Job, 512), + running: true,} return worker } - // add server // worker.AddServer("127.0.0.1:4730") func (worker * Worker) AddServer(addr string) (err os.Error) { + worker.mutex.Lock() + defer worker.mutex.Unlock() + if len(worker.servers) == cap(worker.servers) { return os.NewError("There were too many servers.") } - conn, err := net.Dial(TCP, addr) + + // Create a new job server's client as a agent of server + server, err := NewJobClient(addr, worker.incoming) if err != nil { return err } + n := len(worker.servers) worker.servers = worker.servers[0: n + 1] - worker.servers[n] = conn - return nil + worker.servers[n] = server + return } -/* + // add function -func (worker * Worker) AddFunction(funcname string, - f interface{}, context interface{}) (err Error) { +func (worker * Worker) AddFunction(funcname string, + f JobFunction) (err os.Error) { + worker.mutex.Lock() + defer worker.mutex.Unlock() + if f == nil { + return os.NewError("Job function should not be nil.") + } + worker.functions[funcname] = f + return +} +// work +func (worker * Worker) Work() { + for _, v := range worker.servers { + go v.Work() + } + for worker.running { + select { + case job := <-worker.incoming: + if err := worker.Exec(job); err != nil { + log.Panicln(err) + } + worker.queue <- job + } + } } -// work -func (worker * GearmanWorker) Work() { - for { - +func (worker * Worker) Result() (job *Job) { + if l := len(worker.queue); l != 1 { + if l == 0 { + return + } + for i := 0; i < l - 1; i ++ { + <-worker.queue + } } + return <-worker.queue } // Close // should used as defer -func (worker * GearmanWorker) Close() (err Error){ - +func (worker * Worker) Close() (err os.Error){ + worker.running = false + for _, v := range worker.servers { + err = v.Close() + } + close(worker.incoming) + return err +} + +// Echo +func (worker * Worker) Echo(data []byte) (err os.Error) { + e := make(chan os.Error) + for _, v := range worker.servers { + go func() { + e <- v.Echo(data) + }() + } + return <- e +} + +// Exec +func (worker * Worker) Exec(job *Job) (err os.Error) { + return } -*/