diff --git a/.gitignore b/.gitignore index 0026861..3cefcde 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,7 @@ _cgo_export.* _testmain.go *.exe + +# Other Go stuff +go.mod +go.sum diff --git a/admin/admin.go b/admin/admin.go new file mode 100644 index 0000000..0f9644a --- /dev/null +++ b/admin/admin.go @@ -0,0 +1,219 @@ +/* Borrowed from: github.com/draxil/gearman_admin */ +package admin + +import ( + "bufio" + "fmt" + "io" + "net" + "strings" + "strconv" + "regexp" +) + +// Connection to a gearman server +type Connection struct { + net.Conn +} + +var colrx *regexp.Regexp + +func init(){ + colrx = regexp.MustCompile("[ \t]+") +} + +// Connect to a gearman server +// gearadmin, err := gearman_admin.Connect("tcp", "gearman:4730") +func Connect(network, address string) (connection *Connection, err error) { + connection = &Connection{} + connection.Conn, err = net.Dial(network, address) + + if err != nil { + err = fmt.Errorf("Error connecting to gearman server: %s", err.Error()) + return + } + + return +} + +// Query connected workers list +func (c *Connection) Workers() (workers []Worker, err error) { + _, err = c.Write([]byte("workers\n")) + if err != nil { + return nil, err + } + + var lines []string + lines, err = read_response(c) + if err != nil { + return nil, err + } + + workers, err = workers_from_lines(lines) + + return workers, err +} + +// Query known tasks and their statuses +func (c *Connection) Status() (statuses []FunctionStatus, err error) { + _, err = c.Write([]byte("status\n")) + if err != nil { + err = fmt.Errorf("Error requesting function status list: %s", err.Error()) + return + } + + var lines []string + lines, err = read_response(c) + + if err != nil { + err = fmt.Errorf("Error getting function status list: %s", err.Error()) + } + + statuses, err = func_statuses_from_lines(lines) + + if err != nil { + err = fmt.Errorf("Error getting function status list: %s", err.Error()) + statuses = []FunctionStatus{} + } + + return +} + +func process_line( line string ) []string{ + parts := colrx.Split(line, -1) + + return parts +} + +func func_statuses_from_lines(lines []string) (funcs []FunctionStatus, err error){ + funcs = make([]FunctionStatus, 0, len(lines)) + + for _, line := range lines { + parts := process_line(line) + + if len(parts) < 3 { + err = ProtocolError("Incomplete status entry only " + strconv.Itoa(len(parts)) + " fields found: " + line) + return + } + + var fs FunctionStatus + + fs.Name = parts[0] + + var unfinished, running, workers int + + unfinished, err = strconv.Atoi(parts[1]) + if err != nil { + err = ProtocolError("bad unfinished count format: " + err.Error()) + return + } + + running, err = strconv.Atoi(parts[2]) + + if err != nil { + err = ProtocolError("bad running count format: " + err.Error()) + return + } + + workers, err = strconv.Atoi(parts[3]) + if err != nil { + err = ProtocolError("bad worker count format: " + err.Error()) + return + } + + fs.UnfinishedJobs = unfinished + fs.RunningJobs = running + fs.Workers = workers + + funcs = append(funcs, fs) + } + + return + +} + +func workers_from_lines(lines []string) (workers []Worker, err error) { + workers = make([]Worker, 0, len(lines)) + + for _, line := range lines { + parts := process_line(line) + + if len(parts) < 4 { + err = ProtocolError("Incomplete worker entry") + return + } + + if parts[3] != `:` { + err = ProtocolError("Malformed worker entry '" + parts[3] + "'") + return + } + + worker := Worker { + Fd : parts[0], + Addr : parts[1], + ClientId : parts[2], + } + + if len(parts) > 4 { + worker.Functions = parts[4:] + } + + workers = append(workers, worker) + } + + return +} + + +// Decoded description of a gearman worker +type Worker struct { + Fd string // File descriptor + Addr string // IP address + ClientId string // Client ID + Functions []string // List of functions +} + +// Check a worker for a particular function +func (w *Worker) HasFunction(funcname string) bool { + for _, v := range w.Functions { + if v == funcname { + return true + } + } + return false +} + +func read_response(r io.Reader) (lines []string, err error) { + rdr := bufio.NewReader(r) + lines = make([]string, 0, 0) + for { + line := "" + if line, err = rdr.ReadString('\n'); err != nil { + return nil, err + + } else if line == ".\n" { + return lines, nil + + } else { + lines = append(lines, strings.TrimRight(line, "\n")) + + } + } + + return lines, nil +} + +// Decoded description of a functions current status +type FunctionStatus struct { + Name string // Function name + UnfinishedJobs int // Number of unfinished jobs + RunningJobs int // Number of running jobs + Workers int // Number of workers available +} + +// Protocol error +type ProtocolError string + +func (p ProtocolError) Error() string { + return "ProtoFail: " + string(p) +} diff --git a/worker/agent.go b/worker/agent.go index 147e887..fddaccf 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -55,6 +55,10 @@ func (a *agent) work() { var err error var data, leftdata []byte for { + if a.worker.stopped { + return + } + if data, err = a.read(); err != nil { if opErr, ok := err.(*net.OpError); ok { if opErr.Temporary() { diff --git a/worker/worker.go b/worker/worker.go index 7ca1389..9964eb4 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -22,7 +22,9 @@ type Worker struct { funcs jobFuncs in chan *inPack running bool + stopped bool ready bool + active sync.WaitGroup Id string ErrorHandler ErrorHandler @@ -39,9 +41,10 @@ type Worker struct { // OneByOne(=1), there will be only one job executed in a time. func New(limit int) (worker *Worker) { worker = &Worker{ - agents: make([]*agent, 0, limit), - funcs: make(jobFuncs), - in: make(chan *inPack, queueSize), + agents: make([]*agent, 0, limit), + funcs: make(jobFuncs), + in: make(chan *inPack, queueSize), + stopped: false, } if limit != Unlimited { worker.limit = make(chan bool, limit-1) @@ -219,6 +222,20 @@ func (worker *Worker) customeHandler(inpack *inPack) { } } +// Stop serving +func (worker *Worker) Stop() { + // Set stopped flag + worker.stopped = true +} + +// Wait for completeness serving +func (worker *Worker) WaitRunning() { + // Wait for all the running activities has stopped + if worker.stopped { + worker.active.Wait() + } +} + // Close connection and exit main loop func (worker *Worker) Close() { worker.Lock() @@ -261,6 +278,8 @@ func (worker *Worker) SetId(id string) { // inner job executing func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { + worker.active.Done() + if worker.limit != nil { <-worker.limit } @@ -276,6 +295,9 @@ func (worker *Worker) exec(inpack *inPack) (err error) { if !ok { return fmt.Errorf("The function does not exist: %s", inpack.fn) } + + worker.active.Add(1) + var r *result if f.timeout == 0 { d, e := f.f(inpack)