complated in/out-pack name's refactoring

This commit is contained in:
Xing Xing 2013-12-21 11:09:13 +08:00
parent e9c29799fb
commit a33a6cde32
5 changed files with 152 additions and 187 deletions

View File

@ -39,38 +39,10 @@ func (a *agent) Connect() (err error) {
} }
func (a *agent) Work() { func (a *agent) Work() {
go a.readLoop() var inpack *inPack
var resp *Response
var l int var l int
var err error var err error
var data, leftdata []byte var data, leftdata []byte
for data = range a.in {
if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
l = len(data)
if l < MIN_PACKET_LEN { // not enough data
leftdata = data
continue
}
if resp, l, err = decodeResponse(data); err != nil {
a.worker.err(err)
continue
}
leftdata = nil
resp.agentId = a.net + a.addr
a.worker.in <- resp
if len(data) > l {
leftdata = data[l:]
}
}
}
// read data from socket
func (a *agent) readLoop() {
var data []byte
var err error
for a.isConn { for a.isConn {
if data, err = a.read(BUFFER_SIZE); err != nil { if data, err = a.read(BUFFER_SIZE); err != nil {
if err == ErrConnClosed { if err == ErrConnClosed {
@ -79,9 +51,24 @@ func (a *agent) readLoop() {
a.worker.err(err) a.worker.err(err)
continue continue
} }
a.in <- data if len(leftdata) > 0 { // some data left for processing
data = append(leftdata, data...)
}
if len(data) < MIN_PACKET_LEN { // not enough data
leftdata = data
continue
}
if inpack, l, err = decodeInPack(data); err != nil {
a.worker.err(err)
continue
}
leftdata = nil
inpack.a = a
a.worker.in <- inpack
if len(data) > l {
leftdata = data[l:]
}
} }
close(a.in)
} }
func (a *agent) Close() { func (a *agent) Close() {
@ -115,9 +102,9 @@ func (a *agent) read(length int) (data []byte, err error) {
} }
// Internal write the encoded job. // Internal write the encoded job.
func (a *agent) write(req *request) (err error) { func (a *agent) write(outpack *outPack) (err error) {
var n int var n int
buf := req.Encode() buf := outpack.Encode()
for i := 0; i < len(buf); i += n { for i := 0; i < len(buf); i += n {
n, err = a.conn.Write(buf[i:]) n, err = a.conn.Write(buf[i:])
if err != nil { if err != nil {

View File

@ -9,23 +9,70 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"encoding/binary" "encoding/binary"
"strconv"
) )
// Worker side job // Worker side job
type InPack struct { type inPack struct {
DataType uint32 dataType uint32
Data []byte data []byte
Handle, UniqueId, Fn string handle, uniqueId, fn string
a *agent a *agent
} }
// Create a new job // Create a new job
func getInPack() (resp *InPack) { func getInPack() *inPack {
return &InPack{} return &inPack{}
} }
func (inpack *inPack) Data() []byte {
return inpack.data
}
// Send some datas to client.
// Using this in a job's executing.
func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack()
outpack.dataType = WORK_DATA
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl + 1:], data)
inpack.a.write(outpack)
}
func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack()
outpack.dataType = WORK_WARNING
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl + 1:], data)
inpack.a.write(outpack)
}
// Update status.
// Tall client how many percent job has been executed.
func (inpack *inPack) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
outpack := getOutPack()
outpack.dataType = WORK_STATUS
hl := len(inpack.handle)
nl := len(n)
dl := len(d)
outpack.data = getBuffer(hl + nl + dl + 3)
copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl+1:], n)
copy(outpack.data[hl+nl+2:], d)
inpack.a.write(outpack)
}
// Decode job from byte slice // Decode job from byte slice
func decodeInPack(data []byte) (resp *InPack, l int, err error) { func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %V", data) err = fmt.Errorf("Invalid data: %V", data)
return return
@ -36,26 +83,26 @@ func decodeInPack(data []byte) (resp *InPack, l int, err error) {
err = fmt.Errorf("Invalid data: %V", data) err = fmt.Errorf("Invalid data: %V", data)
return return
} }
resp = getInPack() inpack = getInPack()
resp.DataType = binary.BigEndian.Uint32(data[4:8]) inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType { switch inpack.dataType {
case JOB_ASSIGN: case JOB_ASSIGN:
s := bytes.SplitN(dt, []byte{'\x00'}, 3) s := bytes.SplitN(dt, []byte{'\x00'}, 3)
if len(s) == 3 { if len(s) == 3 {
resp.Handle = string(s[0]) inpack.handle = string(s[0])
resp.Fn = string(s[1]) inpack.fn = string(s[1])
resp.Data = s[2] inpack.data = s[2]
} }
case JOB_ASSIGN_UNIQ: case JOB_ASSIGN_UNIQ:
s := bytes.SplitN(dt, []byte{'\x00'}, 4) s := bytes.SplitN(dt, []byte{'\x00'}, 4)
if len(s) == 4 { if len(s) == 4 {
resp.Handle = string(s[0]) inpack.handle = string(s[0])
resp.Fn = string(s[1]) inpack.fn = string(s[1])
resp.UniqueId = string(s[2]) inpack.uniqueId = string(s[2])
resp.Data = s[3] inpack.data = s[3]
} }
default: default:
resp.Data = dt inpack.data = dt
} }
l = dl + MIN_PACKET_LEN l = dl + MIN_PACKET_LEN
return return

View File

@ -1,68 +1,8 @@
package worker package worker
import (
"strconv"
)
type Job interface { type Job interface {
Data() []byte Data() []byte
SendWarning(data []byte) SendWarning(data []byte)
SendData(data []byte) SendData(data []byte)
UpdateStatus(numerator, denominator int) UpdateStatus(numerator, denominator int)
} }
type _job struct {
a *agent
Handle string
data []byte
}
func getJob() *_job {
return &_job{}
}
func (j *_job) Data() []byte {
return j.data
}
// Send some datas to client.
// Using this in a job's executing.
func (j *_job) SendData(data []byte) {
req := getRequest()
req.DataType = WORK_DATA
hl := len(j.Handle)
l := hl + len(data) + 1
req.Data = getBuffer(l)
copy(req.Data, []byte(j.Handle))
copy(req.Data[hl + 1:], data)
j.a.write(req)
}
func (j *_job) SendWarning(data []byte) {
req := getRequest()
req.DataType = WORK_WARNING
hl := len(j.Handle)
l := hl + len(data) + 1
req.Data = getBuffer(l)
copy(req.Data, []byte(j.Handle))
copy(req.Data[hl + 1:], data)
j.a.write(req)
}
// Update status.
// Tall client how many percent job has been executed.
func (j *_job) UpdateStatus(numerator, denominator int) {
n := []byte(strconv.Itoa(numerator))
d := []byte(strconv.Itoa(denominator))
req := getRequest()
req.DataType = WORK_STATUS
hl := len(j.Handle)
nl := len(n)
dl := len(d)
req.Data = getBuffer(hl + nl + dl + 3)
copy(req.Data, []byte(j.Handle))
copy(req.Data[hl+1:], n)
copy(req.Data[hl+nl+2:], d)
j.a.write(req)
}

View File

@ -10,41 +10,41 @@ import (
) )
// Worker side job // Worker side job
type OutPack struct { type outPack struct {
DataType uint32 dataType uint32
Data []byte data []byte
Handle, UniqueId, Fn string handle, uniqueId, fn string
} }
func getOutPack() (req *OutPack) { func getOutPack() (outpack *outPack) {
// TODO pool // TODO pool
return &OutPack{} return &outPack{}
} }
// Encode a job to byte slice // Encode a job to byte slice
func (req *OutPack) Encode() (data []byte) { func (outpack *outPack) Encode() (data []byte) {
var l int var l int
if req.DataType == WORK_FAIL { if outpack.dataType == WORK_FAIL {
l = len(req.Handle) l = len(outpack.handle)
} else { } else {
l = len(req.Data) l = len(outpack.data)
if req.Handle != "" { if outpack.handle != "" {
l += len(req.Handle) + 1 l += len(outpack.handle) + 1
} }
} }
data = getBuffer(l + MIN_PACKET_LEN) data = getBuffer(l + MIN_PACKET_LEN)
binary.BigEndian.PutUint32(data[:4], REQ) binary.BigEndian.PutUint32(data[:4], REQ)
binary.BigEndian.PutUint32(data[4:8], req.DataType) binary.BigEndian.PutUint32(data[4:8], outpack.dataType)
binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l)) binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l))
i := MIN_PACKET_LEN i := MIN_PACKET_LEN
if req.Handle != "" { if outpack.handle != "" {
hi := len(req.Handle) + i hi := len(outpack.handle) + i
copy(data[i:hi], []byte(req.Handle)) copy(data[i:hi], []byte(outpack.handle))
if req.DataType != WORK_FAIL { if outpack.dataType != WORK_FAIL {
data[hi] = '\x00' data[hi] = '\x00'
} }
i = i + hi i = i + hi
} }
copy(data[i:], req.Data) copy(data[i:], outpack.data)
return return
} }

View File

@ -39,7 +39,7 @@ func foobar(job *Job) (data []byte, err os.Error) {
type Worker struct { type Worker struct {
agents map[string]*agent agents map[string]*agent
funcs JobFuncs funcs JobFuncs
in chan *Response in chan *inPack
running bool running bool
limit chan bool limit chan bool
@ -55,7 +55,7 @@ func New(l int) (worker *Worker) {
worker = &Worker{ worker = &Worker{
agents: make(map[string]*agent, QUEUE_SIZE), agents: make(map[string]*agent, QUEUE_SIZE),
funcs: make(JobFuncs), funcs: make(JobFuncs),
in: make(chan *Response, QUEUE_SIZE), in: make(chan *inPack, QUEUE_SIZE),
} }
if l != Unlimited { if l != Unlimited {
worker.limit = make(chan bool, l) worker.limit = make(chan bool, l)
@ -85,9 +85,9 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
// Write a job to job server. // Write a job to job server.
// Here, the job's mean is not the oraginal mean. // Here, the job's mean is not the oraginal mean.
// Just looks like a network package for job's result or tell job server, there was a fail. // Just looks like a network package for job's result or tell job server, there was a fail.
func (worker *Worker) broadcast(req *request) { func (worker *Worker) broadcast(outpack *outPack) {
for _, v := range worker.agents { for _, v := range worker.agents {
v.write(req) v.write(outpack)
} }
} }
@ -110,19 +110,19 @@ func (worker *Worker) AddFunc(funcname string,
// inner add function // inner add function
func (worker *Worker) addFunc(funcname string, timeout uint32) { func (worker *Worker) addFunc(funcname string, timeout uint32) {
req := getRequest() outpack := getOutPack()
if timeout == 0 { if timeout == 0 {
req.DataType = CAN_DO outpack.dataType = CAN_DO
req.Data = []byte(funcname) outpack.data = []byte(funcname)
} else { } else {
req.DataType = CAN_DO_TIMEOUT outpack.dataType = CAN_DO_TIMEOUT
l := len(funcname) l := len(funcname)
req.Data = getBuffer(l + 5) outpack.data = getBuffer(l + 5)
copy(req.Data, []byte(funcname)) copy(outpack.data, []byte(funcname))
req.Data[l] = '\x00' outpack.data[l] = '\x00'
binary.BigEndian.PutUint32(req.Data[l + 1:], timeout) binary.BigEndian.PutUint32(outpack.data[l + 1:], timeout)
} }
worker.broadcast(req) worker.broadcast(outpack)
} }
// Remove a function. // Remove a function.
@ -141,27 +141,27 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) {
// inner remove function // inner remove function
func (worker *Worker) removeFunc(funcname string) { func (worker *Worker) removeFunc(funcname string) {
req := getRequest() outpack := getOutPack()
req.DataType = CANT_DO outpack.dataType = CANT_DO
req.Data = []byte(funcname) outpack.data = []byte(funcname)
worker.broadcast(req) worker.broadcast(outpack)
} }
func (worker *Worker) dealResp(resp *Response) { func (worker *Worker) handleInPack(inpack *inPack) {
defer func() { defer func() {
if worker.running && worker.limit != nil { if worker.running && worker.limit != nil {
<-worker.limit <-worker.limit
} }
}() }()
switch resp.DataType { switch inpack.dataType {
case ERROR: case ERROR:
worker.err(GetError(resp.Data)) worker.err(GetError(inpack.data))
case JOB_ASSIGN, JOB_ASSIGN_UNIQ: case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
if err := worker.exec(resp); err != nil { if err := worker.exec(inpack); err != nil {
worker.err(err) worker.err(err)
} }
default: default:
worker.handleResponse(resp) worker.customeHandler(inpack)
} }
} }
@ -181,22 +181,16 @@ func (worker *Worker) Work() {
for funcname, f := range worker.funcs { for funcname, f := range worker.funcs {
worker.addFunc(funcname, f.timeout) worker.addFunc(funcname, f.timeout)
} }
var resp *Response var inpack *inPack
for resp = range worker.in { for inpack = range worker.in {
go worker.dealResp(resp) go worker.handleInPack(inpack)
} }
} }
// job handler // job handler
func (worker *Worker) handleResponse(resp *Response) { func (worker *Worker) customeHandler(inpack *inPack) {
if worker.JobHandler != nil { if worker.JobHandler != nil {
job := getJob() if err := worker.JobHandler(inpack); err != nil {
job.a = worker.agents[resp.agentId]
job.Handle = resp.Handle
if resp.DataType == ECHO_RES {
job.data = resp.Data
}
if err := worker.JobHandler(job); err != nil {
worker.err(err) worker.err(err)
} }
} }
@ -213,32 +207,32 @@ func (worker *Worker) Close() {
// Send a something out, get the samething back. // Send a something out, get the samething back.
func (worker *Worker) Echo(data []byte) { func (worker *Worker) Echo(data []byte) {
req := getRequest() outpack := getOutPack()
req.DataType = ECHO_REQ outpack.dataType = ECHO_REQ
req.Data = data outpack.data = data
worker.broadcast(req) worker.broadcast(outpack)
} }
// Remove all of functions. // Remove all of functions.
// Both from the worker or job servers. // Both from the worker or job servers.
func (worker *Worker) Reset() { func (worker *Worker) Reset() {
req := getRequest() outpack := getOutPack()
req.DataType = RESET_ABILITIES outpack.dataType = RESET_ABILITIES
worker.broadcast(req) worker.broadcast(outpack)
worker.funcs = make(JobFuncs) worker.funcs = make(JobFuncs)
} }
// Set the worker's unique id. // Set the worker's unique id.
func (worker *Worker) SetId(id string) { func (worker *Worker) SetId(id string) {
worker.Id = id worker.Id = id
req := getRequest() outpack := getOutPack()
req.DataType = SET_CLIENT_ID outpack.dataType = SET_CLIENT_ID
req.Data = []byte(id) outpack.data = []byte(id)
worker.broadcast(req) worker.broadcast(outpack)
} }
// Execute the job. And send back the result. // Execute the job. And send back the result.
func (worker *Worker) exec(resp *Response) (err error) { func (worker *Worker) exec(inpack *inPack) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
if e, ok := r.(error); ok { if e, ok := r.(error); ok {
@ -248,34 +242,31 @@ func (worker *Worker) exec(resp *Response) (err error) {
} }
} }
}() }()
f, ok := worker.funcs[resp.Fn] f, ok := worker.funcs[inpack.fn]
if !ok { if !ok {
return fmt.Errorf("The function does not exist: %s", resp.Fn) return fmt.Errorf("The function does not exist: %s", inpack.fn)
} }
var r *result var r *result
job := getJob()
job.a = worker.agents[resp.agentId]
job.Handle = resp.Handle
if f.timeout == 0 { if f.timeout == 0 {
d, e := f.f(job) d, e := f.f(inpack)
r = &result{data: d, err: e} r = &result{data: d, err: e}
} else { } else {
r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second) r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
} }
req := getRequest() outpack := getOutPack()
if r.err == nil { if r.err == nil {
req.DataType = WORK_COMPLETE outpack.dataType = WORK_COMPLETE
} else { } else {
if r.data == nil { if r.data == nil {
req.DataType = WORK_FAIL outpack.dataType = WORK_FAIL
} else { } else {
req.DataType = WORK_EXCEPTION outpack.dataType = WORK_EXCEPTION
} }
err = r.err err = r.err
} }
req.Data = r.data outpack.data = r.data
if worker.running { if worker.running {
job.a.write(req) inpack.a.write(outpack)
} }
return return
} }