You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

361 lines
8.3 KiB

  1. // Package client helps developers connect to Gearmand, send
  2. // jobs, and fetch results.
  3. package client
  4. import (
  5. "bufio"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. var (
  11. DefaultTimeout time.Duration = time.Second
  12. )
  13. // One client connect to one server.
  14. // Use Pool for multi-connections.
  15. type Client struct {
  16. sync.Mutex
  17. net, addr string
  18. innerHandler *responseHandlerMap
  19. in chan *Response
  20. conn net.Conn
  21. rw *bufio.ReadWriter
  22. ResponseTimeout time.Duration // response timeout for do()
  23. ErrorHandler ErrorHandler
  24. }
  25. type responseHandlerMap struct {
  26. sync.Mutex
  27. holder map[string]handledResponse
  28. }
  29. type handledResponse struct {
  30. internal ResponseHandler // internal handler, always non-nil
  31. external ResponseHandler // handler passed in from (*Client).Do, sometimes nil
  32. }
  33. func newResponseHandlerMap() *responseHandlerMap {
  34. return &responseHandlerMap{holder: make(map[string]handledResponse, queueSize)}
  35. }
  36. func (r *responseHandlerMap) remove(key string) {
  37. r.Lock()
  38. delete(r.holder, key)
  39. r.Unlock()
  40. }
  41. func (r *responseHandlerMap) getAndRemove(key string) (handledResponse, bool) {
  42. r.Lock()
  43. rh, b := r.holder[key]
  44. delete(r.holder, key)
  45. r.Unlock()
  46. return rh, b
  47. }
  48. func (r *responseHandlerMap) putWithExternalHandler(key string, internal, external ResponseHandler) {
  49. r.Lock()
  50. r.holder[key] = handledResponse{internal: internal, external: external}
  51. r.Unlock()
  52. }
  53. func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
  54. r.putWithExternalHandler(key, rh, nil)
  55. }
  56. // New returns a client.
  57. func New(network, addr string) (client *Client, err error) {
  58. client = &Client{
  59. net: network,
  60. addr: addr,
  61. innerHandler: newResponseHandlerMap(),
  62. in: make(chan *Response, queueSize),
  63. ResponseTimeout: DefaultTimeout,
  64. }
  65. client.conn, err = net.Dial(client.net, client.addr)
  66. if err != nil {
  67. return
  68. }
  69. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  70. bufio.NewWriter(client.conn))
  71. go client.readLoop()
  72. go client.processLoop()
  73. return
  74. }
  75. func (client *Client) write(req *request) (err error) {
  76. var n int
  77. buf := req.Encode()
  78. for i := 0; i < len(buf); i += n {
  79. n, err = client.rw.Write(buf[i:])
  80. if err != nil {
  81. return
  82. }
  83. }
  84. return client.rw.Flush()
  85. }
  86. func (client *Client) read(length int) (data []byte, err error) {
  87. n := 0
  88. buf := getBuffer(bufferSize)
  89. // read until data can be unpacked
  90. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  91. if n, err = client.rw.Read(buf); err != nil {
  92. return
  93. }
  94. data = append(data, buf[0:n]...)
  95. if n < bufferSize {
  96. break
  97. }
  98. }
  99. return
  100. }
  101. func (client *Client) readLoop() {
  102. defer close(client.in)
  103. var data, leftdata []byte
  104. var err error
  105. var resp *Response
  106. ReadLoop:
  107. for client.conn != nil {
  108. if data, err = client.read(bufferSize); err != nil {
  109. if opErr, ok := err.(*net.OpError); ok {
  110. if opErr.Timeout() {
  111. client.err(err)
  112. }
  113. if opErr.Temporary() {
  114. continue
  115. }
  116. break
  117. }
  118. client.err(err)
  119. // If it is unexpected error and the connection wasn't
  120. // closed by Gearmand, the client should close the conection
  121. // and reconnect to job server.
  122. client.Close()
  123. client.conn, err = net.Dial(client.net, client.addr)
  124. if err != nil {
  125. client.err(err)
  126. break
  127. }
  128. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  129. bufio.NewWriter(client.conn))
  130. continue
  131. }
  132. if len(leftdata) > 0 { // some data left for processing
  133. data = append(leftdata, data...)
  134. leftdata = nil
  135. }
  136. for {
  137. l := len(data)
  138. if l < minPacketLength { // not enough data
  139. leftdata = data
  140. continue ReadLoop
  141. }
  142. if resp, l, err = decodeResponse(data); err != nil {
  143. leftdata = data[l:]
  144. continue ReadLoop
  145. } else {
  146. client.in <- resp
  147. }
  148. data = data[l:]
  149. if len(data) > 0 {
  150. continue
  151. }
  152. break
  153. }
  154. }
  155. }
  156. func (client *Client) processLoop() {
  157. rhandlers := map[string]ResponseHandler{}
  158. for resp := range client.in {
  159. switch resp.DataType {
  160. case dtError:
  161. client.err(getError(resp.Data))
  162. case dtStatusRes:
  163. client.handleInner("s"+resp.Handle, resp, nil)
  164. case dtJobCreated:
  165. client.handleInner("c", resp, rhandlers)
  166. case dtEchoRes:
  167. client.handleInner("e", resp, nil)
  168. case dtWorkData, dtWorkWarning, dtWorkStatus:
  169. if cb := rhandlers[resp.Handle]; cb != nil {
  170. cb(resp)
  171. }
  172. case dtWorkComplete, dtWorkFail, dtWorkException:
  173. if cb := rhandlers[resp.Handle]; cb != nil {
  174. cb(resp)
  175. delete(rhandlers, resp.Handle)
  176. }
  177. }
  178. }
  179. }
  180. func (client *Client) err(e error) {
  181. if client.ErrorHandler != nil {
  182. client.ErrorHandler(e)
  183. }
  184. }
  185. func (client *Client) handleInner(key string, resp *Response, rhandlers map[string]ResponseHandler) {
  186. if h, ok := client.innerHandler.getAndRemove(key); ok {
  187. if h.external != nil && resp.Handle != "" {
  188. rhandlers[resp.Handle] = h.external
  189. }
  190. h.internal(resp)
  191. }
  192. }
  193. type handleOrError struct {
  194. handle string
  195. err error
  196. }
  197. func (client *Client) do(funcname string, data []byte,
  198. flag uint32, h ResponseHandler, id string) (handle string, err error) {
  199. if len(id) == 0 {
  200. return "", ErrInvalidId
  201. }
  202. if client.conn == nil {
  203. return "", ErrLostConn
  204. }
  205. var result = make(chan handleOrError, 1)
  206. client.Lock()
  207. defer client.Unlock()
  208. client.innerHandler.putWithExternalHandler("c", func(resp *Response) {
  209. if resp.DataType == dtError {
  210. err = getError(resp.Data)
  211. result <- handleOrError{"", err}
  212. return
  213. }
  214. handle = resp.Handle
  215. result <- handleOrError{handle, nil}
  216. }, h)
  217. req := getJob(id, []byte(funcname), data)
  218. req.DataType = flag
  219. if err = client.write(req); err != nil {
  220. client.innerHandler.remove("c")
  221. return
  222. }
  223. var timer = time.After(client.ResponseTimeout)
  224. select {
  225. case ret := <-result:
  226. return ret.handle, ret.err
  227. case <-timer:
  228. client.innerHandler.remove("c")
  229. return "", ErrLostConn
  230. }
  231. return
  232. }
  233. // Call the function and get a response.
  234. // flag can be set to: JobLow, JobNormal and JobHigh
  235. func (client *Client) Do(funcname string, data []byte,
  236. flag byte, h ResponseHandler) (handle string, err error) {
  237. handle, err = client.DoWithId(funcname, data, flag, h, IdGen.Id())
  238. return
  239. }
  240. // Call the function in background, no response needed.
  241. // flag can be set to: JobLow, JobNormal and JobHigh
  242. func (client *Client) DoBg(funcname string, data []byte,
  243. flag byte) (handle string, err error) {
  244. handle, err = client.DoBgWithId(funcname, data, flag, IdGen.Id())
  245. return
  246. }
  247. // Status gets job status from job server.
  248. func (client *Client) Status(handle string) (status *Status, err error) {
  249. if client.conn == nil {
  250. return nil, ErrLostConn
  251. }
  252. var mutex sync.Mutex
  253. mutex.Lock()
  254. client.innerHandler.put("s"+handle, func(resp *Response) {
  255. defer mutex.Unlock()
  256. var err error
  257. status, err = resp._status()
  258. if err != nil {
  259. client.err(err)
  260. }
  261. })
  262. req := getRequest()
  263. req.DataType = dtGetStatus
  264. req.Data = []byte(handle)
  265. client.write(req)
  266. mutex.Lock()
  267. return
  268. }
  269. // Echo.
  270. func (client *Client) Echo(data []byte) (echo []byte, err error) {
  271. if client.conn == nil {
  272. return nil, ErrLostConn
  273. }
  274. var mutex sync.Mutex
  275. mutex.Lock()
  276. client.innerHandler.put("e", func(resp *Response) {
  277. echo = resp.Data
  278. mutex.Unlock()
  279. })
  280. req := getRequest()
  281. req.DataType = dtEchoReq
  282. req.Data = data
  283. client.write(req)
  284. mutex.Lock()
  285. return
  286. }
  287. // Close connection
  288. func (client *Client) Close() (err error) {
  289. client.Lock()
  290. defer client.Unlock()
  291. if client.conn != nil {
  292. err = client.conn.Close()
  293. client.conn = nil
  294. }
  295. return
  296. }
  297. // Call the function and get a response.
  298. // flag can be set to: JobLow, JobNormal and JobHigh
  299. func (client *Client) DoWithId(funcname string, data []byte,
  300. flag byte, h ResponseHandler, id string) (handle string, err error) {
  301. var datatype uint32
  302. switch flag {
  303. case JobLow:
  304. datatype = dtSubmitJobLow
  305. case JobHigh:
  306. datatype = dtSubmitJobHigh
  307. default:
  308. datatype = dtSubmitJob
  309. }
  310. handle, err = client.do(funcname, data, datatype, h, id)
  311. return
  312. }
  313. // Call the function in background, no response needed.
  314. // flag can be set to: JobLow, JobNormal and JobHigh
  315. func (client *Client) DoBgWithId(funcname string, data []byte,
  316. flag byte, id string) (handle string, err error) {
  317. if client.conn == nil {
  318. return "", ErrLostConn
  319. }
  320. var datatype uint32
  321. switch flag {
  322. case JobLow:
  323. datatype = dtSubmitJobLowBg
  324. case JobHigh:
  325. datatype = dtSubmitJobHighBg
  326. default:
  327. datatype = dtSubmitJobBg
  328. }
  329. handle, err = client.do(funcname, data, datatype, nil, id)
  330. return
  331. }