refactoring complated

This commit is contained in:
Xing Xing 2013-12-24 14:35:33 +08:00
parent 2ee2be0891
commit 2da5f29cd1
4 changed files with 69 additions and 70 deletions

View File

@ -1,56 +1,60 @@
package main package main
import ( import (
"os" "github.com/mikespook/gearman-go/worker"
"log" "github.com/mikespook/golib/signal"
"time" "log"
"strings" "os"
"github.com/mikespook/golib/signal" "strings"
"github.com/mikespook/gearman-go/worker" "time"
) )
func ToUpper(job worker.Job) ([]byte, error) { func ToUpper(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data()) log.Printf("ToUpper: Data=[%s]\n", job.Data())
data := []byte(strings.ToUpper(string(job.Data()))) data := []byte(strings.ToUpper(string(job.Data())))
return data, nil return data, nil
} }
func ToUpperDelay10(job worker.Job) ([]byte, error) { func ToUpperDelay10(job worker.Job) ([]byte, error) {
log.Printf("ToUpper: Data=[%s]\n", job.Data()) log.Printf("ToUpper: Data=[%s]\n", job.Data())
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data()))) data := []byte(strings.ToUpper(string(job.Data())))
return data, nil return data, nil
} }
func main() { func main() {
log.Println("Starting ...") log.Println("Starting ...")
defer log.Println("Shutdown complete!") defer log.Println("Shutdown complete!")
w := worker.New(worker.Unlimited) w := worker.New(worker.Unlimited)
defer w.Close() defer w.Close()
w.ErrorHandler = func(e error) { w.ErrorHandler = func(e error) {
log.Println(e) log.Println(e)
if e == worker.ErrConnection { if e == worker.ErrConnection {
proc, err := os.FindProcess(os.Getpid()) proc, err := os.FindProcess(os.Getpid())
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
if err := proc.Signal(os.Interrupt); err != nil { if err := proc.Signal(os.Interrupt); err != nil {
log.Println(err) log.Println(err)
} }
} }
} }
w.JobHandler = func(job worker.Job) error { w.JobHandler = func(job worker.Job) error {
log.Printf("Data=%s\n", job.Data()) log.Printf("Data=%s\n", job.Data())
return nil return nil
} }
w.AddServer("tcp4", "127.0.0.1:4730") w.AddServer("tcp4", "127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, worker.Immediately) w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20) w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately) w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately)
w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately) w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately)
go w.Work() if err := w.Ready(); err != nil {
sh := signal.NewHandler() log.Fatal(err)
sh.Bind(os.Interrupt, func() bool {return true}) return
sh.Loop() }
go w.Work()
sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool { return true })
sh.Loop()
} }

View File

@ -15,7 +15,6 @@ type agent struct {
worker *Worker worker *Worker
in chan []byte in chan []byte
net, addr string net, addr string
isConn bool
} }
// Create the agent of job server. // Create the agent of job server.
@ -34,16 +33,16 @@ func (a *agent) Connect() (err error) {
if err != nil { if err != nil {
return return
} }
a.isConn = true go a.work()
return return
} }
func (a *agent) Work() { func (a *agent) work() {
var inpack *inPack var inpack *inPack
var l int var l int
var err error var err error
var data, leftdata []byte var data, leftdata []byte
for a.isConn { for {
if data, err = a.read(BUFFER_SIZE); err != nil { if data, err = a.read(BUFFER_SIZE); err != nil {
if err == ErrConnClosed { if err == ErrConnClosed {
break break
@ -72,9 +71,7 @@ func (a *agent) Work() {
} }
func (a *agent) Close() { func (a *agent) Close() {
if a.conn != nil { a.conn.Close()
a.conn.Close()
}
} }
func (a *agent) Grab() { func (a *agent) Grab() {
@ -96,13 +93,11 @@ func (a *agent) read(length int) (data []byte, err error) {
// read until data can be unpacked // read until data can be unpacked
for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n { for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n {
if n, err = a.conn.Read(buf); err != nil { if n, err = a.conn.Read(buf); err != nil {
if !a.isConn {
err = ErrConnClosed
return
}
if err == io.EOF && n == 0 { if err == io.EOF && n == 0 {
if data == nil { if data == nil {
err = ErrConnection err = ErrConnection
} else {
err = ErrConnClosed
} }
} }
return return

View File

@ -6,6 +6,7 @@
package worker package worker
import ( import (
// "fmt"
"encoding/binary" "encoding/binary"
) )
@ -43,8 +44,10 @@ func (outpack *outPack) Encode() (data []byte) {
if outpack.dataType != WORK_FAIL { if outpack.dataType != WORK_FAIL {
data[hi] = '\x00' data[hi] = '\x00'
} }
i = i + hi i = hi + 1
}
if outpack.dataType != WORK_FAIL {
copy(data[i:], outpack.data)
} }
copy(data[i:], outpack.data)
return return
} }

View File

@ -174,9 +174,7 @@ func (worker *Worker) Ready() (err error) {
if err = v.Connect(); err != nil { if err = v.Connect(); err != nil {
return return
} }
go v.Work()
} }
worker.Reset()
for funcname, f := range worker.funcs { for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout) worker.addFunc(funcname, f.timeout)
} }
@ -185,12 +183,10 @@ func (worker *Worker) Ready() (err error) {
// Main loop // Main loop
func (worker *Worker) Work() { func (worker *Worker) Work() {
defer func() {
for _, v := range worker.agents {
v.Close()
}
}()
worker.running = true worker.running = true
for _, v := range worker.agents {
v.Grab()
}
var inpack *inPack var inpack *inPack
for inpack = range worker.in { for inpack = range worker.in {
go worker.handleInPack(inpack) go worker.handleInPack(inpack)
@ -263,19 +259,20 @@ func (worker *Worker) exec(inpack *inPack) (err error) {
} else { } else {
r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
} }
outpack := getOutPack()
if r.err == nil {
outpack.dataType = WORK_COMPLETE
} else {
if r.data == nil {
outpack.dataType = WORK_FAIL
} else {
outpack.dataType = WORK_EXCEPTION
}
err = r.err
}
outpack.data = r.data
if worker.running { if worker.running {
outpack := getOutPack()
if r.err == nil {
outpack.dataType = WORK_COMPLETE
} else {
if len(r.data) == 0 {
outpack.dataType = WORK_FAIL
} else {
outpack.dataType = WORK_EXCEPTION
}
err = r.err
}
outpack.handle = inpack.handle
outpack.data = r.data
inpack.a.write(outpack) inpack.a.write(outpack)
inpack.a.Grab() inpack.a.Grab()
} }