forked from yuxh/gearman-go
149 lines
3.4 KiB
Go
149 lines
3.4 KiB
Go
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"strconv"
|
|
)
|
|
|
|
// Response handler
|
|
type ResponseHandler func(*Response)
|
|
|
|
// Response holds one packet as produced by decodeResponse.
type Response struct {
	// DataType is the packet type code (decodeResponse reads it from the
	// packet header).
	DataType uint32
	// Data is the packet payload; for packet types that start with a job
	// handle, decodeResponse stores only the bytes after the handle here.
	Data []byte
	// UID is not populated by the decoding code in this file.
	// NOTE(review): presumably the unique job id — confirm against callers.
	UID []byte
	// Handle is the job handle extracted from the packet, when it has one.
	Handle string
}
|
|
|
|
// Extract the Response's result.
|
|
// if data == nil, err != nil, then worker failing to execute job
|
|
// if data != nil, err != nil, then worker has a exception
|
|
// if data != nil, err == nil, then worker complate job
|
|
// after calling this method, the Response.Handle will be filled
|
|
func (resp *Response) Result() (data []byte, err error) {
|
|
switch resp.DataType {
|
|
case dtWorkFail:
|
|
resp.Handle = string(resp.Data)
|
|
err = ErrWorkFail
|
|
return
|
|
case dtWorkException:
|
|
err = ErrWorkException
|
|
fallthrough
|
|
case dtWorkComplete:
|
|
data = resp.Data
|
|
default:
|
|
err = ErrDataType
|
|
}
|
|
return
|
|
}
|
|
|
|
// Extract the job's update
|
|
func (resp *Response) Update() (data []byte, err error) {
|
|
if resp.DataType != dtWorkData &&
|
|
resp.DataType != dtWorkWarning {
|
|
err = ErrDataType
|
|
return
|
|
}
|
|
data = resp.Data
|
|
if resp.DataType == dtWorkWarning {
|
|
err = ErrWorkWarning
|
|
}
|
|
return
|
|
}
|
|
|
|
// Decode a job from byte slice
|
|
func decodeResponse(data []byte) (resp *Response, l int, err error) {
|
|
a := len(data)
|
|
if a < minPacketLength { // valid package should not less 12 bytes
|
|
err = fmt.Errorf("Invalid data: %V", data)
|
|
return
|
|
}
|
|
dl := int(binary.BigEndian.Uint32(data[8:12]))
|
|
if a < minPacketLength+dl {
|
|
err = fmt.Errorf("Invalid data: %V", data)
|
|
return
|
|
}
|
|
dt := data[minPacketLength : dl+minPacketLength]
|
|
if len(dt) != int(dl) { // length not equal
|
|
err = fmt.Errorf("Invalid data: %V", data)
|
|
return
|
|
}
|
|
resp = getResponse()
|
|
resp.DataType = binary.BigEndian.Uint32(data[4:8])
|
|
switch resp.DataType {
|
|
case dtJobCreated:
|
|
resp.Handle = string(dt)
|
|
case dtStatusRes, dtWorkData, dtWorkWarning, dtWorkStatus,
|
|
dtWorkComplete, dtWorkFail, dtWorkException:
|
|
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
|
|
if len(s) >= 2 {
|
|
resp.Handle = string(s[0])
|
|
resp.Data = s[1]
|
|
} else {
|
|
err = fmt.Errorf("Invalid data: %V", data)
|
|
return
|
|
}
|
|
case dtEchoRes:
|
|
fallthrough
|
|
default:
|
|
resp.Data = dt
|
|
}
|
|
l = dl + minPacketLength
|
|
return
|
|
}
|
|
|
|
func (resp *Response) Status() (status *Status, err error) {
|
|
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
|
|
if len(data) != 2 {
|
|
err = fmt.Errorf("Invalid data: %V", resp.Data)
|
|
return
|
|
}
|
|
status = &Status{}
|
|
status.Handle = resp.Handle
|
|
status.Known = true
|
|
status.Running = true
|
|
status.Numerator, err = strconv.ParseUint(string(data[0]), 10, 0)
|
|
if err != nil {
|
|
err = fmt.Errorf("Invalid Integer: %s", data[0])
|
|
return
|
|
}
|
|
status.Denominator, err = strconv.ParseUint(string(data[1]), 10, 0)
|
|
if err != nil {
|
|
err = fmt.Errorf("Invalid Integer: %s", data[1])
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// status handler
|
|
func (resp *Response) _status() (status *Status, err error) {
|
|
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4)
|
|
if len(data) != 4 {
|
|
err = fmt.Errorf("Invalid data: %V", resp.Data)
|
|
return
|
|
}
|
|
status = &Status{}
|
|
status.Handle = resp.Handle
|
|
status.Known = (data[0][0] == '1')
|
|
status.Running = (data[1][0] == '1')
|
|
status.Numerator, err = strconv.ParseUint(string(data[2]), 10, 0)
|
|
if err != nil {
|
|
err = fmt.Errorf("Invalid Integer: %s", data[2])
|
|
return
|
|
}
|
|
status.Denominator, err = strconv.ParseUint(string(data[3]), 10, 0)
|
|
if err != nil {
|
|
err = fmt.Errorf("Invalid Integer: %s", data[3])
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
func getResponse() (resp *Response) {
|
|
// TODO add a pool
|
|
resp = &Response{}
|
|
return
|
|
}
|