diff --git a/client/client.go b/client/client.go index 6c2c626..93a08c2 100644 --- a/client/client.go +++ b/client/client.go @@ -8,6 +8,7 @@ package client import ( "io" "net" + "sync" "time" "bytes" "strconv" @@ -15,8 +16,6 @@ import ( "github.com/mikespook/gearman-go/common" ) -// Job handler -type JobHandler func(*Job) error // Status handler // handle, known, running, numerator, denominator type StatusHandler func(string, bool, bool, uint64, uint64) @@ -31,15 +30,14 @@ handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG) */ type Client struct { ErrHandler common.ErrorHandler - JobHandler JobHandler - StatusHandler StatusHandler TimeOut time.Duration in chan []byte out chan *Job - jobCreated chan *Job - conn net.Conn + conn net.Conn + addr string ai *autoinc.AutoInc + mutex sync.RWMutex } // Create a new client. @@ -47,23 +45,125 @@ type Client struct { // Eg. // client, err := client.New("127.0.0.1:4730") func New(addr string) (client *Client, err error) { - conn, err := net.Dial(common.NETWORK, addr) - if err != nil { - return - } client = &Client{ jobCreated: make(chan *Job), in: make(chan []byte, common.QUEUE_SIZE), out: make(chan *Job, common.QUEUE_SIZE), - conn: conn, + addr: addr, ai: autoinc.New(0, 1), TimeOut: time.Second, } + if err = client.connect(); err != nil { + return + } go client.inLoop() go client.outLoop() return } +// {{{ private functions + +// +func (client *Client) connect() (err error) { + client.mutex.Lock() + defer client.mutex.Unlock() + client.conn, err = net.Dial(common.NETWORK, addr) + return +} + +// Internal write +func (client *Client) write(buf []byte) (err error) { + client.mutex.RLock() + defer client.mutex.RUnlock() + var n int + for i := 0; i < len(buf); i += n { + n, err = client.conn.Write(buf[i:]) + if err != nil { + return + } + } + return +} + +// read length bytes from the socket +func (client *Client) readData(length int) (data []byte, err error) { + client.mutex.RLock() + defer client.mutex.RUnlock() + n := 0 + buf := make([]byte, common.BUFFER_SIZE) + // read until data can be unpacked + for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n { + if n, err = client.conn.Read(buf); err != nil { + if err == io.EOF && n == 0 { + if data == nil { + err = common.ErrConnection + return + } + return data, nil + } + return + } + data = append(data, buf[0:n]...) + if n < common.BUFFER_SIZE { + break + } + } + return +} + +// unpack data +func (client *Client) unpack(data []byte) ([]byte, int, bool) { + tl := len(data) + start := 0 + for i := 0; i < tl+1-common.PACKET_LEN; i++ { + if start+common.PACKET_LEN > tl { // too few data to unpack, read more + return nil, common.PACKET_LEN, false + } + if string(data[start:start+4]) == common.RES_STR { + l := int(common.BytesToUint32([4]byte{data[start+8], + data[start+9], data[start+10], data[start+11]})) + total := l + common.PACKET_LEN + if total == tl { // data is what we want + return data, common.PACKET_LEN, true + } else if total < tl { // data[:total] is what we want, data[total:] is the more + client.in <- data[total:] + data = data[start:total] + return data, common.PACKET_LEN, true + } else { // ops! It won't be possible. + return nil, total - tl, false + } + } else { // flag was not found, move to next step + start++ + } + } + return nil, common.PACKET_LEN, false +} + +// Internal read +func (client *Client) read() (rel []byte, err error) { + var data []byte + ok := false + l := common.PACKET_LEN + for !ok { + inlen := len(client.in) + if inlen > 0 { + // in queue is not empty + for i := 0; i < inlen; i++ { + data = append(data, <-client.in...) + } + } else { + var d []byte + d, err = client.readData(l) + if err != nil { + return + } + data = append(data, d...) + } + rel, l, ok = client.unpack(data) + } + return +} + // out loop func (client *Client) outLoop() { ok := true @@ -98,66 +198,19 @@ func (client *Client) inLoop() { case common.ERROR: _, err := common.GetError(job.Data) client.err(err) - case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, - common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION, - common.ECHO_RES: - go client.handleJob(job) + case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS, + common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION: + client.handleJob(job) + case common.ECHO_RES: + client.handleEcho(job) case common.JOB_CREATED: - client.jobCreated <- job + client.handleCreated(job) case common.STATUS_RES: - go client.handleStatus(job) + client.handleStatus(job) } } } -// inner read -func (client *Client) read() (data []byte, err error) { - if len(client.in) > 0 { - // incoming queue is not empty - data = <-client.in - } else { - // empty queue, read data from socket - for { - buf := make([]byte, common.BUFFER_SIZE) - var n int - if n, err = client.conn.Read(buf); err != nil { - if err == io.EOF && n == 0 { - if data == nil { - err = common.ErrConnection - return - } - break - } - return - } - data = append(data, buf[0:n]...) - if n < common.BUFFER_SIZE { - break - } - } - } - // split package - tl := len(data) - start, end := 0, 4 - for i := 0; i < tl; i++ { - if string(data[start:end]) == common.RES_STR { - l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]})) - total := l + 12 - if total == tl { - return - } else { - client.in <- data[total:] - data = data[:total] - return - } - } else { - start++ - end++ - } - } - return nil, common.Errorf("Invalid data: %V", data) -} - // error handler func (client *Client) err (e error) { if client.ErrHandler != nil { @@ -167,16 +220,11 @@ func (client *Client) err (e error) { // job handler func (client *Client) handleJob(job *Job) { - if client.JobHandler != nil { - if err := client.JobHandler(job); err != nil { - client.err(err) - } - } + } // status handler func (client *Client) handleStatus(job *Job) { - if client.StatusHandler != nil { data := bytes.SplitN(job.Data, []byte{'\x00'}, 5) if len(data) != 5 { client.err(common.Errorf("Invalid data: %V", job.Data)) @@ -195,65 +243,6 @@ func (client *Client) handleStatus(job *Job) { client.err(common.Errorf("Invalid handle: %s", data[4][0])) return } - client.StatusHandler(handle, known, running, numerator, denominator) - } -} - -// Do the function. -// funcname is a string with function name. -// data is encoding to byte array. -// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, -// and if it is background job: JOB_BG. -// JOB_LOW | JOB_BG means the job is running with low level in background. -func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) { - var datatype uint32 - if flag & JOB_LOW == JOB_LOW { - if flag & JOB_BG == JOB_BG { - datatype = common.SUBMIT_JOB_LOW_BG - } else { - datatype = common.SUBMIT_JOB_LOW - } - } else if flag & JOB_HIGH == JOB_HIGH { - if flag & JOB_BG == JOB_BG { - datatype = common.SUBMIT_JOB_HIGH_BG - } else { - datatype = common.SUBMIT_JOB_HIGH - } - } else if flag & JOB_BG == JOB_BG { - datatype = common.SUBMIT_JOB_BG - } else { - datatype = common.SUBMIT_JOB - } - - uid := strconv.Itoa(int(client.ai.Id())) - l := len(funcname) + len(uid) + len(data) + 2 - rel := make([]byte, 0, l) - rel = append(rel, []byte(funcname)...) // len(funcname) - rel = append(rel, '\x00') // 1 Byte - rel = append(rel, []byte(uid)...) // len(uid) - rel = append(rel, '\x00') // 1 Byte - rel = append(rel, data...) // len(data) - client.writeJob(newJob(common.REQ, datatype, rel)) - // Waiting for JOB_CREATED - select { - case job := <-client.jobCreated: - return string(job.Data), nil - case <-time.After(client.TimeOut): - return "", common.ErrJobTimeOut - } - return -} - -// Get job status from job server. -// !!!Not fully tested.!!! -func (client *Client) Status(handle string) { - job := newJob(common.REQ, common.GET_STATUS, []byte(handle)) - client.writeJob(job) -} - -// Send a something out, get the samething back. -func (client *Client) Echo(data []byte) { - client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) } // Send the job to job server. @@ -261,21 +250,71 @@ func (client *Client) writeJob(job *Job) { client.out <- job } -// Internal write -func (client *Client) write(buf []byte) (err error) { - var n int - for i := 0; i < len(buf); i += n { - n, err = client.conn.Write(buf[i:]) - if err != nil { - return - } - } +// Internal do +func (client *Client) do(funcname string, data []byte, flag uint32) (id int) { + id = strconv.Itoa(int(client.ai.Id())) + l := len(funcname) + len(id) + len(data) + 2 + rel := make([]byte, 0, l) + rel = append(rel, []byte(funcname)...) // len(funcname) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, []byte(id)...) // len(uid) + rel = append(rel, '\x00') // 1 Byte + rel = append(rel, data...) // len(data) + client.writeJob(newJob(common.REQ, flag, rel)) return } +// }}} + +// Do the function. +// funcname is a string with function name. +// data is encoding to byte array. +// flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH, +// and if it is background job: JOB_BG. +// JOB_LOW | JOB_BG means the job is running with low level in background. +func (client *Client) Do(funcname string, data []byte, flag byte) +(handle string, err error) { + var datatype uint32 + switch flag { + case JOB_LOW : + datatype = common.SUBMIT_JOB_LOW + case JOB_HIGH : + datatype = common.SUBMIT_JOB_HIGH + default: + datatype = common.SUBMIT_JOB + } + client.do(funcname, data, datatype) +} + +func (client *Client) DoBg(funcname string, data []byte, flag byte) +(handle string, err error) { + var datatype uint32 + switch flag { + case JOB_LOW : + datatype = common.SUBMIT_JOB_LOW_BG + case JOB_HIGH : + datatype = common.SUBMIT_JOB_HIGH_BG + default: + datatype = common.SUBMIT_JOB_BG + } + client.do(funcname, data, datatype) +} + + +// Get job status from job server. +// !!!Not fully tested.!!! +func (client *Client) Status(handle string, handler StatusHandler) { + client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle))) +} + +// Send a something out, get the samething back. +func (client *Client) Echo(data []byte, handler JobHandler) (echo []byte) { + client.writeJob(newJob(common.REQ, common.ECHO_REQ, data)) + client.echo +} + // Close func (client *Client) Close() (err error) { - close(client.jobCreated) close(client.in) close(client.out) return client.conn.Close(); diff --git a/client/client_test.go b/client/client_test.go index b9cbefd..56d04ae 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -13,31 +13,80 @@ func TestClientAddServer(t *testing.T) { t.Error(err) } } -/* + func TestClientEcho(t *testing.T) { - client.JobHandler = func(job *Job) error { + echoHandler = func(job *Job) { echo := string(job.Data) if echo == "Hello world" { t.Log(echo) } else { t.Errorf("Invalid echo data: %s", job.Data) } - return nil + return } - client.Echo([]byte("Hello world")) + client.Echo([]byte("Hello world"), echoHandler) } -*/ -/* -func TestClientDo(t *testing.T) { - if handle, err := client.Do("ToUpper", []byte("abcdef"), JOB_LOW|JOB_BG); err != nil { + +func TestClientDoBg(t *testing.T) { + if handle, err := client.DoBg("ToUpper", []byte("abcdef"), + JOB_LOW); err != nil { t.Error(err) } else { t.Log(handle) } } -*/ + +func TestClientDo(t *testing.T) { + jobHandler := func(job *Job) { + str := string(job.Data) + if str == "ABCDEF" { + t.Log(str) + } else { + t.Errorf("Invalid data: %s", job.Data) + } + return + } + if handle, err := client.Do("ToUpper", []byte("abcdef"), + JOB_LOW, jobHandler); err != nil { + t.Error(err) + } else { + t.Log(handle) + } +} + +func TestClientStatus(t *testing.T) { + statusHandler = func(handler string, known bool, + running bool, numerator uint64, denominator uint64) { + if known { + t.Errorf("The job (%s) shouldn't be known.", handler) + } + if running { + t.Errorf("The job (%s) shouldn't be running.", handler) + } + } + client.Status("handle not exists", statusHandler) + + if handle, err := client.Do("Delay5sec", []byte("abcdef"), + JOB_LOW, nil); err != nil { + t.Error(err) + } else { + t.Log(handle) + + statusHandler = func(handler string, known bool, + running bool, numerator uint64, denominator uint64) { + if !known { + t.Errorf("The job (%s) shouldn be known.", handler) + } + if !running { + t.Errorf("The job (%s) shouldn be running.", handler) + } + } + client.Status(handle, statusHandler) + } +} + + func TestClientClose(t *testing.T) { - return if err := client.Close(); err != nil { t.Error(err) } diff --git a/client/handler.go b/client/handler.go new file mode 100644 index 0000000..9074d4a --- /dev/null +++ b/client/handler.go @@ -0,0 +1,4 @@ +package client + +// Job handler +type JobHandler func(*Job)