diff --git a/src/pkg/gearman/Makefile b/src/pkg/gearman/Makefile index 988fe7b..19ac949 100644 --- a/src/pkg/gearman/Makefile +++ b/src/pkg/gearman/Makefile @@ -7,8 +7,8 @@ include $(GOROOT)/src/Make.inc TARG=gearman GOFILES=\ gearman.go\ - job.go\ - jobclient.go\ + worker/job.go\ + worker/jobclient.go\ worker.go\ # client.go\ diff --git a/src/pkg/gearman/gearman.go b/src/pkg/gearman/gearman.go index 18a7cfe..dc8f3ca 100644 --- a/src/pkg/gearman/gearman.go +++ b/src/pkg/gearman/gearman.go @@ -4,6 +4,34 @@ const ( TCP = "tcp4" WORKER_SERVER_CAP = 32 WORKER_FUNCTION_CAP = 512 + + + // \x00REQ + REQ = 5391697 + // \x00RES + RES = 5391699 + + CAN_DO = 1 + CANT_DO = 2 + RESET_ABILITIES = 3 + PRE_SLEEP = 4 + NOOP = 6 + GRAB_JOB = 9 + NO_JOB = 10 + JOB_ASSIGN = 11 + WORK_STATUS = 12 + WORK_COMPLETE = 13 + WORK_FAIL = 14 + ECHO_REQ = 16 + ECHO_RES = 17 + ERROR = 19 + SET_CLIENT_ID = 22 + CAN_DO_TIMEOUT = 23 + WORK_EXCEPTION = 25 + WORK_DATA = 28 + WORK_WARNING = 29 + GRAB_JOB_UNIQ = 30 + JOB_ASSIGN_UNIQ = 31 ) diff --git a/src/pkg/gearman/gearman_test.go b/src/pkg/gearman/gearman_test.go index a6bc3a7..84aae20 100644 --- a/src/pkg/gearman/gearman_test.go +++ b/src/pkg/gearman/gearman_test.go @@ -2,6 +2,7 @@ package gearman import ( "testing" + "os" ) var worker *Worker @@ -16,21 +17,22 @@ func TestAddServer(t *testing.T) { t.Error(err) } - if l := len(worker.servers); l != 1 { - t.Log(worker.servers) + if l := len(worker.clients); l != 1 { + t.Log(worker.clients) t.Error("The length of server list should be 1.") } } -func TestAddFunction(t *testing.T) { - f := func(job *Job) []byte { - return nil - } +func foobar(job *Job) ([]byte, os.Error) { + return nil, nil +} - if err := worker.AddFunction("foobar", f); err != nil { + +func TestAddFunction(t *testing.T) { + if err := worker.AddFunction("foobar", foobar, 0); err != nil { t.Error(err) } - if err := worker.AddFunction("timeout", f); err != nil { + if err := worker.AddFunction("timeout", foobar, 5); err != nil { t.Error(err) } if l := len(worker.functions); l != 2 { @@ -40,19 +42,31 @@ func TestAddFunction(t *testing.T) { } func TestEcho(t * testing.T) { - go worker.Work() if err := worker.Echo([]byte("Hello World")); err != nil { t.Error(err) } } - +/* func TestResult(t *testing.T) { if job := worker.Result(); job == nil { - //t.Error("Nothing in result.") + t.Error("Nothing in result.") } else { t.Log(job) } } +*/ + +func TestRemoveFunction(t * testing.T) { + if err := worker.RemoveFunction("foobar"); err != nil { + t.Error(err) + } +} + +func TestReset(t * testing.T) { + if err := worker.Reset(); err != nil { + t.Error(err) + } +} func TestClose(t *testing.T) { if err := worker.Close(); err != nil { diff --git a/src/pkg/gearman/job.go b/src/pkg/gearman/job.go deleted file mode 100644 index f63bcf6..0000000 --- a/src/pkg/gearman/job.go +++ /dev/null @@ -1,79 +0,0 @@ -package gearman - -import ( - "os" -) - -const ( - // \x00REQ - REQ = 5391697 - // \x00RES - RES = 5391699 - - CAN_DO = 1 - CANT_DO = 2 - ECHO_REQ = 16 - ECHO_RES = 17 - ERROR = 19 - CAN_DO_TIMEOUT = 23 -) - -type Job struct { - client *JobClient - Data []byte - MagicCode, DataType uint32 -} - -func ByteToUint32(buf [4]byte) uint32 { - return uint32(buf[0]) << 24 + - uint32(buf[1]) << 16 + - uint32(buf[2]) << 8 + - uint32(buf[3]) -} - -func Uint32ToByte(i uint32) (data [4]byte) { - data[0] = byte((i >> 24) & 0xff) - data[1] = byte((i >> 16) & 0xff) - data[2] = byte((i >> 8) & 0xff) - data[3] = byte(i & 0xff) - return -} - -func NewJob(server *JobClient, magiccode, datatype uint32, data []byte) (job *Job) { - return &Job{client: server, - MagicCode:magiccode, - DataType: datatype, - Data:data} -} - -func DecodeJob(server *JobClient, data []byte) (job *Job, err os.Error) { - if len(data) < 12 { - return nil, os.NewError("Data length is too small.") - } - datatype := ByteToUint32([4]byte{data[4], data[5], data[6], data[7]}) - l := ByteToUint32([4]byte{data[8], data[9], data[10], data[11]}) - if len(data[12:]) != int(l) { - return nil, os.NewError("Invalid data length.") - } - switch(ByteToUint32([4]byte{data[4], data[5], data[6], data[7]})) { - case ECHO_RES: - data = data[12:] - } - return NewJob(server, REQ, datatype, data), err -} - -func (job *Job) Encode() (data []byte) { - magiccode := Uint32ToByte(job.MagicCode) - datatype := Uint32ToByte(job.DataType) - l := len(job.Data) - datalength := Uint32ToByte(uint32(l)) - data = make([]byte, 12 + l) - copy(data[:4], magiccode[:]) - copy(data[4:8], datatype[:]) - copy(data[8:12], datalength[:]) - copy(data[12:], job.Data) - return -} - - - diff --git a/src/pkg/gearman/jobclient.go b/src/pkg/gearman/jobclient.go deleted file mode 100644 index fb10ed1..0000000 --- a/src/pkg/gearman/jobclient.go +++ /dev/null @@ -1,68 +0,0 @@ -package gearman - -import ( - "net" - "os" -// "log" -) - -type JobClient struct { - conn net.Conn - incoming chan *Job - running bool -} - -func NewJobClient(addr string, incoming chan *Job) (jobclient *JobClient, err os.Error) { - conn, err := net.Dial(TCP, addr) - if err != nil { - return nil, err - } - jobclient = &JobClient{conn: conn, incoming: incoming, running:true} - return jobclient, err -} - -func (server *JobClient) Work() (err os.Error) { - for server.running { - var rel []byte - for { - buf := make([]byte, 2048) - n, err := server.conn.Read(buf) - if err != nil { - if err == os.EOF && n == 0 { - break - } - return err - } - rel = append(rel, buf[0: n] ...) - break - } - job, err := DecodeJob(server, rel) - if err != nil { - return err - } else { - server.incoming <- job - } - } - return -} - -func (server *JobClient) WriteJob(job * Job) (err os.Error) { - return server.Write(job.Encode()) -} - -func (server *JobClient) Write(buf []byte) (err os.Error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = server.conn.Write(buf[i:]) - if err != nil { - return err - } - } - return -} - -func (server *JobClient) Close() (err os.Error) { - server.running = false - err = server.conn.Close() - return -} diff --git a/src/pkg/gearman/worker.go b/src/pkg/gearman/worker.go index 974cb91..541e515 100644 --- a/src/pkg/gearman/worker.go +++ b/src/pkg/gearman/worker.go @@ -6,24 +6,29 @@ import( "log" ) -type JobFunction func(job *Job) []byte +type JobFunction func(job *Job) ([]byte, os.Error) +type JobFunctionMap map[string]JobFunction type Worker struct { - servers []*JobClient - functions map[string]JobFunction + clients []*jobClient + functions JobFunctionMap running bool incoming chan *Job mutex sync.Mutex - queue chan *Job + Queue chan *Job } func NewWorker() (worker *Worker) { - worker = &Worker{servers:make([]*JobClient, 0, WORKER_SERVER_CAP), - functions: make(map[string]JobFunction), + worker = &Worker{ + // job server list + clients:make([]*jobClient, 0, WORKER_SERVER_CAP), + // function list + functions: make(JobFunctionMap), incoming: make(chan *Job, 512), - queue: make(chan *Job, 512), - running: true,} + Queue: make(chan *Job, 512), + running: true, + } return worker } @@ -33,43 +38,69 @@ func (worker * Worker) AddServer(addr string) (err os.Error) { worker.mutex.Lock() defer worker.mutex.Unlock() - if len(worker.servers) == cap(worker.servers) { - return os.NewError("There were too many servers.") + if len(worker.clients) == cap(worker.clients) { + return os.NewError("There were too many clients.") } // Create a new job server's client as a agent of server - server, err := NewJobClient(addr, worker.incoming) + server, err := newJobClient(addr, worker.incoming) if err != nil { return err } - n := len(worker.servers) - worker.servers = worker.servers[0: n + 1] - worker.servers[n] = server + n := len(worker.clients) + worker.clients = worker.clients[0: n + 1] + worker.clients[n] = server return } // add function func (worker * Worker) AddFunction(funcname string, - f JobFunction) (err os.Error) { - worker.mutex.Lock() - defer worker.mutex.Unlock() - + f JobFunction, timeout uint32) (err os.Error) { if f == nil { return os.NewError("Job function should not be nil.") } + if len(worker.clients) < 1 { + return os.NewError("Did not connect to Job Server.") + } + worker.mutex.Lock() + defer worker.mutex.Unlock() worker.functions[funcname] = f + + var datatype uint32 + var data []byte + if timeout == 0 { + datatype = CAN_DO + data = []byte(funcname) + } else { + datatype = CAN_DO_TIMEOUT + data = []byte(funcname + "\x00") + t := uint32ToByte(timeout) + data = append(data, t[:] ...) + } + job := NewJob(REQ, datatype, data) + worker.WriteJob(job) return } + +// remove function +func (worker * Worker) RemoveFunction(funcname string) (err os.Error) { + worker.mutex.Lock() + defer worker.mutex.Unlock() + + if worker.functions[funcname] == nil { + return os.NewError("No function named: " + funcname) + } + worker.functions[funcname] = nil, false + job := NewJob(REQ, CANT_DO, []byte(funcname)) + worker.WriteJob(job) + return +} + // work func (worker * Worker) Work() { - for k, _ := range worker.functions { - job := NewJob(nil, REQ, CAN_DO, []byte(k)) - worker.Write(job) - } - - for _, v := range worker.servers { + for _, v := range worker.clients { go v.Work() } for worker.running { @@ -78,47 +109,51 @@ func (worker * Worker) Work() { if job == nil { break } - switch job.DataType { + switch job.dataType { + case NO_JOB: + // do nothing + log.Println(job) case ERROR: - log.Panicln(string(job.Data)) - default: - if err := worker.Exec(job); err != nil { + log.Println(string(job.Data)) + case JOB_ASSIGN, JOB_ASSIGN_UNIQ: + if err := worker.exec(job); err != nil { log.Panicln(err) } + continue + default: + worker.Queue <- job } - worker.queue <- job } } } func (worker * Worker) Result() (job *Job) { - if l := len(worker.queue); l != 1 { + if l := len(worker.Queue); l != 1 { if l == 0 { return } for i := 0; i < l - 1; i ++ { - <-worker.queue + <-worker.Queue } } - return <-worker.queue + return <-worker.Queue } // Close // should used as defer func (worker * Worker) Close() (err os.Error){ worker.running = false - for _, v := range worker.servers { + for _, v := range worker.clients { err = v.Close() } close(worker.incoming) return err } -func (worker * Worker) Write(job *Job) (err os.Error) { +func (worker * Worker) WriteJob(job *Job) (err os.Error) { e := make(chan os.Error) - for _, v := range worker.servers { + for _, v := range worker.clients { go func() { - job.client = v e <- v.WriteJob(job) }() } @@ -127,11 +162,61 @@ func (worker * Worker) Write(job *Job) (err os.Error) { // Echo func (worker * Worker) Echo(data []byte) (err os.Error) { - job := NewJob(nil, REQ, ECHO_REQ, data) - return worker.Write(job) + job := NewJob(REQ, ECHO_REQ, data) + return worker.WriteJob(job) +} + +// Reset +func (worker * Worker) Reset() (err os.Error){ + job := NewJob(REQ, RESET_ABILITIES, nil) + return worker.WriteJob(job) +} + +// SetId +func (worker * Worker) SetId(id string) (err os.Error) { + job := NewJob(REQ, SET_CLIENT_ID, []byte(id)) + return worker.WriteJob(job) } // Exec -func (worker * Worker) Exec(job *Job) (err os.Error) { +func (worker * Worker) exec(job *Job) (err os.Error) { + jobdata := splitByteArray(job.Data, '\x00') + job.Handle = string(jobdata[0]) + funcname := string(jobdata[1]) + if job.dataType == JOB_ASSIGN { + job.Data = jobdata[2] + } else { + job.UniqueId = string(jobdata[2]) + job.Data = jobdata[3] + } + result, err := worker.functions[funcname](job) + var datatype uint32 + if err == nil { + datatype = WORK_COMPLETE + } else{ + if result == nil { + datatype = WORK_FAIL + } else { + datatype = WORK_EXCEPTION + } + } + worker.WriteJob(NewJob(REQ, datatype, result)) + return +} + +func splitByteArray(slice []byte, spot byte) (data [][]byte){ + data = make([][]byte, 0, 10) + log.Println(data) + start, end := 0, 0 + for i, v := range slice { + if v == spot { + if start != end { + data = append(data, slice[start:end]) + } + start, end = i, i + } else { + end ++ + } + } return } diff --git a/src/pkg/gearman/worker/job.go b/src/pkg/gearman/worker/job.go new file mode 100644 index 0000000..88d0e6f --- /dev/null +++ b/src/pkg/gearman/worker/job.go @@ -0,0 +1,88 @@ +package gearman + +import ( + "os" +) + +type Job struct { + Data []byte + Handle string + UniqueId string + client *jobClient + magicCode, dataType uint32 +} + +func byteToUint32(buf [4]byte) uint32 { + return uint32(buf[0]) << 24 + + uint32(buf[1]) << 16 + + uint32(buf[2]) << 8 + + uint32(buf[3]) +} + +func uint32ToByte(i uint32) (data [4]byte) { + data[0] = byte((i >> 24) & 0xff) + data[1] = byte((i >> 16) & 0xff) + data[2] = byte((i >> 8) & 0xff) + data[3] = byte(i & 0xff) + return +} + +func NewJob(magiccode, datatype uint32, data []byte) (job *Job) { + return &Job{magicCode:magiccode, + dataType: datatype, + Data:data} +} + +func DecodeJob(data []byte) (job *Job, err os.Error) { + if len(data) < 12 { + return nil, os.NewError("Data length is too small.") + } + datatype := byteToUint32([4]byte{data[4], data[5], data[6], data[7]}) + l := byteToUint32([4]byte{data[8], data[9], data[10], data[11]}) + if len(data[12:]) != int(l) { + return nil, os.NewError("Invalid data length.") + } + switch(byteToUint32([4]byte{data[4], data[5], data[6], data[7]})) { + case ECHO_RES: + data = data[12:] + } + return NewJob(REQ, datatype, data), err +} + +func (job *Job) Encode() (data []byte) { + magiccode := uint32ToByte(job.magicCode) + datatype := uint32ToByte(job.dataType) + l := len(job.Data) + datalength := uint32ToByte(uint32(l)) + data = make([]byte, 12 + l) + copy(data[:4], magiccode[:]) + copy(data[4:8], datatype[:]) + copy(data[8:12], datalength[:]) + copy(data[12:], job.Data) + return +} + +// update data +func (job * Job) UpdateData(data []byte, iswaring bool) (err os.Error) { + result := append([]byte(job.Handle), 0) + result = append(result, data ...) + var datatype uint32 + if iswaring { + datatype = WORK_WARNING + } else { + datatype = WORK_DATA + } + return job.client.WriteJob(NewJob(REQ, datatype, result)) +} + +// update status +func (job * Job) UpdateStatus(numerator, denominator uint32) (err os.Error) { + n := uint32ToByte(numerator) + d := uint32ToByte(denominator) + result := append([]byte(job.Handle), 0) + result = append(result, n[:] ...) + result = append(result, d[:] ...) + return job.client.WriteJob(NewJob(REQ, WORK_STATUS, result)) +} + + diff --git a/src/pkg/gearman/worker/jobclient.go b/src/pkg/gearman/worker/jobclient.go new file mode 100644 index 0000000..453dc46 --- /dev/null +++ b/src/pkg/gearman/worker/jobclient.go @@ -0,0 +1,82 @@ +package gearman + +import ( + "net" + "os" +// "log" +) + +type jobClient struct { + conn net.Conn + incoming chan *Job + running bool +} + +func newJobClient(addr string, incoming chan *Job) (jobclient *jobClient, err os.Error) { + conn, err := net.Dial(TCP, addr) + if err != nil { + return nil, err + } + jobclient = &jobClient{conn:conn, incoming: incoming, running:true} + return jobclient, err +} + +func (client *jobClient) Work() (err os.Error) { + noop := true + for client.running { + // grab job + if noop { + client.WriteJob(NewJob(REQ, GRAB_JOB, nil)) + } + var rel []byte + for { + buf := make([]byte, 2048) + n, err := client.conn.Read(buf) + if err != nil { + if err == os.EOF && n == 0 { + break + } + return err + } + rel = append(rel, buf[0: n] ...) + break + } + job, err := DecodeJob(rel) + if err != nil { + return err + } else { + switch(job.dataType) { + case NOOP: + noop = true + case NO_JOB: + noop = false + client.WriteJob(NewJob(REQ, PRE_SLEEP, nil)) + case ECHO_RES, JOB_ASSIGN_UNIQ, JOB_ASSIGN: + job.client = client + client.incoming <- job + } + } + } + return +} + +func (client *jobClient) WriteJob(job * Job) (err os.Error) { + return client.Write(job.Encode()) +} + +func (client *jobClient) Write(buf []byte) (err os.Error) { + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return err + } + } + return +} + +func (client *jobClient) Close() (err os.Error) { + client.running = false + err = client.conn.Close() + return +} diff --git a/test/Makefile b/test/Makefile deleted file mode 100644 index cab8f72..0000000 --- a/test/Makefile +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2009 The Go Authors. All rights reserved. -# Use of this source code is governed by a BSD-style -# license that can be found in the LICENSE file. - -include $(GOROOT)/src/Make.inc - -TARG=client -GOFILES=\ - client.go\ - -CLEANFILES+=client - -include $(GOROOT)/src/Make.pkg - -%: install %.go - $(GC) $*.go - $(LD) -o $@ $*.$O diff --git a/test/client.go b/test/client.go deleted file mode 100644 index 7bc6b91..0000000 --- a/test/client.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "net" - "log" - "fmt" -) - -type Client struct { - c * net.TCPConn - w chan []byte - r chan []byte -} - -func NewClient() * Client { - client := new(Client) - wch := make(chan []byte, 2048) - rch := make(chan []byte, 2048) - client.w = wch - client.r = rch - addr, err := net.ResolveTCPAddr("127.0.0.1:4730") - if err != nil { - log.Panic(err.String()) - } - c, err := net.DialTCP("tcp", nil, addr) - client.c = c - if err != nil { - log.Panic(err.String()) - } - go client.ReadLoop() - go client.WriteLoop() - return client -} - -func (client * Client) ReadLoop() { - for { - buf := make([]byte, 2048) - _, err := client.c.Read(buf) - if err != nil { - client.c.Close() - break; - } - client.r <- buf - } -} - -func (client * Client) WriteLoop() { - for { - select { - case buf := <- client.w: - client.c.Write(buf) - } - } -} - -func (client * Client) Write(data []byte) { - client.w <- data -} - -func (client * Client) Read() []byte{ - var buf []byte - select { - case buf = <- client.r: - } - return buf -} - -func main() { - client := NewClient() - client.Write([]byte("TEST\r\n")) - for { - fmt.Println(client.Read()) - } -}