This commit is contained in:
Dmitry Krylov 2022-09-15 12:06:41 +02:00 committed by GitHub
commit 481c90625c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 3 deletions

4
.gitignore vendored
View File

@ -20,3 +20,7 @@ _cgo_export.*
_testmain.go
*.exe
# Other Go stuff
go.mod
go.sum

219
admin/admin.go Normal file
View File

@ -0,0 +1,219 @@
/* Borrowed from: github.com/draxil/gearman_admin */
package admin
import (
"bufio"
"fmt"
"io"
"net"
"strings"
"strconv"
"regexp"
)
// Connection to a gearman server
type Connection struct {
net.Conn
}
var colrx *regexp.Regexp
func init(){
colrx = regexp.MustCompile("[ \t]+")
}
// Connect to a gearman server
// gearadmin, err := gearman_admin.Connect("tcp", "gearman:4730")
func Connect(network, address string) (connection *Connection, err error) {
connection = &Connection{}
connection.Conn, err = net.Dial(network, address)
if err != nil {
err = fmt.Errorf("Error connecting to gearman server: %s", err.Error())
return
}
return
}
// Query connected workers list
func (c *Connection) Workers() (workers []Worker, err error) {
_, err = c.Write([]byte("workers\n"))
if err != nil {
return nil, err
}
var lines []string
lines, err = read_response(c)
if err != nil {
return nil, err
}
workers, err = workers_from_lines(lines)
return workers, err
}
// Query known tasks and their statuses
func (c *Connection) Status() (statuses []FunctionStatus, err error) {
_, err = c.Write([]byte("status\n"))
if err != nil {
err = fmt.Errorf("Error requesting function status list: %s", err.Error())
return
}
var lines []string
lines, err = read_response(c)
if err != nil {
err = fmt.Errorf("Error getting function status list: %s", err.Error())
}
statuses, err = func_statuses_from_lines(lines)
if err != nil {
err = fmt.Errorf("Error getting function status list: %s", err.Error())
statuses = []FunctionStatus{}
}
return
}
func process_line( line string ) []string{
parts := colrx.Split(line, -1)
return parts
}
func func_statuses_from_lines(lines []string) (funcs []FunctionStatus, err error){
funcs = make([]FunctionStatus, 0, len(lines))
for _, line := range lines {
parts := process_line(line)
if len(parts) < 3 {
err = ProtocolError("Incomplete status entry only " + strconv.Itoa(len(parts)) + " fields found: " + line)
return
}
var fs FunctionStatus
fs.Name = parts[0]
var unfinished, running, workers int
unfinished, err = strconv.Atoi(parts[1])
if err != nil {
err = ProtocolError("bad unfinished count format: " + err.Error())
return
}
running, err = strconv.Atoi(parts[2])
if err != nil {
err = ProtocolError("bad running count format: " + err.Error())
return
}
workers, err = strconv.Atoi(parts[3])
if err != nil {
err = ProtocolError("bad worker count format: " + err.Error())
return
}
fs.UnfinishedJobs = unfinished
fs.RunningJobs = running
fs.Workers = workers
funcs = append(funcs, fs)
}
return
}
func workers_from_lines(lines []string) (workers []Worker, err error) {
workers = make([]Worker, 0, len(lines))
for _, line := range lines {
parts := process_line(line)
if len(parts) < 4 {
err = ProtocolError("Incomplete worker entry")
return
}
if parts[3] != `:` {
err = ProtocolError("Malformed worker entry '" + parts[3] + "'")
return
}
worker := Worker {
Fd : parts[0],
Addr : parts[1],
ClientId : parts[2],
}
if len(parts) > 4 {
worker.Functions = parts[4:]
}
workers = append(workers, worker)
}
return
}
// Decoded description of a gearman worker
type Worker struct {
Fd string // File descriptor
Addr string // IP address
ClientId string // Client ID
Functions []string // List of functions
}
// Check a worker for a particular function
func (w *Worker) HasFunction(funcname string) bool {
for _, v := range w.Functions {
if v == funcname {
return true
}
}
return false
}
func read_response(r io.Reader) (lines []string, err error) {
rdr := bufio.NewReader(r)
lines = make([]string, 0, 0)
for {
line := ""
if line, err = rdr.ReadString('\n'); err != nil {
return nil, err
} else if line == ".\n" {
return lines, nil
} else {
lines = append(lines, strings.TrimRight(line, "\n"))
}
}
return lines, nil
}
// Decoded description of a functions current status
type FunctionStatus struct {
Name string // Function name
UnfinishedJobs int // Number of unfinished jobs
RunningJobs int // Number of running jobs
Workers int // Number of workers available
}
// Protocol error
type ProtocolError string
func (p ProtocolError) Error() string {
return "ProtoFail: " + string(p)
}

View File

@ -55,6 +55,10 @@ func (a *agent) work() {
var err error
var data, leftdata []byte
for {
if a.worker.stopped {
return
}
if data, err = a.read(); err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Temporary() {

View File

@ -22,7 +22,9 @@ type Worker struct {
funcs jobFuncs
in chan *inPack
running bool
stopped bool
ready bool
active sync.WaitGroup
Id string
ErrorHandler ErrorHandler
@ -39,9 +41,10 @@ type Worker struct {
// OneByOne(=1), there will be only one job executed in a time.
func New(limit int) (worker *Worker) {
worker = &Worker{
agents: make([]*agent, 0, limit),
funcs: make(jobFuncs),
in: make(chan *inPack, queueSize),
agents: make([]*agent, 0, limit),
funcs: make(jobFuncs),
in: make(chan *inPack, queueSize),
stopped: false,
}
if limit != Unlimited {
worker.limit = make(chan bool, limit-1)
@ -219,6 +222,20 @@ func (worker *Worker) customeHandler(inpack *inPack) {
}
}
// Stop serving
func (worker *Worker) Stop() {
// Set stopped flag
worker.stopped = true
}
// Wait for completeness serving
func (worker *Worker) WaitRunning() {
// Wait for all the running activities has stopped
if worker.stopped {
worker.active.Wait()
}
}
// Close connection and exit main loop
func (worker *Worker) Close() {
worker.Lock()
@ -261,6 +278,8 @@ func (worker *Worker) SetId(id string) {
// inner job executing
func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() {
worker.active.Done()
if worker.limit != nil {
<-worker.limit
}
@ -276,6 +295,9 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
if !ok {
return fmt.Errorf("The function does not exist: %s", inpack.fn)
}
worker.active.Add(1)
var r *result
if f.timeout == 0 {
d, e := f.f(inpack)