added counting running jobs, disable/enable worker to receive jobs

sadlil 2016-09-21 12:05:34 +06:00
parent b79fee2965
commit 475f2aa0d0
6 changed files with 221 additions and 60 deletions
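
For orientation, here is a minimal sketch of the surface this commit adds (Count, Disable, Enable, IsDisabled). No job server is needed just to toggle the flags; everything else follows the library's existing API.

package main

import (
	"fmt"

	"github.com/mikespook/gearman-go/worker"
)

func main() {
	w := worker.New(worker.Unlimited)
	defer w.Close()

	w.Disable()                 // agents skip reading new jobs while disabled
	fmt.Println(w.IsDisabled()) // true
	fmt.Println(w.Count())      // 0: no jobs are running yet
	w.Enable()
	fmt.Println(w.IsDisabled()) // false
}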

.gitignore

@@ -20,3 +20,4 @@ _cgo_export.*
 _testmain.go
 *.exe
+.idea/

worker/agent.go

@@ -55,55 +55,57 @@ func (a *agent) work() {
 	var err error
 	var data, leftdata []byte
 	for {
-		if data, err = a.read(); err != nil {
-			if opErr, ok := err.(*net.OpError); ok {
-				if opErr.Temporary() {
-					continue
-				} else {
-					a.disconnect_error(err)
-					// else - we're probably dc'ing due to a Close()
-					break
-				}
-			} else if err == io.EOF {
-				a.disconnect_error(err)
-				break
-			}
-			a.worker.err(err)
-			// If it is unexpected error and the connection wasn't
-			// closed by Gearmand, the agent should close the connection
-			// and reconnect to job server.
-			a.Close()
-			a.conn, err = net.Dial(a.net, a.addr)
-			if err != nil {
-				a.worker.err(err)
-				break
-			}
-			a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
-				bufio.NewWriter(a.conn))
-		}
-		if len(leftdata) > 0 { // some data left for processing
-			data = append(leftdata, data...)
-		}
-		if len(data) < minPacketLength { // not enough data
-			leftdata = data
-			continue
-		}
-		for {
-			if inpack, l, err = decodeInPack(data); err != nil {
-				a.worker.err(err)
-				leftdata = data
-				break
-			} else {
-				leftdata = nil
-				inpack.a = a
-				a.worker.in <- inpack
-				if len(data) == l {
-					break
-				}
-				if len(data) > l {
-					data = data[l:]
-				}
-			}
-		}
+		if !a.worker.IsDisabled() {
+			if data, err = a.read(); err != nil {
+				if opErr, ok := err.(*net.OpError); ok {
+					if opErr.Temporary() {
+						continue
+					} else {
+						a.disconnect_error(err)
+						// else - we're probably dc'ing due to a Close()
+						break
+					}
+				} else if err == io.EOF {
+					a.disconnect_error(err)
+					break
+				}
+				a.worker.err(err)
+				// If it is unexpected error and the connection wasn't
+				// closed by Gearmand, the agent should close the connection
+				// and reconnect to job server.
+				a.Close()
+				a.conn, err = net.Dial(a.net, a.addr)
+				if err != nil {
+					a.worker.err(err)
+					break
+				}
+				a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
+					bufio.NewWriter(a.conn))
+			}
+			if len(leftdata) > 0 { // some data left for processing
+				data = append(leftdata, data...)
+			}
+			if len(data) < minPacketLength { // not enough data
+				leftdata = data
+				continue
+			}
+			for {
+				if inpack, l, err = decodeInPack(data); err != nil {
+					a.worker.err(err)
+					leftdata = data
+					break
+				} else {
+					leftdata = nil
+					inpack.a = a
+					a.worker.in <- inpack
+					if len(data) == l {
+						break
+					}
+					if len(data) > l {
+						data = data[l:]
+					}
+				}
+			}
+		}
 	}
 }
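
The inner loop above reassembles packets from a byte stream: whatever cannot be decoded yet is parked in leftdata and prepended to the next read. The following standalone sketch illustrates that buffering pattern with a hypothetical 4-byte big-endian length prefix; this is not Gearman's actual packet header, only the same framing idea.

package main

import (
	"encoding/binary"
	"fmt"
)

// decodeFrame returns one complete frame and how many bytes it
// consumed, or ok=false when more input is still needed.
func decodeFrame(data []byte) (frame []byte, n int, ok bool) {
	if len(data) < 4 {
		return nil, 0, false // not enough data for the header
	}
	size := int(binary.BigEndian.Uint32(data[:4]))
	if len(data) < 4+size {
		return nil, 0, false // header present, body still incomplete
	}
	return data[4 : 4+size], 4 + size, true
}

func main() {
	var leftdata []byte
	chunks := [][]byte{
		{0, 0, 0, 5, 'h', 'e'},      // partial frame
		{'l', 'l', 'o', 0, 0, 0, 2}, // completes it, starts another
		{'h', 'i'},
	}
	for _, chunk := range chunks {
		data := append(leftdata, chunk...)
		for {
			frame, n, ok := decodeFrame(data)
			if !ok {
				leftdata = data // keep the remainder for the next read
				break
			}
			fmt.Printf("frame: %s\n", frame)
			data = data[n:]
		}
	}
}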

worker/common.go

@@ -3,7 +3,7 @@ package worker
 const (
 	Network = "tcp"
 	// queue size
-	queueSize = 8
+	queueSize int = 8
 	// read buffer size
 	bufferSize = 1024
 	// min packet length

worker/worker.go

@@ -18,16 +18,20 @@ const (
 // It can connect to multi-server and grab jobs.
 type Worker struct {
 	sync.Mutex
 	agents []*agent
 	funcs jobFuncs
 	in chan *inPack
 	running bool
 	ready bool
+	disabled bool
+	once sync.Once
+
 	Id string
 	ErrorHandler ErrorHandler
 	JobHandler JobHandler
 	limit chan bool
+	runningJobs int
 }
 
 // Return a worker.
@@ -35,13 +39,14 @@ type Worker struct {
 // If limit is set to Unlimited(=0), the worker will grab all jobs
 // and execute them in parallel.
 // If limit is greater than zero, the number of parallel executing
-// jobs are limited under the number. If limit is assgined to
+// jobs are limited under the number. If limit is assigned to
 // OneByOne(=1), there will be only one job executed in a time.
 func New(limit int) (worker *Worker) {
 	worker = &Worker{
 		agents: make([]*agent, 0, limit),
 		funcs:  make(jobFuncs),
 		in:     make(chan *inPack, queueSize),
+		runningJobs: 0,
 	}
 	if limit != Unlimited {
 		worker.limit = make(chan bool, limit-1)
@@ -58,7 +63,7 @@ func (worker *Worker) err(e error) {
 // Add a Gearman job server.
 //
-// addr should be formated as 'host:port'.
+// addr should be formatted as 'host:port'.
 func (worker *Worker) AddServer(net, addr string) (err error) {
 	// Create a new job server's client as an agent of the server
 	a, err := newAgent(net, addr, worker)
@@ -159,7 +164,7 @@ func (worker *Worker) handleInPack(inpack *inPack) {
 	case dtEchoRes:
 		fallthrough
 	default:
-		worker.customeHandler(inpack)
+		worker.customHandler(inpack)
 	}
 }
@@ -177,9 +182,13 @@ func (worker *Worker) Ready() (err error) {
 			return
 		}
 	}
-	for funcname, f := range worker.funcs {
-		worker.addFunc(funcname, f.timeout)
-	}
+
+	// `once` protects registering worker functions multiple times.
+	worker.once.Do(func() {
+		for funcname, f := range worker.funcs {
+			worker.addFunc(funcname, f.timeout)
+		}
+	})
 	worker.ready = true
 	return
 }
@@ -205,8 +214,8 @@ func (worker *Worker) Work() {
 	}
 }
 
-// custome handling warper
-func (worker *Worker) customeHandler(inpack *inPack) {
+// custom handling wrapper
+func (worker *Worker) customHandler(inpack *inPack) {
 	if worker.JobHandler != nil {
 		if err := worker.JobHandler(inpack); err != nil {
 			worker.err(err)
@@ -227,6 +236,19 @@ func (worker *Worker) Close() {
 	}
 }
 
+func (worker *Worker) Reconnect() error {
+	worker.Lock()
+	defer worker.Unlock()
+	if worker.running == true {
+		for _, a := range worker.agents {
+			if err := a.reconnect(); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 // Echo
 func (worker *Worker) Echo(data []byte) {
 	outpack := getOutPack()
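
Since Reconnect is exported, callers can force every agent to re-dial its job server. Below is a sketch of a periodic supervision loop; the 30-second cadence and the unconditional retry are illustrative only, and whether a.reconnect() re-dials unconditionally or only when a connection is down depends on the agent implementation, which this diff does not show.

package supervise

import (
	"log"
	"time"

	"github.com/mikespook/gearman-go/worker"
)

// Supervise periodically asks the worker to re-dial all of its
// job servers using the Reconnect method added in this commit.
func Supervise(w *worker.Worker) {
	for range time.Tick(30 * time.Second) {
		if err := w.Reconnect(); err != nil {
			log.Println("reconnect failed:", err)
		}
	}
}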
@@ -266,11 +288,19 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
 				err = ErrUnknown
 			}
 		}
+		if worker.runningJobs > 0 {
+			worker.Lock()
+			worker.runningJobs--
+			worker.Unlock()
+		}
 	}()
 	f, ok := worker.funcs[inpack.fn]
 	if !ok {
 		return fmt.Errorf("The function does not exist: %s", inpack.fn)
 	}
+	worker.Lock()
+	worker.runningJobs++
+	worker.Unlock()
 	var r *result
 	if f.timeout == 0 {
 		d, e := f.f(inpack)
@@ -306,6 +336,37 @@ func (worker *Worker) reRegisterFuncsForAgent(a *agent) {
 	}
 }
 
+// Counts running jobs
+func (worker *Worker) Count() int {
+	worker.Lock()
+	defer worker.Unlock()
+	return worker.runningJobs
+}
+
+// Stops accepting new jobs
+func (worker *Worker) Disable() {
+	worker.Lock()
+	defer worker.Unlock()
+	worker.disabled = true
+}
+
+// Re-enables a disabled worker
+func (worker *Worker) Enable() {
+	worker.Lock()
+	defer worker.Unlock()
+	worker.disabled = false
+}
+
+// Reports whether the worker is currently disabled
+func (worker *Worker) IsDisabled() bool {
+	worker.Lock()
+	defer worker.Unlock()
+	return worker.disabled
+}
+
 // inner result
 type result struct {
 	data []byte
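
Together, Disable and Count make a graceful drain possible: stop taking new jobs, then wait for the running count to reach zero before closing. A minimal sketch under assumptions: a gearmand on 127.0.0.1:4730, a hypothetical "sleep" function name, and an arbitrary polling interval.

package main

import (
	"log"
	"time"

	"github.com/mikespook/gearman-go/worker"
)

// drain pauses job intake and blocks until in-flight jobs finish.
func drain(w *worker.Worker) {
	w.Disable()
	for w.Count() > 0 {
		time.Sleep(250 * time.Millisecond)
	}
}

func main() {
	w := worker.New(worker.Unlimited)
	// "sleep" is an illustrative function name, not part of the library.
	w.AddFunc("sleep", func(job worker.Job) ([]byte, error) {
		time.Sleep(2 * time.Second)
		return nil, nil
	}, 0)
	if err := w.AddServer(worker.Network, "127.0.0.1:4730"); err != nil {
		log.Fatal(err)
	}
	if err := w.Ready(); err != nil {
		log.Fatal(err)
	}
	go w.Work()

	// ... later, before shutdown:
	drain(w)
	w.Close()
}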

worker/worker_disconnect_test.go

@@ -15,7 +15,7 @@ var gearman_ready chan bool
 var kill_gearman chan bool
 var bye chan bool
 
-func init() {
+func init2() {
 	if check_gearman_present() {
 		panic(`Something already listening on our testing port. Chickening out of testing with it!`)

worker/worker_test.go

@@ -5,6 +5,8 @@ import (
 	"sync"
 	"testing"
 	"time"
+	"github.com/mikespook/gearman-go/client"
+	"fmt"
 )
 
 var worker *Worker
@@ -223,3 +225,98 @@ func TestWorkWithoutReadyWithPanic(t *testing.T) {
 	}
 }
+
+func TestDisableWorkersAndCountRunningJobs(t *testing.T) {
+	worker := New(Unlimited)
+	defer worker.Close()
+	if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
+		t.Error(err)
+	}
+	worker.Ready()
+
+	var wg sync.WaitGroup
+	handler := func(job Job) ([]byte, error) {
+		fmt.Println("running job")
+		time.Sleep(time.Second * 20)
+		fmt.Println("done")
+		wg.Done()
+		return nil, nil
+	}
+	if err := worker.AddFunc("handler", handler, 0); err != nil {
+		wg.Done()
+		t.Error(err)
+	}
+	//worker.JobHandler = handler
+	worker.ErrorHandler = func(err error) {
+		t.Fatal("shouldn't have received an error")
+	}
+	if err := worker.Ready(); err != nil {
+		t.Error(err)
+		return
+	}
+	go worker.Work()
+
+	var cli *client.Client
+	var err error
+	if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
+		t.Fatal(err)
+	}
+	cli.ErrorHandler = func(e error) {
+		t.Error(e)
+	}
+
+	worker.Disable()
+	if worker.IsDisabled() {
+		wg.Add(1)
+		_, err = cli.Do("handler", bytes.Repeat([]byte("a"), 50), client.JobHigh, func(res *client.Response) {
+		})
+		if err != nil {
+			t.Error(err)
+		}
+		wg.Add(1)
+		_, err = cli.Do("handler", bytes.Repeat([]byte("a"), 50), client.JobHigh, func(res *client.Response) {
+		})
+		if err != nil {
+			t.Error(err)
+		}
+		go func() {
+			for {
+				time.Sleep(time.Second * 10)
+				if worker.Count() > 0 {
+					fmt.Println("worker enabled", worker.Count())
+					break
+				} else {
+					fmt.Println("worker does not have any jobs")
+				}
+			}
+		}()
+		time.Sleep(time.Second * 50)
+		wg.Add(1)
+		_, err = cli.Do("handler", bytes.Repeat([]byte("a"), 50), client.JobHigh, func(res *client.Response) {
+		})
+		if err != nil {
+			t.Error(err)
+		}
+		worker.Enable()
+		if !worker.IsDisabled() {
+			fmt.Println("worker is enabled")
+			time.Sleep(time.Second)
+			for i := 1; i < 10; i++ {
+				fmt.Println("Running Job", worker.Count())
+			}
+		} else {
+			t.Fatal("worker should be enabled now")
+		}
+		wg.Wait()
+	} else {
+		t.Fatal("worker should be disabled")
+	}
+}