The client can work now...with some known bugs. :(

This commit is contained in:
mikespook 2011-05-20 17:38:10 +08:00
parent c5b9f19962
commit 016dd669aa
8 changed files with 216 additions and 66 deletions

View File

@ -9,7 +9,7 @@ TARG=gearman
# GOFILES=\ # GOFILES=\
# worker.go\ # worker.go\
CLEANFILES+= worker CLEANFILES+= worker client
include $(GOROOT)/src/Make.pkg include $(GOROOT)/src/Make.pkg

40
example/client.go Normal file
View File

@ -0,0 +1,40 @@
package main
import (
"gearman"
"log"
)
func main() {
client := gearman.NewClient()
defer client.Close()
client.AddServer("127.0.0.1:4730")
echo := []byte("Hello world")
log.Println(echo)
log.Println(client.Echo(echo))
handle, err := client.Do("ToUpper", echo, gearman.JOB_HIGH)
if err != nil {
log.Println(err)
}
known, running, numerator, denominator, err := client.Status(handle)
if err != nil {
log.Println(err)
}
if !known {
log.Println("Unknown")
}
if running {
log.Printf("%g%%\n", float32(numerator) * 100 / float32(denominator))
} else {
log.Println("Not running")
}
log.Println("read")
if job, err := client.ReadJob(); err != nil {
log.Println(err)
} else {
data, err := job.Result();
log.Println(err)
log.Println(data)
}
}

View File

@ -3,20 +3,19 @@ package gearman
import ( import (
"os" "os"
"net" "net"
// "log" "sync"
) )
type Client struct { type Client struct {
mutex sync.Mutex
conn net.Conn conn net.Conn
running bool
JobQueue chan *ClientJob JobQueue chan *ClientJob
ErrQueue chan os.Error UId uint32
} }
func NewClient() (client * Client){ func NewClient() (client * Client){
client = &Client{running:false, client = &Client{JobQueue:make(chan *ClientJob, QUEUE_CAP),
JobQueue:make(chan *ClientJob, QUEUE_CAP), UId:1}
ErrQueue:make(chan os.Error, QUEUE_CAP),}
return return
} }
@ -26,49 +25,42 @@ func (client *Client) AddServer(addr string) (err os.Error) {
return return
} }
client.conn = conn client.conn = conn
go client.work()
return return
} }
func (client *Client) work() { func (client *Client) ReadJob() (job *ClientJob, err os.Error) {
OUT: for client.running {
var rel []byte var rel []byte
for { for {
buf := make([]byte, 2048) buf := make([]byte, BUFFER_SIZE)
n, err := client.conn.Read(buf) var n int
if err != nil { if n, err = client.conn.Read(buf); err != nil {
if err == os.EOF && n == 0 { if (err == os.EOF && n == 0) {
break break
} }
client.ErrQueue <- err return
continue OUT
} }
rel = append(rel, buf[0: n] ...) rel = append(rel, buf[0: n] ...)
if n < BUFFER_SIZE {
break
} }
job, err := DecodeClientJob(rel) }
if err != nil { if job, err = DecodeClientJob(rel); err != nil {
client.ErrQueue <- err return
} else { } else {
switch(job.dataType) { switch(job.dataType) {
case ERROR: case ERROR:
_, err := getError(job.Data) _, err = getError(job.Data)
client.ErrQueue <- err return
case ECHO_RES: case WORK_DATA, WORK_WARNING, WORK_STATUS, WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION:
client.JobQueue <- job client.JobQueue <- job
} }
} }
} return
} }
func (client *Client) Do(funcname string, data []byte, flag byte) (err os.Error) { func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err os.Error) {
var datatype uint32 var datatype uint32
if flag & JOB_NORMAL == JOB_NORMAL { if flag & JOB_LOW == JOB_LOW {
if flag & JOB_BG == JOB_BG {
datatype = SUBMIT_JOB_BG
} else {
datatype = SUBMIT_JOB
}
} else if flag & JOB_LOW == JOB_LOW {
if flag & JOB_BG == JOB_BG { if flag & JOB_BG == JOB_BG {
datatype = SUBMIT_JOB_LOW_BG datatype = SUBMIT_JOB_LOW_BG
} else { } else {
@ -80,20 +72,97 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (err os.Error)
} else { } else {
datatype = SUBMIT_JOB_HIGH datatype = SUBMIT_JOB_HIGH
} }
} else if flag & JOB_BG == JOB_BG {
datatype = SUBMIT_JOB_BG
} else {
datatype = SUBMIT_JOB
} }
rel := make([]byte, 0, 1024 * 64) rel := make([]byte, 0, 1024 * 64)
rel = append(rel, []byte(funcname) ...) rel = append(rel, []byte(funcname) ...)
rel = append(rel, '\x00') rel = append(rel, '\x00')
rel = append(rel, '\xFF') client.mutex.Lock()
uid := uint32ToByte(client.UId)
client.UId ++
rel = append(rel, uid[:] ...)
client.mutex.Unlock()
rel = append(rel, '\x00') rel = append(rel, '\x00')
rel = append(rel, data ...) rel = append(rel, data ...)
job := NewClientJob(REQ, datatype, data) if err = client.WriteJob(NewClientJob(REQ, datatype, rel)); err != nil {
return client.WriteJob(job) return
}
var job *ClientJob
if job, err = client.ReadLastJob(JOB_CREATED); err != nil {
return
}
handle = string(job.Data)
go func() {
if flag & JOB_BG != JOB_BG {
for {
if job, err = client.ReadJob(); err != nil {
return
}
switch job.dataType {
case WORK_DATA, WORK_WARNING:
case WORK_STATUS:
case WORK_COMPLETE, WORK_FAIL, WORK_EXCEPTION:
return
}
}
}
}()
return
} }
func (client *Client) Echo(data []byte) (err os.Error) { func (client *Client) ReadLastJob(datatype uint32) (job *ClientJob, err os.Error){
job := NewClientJob(REQ, ECHO_REQ, data) for {
return client.WriteJob(job) if job, err = client.ReadJob(); err != nil {
return
}
if job.dataType == datatype {
break
}
}
if job.dataType != datatype {
err = os.NewError("No job got.")
}
return
}
func (client *Client) Status(handle string) (known, running bool, numerator, denominator uint, err os.Error) {
if err = client.WriteJob(NewClientJob(REQ, GET_STATUS, []byte(handle))); err != nil {
return
}
var job * ClientJob
if job, err = client.ReadLastJob(STATUS_RES); err != nil {
return
}
data := splitByteArray(job.Data, '\x00')
if len(data) != 5 {
err = os.NewError("Data Error.")
return
}
if handle != string(data[0]) {
err = os.NewError("Invalid handle.")
return
}
known = data[1][0] == '1'
running = data[2][0] == '1'
numerator = uint(data[3][0])
denominator = uint(data[4][0])
return
}
func (client *Client) Echo(data []byte) (echo []byte, err os.Error) {
if err = client.WriteJob(NewClientJob(REQ, ECHO_REQ, data)); err != nil {
return
}
var job *ClientJob
if job, err = client.ReadLastJob(ECHO_RES); err != nil {
return
}
return job.Data, err
} }
func (client *Client) LastResult() (job *ClientJob) { func (client *Client) LastResult() (job *ClientJob) {
@ -108,18 +177,6 @@ func (client *Client) LastResult() (job *ClientJob) {
return <-client.JobQueue return <-client.JobQueue
} }
func (client *Client) LastError() (err os.Error) {
if l := len(client.ErrQueue); l != 1 {
if l == 0 {
return
}
for i := 0; i < l - 1; i ++ {
<-client.ErrQueue
}
}
return <-client.ErrQueue
}
func (client *Client) WriteJob(job *ClientJob) (err os.Error) { func (client *Client) WriteJob(job *ClientJob) (err os.Error) {
return client.Write(job.Encode()) return client.Write(job.Encode())
} }
@ -136,9 +193,7 @@ func (client *Client) Write(buf []byte) (err os.Error) {
} }
func (client *Client) Close() (err os.Error) { func (client *Client) Close() (err os.Error) {
client.running = false
err = client.conn.Close() err = client.conn.Close()
close(client.JobQueue) close(client.JobQueue)
close(client.ErrQueue)
return return
} }

View File

@ -2,6 +2,7 @@ package gearman
import ( import (
"os" "os"
// "log"
) )
type ClientJob struct { type ClientJob struct {
@ -47,3 +48,43 @@ func (job *ClientJob) Encode() (data []byte) {
return return
} }
func (job * ClientJob) Result() (data []byte, err os.Error){
switch job.dataType {
case WORK_FAIL:
job.Handle = string(job.Data)
err = os.NewError("Work fail.")
return
case WORK_EXCEPTION:
err = os.NewError("Work exception.")
fallthrough
case WORK_COMPLETE:
s := splitByteArray(job.Data, '\x00')
if len(s) != 2 {
err = os.NewError("Invalid data.")
return
}
job.Handle = string(s[0])
data = s[1]
default:
err = os.NewError("The job is not a result.")
}
return
}
func (job *ClientJob) Update() (data []byte, err os.Error) {
if job.dataType != WORK_DATA && job.dataType != WORK_WARNING {
err = os.NewError("The job is not a update.")
return
}
s := splitByteArray(job.Data, '\x00')
if len(s) != 2 {
err = os.NewError("Invalid data.")
return
}
if job.dataType == WORK_WARNING {
err = os.NewError("Work warning")
}
job.Handle = string(s[0])
data = s[1]
return
}

View File

@ -19,17 +19,21 @@ func TestClientAddServer(t * testing.T) {
} }
func TestClientEcho(t * testing.T) { func TestClientEcho(t * testing.T) {
if err := client.Echo([]byte("Hello world")); err != nil { if echo, err := client.Echo([]byte("Hello world")); err != nil {
t.Error(err) t.Error(err)
} else {
t.Log(echo)
} }
} }
/*
func TestClientDo(t * testing.T) { func TestClientDo(t * testing.T) {
if err := client.Do("ToUpper", []byte("abcdef"), JOB_NORMAL); err != nil { if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW | JOB_BG); err != nil {
t.Error(err) t.Error(err)
} else {
t.Log(handle)
} }
} }
*/
/* /*
func TestClientLastResult(t * testing.T) { func TestClientLastResult(t * testing.T) {
job := client.LastResult() job := client.LastResult()
@ -46,3 +50,4 @@ func TestClientClose(t * testing.T) {
t.Error(err) t.Error(err)
} }
} }

View File

@ -9,27 +9,33 @@ const (
WORKER_SERVER_CAP = 32 WORKER_SERVER_CAP = 32
WORKER_FUNCTION_CAP = 512 WORKER_FUNCTION_CAP = 512
QUEUE_CAP = 512 QUEUE_CAP = 512
BUFFER_SIZE = 1024
// \x00REQ // \x00REQ
REQ = 5391697 REQ = 5391697
REQ_STR = "\x00REQ"
// \x00RES // \x00RES
RES = 5391699 RES = 5391699
RES_STR = "\x00RES"
CAN_DO = 1 CAN_DO = 1
CANT_DO = 2 CANT_DO = 2
RESET_ABILITIES = 3 RESET_ABILITIES = 3
PRE_SLEEP = 4 PRE_SLEEP = 4
NOOP = 6 NOOP = 6
JOB_CREATED = 8
GRAB_JOB = 9 GRAB_JOB = 9
NO_JOB = 10 NO_JOB = 10
JOB_ASSIGN = 11 JOB_ASSIGN = 11
WORK_STATUS = 12 WORK_STATUS = 12
WORK_COMPLETE = 13 WORK_COMPLETE = 13
WORK_FAIL = 14 WORK_FAIL = 14
GET_STATUS = 15
ECHO_REQ = 16 ECHO_REQ = 16
ECHO_RES = 17 ECHO_RES = 17
ERROR = 19 ERROR = 19
STATUS_RES = 20
SET_CLIENT_ID = 22 SET_CLIENT_ID = 22
CAN_DO_TIMEOUT = 23 CAN_DO_TIMEOUT = 23
WORK_EXCEPTION = 25 WORK_EXCEPTION = 25

View File

@ -121,7 +121,7 @@ func (worker * Worker) Work() {
if err := worker.exec(job); err != nil { if err := worker.exec(job); err != nil {
worker.ErrQueue <- err worker.ErrQueue <- err
} }
continue fallthrough
default: default:
worker.JobQueue <- job worker.JobQueue <- job
} }

View File

@ -30,7 +30,7 @@ func (client *jobClient) Work() {
} }
var rel []byte var rel []byte
for { for {
buf := make([]byte, 2048) buf := make([]byte, BUFFER_SIZE)
n, err := client.conn.Read(buf) n, err := client.conn.Read(buf)
if err != nil { if err != nil {
if err == os.EOF && n == 0 { if err == os.EOF && n == 0 {
@ -40,6 +40,9 @@ func (client *jobClient) Work() {
continue OUT continue OUT
} }
rel = append(rel, buf[0: n] ...) rel = append(rel, buf[0: n] ...)
if n < BUFFER_SIZE {
break
}
} }
job, err := DecodeWorkerJob(rel) job, err := DecodeWorkerJob(rel)
if err != nil { if err != nil {