You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

297 lines
6.5 KiB

  1. // The client package helps developers connect to Gearmand, send
  2. // jobs and fetch result.
  3. package client
  4. import (
  5. "bufio"
  6. "net"
  7. "sync"
  8. )
  9. // One client connect to one server.
  10. // Use Pool for multi-connections.
  11. type Client struct {
  12. sync.Mutex
  13. net, addr, lastcall string
  14. respHandler map[string]ResponseHandler
  15. innerHandler map[string]ResponseHandler
  16. in chan *Response
  17. conn net.Conn
  18. rw *bufio.ReadWriter
  19. ErrorHandler ErrorHandler
  20. }
  21. // Return a client.
  22. func New(network, addr string) (client *Client, err error) {
  23. client = &Client{
  24. net: network,
  25. addr: addr,
  26. respHandler: make(map[string]ResponseHandler, queueSize),
  27. innerHandler: make(map[string]ResponseHandler, queueSize),
  28. in: make(chan *Response, queueSize),
  29. }
  30. client.conn, err = net.Dial(client.net, client.addr)
  31. if err != nil {
  32. return
  33. }
  34. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  35. bufio.NewWriter(client.conn))
  36. go client.readLoop()
  37. go client.processLoop()
  38. return
  39. }
  40. func (client *Client) write(req *request) (err error) {
  41. var n int
  42. buf := req.Encode()
  43. for i := 0; i < len(buf); i += n {
  44. n, err = client.rw.Write(buf[i:])
  45. if err != nil {
  46. return
  47. }
  48. }
  49. return client.rw.Flush()
  50. }
  51. func (client *Client) read(length int) (data []byte, err error) {
  52. n := 0
  53. buf := getBuffer(bufferSize)
  54. // read until data can be unpacked
  55. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  56. if n, err = client.rw.Read(buf); err != nil {
  57. return
  58. }
  59. data = append(data, buf[0:n]...)
  60. if n < bufferSize {
  61. break
  62. }
  63. }
  64. return
  65. }
  66. func (client *Client) readLoop() {
  67. defer close(client.in)
  68. var data, leftdata []byte
  69. var err error
  70. var resp *Response
  71. ReadLoop:
  72. for client.conn != nil {
  73. if data, err = client.read(bufferSize); err != nil {
  74. if opErr, ok := err.(*net.OpError); ok {
  75. if opErr.Timeout() {
  76. client.err(err)
  77. }
  78. if opErr.Temporary() {
  79. continue
  80. }
  81. break
  82. }
  83. client.err(err)
  84. // If it is unexpected error and the connection wasn't
  85. // closed by Gearmand, the client should close the conection
  86. // and reconnect to job server.
  87. client.Close()
  88. client.conn, err = net.Dial(client.net, client.addr)
  89. if err != nil {
  90. client.err(err)
  91. break
  92. }
  93. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  94. bufio.NewWriter(client.conn))
  95. continue
  96. }
  97. if len(leftdata) > 0 { // some data left for processing
  98. data = append(leftdata, data...)
  99. }
  100. for {
  101. l := len(data)
  102. if l < minPacketLength { // not enough data
  103. leftdata = data
  104. continue ReadLoop
  105. }
  106. if resp, l, err = decodeResponse(data); err != nil {
  107. leftdata = data[l:]
  108. continue ReadLoop
  109. } else {
  110. client.in <- resp
  111. }
  112. data = data[l:]
  113. if len(data) > 0 {
  114. continue
  115. }
  116. break
  117. }
  118. }
  119. }
  120. func (client *Client) processLoop() {
  121. for resp := range client.in {
  122. switch resp.DataType {
  123. case dtError:
  124. if client.lastcall != "" {
  125. resp = client.handleInner(client.lastcall, resp)
  126. client.lastcall = ""
  127. } else {
  128. client.err(getError(resp.Data))
  129. }
  130. case dtStatusRes:
  131. resp = client.handleInner("s"+resp.Handle, resp)
  132. case dtJobCreated:
  133. resp = client.handleInner("c", resp)
  134. case dtEchoRes:
  135. resp = client.handleInner("e", resp)
  136. case dtWorkData, dtWorkWarning, dtWorkStatus:
  137. resp = client.handleResponse(resp.Handle, resp)
  138. case dtWorkComplete, dtWorkFail, dtWorkException:
  139. resp = client.handleResponse(resp.Handle, resp)
  140. if resp != nil {
  141. delete(client.respHandler, resp.Handle)
  142. }
  143. }
  144. }
  145. }
  146. func (client *Client) err(e error) {
  147. if client.ErrorHandler != nil {
  148. client.ErrorHandler(e)
  149. }
  150. }
  151. func (client *Client) handleResponse(key string, resp *Response) *Response {
  152. if h, ok := client.respHandler[key]; ok {
  153. h(resp)
  154. return nil
  155. }
  156. return resp
  157. }
  158. func (client *Client) handleInner(key string, resp *Response) *Response {
  159. if h, ok := client.innerHandler[key]; ok {
  160. h(resp)
  161. delete(client.innerHandler, key)
  162. return nil
  163. }
  164. return resp
  165. }
  166. func (client *Client) do(funcname string, data []byte,
  167. flag uint32) (handle string, err error) {
  168. if client.conn == nil {
  169. return "", ErrLostConn
  170. }
  171. var mutex sync.Mutex
  172. mutex.Lock()
  173. client.lastcall = "c"
  174. client.innerHandler["c"] = func(resp *Response) {
  175. defer mutex.Unlock()
  176. if resp.DataType == dtError {
  177. err = getError(resp.Data)
  178. return
  179. }
  180. handle = resp.Handle
  181. }
  182. id := IdGen.Id()
  183. req := getJob(id, []byte(funcname), data)
  184. req.DataType = flag
  185. client.write(req)
  186. mutex.Lock()
  187. return
  188. }
  189. // Call the function and get a response.
  190. // flag can be set to: JobLow, JobNormal and JobHigh
  191. func (client *Client) Do(funcname string, data []byte,
  192. flag byte, h ResponseHandler) (handle string, err error) {
  193. var datatype uint32
  194. switch flag {
  195. case JobLow:
  196. datatype = dtSubmitJobLow
  197. case JobHigh:
  198. datatype = dtSubmitJobHigh
  199. default:
  200. datatype = dtSubmitJob
  201. }
  202. handle, err = client.do(funcname, data, datatype)
  203. if err == nil && h != nil {
  204. client.respHandler[handle] = h
  205. }
  206. return
  207. }
  208. // Call the function in background, no response needed.
  209. // flag can be set to: JobLow, JobNormal and JobHigh
  210. func (client *Client) DoBg(funcname string, data []byte,
  211. flag byte) (handle string, err error) {
  212. if client.conn == nil {
  213. return "", ErrLostConn
  214. }
  215. var datatype uint32
  216. switch flag {
  217. case JobLow:
  218. datatype = dtSubmitJobLowBg
  219. case JobHigh:
  220. datatype = dtSubmitJobHighBg
  221. default:
  222. datatype = dtSubmitJobBg
  223. }
  224. handle, err = client.do(funcname, data, datatype)
  225. return
  226. }
  227. // Get job status from job server.
  228. func (client *Client) Status(handle string) (status *Status, err error) {
  229. if client.conn == nil {
  230. return nil, ErrLostConn
  231. }
  232. var mutex sync.Mutex
  233. mutex.Lock()
  234. client.lastcall = "s" + handle
  235. client.innerHandler["s"+handle] = func(resp *Response) {
  236. defer mutex.Unlock()
  237. var err error
  238. status, err = resp._status()
  239. if err != nil {
  240. client.err(err)
  241. }
  242. }
  243. req := getRequest()
  244. req.DataType = dtGetStatus
  245. req.Data = []byte(handle)
  246. client.write(req)
  247. mutex.Lock()
  248. return
  249. }
  250. // Echo.
  251. func (client *Client) Echo(data []byte) (echo []byte, err error) {
  252. if client.conn == nil {
  253. return nil, ErrLostConn
  254. }
  255. var mutex sync.Mutex
  256. mutex.Lock()
  257. client.innerHandler["e"] = func(resp *Response) {
  258. echo = resp.Data
  259. mutex.Unlock()
  260. }
  261. req := getRequest()
  262. req.DataType = dtEchoReq
  263. req.Data = data
  264. client.lastcall = "e"
  265. client.write(req)
  266. mutex.Lock()
  267. return
  268. }
  269. // Close connection
  270. func (client *Client) Close() (err error) {
  271. client.Lock()
  272. defer client.Unlock()
  273. if client.conn != nil {
  274. err = client.conn.Close()
  275. client.conn = nil
  276. }
  277. return
  278. }