forked from yuxh/gearman-go
Job server can echo sth. to worker now.
This commit is contained in:
parent
364744ff92
commit
dcd42b5c91
@ -7,6 +7,8 @@ include $(GOROOT)/src/Make.inc
|
|||||||
TARG=gearman
|
TARG=gearman
|
||||||
GOFILES=\
|
GOFILES=\
|
||||||
gearman.go\
|
gearman.go\
|
||||||
|
job.go\
|
||||||
|
jobclient.go\
|
||||||
worker.go\
|
worker.go\
|
||||||
# client.go\
|
# client.go\
|
||||||
|
|
||||||
|
@ -3,4 +3,7 @@ package gearman
|
|||||||
const (
|
const (
|
||||||
TCP = "tcp4"
|
TCP = "tcp4"
|
||||||
WORKER_SERVER_CAP = 32
|
WORKER_SERVER_CAP = 32
|
||||||
|
WORKER_FUNCTION_CAP = 512
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -4,8 +4,13 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var worker *Worker
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
worker = NewWorker()
|
||||||
|
}
|
||||||
|
|
||||||
func TestAddServer(t *testing.T) {
|
func TestAddServer(t *testing.T) {
|
||||||
worker := NewWorker()
|
|
||||||
t.Log("Add local server 127.0.0.1:4730.")
|
t.Log("Add local server 127.0.0.1:4730.")
|
||||||
if err := worker.AddServer("127.0.0.1:4730"); err != nil {
|
if err := worker.AddServer("127.0.0.1:4730"); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -16,3 +21,30 @@ func TestAddServer(t *testing.T) {
|
|||||||
t.Error("The length of server list should be 1.")
|
t.Error("The length of server list should be 1.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAddFunction(t *testing.T) {
|
||||||
|
f := func(job *Job) []byte {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := worker.AddFunction("foobar", f); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if l := len(worker.functions); l != 1 {
|
||||||
|
t.Log(worker.functions)
|
||||||
|
t.Error("The length of function map should be 1.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEcho(t * testing.T) {
|
||||||
|
go worker.Work()
|
||||||
|
if err := worker.Echo([]byte("Hello World")); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClose(t *testing.T) {
|
||||||
|
if err := worker.Close(); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,54 +1,118 @@
|
|||||||
package gearman
|
package gearman
|
||||||
|
|
||||||
import(
|
import(
|
||||||
"net"
|
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Worker struct {
|
type JobFunction func(job *Job) []byte
|
||||||
|
|
||||||
servers []net.Conn
|
type Worker struct {
|
||||||
|
servers []*JobClient
|
||||||
|
functions map[string]JobFunction
|
||||||
|
|
||||||
|
running bool
|
||||||
|
incoming chan *Job
|
||||||
|
mutex sync.Mutex
|
||||||
|
queue chan *Job
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWorker() (worker *Worker) {
|
func NewWorker() (worker *Worker) {
|
||||||
worker = &Worker{servers:make([]net.Conn, 0, WORKER_SERVER_CAP)}
|
worker = &Worker{servers:make([]*JobClient, 0, WORKER_SERVER_CAP),
|
||||||
|
functions: make(map[string]JobFunction),
|
||||||
|
incoming: make(chan *Job, 512),
|
||||||
|
queue: make(chan *Job, 512),
|
||||||
|
running: true,}
|
||||||
return worker
|
return worker
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// add server
|
// add server
|
||||||
// worker.AddServer("127.0.0.1:4730")
|
// worker.AddServer("127.0.0.1:4730")
|
||||||
func (worker * Worker) AddServer(addr string) (err os.Error) {
|
func (worker * Worker) AddServer(addr string) (err os.Error) {
|
||||||
|
worker.mutex.Lock()
|
||||||
|
defer worker.mutex.Unlock()
|
||||||
|
|
||||||
if len(worker.servers) == cap(worker.servers) {
|
if len(worker.servers) == cap(worker.servers) {
|
||||||
return os.NewError("There were too many servers.")
|
return os.NewError("There were too many servers.")
|
||||||
}
|
}
|
||||||
conn, err := net.Dial(TCP, addr)
|
|
||||||
|
// Create a new job server's client as a agent of server
|
||||||
|
server, err := NewJobClient(addr, worker.incoming)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
n := len(worker.servers)
|
n := len(worker.servers)
|
||||||
worker.servers = worker.servers[0: n + 1]
|
worker.servers = worker.servers[0: n + 1]
|
||||||
worker.servers[n] = conn
|
worker.servers[n] = server
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
|
|
||||||
// add function
|
// add function
|
||||||
func (worker * Worker) AddFunction(funcname string,
|
func (worker * Worker) AddFunction(funcname string,
|
||||||
f interface{}, context interface{}) (err Error) {
|
f JobFunction) (err os.Error) {
|
||||||
|
worker.mutex.Lock()
|
||||||
|
defer worker.mutex.Unlock()
|
||||||
|
|
||||||
|
if f == nil {
|
||||||
|
return os.NewError("Job function should not be nil.")
|
||||||
|
}
|
||||||
|
worker.functions[funcname] = f
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// work
|
// work
|
||||||
func (worker * GearmanWorker) Work() {
|
func (worker * Worker) Work() {
|
||||||
for {
|
for _, v := range worker.servers {
|
||||||
|
go v.Work()
|
||||||
}
|
}
|
||||||
|
for worker.running {
|
||||||
|
select {
|
||||||
|
case job := <-worker.incoming:
|
||||||
|
if err := worker.Exec(job); err != nil {
|
||||||
|
log.Panicln(err)
|
||||||
|
}
|
||||||
|
worker.queue <- job
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (worker * Worker) Result() (job *Job) {
|
||||||
|
if l := len(worker.queue); l != 1 {
|
||||||
|
if l == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := 0; i < l - 1; i ++ {
|
||||||
|
<-worker.queue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return <-worker.queue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close
|
// Close
|
||||||
// should used as defer
|
// should used as defer
|
||||||
func (worker * GearmanWorker) Close() (err Error){
|
func (worker * Worker) Close() (err os.Error){
|
||||||
|
worker.running = false
|
||||||
|
for _, v := range worker.servers {
|
||||||
|
err = v.Close()
|
||||||
|
}
|
||||||
|
close(worker.incoming)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Echo
|
||||||
|
func (worker * Worker) Echo(data []byte) (err os.Error) {
|
||||||
|
e := make(chan os.Error)
|
||||||
|
for _, v := range worker.servers {
|
||||||
|
go func() {
|
||||||
|
e <- v.Echo(data)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
return <- e
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec
|
||||||
|
func (worker * Worker) Exec(job *Job) (err os.Error) {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
Loading…
Reference in New Issue
Block a user