diff --git a/worker/agent.go b/worker/agent.go index 7d4fa56..7a9f70d 100644 --- a/worker/agent.go +++ b/worker/agent.go @@ -39,38 +39,10 @@ func (a *agent) Connect() (err error) { } func (a *agent) Work() { - go a.readLoop() - - var resp *Response + var inpack *inPack var l int var err error var data, leftdata []byte - for data = range a.in { - if len(leftdata) > 0 { // some data left for processing - data = append(leftdata, data...) - } - l = len(data) - if l < MIN_PACKET_LEN { // not enough data - leftdata = data - continue - } - if resp, l, err = decodeResponse(data); err != nil { - a.worker.err(err) - continue - } - leftdata = nil - resp.agentId = a.net + a.addr - a.worker.in <- resp - if len(data) > l { - leftdata = data[l:] - } - } -} - -// read data from socket -func (a *agent) readLoop() { - var data []byte - var err error for a.isConn { if data, err = a.read(BUFFER_SIZE); err != nil { if err == ErrConnClosed { @@ -79,9 +51,24 @@ func (a *agent) readLoop() { a.worker.err(err) continue } - a.in <- data + if len(leftdata) > 0 { // some data left for processing + data = append(leftdata, data...) + } + if len(data) < MIN_PACKET_LEN { // not enough data + leftdata = data + continue + } + if inpack, l, err = decodeInPack(data); err != nil { + a.worker.err(err) + continue + } + leftdata = nil + inpack.a = a + a.worker.in <- inpack + if len(data) > l { + leftdata = data[l:] + } } - close(a.in) } func (a *agent) Close() { @@ -115,9 +102,9 @@ func (a *agent) read(length int) (data []byte, err error) { } // Internal write the encoded job. -func (a *agent) write(req *request) (err error) { +func (a *agent) write(outpack *outPack) (err error) { var n int - buf := req.Encode() + buf := outpack.Encode() for i := 0; i < len(buf); i += n { n, err = a.conn.Write(buf[i:]) if err != nil { diff --git a/worker/inpack.go b/worker/inpack.go index 6eb03d1..00d954b 100644 --- a/worker/inpack.go +++ b/worker/inpack.go @@ -9,23 +9,70 @@ import ( "bytes" "fmt" "encoding/binary" + "strconv" ) // Worker side job -type InPack struct { - DataType uint32 - Data []byte - Handle, UniqueId, Fn string +type inPack struct { + dataType uint32 + data []byte + handle, uniqueId, fn string a *agent } // Create a new job -func getInPack() (resp *InPack) { - return &InPack{} +func getInPack() *inPack { + return &inPack{} } +func (inpack *inPack) Data() []byte { + return inpack.data +} + +// Send some datas to client. +// Using this in a job's executing. +func (inpack *inPack) SendData(data []byte) { + outpack := getOutPack() + outpack.dataType = WORK_DATA + hl := len(inpack.handle) + l := hl + len(data) + 1 + outpack.data = getBuffer(l) + copy(outpack.data, []byte(inpack.handle)) + copy(outpack.data[hl + 1:], data) + inpack.a.write(outpack) +} + +func (inpack *inPack) SendWarning(data []byte) { + outpack := getOutPack() + outpack.dataType = WORK_WARNING + hl := len(inpack.handle) + l := hl + len(data) + 1 + outpack.data = getBuffer(l) + copy(outpack.data, []byte(inpack.handle)) + copy(outpack.data[hl + 1:], data) + inpack.a.write(outpack) +} + +// Update status. +// Tall client how many percent job has been executed. +func (inpack *inPack) UpdateStatus(numerator, denominator int) { + n := []byte(strconv.Itoa(numerator)) + d := []byte(strconv.Itoa(denominator)) + outpack := getOutPack() + outpack.dataType = WORK_STATUS + hl := len(inpack.handle) + nl := len(n) + dl := len(d) + outpack.data = getBuffer(hl + nl + dl + 3) + copy(outpack.data, []byte(inpack.handle)) + copy(outpack.data[hl+1:], n) + copy(outpack.data[hl+nl+2:], d) + inpack.a.write(outpack) +} + + // Decode job from byte slice -func decodeInPack(data []byte) (resp *InPack, l int, err error) { +func decodeInPack(data []byte) (inpack *inPack, l int, err error) { if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes err = fmt.Errorf("Invalid data: %V", data) return @@ -36,26 +83,26 @@ func decodeInPack(data []byte) (resp *InPack, l int, err error) { err = fmt.Errorf("Invalid data: %V", data) return } - resp = getInPack() - resp.DataType = binary.BigEndian.Uint32(data[4:8]) - switch resp.DataType { + inpack = getInPack() + inpack.dataType = binary.BigEndian.Uint32(data[4:8]) + switch inpack.dataType { case JOB_ASSIGN: s := bytes.SplitN(dt, []byte{'\x00'}, 3) if len(s) == 3 { - resp.Handle = string(s[0]) - resp.Fn = string(s[1]) - resp.Data = s[2] + inpack.handle = string(s[0]) + inpack.fn = string(s[1]) + inpack.data = s[2] } case JOB_ASSIGN_UNIQ: s := bytes.SplitN(dt, []byte{'\x00'}, 4) if len(s) == 4 { - resp.Handle = string(s[0]) - resp.Fn = string(s[1]) - resp.UniqueId = string(s[2]) - resp.Data = s[3] + inpack.handle = string(s[0]) + inpack.fn = string(s[1]) + inpack.uniqueId = string(s[2]) + inpack.data = s[3] } default: - resp.Data = dt + inpack.data = dt } l = dl + MIN_PACKET_LEN return diff --git a/worker/job.go b/worker/job.go index af032df..4f9950a 100644 --- a/worker/job.go +++ b/worker/job.go @@ -1,68 +1,8 @@ package worker -import ( - "strconv" -) - type Job interface { Data() []byte SendWarning(data []byte) SendData(data []byte) UpdateStatus(numerator, denominator int) } - -type _job struct { - a *agent - Handle string - data []byte -} - -func getJob() *_job { - return &_job{} -} - -func (j *_job) Data() []byte { - return j.data -} - -// Send some datas to client. -// Using this in a job's executing. -func (j *_job) SendData(data []byte) { - req := getRequest() - req.DataType = WORK_DATA - hl := len(j.Handle) - l := hl + len(data) + 1 - req.Data = getBuffer(l) - copy(req.Data, []byte(j.Handle)) - copy(req.Data[hl + 1:], data) - j.a.write(req) -} - -func (j *_job) SendWarning(data []byte) { - req := getRequest() - req.DataType = WORK_WARNING - hl := len(j.Handle) - l := hl + len(data) + 1 - req.Data = getBuffer(l) - copy(req.Data, []byte(j.Handle)) - copy(req.Data[hl + 1:], data) - j.a.write(req) -} - -// Update status. -// Tall client how many percent job has been executed. -func (j *_job) UpdateStatus(numerator, denominator int) { - n := []byte(strconv.Itoa(numerator)) - d := []byte(strconv.Itoa(denominator)) - req := getRequest() - req.DataType = WORK_STATUS - hl := len(j.Handle) - nl := len(n) - dl := len(d) - req.Data = getBuffer(hl + nl + dl + 3) - copy(req.Data, []byte(j.Handle)) - copy(req.Data[hl+1:], n) - copy(req.Data[hl+nl+2:], d) - j.a.write(req) -} - diff --git a/worker/outpack.go b/worker/outpack.go index 17dce2b..109f888 100644 --- a/worker/outpack.go +++ b/worker/outpack.go @@ -10,41 +10,41 @@ import ( ) // Worker side job -type OutPack struct { - DataType uint32 - Data []byte - Handle, UniqueId, Fn string +type outPack struct { + dataType uint32 + data []byte + handle, uniqueId, fn string } -func getOutPack() (req *OutPack) { +func getOutPack() (outpack *outPack) { // TODO pool - return &OutPack{} + return &outPack{} } // Encode a job to byte slice -func (req *OutPack) Encode() (data []byte) { +func (outpack *outPack) Encode() (data []byte) { var l int - if req.DataType == WORK_FAIL { - l = len(req.Handle) + if outpack.dataType == WORK_FAIL { + l = len(outpack.handle) } else { - l = len(req.Data) - if req.Handle != "" { - l += len(req.Handle) + 1 + l = len(outpack.data) + if outpack.handle != "" { + l += len(outpack.handle) + 1 } } data = getBuffer(l + MIN_PACKET_LEN) binary.BigEndian.PutUint32(data[:4], REQ) - binary.BigEndian.PutUint32(data[4:8], req.DataType) + binary.BigEndian.PutUint32(data[4:8], outpack.dataType) binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l)) i := MIN_PACKET_LEN - if req.Handle != "" { - hi := len(req.Handle) + i - copy(data[i:hi], []byte(req.Handle)) - if req.DataType != WORK_FAIL { + if outpack.handle != "" { + hi := len(outpack.handle) + i + copy(data[i:hi], []byte(outpack.handle)) + if outpack.dataType != WORK_FAIL { data[hi] = '\x00' } i = i + hi } - copy(data[i:], req.Data) + copy(data[i:], outpack.data) return } diff --git a/worker/worker.go b/worker/worker.go index ed9dfc9..6b935e7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -39,7 +39,7 @@ func foobar(job *Job) (data []byte, err os.Error) { type Worker struct { agents map[string]*agent funcs JobFuncs - in chan *Response + in chan *inPack running bool limit chan bool @@ -55,7 +55,7 @@ func New(l int) (worker *Worker) { worker = &Worker{ agents: make(map[string]*agent, QUEUE_SIZE), funcs: make(JobFuncs), - in: make(chan *Response, QUEUE_SIZE), + in: make(chan *inPack, QUEUE_SIZE), } if l != Unlimited { worker.limit = make(chan bool, l) @@ -85,9 +85,9 @@ func (worker *Worker) AddServer(net, addr string) (err error) { // Write a job to job server. // Here, the job's mean is not the oraginal mean. // Just looks like a network package for job's result or tell job server, there was a fail. -func (worker *Worker) broadcast(req *request) { +func (worker *Worker) broadcast(outpack *outPack) { for _, v := range worker.agents { - v.write(req) + v.write(outpack) } } @@ -110,19 +110,19 @@ func (worker *Worker) AddFunc(funcname string, // inner add function func (worker *Worker) addFunc(funcname string, timeout uint32) { - req := getRequest() + outpack := getOutPack() if timeout == 0 { - req.DataType = CAN_DO - req.Data = []byte(funcname) + outpack.dataType = CAN_DO + outpack.data = []byte(funcname) } else { - req.DataType = CAN_DO_TIMEOUT + outpack.dataType = CAN_DO_TIMEOUT l := len(funcname) - req.Data = getBuffer(l + 5) - copy(req.Data, []byte(funcname)) - req.Data[l] = '\x00' - binary.BigEndian.PutUint32(req.Data[l + 1:], timeout) + outpack.data = getBuffer(l + 5) + copy(outpack.data, []byte(funcname)) + outpack.data[l] = '\x00' + binary.BigEndian.PutUint32(outpack.data[l + 1:], timeout) } - worker.broadcast(req) + worker.broadcast(outpack) } // Remove a function. @@ -141,27 +141,27 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) { // inner remove function func (worker *Worker) removeFunc(funcname string) { - req := getRequest() - req.DataType = CANT_DO - req.Data = []byte(funcname) - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = CANT_DO + outpack.data = []byte(funcname) + worker.broadcast(outpack) } -func (worker *Worker) dealResp(resp *Response) { +func (worker *Worker) handleInPack(inpack *inPack) { defer func() { if worker.running && worker.limit != nil { <-worker.limit } }() - switch resp.DataType { + switch inpack.dataType { case ERROR: - worker.err(GetError(resp.Data)) + worker.err(GetError(inpack.data)) case JOB_ASSIGN, JOB_ASSIGN_UNIQ: - if err := worker.exec(resp); err != nil { + if err := worker.exec(inpack); err != nil { worker.err(err) } default: - worker.handleResponse(resp) + worker.customeHandler(inpack) } } @@ -181,22 +181,16 @@ func (worker *Worker) Work() { for funcname, f := range worker.funcs { worker.addFunc(funcname, f.timeout) } - var resp *Response - for resp = range worker.in { - go worker.dealResp(resp) + var inpack *inPack + for inpack = range worker.in { + go worker.handleInPack(inpack) } } // job handler -func (worker *Worker) handleResponse(resp *Response) { +func (worker *Worker) customeHandler(inpack *inPack) { if worker.JobHandler != nil { - job := getJob() - job.a = worker.agents[resp.agentId] - job.Handle = resp.Handle - if resp.DataType == ECHO_RES { - job.data = resp.Data - } - if err := worker.JobHandler(job); err != nil { + if err := worker.JobHandler(inpack); err != nil { worker.err(err) } } @@ -213,32 +207,32 @@ func (worker *Worker) Close() { // Send a something out, get the samething back. func (worker *Worker) Echo(data []byte) { - req := getRequest() - req.DataType = ECHO_REQ - req.Data = data - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = ECHO_REQ + outpack.data = data + worker.broadcast(outpack) } // Remove all of functions. // Both from the worker or job servers. func (worker *Worker) Reset() { - req := getRequest() - req.DataType = RESET_ABILITIES - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = RESET_ABILITIES + worker.broadcast(outpack) worker.funcs = make(JobFuncs) } // Set the worker's unique id. func (worker *Worker) SetId(id string) { worker.Id = id - req := getRequest() - req.DataType = SET_CLIENT_ID - req.Data = []byte(id) - worker.broadcast(req) + outpack := getOutPack() + outpack.dataType = SET_CLIENT_ID + outpack.data = []byte(id) + worker.broadcast(outpack) } // Execute the job. And send back the result. -func (worker *Worker) exec(resp *Response) (err error) { +func (worker *Worker) exec(inpack *inPack) (err error) { defer func() { if r := recover(); r != nil { if e, ok := r.(error); ok { @@ -248,34 +242,31 @@ func (worker *Worker) exec(resp *Response) (err error) { } } }() - f, ok := worker.funcs[resp.Fn] + f, ok := worker.funcs[inpack.fn] if !ok { - return fmt.Errorf("The function does not exist: %s", resp.Fn) + return fmt.Errorf("The function does not exist: %s", inpack.fn) } var r *result - job := getJob() - job.a = worker.agents[resp.agentId] - job.Handle = resp.Handle if f.timeout == 0 { - d, e := f.f(job) + d, e := f.f(inpack) r = &result{data: d, err: e} } else { - r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second) + r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second) } - req := getRequest() + outpack := getOutPack() if r.err == nil { - req.DataType = WORK_COMPLETE + outpack.dataType = WORK_COMPLETE } else { if r.data == nil { - req.DataType = WORK_FAIL + outpack.dataType = WORK_FAIL } else { - req.DataType = WORK_EXCEPTION + outpack.dataType = WORK_EXCEPTION } err = r.err } - req.Data = r.data + outpack.data = r.data if worker.running { - job.a.write(req) + inpack.a.write(outpack) } return }