diff --git a/.travis.yml b/.travis.yml
index b20dfe6..5eb5bad 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
 language: go
 go:
-  - 1.2
+  - 1.4
 
 before_install:
   - sudo apt-get remove -y gearman-job-server
diff --git a/README.md b/README.md
index 8ea9c06..43b94fa 100644
--- a/README.md
+++ b/README.md
@@ -37,41 +37,46 @@ Usage
 
 ## Worker
 
-    // Limit number of concurrent jobs execution. Use worker.Unlimited (0) if you want no limitation.
-    w := worker.New(worker.OneByOne)
-    w.ErrHandler = func(e error) {
-        log.Println(e)
-    }
-    w.AddServer("127.0.0.1:4730")
-    // Use worker.Unlimited (0) if you want no timeout
-    w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
-    // This will give a timeout of 5 seconds
-    w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
-    if err := w.Ready(); err != nil {
-        log.Fatal(err)
-        return
-    }
-    go w.Work()
-
+```go
+// Limit number of concurrent jobs execution.
+// Use worker.Unlimited (0) if you want no limitation.
+w := worker.New(worker.OneByOne)
+w.ErrHandler = func(e error) {
+	log.Println(e)
+}
+w.AddServer("127.0.0.1:4730")
+// Use worker.Unlimited (0) if you want no timeout
+w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
+// This will give a timeout of 5 seconds
+w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
+
+if err := w.Ready(); err != nil {
+	log.Fatal(err)
+	return
+}
+go w.Work()
+```
 
 ## Client
 
-    // ...
-    c, err := client.New("tcp4", "127.0.0.1:4730")
-    // ... error handling
-    defer c.Close()
-    c.ErrorHandler = func(e error) {
-        log.Println(e)
-    }
-    echo := []byte("Hello\x00 world")
-    echomsg, err := c.Echo(echo)
-    // ... error handling
-    log.Println(string(echomsg))
-    jobHandler := func(job *client.Job) {
-        log.Printf("%s", job.Data)
-    }
-    handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler)
-    // ...
+```go
+// ...
+c, err := client.New("tcp4", "127.0.0.1:4730")
+// ... error handling
+defer c.Close()
+c.ErrorHandler = func(e error) {
+	log.Println(e)
+}
+echo := []byte("Hello\x00 world")
+echomsg, err := c.Echo(echo)
+// ... error handling
+log.Println(string(echomsg))
+jobHandler := func(resp *client.Response) {
+	log.Printf("%s", resp.Data)
+}
+handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler)
+// ...
+```
 
 Branches
 ========
@@ -87,17 +92,29 @@ __Use at your own risk!__
 
 Contributors
 ============
 
+Great thanks to all of you for your support and interest!
+
 (_Alphabetic order_)
  * [Alex Zylman](https://github.com/azylman)
+ * [C.R. Kirkwood-Watts](https://github.com/kirkwood)
+ * [Damian Gryski](https://github.com/dgryski)
+ * [Gabriel Cristian Alecu](https://github.com/AzuraMeta)
+ * [Graham Barr](https://github.com/gbarr)
  * [Ingo Oeser](https://github.com/nightlyone)
  * [jake](https://github.com/jbaikge)
+ * [Joe Higton](https://github.com/draxil)
  * [Jonathan Wills](https://github.com/runningwild)
+ * [Kevin Darlington](https://github.com/kdar)
  * [miraclesu](https://github.com/miraclesu)
  * [Paul Mach](https://github.com/paulmach)
+ * [Randall McPherson](https://github.com/rlmcpherson)
  * [Sam Grimee](https://github.com/sgrimee)
- * suchj
- * [Xing Xing](http://mikespook.com) [@Twitter](http://twitter.com/mikespook)
+
+Maintainer
+==========
+
+ * [Xing Xing](http://mikespook.com) <> [@Twitter](http://twitter.com/mikespook)
 
 Open Source - MIT Software License
 ==================================
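The README examples above register a `ToUpper` job but never show its body. A minimal sketch consistent with the `ToUpper` signature in `example/worker/worker.go` later in this patch — the function body and the two-argument `AddServer` call are illustrative assumptions, not necessarily the repository's exact code:

```go
package main

import (
	"log"
	"strings"

	"github.com/mikespook/gearman-go/worker"
)

// ToUpper upper-cases the job payload and returns it as the job's result.
func ToUpper(job worker.Job) ([]byte, error) {
	return []byte(strings.ToUpper(string(job.Data()))), nil
}

func main() {
	w := worker.New(worker.OneByOne)
	defer w.Close()
	// The tests in this patch pass a network plus an address; "tcp4" is assumed here.
	if err := w.AddServer("tcp4", "127.0.0.1:4730"); err != nil {
		log.Fatal(err)
	}
	w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
	if err := w.Ready(); err != nil {
		log.Fatal(err)
	}
	w.Work()
}
```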
diff --git a/client/client.go b/client/client.go
index 5e12dbb..907704f 100644
--- a/client/client.go
+++ b/client/client.go
@@ -6,6 +6,11 @@ import (
 	"bufio"
 	"net"
 	"sync"
+	"time"
+)
+
+var (
+	DefaultTimeout time.Duration = 1000
 )
 
 // One client connect to one server.
@@ -20,17 +25,20 @@ type Client struct {
 	conn net.Conn
 	rw   *bufio.ReadWriter
 
+	ResponseTimeout time.Duration // response timeout for do() in ms
+
 	ErrorHandler ErrorHandler
 }
 
 // Return a client.
 func New(network, addr string) (client *Client, err error) {
 	client = &Client{
-		net:          network,
-		addr:         addr,
-		respHandler:  make(map[string]ResponseHandler, queueSize),
-		innerHandler: make(map[string]ResponseHandler, queueSize),
-		in:           make(chan *Response, queueSize),
+		net:             network,
+		addr:            addr,
+		respHandler:     make(map[string]ResponseHandler, queueSize),
+		innerHandler:    make(map[string]ResponseHandler, queueSize),
+		in:              make(chan *Response, queueSize),
+		ResponseTimeout: DefaultTimeout,
 	}
 	client.conn, err = net.Dial(client.net, client.addr)
 	if err != nil {
@@ -104,6 +112,7 @@ ReadLoop:
 		}
 		if len(leftdata) > 0 { // some data left for processing
 			data = append(leftdata, data...)
+			leftdata = nil
 		}
 		for {
 			l := len(data)
@@ -145,10 +154,8 @@ func (client *Client) processLoop() {
 		case dtWorkData, dtWorkWarning, dtWorkStatus:
 			resp = client.handleResponse(resp.Handle, resp)
 		case dtWorkComplete, dtWorkFail, dtWorkException:
-			resp = client.handleResponse(resp.Handle, resp)
-			if resp != nil {
-				delete(client.respHandler, resp.Handle)
-			}
+			client.handleResponse(resp.Handle, resp)
+			delete(client.respHandler, resp.Handle)
 		}
 	}
 }
@@ -176,27 +183,44 @@ func (client *Client) handleInner(key string, resp *Response) *Response {
 	return resp
 }
 
+type handleOrError struct {
+	handle string
+	err    error
+}
+
 func (client *Client) do(funcname string, data []byte,
 	flag uint32) (handle string, err error) {
 	if client.conn == nil {
 		return "", ErrLostConn
 	}
-	var mutex sync.Mutex
-	mutex.Lock()
+	var result = make(chan handleOrError, 1)
 	client.lastcall = "c"
 	client.innerHandler["c"] = func(resp *Response) {
 		if resp.DataType == dtError {
 			err = getError(resp.Data)
+			result <- handleOrError{"", err}
 			return
 		}
 		handle = resp.Handle
-		mutex.Unlock()
+		result <- handleOrError{handle, nil}
 	}
 	id := IdGen.Id()
 	req := getJob(id, []byte(funcname), data)
 	req.DataType = flag
-	client.write(req)
-	mutex.Lock()
+	if err = client.write(req); err != nil {
+		delete(client.innerHandler, "c")
+		client.lastcall = ""
+		return
+	}
+	var timer = time.After(client.ResponseTimeout * time.Millisecond)
+	select {
+	case ret := <-result:
+		return ret.handle, ret.err
+	case <-timer:
+		delete(client.innerHandler, "c")
+		client.lastcall = ""
+		return "", ErrLostConn
+	}
 	return
 }
@@ -249,12 +273,12 @@ func (client *Client) Status(handle string) (status *Status, err error) {
 	mutex.Lock()
 	client.lastcall = "s" + handle
 	client.innerHandler["s"+handle] = func(resp *Response) {
+		defer mutex.Unlock()
 		var err error
		status, err = resp._status()
 		if err != nil {
 			client.err(err)
 		}
-		mutex.Unlock()
 	}
 	req := getRequest()
 	req.DataType = dtGetStatus
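The `do()` rewrite above replaces the mutex handshake with a buffered channel raced against `time.After`, so a server that never sends JOB_CREATED can no longer park the caller forever. A self-contained sketch of the same pattern — the helper name and values are illustrative, not part of the gearman-go API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// awaitWithTimeout waits for a reply on a channel or gives up after d.
// The channel is buffered (capacity 1), so a responder that arrives after
// the timeout can still send without blocking or leaking a goroutine.
func awaitWithTimeout(reply chan string, d time.Duration) (string, error) {
	select {
	case handle := <-reply:
		return handle, nil
	case <-time.After(d):
		return "", errors.New("response timed out")
	}
}

func main() {
	reply := make(chan string, 1)
	go func() { reply <- "H:lap:1" }() // simulated JOB_CREATED response
	handle, err := awaitWithTimeout(reply, 1000*time.Millisecond)
	fmt.Println(handle, err)
}
```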
diff --git a/client/common.go b/client/common.go
index ad47936..e520bf0 100644
--- a/client/common.go
+++ b/client/common.go
@@ -5,7 +5,7 @@ const (
 	// queue size
 	queueSize = 8
 	// read buffer size
-	bufferSize = 1024
+	bufferSize = 8192
 
 	// min packet length
 	minPacketLength = 12
diff --git a/client/pool.go b/client/pool.go
index 23efb74..9b94888 100644
--- a/client/pool.go
+++ b/client/pool.go
@@ -11,20 +11,18 @@ const (
 )
 
 var (
-	ErrNotFound    = errors.New("Server Not Found")
-	SelectWithRate = selectWithRate
-	SelectRandom   = selectRandom
+	ErrNotFound = errors.New("Server Not Found")
 )
 
-type poolClient struct {
+type PoolClient struct {
 	*Client
 	Rate  int
 	mutex sync.Mutex
 }
 
-type SelectionHandler func(map[string]*poolClient, string) string
+type SelectionHandler func(map[string]*PoolClient, string) string
 
-func selectWithRate(pool map[string]*poolClient,
+func SelectWithRate(pool map[string]*PoolClient,
 	last string) (addr string) {
 	total := 0
 	for _, item := range pool {
@@ -36,7 +34,7 @@ func selectWithRate(pool map[string]*poolClient,
 	return last
 }
 
-func selectRandom(pool map[string]*poolClient,
+func SelectRandom(pool map[string]*PoolClient,
 	last string) (addr string) {
 	r := rand.Intn(len(pool))
 	i := 0
@@ -52,9 +50,9 @@ type Pool struct {
 	SelectionHandler SelectionHandler
 	ErrorHandler     ErrorHandler
+	Clients          map[string]*PoolClient
 
-	clients map[string]*poolClient
-	last    string
+	last string
 
 	mutex sync.Mutex
 }
@@ -62,7 +60,7 @@ type Pool struct {
 // Return a new pool.
 func NewPool() (pool *Pool) {
 	return &Pool{
-		clients:          make(map[string]*poolClient, poolSize),
+		Clients:          make(map[string]*PoolClient, poolSize),
 		SelectionHandler: SelectWithRate,
 	}
 }
@@ -71,16 +69,16 @@ func NewPool() (pool *Pool) {
 func (pool *Pool) Add(net, addr string, rate int) (err error) {
 	pool.mutex.Lock()
 	defer pool.mutex.Unlock()
-	var item *poolClient
+	var item *PoolClient
 	var ok bool
-	if item, ok = pool.clients[addr]; ok {
+	if item, ok = pool.Clients[addr]; ok {
 		item.Rate = rate
 	} else {
 		var client *Client
 		client, err = New(net, addr)
 		if err == nil {
-			item = &poolClient{Client: client, Rate: rate}
-			pool.clients[addr] = item
+			item = &PoolClient{Client: client, Rate: rate}
+			pool.Clients[addr] = item
 		}
 	}
 	return
@@ -90,7 +88,7 @@ func (pool *Pool) Add(net, addr string, rate int) (err error) {
 func (pool *Pool) Remove(addr string) {
 	pool.mutex.Lock()
 	defer pool.mutex.Unlock()
-	delete(pool.clients, addr)
+	delete(pool.Clients, addr)
 }
 
 func (pool *Pool) Do(funcname string, data []byte,
@@ -116,7 +114,7 @@ func (pool *Pool) DoBg(funcname string, data []byte,
 // Get job status from job server.
 // !!!Not fully tested.!!!
 func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
-	if client, ok := pool.clients[addr]; ok {
+	if client, ok := pool.Clients[addr]; ok {
 		client.Lock()
 		defer client.Unlock()
 		status, err = client.Status(handle)
@@ -128,12 +126,12 @@ func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
 
 // Send a something out, get the samething back.
 func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
-	var client *poolClient
+	var client *PoolClient
 	if addr == "" {
 		client = pool.selectServer()
 	} else {
 		var ok bool
-		if client, ok = pool.clients[addr]; !ok {
+		if client, ok = pool.Clients[addr]; !ok {
 			err = ErrNotFound
 			return
 		}
@@ -147,18 +145,18 @@ func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
 // Close
 func (pool *Pool) Close() (err map[string]error) {
 	err = make(map[string]error)
-	for _, c := range pool.clients {
+	for _, c := range pool.Clients {
 		err[c.addr] = c.Close()
 	}
 	return
 }
 
 // selecting server
-func (pool *Pool) selectServer() (client *poolClient) {
+func (pool *Pool) selectServer() (client *PoolClient) {
 	for client == nil {
-		addr := pool.SelectionHandler(pool.clients, pool.last)
+		addr := pool.SelectionHandler(pool.Clients, pool.last)
 		var ok bool
-		if client, ok = pool.clients[addr]; ok {
+		if client, ok = pool.Clients[addr]; ok {
 			pool.last = addr
 			break
 		}
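With `PoolClient`, `Pool.Clients`, and the selection functions now exported, callers can plug in their own `SelectionHandler` instead of the built-in rate or random selectors. A sketch under that assumption — `pickAnyOther` is a hypothetical handler, not part of the package:

```go
package main

import (
	"log"

	"github.com/mikespook/gearman-go/client"
)

// pickAnyOther is a hypothetical SelectionHandler: it returns any address
// that differs from the one used last, falling back to last itself. Any
// func(map[string]*client.PoolClient, string) string satisfies the type.
func pickAnyOther(pool map[string]*client.PoolClient, last string) string {
	for addr := range pool {
		if addr != last {
			return addr
		}
	}
	return last
}

func main() {
	pool := client.NewPool()
	pool.SelectionHandler = pickAnyOther
	if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil {
		log.Fatal(err)
	}
	echo, err := pool.Echo("", []byte("ping")) // empty addr lets the handler choose
	log.Println(string(echo), err)
}
```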
diff --git a/client/pool_test.go b/client/pool_test.go
index 5324db9..54e7e03 100644
--- a/client/pool_test.go
+++ b/client/pool_test.go
@@ -18,8 +18,8 @@ func TestPoolAdd(t *testing.T) {
 		t.Log(err)
 		c -= 1
 	}
-	if len(pool.clients) != c {
-		t.Errorf("%d servers expected, %d got.", c, len(pool.clients))
+	if len(pool.Clients) != c {
+		t.Errorf("%d servers expected, %d got.", c, len(pool.Clients))
 	}
 }
diff --git a/client/response.go b/client/response.go
index bcafc53..ce8e509 100644
--- a/client/response.go
+++ b/client/response.go
@@ -76,7 +76,7 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) {
 	case dtJobCreated:
 		resp.Handle = string(dt)
 	case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
-		dtWorkComplete, dtWorkFail, dtWorkException:
+		dtWorkComplete, dtWorkException:
 		s := bytes.SplitN(dt, []byte{'\x00'}, 2)
 		if len(s) >= 2 {
 			resp.Handle = string(s[0])
@@ -85,6 +85,14 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) {
 			err = fmt.Errorf("Invalid data: %v", data)
 			return
 		}
+	case dtWorkFail:
+		s := bytes.SplitN(dt, []byte{'\x00'}, 2)
+		if len(s) >= 1 {
+			resp.Handle = string(s[0])
+		} else {
+			err = fmt.Errorf("Invalid data: %v", data)
+			return
+		}
 	case dtEchoRes:
 		fallthrough
 	default:
diff --git a/example/client/client.go b/example/client/client.go
index e7a34ea..61b0d61 100644
--- a/example/client/client.go
+++ b/example/client/client.go
@@ -3,8 +3,8 @@ package main
 import (
 	"github.com/mikespook/gearman-go/client"
 	"log"
-	"sync"
 	"os"
+	"sync"
 )
 
 func main() {
diff --git a/example/worker/worker.go b/example/worker/worker.go
index 7474517..3b0ddad 100644
--- a/example/worker/worker.go
+++ b/example/worker/worker.go
@@ -1,13 +1,14 @@
 package main
 
 import (
-	"github.com/mikespook/gearman-go/worker"
-	"github.com/mikespook/golib/signal"
 	"log"
+	"net"
 	"os"
 	"strings"
 	"time"
-	"net"
+
+	"github.com/mikespook/gearman-go/worker"
+	"github.com/mikespook/golib/signal"
 )
 
 func ToUpper(job worker.Job) ([]byte, error) {
@@ -41,7 +42,7 @@ func main() {
 	w.ErrorHandler = func(e error) {
 		log.Println(e)
 		if opErr, ok := e.(*net.OpError); ok {
-			if ! opErr.Temporary() {
+			if !opErr.Temporary() {
 				proc, err := os.FindProcess(os.Getpid())
 				if err != nil {
 					log.Println(err)
@@ -68,7 +69,6 @@ func main() {
 		return
 	}
 	go w.Work()
-	sh := signal.NewHandler()
-	sh.Bind(os.Interrupt, func() bool { return true })
-	sh.Loop()
+	signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
+	signal.Wait()
 }
diff --git a/worker/agent.go b/worker/agent.go
index b6af74c..ad72690 100644
--- a/worker/agent.go
+++ b/worker/agent.go
@@ -2,7 +2,9 @@ package worker
 
 import (
 	"bufio"
+	"bytes"
 	"encoding/binary"
+	"io"
 	"net"
 	"sync"
 )
@@ -47,19 +49,25 @@ func (a *agent) work() {
 			a.worker.err(err.(error))
 		}
 	}()
+
 	var inpack *inPack
 	var l int
 	var err error
 	var data, leftdata []byte
 	for {
-		if data, err = a.read(bufferSize); err != nil {
+		if data, err = a.read(); err != nil {
 			if opErr, ok := err.(*net.OpError); ok {
-				if opErr.Timeout() {
-					a.worker.err(err)
-				}
 				if opErr.Temporary() {
 					continue
+				} else {
+					a.disconnect_error(err)
+					// else - we're probably dc'ing due to a Close()
+
+					break
 				}
+
+			} else if err == io.EOF {
+				a.disconnect_error(err)
 				break
 			}
 			a.worker.err(err)
@@ -83,17 +91,33 @@ func (a *agent) work() {
 			leftdata = data
 			continue
 		}
-		if inpack, l, err = decodeInPack(data); err != nil {
-			a.worker.err(err)
-			leftdata = data
-			continue
+		for {
+			if inpack, l, err = decodeInPack(data); err != nil {
+				a.worker.err(err)
+				leftdata = data
+				break
+			} else {
+				leftdata = nil
+				inpack.a = a
+				a.worker.in <- inpack
+				if len(data) == l {
+					break
+				}
+				if len(data) > l {
+					data = data[l:]
+				}
+			}
 		}
-		leftdata = nil
-		inpack.a = a
-		a.worker.in <- inpack
-		if len(data) > l {
-			leftdata = data[l:]
+	}
+}
+
+func (a *agent) disconnect_error(err error) {
+	if a.conn != nil {
+		err = &WorkerDisconnectError{
+			err:   err,
+			agent: a,
 		}
+		a.worker.err(err)
 	}
 }
@@ -109,6 +133,10 @@ func (a *agent) Close() {
 func (a *agent) Grab() {
 	a.Lock()
 	defer a.Unlock()
+	a.grab()
+}
+
+func (a *agent) grab() {
 	outpack := getOutPack()
 	outpack.dataType = dtGrabJobUniq
 	a.write(outpack)
@@ -122,21 +150,49 @@ func (a *agent) PreSleep() {
 	a.write(outpack)
 }
 
-// read length bytes from the socket
-func (a *agent) read(length int) (data []byte, err error) {
-	n := 0
-	buf := getBuffer(bufferSize)
-	// read until data can be unpacked
-	for i := length; i > 0 || len(data) < minPacketLength; i -= n {
-		if n, err = a.rw.Read(buf); err != nil {
-			return
-		}
-		data = append(data, buf[0:n]...)
-		if n < bufferSize {
-			break
-		}
+func (a *agent) reconnect() error {
+	a.Lock()
+	defer a.Unlock()
+	conn, err := net.Dial(a.net, a.addr)
+	if err != nil {
+		return err
 	}
-	return
+	a.conn = conn
+	a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
+		bufio.NewWriter(a.conn))
+	a.grab()
+	a.worker.reRegisterFuncsForAgent(a)
+
+	go a.work()
+	return nil
+}
+
+// read length bytes from the socket
+func (a *agent) read() (data []byte, err error) {
+	n := 0
+
+	tmp := getBuffer(bufferSize)
+	var buf bytes.Buffer
+
+	// read the header so we can get the length of the data
+	if n, err = a.rw.Read(tmp); err != nil {
+		return
+	}
+	dl := int(binary.BigEndian.Uint32(tmp[8:12]))
+
+	// write what we read so far
+	buf.Write(tmp[:n])
+
+	// read until we receive all the data
+	for buf.Len() < dl+minPacketLength {
+		if n, err = a.rw.Read(tmp); err != nil {
+			return buf.Bytes(), err
+		}
+
+		buf.Write(tmp[:n])
+	}
+
+	return buf.Bytes(), err
 }
 
 // Internal write the encoded job.
@@ -151,3 +207,10 @@
 	}
 	return a.rw.Flush()
 }
+
+// Write with lock
+func (a *agent) Write(outpack *outPack) (err error) {
+	a.Lock()
+	defer a.Unlock()
+	return a.write(outpack)
+}
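The new `agent.read()` above frames on the Gearman packet header: 12 bytes, with the payload size in the last four bytes, big-endian. A self-contained sketch of the same framing logic — it uses `io.ReadFull` for brevity where the patch loops over a fixed buffer, and the helper name and packet-type value are illustrative:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readPacket reads one length-prefixed packet: a 12-byte header whose
// final four bytes hold the payload size as a big-endian uint32, then
// exactly that many payload bytes.
func readPacket(r io.Reader) ([]byte, error) {
	header := make([]byte, 12)
	if _, err := io.ReadFull(r, header); err != nil {
		return nil, err
	}
	size := int(binary.BigEndian.Uint32(header[8:12]))
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return append(header, payload...), nil
}

func main() {
	// Fake a packet: response magic, a packet type (value illustrative), size 5.
	var buf bytes.Buffer
	buf.WriteString("\x00RES")
	binary.Write(&buf, binary.BigEndian, uint32(11))
	binary.Write(&buf, binary.BigEndian, uint32(5))
	buf.WriteString("hello")
	pkt, err := readPacket(&buf)
	fmt.Println(len(pkt), err) // 17 <nil>
}
```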
diff --git a/worker/worker.go b/worker/worker.go
index 9926aec..92b8579 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -19,15 +19,15 @@ const (
 // It can connect to multi-server and grab jobs.
 type Worker struct {
 	sync.Mutex
-	agents  []*agent
-	funcs   jobFuncs
-	in      chan *inPack
-	running bool
-	ready   bool
+	agents   []*agent
+	funcs    jobFuncs
+	in       chan *inPack
+	running  bool
+	ready    bool
 	// The shuttingDown variable is protected by the Worker lock
 	shuttingDown bool
-	// Used during shutdown to wait for all active jobs to finish
-	activeJobs sync.WaitGroup
+	// Used during shutdown to wait for all active jobs to finish
+	activeJobs sync.WaitGroup
 
 	Id string
 	ErrorHandler ErrorHandler
@@ -99,6 +99,11 @@ func (worker *Worker) AddFunc(funcname string,
 
 // inner add
 func (worker *Worker) addFunc(funcname string, timeout uint32) {
+	outpack := prepFuncOutpack(funcname, timeout)
+	worker.broadcast(outpack)
+}
+
+func prepFuncOutpack(funcname string, timeout uint32) *outPack {
 	outpack := getOutPack()
 	if timeout == 0 {
 		outpack.dataType = dtCanDo
@@ -111,7 +116,7 @@ func (worker *Worker) addFunc(funcname string, timeout uint32) {
 		outpack.data[l] = '\x00'
 		binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
 	}
-	worker.broadcast(outpack)
+	return outpack
 }
 
 // Remove a function.
@@ -200,11 +205,6 @@ func (worker *Worker) Work() {
 		}
 	}
 
-	defer func() {
-		for _, a := range worker.agents {
-			a.Close()
-		}
-	}()
 	worker.running = true
 	for _, a := range worker.agents {
 		a.Grab()
@@ -227,8 +227,11 @@ func (worker *Worker) customeHandler(inpack *inPack) {
 // Close connection and exit main loop
 func (worker *Worker) Close() {
 	worker.Lock()
-	worker.Unlock()
+	defer worker.Unlock()
 	if worker.running == true {
+		for _, a := range worker.agents {
+			a.Close()
+		}
 		worker.running = false
 		close(worker.in)
 	}
@@ -323,10 +326,19 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
 		}
 		outpack.handle = inpack.handle
 		outpack.data = r.data
-		inpack.a.write(outpack)
+		inpack.a.Write(outpack)
 	}
 	return
 }
 
+func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
+	worker.Lock()
+	defer worker.Unlock()
+	for funcname, f := range worker.funcs {
+		outpack := prepFuncOutpack(funcname, f.timeout)
+		a.write(outpack)
+	}
+
+}
 // inner result
 type result struct {
@@ -350,3 +362,23 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
 	}
 	return r
 }
+
+// Error type passed when a worker connection disconnects
+type WorkerDisconnectError struct {
+	err   error
+	agent *agent
+}
+
+func (e *WorkerDisconnectError) Error() string {
+	return e.err.Error()
+}
+
+// Responds to the error by asking the worker to reconnect
+func (e *WorkerDisconnectError) Reconnect() (err error) {
+	return e.agent.reconnect()
+}
+
+// Which server was this for?
+func (e *WorkerDisconnectError) Server() (net string, addr string) {
+	return e.agent.net, e.agent.addr
+}
diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go
new file mode 100644
index 0000000..d0f6cc2
--- /dev/null
+++ b/worker/worker_disconnect_test.go
@@ -0,0 +1,243 @@
+package worker
+
+import (
+	"../client"
+	"log"
+	"net"
+	"os/exec"
+	"testing"
+	"time"
+)
+
+const port = `3700`
+
+var gearman_ready chan bool
+var kill_gearman chan bool
+var bye chan bool
+
+func init() {
+
+	if check_gearman_present() {
+		panic(`Something already listening on our testing port. Chickening out of testing with it!`)
+	}
+	gearman_ready = make(chan bool)
+	kill_gearman = make(chan bool)
+	// TODO: verify port is clear
+	go run_gearman()
+}
+
+func run_gearman() {
+	gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port)
+	start_err := gm_cmd.Start()
+
+	if start_err != nil {
+		panic(`could not start gearman, aborting test :` + start_err.Error())
+	}
+
+	// Make sure we clear up our gearman:
+	defer func() {
+		gm_cmd.Process.Kill()
+	}()
+
+	for tries := 10; tries > 0; tries-- {
+		if check_gearman_present() {
+			break
+		}
+		time.Sleep(250 * time.Millisecond)
+	}
+
+	if !check_gearman_present() {
+		panic(`Unable to start gearman aborting test`)
+	}
+	gearman_ready <- true
+
+	<-kill_gearman
+}
+
+func check_gearman_present() bool {
+	con, err := net.Dial(`tcp`, `127.0.0.1:`+port)
+	if err != nil {
+		return false
+	}
+	con.Close()
+	return true
+}
+
+func check_gearman_is_dead() bool {
+
+	for tries := 10; tries > 0; tries-- {
+		if !check_gearman_present() {
+			return true
+		}
+		time.Sleep(250 * time.Millisecond)
+	}
+	return false
+}
+
+/*
+ Checks for a disconnect whilst not working
+*/
+func TestBasicDisconnect(t *testing.T) {
+	<-gearman_ready
+
+	worker := New(Unlimited)
+	timeout := make(chan bool, 1)
+	done := make(chan bool, 1)
+
+	if err := worker.AddServer(Network, "127.0.0.1:"+port); err != nil {
+		t.Error(err)
+	}
+	work_done := false
+	if err := worker.AddFunc("gearman-go-workertest",
+		func(j Job) (b []byte, e error) {
+			work_done = true
+			done <- true
+			return
+		}, 0); err != nil {
+		t.Error(err)
+	}
+
+	handled_errors := false
+
+	c_error := make(chan bool)
+	was_dc_err := false
+	worker.ErrorHandler = func(e error) {
+		log.Println(e)
+		_, was_dc_err = e.(*WorkerDisconnectError)
+		handled_errors = true
+		c_error <- true
+	}
+
+	go func() {
+		time.Sleep(5 * time.Second)
+		timeout <- true
+	}()
+
+	err := worker.Ready()
+
+	if err != nil {
+		t.Error(err)
+	}
+
+	go worker.Work()
+
+	kill_gearman <- true
+
+	check_gearman_is_dead()
+	go run_gearman()
+
+	select {
+	case <-gearman_ready:
+	case <-timeout:
+	}
+
+	send_client_request()
+
+	select {
+	case <-done:
+		t.Error("Client request handled (somehow), did we magically reconnect?")
+	case <-timeout:
+		t.Error("Test timed out waiting for the error handler")
+	case <-c_error:
+		// error was handled!
+		if !was_dc_err {
+			t.Error("Disconnect didn't manifest as a net.OpError?")
+		}
+	}
+	worker.Close()
+	kill_gearman <- true
+
+}
+
+func TestDcRc(t *testing.T) {
+	check_gearman_is_dead()
+	go run_gearman()
+
+	<-gearman_ready
+
+	worker := New(Unlimited)
+	timeout := make(chan bool, 1)
+	done := make(chan bool, 1)
+
+	if err := worker.AddServer(Network, "127.0.0.1:"+port); err != nil {
+		t.Error(err)
+	}
+	work_done := false
+	if err := worker.AddFunc("gearman-go-workertest",
+		func(j Job) (b []byte, e error) {
+			log.Println("Actual work happens!")
+			work_done = true
+			done <- true
+			return
+		}, 0); err != nil {
+		t.Error(err)
+	}
+
+	worker.ErrorHandler = func(e error) {
+		wdc, wdcok := e.(*WorkerDisconnectError)
+
+		if wdcok {
+			log.Println("Reconnecting!")
+			reconnected := false
+			for tries := 20; !reconnected && tries > 0; tries-- {
+				rcerr := wdc.Reconnect()
+				if rcerr != nil {
+					time.Sleep(250 * time.Millisecond)
+				} else {
+					reconnected = true
+				}
+			}
+
+		} else {
+			panic("Some other kind of error " + e.Error())
+		}
+
+	}
+
+	go func() {
+		time.Sleep(5 * time.Second)
+		timeout <- true
+	}()
+
+	err := worker.Ready()
+
+	if err != nil {
+		t.Error(err)
+	}
+
+	go worker.Work()
+
+	kill_gearman <- true
+
+	check_gearman_is_dead()
+	go run_gearman()
+
+	select {
+	case <-gearman_ready:
+	case <-timeout:
+	}
+
+	send_client_request()
+
+	select {
+	case <-done:
+	case <-timeout:
+		t.Error("Test timed out")
+	}
+	worker.Close()
+	kill_gearman <- true
+
+}
+
+func send_client_request() {
+	c, err := client.New(Network, "127.0.0.1:"+port)
+	if err == nil {
+		_, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh)
+		if err != nil {
+			log.Println("error sending client request " + err.Error())
+		}
+
+	} else {
+		log.Println("error with client " + err.Error())
+	}
+}
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 946d166..baa50fe 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -1,6 +1,7 @@
 package worker
 
 import (
+	"bytes"
 	"errors"
 	"sync"
 	"sync/atomic"
@@ -80,6 +81,62 @@ func TestWork(t *testing.T) {
 	wg.Wait()
 }
 
+func TestLargeDataWork(t *testing.T) {
+	worker := New(Unlimited)
+	defer worker.Close()
+
+	if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
+		t.Error(err)
+	}
+	worker.Ready()
+
+	l := 5714
+	var wg sync.WaitGroup
+
+	bigdataHandler := func(job Job) error {
+		defer wg.Done()
+		if len(job.Data()) != l {
+			t.Errorf("expected length %d. got %d.", l, len(job.Data()))
+		}
+		return nil
+	}
+	if err := worker.AddFunc("bigdata", foobar, 0); err != nil {
+		defer wg.Done()
+		t.Error(err)
+	}
+
+	worker.JobHandler = bigdataHandler
+
+	worker.ErrorHandler = func(err error) {
+		t.Fatal("shouldn't have received an error")
+	}
+
+	if err := worker.Ready(); err != nil {
+		t.Error(err)
+		return
+	}
+	go worker.Work()
+	wg.Add(1)
+
+	// var cli *client.Client
+	// var err error
+	// if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
+	// 	t.Fatal(err)
+	// }
+	// cli.ErrorHandler = func(e error) {
+	// 	t.Error(e)
+	// }
+
+	// _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) {
+	// })
+	// if err != nil {
+	// 	t.Error(err)
+	// }
+
+	worker.Echo(bytes.Repeat([]byte("a"), l))
+	wg.Wait()
+}
+
 func TestWorkerClose(t *testing.T) {
 	worker.Close()
 }
@@ -116,7 +173,7 @@ func TestWorkWithoutReady(t *testing.T) {
 	// With the all-in-one Work() we don't know if the
 	// worker is ready at this stage so we may have to wait a sec:
 	go func() {
-		tries := 3
+		tries := 5
 		for tries > 0 {
 			if other_worker.ready {
 				other_worker.Echo([]byte("Hello"))
@@ -124,7 +181,7 @@
 				break
 			}
 			// still waiting for it to be ready..
-			time.Sleep(1 * time.Second)
+			time.Sleep(250 * time.Millisecond)
 			tries--
 		}
 	}()
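The exported `WorkerDisconnectError` lets an `ErrorHandler` drive reconnection, which is exactly what `TestDcRc` above exercises. A sketch of such a handler from outside the package — the retry count and delay are arbitrary choices mirroring the test, and the surrounding setup is elided:

```go
package main

import (
	"log"
	"time"

	"github.com/mikespook/gearman-go/worker"
)

func main() {
	w := worker.New(worker.Unlimited)
	defer w.Close()

	// On disconnect, retry the dead connection before giving up; Reconnect()
	// re-dials, re-registers the worker's functions, and resumes reading.
	w.ErrorHandler = func(e error) {
		wdc, ok := e.(*worker.WorkerDisconnectError)
		if !ok {
			log.Println(e)
			return
		}
		for tries := 20; tries > 0; tries-- {
			if err := wdc.Reconnect(); err == nil {
				return
			}
			time.Sleep(250 * time.Millisecond)
		}
		net, addr := wdc.Server()
		log.Printf("gave up reconnecting to %s/%s", net, addr)
	}

	// ... AddServer / AddFunc / Ready / Work as in the README example.
}
```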