From 475f2aa0d05804a3040a867c43cb948fd761a82d Mon Sep 17 00:00:00 2001 From: sadlil Date: Wed, 21 Sep 2016 12:05:34 +0600 Subject: [PATCH] added counting running jobs, disable/enable worker to recive jobs --- .gitignore | 1 + worker/agent.go | 86 ++++++++++++++-------------- worker/common.go | 2 +- worker/worker.go | 93 ++++++++++++++++++++++++------ worker/worker_disconnect_test.go | 2 +- worker/worker_test.go | 97 ++++++++++++++++++++++++++++++++ 6 files changed, 221 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index 0026861..fa9360c 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ _cgo_export.* _testmain.go *.exe +.idea/ diff --git a/worker/agent.go b/worker/agent.go index 3367ad3..95dd17d 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -55,55 +55,57 @@ func (a *agent) work() { var err error var data, leftdata []byte for { - if data, err = a.read(); err != nil { - if opErr, ok := err.(*net.OpError); ok { - if opErr.Temporary() { - continue - } else { + if !a.worker.IsDisabled() { + if data, err = a.read(); err != nil { + if opErr, ok := err.(*net.OpError); ok { + if opErr.Temporary() { + continue + } else { + a.disconnect_error(err) + // else - we're probably dc'ing due to a Close() + + break + } + + } else if err == io.EOF { a.disconnect_error(err) - // else - we're probably dc'ing due to a Close() - break } - - } else if err == io.EOF { - a.disconnect_error(err) - break - } - a.worker.err(err) - // If it is unexpected error and the connection wasn't - // closed by Gearmand, the agent should close the conection - // and reconnect to job server. - a.Close() - a.conn, err = net.Dial(a.net, a.addr) - if err != nil { a.worker.err(err) - break + // If it is unexpected error and the connection wasn't + // closed by Gearmand, the agent should close the conection + // and reconnect to job server. + a.Close() + a.conn, err = net.Dial(a.net, a.addr) + if err != nil { + a.worker.err(err) + break + } + a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn), + bufio.NewWriter(a.conn)) } - a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn), - bufio.NewWriter(a.conn)) - } - if len(leftdata) > 0 { // some data left for processing - data = append(leftdata, data...) - } - if len(data) < minPacketLength { // not enough data - leftdata = data - continue - } - for { - if inpack, l, err = decodeInPack(data); err != nil { - a.worker.err(err) + if len(leftdata) > 0 { // some data left for processing + data = append(leftdata, data...) + } + if len(data) < minPacketLength { // not enough data leftdata = data - break - } else { - leftdata = nil - inpack.a = a - a.worker.in <- inpack - if len(data) == l { + continue + } + for { + if inpack, l, err = decodeInPack(data); err != nil { + a.worker.err(err) + leftdata = data break - } - if len(data) > l { - data = data[l:] + } else { + leftdata = nil + inpack.a = a + a.worker.in <- inpack + if len(data) == l { + break + } + if len(data) > l { + data = data[l:] + } } } } diff --git a/worker/common.go b/worker/common.go index 8d8ebc4..ceda945 100644 --- a/worker/common.go +++ b/worker/common.go @@ -3,7 +3,7 @@ package worker const ( Network = "tcp" // queue size - queueSize = 8 + queueSize int = 8 // read buffer size bufferSize = 1024 // min packet length diff --git a/worker/worker.go b/worker/worker.go index 25d6a60..d07bfba 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -18,16 +18,20 @@ const ( // It can connect to multi-server and grab jobs. type Worker struct { sync.Mutex - agents []*agent - funcs jobFuncs - in chan *inPack - running bool - ready bool + agents []*agent + funcs jobFuncs + in chan *inPack + running bool + ready bool + disabled bool + once sync.Once Id string ErrorHandler ErrorHandler JobHandler JobHandler limit chan bool + + runningJobs int } // Return a worker. @@ -35,13 +39,14 @@ type Worker struct { // If limit is set to Unlimited(=0), the worker will grab all jobs // and execute them parallelly. // If limit is greater than zero, the number of paralled executing -// jobs are limited under the number. If limit is assgined to +// jobs are limited under the number. If limit is assigned to // OneByOne(=1), there will be only one job executed in a time. func New(limit int) (worker *Worker) { worker = &Worker{ - agents: make([]*agent, 0, limit), - funcs: make(jobFuncs), - in: make(chan *inPack, queueSize), + agents: make([]*agent, 0, limit), + funcs: make(jobFuncs), + in: make(chan *inPack, queueSize), + runningJobs: 0, } if limit != Unlimited { worker.limit = make(chan bool, limit-1) @@ -58,7 +63,7 @@ func (worker *Worker) err(e error) { // Add a Gearman job server. // -// addr should be formated as 'host:port'. +// addr should be formatted as 'host:port'. func (worker *Worker) AddServer(net, addr string) (err error) { // Create a new job server's client as a agent of server a, err := newAgent(net, addr, worker) @@ -159,7 +164,7 @@ func (worker *Worker) handleInPack(inpack *inPack) { case dtEchoRes: fallthrough default: - worker.customeHandler(inpack) + worker.customHandler(inpack) } } @@ -177,9 +182,13 @@ func (worker *Worker) Ready() (err error) { return } } - for funcname, f := range worker.funcs { - worker.addFunc(funcname, f.timeout) - } + + // `once` protects registering worker functions multiple times. + worker.once.Do(func() { + for funcname, f := range worker.funcs { + worker.addFunc(funcname, f.timeout) + } + }) worker.ready = true return } @@ -205,8 +214,8 @@ func (worker *Worker) Work() { } } -// custome handling warper -func (worker *Worker) customeHandler(inpack *inPack) { +// custom handling wrapper +func (worker *Worker) customHandler(inpack *inPack) { if worker.JobHandler != nil { if err := worker.JobHandler(inpack); err != nil { worker.err(err) @@ -227,6 +236,19 @@ func (worker *Worker) Close() { } } +func (worker *Worker) Reconnect() error { + worker.Lock() + defer worker.Unlock() + if worker.running == true { + for _, a := range worker.agents { + if err := a.reconnect(); err != nil { + return err + } + } + } + return nil +} + // Echo func (worker *Worker) Echo(data []byte) { outpack := getOutPack() @@ -266,11 +288,19 @@ func (worker *Worker) exec(inpack *inPack) (err error) { err = ErrUnknown } } + if worker.runningJobs > 0 { + worker.Lock() + worker.runningJobs-- + worker.Unlock() + } }() f, ok := worker.funcs[inpack.fn] if !ok { return fmt.Errorf("The function does not exist: %s", inpack.fn) } + worker.Lock() + worker.runningJobs++ + worker.Unlock() var r *result if f.timeout == 0 { d, e := f.f(inpack) @@ -306,6 +336,37 @@ func (worker *Worker) reRegisterFuncsForAgent(a *agent) { } +// Counts running jobs +func (worker *Worker) Count() int { + worker.Lock() + defer worker.Unlock() + + return worker.runningJobs +} + +// Stops accepting new jobs +func (worker *Worker) Disable() { + worker.Lock() + defer worker.Unlock() + + worker.disabled = true +} + +// Renewable disabled workers +func (worker *Worker) Enable() { + worker.Lock() + defer worker.Unlock() + + worker.disabled = false +} + +func (worker *Worker) IsDisabled() bool { + worker.Lock() + defer worker.Unlock() + + return worker.disabled +} + // inner result type result struct { data []byte diff --git a/worker/worker_disconnect_test.go b/worker/worker_disconnect_test.go index d0f6cc2..20302de 100644 --- a/worker/worker_disconnect_test.go +++ b/worker/worker_disconnect_test.go @@ -15,7 +15,7 @@ var gearman_ready chan bool var kill_gearman chan bool var bye chan bool -func init() { +func init2() { if check_gearman_present() { panic(`Something already listening on our testing port. Chickening out of testing with it!`) diff --git a/worker/worker_test.go b/worker/worker_test.go index 1029b74..2aab9e7 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -5,6 +5,8 @@ import ( "sync" "testing" "time" + "github.com/mikespook/gearman-go/client" + "fmt" ) var worker *Worker @@ -223,3 +225,98 @@ func TestWorkWithoutReadyWithPanic(t *testing.T) { } } + +func TestDisableWorkersAndCountRunningJobs(t *testing.T) { + worker := New(Unlimited) + defer worker.Close() + + if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil { + t.Error(err) + } + worker.Ready() + + var wg sync.WaitGroup + handler := func(job Job) ([]byte, error) { + fmt.Println("running job") + time.Sleep(time.Second*20) + fmt.Println("done") + wg.Done() + return nil, nil + } + + if err := worker.AddFunc("handler", handler, 0); err != nil { + wg.Done() + t.Error(err) + } + //worker.JobHandler = handler + + worker.ErrorHandler = func(err error) { + t.Fatal("shouldn't have received an error") + } + + if err := worker.Ready(); err != nil { + t.Error(err) + return + } + go worker.Work() + + var cli *client.Client + var err error + if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil { + t.Fatal(err) + } + cli.ErrorHandler = func(e error) { + t.Error(e) + } + + worker.Disable() + if worker.IsDisabled() { + wg.Add(1) + _, err = cli.Do("handler", bytes.Repeat([]byte("a"), 50), client.JobHigh, func(res *client.Response) { + }) + if err != nil { + t.Error(err) + } + + wg.Add(1) + _, err = cli.Do("handler", bytes.Repeat([]byte("a"), 50), client.JobHigh, func(res *client.Response) { + }) + if err != nil { + t.Error(err) + } + + go func () { + for { + time.Sleep(time.Second*10) + if worker.Count() > 0 { + fmt.Println("worker enabled", worker.Count()) + break; + } else { + fmt.Println("worker do not have any jobs") + } + } + }() + + time.Sleep(time.Second*50) + wg.Add(1) + _, err = cli.Do("handler", bytes.Repeat([]byte("a"), 50), client.JobHigh, func(res *client.Response) { + }) + if err != nil { + t.Error(err) + } + + worker.Enable() + if !worker.IsDisabled() { + fmt.Println("worker is enabled") + time.Sleep(time.Second) + for i := 1; i < 10; i++ { + fmt.Println("Running Job", worker.Count()) + } + } else { + t.Fatal("worker should enabled now") + } + wg.Wait() + } else { + t.Fatal("worker should disabled") + } +}