started refactoring worker
This commit is contained in:
		
							parent
							
								
									124e686699
								
							
						
					
					
						commit
						e5179e3b5b
					
				@ -46,7 +46,6 @@ func New(net, addr string) (client *Client, err error) {
 | 
			
		||||
	if err = client.connect(); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	client.isConn = true
 | 
			
		||||
	go client.readLoop()
 | 
			
		||||
	go client.processLoop()
 | 
			
		||||
	return
 | 
			
		||||
@ -57,6 +56,10 @@ func New(net, addr string) (client *Client, err error) {
 | 
			
		||||
//
 | 
			
		||||
func (client *Client) connect() (err error) {
 | 
			
		||||
	client.conn, err = net.Dial(client.net, client.addr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	client.isConn = true
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -28,8 +28,6 @@ var (
 | 
			
		||||
	ErrConnClosed    = errors.New("Connection closed")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func DisablePanic() { recover() }
 | 
			
		||||
 | 
			
		||||
// Extract the error message
 | 
			
		||||
func GetError(data []byte) (err error) {
 | 
			
		||||
	rel := bytes.SplitN(data, []byte{'\x00'}, 2)
 | 
			
		||||
 | 
			
		||||
@ -106,14 +106,6 @@ func decodeResponse(data []byte) (resp *Response, l int, err error) {
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (resp *Response) IsEcho() bool {
 | 
			
		||||
	return resp.DataType == ECHO_RES
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (resp *Response) IsStatus() bool {
 | 
			
		||||
	return resp.DataType == STATUS_RES
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// status handler
 | 
			
		||||
func (resp *Response) Status() (status *Status, err error) {
 | 
			
		||||
	data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4)
 | 
			
		||||
 | 
			
		||||
@ -11,6 +11,5 @@ package gearman
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	_ "github.com/mikespook/gearman-go/client"
 | 
			
		||||
	_ "github.com/mikespook/gearman-go/common"
 | 
			
		||||
	_ "github.com/mikespook/gearman-go/worker"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -23,19 +23,20 @@ const (
 | 
			
		||||
	GEARMAND = "127.0.0.1:4730"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func ToUpper(job *worker.Job) ([]byte, error) {
 | 
			
		||||
	data := []byte(strings.ToUpper(string(job.Data)))
 | 
			
		||||
func ToUpper(job worker.Job) ([]byte, error) {
 | 
			
		||||
	data := job.Data()
 | 
			
		||||
	data = []byte(strings.ToUpper(string(data)))
 | 
			
		||||
	return data, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Sleep(job *worker.Job) ([]byte, error) {
 | 
			
		||||
func Sleep(job worker.Job) ([]byte, error) {
 | 
			
		||||
	time.Sleep(time.Second * 5)
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestJobs(t *testing.T) {
 | 
			
		||||
	w := worker.New(worker.Unlimited)
 | 
			
		||||
	if err := w.AddServer(GEARMAND); err != nil {
 | 
			
		||||
	if err := w.AddServer("tcp4", GEARMAND); err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
@ -50,7 +51,7 @@ func TestJobs(t *testing.T) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	t.Log("Functions added...")
 | 
			
		||||
	w.ErrHandler = func(e error) {
 | 
			
		||||
	w.ErrorHandler = func(e error) {
 | 
			
		||||
		t.Error(e)
 | 
			
		||||
	}
 | 
			
		||||
	go w.Work()
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										196
									
								
								worker/agent.go
									
									
									
									
									
								
							
							
						
						
									
										196
									
								
								worker/agent.go
									
									
									
									
									
								
							@ -14,184 +14,110 @@ type agent struct {
 | 
			
		||||
	conn   net.Conn
 | 
			
		||||
	worker *Worker
 | 
			
		||||
	in     chan []byte
 | 
			
		||||
	out    chan *Job
 | 
			
		||||
	addr   string
 | 
			
		||||
	net, addr   string
 | 
			
		||||
	isConn bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Create the agent of job server.
 | 
			
		||||
func newAgent(addr string, worker *Worker) (a *agent, err error) {
 | 
			
		||||
	conn, err := net.Dial(common.NETWORK, addr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
 | 
			
		||||
	a = &agent{
 | 
			
		||||
		conn:   conn,
 | 
			
		||||
		worker: worker,
 | 
			
		||||
		net: net,
 | 
			
		||||
		addr:   addr,
 | 
			
		||||
		in:     make(chan []byte, common.QUEUE_SIZE),
 | 
			
		||||
		out:    make(chan *Job, common.QUEUE_SIZE),
 | 
			
		||||
		worker: worker,
 | 
			
		||||
		in:     make(chan []byte, QUEUE_SIZE),
 | 
			
		||||
	}
 | 
			
		||||
	// reset abilities
 | 
			
		||||
	a.WriteJob(newJob(common.REQ, common.RESET_ABILITIES, nil))
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// outputing loop
 | 
			
		||||
func (a *agent) outLoop() {
 | 
			
		||||
	ok := true
 | 
			
		||||
	var job *Job
 | 
			
		||||
	for a.worker.running && ok {
 | 
			
		||||
		if job, ok = <-a.out; ok {
 | 
			
		||||
			if err := a.write(job.Encode()); err != nil {
 | 
			
		||||
				a.worker.err(err)
 | 
			
		||||
			}
 | 
			
		||||
func (a *agent) Connect() (err error) {
 | 
			
		||||
	a.conn, err = net.Dial(a.net, a.addr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	a.isConn = true
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *agent) Work() {
 | 
			
		||||
	go a.readLoop()
 | 
			
		||||
 | 
			
		||||
	var resp *Response
 | 
			
		||||
	var l int
 | 
			
		||||
	var err error
 | 
			
		||||
	var data, leftdata []byte
 | 
			
		||||
	for data = range a.in {
 | 
			
		||||
		if len(leftdata) > 0 { // some data left for processing
 | 
			
		||||
			data = append(leftdata, data...)
 | 
			
		||||
		}
 | 
			
		||||
		l = len(data)
 | 
			
		||||
		if l < MIN_PACKET_LEN { // not enough data
 | 
			
		||||
			leftdata = data
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if resp, l, err = decodeResponse(data); err != nil {
 | 
			
		||||
			a.worker.err(err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		leftdata = nil
 | 
			
		||||
		resp.agentId = a.net + a.addr
 | 
			
		||||
		a.worker.in <- resp
 | 
			
		||||
		if len(data) > l {
 | 
			
		||||
			leftdata = data[l:]
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// inputing loop
 | 
			
		||||
func (a *agent) inLoop() {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if r := recover(); r != nil {
 | 
			
		||||
			a.worker.err(common.Errorf("Exiting: %s", r))
 | 
			
		||||
		}
 | 
			
		||||
		close(a.in)
 | 
			
		||||
		close(a.out)
 | 
			
		||||
		a.worker.removeAgent(a)
 | 
			
		||||
	}()
 | 
			
		||||
	for a.worker.running {
 | 
			
		||||
		a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
 | 
			
		||||
	RESTART:
 | 
			
		||||
		// got noop msg and in queue is zero, grab job
 | 
			
		||||
		rel, err := a.read()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err == common.ErrConnection {
 | 
			
		||||
				for i := 0; i < 3 && a.worker.running; i++ {
 | 
			
		||||
					if conn, err := net.Dial(common.NETWORK, a.addr); err != nil {
 | 
			
		||||
						a.worker.err(common.Errorf("Reconnection: %d faild", i))
 | 
			
		||||
						continue
 | 
			
		||||
					} else {
 | 
			
		||||
						a.conn = conn
 | 
			
		||||
						goto RESTART
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				a.worker.err(err)
 | 
			
		||||
// read data from socket
 | 
			
		||||
func (a *agent) readLoop() {
 | 
			
		||||
	var data []byte
 | 
			
		||||
	var err error
 | 
			
		||||
	for a.isConn {
 | 
			
		||||
		if data, err = a.read(BUFFER_SIZE); err != nil {
 | 
			
		||||
			if err == ErrConnClosed {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			a.worker.err(err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		job, err := decodeJob(rel)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			a.worker.err(err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		switch job.DataType {
 | 
			
		||||
		case common.NOOP:
 | 
			
		||||
			a.WriteJob(newJob(common.REQ, common.GRAB_JOB_UNIQ, nil))
 | 
			
		||||
		case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
 | 
			
		||||
			if a.worker.running {
 | 
			
		||||
				if a.worker.limit != nil {
 | 
			
		||||
					a.worker.limit <- true
 | 
			
		||||
				}
 | 
			
		||||
				job.agent = a
 | 
			
		||||
				a.worker.in <- job
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		a.in <- data
 | 
			
		||||
	}
 | 
			
		||||
	close(a.in)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *agent) Close() {
 | 
			
		||||
	a.conn.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *agent) Work() {
 | 
			
		||||
	go a.outLoop()
 | 
			
		||||
	go a.inLoop()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *agent) readData(length int) (data []byte, err error) {
 | 
			
		||||
// read length bytes from the socket
 | 
			
		||||
func (a *agent) read(length int) (data []byte, err error) {
 | 
			
		||||
	n := 0
 | 
			
		||||
	buf := make([]byte, common.BUFFER_SIZE)
 | 
			
		||||
	buf := getBuffer(BUFFER_SIZE)
 | 
			
		||||
	// read until data can be unpacked
 | 
			
		||||
	for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n {
 | 
			
		||||
	for i := length; i > 0 || len(data) < MIN_PACKET_LEN; i -= n {
 | 
			
		||||
		if n, err = a.conn.Read(buf); err != nil {
 | 
			
		||||
			if !a.isConn {
 | 
			
		||||
				err = ErrConnClosed
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if err == io.EOF && n == 0 {
 | 
			
		||||
				if data == nil {
 | 
			
		||||
					err = common.ErrConnection
 | 
			
		||||
					return
 | 
			
		||||
					err = ErrConnection
 | 
			
		||||
				}
 | 
			
		||||
				return data, nil
 | 
			
		||||
			}
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		data = append(data, buf[0:n]...)
 | 
			
		||||
		if n < common.BUFFER_SIZE {
 | 
			
		||||
		if n < BUFFER_SIZE {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *agent) unpack(data []byte) ([]byte, int, bool) {
 | 
			
		||||
	tl := len(data)
 | 
			
		||||
	start := 0
 | 
			
		||||
	for i := 0; i < tl+1-common.PACKET_LEN; i++ {
 | 
			
		||||
		if start+common.PACKET_LEN > tl { // too few data to unpack, read more
 | 
			
		||||
			return nil, common.PACKET_LEN, false
 | 
			
		||||
		}
 | 
			
		||||
		if string(data[start:start+4]) == common.RES_STR {
 | 
			
		||||
			l := int(common.BytesToUint32([4]byte{data[start+8],
 | 
			
		||||
				data[start+9], data[start+10], data[start+11]}))
 | 
			
		||||
			total := l + common.PACKET_LEN
 | 
			
		||||
			if total == tl { // data is what we want
 | 
			
		||||
				return data, common.PACKET_LEN, true
 | 
			
		||||
			} else if total < tl { // data[:total] is what we want, data[total:] is the more
 | 
			
		||||
				a.in <- data[total:]
 | 
			
		||||
				data = data[start:total]
 | 
			
		||||
				return data, common.PACKET_LEN, true
 | 
			
		||||
			} else { // ops! It won't be possible.
 | 
			
		||||
				return nil, total - tl, false
 | 
			
		||||
			}
 | 
			
		||||
		} else { // flag was not found, move to next step
 | 
			
		||||
			start++
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil, common.PACKET_LEN, false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a *agent) read() (rel []byte, err error) {
 | 
			
		||||
	var data []byte
 | 
			
		||||
	ok := false
 | 
			
		||||
	l := common.PACKET_LEN
 | 
			
		||||
	for !ok {
 | 
			
		||||
		inlen := len(a.in)
 | 
			
		||||
		if inlen > 0 {
 | 
			
		||||
			// in queue is not empty
 | 
			
		||||
			for i := 0; i < inlen; i++ {
 | 
			
		||||
				data = append(data, <-a.in...)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			var d []byte
 | 
			
		||||
			d, err = a.readData(l)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			data = append(data, d...)
 | 
			
		||||
		}
 | 
			
		||||
		rel, l, ok = a.unpack(data)
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Send a job to the job server.
 | 
			
		||||
func (a *agent) WriteJob(job *Job) {
 | 
			
		||||
	a.out <- job
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Internal write the encoded job.
 | 
			
		||||
func (a *agent) write(buf []byte) (err error) {
 | 
			
		||||
func (a *agent) write(req *request) (err error) {
 | 
			
		||||
	var n int
 | 
			
		||||
	buf := req.Encode()
 | 
			
		||||
	for i := 0; i < len(buf); i += n {
 | 
			
		||||
		n, err = a.conn.Write(buf[i:])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
 | 
			
		||||
@ -54,3 +54,9 @@ const (
 | 
			
		||||
	SUBMIT_JOB_LOW     = 33
 | 
			
		||||
	SUBMIT_JOB_LOW_BG  = 34
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func getBuffer(l int) (buf []byte) {
 | 
			
		||||
	// TODO add byte buffer pool
 | 
			
		||||
	buf = make([]byte, l)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -28,8 +28,6 @@ var (
 | 
			
		||||
	ErrConnClosed    = errors.New("Connection closed")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func DisablePanic() { recover() }
 | 
			
		||||
 | 
			
		||||
// Extract the error message
 | 
			
		||||
func GetError(data []byte) (err error) {
 | 
			
		||||
	rel := bytes.SplitN(data, []byte{'\x00'}, 2)
 | 
			
		||||
 | 
			
		||||
@ -5,13 +5,27 @@ import (
 | 
			
		||||
	"runtime"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Job handler
 | 
			
		||||
type JobHandler func(Job) error
 | 
			
		||||
 | 
			
		||||
type JobFunc func(Job) ([]byte, error)
 | 
			
		||||
 | 
			
		||||
// The definition of the callback function.
 | 
			
		||||
type jobFunc struct {
 | 
			
		||||
	f       JobFunc
 | 
			
		||||
	timeout uint32
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Map for added function.
 | 
			
		||||
type JobFuncs map[string]*jobFunc
 | 
			
		||||
 | 
			
		||||
type systemInfo struct {
 | 
			
		||||
	GOOS, GOARCH, GOROOT, Version string
 | 
			
		||||
	NumCPU, NumGoroutine          int
 | 
			
		||||
	NumCgoCall                    int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func SysInfo(job *Job) ([]byte, error) {
 | 
			
		||||
func SysInfo(job Job) ([]byte, error) {
 | 
			
		||||
	return json.Marshal(&systemInfo{
 | 
			
		||||
		GOOS:         runtime.GOOS,
 | 
			
		||||
		GOARCH:       runtime.GOARCH,
 | 
			
		||||
@ -25,7 +39,7 @@ func SysInfo(job *Job) ([]byte, error) {
 | 
			
		||||
 | 
			
		||||
var memState runtime.MemStats
 | 
			
		||||
 | 
			
		||||
func MemInfo(job *Job) ([]byte, error) {
 | 
			
		||||
func MemInfo(job Job) ([]byte, error) {
 | 
			
		||||
	runtime.ReadMemStats(&memState)
 | 
			
		||||
	return json.Marshal(&memState)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										68
									
								
								worker/job.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								worker/job.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,68 @@
 | 
			
		||||
package worker
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"strconv"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Job interface {
 | 
			
		||||
	Data() []byte
 | 
			
		||||
	SendWarning(data []byte)
 | 
			
		||||
	SendData(data []byte)
 | 
			
		||||
	UpdateStatus(numerator, denominator int)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type _job struct {
 | 
			
		||||
	a *agent
 | 
			
		||||
	Handle string
 | 
			
		||||
	data []byte
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getJob() *_job {
 | 
			
		||||
	return &_job{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (j *_job) Data() []byte {
 | 
			
		||||
	return j.data
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Send some datas to client.
 | 
			
		||||
// Using this in a job's executing.
 | 
			
		||||
func (j *_job) SendData(data []byte) {
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	req.DataType = WORK_DATA
 | 
			
		||||
	hl := len(j.Handle)
 | 
			
		||||
	l := hl + len(data) + 1
 | 
			
		||||
	req.Data = getBuffer(l)
 | 
			
		||||
	copy(req.Data, []byte(j.Handle))
 | 
			
		||||
	copy(req.Data[hl + 1:], data)
 | 
			
		||||
	j.a.write(req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (j *_job) SendWarning(data []byte) {
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	req.DataType = WORK_WARNING
 | 
			
		||||
	hl := len(j.Handle)
 | 
			
		||||
	l := hl + len(data) + 1
 | 
			
		||||
	req.Data = getBuffer(l)
 | 
			
		||||
	copy(req.Data, []byte(j.Handle))
 | 
			
		||||
	copy(req.Data[hl + 1:], data)
 | 
			
		||||
	j.a.write(req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Update status.
 | 
			
		||||
// Tall client how many percent job has been executed.
 | 
			
		||||
func (j *_job) UpdateStatus(numerator, denominator int) {
 | 
			
		||||
	n := []byte(strconv.Itoa(numerator))
 | 
			
		||||
	d := []byte(strconv.Itoa(denominator))
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	req.DataType = WORK_STATUS
 | 
			
		||||
	hl := len(j.Handle)
 | 
			
		||||
	nl := len(n)
 | 
			
		||||
	dl := len(d)
 | 
			
		||||
	req.Data = getBuffer(hl + nl + dl + 3)
 | 
			
		||||
	copy(req.Data, []byte(j.Handle))
 | 
			
		||||
	copy(req.Data[hl+1:], n)
 | 
			
		||||
	copy(req.Data[hl+nl+2:], d)
 | 
			
		||||
	j.a.write(req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -6,128 +6,45 @@
 | 
			
		||||
package worker
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"encoding/binary"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Worker side job
 | 
			
		||||
type Job struct {
 | 
			
		||||
type request struct {
 | 
			
		||||
	DataType             uint32
 | 
			
		||||
	Data                 []byte
 | 
			
		||||
	Handle, UniqueId, Fn string
 | 
			
		||||
	agent                *agent
 | 
			
		||||
	magicCode, DataType  uint32
 | 
			
		||||
	c                    chan bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Create a new job
 | 
			
		||||
func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
 | 
			
		||||
	return &Job{magicCode: magiccode,
 | 
			
		||||
		DataType: datatype,
 | 
			
		||||
		Data:     data,
 | 
			
		||||
		c:        make(chan bool)}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Decode job from byte slice
 | 
			
		||||
func decodeJob(data []byte) (job *Job, err error) {
 | 
			
		||||
	if len(data) < 12 {
 | 
			
		||||
		return nil, common.Errorf("Invalid data: %V", data)
 | 
			
		||||
	}
 | 
			
		||||
	datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
 | 
			
		||||
	l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
 | 
			
		||||
	if len(data[12:]) != int(l) {
 | 
			
		||||
		return nil, common.Errorf("Invalid data: %V", data)
 | 
			
		||||
	}
 | 
			
		||||
	data = data[12:]
 | 
			
		||||
	job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool)}
 | 
			
		||||
	switch datatype {
 | 
			
		||||
	case common.JOB_ASSIGN:
 | 
			
		||||
		s := bytes.SplitN(data, []byte{'\x00'}, 3)
 | 
			
		||||
		if len(s) == 3 {
 | 
			
		||||
			job.Handle = string(s[0])
 | 
			
		||||
			job.Fn = string(s[1])
 | 
			
		||||
			data = s[2]
 | 
			
		||||
		}
 | 
			
		||||
	case common.JOB_ASSIGN_UNIQ:
 | 
			
		||||
		s := bytes.SplitN(data, []byte{'\x00'}, 4)
 | 
			
		||||
		if len(s) == 4 {
 | 
			
		||||
			job.Handle = string(s[0])
 | 
			
		||||
			job.Fn = string(s[1])
 | 
			
		||||
			job.UniqueId = string(s[2])
 | 
			
		||||
			data = s[3]
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	job.Data = data
 | 
			
		||||
	return
 | 
			
		||||
func getRequest() (req *request) {
 | 
			
		||||
	// TODO pool
 | 
			
		||||
	return &request{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Encode a job to byte slice
 | 
			
		||||
func (job *Job) Encode() (data []byte) {
 | 
			
		||||
func (req *request) Encode() (data []byte) {
 | 
			
		||||
	var l int
 | 
			
		||||
	if job.DataType == common.WORK_FAIL {
 | 
			
		||||
		l = len(job.Handle)
 | 
			
		||||
	if req.DataType == WORK_FAIL {
 | 
			
		||||
		l = len(req.Handle)
 | 
			
		||||
	} else {
 | 
			
		||||
		l = len(job.Data)
 | 
			
		||||
		if job.Handle != "" {
 | 
			
		||||
			l += len(job.Handle) + 1
 | 
			
		||||
		l = len(req.Data)
 | 
			
		||||
		if req.Handle != "" {
 | 
			
		||||
			l += len(req.Handle) + 1
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	data = make([]byte, 0, l+12)
 | 
			
		||||
 | 
			
		||||
	magiccode := common.Uint32ToBytes(job.magicCode)
 | 
			
		||||
	datatype := common.Uint32ToBytes(job.DataType)
 | 
			
		||||
	datalength := common.Uint32ToBytes(uint32(l))
 | 
			
		||||
 | 
			
		||||
	data = append(data, magiccode[:]...)
 | 
			
		||||
	data = append(data, datatype[:]...)
 | 
			
		||||
	data = append(data, datalength[:]...)
 | 
			
		||||
	if job.Handle != "" {
 | 
			
		||||
		data = append(data, []byte(job.Handle)...)
 | 
			
		||||
		if job.DataType != common.WORK_FAIL {
 | 
			
		||||
			data = append(data, 0)
 | 
			
		||||
	data = getBuffer(l + MIN_PACKET_LEN)
 | 
			
		||||
	binary.BigEndian.PutUint32(data[:4], REQ)
 | 
			
		||||
	binary.BigEndian.PutUint32(data[4:8], req.DataType)
 | 
			
		||||
	binary.BigEndian.PutUint32(data[8:MIN_PACKET_LEN], uint32(l))
 | 
			
		||||
	i := MIN_PACKET_LEN
 | 
			
		||||
	if req.Handle != "" {
 | 
			
		||||
		hi := len(req.Handle) + i
 | 
			
		||||
		copy(data[i:hi], []byte(req.Handle))
 | 
			
		||||
		if req.DataType != WORK_FAIL {
 | 
			
		||||
			data[hi] = '\x00'
 | 
			
		||||
		}
 | 
			
		||||
		i = i + hi
 | 
			
		||||
	}
 | 
			
		||||
	data = append(data, job.Data...)
 | 
			
		||||
	copy(data[i:], req.Data)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Send some datas to client.
 | 
			
		||||
// Using this in a job's executing.
 | 
			
		||||
func (job *Job) UpdateData(data []byte, iswarning bool) {
 | 
			
		||||
	result := append([]byte(job.Handle), 0)
 | 
			
		||||
	result = append(result, data...)
 | 
			
		||||
	var datatype uint32
 | 
			
		||||
	if iswarning {
 | 
			
		||||
		datatype = common.WORK_WARNING
 | 
			
		||||
	} else {
 | 
			
		||||
		datatype = common.WORK_DATA
 | 
			
		||||
	}
 | 
			
		||||
	job.agent.WriteJob(newJob(common.REQ, datatype, result))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Update status.
 | 
			
		||||
// Tall client how many percent job has been executed.
 | 
			
		||||
func (job *Job) UpdateStatus(numerator, denominator int) {
 | 
			
		||||
	n := []byte(strconv.Itoa(numerator))
 | 
			
		||||
	d := []byte(strconv.Itoa(denominator))
 | 
			
		||||
	result := append([]byte(job.Handle), '\x00')
 | 
			
		||||
	result = append(result, n...)
 | 
			
		||||
	result = append(result, '\x00')
 | 
			
		||||
	result = append(result, d...)
 | 
			
		||||
	job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// close the job
 | 
			
		||||
func (job *Job) Close() {
 | 
			
		||||
	close(job.c)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// cancel the job executing
 | 
			
		||||
func (job *Job) cancel() {
 | 
			
		||||
	defer func() { recover() }()
 | 
			
		||||
	job.c <- true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// When a job was canceled, return a true form a channel
 | 
			
		||||
func (job *Job) Canceled() <-chan bool {
 | 
			
		||||
	return job.c
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -7,127 +7,56 @@ package worker
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"encoding/binary"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Worker side job
 | 
			
		||||
type Job struct {
 | 
			
		||||
type Response struct {
 | 
			
		||||
	DataType  uint32
 | 
			
		||||
	Data                 []byte
 | 
			
		||||
	Handle, UniqueId, Fn string
 | 
			
		||||
	agent                *agent
 | 
			
		||||
	magicCode, DataType  uint32
 | 
			
		||||
	c                    chan bool
 | 
			
		||||
	agentId	string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Create a new job
 | 
			
		||||
func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
 | 
			
		||||
	return &Job{magicCode: magiccode,
 | 
			
		||||
		DataType: datatype,
 | 
			
		||||
		Data:     data,
 | 
			
		||||
		c:        make(chan bool)}
 | 
			
		||||
func getResponse() (resp *Response) {
 | 
			
		||||
	return &Response{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Decode job from byte slice
 | 
			
		||||
func decodeJob(data []byte) (job *Job, err error) {
 | 
			
		||||
	if len(data) < 12 {
 | 
			
		||||
		return nil, common.Errorf("Invalid data: %V", data)
 | 
			
		||||
func decodeResponse(data []byte) (resp *Response, l int, err error) {
 | 
			
		||||
	if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
 | 
			
		||||
		err = fmt.Errorf("Invalid data: %V", data)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
 | 
			
		||||
	l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
 | 
			
		||||
	if len(data[12:]) != int(l) {
 | 
			
		||||
		return nil, common.Errorf("Invalid data: %V", data)
 | 
			
		||||
	dl := int(binary.BigEndian.Uint32(data[8:12]))
 | 
			
		||||
	dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN]
 | 
			
		||||
	if len(dt) != int(dl) { // length not equal
 | 
			
		||||
		err = fmt.Errorf("Invalid data: %V", data)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	data = data[12:]
 | 
			
		||||
	job = &Job{magicCode: common.RES, DataType: datatype, c: make(chan bool)}
 | 
			
		||||
	switch datatype {
 | 
			
		||||
	case common.JOB_ASSIGN:
 | 
			
		||||
		s := bytes.SplitN(data, []byte{'\x00'}, 3)
 | 
			
		||||
	resp = getResponse()
 | 
			
		||||
	resp.DataType = binary.BigEndian.Uint32(data[4:8])
 | 
			
		||||
	switch resp.DataType {
 | 
			
		||||
	case JOB_ASSIGN:
 | 
			
		||||
		s := bytes.SplitN(dt, []byte{'\x00'}, 3)
 | 
			
		||||
		if len(s) == 3 {
 | 
			
		||||
			job.Handle = string(s[0])
 | 
			
		||||
			job.Fn = string(s[1])
 | 
			
		||||
			data = s[2]
 | 
			
		||||
			resp.Handle = string(s[0])
 | 
			
		||||
			resp.Fn = string(s[1])
 | 
			
		||||
			resp.Data = s[2]
 | 
			
		||||
		}
 | 
			
		||||
	case common.JOB_ASSIGN_UNIQ:
 | 
			
		||||
		s := bytes.SplitN(data, []byte{'\x00'}, 4)
 | 
			
		||||
	case JOB_ASSIGN_UNIQ:
 | 
			
		||||
		s := bytes.SplitN(dt, []byte{'\x00'}, 4)
 | 
			
		||||
		if len(s) == 4 {
 | 
			
		||||
			job.Handle = string(s[0])
 | 
			
		||||
			job.Fn = string(s[1])
 | 
			
		||||
			job.UniqueId = string(s[2])
 | 
			
		||||
			data = s[3]
 | 
			
		||||
			resp.Handle = string(s[0])
 | 
			
		||||
			resp.Fn = string(s[1])
 | 
			
		||||
			resp.UniqueId = string(s[2])
 | 
			
		||||
			resp.Data = s[3]
 | 
			
		||||
		}
 | 
			
		||||
	default:
 | 
			
		||||
		resp.Data = dt
 | 
			
		||||
	}
 | 
			
		||||
	job.Data = data
 | 
			
		||||
	l = dl + MIN_PACKET_LEN
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Encode a job to byte slice
 | 
			
		||||
func (job *Job) Encode() (data []byte) {
 | 
			
		||||
	var l int
 | 
			
		||||
	if job.DataType == common.WORK_FAIL {
 | 
			
		||||
		l = len(job.Handle)
 | 
			
		||||
	} else {
 | 
			
		||||
		l = len(job.Data)
 | 
			
		||||
		if job.Handle != "" {
 | 
			
		||||
			l += len(job.Handle) + 1
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	data = make([]byte, 0, l+12)
 | 
			
		||||
 | 
			
		||||
	magiccode := common.Uint32ToBytes(job.magicCode)
 | 
			
		||||
	datatype := common.Uint32ToBytes(job.DataType)
 | 
			
		||||
	datalength := common.Uint32ToBytes(uint32(l))
 | 
			
		||||
 | 
			
		||||
	data = append(data, magiccode[:]...)
 | 
			
		||||
	data = append(data, datatype[:]...)
 | 
			
		||||
	data = append(data, datalength[:]...)
 | 
			
		||||
	if job.Handle != "" {
 | 
			
		||||
		data = append(data, []byte(job.Handle)...)
 | 
			
		||||
		if job.DataType != common.WORK_FAIL {
 | 
			
		||||
			data = append(data, 0)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	data = append(data, job.Data...)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Send some datas to client.
 | 
			
		||||
// Using this in a job's executing.
 | 
			
		||||
func (job *Job) UpdateData(data []byte, iswarning bool) {
 | 
			
		||||
	result := append([]byte(job.Handle), 0)
 | 
			
		||||
	result = append(result, data...)
 | 
			
		||||
	var datatype uint32
 | 
			
		||||
	if iswarning {
 | 
			
		||||
		datatype = common.WORK_WARNING
 | 
			
		||||
	} else {
 | 
			
		||||
		datatype = common.WORK_DATA
 | 
			
		||||
	}
 | 
			
		||||
	job.agent.WriteJob(newJob(common.REQ, datatype, result))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Update status.
 | 
			
		||||
// Tall client how many percent job has been executed.
 | 
			
		||||
func (job *Job) UpdateStatus(numerator, denominator int) {
 | 
			
		||||
	n := []byte(strconv.Itoa(numerator))
 | 
			
		||||
	d := []byte(strconv.Itoa(denominator))
 | 
			
		||||
	result := append([]byte(job.Handle), '\x00')
 | 
			
		||||
	result = append(result, n...)
 | 
			
		||||
	result = append(result, '\x00')
 | 
			
		||||
	result = append(result, d...)
 | 
			
		||||
	job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// close the job
 | 
			
		||||
func (job *Job) Close() {
 | 
			
		||||
	close(job.c)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// cancel the job executing
 | 
			
		||||
func (job *Job) cancel() {
 | 
			
		||||
	defer func() { recover() }()
 | 
			
		||||
	job.c <- true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// When a job was canceled, return a true form a channel
 | 
			
		||||
func (job *Job) Canceled() <-chan bool {
 | 
			
		||||
	return job.c
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										178
									
								
								worker/worker.go
									
									
									
									
									
								
							
							
						
						
									
										178
									
								
								worker/worker.go
									
									
									
									
									
								
							@ -5,7 +5,10 @@
 | 
			
		||||
package worker
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"encoding/binary"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
@ -15,24 +18,6 @@ const (
 | 
			
		||||
	Immediately = 0
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	ErrConnection = common.ErrConnection
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Job handler
 | 
			
		||||
type JobHandler func(*Job) error
 | 
			
		||||
 | 
			
		||||
type JobFunc func(*Job) ([]byte, error)
 | 
			
		||||
 | 
			
		||||
// The definition of the callback function.
 | 
			
		||||
type jobFunc struct {
 | 
			
		||||
	f       JobFunc
 | 
			
		||||
	timeout uint32
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Map for added function.
 | 
			
		||||
type JobFuncs map[string]*jobFunc
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
Worker side api for gearman
 | 
			
		||||
 | 
			
		||||
@ -52,24 +37,25 @@ func foobar(job *Job) (data []byte, err os.Error) {
 | 
			
		||||
}
 | 
			
		||||
*/
 | 
			
		||||
type Worker struct {
 | 
			
		||||
	agents  []*agent
 | 
			
		||||
	agents  map[string]*agent
 | 
			
		||||
	funcs   JobFuncs
 | 
			
		||||
	in      chan *Job
 | 
			
		||||
	in      chan *Response
 | 
			
		||||
	running bool
 | 
			
		||||
	limit   chan bool
 | 
			
		||||
 | 
			
		||||
	Id string
 | 
			
		||||
	// assign a ErrFunc to handle errors
 | 
			
		||||
	ErrHandler common.ErrorHandler
 | 
			
		||||
	ErrorHandler ErrorHandler
 | 
			
		||||
	JobHandler JobHandler
 | 
			
		||||
	mutex sync.Mutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get a new worker
 | 
			
		||||
func New(l int) (worker *Worker) {
 | 
			
		||||
	worker = &Worker{
 | 
			
		||||
		agents: make([]*agent, 0),
 | 
			
		||||
		agents: make(map[string]*agent, QUEUE_SIZE),
 | 
			
		||||
		funcs:  make(JobFuncs),
 | 
			
		||||
		in:     make(chan *Job, common.QUEUE_SIZE),
 | 
			
		||||
		in:     make(chan *Response, QUEUE_SIZE),
 | 
			
		||||
	}
 | 
			
		||||
	if l != Unlimited {
 | 
			
		||||
		worker.limit = make(chan bool, l)
 | 
			
		||||
@ -79,29 +65,29 @@ func New(l int) (worker *Worker) {
 | 
			
		||||
 | 
			
		||||
//
 | 
			
		||||
func (worker *Worker) err(e error) {
 | 
			
		||||
	if worker.ErrHandler != nil {
 | 
			
		||||
		worker.ErrHandler(e)
 | 
			
		||||
	if worker.ErrorHandler != nil {
 | 
			
		||||
		worker.ErrorHandler(e)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add a server. The addr should be 'host:port' format.
 | 
			
		||||
// The connection is established at this time.
 | 
			
		||||
func (worker *Worker) AddServer(addr string) (err error) {
 | 
			
		||||
func (worker *Worker) AddServer(net, addr string) (err error) {
 | 
			
		||||
	// Create a new job server's client as a agent of server
 | 
			
		||||
	server, err := newAgent(addr, worker)
 | 
			
		||||
	a, err := newAgent(net, addr, worker)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	worker.agents = append(worker.agents, server)
 | 
			
		||||
	worker.agents[net + addr] = a
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Write a job to job server.
 | 
			
		||||
// Here, the job's mean is not the oraginal mean.
 | 
			
		||||
// Just looks like a network package for job's result or tell job server, there was a fail.
 | 
			
		||||
func (worker *Worker) broadcast(job *Job) {
 | 
			
		||||
func (worker *Worker) broadcast(req *request) {
 | 
			
		||||
	for _, v := range worker.agents {
 | 
			
		||||
		v.WriteJob(job)
 | 
			
		||||
		v.write(req)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -110,11 +96,12 @@ func (worker *Worker) broadcast(job *Job) {
 | 
			
		||||
// The API will tell every connected job server that 'I can do this'
 | 
			
		||||
func (worker *Worker) AddFunc(funcname string,
 | 
			
		||||
	f JobFunc, timeout uint32) (err error) {
 | 
			
		||||
	worker.mutex.Lock()
 | 
			
		||||
	defer worker.mutex.Unlock()
 | 
			
		||||
	if _, ok := worker.funcs[funcname]; ok {
 | 
			
		||||
		return common.Errorf("The function already exists: %s", funcname)
 | 
			
		||||
		return fmt.Errorf("The function already exists: %s", funcname)
 | 
			
		||||
	}
 | 
			
		||||
	worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
 | 
			
		||||
 | 
			
		||||
	if worker.running {
 | 
			
		||||
		worker.addFunc(funcname, timeout)
 | 
			
		||||
	}
 | 
			
		||||
@ -123,27 +110,27 @@ func (worker *Worker) AddFunc(funcname string,
 | 
			
		||||
 | 
			
		||||
// inner add function
 | 
			
		||||
func (worker *Worker) addFunc(funcname string, timeout uint32) {
 | 
			
		||||
	var datatype uint32
 | 
			
		||||
	var data []byte
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	if timeout == 0 {
 | 
			
		||||
		datatype = common.CAN_DO
 | 
			
		||||
		data = []byte(funcname)
 | 
			
		||||
		req.DataType = CAN_DO
 | 
			
		||||
		req.Data = []byte(funcname)
 | 
			
		||||
	} else {
 | 
			
		||||
		datatype = common.CAN_DO_TIMEOUT
 | 
			
		||||
		data = []byte(funcname + "\x00")
 | 
			
		||||
		t := common.Uint32ToBytes(timeout)
 | 
			
		||||
		data = append(data, t[:]...)
 | 
			
		||||
		req.DataType = CAN_DO_TIMEOUT
 | 
			
		||||
		l := len(funcname)
 | 
			
		||||
		req.Data = getBuffer(l + 5)
 | 
			
		||||
		copy(req.Data, []byte(funcname))
 | 
			
		||||
		req.Data[l] = '\x00'
 | 
			
		||||
		binary.BigEndian.PutUint32(req.Data[l + 1:], timeout)
 | 
			
		||||
	}
 | 
			
		||||
	job := newJob(common.REQ, datatype, data)
 | 
			
		||||
	worker.broadcast(job)
 | 
			
		||||
 | 
			
		||||
	worker.broadcast(req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Remove a function.
 | 
			
		||||
// Tell job servers 'I can not do this now' at the same time.
 | 
			
		||||
func (worker *Worker) RemoveFunc(funcname string) (err error) {
 | 
			
		||||
	worker.mutex.Lock()
 | 
			
		||||
	defer worker.mutex.Unlock()
 | 
			
		||||
	if _, ok := worker.funcs[funcname]; !ok {
 | 
			
		||||
		return common.Errorf("The function does not exist: %s", funcname)
 | 
			
		||||
		return fmt.Errorf("The function does not exist: %s", funcname)
 | 
			
		||||
	}
 | 
			
		||||
	delete(worker.funcs, funcname)
 | 
			
		||||
	if worker.running {
 | 
			
		||||
@ -154,27 +141,27 @@ func (worker *Worker) RemoveFunc(funcname string) (err error) {
 | 
			
		||||
 | 
			
		||||
// inner remove function
 | 
			
		||||
func (worker *Worker) removeFunc(funcname string) {
 | 
			
		||||
	job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
 | 
			
		||||
	worker.broadcast(job)
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	req.DataType = CANT_DO
 | 
			
		||||
	req.Data = []byte(funcname)
 | 
			
		||||
	worker.broadcast(req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (worker *Worker) dealJob(job *Job) {
 | 
			
		||||
func (worker *Worker) dealResp(resp *Response) {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		job.Close()
 | 
			
		||||
		if worker.running && worker.limit != nil {
 | 
			
		||||
			<-worker.limit
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	switch job.DataType {
 | 
			
		||||
	case common.ERROR:
 | 
			
		||||
		_, err := common.GetError(job.Data)
 | 
			
		||||
		worker.err(err)
 | 
			
		||||
	case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
 | 
			
		||||
		if err := worker.exec(job); err != nil {
 | 
			
		||||
	switch resp.DataType {
 | 
			
		||||
	case ERROR:
 | 
			
		||||
		worker.err(GetError(resp.Data))
 | 
			
		||||
	case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
 | 
			
		||||
		if err := worker.exec(resp); err != nil {
 | 
			
		||||
			worker.err(err)
 | 
			
		||||
		}
 | 
			
		||||
	default:
 | 
			
		||||
		worker.handleJob(job)
 | 
			
		||||
		worker.handleResponse(resp)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -187,23 +174,29 @@ func (worker *Worker) Work() {
 | 
			
		||||
	}()
 | 
			
		||||
	worker.running = true
 | 
			
		||||
	for _, v := range worker.agents {
 | 
			
		||||
		v.Connect()
 | 
			
		||||
		go v.Work()
 | 
			
		||||
	}
 | 
			
		||||
	worker.Reset()
 | 
			
		||||
	for funcname, f := range worker.funcs {
 | 
			
		||||
		worker.addFunc(funcname, f.timeout)
 | 
			
		||||
	}
 | 
			
		||||
	ok := true
 | 
			
		||||
	for ok {
 | 
			
		||||
		var job *Job
 | 
			
		||||
		if job, ok = <-worker.in; ok {
 | 
			
		||||
			go worker.dealJob(job)
 | 
			
		||||
		}
 | 
			
		||||
	var resp *Response
 | 
			
		||||
	for resp = range worker.in {
 | 
			
		||||
		fmt.Println(resp)
 | 
			
		||||
		go worker.dealResp(resp)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// job handler
 | 
			
		||||
func (worker *Worker) handleJob(job *Job) {
 | 
			
		||||
func (worker *Worker) handleResponse(resp *Response) {
 | 
			
		||||
	if worker.JobHandler != nil {
 | 
			
		||||
		job := getJob()
 | 
			
		||||
		job.a = worker.agents[resp.agentId]
 | 
			
		||||
		job.Handle = resp.Handle
 | 
			
		||||
		if resp.DataType == ECHO_RES {
 | 
			
		||||
			job.data = resp.Data
 | 
			
		||||
		}
 | 
			
		||||
		if err := worker.JobHandler(job); err != nil {
 | 
			
		||||
			worker.err(err)
 | 
			
		||||
		}
 | 
			
		||||
@ -221,85 +214,79 @@ func (worker *Worker) Close() {
 | 
			
		||||
 | 
			
		||||
// Send a something out, get the samething back.
 | 
			
		||||
func (worker *Worker) Echo(data []byte) {
 | 
			
		||||
	job := newJob(common.REQ, common.ECHO_REQ, data)
 | 
			
		||||
	worker.broadcast(job)
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	req.DataType = ECHO_REQ
 | 
			
		||||
	req.Data = data
 | 
			
		||||
	worker.broadcast(req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Remove all of functions.
 | 
			
		||||
// Both from the worker or job servers.
 | 
			
		||||
func (worker *Worker) Reset() {
 | 
			
		||||
	job := newJob(common.REQ, common.RESET_ABILITIES, nil)
 | 
			
		||||
	worker.broadcast(job)
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	req.DataType = RESET_ABILITIES
 | 
			
		||||
	worker.broadcast(req)
 | 
			
		||||
	worker.funcs = make(JobFuncs)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Set the worker's unique id.
 | 
			
		||||
func (worker *Worker) SetId(id string) {
 | 
			
		||||
	worker.Id = id
 | 
			
		||||
	job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
 | 
			
		||||
	worker.broadcast(job)
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	req.DataType = SET_CLIENT_ID
 | 
			
		||||
	req.Data = []byte(id)
 | 
			
		||||
	worker.broadcast(req)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Execute the job. And send back the result.
 | 
			
		||||
func (worker *Worker) exec(job *Job) (err error) {
 | 
			
		||||
func (worker *Worker) exec(resp *Response) (err error) {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if r := recover(); r != nil {
 | 
			
		||||
			if e, ok := r.(error); ok {
 | 
			
		||||
				err = e
 | 
			
		||||
			} else {
 | 
			
		||||
				err = common.ErrUnknown
 | 
			
		||||
				err = ErrUnknown
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	f, ok := worker.funcs[job.Fn]
 | 
			
		||||
	f, ok := worker.funcs[resp.Fn]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return common.Errorf("The function does not exist: %s", job.Fn)
 | 
			
		||||
		return fmt.Errorf("The function does not exist: %s", resp.Fn)
 | 
			
		||||
	}
 | 
			
		||||
	var r *result
 | 
			
		||||
	job := getJob()
 | 
			
		||||
	job.a = worker.agents[resp.agentId]
 | 
			
		||||
	job.Handle = resp.Handle
 | 
			
		||||
	if f.timeout == 0 {
 | 
			
		||||
		d, e := f.f(job)
 | 
			
		||||
		r = &result{data: d, err: e}
 | 
			
		||||
	} else {
 | 
			
		||||
		r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second)
 | 
			
		||||
	}
 | 
			
		||||
	var datatype uint32
 | 
			
		||||
	req := getRequest()
 | 
			
		||||
	if r.err == nil {
 | 
			
		||||
		datatype = common.WORK_COMPLETE
 | 
			
		||||
		req.DataType = WORK_COMPLETE
 | 
			
		||||
	} else {
 | 
			
		||||
		if r.data == nil {
 | 
			
		||||
			datatype = common.WORK_FAIL
 | 
			
		||||
			req.DataType = WORK_FAIL
 | 
			
		||||
		} else {
 | 
			
		||||
			datatype = common.WORK_EXCEPTION
 | 
			
		||||
			req.DataType = WORK_EXCEPTION
 | 
			
		||||
		}
 | 
			
		||||
		err = r.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	job.magicCode = common.REQ
 | 
			
		||||
	job.DataType = datatype
 | 
			
		||||
	job.Data = r.data
 | 
			
		||||
	req.Data = r.data
 | 
			
		||||
	if worker.running {
 | 
			
		||||
		job.agent.WriteJob(job)
 | 
			
		||||
		job.a.write(req)
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (worker *Worker) removeAgent(a *agent) {
 | 
			
		||||
	for k, v := range worker.agents {
 | 
			
		||||
		if v == a {
 | 
			
		||||
			worker.agents = append(worker.agents[:k], worker.agents[k+1:]...)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(worker.agents) == 0 {
 | 
			
		||||
		worker.err(common.ErrNoActiveAgent)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type result struct {
 | 
			
		||||
	data []byte
 | 
			
		||||
	err  error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
 | 
			
		||||
func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
 | 
			
		||||
	rslt := make(chan *result)
 | 
			
		||||
	defer close(rslt)
 | 
			
		||||
	go func() {
 | 
			
		||||
@ -310,8 +297,7 @@ func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
 | 
			
		||||
	select {
 | 
			
		||||
	case r = <-rslt:
 | 
			
		||||
	case <-time.After(timeout):
 | 
			
		||||
		go job.cancel()
 | 
			
		||||
		return &result{err: common.ErrTimeOut}
 | 
			
		||||
		return &result{err: ErrTimeOut}
 | 
			
		||||
	}
 | 
			
		||||
	return r
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -10,7 +10,7 @@ func init() {
 | 
			
		||||
 | 
			
		||||
func TestWorkerAddServer(t *testing.T) {
 | 
			
		||||
	t.Log("Add local server 127.0.0.1:4730.")
 | 
			
		||||
	if err := worker.AddServer("127.0.0.1:4730"); err != nil {
 | 
			
		||||
	if err := worker.AddServer("tcp4", "127.0.0.1:4730"); err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -20,7 +20,7 @@ func TestWorkerAddServer(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func foobar(job *Job) ([]byte, error) {
 | 
			
		||||
func foobar(job Job) ([]byte, error) {
 | 
			
		||||
	return nil, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -42,3 +42,11 @@ func TestWorkerRemoveFunc(t *testing.T) {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWork(t *testing.T) {
 | 
			
		||||
	go worker.Work()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWorkerClose(t *testing.T) {
 | 
			
		||||
	worker.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user