// The client package helps developers connect to Gearmand, send // jobs and fetch result. package client import ( "bufio" "net" "sync" ) // One client connect to one server. // Use Pool for multi-connections. type Client struct { sync.Mutex net, addr, lastcall string respHandler map[string]ResponseHandler innerHandler map[string]ResponseHandler in chan *Response conn net.Conn rw *bufio.ReadWriter ErrorHandler ErrorHandler } // Return a client. func New(network, addr string) (client *Client, err error) { client = &Client{ net: network, addr: addr, respHandler: make(map[string]ResponseHandler, queueSize), innerHandler: make(map[string]ResponseHandler, queueSize), in: make(chan *Response, queueSize), } client.conn, err = net.Dial(client.net, client.addr) if err != nil { return } client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn), bufio.NewWriter(client.conn)) go client.readLoop() go client.processLoop() return } func (client *Client) write(req *request) (err error) { var n int buf := req.Encode() for i := 0; i < len(buf); i += n { n, err = client.rw.Write(buf[i:]) if err != nil { return } } return client.rw.Flush() } func (client *Client) read(length int) (data []byte, err error) { n := 0 buf := getBuffer(bufferSize) // read until data can be unpacked for i := length; i > 0 || len(data) < minPacketLength; i -= n { if n, err = client.rw.Read(buf); err != nil { return } data = append(data, buf[0:n]...) if n < bufferSize { break } } return } func (client *Client) readLoop() { defer close(client.in) var data, leftdata []byte var err error var resp *Response ReadLoop: for client.conn != nil { if data, err = client.read(bufferSize); err != nil { if opErr, ok := err.(*net.OpError); ok { if opErr.Timeout() { client.err(err) } if opErr.Temporary() { continue } break } client.err(err) // If it is unexpected error and the connection wasn't // closed by Gearmand, the client should close the conection // and reconnect to job server. client.Close() client.conn, err = net.Dial(client.net, client.addr) if err != nil { client.err(err) break } client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn), bufio.NewWriter(client.conn)) continue } if len(leftdata) > 0 { // some data left for processing data = append(leftdata, data...) } for { l := len(data) if l < minPacketLength { // not enough data leftdata = data continue ReadLoop } if resp, l, err = decodeResponse(data); err != nil { leftdata = data[l:] continue ReadLoop } else { client.in <- resp } data = data[l:] if len(data) > 0 { continue } break } } } func (client *Client) processLoop() { for resp := range client.in { switch resp.DataType { case dtError: if client.lastcall != "" { resp = client.handleInner(client.lastcall, resp) client.lastcall = "" } else { client.err(getError(resp.Data)) } case dtStatusRes: resp = client.handleInner("s"+resp.Handle, resp) case dtJobCreated: resp = client.handleInner("c", resp) case dtEchoRes: resp = client.handleInner("e", resp) case dtWorkData, dtWorkWarning, dtWorkStatus: resp = client.handleResponse(resp.Handle, resp) case dtWorkComplete, dtWorkFail, dtWorkException: resp = client.handleResponse(resp.Handle, resp) if resp != nil { delete(client.respHandler, resp.Handle) } } } } func (client *Client) err(e error) { if client.ErrorHandler != nil { client.ErrorHandler(e) } } func (client *Client) handleResponse(key string, resp *Response) *Response { if h, ok := client.respHandler[key]; ok { h(resp) return nil } return resp } func (client *Client) handleInner(key string, resp *Response) *Response { if h, ok := client.innerHandler[key]; ok { h(resp) delete(client.innerHandler, key) return nil } return resp } func (client *Client) do(funcname string, data []byte, flag uint32) (handle string, err error) { if client.conn == nil { return "", ErrLostConn } var mutex sync.Mutex mutex.Lock() client.lastcall = "c" client.innerHandler["c"] = func(resp *Response) { defer mutex.Unlock() if resp.DataType == dtError { err = getError(resp.Data) return } handle = resp.Handle } id := IdGen.Id() req := getJob(id, []byte(funcname), data) req.DataType = flag client.write(req) mutex.Lock() return } // Call the function and get a response. // flag can be set to: JobLow, JobNormal and JobHigh func (client *Client) Do(funcname string, data []byte, flag byte, h ResponseHandler) (handle string, err error) { var datatype uint32 switch flag { case JobLow: datatype = dtSubmitJobLow case JobHigh: datatype = dtSubmitJobHigh default: datatype = dtSubmitJob } handle, err = client.do(funcname, data, datatype) if err == nil && h != nil { client.respHandler[handle] = h } return } // Call the function in background, no response needed. // flag can be set to: JobLow, JobNormal and JobHigh func (client *Client) DoBg(funcname string, data []byte, flag byte) (handle string, err error) { if client.conn == nil { return "", ErrLostConn } var datatype uint32 switch flag { case JobLow: datatype = dtSubmitJobLowBg case JobHigh: datatype = dtSubmitJobHighBg default: datatype = dtSubmitJobBg } handle, err = client.do(funcname, data, datatype) return } // Get job status from job server. func (client *Client) Status(handle string) (status *Status, err error) { if client.conn == nil { return nil, ErrLostConn } var mutex sync.Mutex mutex.Lock() client.lastcall = "s" + handle client.innerHandler["s"+handle] = func(resp *Response) { defer mutex.Unlock() var err error status, err = resp._status() if err != nil { client.err(err) } } req := getRequest() req.DataType = dtGetStatus req.Data = []byte(handle) client.write(req) mutex.Lock() return } // Echo. func (client *Client) Echo(data []byte) (echo []byte, err error) { if client.conn == nil { return nil, ErrLostConn } var mutex sync.Mutex mutex.Lock() client.innerHandler["e"] = func(resp *Response) { echo = resp.Data mutex.Unlock() } req := getRequest() req.DataType = dtEchoReq req.Data = data client.lastcall = "e" client.write(req) mutex.Lock() return } // Close connection func (client *Client) Close() (err error) { client.Lock() defer client.Unlock() if client.conn != nil { err = client.conn.Close() client.conn = nil } return }