You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

295 lines
6.5 KiB

// Package client helps developers connect to Gearmand, send
// jobs, and fetch results.
  3. package client
  4. import (
  5. "bufio"
  6. "net"
  7. "sync"
  8. )
  9. // One client connect to one server.
  10. // Use Pool for multi-connections.
  11. type Client struct {
  12. sync.Mutex
  13. net, addr, lastcall string
  14. respHandler map[string]ResponseHandler
  15. innerHandler map[string]ResponseHandler
  16. in chan *Response
  17. conn net.Conn
  18. rw *bufio.ReadWriter
  19. ErrorHandler ErrorHandler
  20. }
  21. // Return a client.
  22. func New(network, addr string) (client *Client, err error) {
  23. client = &Client{
  24. net: network,
  25. addr: addr,
  26. respHandler: make(map[string]ResponseHandler, queueSize),
  27. innerHandler: make(map[string]ResponseHandler, queueSize),
  28. in: make(chan *Response, queueSize),
  29. }
  30. client.conn, err = net.Dial(client.net, client.addr)
  31. if err != nil {
  32. return
  33. }
  34. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  35. bufio.NewWriter(client.conn))
  36. go client.readLoop()
  37. go client.processLoop()
  38. return
  39. }
  40. func (client *Client) write(req *request) (err error) {
  41. var n int
  42. buf := req.Encode()
  43. for i := 0; i < len(buf); i += n {
  44. n, err = client.rw.Write(buf[i:])
  45. if err != nil {
  46. return
  47. }
  48. }
  49. return client.rw.Flush()
  50. }
  51. func (client *Client) read(length int) (data []byte, err error) {
  52. n := 0
  53. buf := getBuffer(bufferSize)
  54. // read until data can be unpacked
  55. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  56. if n, err = client.rw.Read(buf); err != nil {
  57. return
  58. }
  59. data = append(data, buf[0:n]...)
  60. if n < bufferSize {
  61. break
  62. }
  63. }
  64. return
  65. }
  66. func (client *Client) readLoop() {
  67. defer close(client.in)
  68. var data, leftdata []byte
  69. var err error
  70. var resp *Response
  71. ReadLoop:
  72. for client.conn != nil {
  73. if data, err = client.read(bufferSize); err != nil {
  74. if opErr, ok := err.(*net.OpError); ok {
  75. if opErr.Timeout() {
  76. client.err(err)
  77. }
  78. if opErr.Temporary() {
  79. continue
  80. }
  81. break
  82. }
  83. client.err(err)
  84. // If it is unexpected error and the connection wasn't
  85. // closed by Gearmand, the client should close the conection
  86. // and reconnect to job server.
  87. client.Close()
  88. client.conn, err = net.Dial(client.net, client.addr)
  89. if err != nil {
  90. client.err(err)
  91. break
  92. }
  93. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  94. bufio.NewWriter(client.conn))
  95. continue
  96. }
  97. if len(leftdata) > 0 { // some data left for processing
  98. data = append(leftdata, data...)
  99. }
  100. for {
  101. l := len(data)
  102. if l < minPacketLength { // not enough data
  103. leftdata = data
  104. continue ReadLoop
  105. }
  106. if resp, l, err = decodeResponse(data); err != nil {
  107. leftdata = data[l:]
  108. continue ReadLoop
  109. } else {
  110. client.in <- resp
  111. }
  112. data = data[l:]
  113. if len(data) > 0 {
  114. continue
  115. }
  116. break
  117. }
  118. }
  119. }
  120. func (client *Client) processLoop() {
  121. for resp := range client.in {
  122. switch resp.DataType {
  123. case dtError:
  124. if client.lastcall != "" {
  125. resp = client.handleInner(client.lastcall, resp)
  126. client.lastcall = ""
  127. } else {
  128. client.err(getError(resp.Data))
  129. }
  130. case dtStatusRes:
  131. resp = client.handleInner("s"+resp.Handle, resp)
  132. case dtJobCreated:
  133. resp = client.handleInner("c", resp)
  134. case dtEchoRes:
  135. resp = client.handleInner("e", resp)
  136. case dtWorkData, dtWorkWarning, dtWorkStatus:
  137. resp = client.handleResponse(resp.Handle, resp)
  138. case dtWorkComplete, dtWorkFail, dtWorkException:
  139. client.handleResponse(resp.Handle, resp)
  140. delete(client.respHandler, resp.Handle)
  141. }
  142. }
  143. }
  144. func (client *Client) err(e error) {
  145. if client.ErrorHandler != nil {
  146. client.ErrorHandler(e)
  147. }
  148. }
  149. func (client *Client) handleResponse(key string, resp *Response) *Response {
  150. if h, ok := client.respHandler[key]; ok {
  151. h(resp)
  152. return nil
  153. }
  154. return resp
  155. }
  156. func (client *Client) handleInner(key string, resp *Response) *Response {
  157. if h, ok := client.innerHandler[key]; ok {
  158. h(resp)
  159. delete(client.innerHandler, key)
  160. return nil
  161. }
  162. return resp
  163. }
  164. func (client *Client) do(funcname string, data []byte,
  165. flag uint32) (handle string, err error) {
  166. if client.conn == nil {
  167. return "", ErrLostConn
  168. }
  169. var mutex sync.Mutex
  170. mutex.Lock()
  171. client.lastcall = "c"
  172. client.innerHandler["c"] = func(resp *Response) {
  173. defer mutex.Unlock()
  174. if resp.DataType == dtError {
  175. err = getError(resp.Data)
  176. return
  177. }
  178. handle = resp.Handle
  179. }
  180. id := IdGen.Id()
  181. req := getJob(id, []byte(funcname), data)
  182. req.DataType = flag
  183. client.write(req)
  184. mutex.Lock()
  185. return
  186. }
  187. // Call the function and get a response.
  188. // flag can be set to: JobLow, JobNormal and JobHigh
  189. func (client *Client) Do(funcname string, data []byte,
  190. flag byte, h ResponseHandler) (handle string, err error) {
  191. var datatype uint32
  192. switch flag {
  193. case JobLow:
  194. datatype = dtSubmitJobLow
  195. case JobHigh:
  196. datatype = dtSubmitJobHigh
  197. default:
  198. datatype = dtSubmitJob
  199. }
  200. handle, err = client.do(funcname, data, datatype)
  201. if err == nil && h != nil {
  202. client.respHandler[handle] = h
  203. }
  204. return
  205. }
  206. // Call the function in background, no response needed.
  207. // flag can be set to: JobLow, JobNormal and JobHigh
  208. func (client *Client) DoBg(funcname string, data []byte,
  209. flag byte) (handle string, err error) {
  210. if client.conn == nil {
  211. return "", ErrLostConn
  212. }
  213. var datatype uint32
  214. switch flag {
  215. case JobLow:
  216. datatype = dtSubmitJobLowBg
  217. case JobHigh:
  218. datatype = dtSubmitJobHighBg
  219. default:
  220. datatype = dtSubmitJobBg
  221. }
  222. handle, err = client.do(funcname, data, datatype)
  223. return
  224. }
  225. // Get job status from job server.
  226. func (client *Client) Status(handle string) (status *Status, err error) {
  227. if client.conn == nil {
  228. return nil, ErrLostConn
  229. }
  230. var mutex sync.Mutex
  231. mutex.Lock()
  232. client.lastcall = "s" + handle
  233. client.innerHandler["s"+handle] = func(resp *Response) {
  234. defer mutex.Unlock()
  235. var err error
  236. status, err = resp._status()
  237. if err != nil {
  238. client.err(err)
  239. }
  240. }
  241. req := getRequest()
  242. req.DataType = dtGetStatus
  243. req.Data = []byte(handle)
  244. client.write(req)
  245. mutex.Lock()
  246. return
  247. }
  248. // Echo.
  249. func (client *Client) Echo(data []byte) (echo []byte, err error) {
  250. if client.conn == nil {
  251. return nil, ErrLostConn
  252. }
  253. var mutex sync.Mutex
  254. mutex.Lock()
  255. client.innerHandler["e"] = func(resp *Response) {
  256. echo = resp.Data
  257. mutex.Unlock()
  258. }
  259. req := getRequest()
  260. req.DataType = dtEchoReq
  261. req.Data = data
  262. client.lastcall = "e"
  263. client.write(req)
  264. mutex.Lock()
  265. return
  266. }
  267. // Close connection
  268. func (client *Client) Close() (err error) {
  269. client.Lock()
  270. defer client.Unlock()
  271. if client.conn != nil {
  272. err = client.conn.Close()
  273. client.conn = nil
  274. }
  275. return
  276. }