Great. We got it. The client do the job correctly.

This commit is contained in:
mikespook 2011-05-21 21:11:29 +08:00
parent d8882938f7
commit f6d2a16685
2 changed files with 49 additions and 21 deletions

View File

@ -10,18 +10,24 @@ func main() {
defer client.Close() defer client.Close()
client.AddServer("127.0.0.1:4730") client.AddServer("127.0.0.1:4730")
echo := []byte("Hello world") echo := []byte("Hello world")
/*
log.Println(echo) if data, err := client.Echo(echo); err != nil {
log.Println(client.Echo(echo)) log.Println(string(data))
*/ }
handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL) handle, err := client.Do("ToUpper", echo, gearman.JOB_NORMAL)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} else { } else {
log.Println(handle) log.Println(handle)
log.Println(<-client.JobQueue) job := <-client.JobQueue
if data, err := job.Result(); err != nil {
log.Println(err)
} else {
log.Println(string(data))
}
} }
/*
known, running, numerator, denominator, err := client.Status(handle) known, running, numerator, denominator, err := client.Status(handle)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
@ -42,5 +48,5 @@ func main() {
log.Println(err) log.Println(err)
log.Println(data) log.Println(data)
} }
*/
} }

View File

@ -4,7 +4,7 @@ import (
"os" "os"
"net" "net"
"sync" "sync"
"log" // "log"
"strconv" "strconv"
) )
@ -33,21 +33,43 @@ func (client *Client) AddServer(addr string) (err os.Error) {
} }
func (client *Client) read() (data []byte, err os.Error) { func (client *Client) read() (data []byte, err os.Error) {
var rel []byte if len(client.incoming) > 0 {
for { data = <-client.incoming
buf := make([]byte, BUFFER_SIZE) } else {
var n int for {
if n, err = client.conn.Read(buf); err != nil { buf := make([]byte, BUFFER_SIZE)
if (err == os.EOF && n == 0) { var n int
if n, err = client.conn.Read(buf); err != nil {
if (err == os.EOF && n == 0) {
break
}
return
}
data = append(data, buf[0: n] ...)
if n < BUFFER_SIZE {
break break
} }
return
}
rel = append(rel, buf[0: n] ...)
if n < BUFFER_SIZE {
break
} }
} }
start, end := 0, 4
for i := 0; i < len(data); i ++{
if string(data[start:end]) == RES_STR {
l := int(byteToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
total := l + 12
if total == len(data) {
return
} else {
client.incoming <- data[total:]
data = data[:total]
return
}
} else {
start++
end++
}
}
err = os.NewError("Invalid data struct.")
return
} }
func (client *Client) ReadJob() (job *ClientJob, err os.Error) { func (client *Client) ReadJob() (job *ClientJob, err os.Error) {
@ -107,7 +129,6 @@ func (client *Client) Do(funcname string, data []byte, flag byte) (handle string
return return
} }
handle = string(job.Data) handle = string(job.Data)
log.Println(handle)
go func() { go func() {
if flag & JOB_BG != JOB_BG { if flag & JOB_BG != JOB_BG {
for { for {
@ -174,7 +195,8 @@ func (client *Client) Echo(data []byte) (echo []byte, err os.Error) {
if job, err = client.readLastJob(ECHO_RES); err != nil { if job, err = client.readLastJob(ECHO_RES); err != nil {
return return
} }
return job.Data, err echo, err = job.Result()
return
} }
func (client *Client) LastResult() (job *ClientJob) { func (client *Client) LastResult() (job *ClientJob) {