// gearman-go/worker/inpack.go

package worker
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
)
// Worker side job
type inPack struct {
2013-12-23 17:05:42 +08:00
dataType uint32
data []byte
handle, uniqueId, fn string
2013-12-23 17:05:42 +08:00
a *agent
2013-08-30 12:36:57 +08:00
}
// Create a new job
func getInPack() *inPack {
return &inPack{}
2013-08-30 12:36:57 +08:00
}
// Data returns the payload stored in the packet. The slice is handed out
// as-is, without copying.
func (p *inPack) Data() []byte {
	return p.data
}
// Fn returns the packet's fn field: the function name that decodeInPack
// parses out of job-assignment packets.
func (p *inPack) Fn() string {
	return p.fn
}
// Handle returns the job handle stored in the packet. SendData, SendWarning
// and UpdateStatus prefix their payloads with this value.
func (p *inPack) Handle() string {
	return p.handle
}
// UniqueId returns the packet's uniqueId field, which decodeInPack fills in
// only for dtJobAssignUniq packets.
func (p *inPack) UniqueId() string {
	return p.uniqueId
}
func (inpack *inPack) Err() error {
if inpack.dataType == dtError {
return getError(inpack.data)
2013-12-26 12:06:47 +08:00
}
return nil
}
// Send some datas to client.
// Using this in a job's executing.
func (inpack *inPack) SendData(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkData
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
2013-12-23 17:05:42 +08:00
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
func (inpack *inPack) SendWarning(data []byte) {
outpack := getOutPack()
outpack.dataType = dtWorkWarning
hl := len(inpack.handle)
l := hl + len(data) + 1
outpack.data = getBuffer(l)
copy(outpack.data, []byte(inpack.handle))
2013-12-23 17:05:42 +08:00
copy(outpack.data[hl+1:], data)
inpack.a.write(outpack)
}
// UpdateStatus tells the client how much of the job has been executed, as
// the fraction numerator/denominator. It writes a dtWorkStatus packet
// through the packet's agent.
//
// The payload is: job handle, NUL, numerator, NUL, denominator, with both
// numbers in decimal text. The denominator is the final field and is not
// NUL-terminated, so the buffer holds exactly two separator bytes.
func (p *inPack) UpdateStatus(numerator, denominator int) {
	n := strconv.Itoa(numerator)
	d := strconv.Itoa(denominator)
	hl, nl := len(p.handle), len(n)

	outpack := getOutPack()
	outpack.dataType = dtWorkStatus
	// The two separator positions (hl and hl+nl+1) are never written; they
	// are left as getBuffer returned them (presumably zero).
	outpack.data = getBuffer(hl + nl + len(d) + 2)
	copy(outpack.data, p.handle)
	copy(outpack.data[hl+1:], n)
	copy(outpack.data[hl+nl+2:], d)
	p.a.write(outpack)
}
// Decode job from byte slice
func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
if len(data) < minPacketLength { // valid package should not less 12 bytes
2013-08-30 18:01:10 +08:00
err = fmt.Errorf("Invalid data: %V", data)
return
2013-08-30 12:36:57 +08:00
}
2013-08-30 18:01:10 +08:00
dl := int(binary.BigEndian.Uint32(data[8:12]))
dt := data[minPacketLength : dl+minPacketLength]
2013-08-30 18:01:10 +08:00
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %V", data)
return
2013-08-30 12:36:57 +08:00
}
inpack = getInPack()
inpack.dataType = binary.BigEndian.Uint32(data[4:8])
switch inpack.dataType {
case dtJobAssign:
2013-08-30 18:01:10 +08:00
s := bytes.SplitN(dt, []byte{'\x00'}, 3)
2013-08-30 12:36:57 +08:00
if len(s) == 3 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.data = s[2]
2013-08-30 12:36:57 +08:00
}
case dtJobAssignUniq:
2013-08-30 18:01:10 +08:00
s := bytes.SplitN(dt, []byte{'\x00'}, 4)
2013-08-30 12:36:57 +08:00
if len(s) == 4 {
inpack.handle = string(s[0])
inpack.fn = string(s[1])
inpack.uniqueId = string(s[2])
inpack.data = s[3]
2013-08-30 12:36:57 +08:00
}
2013-08-30 18:01:10 +08:00
default:
inpack.data = dt
2013-08-30 12:36:57 +08:00
}
l = dl + minPacketLength
2013-08-30 12:36:57 +08:00
return
}