From da3d31662b106ade0fddcfc5e5140436628a9e65 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Fri, 7 Mar 2014 17:40:32 +0800 Subject: [PATCH 01/34] added Damian to contributors --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 8ea9c06..3677d85 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ Contributors (_Alphabetic order_) * [Alex Zylman](https://github.com/azylman) + * [Damian Gryski](https://github.com/dgryski) * [Ingo Oeser](https://github.com/nightlyone) * [jake](https://github.com/jbaikge) * [Jonathan Wills](https://github.com/runningwild) From de91c999f7b7cf62319bd6c304582b02dc08f429 Mon Sep 17 00:00:00 2001 From: Kevin Darlington Date: Sat, 8 Mar 2014 19:22:14 -0500 Subject: [PATCH 02/34] Changed agent.read to handle big data. --- worker/agent.go | 39 +++++++++++++++++++---------- worker/worker_test.go | 57 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 13 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 2236173..a3df097 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -2,6 +2,8 @@ package worker import ( "bufio" + "bytes" + "encoding/binary" "net" "sync" ) @@ -51,7 +53,7 @@ func (a *agent) work() { var err error var data, leftdata []byte for { - if data, err = a.read(bufferSize); err != nil { + if data, err = a.read(); err != nil { if opErr, ok := err.(*net.OpError); ok { if opErr.Timeout() { a.worker.err(err) @@ -121,20 +123,31 @@ func (a *agent) PreSleep() { } // read length bytes from the socket -func (a *agent) read(length int) (data []byte, err error) { +func (a *agent) read() (data []byte, err error) { n := 0 - buf := getBuffer(bufferSize) - // read until data can be unpacked - for i := length; i > 0 || len(data) < minPacketLength; i -= n { - if n, err = a.rw.Read(buf); err != nil { - return - } - data = append(data, buf[0:n]...) - if n < bufferSize { - break - } + + tmp := getBuffer(bufferSize) + var buf bytes.Buffer + + // read the header so we can get the length of the data + if n, err = a.rw.Read(tmp); err != nil { + return } - return + dl := int(binary.BigEndian.Uint32(tmp[8:12])) + + // write what we read so far + buf.Write(tmp[:n]) + + // read until we receive all the data + for buf.Len() < dl+minPacketLength { + if n, err = a.rw.Read(tmp); err != nil { + return buf.Bytes(), err + } + + buf.Write(tmp[:n]) + } + + return buf.Bytes(), err } // Internal write the encoded job. diff --git a/worker/worker_test.go b/worker/worker_test.go index 7fa8f0b..1b1c9fd 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -1,6 +1,7 @@ package worker import ( + "bytes" "sync" "testing" ) @@ -77,6 +78,62 @@ func TestWork(t *testing.T) { wg.Wait() } +func TestLargeDataWork(t *testing.T) { + worker := New(Unlimited) + defer worker.Close() + + if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil { + t.Error(err) + } + worker.Ready() + + l := 5714 + var wg sync.WaitGroup + + bigdataHandler := func(job Job) error { + defer wg.Done() + if len(job.Data()) != l { + t.Errorf("expected length %d. got %d.", l, len(job.Data())) + } + return nil + } + if err := worker.AddFunc("bigdata", foobar, 0); err != nil { + defer wg.Done() + t.Error(err) + } + + worker.JobHandler = bigdataHandler + + worker.ErrorHandler = func(err error) { + t.Fatal("shouldn't have received an error") + } + + if err := worker.Ready(); err != nil { + t.Error(err) + return + } + go worker.Work() + wg.Add(1) + + // var cli *client.Client + // var err error + // if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil { + // t.Fatal(err) + // } + // cli.ErrorHandler = func(e error) { + // t.Error(e) + // } + + // _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) { + // }) + // if err != nil { + // t.Error(err) + // } + + worker.Echo(bytes.Repeat([]byte("a"), l)) + wg.Wait() +} + func TestWorkerClose(t *testing.T) { worker.Close() } From 5334b50533202a5421754ff15b780c1b1fa8ae02 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Tue, 11 Mar 2014 10:03:00 +0800 Subject: [PATCH 03/34] fixed #34 --- client/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index 5e12dbb..b98dffe 100644 --- a/client/client.go +++ b/client/client.go @@ -185,12 +185,12 @@ func (client *Client) do(funcname string, data []byte, mutex.Lock() client.lastcall = "c" client.innerHandler["c"] = func(resp *Response) { + defer mutex.Unlock() if resp.DataType == dtError { err = getError(resp.Data) return } handle = resp.Handle - mutex.Unlock() } id := IdGen.Id() req := getJob(id, []byte(funcname), data) @@ -249,12 +249,12 @@ func (client *Client) Status(handle string) (status *Status, err error) { mutex.Lock() client.lastcall = "s" + handle client.innerHandler["s"+handle] = func(resp *Response) { + defer mutex.Unlock() var err error status, err = resp._status() if err != nil { client.err(err) } - mutex.Unlock() } req := getRequest() req.DataType = dtGetStatus From 6910d548bec38278213d901c95c667173fe9cf48 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Tue, 11 Mar 2014 10:08:20 +0800 Subject: [PATCH 04/34] added Kevin to the contributors list --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 3677d85..20c0281 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,7 @@ Contributors * [Ingo Oeser](https://github.com/nightlyone) * [jake](https://github.com/jbaikge) * [Jonathan Wills](https://github.com/runningwild) + * [Kevin Darlington](https://github.com/kdar) * [miraclesu](https://github.com/miraclesu) * [Paul Mach](https://github.com/paulmach) * [Sam Grimee](https://github.com/sgrimee) From 2f72b28d762e42cdd94adc3f02181bf808591e2e Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:35:05 +0100 Subject: [PATCH 05/34] FIX: waiting for worker fireup was making the test slow Dodging the race condition with a sleep was fine, but we slept for a whole second which was longer than this test takes to run! I've scaled the sleep down to 250 miliseconds and increased the tries. Even this is probably excessively long but now the test is fast agin. --- worker/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker_test.go b/worker/worker_test.go index 06ce15f..7c47b14 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -115,7 +115,7 @@ func TestWorkWithoutReady(t * testing.T){ // With the all-in-one Work() we don't know if the // worker is ready at this stage so we may have to wait a sec: go func(){ - tries := 3 + tries := 5 for( tries > 0 ){ if other_worker.ready { other_worker.Echo([]byte("Hello")) @@ -123,7 +123,7 @@ func TestWorkWithoutReady(t * testing.T){ } // still waiting for it to be ready.. - time.Sleep(1 * time.Second) + time.Sleep(250 * time.Millisecond) tries-- } }() From 912db95c0a995dc9897f8e0a30627cca48ddcdee Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:35:05 +0100 Subject: [PATCH 06/34] FIX: waiting for worker fireup was making the test slow Dodging the race condition with a sleep was fine, but we slept for a whole second which was longer than this test takes to run! I've scaled the sleep down to 250 miliseconds and increased the tries. Even this is probably excessively long but now the test is fast agin. --- worker/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker_test.go b/worker/worker_test.go index 06ce15f..7c47b14 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -115,7 +115,7 @@ func TestWorkWithoutReady(t * testing.T){ // With the all-in-one Work() we don't know if the // worker is ready at this stage so we may have to wait a sec: go func(){ - tries := 3 + tries := 5 for( tries > 0 ){ if other_worker.ready { other_worker.Echo([]byte("Hello")) @@ -123,7 +123,7 @@ func TestWorkWithoutReady(t * testing.T){ } // still waiting for it to be ready.. - time.Sleep(1 * time.Second) + time.Sleep(250 * time.Millisecond) tries-- } }() From e0614657e058098f3f33797ccc7c003c53af2d16 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 05:50:58 +0100 Subject: [PATCH 07/34] Added the original gearmand to travis for testing: 1. It's easier to test with (it installs in the regular path), we need to start and stop it for network error tests 2. Can't hurt to test with both implementations! --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index b20dfe6..3d97d6d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,3 +5,4 @@ go: before_install: - sudo apt-get remove -y gearman-job-server - sudo apt-get install -y gearman-job-server + - sudo apt-get install -y gearman-server From 7ddad76b8530380150e61a2c3302d68062c84754 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 06:14:28 +0100 Subject: [PATCH 08/34] Early stab at disconnect handling test --- worker/worker_disconnect_test.go | 132 +++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 worker/worker_disconnect_test.go diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go new file mode 100644 index 0000000..fcfe1d9 --- /dev/null +++ b/worker/worker_disconnect_test.go @@ -0,0 +1,132 @@ +package worker + +import ( + "log" + "net" + "os/exec" + "testing" + "time" +) + +const port = `3700` + +var gearman_ready chan bool +var kill_gearman chan bool +var bye chan bool + +func init() { + + if check_gearman_present() { + panic(`Something already listening on our testing port. Chickening out of testing with it!`) + } + gearman_ready = make( chan bool ) + kill_gearman = make( chan bool ) + // TODO: verify port is clear + go run_gearman() +} + +func run_gearman() { + gm_cmd := exec.Command(`gearmand`, `--port`, port) + start_err := gm_cmd.Start() + + if start_err != nil { + panic(`could not start gearman, aborting test :` + start_err.Error()) + } + + // Make sure we clear up our gearman: + defer func(){ + gm_cmd.Process.Kill() + }() + + for tries := 10; tries > 0; tries-- { + if check_gearman_present() { + break + } + time.Sleep(250 * time.Millisecond) + } + + if !check_gearman_present() { + panic(`Unable to start gearman aborting test`) + } + gearman_ready <- true + + <- kill_gearman +} + +func check_gearman_present() bool { + con, err := net.Dial(`tcp`, `127.0.0.1:`+port) + if err != nil { + log.Println("gearman not ready " + err.Error()) + return false + } + log.Println("gearman ready") + con.Close() + return true +} + +func TestBasicDisconnect(t *testing.T) { + <- gearman_ready + + worker := New(Unlimited) + + if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { + t.Error(err) + } + if err := worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { + t.Error(err) + } + + timeout := make(chan bool, 1) + done := make( chan bool, 1) + + worker.JobHandler = func( j Job ) error { + if( ! worker.ready ){ + t.Error("Worker not ready as expected"); + } + done <-true + return nil + } + handled_errors := false + + c_error := make( chan bool) + worker.ErrorHandler = func( e error ){ + handled_errors = true + c_error <- true + } + + go func() { + time.Sleep(5 * time.Second) + timeout <- true + }() + + go func(){ + worker.Work(); + }() + + // With the all-in-one Work() we don't know if the + // worker is ready at this stage so we may have to wait a sec: + go func(){ + tries := 3 + for( tries > 0 ){ + if worker.ready { + worker.Echo([]byte("Hello")) + kill_gearman <- true + log.Println("ok...") + worker.Echo([]byte("Hello")) + break + } + + // still waiting for it to be ready.. + time.Sleep(250 * time.Millisecond) + tries-- + } + }() + + + select{ + case <- c_error: + log.Println("eoo") + case <- timeout: + t.Error( "Test timed out waiting for the error handler" ) + } +} From 827cf3fa3e68aae370d938d653b679673a64d693 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 06:36:24 +0100 Subject: [PATCH 09/34] Removed old gearman from travis gearman-go and original gearmand don't play well, possibly an issue? --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 3d97d6d..b20dfe6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,4 +5,3 @@ go: before_install: - sudo apt-get remove -y gearman-job-server - sudo apt-get install -y gearman-job-server - - sudo apt-get install -y gearman-server From 99bcf11768e767de7d79b177e1e6fba42317e4ce Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Sat, 7 Jun 2014 07:22:40 +0100 Subject: [PATCH 10/34] FIX: The error handler is not called on disconnect Using the disconnect test showed the agent was silently closing on disconnect --- worker/agent.go | 8 +-- worker/worker_disconnect_test.go | 106 +++++++++++++++++++++---------- 2 files changed, 76 insertions(+), 38 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 2236173..194545b 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -53,13 +53,13 @@ func (a *agent) work() { for { if data, err = a.read(bufferSize); err != nil { if opErr, ok := err.(*net.OpError); ok { - if opErr.Timeout() { - a.worker.err(err) - } if opErr.Temporary() { continue + }else{ + a.worker.err(err) + break } - break + } a.worker.err(err) // If it is unexpected error and the connection wasn't diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index fcfe1d9..e87e228 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -1,6 +1,7 @@ package worker import ( + "../client" "log" "net" "os/exec" @@ -26,15 +27,16 @@ func init() { } func run_gearman() { - gm_cmd := exec.Command(`gearmand`, `--port`, port) + gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port) start_err := gm_cmd.Start() if start_err != nil { panic(`could not start gearman, aborting test :` + start_err.Error()) } - + // Make sure we clear up our gearman: - defer func(){ + defer func() { + log.Println("killing gearmand") gm_cmd.Process.Kill() }() @@ -64,20 +66,40 @@ func check_gearman_present() bool { return true } +func check_gearman_is_dead() bool { + + for tries := 10; tries > 0; tries-- { + if !check_gearman_present() { + return true + } + time.Sleep(250 * time.Millisecond) + } + return false +} + +/* + Checks for a disconnect whilst not working +*/ func TestBasicDisconnect(t *testing.T) { <- gearman_ready worker := New(Unlimited) + timeout := make(chan bool, 1) + done := make( chan bool, 1) if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { t.Error(err) } - if err := worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { + work_done := false; + if err := worker.AddFunc("gearman-go-workertest", + func(j Job)(b []byte, e error){ + work_done = true; + done <- true + return}, 0); + err != nil { t.Error(err) } - timeout := make(chan bool, 1) - done := make( chan bool, 1) worker.JobHandler = func( j Job ) error { if( ! worker.ready ){ @@ -90,6 +112,7 @@ func TestBasicDisconnect(t *testing.T) { c_error := make( chan bool) worker.ErrorHandler = func( e error ){ + log.Println( e ) handled_errors = true c_error <- true } @@ -99,34 +122,49 @@ func TestBasicDisconnect(t *testing.T) { timeout <- true }() - go func(){ - worker.Work(); - }() + err := worker.Ready() - // With the all-in-one Work() we don't know if the - // worker is ready at this stage so we may have to wait a sec: - go func(){ - tries := 3 - for( tries > 0 ){ - if worker.ready { - worker.Echo([]byte("Hello")) - kill_gearman <- true - log.Println("ok...") - worker.Echo([]byte("Hello")) - break - } - - // still waiting for it to be ready.. - time.Sleep(250 * time.Millisecond) - tries-- - } - }() - - - select{ - case <- c_error: - log.Println("eoo") - case <- timeout: - t.Error( "Test timed out waiting for the error handler" ) + if err != nil { + t.Error(err) } + + go worker.Work() + + kill_gearman <- true + + check_gearman_is_dead() + go run_gearman() + + select { + case <-gearman_ready: + case <-timeout: + } + + send_client_request() + + select { + case <- done: + t.Error("Client request handled (somehow), did we magically reconnect?") + case <-timeout: + t.Error("Test timed out waiting for the error handler") + case <-c_error: + // error was handled! + } + + kill_gearman <- true + } + +func send_client_request(){ + log.Println("sending client request"); + c, err := client.New( Network, "127.0.0.1:" + port ) + if err == nil { + _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) + if err != nil { + log.Println( "error sending client request " + err.Error() ) + } + + }else{ + log.Println( "error with client " + err.Error() ) + } +} \ No newline at end of file From 1ebb3d5fcc8521363a3f18f3e95f759034dd2e94 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Tue, 10 Jun 2014 03:23:18 +0100 Subject: [PATCH 11/34] Wrap disconnect errors and allow reconnect --- worker/agent.go | 34 ++++++- worker/worker.go | 32 ++++++- worker/worker_disconnect_test.go | 149 +++++++++++++++++++++++-------- 3 files changed, 177 insertions(+), 38 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 194545b..310cca8 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -46,6 +46,7 @@ func (a *agent) work() { a.worker.err(err.(error)) } }() + var inpack *inPack var l int var err error @@ -56,7 +57,17 @@ func (a *agent) work() { if opErr.Temporary() { continue }else{ - a.worker.err(err) + a.Lock() + if( a.conn != nil ){ + a.Unlock() + err = &WorkerDisconnectError{ + OpError : opErr, + agent : a, + } + a.worker.err(err) + } + // else - we're probably dc'ing due to a Close() + break } @@ -107,6 +118,10 @@ func (a *agent) Close() { func (a *agent) Grab() { a.Lock() defer a.Unlock() + a.grab() +} + +func (a *agent) grab(){ outpack := getOutPack() outpack.dataType = dtGrabJobUniq a.write(outpack) @@ -120,6 +135,23 @@ func (a *agent) PreSleep() { a.write(outpack) } +func (a *agent) reconnect() (error){ + a.Lock() + defer a.Unlock() + conn, err := net.Dial(a.net, a.addr) + if err != nil { + return err; + } + a.conn = conn + a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn), + bufio.NewWriter(a.conn)) + a.grab() + a.worker.reRegisterFuncsForAgent(a) + + go a.work() + return nil +} + // read length bytes from the socket func (a *agent) read(length int) (data []byte, err error) { n := 0 diff --git a/worker/worker.go b/worker/worker.go index a81f4ab..3b9f695 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -7,6 +7,7 @@ import ( "fmt" "sync" "time" + "net" ) const ( @@ -30,6 +31,7 @@ type Worker struct { limit chan bool } + // Return a worker. // // If limit is set to Unlimited(=0), the worker will grab all jobs @@ -94,6 +96,11 @@ func (worker *Worker) AddFunc(funcname string, // inner add func (worker *Worker) addFunc(funcname string, timeout uint32) { + outpack := prepFuncOutpack( funcname, timeout) + worker.broadcast(outpack) +} + +func prepFuncOutpack(funcname string, timeout uint32) (*outPack){ outpack := getOutPack() if timeout == 0 { outpack.dataType = dtCanDo @@ -106,7 +113,7 @@ func (worker *Worker) addFunc(funcname string, timeout uint32) { outpack.data[l] = '\x00' binary.BigEndian.PutUint32(outpack.data[l+1:], timeout) } - worker.broadcast(outpack) + return outpack } // Remove a function. @@ -293,6 +300,15 @@ func (worker *Worker) exec(inpack *inPack) (err error) { } return } +func (worker *Worker)reRegisterFuncsForAgent( a * agent ){ + worker.Lock() + defer worker.Unlock() + for funcname, f := range worker.funcs { + outpack := prepFuncOutpack( funcname, f.timeout) + a.write(outpack) + } + +} // inner result type result struct { @@ -316,3 +332,17 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { } return r } + +// Error type passed when a worker connection disconnects +type WorkerDisconnectError struct{ + *net.OpError + agent * agent +} +// Responds to the error by asking the worker to reconnect +func (e *WorkerDisconnectError) Reconnect() ( err error ){ + return e.agent.reconnect() +} +// Which server was this for? +func(e *WorkerDisconnectError) Server() ( net string, addr string ){ + return e.agent.net, e.agent.addr +} diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index e87e228..8925e7a 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -20,8 +20,8 @@ func init() { if check_gearman_present() { panic(`Something already listening on our testing port. Chickening out of testing with it!`) } - gearman_ready = make( chan bool ) - kill_gearman = make( chan bool ) + gearman_ready = make(chan bool) + kill_gearman = make(chan bool) // TODO: verify port is clear go run_gearman() } @@ -61,7 +61,6 @@ func check_gearman_present() bool { log.Println("gearman not ready " + err.Error()) return false } - log.Println("gearman ready") con.Close() return true } @@ -81,38 +80,32 @@ func check_gearman_is_dead() bool { Checks for a disconnect whilst not working */ func TestBasicDisconnect(t *testing.T) { - <- gearman_ready - + <-gearman_ready + worker := New(Unlimited) timeout := make(chan bool, 1) - done := make( chan bool, 1) + done := make(chan bool, 1) - if err := worker.AddServer(Network, "127.0.0.1:" + port); err != nil { + if err := worker.AddServer(Network, "127.0.0.1:"+port); err != nil { t.Error(err) } - work_done := false; - if err := worker.AddFunc("gearman-go-workertest", - func(j Job)(b []byte, e error){ - work_done = true; - done <- true - return}, 0); - err != nil { + work_done := false + if err := worker.AddFunc("gearman-go-workertest", + func(j Job) (b []byte, e error) { + work_done = true + done <- true + return + }, 0); err != nil { t.Error(err) } - - worker.JobHandler = func( j Job ) error { - if( ! worker.ready ){ - t.Error("Worker not ready as expected"); - } - done <-true - return nil - } - handled_errors := false - - c_error := make( chan bool) - worker.ErrorHandler = func( e error ){ - log.Println( e ) + handled_errors := false + + c_error := make(chan bool) + was_dc_err := false + worker.ErrorHandler = func(e error) { + log.Println(e) + _, was_dc_err = e.(*WorkerDisconnectError) handled_errors = true c_error <- true } @@ -149,22 +142,106 @@ func TestBasicDisconnect(t *testing.T) { t.Error("Test timed out waiting for the error handler") case <-c_error: // error was handled! + if ! was_dc_err { + t.Error( "Disconnect didn't manifest as a net.OpError?") + } } - + worker.Close() kill_gearman <- true } -func send_client_request(){ - log.Println("sending client request"); - c, err := client.New( Network, "127.0.0.1:" + port ) +func TestDcRc(t *testing.T) { + check_gearman_is_dead() + go run_gearman() + + <-gearman_ready + + worker := New(Unlimited) + timeout := make(chan bool, 1) + done := make(chan bool, 1) + + if err := worker.AddServer(Network, "127.0.0.1:"+port); err != nil { + t.Error(err) + } + work_done := false + if err := worker.AddFunc("gearman-go-workertest", + func(j Job) (b []byte, e error) { + log.Println("Actual work happens!") + work_done = true + done <- true + return + }, 0); err != nil { + t.Error(err) + } + + worker.ErrorHandler = func(e error) { + wdc, wdcok := e.(*WorkerDisconnectError) + + if( wdcok){ + log.Println("Reconnecting!") + reconnected := false + for tries := 20 ; ! reconnected && tries > 0; tries -- { + rcerr := wdc.Reconnect() + if rcerr != nil{ + time.Sleep(250 * time.Millisecond) + } else{ + reconnected = true; + } + } + + + } else{ + panic("Some other kind of error " + e.Error()) + } + + } + + go func() { + time.Sleep(5 * time.Second) + timeout <- true + }() + + err := worker.Ready() + + if err != nil { + t.Error(err) + } + + go worker.Work() + + kill_gearman <- true + + check_gearman_is_dead() + go run_gearman() + + select { + case <-gearman_ready: + case <-timeout: + } + + send_client_request() + + select { + case <- done: + case <-timeout: + t.Error("Test timed out") + } + worker.Close() + kill_gearman <- true + +} + +func send_client_request() { + log.Println("sending client request") + c, err := client.New(Network, "127.0.0.1:"+port) if err == nil { _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) if err != nil { - log.Println( "error sending client request " + err.Error() ) + log.Println("error sending client request " + err.Error()) } - - }else{ - log.Println( "error with client " + err.Error() ) + + } else { + log.Println("error with client " + err.Error()) } -} \ No newline at end of file +} From 09c626f48830df67b05f5a54ca5e41428a9286dc Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Tue, 10 Jun 2014 03:46:21 +0100 Subject: [PATCH 12/34] Cope with io.EOF as a disconnect error --- worker/agent.go | 23 ++++++++++++++--------- worker/worker.go | 7 +++++-- worker/worker_disconnect_test.go | 3 --- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 310cca8..9befa9c 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -4,6 +4,7 @@ import ( "bufio" "net" "sync" + "io" ) // The agent of job server. @@ -57,20 +58,14 @@ func (a *agent) work() { if opErr.Temporary() { continue }else{ - a.Lock() - if( a.conn != nil ){ - a.Unlock() - err = &WorkerDisconnectError{ - OpError : opErr, - agent : a, - } - a.worker.err(err) - } + a.disconnect_error(err) // else - we're probably dc'ing due to a Close() break } + } else if( err == io.EOF ){ + a.disconnect_error(err) } a.worker.err(err) // If it is unexpected error and the connection wasn't @@ -106,6 +101,16 @@ func (a *agent) work() { } } +func (a * agent) disconnect_error( err error ){ + if( a.conn != nil ){ + err = &WorkerDisconnectError{ + err : err, + agent : a, + } + a.worker.err(err) + } +} + func (a *agent) Close() { a.Lock() defer a.Unlock() diff --git a/worker/worker.go b/worker/worker.go index 3b9f695..0bb7b4c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -7,7 +7,6 @@ import ( "fmt" "sync" "time" - "net" ) const ( @@ -335,9 +334,13 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { // Error type passed when a worker connection disconnects type WorkerDisconnectError struct{ - *net.OpError + err error agent * agent } +func (e *WorkerDisconnectError) Error() ( string){ + return e.err.Error(); +} + // Responds to the error by asking the worker to reconnect func (e *WorkerDisconnectError) Reconnect() ( err error ){ return e.agent.reconnect() diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index 8925e7a..34a3621 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -36,7 +36,6 @@ func run_gearman() { // Make sure we clear up our gearman: defer func() { - log.Println("killing gearmand") gm_cmd.Process.Kill() }() @@ -58,7 +57,6 @@ func run_gearman() { func check_gearman_present() bool { con, err := net.Dial(`tcp`, `127.0.0.1:`+port) if err != nil { - log.Println("gearman not ready " + err.Error()) return false } con.Close() @@ -233,7 +231,6 @@ func TestDcRc(t *testing.T) { } func send_client_request() { - log.Println("sending client request") c, err := client.New(Network, "127.0.0.1:"+port) if err == nil { _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) From 97731e177477b18a0931850228ccb22e005c61d3 Mon Sep 17 00:00:00 2001 From: Joe Higton Date: Tue, 10 Jun 2014 04:09:27 +0100 Subject: [PATCH 13/34] FIX: EOF disconnect error also called raw handler afterwards --- worker/agent.go | 1 + 1 file changed, 1 insertion(+) diff --git a/worker/agent.go b/worker/agent.go index 9befa9c..7385211 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -66,6 +66,7 @@ func (a *agent) work() { } else if( err == io.EOF ){ a.disconnect_error(err) + break } a.worker.err(err) // If it is unexpected error and the connection wasn't From 14f4df8ede0150c51852c14fad816d93102d3175 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Fri, 13 Jun 2014 10:55:32 +0800 Subject: [PATCH 14/34] Joe helped us greatly --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 20c0281..aef8968 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ Contributors * [Damian Gryski](https://github.com/dgryski) * [Ingo Oeser](https://github.com/nightlyone) * [jake](https://github.com/jbaikge) + * [Joe Higton](https://github.com/draxil) * [Jonathan Wills](https://github.com/runningwild) * [Kevin Darlington](https://github.com/kdar) * [miraclesu](https://github.com/miraclesu) From 3e556edb2dbe7345d8e22d223c9cba651bcae4b3 Mon Sep 17 00:00:00 2001 From: Graham Barr Date: Fri, 23 May 2014 08:00:58 -0500 Subject: [PATCH 15/34] When a job completes, remove its handler --- client/client.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index b98dffe..b2a8c00 100644 --- a/client/client.go +++ b/client/client.go @@ -145,10 +145,8 @@ func (client *Client) processLoop() { case dtWorkData, dtWorkWarning, dtWorkStatus: resp = client.handleResponse(resp.Handle, resp) case dtWorkComplete, dtWorkFail, dtWorkException: - resp = client.handleResponse(resp.Handle, resp) - if resp != nil { - delete(client.respHandler, resp.Handle) - } + client.handleResponse(resp.Handle, resp) + delete(client.respHandler, resp.Handle) } } } From 49ea8c0ec17715f30929c03b2bf2262b3a3bc492 Mon Sep 17 00:00:00 2001 From: Graham Barr Date: Mon, 18 Aug 2014 09:51:33 -0500 Subject: [PATCH 16/34] Increase buffer size to 8K --- client/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/common.go b/client/common.go index ad47936..e520bf0 100644 --- a/client/common.go +++ b/client/common.go @@ -5,7 +5,7 @@ const ( // queue size queueSize = 8 // read buffer size - bufferSize = 1024 + bufferSize = 8192 // min packet length minPacketLength = 12 From d82da8fd71a5eb52ce22ada7e7f8df06e7f39c08 Mon Sep 17 00:00:00 2001 From: Graham Barr Date: Mon, 18 Aug 2014 09:58:28 -0500 Subject: [PATCH 17/34] Avoid read channel corruption when response size > bufferSize When receiving a response, what was happening 1. Read bufferSize and it gets assigned to leftdata 2. Read another bufferSize 3. 2 buffers get appended, but leftdata still points to first buffer 4. Process data buffer which contains only complete responses 5. Back to ReadLoop, but leftdata still points to first incomplete buffer causing corrupt data to be processed Solution is to make leftdata nil once we have merged it with the second buffer --- client/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/client.go b/client/client.go index b2a8c00..55ca858 100644 --- a/client/client.go +++ b/client/client.go @@ -104,6 +104,7 @@ ReadLoop: } if len(leftdata) > 0 { // some data left for processing data = append(leftdata, data...) + leftdata = nil } for { l := len(data) From 7bcc4de76f835071363f17e3a0cb8746b4146566 Mon Sep 17 00:00:00 2001 From: Graham Barr Date: Wed, 20 Aug 2014 11:27:32 -0500 Subject: [PATCH 18/34] Do not ignore write errors --- client/client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 55ca858..a99d730 100644 --- a/client/client.go +++ b/client/client.go @@ -194,7 +194,11 @@ func (client *Client) do(funcname string, data []byte, id := IdGen.Id() req := getJob(id, []byte(funcname), data) req.DataType = flag - client.write(req) + if err = client.write(req); err != nil { + delete(client.innerHandler, "c") + client.lastcall = "" + return + } mutex.Lock() return } From e005ea44421d7acef6a460449c854a861a5c7c30 Mon Sep 17 00:00:00 2001 From: Gabriel Cristian Alecu Date: Thu, 21 Aug 2014 10:41:23 +0300 Subject: [PATCH 19/34] Fixed WORK_FAIL needing 2 arguments According to http://gearman.org/protocol/ , WORK_FAIL only has 1 argument: the handle --- client/response.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/client/response.go b/client/response.go index bcafc53..ce8e509 100644 --- a/client/response.go +++ b/client/response.go @@ -76,7 +76,7 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { case dtJobCreated: resp.Handle = string(dt) case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus, - dtWorkComplete, dtWorkFail, dtWorkException: + dtWorkComplete, dtWorkException: s := bytes.SplitN(dt, []byte{'\x00'}, 2) if len(s) >= 2 { resp.Handle = string(s[0]) @@ -85,6 +85,14 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) { err = fmt.Errorf("Invalid data: %v", data) return } + case dtWorkFail: + s := bytes.SplitN(dt, []byte{'\x00'}, 2) + if len(s) >= 1 { + resp.Handle = string(s[0]) + } else { + err = fmt.Errorf("Invalid data: %v", data) + return + } case dtEchoRes: fallthrough default: From a003eac543c881722412169a8f6c028967dcccf2 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Tue, 6 Jan 2015 11:34:39 +0800 Subject: [PATCH 20/34] fixed the closing method --- worker/worker.go | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 0bb7b4c..c8c91db 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -22,7 +22,7 @@ type Worker struct { funcs jobFuncs in chan *inPack running bool - ready bool + ready bool Id string ErrorHandler ErrorHandler @@ -30,7 +30,6 @@ type Worker struct { limit chan bool } - // Return a worker. // // If limit is set to Unlimited(=0), the worker will grab all jobs @@ -95,11 +94,11 @@ func (worker *Worker) AddFunc(funcname string, // inner add func (worker *Worker) addFunc(funcname string, timeout uint32) { - outpack := prepFuncOutpack( funcname, timeout) + outpack := prepFuncOutpack(funcname, timeout) worker.broadcast(outpack) } -func prepFuncOutpack(funcname string, timeout uint32) (*outPack){ +func prepFuncOutpack(funcname string, timeout uint32) *outPack { outpack := getOutPack() if timeout == 0 { outpack.dataType = dtCanDo @@ -188,19 +187,14 @@ func (worker *Worker) Ready() (err error) { // Main loop, block here // Most of time, this should be evaluated in goroutine. func (worker *Worker) Work() { - if ! worker.ready { + if !worker.ready { // didn't run Ready beforehand, so we'll have to do it: err := worker.Ready() if err != nil { - panic( err ) + panic(err) } } - defer func() { - for _, a := range worker.agents { - a.Close() - } - }() worker.running = true for _, a := range worker.agents { a.Grab() @@ -223,8 +217,11 @@ func (worker *Worker) customeHandler(inpack *inPack) { // Close connection and exit main loop func (worker *Worker) Close() { worker.Lock() - worker.Unlock() + defer worker.Unlock() if worker.running == true { + for _, a := range worker.agents { + a.Close() + } worker.running = false close(worker.in) } @@ -299,11 +296,11 @@ func (worker *Worker) exec(inpack *inPack) (err error) { } return } -func (worker *Worker)reRegisterFuncsForAgent( a * agent ){ +func (worker *Worker) reRegisterFuncsForAgent(a *agent) { worker.Lock() defer worker.Unlock() for funcname, f := range worker.funcs { - outpack := prepFuncOutpack( funcname, f.timeout) + outpack := prepFuncOutpack(funcname, f.timeout) a.write(outpack) } @@ -333,19 +330,21 @@ func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) { } // Error type passed when a worker connection disconnects -type WorkerDisconnectError struct{ - err error - agent * agent +type WorkerDisconnectError struct { + err error + agent *agent } -func (e *WorkerDisconnectError) Error() ( string){ - return e.err.Error(); + +func (e *WorkerDisconnectError) Error() string { + return e.err.Error() } // Responds to the error by asking the worker to reconnect -func (e *WorkerDisconnectError) Reconnect() ( err error ){ +func (e *WorkerDisconnectError) Reconnect() (err error) { return e.agent.reconnect() } -// Which server was this for? -func(e *WorkerDisconnectError) Server() ( net string, addr string ){ + +// Which server was this for? +func (e *WorkerDisconnectError) Server() (net string, addr string) { return e.agent.net, e.agent.addr } From ad9b3cb98893104f9856422edbb05f4c0190c5ca Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Tue, 6 Jan 2015 11:45:18 +0800 Subject: [PATCH 21/34] go fmt --- example/client/client.go | 2 +- example/worker/worker.go | 4 +-- worker/agent.go | 26 ++++++++--------- worker/worker_disconnect_test.go | 33 ++++++++++----------- worker/worker_test.go | 50 ++++++++++++++++---------------- 5 files changed, 57 insertions(+), 58 deletions(-) diff --git a/example/client/client.go b/example/client/client.go index e7a34ea..61b0d61 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -3,8 +3,8 @@ package main import ( "github.com/mikespook/gearman-go/client" "log" - "sync" "os" + "sync" ) func main() { diff --git a/example/worker/worker.go b/example/worker/worker.go index 7474517..0fff06c 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -4,10 +4,10 @@ import ( "github.com/mikespook/gearman-go/worker" "github.com/mikespook/golib/signal" "log" + "net" "os" "strings" "time" - "net" ) func ToUpper(job worker.Job) ([]byte, error) { @@ -41,7 +41,7 @@ func main() { w.ErrorHandler = func(e error) { log.Println(e) if opErr, ok := e.(*net.OpError); ok { - if ! opErr.Temporary() { + if !opErr.Temporary() { proc, err := os.FindProcess(os.Getpid()) if err != nil { log.Println(err) diff --git a/worker/agent.go b/worker/agent.go index afd6f76..5c2b384 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -4,9 +4,9 @@ import ( "bufio" "bytes" "encoding/binary" + "io" "net" "sync" - "io" ) // The agent of job server. @@ -59,14 +59,14 @@ func (a *agent) work() { if opErr, ok := err.(*net.OpError); ok { if opErr.Temporary() { continue - }else{ + } else { a.disconnect_error(err) // else - we're probably dc'ing due to a Close() break } - - } else if( err == io.EOF ){ + + } else if err == io.EOF { a.disconnect_error(err) break } @@ -104,11 +104,11 @@ func (a *agent) work() { } } -func (a * agent) disconnect_error( err error ){ - if( a.conn != nil ){ +func (a *agent) disconnect_error(err error) { + if a.conn != nil { err = &WorkerDisconnectError{ - err : err, - agent : a, + err: err, + agent: a, } a.worker.err(err) } @@ -129,7 +129,7 @@ func (a *agent) Grab() { a.grab() } -func (a *agent) grab(){ +func (a *agent) grab() { outpack := getOutPack() outpack.dataType = dtGrabJobUniq a.write(outpack) @@ -143,16 +143,16 @@ func (a *agent) PreSleep() { a.write(outpack) } -func (a *agent) reconnect() (error){ - a.Lock() +func (a *agent) reconnect() error { + a.Lock() defer a.Unlock() conn, err := net.Dial(a.net, a.addr) if err != nil { - return err; + return err } a.conn = conn a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn), - bufio.NewWriter(a.conn)) + bufio.NewWriter(a.conn)) a.grab() a.worker.reRegisterFuncsForAgent(a) diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index 34a3621..d0f6cc2 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -50,8 +50,8 @@ func run_gearman() { panic(`Unable to start gearman aborting test`) } gearman_ready <- true - - <- kill_gearman + + <-kill_gearman } func check_gearman_present() bool { @@ -134,14 +134,14 @@ func TestBasicDisconnect(t *testing.T) { send_client_request() select { - case <- done: + case <-done: t.Error("Client request handled (somehow), did we magically reconnect?") case <-timeout: t.Error("Test timed out waiting for the error handler") case <-c_error: // error was handled! - if ! was_dc_err { - t.Error( "Disconnect didn't manifest as a net.OpError?") + if !was_dc_err { + t.Error("Disconnect didn't manifest as a net.OpError?") } } worker.Close() @@ -175,24 +175,23 @@ func TestDcRc(t *testing.T) { worker.ErrorHandler = func(e error) { wdc, wdcok := e.(*WorkerDisconnectError) - - if( wdcok){ + + if wdcok { log.Println("Reconnecting!") reconnected := false - for tries := 20 ; ! reconnected && tries > 0; tries -- { + for tries := 20; !reconnected && tries > 0; tries-- { rcerr := wdc.Reconnect() - if rcerr != nil{ + if rcerr != nil { time.Sleep(250 * time.Millisecond) - } else{ - reconnected = true; + } else { + reconnected = true } } - - - } else{ + + } else { panic("Some other kind of error " + e.Error()) } - + } go func() { @@ -219,9 +218,9 @@ func TestDcRc(t *testing.T) { } send_client_request() - + select { - case <- done: + case <-done: case <-timeout: t.Error("Test timed out") } diff --git a/worker/worker_test.go b/worker/worker_test.go index b496a47..1029b74 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -139,7 +139,7 @@ func TestWorkerClose(t *testing.T) { worker.Close() } -func TestWorkWithoutReady(t * testing.T){ +func TestWorkWithoutReady(t *testing.T) { other_worker := New(Unlimited) if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil { @@ -148,15 +148,15 @@ func TestWorkWithoutReady(t * testing.T){ if err := other_worker.AddFunc("gearman-go-workertest", foobar, 0); err != nil { t.Error(err) } - - timeout := make(chan bool, 1) - done := make( chan bool, 1) - other_worker.JobHandler = func( j Job ) error { - if( ! other_worker.ready ){ - t.Error("Worker not ready as expected"); + timeout := make(chan bool, 1) + done := make(chan bool, 1) + + other_worker.JobHandler = func(j Job) error { + if !other_worker.ready { + t.Error("Worker not ready as expected") } - done <-true + done <- true return nil } go func() { @@ -164,15 +164,15 @@ func TestWorkWithoutReady(t * testing.T){ timeout <- true }() - go func(){ - other_worker.Work(); + go func() { + other_worker.Work() }() - // With the all-in-one Work() we don't know if the + // With the all-in-one Work() we don't know if the // worker is ready at this stage so we may have to wait a sec: - go func(){ + go func() { tries := 5 - for( tries > 0 ){ + for tries > 0 { if other_worker.ready { other_worker.Echo([]byte("Hello")) break @@ -183,24 +183,24 @@ func TestWorkWithoutReady(t * testing.T){ tries-- } }() - + // determine if we've finished or timed out: - select{ - case <- timeout: + select { + case <-timeout: t.Error("Test timed out waiting for the worker") - case <- done: + case <-done: } } -func TestWorkWithoutReadyWithPanic(t * testing.T){ +func TestWorkWithoutReadyWithPanic(t *testing.T) { other_worker := New(Unlimited) - + timeout := make(chan bool, 1) - done := make( chan bool, 1) + done := make(chan bool, 1) // Going to work with no worker setup. // when Work (hopefully) calls Ready it will get an error which should cause it to panic() - go func(){ + go func() { defer func() { if err := recover(); err != nil { done <- true @@ -209,17 +209,17 @@ func TestWorkWithoutReadyWithPanic(t * testing.T){ t.Error("Work should raise a panic.") done <- true }() - other_worker.Work(); + other_worker.Work() }() go func() { time.Sleep(2 * time.Second) timeout <- true }() - select{ - case <- timeout: + select { + case <-timeout: t.Error("Test timed out waiting for the worker") - case <- done: + case <-done: } } From c01a2e22c0663d19440e5ca4743e4c45f009b55b Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Sat, 10 Jan 2015 00:09:38 +0800 Subject: [PATCH 22/34] upgrade travis to Go 1.4 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b20dfe6..5eb5bad 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: - - 1.2 + - 1.4 before_install: - sudo apt-get remove -y gearman-job-server From 9dbb3ea3fc1e5d5c3966db467958e3c82219e49e Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Thu, 15 Jan 2015 10:26:53 +0800 Subject: [PATCH 23/34] added lock-write to co-ordinate package sequence #56 --- worker/agent.go | 7 +++++++ worker/worker.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/worker/agent.go b/worker/agent.go index 5c2b384..659960a 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -200,3 +200,10 @@ func (a *agent) write(outpack *outPack) (err error) { } return a.rw.Flush() } + +// Write with lock +func (a *agent) Write(outpack *outPack) (err error) { + a.Lock() + defer a.Unlock() + return a.write(outpack) +} diff --git a/worker/worker.go b/worker/worker.go index c8c91db..25d6a60 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -292,7 +292,7 @@ func (worker *Worker) exec(inpack *inPack) (err error) { } outpack.handle = inpack.handle outpack.data = r.data - inpack.a.write(outpack) + inpack.a.Write(outpack) } return } From 939189448e7aae3f68c2c5a5f5abde9df3e0089d Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Thu, 15 Jan 2015 10:28:21 +0800 Subject: [PATCH 24/34] golib/signal upgrade --- example/worker/worker.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/example/worker/worker.go b/example/worker/worker.go index 0fff06c..035a5f8 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -1,13 +1,14 @@ package main import ( - "github.com/mikespook/gearman-go/worker" - "github.com/mikespook/golib/signal" "log" "net" "os" "strings" "time" + + "github.com/mikespook/gearman-go/worker" + "github.com/mikespook/golib/signal" ) func ToUpper(job worker.Job) ([]byte, error) { @@ -68,7 +69,6 @@ func main() { return } go w.Work() - sh := signal.NewHandler() - sh.Bind(os.Interrupt, func() bool { return true }) - sh.Loop() + signal.Bind(os.Interrupt, func() uint { return signal.BreakExit }) + signal.Loop() } From bc80b2f85383e694d7f81b166ce4975405ac39dd Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Fri, 16 Jan 2015 17:41:19 +0800 Subject: [PATCH 25/34] fixed signal method --- example/worker/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/worker/worker.go b/example/worker/worker.go index 035a5f8..3b0ddad 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -70,5 +70,5 @@ func main() { } go w.Work() signal.Bind(os.Interrupt, func() uint { return signal.BreakExit }) - signal.Loop() + signal.Wait() } From 37db439a4a8cb0d0c1fcd820e99470e69e695f8e Mon Sep 17 00:00:00 2001 From: "C.R. Kirkwood-Watts" Date: Mon, 19 Jan 2015 10:28:21 -0800 Subject: [PATCH 26/34] Update README.md Updated to reflect new type names. --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index aef8968..ad25606 100644 --- a/README.md +++ b/README.md @@ -67,10 +67,10 @@ Usage echomsg, err := c.Echo(echo) // ... error handling log.Println(string(echomsg)) - jobHandler := func(job *client.Job) { - log.Printf("%s", job.Data) + jobHandler := func(resp *client.Response) { + log.Printf("%s", resp.Data) } - handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL, jobHandler) + handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler) // ... Branches From b612b80f80c3e6b75f90d46c7746b7cc25fc9a2a Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Tue, 20 Jan 2015 10:27:12 +0800 Subject: [PATCH 27/34] complete the list of contributors --- README.md | 81 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index ad25606..e008c82 100644 --- a/README.md +++ b/README.md @@ -37,41 +37,45 @@ Usage ## Worker - // Limit number of concurrent jobs execution. Use worker.Unlimited (0) if you want no limitation. - w := worker.New(worker.OneByOne) - w.ErrHandler = func(e error) { - log.Println(e) - } - w.AddServer("127.0.0.1:4730") - // Use worker.Unlimited (0) if you want no timeout - w.AddFunc("ToUpper", ToUpper, worker.Unlimited) - // This will give a timeout of 5 seconds - w.AddFunc("ToUpperTimeOut5", ToUpper, 5) - if err := w.Ready(); err != nil { - log.Fatal(err) - return - } - go w.Work() - +```go +// Limit number of concurrent jobs execution. Use worker.Unlimited (0) if you want no limitation. +w := worker.New(worker.OneByOne) +w.ErrHandler = func(e error) { + log.Println(e) +} +w.AddServer("127.0.0.1:4730") +// Use worker.Unlimited (0) if you want no timeout +w.AddFunc("ToUpper", ToUpper, worker.Unlimited) +// This will give a timeout of 5 seconds +w.AddFunc("ToUpperTimeOut5", ToUpper, 5) + +if err := w.Ready(); err != nil { + log.Fatal(err) + return +} +go w.Work() +``` ## Client - // ... - c, err := client.New("tcp4", "127.0.0.1:4730") - // ... error handling - defer c.Close() - c.ErrorHandler = func(e error) { - log.Println(e) - } - echo := []byte("Hello\x00 world") - echomsg, err := c.Echo(echo) - // ... error handling - log.Println(string(echomsg)) - jobHandler := func(resp *client.Response) { - log.Printf("%s", resp.Data) - } - handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler) - // ... +```go +// ... +c, err := client.New("tcp4", "127.0.0.1:4730") +// ... error handling +defer c.Close() +c.ErrorHandler = func(e error) { + log.Println(e) +} +echo := []byte("Hello\x00 world") +echomsg, err := c.Echo(echo) +// ... error handling +log.Println(string(echomsg)) +jobHandler := func(resp *client.Response) { + log.Printf("%s", resp.Data) +} +handle, err := c.Do("ToUpper", echo, client.JobNormal, jobHandler) +// ... +``` Branches ======== @@ -87,10 +91,15 @@ __Use at your own risk!__ Contributors ============ +Great thanks to all of you for your support and interest! + (_Alphabetic order_) * [Alex Zylman](https://github.com/azylman) + * [C.R. Kirkwood-Watts](https://github.com/kirkwood) * [Damian Gryski](https://github.com/dgryski) + * [Gabriel Cristian Alecu](https://github.com/AzuraMeta) + * [Graham Barr](https://github.com/gbarr) * [Ingo Oeser](https://github.com/nightlyone) * [jake](https://github.com/jbaikge) * [Joe Higton](https://github.com/draxil) @@ -98,9 +107,13 @@ Contributors * [Kevin Darlington](https://github.com/kdar) * [miraclesu](https://github.com/miraclesu) * [Paul Mach](https://github.com/paulmach) + * [Randall McPherson](https://github.com/rlmcpherson) * [Sam Grimee](https://github.com/sgrimee) - * suchj - * [Xing Xing](http://mikespook.com) [@Twitter](http://twitter.com/mikespook) + +Maintainer +========== + + * [Xing Xing](http://mikespook.com) <> [@Twitter](http://twitter.com/mikespook) Open Source - MIT Software License ================================== From df1af4f8cb64a0166eab7ba99a1bcc9db48107d0 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Tue, 20 Jan 2015 10:28:36 +0800 Subject: [PATCH 28/34] 80 chars/line --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e008c82..43b94fa 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,8 @@ Usage ## Worker ```go -// Limit number of concurrent jobs execution. Use worker.Unlimited (0) if you want no limitation. +// Limit number of concurrent jobs execution. +// Use worker.Unlimited (0) if you want no limitation. w := worker.New(worker.OneByOne) w.ErrHandler = func(e error) { log.Println(e) From c615e74af8cefbdb80f1f4db16b76434c75e96d6 Mon Sep 17 00:00:00 2001 From: Endre Hirling Date: Mon, 6 Jul 2015 21:40:49 +0200 Subject: [PATCH 29/34] Replace mutex in client.do() with a channel to avoid deadlock and introduce command timeout --- client/client.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index a99d730..e09b0a0 100644 --- a/client/client.go +++ b/client/client.go @@ -6,6 +6,7 @@ import ( "bufio" "net" "sync" + "time" ) // One client connect to one server. @@ -20,6 +21,8 @@ type Client struct { conn net.Conn rw *bufio.ReadWriter + RespTimeout time.Duration // response timeout for do() in ms + ErrorHandler ErrorHandler } @@ -31,6 +34,7 @@ func New(network, addr string) (client *Client, err error) { respHandler: make(map[string]ResponseHandler, queueSize), innerHandler: make(map[string]ResponseHandler, queueSize), in: make(chan *Response, queueSize), + RespTimeout: 1000, } client.conn, err = net.Dial(client.net, client.addr) if err != nil { @@ -175,21 +179,26 @@ func (client *Client) handleInner(key string, resp *Response) *Response { return resp } +type handleOrError struct { + handle string + err error +} + func (client *Client) do(funcname string, data []byte, flag uint32) (handle string, err error) { if client.conn == nil { return "", ErrLostConn } - var mutex sync.Mutex - mutex.Lock() + var result = make(chan handleOrError, 1) client.lastcall = "c" client.innerHandler["c"] = func(resp *Response) { - defer mutex.Unlock() if resp.DataType == dtError { err = getError(resp.Data) + result <- handleOrError{"", err} return } handle = resp.Handle + result <- handleOrError{handle, nil} } id := IdGen.Id() req := getJob(id, []byte(funcname), data) @@ -199,7 +208,15 @@ func (client *Client) do(funcname string, data []byte, client.lastcall = "" return } - mutex.Lock() + var timer = time.After(client.RespTimeout * time.Millisecond) + select { + case ret := <-result: + return ret.handle, ret.err + case <-timer: + delete(client.innerHandler, "c") + client.lastcall = "" + return "", ErrLostConn + } return } From c6c6c9cac20f68da5ff49ed92153aab6e91959d1 Mon Sep 17 00:00:00 2001 From: Xing Xing Date: Fri, 10 Jul 2015 20:30:35 +0800 Subject: [PATCH 30/34] don't use sigillum --- client/client.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/client/client.go b/client/client.go index e09b0a0..907704f 100644 --- a/client/client.go +++ b/client/client.go @@ -9,6 +9,10 @@ import ( "time" ) +var ( + DefaultTimeout time.Duration = 1000 +) + // One client connect to one server. // Use Pool for multi-connections. type Client struct { @@ -21,7 +25,7 @@ type Client struct { conn net.Conn rw *bufio.ReadWriter - RespTimeout time.Duration // response timeout for do() in ms + ResponseTimeout time.Duration // response timeout for do() in ms ErrorHandler ErrorHandler } @@ -29,12 +33,12 @@ type Client struct { // Return a client. func New(network, addr string) (client *Client, err error) { client = &Client{ - net: network, - addr: addr, - respHandler: make(map[string]ResponseHandler, queueSize), - innerHandler: make(map[string]ResponseHandler, queueSize), - in: make(chan *Response, queueSize), - RespTimeout: 1000, + net: network, + addr: addr, + respHandler: make(map[string]ResponseHandler, queueSize), + innerHandler: make(map[string]ResponseHandler, queueSize), + in: make(chan *Response, queueSize), + ResponseTimeout: DefaultTimeout, } client.conn, err = net.Dial(client.net, client.addr) if err != nil { @@ -181,7 +185,7 @@ func (client *Client) handleInner(key string, resp *Response) *Response { type handleOrError struct { handle string - err error + err error } func (client *Client) do(funcname string, data []byte, @@ -208,7 +212,7 @@ func (client *Client) do(funcname string, data []byte, client.lastcall = "" return } - var timer = time.After(client.RespTimeout * time.Millisecond) + var timer = time.After(client.ResponseTimeout * time.Millisecond) select { case ret := <-result: return ret.handle, ret.err From d20c3c7bd19a7d1c0007922a175e046ecc067610 Mon Sep 17 00:00:00 2001 From: John Ku Date: Thu, 10 Dec 2015 11:19:04 -0800 Subject: [PATCH 31/34] Allow custom Pool without constructor --- client/pool.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/client/pool.go b/client/pool.go index 23efb74..8a86b3b 100644 --- a/client/pool.go +++ b/client/pool.go @@ -16,15 +16,15 @@ var ( SelectRandom = selectRandom ) -type poolClient struct { +type PoolClient struct { *Client Rate int mutex sync.Mutex } -type SelectionHandler func(map[string]*poolClient, string) string +type SelectionHandler func(map[string]*PoolClient, string) string -func selectWithRate(pool map[string]*poolClient, +func selectWithRate(pool map[string]*PoolClient, last string) (addr string) { total := 0 for _, item := range pool { @@ -36,7 +36,7 @@ func selectWithRate(pool map[string]*poolClient, return last } -func selectRandom(pool map[string]*poolClient, +func selectRandom(pool map[string]*PoolClient, last string) (addr string) { r := rand.Intn(len(pool)) i := 0 @@ -53,7 +53,7 @@ type Pool struct { SelectionHandler SelectionHandler ErrorHandler ErrorHandler - clients map[string]*poolClient + clients map[string]*PoolClient last string mutex sync.Mutex @@ -62,7 +62,7 @@ type Pool struct { // Return a new pool. func NewPool() (pool *Pool) { return &Pool{ - clients: make(map[string]*poolClient, poolSize), + clients: make(map[string]*PoolClient, poolSize), SelectionHandler: SelectWithRate, } } @@ -71,7 +71,7 @@ func NewPool() (pool *Pool) { func (pool *Pool) Add(net, addr string, rate int) (err error) { pool.mutex.Lock() defer pool.mutex.Unlock() - var item *poolClient + var item *PoolClient var ok bool if item, ok = pool.clients[addr]; ok { item.Rate = rate @@ -79,7 +79,7 @@ func (pool *Pool) Add(net, addr string, rate int) (err error) { var client *Client client, err = New(net, addr) if err == nil { - item = &poolClient{Client: client, Rate: rate} + item = &PoolClient{Client: client, Rate: rate} pool.clients[addr] = item } } @@ -128,7 +128,7 @@ func (pool *Pool) Status(addr, handle string) (status *Status, err error) { // Send a something out, get the samething back. func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { - var client *poolClient + var client *PoolClient if addr == "" { client = pool.selectServer() } else { @@ -154,7 +154,7 @@ func (pool *Pool) Close() (err map[string]error) { } // selecting server -func (pool *Pool) selectServer() (client *poolClient) { +func (pool *Pool) selectServer() (client *PoolClient) { for client == nil { addr := pool.SelectionHandler(pool.clients, pool.last) var ok bool From 1c4b8aa000274a05e3492279b8cc61d97792f1a6 Mon Sep 17 00:00:00 2001 From: John Ku Date: Thu, 10 Dec 2015 13:54:48 -0800 Subject: [PATCH 32/34] Export pool.clients for custom Pool instantiation --- client/pool.go | 22 +++++++++++----------- client/pool_test.go | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pool.go b/client/pool.go index 8a86b3b..a6e7243 100644 --- a/client/pool.go +++ b/client/pool.go @@ -52,9 +52,9 @@ func selectRandom(pool map[string]*PoolClient, type Pool struct { SelectionHandler SelectionHandler ErrorHandler ErrorHandler + Clients map[string]*PoolClient - clients map[string]*PoolClient - last string + last string mutex sync.Mutex } @@ -62,7 +62,7 @@ type Pool struct { // Return a new pool. func NewPool() (pool *Pool) { return &Pool{ - clients: make(map[string]*PoolClient, poolSize), + Clients: make(map[string]*PoolClient, poolSize), SelectionHandler: SelectWithRate, } } @@ -73,14 +73,14 @@ func (pool *Pool) Add(net, addr string, rate int) (err error) { defer pool.mutex.Unlock() var item *PoolClient var ok bool - if item, ok = pool.clients[addr]; ok { + if item, ok = pool.Clients[addr]; ok { item.Rate = rate } else { var client *Client client, err = New(net, addr) if err == nil { item = &PoolClient{Client: client, Rate: rate} - pool.clients[addr] = item + pool.Clients[addr] = item } } return @@ -90,7 +90,7 @@ func (pool *Pool) Add(net, addr string, rate int) (err error) { func (pool *Pool) Remove(addr string) { pool.mutex.Lock() defer pool.mutex.Unlock() - delete(pool.clients, addr) + delete(pool.Clients, addr) } func (pool *Pool) Do(funcname string, data []byte, @@ -116,7 +116,7 @@ func (pool *Pool) DoBg(funcname string, data []byte, // Get job status from job server. // !!!Not fully tested.!!! func (pool *Pool) Status(addr, handle string) (status *Status, err error) { - if client, ok := pool.clients[addr]; ok { + if client, ok := pool.Clients[addr]; ok { client.Lock() defer client.Unlock() status, err = client.Status(handle) @@ -133,7 +133,7 @@ func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { client = pool.selectServer() } else { var ok bool - if client, ok = pool.clients[addr]; !ok { + if client, ok = pool.Clients[addr]; !ok { err = ErrNotFound return } @@ -147,7 +147,7 @@ func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) { // Close func (pool *Pool) Close() (err map[string]error) { err = make(map[string]error) - for _, c := range pool.clients { + for _, c := range pool.Clients { err[c.addr] = c.Close() } return @@ -156,9 +156,9 @@ func (pool *Pool) Close() (err map[string]error) { // selecting server func (pool *Pool) selectServer() (client *PoolClient) { for client == nil { - addr := pool.SelectionHandler(pool.clients, pool.last) + addr := pool.SelectionHandler(pool.Clients, pool.last) var ok bool - if client, ok = pool.clients[addr]; ok { + if client, ok = pool.Clients[addr]; ok { pool.last = addr break } diff --git a/client/pool_test.go b/client/pool_test.go index 5324db9..54e7e03 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -18,8 +18,8 @@ func TestPoolAdd(t *testing.T) { t.Log(err) c -= 1 } - if len(pool.clients) != c { - t.Errorf("%d servers expected, %d got.", c, len(pool.clients)) + if len(pool.Clients) != c { + t.Errorf("%d servers expected, %d got.", c, len(pool.Clients)) } } From 9d99accce2ab959162523b19771186ced220fa49 Mon Sep 17 00:00:00 2001 From: John Ku Date: Fri, 11 Dec 2015 13:48:02 -0800 Subject: [PATCH 33/34] Allow built in selectionHandlers to be reused outside of package --- client/pool.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/client/pool.go b/client/pool.go index a6e7243..9b94888 100644 --- a/client/pool.go +++ b/client/pool.go @@ -11,9 +11,7 @@ const ( ) var ( - ErrNotFound = errors.New("Server Not Found") - SelectWithRate = selectWithRate - SelectRandom = selectRandom + ErrNotFound = errors.New("Server Not Found") ) type PoolClient struct { @@ -24,7 +22,7 @@ type PoolClient struct { type SelectionHandler func(map[string]*PoolClient, string) string -func selectWithRate(pool map[string]*PoolClient, +func SelectWithRate(pool map[string]*PoolClient, last string) (addr string) { total := 0 for _, item := range pool { @@ -36,7 +34,7 @@ func selectWithRate(pool map[string]*PoolClient, return last } -func selectRandom(pool map[string]*PoolClient, +func SelectRandom(pool map[string]*PoolClient, last string) (addr string) { r := rand.Intn(len(pool)) i := 0 From 99c803238419cf5d0bf62b7f2df9edaa81069a89 Mon Sep 17 00:00:00 2001 From: JessonChan Date: Wed, 20 Apr 2016 12:54:21 +0800 Subject: [PATCH 34/34] fix a bug when high qps --- worker/agent.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index 659960a..3367ad3 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -90,16 +90,22 @@ func (a *agent) work() { leftdata = data continue } - if inpack, l, err = decodeInPack(data); err != nil { - a.worker.err(err) - leftdata = data - continue - } - leftdata = nil - inpack.a = a - a.worker.in <- inpack - if len(data) > l { - leftdata = data[l:] + for { + if inpack, l, err = decodeInPack(data); err != nil { + a.worker.err(err) + leftdata = data + break + } else { + leftdata = nil + inpack.a = a + a.worker.in <- inpack + if len(data) == l { + break + } + if len(data) > l { + data = data[l:] + } + } } } }