This commit is contained in:
mikespook 2012-09-25 16:00:57 +08:00
commit 665bef9eb2
10 changed files with 231 additions and 53 deletions

View File

@ -27,3 +27,4 @@ dee83cac69e07ed4f3efde162d981f5855101845 go1-0.0.1
0000000000000000000000000000000000000000 0.1.1 0000000000000000000000000000000000000000 0.1.1
0000000000000000000000000000000000000000 0.1.1 0000000000000000000000000000000000000000 0.1.1
eea0878b43d209630d7b342b1b99c61c839b454f 0.1.1 eea0878b43d209630d7b342b1b99c61c839b454f 0.1.1
67f11fa2301f5e74a436883f66ce7fb121a4df82 0.1.2

View File

@ -32,7 +32,7 @@ Install both:
log.Println(e) log.Println(e)
} }
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, 0) w.AddFunc("ToUpper", ToUpper, worker.Immediately)
w.AddFunc("ToUpperTimeOut5", ToUpper, 5) w.AddFunc("ToUpperTimeOut5", ToUpper, 5)
w.Work() w.Work()
@ -63,6 +63,7 @@ Xing Xing <mikespook@gmail.com>
# History # History
* 0.1.2 Fixed issues: timeout executing, resources leaking.
* 0.1.1 Fixed the issue of grabbing jobs. * 0.1.1 Fixed the issue of grabbing jobs.
* 0.1 Code refactoring; Redesign the API. * 0.1 Code refactoring; Redesign the API.
* 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API. * 0.0.1 Initial implementation, ugly code-style, slow profermance and unstable API.

View File

@ -5,6 +5,11 @@
package common package common
import (
"bytes"
"encoding/binary"
)
const ( const (
NETWORK = "tcp" NETWORK = "tcp"
// queue size // queue size
@ -55,12 +60,25 @@ const (
// Decode [4]byte to uint32 // Decode [4]byte to uint32
func BytesToUint32(buf [4]byte) uint32 { func BytesToUint32(buf [4]byte) uint32 {
return uint32(buf[0])<<24 + uint32(buf[1])<<16 + uint32(buf[2])<<8 + var r uint32
uint32(buf[3]) b := bytes.NewBuffer(buf[:])
err := binary.Read(b, binary.BigEndian, &r)
if err != nil {
return 0
}
return r
} }
// Encode uint32 to [4]byte // Encode uint32 to [4]byte
func Uint32ToBytes(i uint32) [4]byte { func Uint32ToBytes(i uint32) [4]byte {
return [4]byte{byte((i >> 24) & 0xff), byte((i >> 16) & 0xff), buf := new(bytes.Buffer)
byte((i >> 8) & 0xff), byte(i & 0xff),} err := binary.Write(buf, binary.BigEndian, i)
if err != nil {
return [4]byte{0, 0, 0, 0}
}
var r [4]byte
for k, v := range buf.Bytes() {
r[k] = v
}
return r
} }

38
common/gearman_test.go Normal file
View File

@ -0,0 +1,38 @@
package common
import (
"bytes"
"testing"
)
var (
testCase = map[uint32][4]byte {
0: [...]byte{0, 0, 0, 0},
1: [...]byte{0, 0, 0, 1},
256: [...]byte{0, 0, 1, 0},
256 * 256: [...]byte{0, 1, 0, 0},
256 * 256 * 256: [...]byte{1, 0, 0, 0},
256 * 256 * 256 + 256 * 256 + 256 + 1: [...]byte{1, 1, 1, 1},
4294967295 : [...]byte{0xFF, 0xFF, 0xFF, 0xFF},
}
)
func TestUint32ToBytes(t *testing.T) {
for k, v := range testCase {
b := Uint32ToBytes(k)
if bytes.Compare(b[:], v[:]) != 0 {
t.Errorf("%v was expected, but %v was got", v, b)
}
}
}
func TestBytesToUint32s(t *testing.T) {
for k, v := range testCase {
u := BytesToUint32([4]byte(v))
if u != k {
t.Errorf("%v was expected, but %v was got", k, u)
}
}
}

View File

@ -12,10 +12,38 @@ def check_request_status(job_request):
def main(): def main():
client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730']) client = gearman.GearmanClient(['localhost:4730', 'otherhost:4730'])
try:
completed_job_request = client.submit_job("ToUpper", "arbitrary binary data") completed_job_request = client.submit_job("ToUpper", "arbitrary binary data")
check_request_status(completed_job_request) check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("ToUpperTimeOut5", "arbitrary binary data")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("ToUpperTimeOut20", "arbitrary binary data")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("SysInfo", "")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
try:
completed_job_request = client.submit_job("MemInfo", "")
check_request_status(completed_job_request)
except Exception as e:
print type(e)
if __name__ == '__main__': if __name__ == '__main__':
for i in range(100):
main() main()

View File

@ -3,18 +3,29 @@ package main
import ( import (
"os" "os"
"log" "log"
"time"
"strings" "strings"
"bitbucket.org/mikespook/golib/signal" "bitbucket.org/mikespook/golib/signal"
"bitbucket.org/mikespook/gearman-go/worker" "bitbucket.org/mikespook/gearman-go/worker"
) )
func ToUpper(job *worker.Job) ([]byte, error) { func ToUpper(job *worker.Job) ([]byte, error) {
log.Printf("Handle=[%s]; UID=[%s], Data=[%s]\n", log.Printf("ToUpper: Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data) job.Handle, job.UniqueId, job.Data)
data := []byte(strings.ToUpper(string(job.Data))) data := []byte(strings.ToUpper(string(job.Data)))
return data, nil return data, nil
} }
func ToUpperDelay10(job *worker.Job) ([]byte, error) {
log.Printf("ToUpperDelay10: Handle=[%s]; UID=[%s], Data=[%s]\n",
job.Handle, job.UniqueId, job.Data)
time.Sleep(10 * time.Second)
data := []byte(strings.ToUpper(string(job.Data)))
return data, nil
}
func main() { func main() {
log.Println("Starting ...") log.Println("Starting ...")
defer log.Println("Shutdown complete!") defer log.Println("Shutdown complete!")
@ -33,13 +44,16 @@ func main() {
} }
} }
w.JobHandler = func(job *worker.Job) error { w.JobHandler = func(job *worker.Job) error {
log.Printf("H=%s, UID=%s, Data=%s\n", job.Handle, log.Printf("H=%s, UID=%s, Data=%s, DataType=%d\n", job.Handle,
job.UniqueId, job.Data) job.UniqueId, job.Data, job.DataType)
return nil return nil
} }
w.AddServer("127.0.0.1:4730") w.AddServer("127.0.0.1:4730")
w.AddFunc("ToUpper", ToUpper, 0) w.AddFunc("ToUpper", ToUpper, worker.Immediately)
//w.AddFunc("ToUpperTimeOut5", ToUpper, 5) w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
w.AddFunc("SysInfo", worker.SysInfo, worker.Immediately)
w.AddFunc("MemInfo", worker.MemInfo, worker.Immediately)
go w.Work() go w.Work()
sh := signal.NewHandler() sh := signal.NewHandler()
sh.Bind(os.Interrupt, func() bool {return true}) sh.Bind(os.Interrupt, func() bool {return true})

View File

@ -7,7 +7,6 @@ package worker
import ( import (
"io" "io"
"net" "net"
"time"
"bitbucket.org/mikespook/gearman-go/common" "bitbucket.org/mikespook/gearman-go/common"
) )
@ -33,6 +32,8 @@ func newAgent(addr string, worker *Worker) (a *agent, err error) {
in: make(chan []byte, common.QUEUE_SIZE), in: make(chan []byte, common.QUEUE_SIZE),
out: make(chan *Job, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE),
} }
// reset abilities
a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil))
return return
} }
@ -52,27 +53,20 @@ func (a *agent) outLoop() {
// inputing loop // inputing loop
func (a *agent) inLoop() { func (a *agent) inLoop() {
defer func() { defer func() {
recover() if r := recover(); r != nil {
a.worker.err(common.Errorf("Exiting: %s", r))
}
close(a.in) close(a.in)
close(a.out) close(a.out)
a.worker.removeAgent(a) a.worker.removeAgent(a)
}() }()
noop := true
go func() {
for a.worker.running {
if noop && len(a.in) == 0 {
a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil))
}
<-time.After(time.Second)
}
}()
for a.worker.running { for a.worker.running {
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
RESTART: RESTART:
// got noop msg and in queue is zero, grab job // got noop msg and in queue is zero, grab job
rel, err := a.read() rel, err := a.read()
if err != nil { if err != nil {
if err == common.ErrConnection { if err == common.ErrConnection {
// TODO: reconnection
for i:= 0; i < 3 && a.worker.running; i++ { for i:= 0; i < 3 && a.worker.running; i++ {
if conn, err := net.Dial(common.NETWORK, a.addr); err != nil { if conn, err := net.Dial(common.NETWORK, a.addr); err != nil {
a.worker.err(common.Errorf("Reconnection: %d faild", i)) a.worker.err(common.Errorf("Reconnection: %d faild", i))
@ -95,10 +89,7 @@ func (a *agent) inLoop() {
} }
switch job.DataType { switch job.DataType {
case common.NOOP: case common.NOOP:
noop = true a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
case common.NO_JOB:
noop = false
a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN: case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
job.agent = a job.agent = a
a.worker.in <- job a.worker.in <- job

31
worker/func.go Normal file
View File

@ -0,0 +1,31 @@
package worker
import (
"runtime"
"encoding/json"
)
type systemInfo struct {
GOOS, GOARCH, GOROOT, Version string
NumCPU, NumGoroutine int
NumCgoCall int64
}
func SysInfo(job *Job) ([]byte, error) {
return json.Marshal(&systemInfo{
GOOS: runtime.GOOS,
GOARCH: runtime.GOARCH,
GOROOT: runtime.GOROOT(),
Version: runtime.Version(),
NumCPU: runtime.NumCPU(),
NumGoroutine: runtime.NumGoroutine(),
NumCgoCall: runtime.NumCgoCall(),
})
}
var memState runtime.MemStats
func MemInfo(job *Job) ([]byte, error) {
runtime.ReadMemStats(&memState)
return json.Marshal(&memState)
}

View File

@ -16,13 +16,16 @@ type Job struct {
Handle, UniqueId string Handle, UniqueId string
agent *agent agent *agent
magicCode, DataType uint32 magicCode, DataType uint32
c chan bool
} }
// Create a new job // Create a new job
func newJob(magiccode, datatype uint32, data []byte) (job *Job) { func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
return &Job{magicCode: magiccode, return &Job{magicCode: magiccode,
DataType: datatype, DataType: datatype,
Data: data} Data: data,
c: make(chan bool),
}
} }
// Decode job from byte slice // Decode job from byte slice
@ -42,24 +45,30 @@ func decodeJob(data []byte) (job *Job, err error) {
// Encode a job to byte slice // Encode a job to byte slice
func (job *Job) Encode() (data []byte) { func (job *Job) Encode() (data []byte) {
l := len(job.Data) var l int
tl := l if job.DataType == common.WORK_FAIL {
l = len(job.Handle)
} else {
l = len(job.Data)
if job.Handle != "" { if job.Handle != "" {
tl += len(job.Handle) + 1 l += len(job.Handle) + 1
} }
data = make([]byte, 0, tl + 12) }
data = make([]byte, 0, l + 12)
magiccode := common.Uint32ToBytes(job.magicCode) magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType) datatype := common.Uint32ToBytes(job.DataType)
datalength := common.Uint32ToBytes(uint32(tl)) datalength := common.Uint32ToBytes(uint32(l))
data = append(data, magiccode[:]...) data = append(data, magiccode[:]...)
data = append(data, datatype[:]...) data = append(data, datatype[:]...)
data = append(data, datalength[:]...) data = append(data, datalength[:]...)
if job.Handle != "" { if job.Handle != "" {
data = append(data, []byte(job.Handle)...) data = append(data, []byte(job.Handle)...)
if job.DataType != common.WORK_FAIL {
data = append(data, 0) data = append(data, 0)
} }
}
data = append(data, job.Data...) data = append(data, job.Data...)
return return
} }
@ -88,3 +97,19 @@ func (job *Job) UpdateStatus(numerator, denominator int) {
result = append(result, d...) result = append(result, d...)
job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result)) job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
} }
// close the job
func (job *Job) Close() {
close(job.c)
}
// cancel the job executing
func (job *Job) cancel() {
defer func() {recover()}()
job.c <- true
}
// When a job was canceled, return a true form a channel
func (job *Job) Canceled() <-chan bool {
return job.c
}

View File

@ -5,6 +5,7 @@
package worker package worker
import ( import (
"time"
"bytes" "bytes"
"bitbucket.org/mikespook/gearman-go/common" "bitbucket.org/mikespook/gearman-go/common"
) )
@ -12,6 +13,8 @@ import (
const ( const (
Unlimited = 0 Unlimited = 0
OneByOne = 1 OneByOne = 1
Immediately = 0
) )
var ( var (
@ -20,7 +23,7 @@ var (
// Job handler // Job handler
type JobHandler func(*Job) error type JobHandler func(*Job) error
type JobFunc func(job *Job) ([]byte, error) type JobFunc func(*Job) ([]byte, error)
// The definition of the callback function. // The definition of the callback function.
type jobFunc struct { type jobFunc struct {
@ -178,21 +181,20 @@ func (worker *Worker) Work() {
var job *Job var job *Job
for ok { for ok {
if job, ok = <-worker.in; ok { if job, ok = <-worker.in; ok {
go func() {
defer job.Close()
switch job.DataType { switch job.DataType {
case common.ERROR: case common.ERROR:
go func() {
_, err := common.GetError(job.Data) _, err := common.GetError(job.Data)
worker.err(err) worker.err(err)
}()
case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ: case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
go func() {
if err := worker.exec(job); err != nil { if err := worker.exec(job); err != nil {
worker.err(err) worker.err(err)
} }
}()
default: default:
go worker.handleJob(job) worker.handleJob(job)
} }
}()
} }
} }
} }
@ -271,21 +273,28 @@ func (worker *Worker) exec(job *Job) (err error) {
if !ok { if !ok {
return common.Errorf("The function does not exist: %s", funcname) return common.Errorf("The function does not exist: %s", funcname)
} }
result, err := f.f(job) var r *result
if f.timeout == 0 {
d, e := f.f(job)
r = &result{data:d, err: e}
} else {
r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second)
}
var datatype uint32 var datatype uint32
if err == nil { if r.err == nil {
datatype = common.WORK_COMPLETE datatype = common.WORK_COMPLETE
} else { } else {
if result == nil { if r.data == nil {
datatype = common.WORK_FAIL datatype = common.WORK_FAIL
} else { } else {
datatype = common.WORK_EXCEPTION datatype = common.WORK_EXCEPTION
} }
err = r.err
} }
job.magicCode = common.REQ job.magicCode = common.REQ
job.DataType = datatype job.DataType = datatype
job.Data = result job.Data = r.data
job.agent.WriteJob(job) job.agent.WriteJob(job)
return return
} }
@ -300,3 +309,25 @@ func (worker *Worker) removeAgent(a *agent) {
worker.err(common.ErrNoActiveAgent) worker.err(common.ErrNoActiveAgent)
} }
} }
type result struct {
data []byte
err error
}
func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
rslt := make(chan *result)
defer close(rslt)
go func() {
defer func() {recover()}()
d, e := f(job)
rslt <- &result{data: d, err: e}
}()
select {
case r = <-rslt:
case <-time.After(timeout):
go job.cancel()
return &result{err:common.ErrExecTimeOut}
}
return r
}