You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

client.go 6.5 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // Package client helps developers connect to Gearmand, send
  2. // jobs, and fetch results.
  3. package client
  4. import (
  5. "bufio"
  6. "net"
  7. "sync"
  8. )
  9. // One client connect to one server.
  10. // Use Pool for multi-connections.
  11. type Client struct {
  12. sync.Mutex
  13. net, addr, lastcall string
  14. respHandler map[string]ResponseHandler
  15. innerHandler map[string]ResponseHandler
  16. in chan *Response
  17. conn net.Conn
  18. rw *bufio.ReadWriter
  19. ErrorHandler ErrorHandler
  20. }
  21. // Return a client.
  22. func New(network, addr string) (client *Client, err error) {
  23. client = &Client{
  24. net: network,
  25. addr: addr,
  26. respHandler: make(map[string]ResponseHandler, queueSize),
  27. innerHandler: make(map[string]ResponseHandler, queueSize),
  28. in: make(chan *Response, queueSize),
  29. }
  30. client.conn, err = net.Dial(client.net, client.addr)
  31. if err != nil {
  32. return
  33. }
  34. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  35. bufio.NewWriter(client.conn))
  36. go client.readLoop()
  37. go client.processLoop()
  38. return
  39. }
  40. func (client *Client) write(req *request) (err error) {
  41. var n int
  42. buf := req.Encode()
  43. for i := 0; i < len(buf); i += n {
  44. n, err = client.rw.Write(buf[i:])
  45. if err != nil {
  46. return
  47. }
  48. }
  49. return client.rw.Flush()
  50. }
  51. func (client *Client) read(length int) (data []byte, err error) {
  52. n := 0
  53. buf := getBuffer(bufferSize)
  54. // read until data can be unpacked
  55. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  56. if n, err = client.rw.Read(buf); err != nil {
  57. return
  58. }
  59. data = append(data, buf[0:n]...)
  60. if n < bufferSize {
  61. break
  62. }
  63. }
  64. return
  65. }
  66. func (client *Client) readLoop() {
  67. defer close(client.in)
  68. var data, leftdata []byte
  69. var err error
  70. var resp *Response
  71. ReadLoop:
  72. for client.conn != nil {
  73. if data, err = client.read(bufferSize); err != nil {
  74. if opErr, ok := err.(*net.OpError); ok {
  75. if opErr.Timeout() {
  76. client.err(err)
  77. }
  78. if opErr.Temporary() {
  79. continue
  80. }
  81. break
  82. }
  83. client.err(err)
  84. // If it is unexpected error and the connection wasn't
  85. // closed by Gearmand, the client should close the conection
  86. // and reconnect to job server.
  87. client.Close()
  88. client.conn, err = net.Dial(client.net, client.addr)
  89. if err != nil {
  90. client.err(err)
  91. break
  92. }
  93. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  94. bufio.NewWriter(client.conn))
  95. continue
  96. }
  97. if len(leftdata) > 0 { // some data left for processing
  98. data = append(leftdata, data...)
  99. }
  100. for {
  101. l := len(data)
  102. if l < minPacketLength { // not enough data
  103. leftdata = data
  104. continue ReadLoop
  105. }
  106. if resp, l, err = decodeResponse(data); err != nil {
  107. leftdata = data[l:]
  108. continue ReadLoop
  109. } else {
  110. client.in <- resp
  111. }
  112. data = data[l:]
  113. if len(data) > 0 {
  114. continue
  115. }
  116. break
  117. }
  118. }
  119. }
  120. func (client *Client) processLoop() {
  121. for resp := range client.in {
  122. switch resp.DataType {
  123. case dtError:
  124. if client.lastcall != "" {
  125. resp = client.handleInner(client.lastcall, resp)
  126. client.lastcall = ""
  127. } else {
  128. client.err(getError(resp.Data))
  129. }
  130. case dtStatusRes:
  131. resp = client.handleInner("s"+resp.Handle, resp)
  132. case dtJobCreated:
  133. resp = client.handleInner("c", resp)
  134. case dtEchoRes:
  135. resp = client.handleInner("e", resp)
  136. case dtWorkData, dtWorkWarning, dtWorkStatus:
  137. resp = client.handleResponse(resp.Handle, resp)
  138. case dtWorkComplete, dtWorkFail, dtWorkException:
  139. resp = client.handleResponse(resp.Handle, resp)
  140. if resp != nil {
  141. delete(client.respHandler, resp.Handle)
  142. }
  143. }
  144. }
  145. }
  146. func (client *Client) err(e error) {
  147. if client.ErrorHandler != nil {
  148. client.ErrorHandler(e)
  149. }
  150. }
  151. func (client *Client) handleResponse(key string, resp *Response) *Response {
  152. if h, ok := client.respHandler[key]; ok {
  153. h(resp)
  154. return nil
  155. }
  156. return resp
  157. }
  158. func (client *Client) handleInner(key string, resp *Response) *Response {
  159. if h, ok := client.innerHandler[key]; ok {
  160. h(resp)
  161. delete(client.innerHandler, key)
  162. return nil
  163. }
  164. return resp
  165. }
  166. func (client *Client) do(funcname string, data []byte,
  167. flag uint32) (handle string, err error) {
  168. if client.conn == nil {
  169. return "", ErrLostConn
  170. }
  171. var mutex sync.Mutex
  172. mutex.Lock()
  173. client.lastcall = "c"
  174. client.innerHandler["c"] = func(resp *Response) {
  175. defer mutex.Unlock()
  176. if resp.DataType == dtError {
  177. err = getError(resp.Data)
  178. return
  179. }
  180. handle = resp.Handle
  181. }
  182. id := IdGen.Id()
  183. req := getJob(id, []byte(funcname), data)
  184. req.DataType = flag
  185. client.write(req)
  186. mutex.Lock()
  187. return
  188. }
  189. // Call the function and get a response.
  190. // flag can be set to: JobLow, JobNormal and JobHigh
  191. func (client *Client) Do(funcname string, data []byte,
  192. flag byte, h ResponseHandler) (handle string, err error) {
  193. var datatype uint32
  194. switch flag {
  195. case JobLow:
  196. datatype = dtSubmitJobLow
  197. case JobHigh:
  198. datatype = dtSubmitJobHigh
  199. default:
  200. datatype = dtSubmitJob
  201. }
  202. handle, err = client.do(funcname, data, datatype)
  203. if err == nil && h != nil {
  204. client.respHandler[handle] = h
  205. }
  206. return
  207. }
  208. // Call the function in background, no response needed.
  209. // flag can be set to: JobLow, JobNormal and JobHigh
  210. func (client *Client) DoBg(funcname string, data []byte,
  211. flag byte) (handle string, err error) {
  212. if client.conn == nil {
  213. return "", ErrLostConn
  214. }
  215. var datatype uint32
  216. switch flag {
  217. case JobLow:
  218. datatype = dtSubmitJobLowBg
  219. case JobHigh:
  220. datatype = dtSubmitJobHighBg
  221. default:
  222. datatype = dtSubmitJobBg
  223. }
  224. handle, err = client.do(funcname, data, datatype)
  225. return
  226. }
  227. // Get job status from job server.
  228. func (client *Client) Status(handle string) (status *Status, err error) {
  229. if client.conn == nil {
  230. return nil, ErrLostConn
  231. }
  232. var mutex sync.Mutex
  233. mutex.Lock()
  234. client.lastcall = "s" + handle
  235. client.innerHandler["s"+handle] = func(resp *Response) {
  236. defer mutex.Unlock()
  237. var err error
  238. status, err = resp._status()
  239. if err != nil {
  240. client.err(err)
  241. }
  242. }
  243. req := getRequest()
  244. req.DataType = dtGetStatus
  245. req.Data = []byte(handle)
  246. client.write(req)
  247. mutex.Lock()
  248. return
  249. }
  250. // Echo.
  251. func (client *Client) Echo(data []byte) (echo []byte, err error) {
  252. if client.conn == nil {
  253. return nil, ErrLostConn
  254. }
  255. var mutex sync.Mutex
  256. mutex.Lock()
  257. client.innerHandler["e"] = func(resp *Response) {
  258. echo = resp.Data
  259. mutex.Unlock()
  260. }
  261. req := getRequest()
  262. req.DataType = dtEchoReq
  263. req.Data = data
  264. client.lastcall = "e"
  265. client.write(req)
  266. mutex.Lock()
  267. return
  268. }
  269. // Close connection
  270. func (client *Client) Close() (err error) {
  271. client.Lock()
  272. defer client.Unlock()
  273. if client.conn != nil {
  274. err = client.conn.Close()
  275. client.conn = nil
  276. }
  277. return
  278. }