This commit is contained in:
Xing Xing 2013-12-23 17:05:42 +08:00
parent 2b4cc002d1
commit 2ee2be0891
4 changed files with 20 additions and 21 deletions

View File

@ -11,17 +11,17 @@ import (
// The agent of job server. // The agent of job server.
type agent struct { type agent struct {
conn net.Conn conn net.Conn
worker *Worker worker *Worker
in chan []byte in chan []byte
net, addr string net, addr string
isConn bool isConn bool
} }
// Create the agent of job server. // Create the agent of job server.
func newAgent(net, addr string, worker *Worker) (a *agent, err error) { func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
a = &agent{ a = &agent{
net: net, net: net,
addr: addr, addr: addr,
worker: worker, worker: worker,
in: make(chan []byte, QUEUE_SIZE), in: make(chan []byte, QUEUE_SIZE),

View File

@ -7,17 +7,17 @@ package worker
import ( import (
"bytes" "bytes"
"fmt"
"encoding/binary" "encoding/binary"
"fmt"
"strconv" "strconv"
) )
// Worker side job // Worker side job
type inPack struct { type inPack struct {
dataType uint32 dataType uint32
data []byte data []byte
handle, uniqueId, fn string handle, uniqueId, fn string
a *agent a *agent
} }
// Create a new job // Create a new job
@ -38,7 +38,7 @@ func (inpack *inPack) SendData(data []byte) {
l := hl + len(data) + 1 l := hl + len(data) + 1
outpack.data = getBuffer(l) outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle)) copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl + 1:], data) copy(outpack.data[hl+1:], data)
inpack.a.write(outpack) inpack.a.write(outpack)
} }
@ -49,7 +49,7 @@ func (inpack *inPack) SendWarning(data []byte) {
l := hl + len(data) + 1 l := hl + len(data) + 1
outpack.data = getBuffer(l) outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle)) copy(outpack.data, []byte(inpack.handle))
copy(outpack.data[hl + 1:], data) copy(outpack.data[hl+1:], data)
inpack.a.write(outpack) inpack.a.write(outpack)
} }
@ -70,7 +70,6 @@ func (inpack *inPack) UpdateStatus(numerator, denominator int) {
inpack.a.write(outpack) inpack.a.write(outpack)
} }
// Decode job from byte slice // Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) { func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes

View File

@ -5,10 +5,10 @@
package worker package worker
import ( import (
"fmt"
"time"
"sync"
"encoding/binary" "encoding/binary"
"fmt"
"sync"
"time"
) )
const ( const (
@ -46,8 +46,8 @@ type Worker struct {
Id string Id string
// assign a ErrFunc to handle errors // assign a ErrFunc to handle errors
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
JobHandler JobHandler JobHandler JobHandler
mutex sync.Mutex mutex sync.Mutex
} }
// Get a new worker // Get a new worker
@ -78,7 +78,7 @@ func (worker *Worker) AddServer(net, addr string) (err error) {
if err != nil { if err != nil {
return err return err
} }
worker.agents[net + addr] = a worker.agents[net+addr] = a
return return
} }
@ -120,7 +120,7 @@ func (worker *Worker) addFunc(funcname string, timeout uint32) {
outpack.data = getBuffer(l + 5) outpack.data = getBuffer(l + 5)
copy(outpack.data, []byte(funcname)) copy(outpack.data, []byte(funcname))
outpack.data[l] = '\x00' outpack.data[l] = '\x00'
binary.BigEndian.PutUint32(outpack.data[l + 1:], timeout) binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
} }
worker.broadcast(outpack) worker.broadcast(outpack)
} }

View File

@ -1,8 +1,8 @@
package worker package worker
import ( import (
"sync" "sync"
"testing" "testing"
) )
var worker *Worker var worker *Worker