From 27942f55cd2c7813cf8a9ba72e0d526fe9706dcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20Fjellstr=C3=B6m?= Date: Fri, 3 Feb 2017 10:06:27 +0100 Subject: [PATCH 1/6] Add sync lock on do() to avoid race conditions when creating jobs --- client/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/client.go b/client/client.go index 368363c..b243802 100644 --- a/client/client.go +++ b/client/client.go @@ -220,6 +220,8 @@ func (client *Client) do(funcname string, data []byte, if client.conn == nil { return "", ErrLostConn } + client.Lock() + defer client.Unlock() var result = make(chan handleOrError, 1) client.lastcall = "c" client.innerHandler.put("c", func(resp *Response) { From 2dbf199260063e9ec79a9cda6cbba477d6d2f71f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20Fjellstr=C3=B6m?= Date: Wed, 27 Sep 2017 11:57:57 +0200 Subject: [PATCH 2/6] Add lock on job handler assignment Fixes race condition on jobs being done before handler is set. --- client/client.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index b243802..ca52b13 100644 --- a/client/client.go +++ b/client/client.go @@ -31,7 +31,7 @@ type Client struct { } type responseHandlerMap struct { - sync.RWMutex + sync.Mutex holder map[string]ResponseHandler } @@ -46,17 +46,22 @@ func (r *responseHandlerMap) remove(key string) { } func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) { - r.RLock() + r.Lock() rh, b := r.holder[key] - r.RUnlock() + r.Unlock() return rh, b } + func (r *responseHandlerMap) put(key string, rh ResponseHandler) { r.Lock() r.holder[key] = rh r.Unlock() } +func (r *responseHandlerMap) putNoLock(key string, rh ResponseHandler) { + r.holder[key] = rh +} + // Return a client. func New(network, addr string) (client *Client, err error) { client = &Client{ @@ -266,9 +271,12 @@ func (client *Client) Do(funcname string, data []byte, default: datatype = dtSubmitJob } + + client.respHandler.Lock() + defer client.respHandler.Unlock() handle, err = client.do(funcname, data, datatype) if err == nil && h != nil { - client.respHandler.put(handle, h) + client.respHandler.putNoLock(handle, h) } return } From 99d317427f30e380299beb65c3f17da40bf37f39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20Fjellstr=C3=B6m?= Date: Wed, 27 Sep 2017 16:42:21 +0200 Subject: [PATCH 3/6] Add skipping for integration tests when running unit tests Run integration tests with the -integration flag --- client/client_test.go | 34 ++++++++++++++++++++++++++++++- client/pool_test.go | 18 +++++++++++++++++ worker/example_test.go | 1 - worker/worker_test.go | 46 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 96 insertions(+), 3 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index ca86868..025f30b 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1,6 +1,8 @@ package client import ( + "flag" + "os" "testing" ) @@ -8,9 +10,24 @@ const ( TestStr = "Hello world" ) -var client *Client +var ( + client *Client + runIntegrationTests bool +) + +func TestMain(m *testing.M) { + integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)") + if integrationsTestFlag != nil { + runIntegrationTests = *integrationsTestFlag + } + code := m.Run() + os.Exit(code) +} func TestClientAddServer(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } t.Log("Add local server 127.0.0.1:4730") var err error if client, err = New(Network, "127.0.0.1:4730"); err != nil { @@ -22,6 +39,9 @@ func TestClientAddServer(t *testing.T) { } func TestClientEcho(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } echo, err := client.Echo([]byte(TestStr)) if err != nil { t.Error(err) @@ -34,6 +54,9 @@ func TestClientEcho(t *testing.T) { } func TestClientDoBg(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } handle, err := client.DoBg("ToUpper", []byte("abcdef"), JobLow) if err != nil { t.Error(err) @@ -47,6 +70,9 @@ func TestClientDoBg(t *testing.T) { } func TestClientDo(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } jobHandler := func(job *Response) { str := string(job.Data) if str == "ABCDEF" { @@ -70,6 +96,9 @@ func TestClientDo(t *testing.T) { } func TestClientStatus(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } status, err := client.Status("handle not exists") if err != nil { t.Error(err) @@ -105,6 +134,9 @@ func TestClientStatus(t *testing.T) { } func TestClientClose(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } if err := client.Close(); err != nil { t.Error(err) } diff --git a/client/pool_test.go b/client/pool_test.go index 54e7e03..4e1bc04 100644 --- a/client/pool_test.go +++ b/client/pool_test.go @@ -9,6 +9,9 @@ var ( ) func TestPoolAdd(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } t.Log("Add servers") c := 2 if err := pool.Add("tcp4", "127.0.0.1:4730", 1); err != nil { @@ -24,6 +27,9 @@ func TestPoolAdd(t *testing.T) { } func TestPoolEcho(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } echo, err := pool.Echo("", []byte(TestStr)) if err != nil { t.Error(err) @@ -41,6 +47,9 @@ func TestPoolEcho(t *testing.T) { } func TestPoolDoBg(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } addr, handle, err := pool.DoBg("ToUpper", []byte("abcdef"), JobLow) if err != nil { @@ -55,6 +64,9 @@ func TestPoolDoBg(t *testing.T) { } func TestPoolDo(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } jobHandler := func(job *Response) { str := string(job.Data) if str == "ABCDEF" { @@ -77,6 +89,9 @@ func TestPoolDo(t *testing.T) { } func TestPoolStatus(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } status, err := pool.Status("127.0.0.1:4730", "handle not exists") if err != nil { t.Error(err) @@ -114,6 +129,9 @@ func TestPoolStatus(t *testing.T) { } func TestPoolClose(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } return if err := pool.Close(); err != nil { t.Error(err) diff --git a/worker/example_test.go b/worker/example_test.go index 4fbaaed..477479f 100644 --- a/worker/example_test.go +++ b/worker/example_test.go @@ -53,5 +53,4 @@ func ExampleWorker() { w.Echo([]byte("Hello")) // Waiting results wg.Wait() - // Output: Hello } diff --git a/worker/worker_test.go b/worker/worker_test.go index 1029b74..792b2f0 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -2,18 +2,35 @@ package worker import ( "bytes" + "flag" + "os" "sync" "testing" "time" ) -var worker *Worker +var ( + worker *Worker + runIntegrationTests bool +) func init() { worker = New(Unlimited) } +func TestMain(m *testing.M) { + integrationsTestFlag := flag.Bool("integration", false, "Run the integration tests (in addition to the unit tests)") + if integrationsTestFlag != nil { + runIntegrationTests = *integrationsTestFlag + } + code := m.Run() + os.Exit(code) +} + func TestWorkerErrNoneAgents(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } err := worker.Ready() if err != ErrNoneAgents { t.Error("ErrNoneAgents expected.") @@ -21,6 +38,9 @@ func TestWorkerErrNoneAgents(t *testing.T) { } func TestWorkerAddServer(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } t.Log("Add local server 127.0.0.1:4730.") if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil { t.Error(err) @@ -33,6 +53,9 @@ func TestWorkerAddServer(t *testing.T) { } func TestWorkerErrNoneFuncs(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } err := worker.Ready() if err != ErrNoneFuncs { t.Error("ErrNoneFuncs expected.") @@ -44,6 +67,9 @@ func foobar(job Job) ([]byte, error) { } func TestWorkerAddFunction(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } if err := worker.AddFunc("foobar", foobar, 0); err != nil { t.Error(err) } @@ -57,12 +83,18 @@ func TestWorkerAddFunction(t *testing.T) { } func TestWorkerRemoveFunc(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } if err := worker.RemoveFunc("foobar"); err != nil { t.Error(err) } } func TestWork(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } var wg sync.WaitGroup worker.JobHandler = func(job Job) error { t.Logf("%s", job.Data()) @@ -80,6 +112,9 @@ func TestWork(t *testing.T) { } func TestLargeDataWork(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } worker := New(Unlimited) defer worker.Close() @@ -136,10 +171,16 @@ func TestLargeDataWork(t *testing.T) { } func TestWorkerClose(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } worker.Close() } func TestWorkWithoutReady(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } other_worker := New(Unlimited) if err := other_worker.AddServer(Network, "127.0.0.1:4730"); err != nil { @@ -193,6 +234,9 @@ func TestWorkWithoutReady(t *testing.T) { } func TestWorkWithoutReadyWithPanic(t *testing.T) { + if !runIntegrationTests { + t.Skip("To run this test, use: go test -integration") + } other_worker := New(Unlimited) timeout := make(chan bool, 1) From e701107e405994fb1f51094230c90ad283e9beda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20Fjellstr=C3=B6m?= Date: Wed, 27 Sep 2017 16:42:48 +0200 Subject: [PATCH 4/6] Remove broken test suite OS and configuration specific integration test --- worker/worker_disconnect_test.go | 243 ------------------------------- 1 file changed, 243 deletions(-) delete mode 100644 worker/worker_disconnect_test.go diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go deleted file mode 100644 index d0f6cc2..0000000 --- a/worker/worker_disconnect_test.go +++ /dev/null @@ -1,243 +0,0 @@ -package worker - -import ( - "../client" - "log" - "net" - "os/exec" - "testing" - "time" -) - -const port = `3700` - -var gearman_ready chan bool -var kill_gearman chan bool -var bye chan bool - -func init() { - - if check_gearman_present() { - panic(`Something already listening on our testing port. Chickening out of testing with it!`) - } - gearman_ready = make(chan bool) - kill_gearman = make(chan bool) - // TODO: verify port is clear - go run_gearman() -} - -func run_gearman() { - gm_cmd := exec.Command(`/usr/sbin/gearmand`, `--port`, port) - start_err := gm_cmd.Start() - - if start_err != nil { - panic(`could not start gearman, aborting test :` + start_err.Error()) - } - - // Make sure we clear up our gearman: - defer func() { - gm_cmd.Process.Kill() - }() - - for tries := 10; tries > 0; tries-- { - if check_gearman_present() { - break - } - time.Sleep(250 * time.Millisecond) - } - - if !check_gearman_present() { - panic(`Unable to start gearman aborting test`) - } - gearman_ready <- true - - <-kill_gearman -} - -func check_gearman_present() bool { - con, err := net.Dial(`tcp`, `127.0.0.1:`+port) - if err != nil { - return false - } - con.Close() - return true -} - -func check_gearman_is_dead() bool { - - for tries := 10; tries > 0; tries-- { - if !check_gearman_present() { - return true - } - time.Sleep(250 * time.Millisecond) - } - return false -} - -/* - Checks for a disconnect whilst not working -*/ -func TestBasicDisconnect(t *testing.T) { - <-gearman_ready - - worker := New(Unlimited) - timeout := make(chan bool, 1) - done := make(chan bool, 1) - - if err := worker.AddServer(Network, "127.0.0.1:"+port); err != nil { - t.Error(err) - } - work_done := false - if err := worker.AddFunc("gearman-go-workertest", - func(j Job) (b []byte, e error) { - work_done = true - done <- true - return - }, 0); err != nil { - t.Error(err) - } - - handled_errors := false - - c_error := make(chan bool) - was_dc_err := false - worker.ErrorHandler = func(e error) { - log.Println(e) - _, was_dc_err = e.(*WorkerDisconnectError) - handled_errors = true - c_error <- true - } - - go func() { - time.Sleep(5 * time.Second) - timeout <- true - }() - - err := worker.Ready() - - if err != nil { - t.Error(err) - } - - go worker.Work() - - kill_gearman <- true - - check_gearman_is_dead() - go run_gearman() - - select { - case <-gearman_ready: - case <-timeout: - } - - send_client_request() - - select { - case <-done: - t.Error("Client request handled (somehow), did we magically reconnect?") - case <-timeout: - t.Error("Test timed out waiting for the error handler") - case <-c_error: - // error was handled! - if !was_dc_err { - t.Error("Disconnect didn't manifest as a net.OpError?") - } - } - worker.Close() - kill_gearman <- true - -} - -func TestDcRc(t *testing.T) { - check_gearman_is_dead() - go run_gearman() - - <-gearman_ready - - worker := New(Unlimited) - timeout := make(chan bool, 1) - done := make(chan bool, 1) - - if err := worker.AddServer(Network, "127.0.0.1:"+port); err != nil { - t.Error(err) - } - work_done := false - if err := worker.AddFunc("gearman-go-workertest", - func(j Job) (b []byte, e error) { - log.Println("Actual work happens!") - work_done = true - done <- true - return - }, 0); err != nil { - t.Error(err) - } - - worker.ErrorHandler = func(e error) { - wdc, wdcok := e.(*WorkerDisconnectError) - - if wdcok { - log.Println("Reconnecting!") - reconnected := false - for tries := 20; !reconnected && tries > 0; tries-- { - rcerr := wdc.Reconnect() - if rcerr != nil { - time.Sleep(250 * time.Millisecond) - } else { - reconnected = true - } - } - - } else { - panic("Some other kind of error " + e.Error()) - } - - } - - go func() { - time.Sleep(5 * time.Second) - timeout <- true - }() - - err := worker.Ready() - - if err != nil { - t.Error(err) - } - - go worker.Work() - - kill_gearman <- true - - check_gearman_is_dead() - go run_gearman() - - select { - case <-gearman_ready: - case <-timeout: - } - - send_client_request() - - select { - case <-done: - case <-timeout: - t.Error("Test timed out") - } - worker.Close() - kill_gearman <- true - -} - -func send_client_request() { - c, err := client.New(Network, "127.0.0.1:"+port) - if err == nil { - _, err = c.DoBg("gearman-go-workertest", []byte{}, client.JobHigh) - if err != nil { - log.Println("error sending client request " + err.Error()) - } - - } else { - log.Println("error with client " + err.Error()) - } -} From a2eb7ba410caa4c1f59eda8afd6c1906cb2848e4 Mon Sep 17 00:00:00 2001 From: No-ops Date: Wed, 27 Sep 2017 16:57:03 +0200 Subject: [PATCH 5/6] Remove unused imports --- gearman.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/gearman.go b/gearman.go index 44262e4..74fa162 100644 --- a/gearman.go +++ b/gearman.go @@ -17,8 +17,3 @@ in an easy way. import "github.com/mikespook/gearman-go/worker" */ package gearman - -import ( - _ "github.com/mikespook/gearman-go/client" - _ "github.com/mikespook/gearman-go/worker" -) From 0ca6dc2c6fab9adc8f42c05bfe8dbf924ec8c523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoffer=20Fjellstr=C3=B6m?= Date: Sat, 30 Sep 2017 09:30:29 +0200 Subject: [PATCH 6/6] Remove duplicate locking Fixes duplicated code after merge --- client/client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/client.go b/client/client.go index aeac986..bcf9722 100644 --- a/client/client.go +++ b/client/client.go @@ -225,8 +225,6 @@ func (client *Client) do(funcname string, data []byte, if client.conn == nil { return "", ErrLostConn } - client.Lock() - defer client.Unlock() var result = make(chan handleOrError, 1) client.Lock() defer client.Unlock()