You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

client.go 6.5 KiB

11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. // Package client helps developers connect to Gearmand, send
  2. // jobs, and fetch results.
  3. package client
  4. import (
  5. "bufio"
  6. "net"
  7. "sync"
  8. )
  9. // One client connect to one server.
  10. // Use Pool for multi-connections.
  11. type Client struct {
  12. sync.Mutex
  13. net, addr, lastcall string
  14. respHandler map[string]ResponseHandler
  15. innerHandler map[string]ResponseHandler
  16. in chan *Response
  17. conn net.Conn
  18. rw *bufio.ReadWriter
  19. ErrorHandler ErrorHandler
  20. }
  21. // Return a client.
  22. func New(network, addr string) (client *Client, err error) {
  23. client = &Client{
  24. net: network,
  25. addr: addr,
  26. respHandler: make(map[string]ResponseHandler, queueSize),
  27. innerHandler: make(map[string]ResponseHandler, queueSize),
  28. in: make(chan *Response, queueSize),
  29. }
  30. client.conn, err = net.Dial(client.net, client.addr)
  31. if err != nil {
  32. return
  33. }
  34. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  35. bufio.NewWriter(client.conn))
  36. go client.readLoop()
  37. go client.processLoop()
  38. return
  39. }
  40. func (client *Client) write(req *request) (err error) {
  41. var n int
  42. buf := req.Encode()
  43. for i := 0; i < len(buf); i += n {
  44. n, err = client.rw.Write(buf[i:])
  45. if err != nil {
  46. return
  47. }
  48. }
  49. return client.rw.Flush()
  50. }
  51. func (client *Client) read(length int) (data []byte, err error) {
  52. n := 0
  53. buf := getBuffer(bufferSize)
  54. // read until data can be unpacked
  55. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  56. if n, err = client.rw.Read(buf); err != nil {
  57. return
  58. }
  59. data = append(data, buf[0:n]...)
  60. if n < bufferSize {
  61. break
  62. }
  63. }
  64. return
  65. }
  66. func (client *Client) readLoop() {
  67. defer close(client.in)
  68. var data, leftdata []byte
  69. var err error
  70. var resp *Response
  71. ReadLoop:
  72. for client.conn != nil {
  73. if data, err = client.read(bufferSize); err != nil {
  74. if opErr, ok := err.(*net.OpError); ok {
  75. if opErr.Timeout() {
  76. client.err(err)
  77. }
  78. if opErr.Temporary() {
  79. continue
  80. }
  81. break
  82. }
  83. client.err(err)
  84. // If it is unexpected error and the connection wasn't
  85. // closed by Gearmand, the client should close the conection
  86. // and reconnect to job server.
  87. client.Close()
  88. client.conn, err = net.Dial(client.net, client.addr)
  89. if err != nil {
  90. client.err(err)
  91. break
  92. }
  93. client.rw = bufio.NewReadWriter(bufio.NewReader(client.conn),
  94. bufio.NewWriter(client.conn))
  95. continue
  96. }
  97. if len(leftdata) > 0 { // some data left for processing
  98. data = append(leftdata, data...)
  99. }
  100. for {
  101. l := len(data)
  102. if l < minPacketLength { // not enough data
  103. leftdata = data
  104. continue ReadLoop
  105. }
  106. if resp, l, err = decodeResponse(data); err != nil {
  107. leftdata = data[l:]
  108. continue ReadLoop
  109. } else {
  110. client.in <- resp
  111. }
  112. data = data[l:]
  113. if len(data) > 0 {
  114. continue
  115. }
  116. break
  117. }
  118. }
  119. }
  120. func (client *Client) processLoop() {
  121. for resp := range client.in {
  122. switch resp.DataType {
  123. case dtError:
  124. if client.lastcall != "" {
  125. resp = client.handleInner(client.lastcall, resp)
  126. client.lastcall = ""
  127. } else {
  128. client.err(getError(resp.Data))
  129. }
  130. case dtStatusRes:
  131. resp = client.handleInner("s"+resp.Handle, resp)
  132. case dtJobCreated:
  133. resp = client.handleInner("c", resp)
  134. case dtEchoRes:
  135. resp = client.handleInner("e", resp)
  136. case dtWorkData, dtWorkWarning, dtWorkStatus:
  137. resp = client.handleResponse(resp.Handle, resp)
  138. case dtWorkComplete, dtWorkFail, dtWorkException:
  139. client.handleResponse(resp.Handle, resp)
  140. delete(client.respHandler, resp.Handle)
  141. }
  142. }
  143. }
  144. func (client *Client) err(e error) {
  145. if client.ErrorHandler != nil {
  146. client.ErrorHandler(e)
  147. }
  148. }
  149. func (client *Client) handleResponse(key string, resp *Response) *Response {
  150. if h, ok := client.respHandler[key]; ok {
  151. h(resp)
  152. return nil
  153. }
  154. return resp
  155. }
  156. func (client *Client) handleInner(key string, resp *Response) *Response {
  157. if h, ok := client.innerHandler[key]; ok {
  158. h(resp)
  159. delete(client.innerHandler, key)
  160. return nil
  161. }
  162. return resp
  163. }
  164. func (client *Client) do(funcname string, data []byte,
  165. flag uint32) (handle string, err error) {
  166. if client.conn == nil {
  167. return "", ErrLostConn
  168. }
  169. var mutex sync.Mutex
  170. mutex.Lock()
  171. client.lastcall = "c"
  172. client.innerHandler["c"] = func(resp *Response) {
  173. defer mutex.Unlock()
  174. if resp.DataType == dtError {
  175. err = getError(resp.Data)
  176. return
  177. }
  178. handle = resp.Handle
  179. }
  180. id := IdGen.Id()
  181. req := getJob(id, []byte(funcname), data)
  182. req.DataType = flag
  183. client.write(req)
  184. mutex.Lock()
  185. return
  186. }
  187. // Call the function and get a response.
  188. // flag can be set to: JobLow, JobNormal and JobHigh
  189. func (client *Client) Do(funcname string, data []byte,
  190. flag byte, h ResponseHandler) (handle string, err error) {
  191. var datatype uint32
  192. switch flag {
  193. case JobLow:
  194. datatype = dtSubmitJobLow
  195. case JobHigh:
  196. datatype = dtSubmitJobHigh
  197. default:
  198. datatype = dtSubmitJob
  199. }
  200. handle, err = client.do(funcname, data, datatype)
  201. if err == nil && h != nil {
  202. client.respHandler[handle] = h
  203. }
  204. return
  205. }
  206. // Call the function in background, no response needed.
  207. // flag can be set to: JobLow, JobNormal and JobHigh
  208. func (client *Client) DoBg(funcname string, data []byte,
  209. flag byte) (handle string, err error) {
  210. if client.conn == nil {
  211. return "", ErrLostConn
  212. }
  213. var datatype uint32
  214. switch flag {
  215. case JobLow:
  216. datatype = dtSubmitJobLowBg
  217. case JobHigh:
  218. datatype = dtSubmitJobHighBg
  219. default:
  220. datatype = dtSubmitJobBg
  221. }
  222. handle, err = client.do(funcname, data, datatype)
  223. return
  224. }
  225. // Get job status from job server.
  226. func (client *Client) Status(handle string) (status *Status, err error) {
  227. if client.conn == nil {
  228. return nil, ErrLostConn
  229. }
  230. var mutex sync.Mutex
  231. mutex.Lock()
  232. client.lastcall = "s" + handle
  233. client.innerHandler["s"+handle] = func(resp *Response) {
  234. defer mutex.Unlock()
  235. var err error
  236. status, err = resp._status()
  237. if err != nil {
  238. client.err(err)
  239. }
  240. }
  241. req := getRequest()
  242. req.DataType = dtGetStatus
  243. req.Data = []byte(handle)
  244. client.write(req)
  245. mutex.Lock()
  246. return
  247. }
  248. // Echo.
  249. func (client *Client) Echo(data []byte) (echo []byte, err error) {
  250. if client.conn == nil {
  251. return nil, ErrLostConn
  252. }
  253. var mutex sync.Mutex
  254. mutex.Lock()
  255. client.innerHandler["e"] = func(resp *Response) {
  256. echo = resp.Data
  257. mutex.Unlock()
  258. }
  259. req := getRequest()
  260. req.DataType = dtEchoReq
  261. req.Data = data
  262. client.lastcall = "e"
  263. client.write(req)
  264. mutex.Lock()
  265. return
  266. }
  267. // Close connection
  268. func (client *Client) Close() (err error) {
  269. client.Lock()
  270. defer client.Unlock()
  271. if client.conn != nil {
  272. err = client.conn.Close()
  273. client.conn = nil
  274. }
  275. return
  276. }