Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

358 рядки
7.8 KiB

  1. // The client package helps developers connect to Gearmand, send
  2. // jobs and fetch result.
  3. package client
  4. import (
  5. "bufio"
  6. "net"
  7. "sync"
  8. "time"
  9. )
  10. var (
  11. DefaultTimeout time.Duration = 1000
  12. )
  13. // One client connect to one server.
  14. // Use Pool for multi-connections.
  15. type Client struct {
  16. sync.Mutex
  17. net, addr, lastcall string
  18. respHandler *responseHandlerMap
  19. innerHandler *responseHandlerMap
  20. in chan *Response
  21. conn net.Conn
  22. rw *bufio.ReadWriter
  23. ResponseTimeout time.Duration // response timeout for do() in ms
  24. ErrorHandler ErrorHandler
  25. }
  26. type responseHandlerMap struct {
  27. sync.Mutex
  28. holder map[string]ResponseHandler
  29. }
  30. func newResponseHandlerMap() *responseHandlerMap {
  31. return &responseHandlerMap{holder: make(map[string]ResponseHandler, queueSize)}
  32. }
  33. func (r *responseHandlerMap) remove(key string) {
  34. r.Lock()
  35. delete(r.holder, key)
  36. r.Unlock()
  37. }
  38. func (r *responseHandlerMap) get(key string) (ResponseHandler, bool) {
  39. r.Lock()
  40. rh, b := r.holder[key]
  41. r.Unlock()
  42. return rh, b
  43. }
  44. func (r *responseHandlerMap) put(key string, rh ResponseHandler) {
  45. r.Lock()
  46. r.holder[key] = rh
  47. r.Unlock()
  48. }
  49. func (r *responseHandlerMap) putNoLock(key string, rh ResponseHandler) {
  50. r.holder[key] = rh
  51. }
  52. // Return a client.
  53. func New(network, addr string) (client *Client, err error) {
  54. client = &Client{
  55. net: network,
  56. addr: addr,
  57. respHandler: newResponseHandlerMap(),
  58. innerHandler: newResponseHandlerMap(),
  59. in: make(chan *Response, queueSize),
  60. ResponseTimeout: DefaultTimeout,
  61. }
  62. client.conn, err = net.Dial(client.net, client.addr)
  63. if err != nil {
  64. return
  65. }
  66. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  67. bufio.NewWriter(client.conn))
  68. go client.readLoop()
  69. go client.processLoop()
  70. return
  71. }
  72. func (client *Client) write(req *request) (err error) {
  73. var n int
  74. buf := req.Encode()
  75. for i := 0; i < len(buf); i += n {
  76. n, err = client.rw.Write(buf[i:])
  77. if err != nil {
  78. return
  79. }
  80. }
  81. return client.rw.Flush()
  82. }
  83. func (client *Client) read(length int) (data []byte, err error) {
  84. n := 0
  85. buf := getBuffer(bufferSize)
  86. // read until data can be unpacked
  87. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  88. if n, err = client.rw.Read(buf); err != nil {
  89. return
  90. }
  91. data = append(data, buf[0:n]...)
  92. if n < bufferSize {
  93. break
  94. }
  95. }
  96. return
  97. }
  98. func (client *Client) readLoop() {
  99. defer close(client.in)
  100. var data, leftdata []byte
  101. var err error
  102. var resp *Response
  103. ReadLoop:
  104. for client.conn != nil {
  105. if data, err = client.read(bufferSize); err != nil {
  106. if opErr, ok := err.(*net.OpError); ok {
  107. if opErr.Timeout() {
  108. client.err(err)
  109. }
  110. if opErr.Temporary() {
  111. continue
  112. }
  113. break
  114. }
  115. client.err(err)
  116. // If it is unexpected error and the connection wasn't
  117. // closed by Gearmand, the client should close the conection
  118. // and reconnect to job server.
  119. client.Close()
  120. client.conn, err = net.Dial(client.net, client.addr)
  121. if err != nil {
  122. client.err(err)
  123. break
  124. }
  125. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  126. bufio.NewWriter(client.conn))
  127. continue
  128. }
  129. if len(leftdata) > 0 { // some data left for processing
  130. data = append(leftdata, data...)
  131. leftdata = nil
  132. }
  133. for {
  134. l := len(data)
  135. if l < minPacketLength { // not enough data
  136. leftdata = data
  137. continue ReadLoop
  138. }
  139. if resp, l, err = decodeResponse(data); err != nil {
  140. leftdata = data[l:]
  141. continue ReadLoop
  142. } else {
  143. client.in <- resp
  144. }
  145. data = data[l:]
  146. if len(data) > 0 {
  147. continue
  148. }
  149. break
  150. }
  151. }
  152. }
  153. func (client *Client) processLoop() {
  154. for resp := range client.in {
  155. switch resp.DataType {
  156. case dtError:
  157. if client.lastcall != "" {
  158. resp = client.handleInner(client.lastcall, resp)
  159. client.lastcall = ""
  160. } else {
  161. client.err(getError(resp.Data))
  162. }
  163. case dtStatusRes:
  164. resp = client.handleInner("s"+resp.Handle, resp)
  165. case dtJobCreated:
  166. resp = client.handleInner("c", resp)
  167. case dtEchoRes:
  168. resp = client.handleInner("e", resp)
  169. case dtWorkData, dtWorkWarning, dtWorkStatus:
  170. resp = client.handleResponse(resp.Handle, resp)
  171. case dtWorkComplete, dtWorkFail, dtWorkException:
  172. client.handleResponse(resp.Handle, resp)
  173. client.respHandler.remove(resp.Handle)
  174. }
  175. }
  176. }
  177. func (client *Client) err(e error) {
  178. if client.ErrorHandler != nil {
  179. client.ErrorHandler(e)
  180. }
  181. }
  182. func (client *Client) handleResponse(key string, resp *Response) *Response {
  183. if h, ok := client.respHandler.get(key); ok {
  184. h(resp)
  185. return nil
  186. }
  187. return resp
  188. }
  189. func (client *Client) handleInner(key string, resp *Response) *Response {
  190. if h, ok := client.innerHandler.get(key); ok {
  191. h(resp)
  192. client.innerHandler.remove(key)
  193. return nil
  194. }
  195. return resp
  196. }
  197. type handleOrError struct {
  198. handle string
  199. err error
  200. }
  201. func (client *Client) do(funcname string, data []byte,
  202. flag uint32) (handle string, err error) {
  203. if client.conn == nil {
  204. return "", ErrLostConn
  205. }
  206. client.Lock()
  207. defer client.Unlock()
  208. var result = make(chan handleOrError, 1)
  209. client.lastcall = "c"
  210. client.innerHandler.put("c", func(resp *Response) {
  211. if resp.DataType == dtError {
  212. err = getError(resp.Data)
  213. result <- handleOrError{"", err}
  214. return
  215. }
  216. handle = resp.Handle
  217. result <- handleOrError{handle, nil}
  218. })
  219. id := IdGen.Id()
  220. req := getJob(id, []byte(funcname), data)
  221. req.DataType = flag
  222. if err = client.write(req); err != nil {
  223. client.innerHandler.remove("c")
  224. client.lastcall = ""
  225. return
  226. }
  227. var timer = time.After(client.ResponseTimeout * time.Millisecond)
  228. select {
  229. case ret := <-result:
  230. return ret.handle, ret.err
  231. case <-timer:
  232. client.innerHandler.remove("c")
  233. client.lastcall = ""
  234. return "", ErrLostConn
  235. }
  236. return
  237. }
  238. // Call the function and get a response.
  239. // flag can be set to: JobLow, JobNormal and JobHigh
  240. func (client *Client) Do(funcname string, data []byte,
  241. flag byte, h ResponseHandler) (handle string, err error) {
  242. var datatype uint32
  243. switch flag {
  244. case JobLow:
  245. datatype = dtSubmitJobLow
  246. case JobHigh:
  247. datatype = dtSubmitJobHigh
  248. default:
  249. datatype = dtSubmitJob
  250. }
  251. client.respHandler.Lock()
  252. defer client.respHandler.Unlock()
  253. handle, err = client.do(funcname, data, datatype)
  254. if err == nil && h != nil {
  255. client.respHandler.putNoLock(handle, h)
  256. }
  257. return
  258. }
  259. // Call the function in background, no response needed.
  260. // flag can be set to: JobLow, JobNormal and JobHigh
  261. func (client *Client) DoBg(funcname string, data []byte,
  262. flag byte) (handle string, err error) {
  263. if client.conn == nil {
  264. return "", ErrLostConn
  265. }
  266. var datatype uint32
  267. switch flag {
  268. case JobLow:
  269. datatype = dtSubmitJobLowBg
  270. case JobHigh:
  271. datatype = dtSubmitJobHighBg
  272. default:
  273. datatype = dtSubmitJobBg
  274. }
  275. handle, err = client.do(funcname, data, datatype)
  276. return
  277. }
  278. // Get job status from job server.
  279. func (client *Client) Status(handle string) (status *Status, err error) {
  280. if client.conn == nil {
  281. return nil, ErrLostConn
  282. }
  283. var mutex sync.Mutex
  284. mutex.Lock()
  285. client.lastcall = "s" + handle
  286. client.innerHandler.put("s"+handle, func(resp *Response) {
  287. defer mutex.Unlock()
  288. var err error
  289. status, err = resp._status()
  290. if err != nil {
  291. client.err(err)
  292. }
  293. })
  294. req := getRequest()
  295. req.DataType = dtGetStatus
  296. req.Data = []byte(handle)
  297. client.write(req)
  298. mutex.Lock()
  299. return
  300. }
  301. // Echo.
  302. func (client *Client) Echo(data []byte) (echo []byte, err error) {
  303. if client.conn == nil {
  304. return nil, ErrLostConn
  305. }
  306. var mutex sync.Mutex
  307. mutex.Lock()
  308. client.innerHandler.put("e", func(resp *Response) {
  309. echo = resp.Data
  310. mutex.Unlock()
  311. })
  312. req := getRequest()
  313. req.DataType = dtEchoReq
  314. req.Data = data
  315. client.lastcall = "e"
  316. client.write(req)
  317. mutex.Lock()
  318. return
  319. }
  320. // Close connection
  321. func (client *Client) Close() (err error) {
  322. client.Lock()
  323. defer client.Unlock()
  324. if client.conn != nil {
  325. err = client.conn.Close()
  326. client.conn = nil
  327. }
  328. return
  329. }