Changed agent.read to handle big data.

This commit is contained in:
Kevin Darlington 2014-03-08 19:22:14 -05:00
parent 9d7a29fe26
commit de91c999f7
2 changed files with 83 additions and 13 deletions

View File

@ -2,6 +2,8 @@ package worker
import ( import (
"bufio" "bufio"
"bytes"
"encoding/binary"
"net" "net"
"sync" "sync"
) )
@ -51,7 +53,7 @@ func (a *agent) work() {
var err error var err error
var data, leftdata []byte var data, leftdata []byte
for { for {
if data, err = a.read(bufferSize); err != nil { if data, err = a.read(); err != nil {
if opErr, ok := err.(*net.OpError); ok { if opErr, ok := err.(*net.OpError); ok {
if opErr.Timeout() { if opErr.Timeout() {
a.worker.err(err) a.worker.err(err)
@ -121,20 +123,31 @@ func (a *agent) PreSleep() {
} }
// read length bytes from the socket // read length bytes from the socket
func (a *agent) read(length int) (data []byte, err error) { func (a *agent) read() (data []byte, err error) {
n := 0 n := 0
buf := getBuffer(bufferSize)
// read until data can be unpacked tmp := getBuffer(bufferSize)
for i := length; i > 0 || len(data) < minPacketLength; i -= n { var buf bytes.Buffer
if n, err = a.rw.Read(buf); err != nil {
return // read the header so we can get the length of the data
} if n, err = a.rw.Read(tmp); err != nil {
data = append(data, buf[0:n]...) return
if n < bufferSize {
break
}
} }
return dl := int(binary.BigEndian.Uint32(tmp[8:12]))
// write what we read so far
buf.Write(tmp[:n])
// read until we receive all the data
for buf.Len() < dl+minPacketLength {
if n, err = a.rw.Read(tmp); err != nil {
return buf.Bytes(), err
}
buf.Write(tmp[:n])
}
return buf.Bytes(), err
} }
// Internal write the encoded job. // Internal write the encoded job.

View File

@ -1,6 +1,7 @@
package worker package worker
import ( import (
"bytes"
"sync" "sync"
"testing" "testing"
) )
@ -77,6 +78,62 @@ func TestWork(t *testing.T) {
wg.Wait() wg.Wait()
} }
func TestLargeDataWork(t *testing.T) {
worker := New(Unlimited)
defer worker.Close()
if err := worker.AddServer(Network, "127.0.0.1:4730"); err != nil {
t.Error(err)
}
worker.Ready()
l := 5714
var wg sync.WaitGroup
bigdataHandler := func(job Job) error {
defer wg.Done()
if len(job.Data()) != l {
t.Errorf("expected length %d. got %d.", l, len(job.Data()))
}
return nil
}
if err := worker.AddFunc("bigdata", foobar, 0); err != nil {
defer wg.Done()
t.Error(err)
}
worker.JobHandler = bigdataHandler
worker.ErrorHandler = func(err error) {
t.Fatal("shouldn't have received an error")
}
if err := worker.Ready(); err != nil {
t.Error(err)
return
}
go worker.Work()
wg.Add(1)
// var cli *client.Client
// var err error
// if cli, err = client.New(client.Network, "127.0.0.1:4730"); err != nil {
// t.Fatal(err)
// }
// cli.ErrorHandler = func(e error) {
// t.Error(e)
// }
// _, err = cli.Do("bigdata", bytes.Repeat([]byte("a"), l), client.JobLow, func(res *client.Response) {
// })
// if err != nil {
// t.Error(err)
// }
worker.Echo(bytes.Repeat([]byte("a"), l))
wg.Wait()
}
func TestWorkerClose(t *testing.T) { func TestWorkerClose(t *testing.T) {
worker.Close() worker.Close()
} }