You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

285 lines
6.2 KiB

// Package client helps developers connect to Gearmand, send
// jobs and fetch results.
  3. package client
  4. import (
  5. "bufio"
  6. "io"
  7. "net"
  8. "sync"
  9. )
  10. // One client connect to one server.
  11. // Use Pool for multi-connections.
  12. type Client struct {
  13. sync.Mutex
  14. net, addr, lastcall string
  15. respHandler map[string]ResponseHandler
  16. innerHandler map[string]ResponseHandler
  17. in chan *Response
  18. isConn bool
  19. conn net.Conn
  20. rw *bufio.ReadWriter
  21. ErrorHandler ErrorHandler
  22. }
  23. // Return a client.
  24. func New(network, addr string) (client *Client, err error) {
  25. client = &Client{
  26. net: network,
  27. addr: addr,
  28. respHandler: make(map[string]ResponseHandler, queueSize),
  29. innerHandler: make(map[string]ResponseHandler, queueSize),
  30. in: make(chan *Response, queueSize),
  31. }
  32. client.conn, err = net.Dial(client.net, client.addr)
  33. if err != nil {
  34. return
  35. }
  36. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  37. bufio.NewWriter(client.conn))
  38. client.isConn = true
  39. go client.readLoop()
  40. go client.processLoop()
  41. return
  42. }
  43. func (client *Client) write(req *request) (err error) {
  44. var n int
  45. buf := req.Encode()
  46. for i := 0; i < len(buf); i += n {
  47. n, err = client.rw.Write(buf[i:])
  48. if err != nil {
  49. return
  50. }
  51. }
  52. return client.rw.Flush()
  53. }
  54. func (client *Client) read(length int) (data []byte, err error) {
  55. n := 0
  56. buf := getBuffer(bufferSize)
  57. // read until data can be unpacked
  58. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  59. if n, err = client.rw.Read(buf); err != nil {
  60. if err == io.EOF {
  61. err = ErrLostConn
  62. }
  63. return
  64. }
  65. data = append(data, buf[0:n]...)
  66. if n < bufferSize {
  67. break
  68. }
  69. }
  70. return
  71. }
  72. func (client *Client) readLoop() {
  73. defer close(client.in)
  74. var data, leftdata []byte
  75. var err error
  76. var resp *Response
  77. ReadLoop:
  78. for {
  79. if data, err = client.read(bufferSize); err != nil {
  80. client.err(err)
  81. if err == ErrLostConn {
  82. break
  83. }
  84. // If it is unexpected error and the connection wasn't
  85. // closed by Gearmand, the client should close the conection
  86. // and reconnect to job server.
  87. client.Close()
  88. client.conn, err = net.Dial(client.net, client.addr)
  89. if err != nil {
  90. client.err(err)
  91. break
  92. }
  93. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  94. bufio.NewWriter(client.conn))
  95. continue
  96. }
  97. if len(leftdata) > 0 { // some data left for processing
  98. data = append(leftdata, data...)
  99. }
  100. for {
  101. l := len(data)
  102. if l < minPacketLength { // not enough data
  103. leftdata = data
  104. continue ReadLoop
  105. }
  106. if resp, l, err = decodeResponse(data); err != nil {
  107. leftdata = data[l:]
  108. continue ReadLoop
  109. } else {
  110. client.in <- resp
  111. }
  112. data = data[l:]
  113. if len(data) > 0 {
  114. continue
  115. }
  116. break
  117. }
  118. }
  119. }
  120. func (client *Client) processLoop() {
  121. for resp := range client.in {
  122. switch resp.DataType {
  123. case dtError:
  124. if client.lastcall != "" {
  125. resp = client.handleInner(client.lastcall, resp)
  126. client.lastcall = ""
  127. } else {
  128. client.err(getError(resp.Data))
  129. }
  130. case dtStatusRes:
  131. resp = client.handleInner("s"+resp.Handle, resp)
  132. case dtJobCreated:
  133. resp = client.handleInner("c", resp)
  134. case dtEchoRes:
  135. resp = client.handleInner("e", resp)
  136. case dtWorkData, dtWorkWarning, dtWorkStatus:
  137. resp = client.handleResponse(resp.Handle, resp)
  138. case dtWorkComplete, dtWorkFail, dtWorkException:
  139. resp = client.handleResponse(resp.Handle, resp)
  140. if resp != nil {
  141. delete(client.respHandler, resp.Handle)
  142. }
  143. }
  144. }
  145. }
  146. func (client *Client) err(e error) {
  147. if client.ErrorHandler != nil {
  148. client.ErrorHandler(e)
  149. }
  150. }
  151. func (client *Client) handleResponse(key string, resp *Response) *Response {
  152. if h, ok := client.respHandler[key]; ok {
  153. h(resp)
  154. return nil
  155. }
  156. return resp
  157. }
  158. func (client *Client) handleInner(key string, resp *Response) *Response {
  159. if h, ok := client.innerHandler[key]; ok {
  160. h(resp)
  161. delete(client.innerHandler, key)
  162. return nil
  163. }
  164. return resp
  165. }
  166. func (client *Client) do(funcname string, data []byte,
  167. flag uint32) (handle string, err error) {
  168. var mutex sync.Mutex
  169. mutex.Lock()
  170. client.lastcall = "c"
  171. client.innerHandler["c"] = func(resp *Response) {
  172. if resp.DataType == dtError {
  173. err = getError(resp.Data)
  174. return
  175. }
  176. handle = resp.Handle
  177. mutex.Unlock()
  178. }
  179. id := IdGen.Id()
  180. req := getJob(id, []byte(funcname), data)
  181. req.DataType = flag
  182. client.write(req)
  183. mutex.Lock()
  184. return
  185. }
  186. // Call the function and get a response.
  187. // flag can be set to: JobLow, JobNormal and JobHigh
  188. func (client *Client) Do(funcname string, data []byte,
  189. flag byte, h ResponseHandler) (handle string, err error) {
  190. var datatype uint32
  191. switch flag {
  192. case JobLow:
  193. datatype = dtSubmitJobLow
  194. case JobHigh:
  195. datatype = dtSubmitJobHigh
  196. default:
  197. datatype = dtSubmitJob
  198. }
  199. handle, err = client.do(funcname, data, datatype)
  200. if h != nil {
  201. client.respHandler[handle] = h
  202. }
  203. return
  204. }
  205. // Call the function in background, no response needed.
  206. // flag can be set to: JobLow, JobNormal and JobHigh
  207. func (client *Client) DoBg(funcname string, data []byte,
  208. flag byte) (handle string, err error) {
  209. var datatype uint32
  210. switch flag {
  211. case JobLow:
  212. datatype = dtSubmitJobLowBg
  213. case JobHigh:
  214. datatype = dtSubmitJobHighBg
  215. default:
  216. datatype = dtSubmitJobBg
  217. }
  218. handle, err = client.do(funcname, data, datatype)
  219. return
  220. }
  221. // Get job status from job server.
  222. func (client *Client) Status(handle string) (status *Status, err error) {
  223. var mutex sync.Mutex
  224. mutex.Lock()
  225. client.lastcall = "s" + handle
  226. client.innerHandler["s"+handle] = func(resp *Response) {
  227. var err error
  228. status, err = resp._status()
  229. if err != nil {
  230. client.err(err)
  231. }
  232. mutex.Unlock()
  233. }
  234. req := getRequest()
  235. req.DataType = dtGetStatus
  236. req.Data = []byte(handle)
  237. client.write(req)
  238. mutex.Lock()
  239. return
  240. }
  241. // Echo.
  242. func (client *Client) Echo(data []byte) (echo []byte, err error) {
  243. var mutex sync.Mutex
  244. mutex.Lock()
  245. client.innerHandler["e"] = func(resp *Response) {
  246. echo = resp.Data
  247. mutex.Unlock()
  248. }
  249. req := getRequest()
  250. req.DataType = dtEchoReq
  251. req.Data = data
  252. client.lastcall = "e"
  253. client.write(req)
  254. mutex.Lock()
  255. return
  256. }
  257. // Close connection
  258. func (client *Client) Close() (err error) {
  259. client.Lock()
  260. defer client.Unlock()
  261. if client.conn != nil {
  262. err = client.conn.Close()
  263. client.conn = nil
  264. }
  265. return
  266. }