From 9cc0c12d964c7bb0c0bb0b6ac62a402008319a14 Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Tue, 4 Jan 2022 15:23:39 +0300 Subject: [PATCH] Added graceful stop --- .gitignore | 4 ++++ worker/agent.go | 8 ++++++++ worker/worker.go | 24 ++++++++++++++++-------- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 0026861..3cefcde 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,7 @@ _cgo_export.* _testmain.go *.exe + +# Other Go stuff +go.mod +go.sum diff --git a/worker/agent.go b/worker/agent.go index 147e887..fc53399 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -55,6 +55,10 @@ func (a *agent) work() { var err error var data, leftdata []byte for { + if a.worker.stopped { + return + } + if data, err = a.read(); err != nil { if opErr, ok := err.(*net.OpError); ok { if opErr.Temporary() { @@ -91,6 +95,10 @@ func (a *agent) work() { continue } for { + if a.worker.stopped { + return + } + if inpack, l, err = decodeInPack(data); err != nil { a.worker.err(err) leftdata = data diff --git a/worker/worker.go b/worker/worker.go index 53c4869..be4a657 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -22,7 +22,9 @@ type Worker struct { funcs jobFuncs in chan *inPack running bool + stopped bool ready bool + active sync.WaitGroup Id string ErrorHandler ErrorHandler @@ -39,9 +41,10 @@ type Worker struct { // OneByOne(=1), there will be only one job executed in a time. func New(limit int) (worker *Worker) { worker = &Worker{ - agents: make([]*agent, 0, limit), - funcs: make(jobFuncs), - in: make(chan *inPack, queueSize), + agents: make([]*agent, 0, limit), + funcs: make(jobFuncs), + in: make(chan *inPack, queueSize), + stopped: false, } if limit != Unlimited { worker.limit = make(chan bool, limit-1) @@ -221,11 +224,11 @@ func (worker *Worker) customeHandler(inpack *inPack) { // Stop serving func (worker *Worker) Stop() { - worker.Lock() - defer worker.Unlock() - if worker.running == true { - close(worker.in) - } + // Set stopped flag + worker.stopped = true + + // Wait for all the running activities has stopped + worker.active.Wait() } // Close connection and exit main loop @@ -270,6 +273,8 @@ func (worker *Worker) SetId(id string) { // inner job executing func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { + worker.active.Done() + if worker.limit != nil { <-worker.limit } @@ -285,6 +290,9 @@ func (worker *Worker) exec(inpack *inPack) (err error) { if !ok { return fmt.Errorf("The function does not exist: %s", inpack.fn) } + + worker.active.Add(1) + var r *result if f.timeout == 0 { d, e := f.f(inpack)