From 2263172a8459e07523db95ce186b5aba7a179fa6 Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Mon, 3 Jan 2022 21:25:22 +0300 Subject: [PATCH 1/7] Added graceful stop for worker --- worker/worker.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/worker/worker.go b/worker/worker.go index 7ca1389..53c4869 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -219,6 +219,15 @@ func (worker *Worker) customeHandler(inpack *inPack) { } } +// Stop serving +func (worker *Worker) Stop() { + worker.Lock() + defer worker.Unlock() + if worker.running == true { + close(worker.in) + } +} + // Close connection and exit main loop func (worker *Worker) Close() { worker.Lock() From 66e96d57ac77469b2fe0ad4604b23b2386093f15 Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Mon, 3 Jan 2022 21:36:25 +0300 Subject: [PATCH 2/7] Update worker.go --- worker/worker.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/worker/worker.go b/worker/worker.go index 7ca1389..2214d54 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -219,6 +219,15 @@ func (worker *Worker) customeHandler(inpack *inPack) { } } +// Graceful shutdown for worker +func (worker *Worker) Stop() { + worker.Lock() + defer worker.Unlock() + if worker.running == true { + close(worker.in) + } +} + // Close connection and exit main loop func (worker *Worker) Close() { worker.Lock() From 9cc0c12d964c7bb0c0bb0b6ac62a402008319a14 Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Tue, 4 Jan 2022 15:23:39 +0300 Subject: [PATCH 3/7] Added graceful stop --- .gitignore | 4 ++++ worker/agent.go | 8 ++++++++ worker/worker.go | 24 ++++++++++++++++-------- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 0026861..3cefcde 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,7 @@ _cgo_export.* _testmain.go *.exe + +# Other Go stuff +go.mod +go.sum diff --git a/worker/agent.go b/worker/agent.go index 147e887..fc53399 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -55,6 +55,10 @@ func (a *agent) work() { var err error var data, leftdata []byte for { + if a.worker.stopped { + return + } + if data, err = a.read(); err != nil { if opErr, ok := err.(*net.OpError); ok { if opErr.Temporary() { @@ -91,6 +95,10 @@ func (a *agent) work() { continue } for { + if a.worker.stopped { + return + } + if inpack, l, err = decodeInPack(data); err != nil { a.worker.err(err) leftdata = data diff --git a/worker/worker.go b/worker/worker.go index 53c4869..be4a657 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -22,7 +22,9 @@ type Worker struct { funcs jobFuncs in chan *inPack running bool + stopped bool ready bool + active sync.WaitGroup Id string ErrorHandler ErrorHandler @@ -39,9 +41,10 @@ type Worker struct { // OneByOne(=1), there will be only one job executed in a time. func New(limit int) (worker *Worker) { worker = &Worker{ - agents: make([]*agent, 0, limit), - funcs: make(jobFuncs), - in: make(chan *inPack, queueSize), + agents: make([]*agent, 0, limit), + funcs: make(jobFuncs), + in: make(chan *inPack, queueSize), + stopped: false, } if limit != Unlimited { worker.limit = make(chan bool, limit-1) @@ -221,11 +224,11 @@ func (worker *Worker) customeHandler(inpack *inPack) { // Stop serving func (worker *Worker) Stop() { - worker.Lock() - defer worker.Unlock() - if worker.running == true { - close(worker.in) - } + // Set stopped flag + worker.stopped = true + + // Wait for all the running activities has stopped + worker.active.Wait() } // Close connection and exit main loop @@ -270,6 +273,8 @@ func (worker *Worker) SetId(id string) { // inner job executing func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { + worker.active.Done() + if worker.limit != nil { <-worker.limit } @@ -285,6 +290,9 @@ func (worker *Worker) exec(inpack *inPack) (err error) { if !ok { return fmt.Errorf("The function does not exist: %s", inpack.fn) } + + worker.active.Add(1) + var r *result if f.timeout == 0 { d, e := f.f(inpack) From a20b263b9ebe8fe8782979d933090cc946aa0b34 Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Tue, 4 Jan 2022 15:44:33 +0300 Subject: [PATCH 4/7] Added Stop/WaitRunning methods --- worker/worker.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/worker/worker.go b/worker/worker.go index be4a657..f00fb9b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -226,7 +226,10 @@ func (worker *Worker) customeHandler(inpack *inPack) { func (worker *Worker) Stop() { // Set stopped flag worker.stopped = true +} +// Wait for completeness serving +func (worker *Worker) WaitRunning() { // Wait for all the running activities has stopped worker.active.Wait() } From 3b13a9f51d62b41d68c9cde5e4df910d696aaedb Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Tue, 4 Jan 2022 15:46:55 +0300 Subject: [PATCH 5/7] Fixed --- worker/worker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/worker/worker.go b/worker/worker.go index f00fb9b..9964eb4 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -231,7 +231,9 @@ func (worker *Worker) Stop() { // Wait for completeness serving func (worker *Worker) WaitRunning() { // Wait for all the running activities has stopped - worker.active.Wait() + if worker.stopped { + worker.active.Wait() + } } // Close connection and exit main loop From 68ccdfd4089291057bbb6606e1df7bfee01ddd99 Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Tue, 4 Jan 2022 17:58:34 +0300 Subject: [PATCH 6/7] Removed extra check --- worker/agent.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/worker/agent.go b/worker/agent.go index fc53399..fddaccf 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -95,10 +95,6 @@ func (a *agent) work() { continue } for { - if a.worker.stopped { - return - } - if inpack, l, err = decodeInPack(data); err != nil { a.worker.err(err) leftdata = data From 76b2395d70ab7191341c6f1613269a257d039671 Mon Sep 17 00:00:00 2001 From: Dmitry Krylov Date: Thu, 6 Jan 2022 12:21:03 +0300 Subject: [PATCH 7/7] Added admin module --- admin/admin.go | 219 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 admin/admin.go diff --git a/admin/admin.go b/admin/admin.go new file mode 100644 index 0000000..0f9644a --- /dev/null +++ b/admin/admin.go @@ -0,0 +1,219 @@ +/* Borrowed from: github.com/draxil/gearman_admin */ +package admin + +import ( + "bufio" + "fmt" + "io" + "net" + "strings" + "strconv" + "regexp" +) + +// Connection to a gearman server +type Connection struct { + net.Conn +} + +var colrx *regexp.Regexp + +func init(){ + colrx = regexp.MustCompile("[ \t]+") +} + +// Connect to a gearman server +// gearadmin, err := gearman_admin.Connect("tcp", "gearman:4730") +func Connect(network, address string) (connection *Connection, err error) { + connection = &Connection{} + connection.Conn, err = net.Dial(network, address) + + if err != nil { + err = fmt.Errorf("Error connecting to gearman server: %s", err.Error()) + return + } + + return +} + +// Query connected workers list +func (c *Connection) Workers() (workers []Worker, err error) { + _, err = c.Write([]byte("workers\n")) + if err != nil { + return nil, err + } + + var lines []string + lines, err = read_response(c) + if err != nil { + return nil, err + } + + workers, err = workers_from_lines(lines) + + return workers, err +} + +// Query known tasks and their statuses +func (c *Connection) Status() (statuses []FunctionStatus, err error) { + _, err = c.Write([]byte("status\n")) + if err != nil { + err = fmt.Errorf("Error requesting function status list: %s", err.Error()) + return + } + + var lines []string + lines, err = read_response(c) + + if err != nil { + err = fmt.Errorf("Error getting function status list: %s", err.Error()) + } + + statuses, err = func_statuses_from_lines(lines) + + if err != nil { + err = fmt.Errorf("Error getting function status list: %s", err.Error()) + statuses = []FunctionStatus{} + } + + return +} + +func process_line( line string ) []string{ + parts := colrx.Split(line, -1) + + return parts +} + +func func_statuses_from_lines(lines []string) (funcs []FunctionStatus, err error){ + funcs = make([]FunctionStatus, 0, len(lines)) + + for _, line := range lines { + parts := process_line(line) + + if len(parts) < 3 { + err = ProtocolError("Incomplete status entry only " + strconv.Itoa(len(parts)) + " fields found: " + line) + return + } + + var fs FunctionStatus + + fs.Name = parts[0] + + var unfinished, running, workers int + + unfinished, err = strconv.Atoi(parts[1]) + if err != nil { + err = ProtocolError("bad unfinished count format: " + err.Error()) + return + } + + running, err = strconv.Atoi(parts[2]) + + if err != nil { + err = ProtocolError("bad running count format: " + err.Error()) + return + } + + workers, err = strconv.Atoi(parts[3]) + if err != nil { + err = ProtocolError("bad worker count format: " + err.Error()) + return + } + + fs.UnfinishedJobs = unfinished + fs.RunningJobs = running + fs.Workers = workers + + funcs = append(funcs, fs) + } + + return + +} + +func workers_from_lines(lines []string) (workers []Worker, err error) { + workers = make([]Worker, 0, len(lines)) + + for _, line := range lines { + parts := process_line(line) + + if len(parts) < 4 { + err = ProtocolError("Incomplete worker entry") + return + } + + if parts[3] != `:` { + err = ProtocolError("Malformed worker entry '" + parts[3] + "'") + return + } + + worker := Worker { + Fd : parts[0], + Addr : parts[1], + ClientId : parts[2], + } + + if len(parts) > 4 { + worker.Functions = parts[4:] + } + + workers = append(workers, worker) + } + + return +} + + +// Decoded description of a gearman worker +type Worker struct { + Fd string // File descriptor + Addr string // IP address + ClientId string // Client ID + Functions []string // List of functions +} + +// Check a worker for a particular function +func (w *Worker) HasFunction(funcname string) bool { + for _, v := range w.Functions { + if v == funcname { + return true + } + } + return false +} + +func read_response(r io.Reader) (lines []string, err error) { + rdr := bufio.NewReader(r) + lines = make([]string, 0, 0) + for { + line := "" + if line, err = rdr.ReadString('\n'); err != nil { + return nil, err + + } else if line == ".\n" { + return lines, nil + + } else { + lines = append(lines, strings.TrimRight(line, "\n")) + + } + } + + return lines, nil +} + +// Decoded description of a functions current status +type FunctionStatus struct { + Name string // Function name + UnfinishedJobs int // Number of unfinished jobs + RunningJobs int // Number of running jobs + Workers int // Number of workers available +} + +// Protocol error +type ProtocolError string + +func (p ProtocolError) Error() string { + return "ProtoFail: " + string(p) +}