fixed client:Job.Encode

--HG--
branch : 0.1
This commit is contained in:
mikespook 2012-05-23 15:22:29 +08:00
parent 7614c2678a
commit 563af037cb
6 changed files with 66 additions and 41 deletions

View File

@ -6,9 +6,9 @@
package client package client
import ( import (
"bytes"
"io" "io"
"net" "net"
"bytes"
"strconv" "strconv"
"bitbucket.org/mikespook/golib/autoinc" "bitbucket.org/mikespook/golib/autoinc"
"bitbucket.org/mikespook/gearman-go/common" "bitbucket.org/mikespook/gearman-go/common"
@ -43,9 +43,9 @@ type Client struct {
// Create a new client. // Create a new client.
// Connect to "addr" through "network" // Connect to "addr" through "network"
// Eg. // Eg.
// client, err := client.New("tcp4", "127.0.0.1:4730") // client, err := client.New("127.0.0.1:4730")
func New(network, addr string) (client *Client, err error) { func New(addr string) (client *Client, err error) {
conn, err := net.Dial(network, addr) conn, err := net.Dial("tcp", addr)
if err != nil { if err != nil {
return return
} }
@ -56,8 +56,8 @@ func New(network, addr string) (client *Client, err error) {
conn: conn, conn: conn,
ai: autoinc.New(0, 1), ai: autoinc.New(0, 1),
} }
go client.outLoop()
go client.inLoop() go client.inLoop()
go client.outLoop()
return return
} }
@ -66,7 +66,9 @@ func (client *Client) outLoop() {
ok := true ok := true
for ok { for ok {
if job, ok := <-client.out; ok { if job, ok := <-client.out; ok {
client.write(job.Encode()) if err := client.write(job.Encode()); err != nil {
client.err(err)
}
} }
} }
} }
@ -91,11 +93,11 @@ func (client *Client) inLoop() {
case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION,
common.ECHO_RES: common.ECHO_RES:
client.handleJob(job) go client.handleJob(job)
case common.JOB_CREATED: case common.JOB_CREATED:
client.jobCreated <- job client.jobCreated <- job
case common.STATUS_RES: case common.STATUS_RES:
client.handleStatus(job) go client.handleStatus(job)
} }
} }
} }
@ -112,6 +114,10 @@ func (client *Client) read() (data []byte, err error) {
var n int var n int
if n, err = client.conn.Read(buf); err != nil { if n, err = client.conn.Read(buf); err != nil {
if err == io.EOF && n == 0 { if err == io.EOF && n == 0 {
if data == nil {
err = common.ErrEmptyReading
return
}
break break
} }
return return
@ -123,8 +129,8 @@ func (client *Client) read() (data []byte, err error) {
} }
} }
// split package // split package
start, end := 0, 4
tl := len(data) tl := len(data)
start, end := 0, 4
for i := 0; i < tl; i++ { for i := 0; i < tl; i++ {
if string(data[start:end]) == common.RES_STR { if string(data[start:end]) == common.RES_STR {
l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
@ -193,19 +199,19 @@ func (client *Client) handleStatus(job *Job) {
// JOB_LOW | JOB_BG means the job is running with low level in background. // JOB_LOW | JOB_BG means the job is running with low level in background.
func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) {
var datatype uint32 var datatype uint32
if flag & common.JOB_LOW == common.JOB_LOW { if flag & JOB_LOW == JOB_LOW {
if flag & common.JOB_BG == common.JOB_BG { if flag & JOB_BG == JOB_BG {
datatype = common.SUBMIT_JOB_LOW_BG datatype = common.SUBMIT_JOB_LOW_BG
} else { } else {
datatype = common.SUBMIT_JOB_LOW datatype = common.SUBMIT_JOB_LOW
} }
} else if flag & common.JOB_HIGH == common.JOB_HIGH { } else if flag & JOB_HIGH == JOB_HIGH {
if flag & common.JOB_BG == common.JOB_BG { if flag & JOB_BG == JOB_BG {
datatype = common.SUBMIT_JOB_HIGH_BG datatype = common.SUBMIT_JOB_HIGH_BG
} else { } else {
datatype = common.SUBMIT_JOB_HIGH datatype = common.SUBMIT_JOB_HIGH
} }
} else if flag & common.JOB_BG == common.JOB_BG { } else if flag & JOB_BG == JOB_BG {
datatype = common.SUBMIT_JOB_BG datatype = common.SUBMIT_JOB_BG
} else { } else {
datatype = common.SUBMIT_JOB datatype = common.SUBMIT_JOB
@ -213,14 +219,13 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string
uid := strconv.Itoa(int(client.ai.Id())) uid := strconv.Itoa(int(client.ai.Id()))
l := len(funcname) + len(uid) + len(data) + 2 l := len(funcname) + len(uid) + len(data) + 2
rel := make([]byte, l) rel := make([]byte, 0, l)
rel = append(rel, []byte(funcname)...) // len(funcname) rel = append(rel, []byte(funcname)...) // len(funcname)
rel = append(rel, '\x00') // 1 Byte rel = append(rel, '\x00') // 1 Byte
rel = append(rel, []byte(uid)...) // len(uid) rel = append(rel, []byte(uid)...) // len(uid)
rel = append(rel, '\x00') // 1 Byte rel = append(rel, '\x00') // 1 Byte
rel = append(rel, data...) // len(data) rel = append(rel, data...) // len(data)
client.writeJob(newJob(common.REQ, datatype, rel)) client.writeJob(newJob(common.REQ, datatype, rel))
// Waiting for JOB_CREATED // Waiting for JOB_CREATED
job := <-client.jobCreated job := <-client.jobCreated
return string(job.Data), nil return string(job.Data), nil
@ -259,5 +264,6 @@ func (client *Client) write(buf []byte) (err error) {
func (client *Client) Close() (err error) { func (client *Client) Close() (err error) {
close(client.jobCreated) close(client.jobCreated)
close(client.in) close(client.in)
close(client.out)
return client.conn.Close(); return client.conn.Close();
} }

View File

@ -1,7 +1,6 @@
package client package client
import ( import (
"bitbucket.org/mikespook/gearman-go/common"
"testing" "testing"
) )
@ -10,9 +9,12 @@ var client *Client
func TestClientAddServer(t *testing.T) { func TestClientAddServer(t *testing.T) {
t.Log("Add local server 127.0.0.1:4730") t.Log("Add local server 127.0.0.1:4730")
var err error var err error
if client, err = New("tcp4", "127.0.0.1:4730"); err != nil { if client, err = New("127.0.0.1:4730"); err != nil {
t.Error(err) t.Error(err)
} }
client.ErrHandler = func(e error) {
t.Error(e)
}
} }
func TestClientEcho(t *testing.T) { func TestClientEcho(t *testing.T) {
@ -29,7 +31,7 @@ func TestClientEcho(t *testing.T) {
} }
func TestClientDo(t *testing.T) { func TestClientDo(t *testing.T) {
if handle, err := client.Do("ToUpper", []byte("abcdef"), common.JOB_LOW|common.JOB_BG); err != nil { if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil {
t.Error(err) t.Error(err)
} else { } else {
t.Log(handle) t.Log(handle)

View File

@ -9,6 +9,20 @@ import (
"bytes" "bytes"
"bitbucket.org/mikespook/gearman-go/common" "bitbucket.org/mikespook/gearman-go/common"
) )
const (
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
)
// An error handler // An error handler
type ErrorHandler func(error) type ErrorHandler func(error)
@ -42,13 +56,15 @@ func decodeJob(data []byte) (job *Job, err error) {
// Encode a job to byte slice // Encode a job to byte slice
func (job *Job) Encode() (data []byte) { func (job *Job) Encode() (data []byte) {
l := len(job.Data) + 12 l := len(job.Data)
data = make([]byte, l) tl := l + 12
data = make([]byte, tl)
magiccode := common.Uint32ToBytes(job.magicCode) magiccode := common.Uint32ToBytes(job.magicCode)
datatype := common.Uint32ToBytes(job.DataType) datatype := common.Uint32ToBytes(job.DataType)
datalength := common.Uint32ToBytes(uint32(l)) datalength := common.Uint32ToBytes(uint32(l))
for i := 0; i < l; i ++ {
for i := 0; i < tl; i ++ {
switch { switch {
case i < 4: case i < 4:
data[i] = magiccode[i] data[i] = magiccode[i]
@ -60,6 +76,13 @@ func (job *Job) Encode() (data []byte) {
data[i] = job.Data[i - 12] data[i] = job.Data[i - 12]
} }
} }
// Alternative
/*
data = append(data, magiccode[:] ...)
data = append(data, datatype[:] ...)
data = append(data, datalength[:] ...)
data = append(data, job.Data ...)
*/
return return
} }

View File

@ -21,6 +21,7 @@ var (
ErrOutOfCap = errors.New("Out of the capability.") ErrOutOfCap = errors.New("Out of the capability.")
ErrNotConn = errors.New("Did not connect to job server.") ErrNotConn = errors.New("Did not connect to job server.")
ErrFuncNotFound = errors.New("The function was not found.") ErrFuncNotFound = errors.New("The function was not found.")
ErrEmptyReading = errors.New("Empty reading.")
) )
// Extract the error message // Extract the error message

View File

@ -54,17 +54,6 @@ const (
SUBMIT_JOB_HIGH_BG = 32 SUBMIT_JOB_HIGH_BG = 32
SUBMIT_JOB_LOW = 33 SUBMIT_JOB_LOW = 33
SUBMIT_JOB_LOW_BG = 34 SUBMIT_JOB_LOW_BG = 34
// Job type
// JOB_NORMAL | JOB_BG means a normal level job run in background
// normal level
JOB_NORMAL = 0
// background job
JOB_BG = 1
// low level
JOB_LOW = 2
// high level
JOB_HIGH = 4
) )
// Decode [4]byte to uint32 // Decode [4]byte to uint32

View File

@ -3,29 +3,32 @@ package main
import ( import (
"log" "log"
"sync" "sync"
"bitbucket.org/mikespook/gearman-go"
"bitbucket.org/mikespook/gearman-go/client" "bitbucket.org/mikespook/gearman-go/client"
) )
func main() { func main() {
var wg sync.WaitGroup var wg sync.WaitGroup
c, err := client.New("tcp4", "127.0.0.1:4730") c, err := client.New("127.0.0.1:4730")
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
defer c.Close() defer c.Close()
echo := []byte("Hello\x00 world") echo := []byte("Hello\x00 world")
c.JobHandler = func(job *client.Job) error { c.JobHandler = func(job *client.Job) error {
log.Printf("%V", job) log.Printf("%s", job.Data)
wg.Done() wg.Done()
return nil return nil
} }
c.ErrHandler = func(e error) {
log.Println(e)
panic(e)
}
wg.Add(1) wg.Add(1)
c.Echo(echo) c.Echo(echo)
wg.Add(1)
handle, err := c.Do("ToUpper", echo, gearman.JOB_NORMAL) handle, err := c.Do("ToUpper", echo, client.JOB_NORMAL)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} else { } else {
@ -38,5 +41,6 @@ func main() {
} }
wg.Add(1) wg.Add(1)
c.Status(handle) c.Status(handle)
wg.Wait() wg.Wait()
} }