gearman-go/client/response.go

143 lines
3.3 KiB
Go
Raw Normal View History

2013-08-29 16:51:23 +08:00
// Copyright 2011 Xing Xing <mikespook@gmail.com>.
// All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package client
import (
2013-08-29 18:08:05 +08:00
"fmt"
2013-08-30 11:20:51 +08:00
"bytes"
2013-08-29 18:08:05 +08:00
"strconv"
2013-08-30 11:20:51 +08:00
"encoding/binary"
2013-08-29 16:51:23 +08:00
)
// Response is a packet received from the job server, in the form produced
// by decodeResponse.
type Response struct {
	// DataType is the packet type, read from the packet header.
	DataType uint32
	// Data is the packet payload.
	Data []byte
	// UID is not assigned anywhere in this file.
	// NOTE(review): presumably the unique ID of the job - confirm against callers.
	UID []byte
	// Handle is the job handle the packet refers to, when it carries one.
	Handle string
}
// Extract the Response's result.
// if data == nil, err != nil, then worker failing to execute job
// if data != nil, err != nil, then worker has a exception
// if data != nil, err == nil, then worker complate job
// after calling this method, the Response.Handle will be filled
2013-08-29 18:08:05 +08:00
func (resp *Response) Result() (data []byte, err error) {
2013-08-29 16:51:23 +08:00
switch resp.DataType {
case WORK_FAIL:
2013-08-29 18:08:05 +08:00
resp.Handle = string(resp.Data)
2013-08-29 16:51:23 +08:00
err = ErrWorkFail
return
case WORK_EXCEPTION:
err = ErrWorkException
fallthrough
case WORK_COMPLETE:
s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = fmt.Errorf("Invalid data: %V", resp.Data)
return
}
2013-08-29 18:08:05 +08:00
resp.Handle = string(s[0])
2013-08-29 16:51:23 +08:00
data = s[1]
default:
err = ErrDataType
}
return
}
// Extract the job's update
2013-08-29 18:08:05 +08:00
func (resp *Response) Update() (data []byte, err error) {
2013-08-29 16:51:23 +08:00
if resp.DataType != WORK_DATA &&
resp.DataType != WORK_WARNING {
err = ErrDataType
return
}
s := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(s) != 2 {
err = ErrInvalidData
return
}
if resp.DataType == WORK_WARNING {
err = ErrWorkWarning
}
2013-08-29 18:08:05 +08:00
resp.Handle = string(s[0])
2013-08-29 16:51:23 +08:00
data = s[1]
return
}
// Decode a job from byte slice
2013-08-29 18:08:05 +08:00
func decodeResponse(data []byte) (resp *Response, l int, err error) {
2013-08-29 16:51:23 +08:00
if len(data) < MIN_PACKET_LEN { // valid package should not less 12 bytes
err = fmt.Errorf("Invalid data: %V", data)
return
}
dl := int(binary.BigEndian.Uint32(data[8:12]))
2013-08-29 18:08:05 +08:00
dt := data[MIN_PACKET_LEN : dl+MIN_PACKET_LEN]
2013-08-29 16:51:23 +08:00
if len(dt) != int(dl) { // length not equal
err = fmt.Errorf("Invalid data: %V", data)
return
}
resp = getResponse()
resp.DataType = binary.BigEndian.Uint32(data[4:8])
switch resp.DataType {
2013-08-30 11:20:51 +08:00
case JOB_CREATED:
resp.Handle = string(dt)
case STATUS_RES, WORK_DATA, WORK_WARNING, WORK_STATUS,
2013-08-29 16:51:23 +08:00
WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION:
2013-08-29 18:08:05 +08:00
s := bytes.SplitN(dt, []byte{'\x00'}, 2)
2013-08-29 16:51:23 +08:00
if len(s) >= 2 {
2013-08-29 18:08:05 +08:00
resp.Handle = string(s[0])
2013-08-29 16:51:23 +08:00
resp.Data = s[1]
} else {
err = fmt.Errorf("Invalid data: %V", data)
return
}
2013-08-30 11:20:51 +08:00
case ECHO_RES:
fallthrough
default:
resp.Data = dt
2013-08-29 16:51:23 +08:00
}
2013-08-30 11:20:51 +08:00
l = dl + MIN_PACKET_LEN
2013-08-29 16:51:23 +08:00
return
}
2013-08-29 18:08:05 +08:00
// IsEcho reports whether the response's DataType is ECHO_RES.
func (resp *Response) IsEcho() bool {
	kind := resp.DataType
	return kind == ECHO_RES
}
// IsStatus reports whether the response's DataType is STATUS_RES.
func (resp *Response) IsStatus() bool {
	kind := resp.DataType
	return kind == STATUS_RES
}
2013-08-29 16:51:23 +08:00
// status handler
2013-08-29 18:08:05 +08:00
func (resp *Response) Status() (status *Status, err error) {
2013-08-30 11:20:51 +08:00
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 4)
if len(data) != 4 {
2013-08-29 16:51:23 +08:00
err = fmt.Errorf("Invalid data: %V", resp.Data)
return
}
status = &Status{}
2013-08-30 11:20:51 +08:00
status.Handle = resp.Handle
status.Known = (data[0][0] == '1')
status.Running = (data[1][0] == '1')
status.Numerator, err = strconv.ParseUint(string(data[2]), 10, 0)
2013-08-29 16:51:23 +08:00
if err != nil {
2013-08-30 11:20:51 +08:00
err = fmt.Errorf("Invalid Integer: %s", data[2])
2013-08-29 16:51:23 +08:00
return
}
2013-08-30 11:20:51 +08:00
status.Denominator, err = strconv.ParseUint(string(data[3]), 10, 0)
2013-08-29 16:51:23 +08:00
if err != nil {
2013-08-30 11:20:51 +08:00
err = fmt.Errorf("Invalid Integer: %s", data[3])
2013-08-29 16:51:23 +08:00
return
}
return
}
2013-08-29 18:08:05 +08:00
func getResponse() (resp *Response) {
2013-08-29 16:51:23 +08:00
// TODO add a pool
2013-08-29 18:08:05 +08:00
resp = &Response{}
2013-08-29 16:51:23 +08:00
return
}