You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

283 lines
7.6 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com>.
  2. // All rights reserved.
  3. // Use of this source code is governed by a MIT
  4. // license that can be found in the LICENSE file.
  5. package client
  6. import (
  7. "io"
  8. "net"
  9. "time"
  10. "bytes"
  11. "strconv"
  12. "bitbucket.org/mikespook/golib/autoinc"
  13. "bitbucket.org/mikespook/gearman-go/common"
  14. )
  15. // Job handler
  16. type JobHandler func(*Job) error
  17. // Status handler
  18. // handle, known, running, numerator, denominator
  19. type StatusHandler func(string, bool, bool, uint64, uint64)
  20. /*
  21. The client side api for gearman
  22. usage:
  23. c := client.New("tcp4", "127.0.0.1:4730")
  24. handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
  25. */
  26. type Client struct {
  27. ErrHandler common.ErrorHandler
  28. JobHandler JobHandler
  29. StatusHandler StatusHandler
  30. TimeOut time.Duration
  31. in chan []byte
  32. out chan *Job
  33. jobCreated chan *Job
  34. conn net.Conn
  35. ai *autoinc.AutoInc
  36. }
  37. // Create a new client.
  38. // Connect to "addr" through "network"
  39. // Eg.
  40. // client, err := client.New("127.0.0.1:4730")
  41. func New(addr string) (client *Client, err error) {
  42. conn, err := net.Dial(common.NETWORK, addr)
  43. if err != nil {
  44. return
  45. }
  46. client = &Client{
  47. jobCreated: make(chan *Job),
  48. in: make(chan []byte, common.QUEUE_SIZE),
  49. out: make(chan *Job, common.QUEUE_SIZE),
  50. conn: conn,
  51. ai: autoinc.New(0, 1),
  52. TimeOut: time.Second,
  53. }
  54. go client.inLoop()
  55. go client.outLoop()
  56. return
  57. }
  58. // out loop
  59. func (client *Client) outLoop() {
  60. ok := true
  61. for ok {
  62. if job, ok := <-client.out; ok {
  63. if err := client.write(job.Encode()); err != nil {
  64. client.err(err)
  65. }
  66. }
  67. }
  68. }
  69. // in loop
  70. func (client *Client) inLoop() {
  71. defer common.DisablePanic()
  72. for {
  73. rel, err := client.read()
  74. if err != nil {
  75. if err == common.ErrConnection {
  76. client.Close()
  77. break
  78. }
  79. client.err(err)
  80. continue
  81. }
  82. job, err := decodeJob(rel)
  83. if err != nil {
  84. client.err(err)
  85. continue
  86. }
  87. switch job.DataType {
  88. case common.ERROR:
  89. _, err := common.GetError(job.Data)
  90. client.err(err)
  91. case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
  92. common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION,
  93. common.ECHO_RES:
  94. go client.handleJob(job)
  95. case common.JOB_CREATED:
  96. client.jobCreated <- job
  97. case common.STATUS_RES:
  98. go client.handleStatus(job)
  99. }
  100. }
  101. }
  102. // inner read
  103. func (client *Client) read() (data []byte, err error) {
  104. if len(client.in) > 0 {
  105. // incoming queue is not empty
  106. data = <-client.in
  107. } else {
  108. // empty queue, read data from socket
  109. for {
  110. buf := make([]byte, common.BUFFER_SIZE)
  111. var n int
  112. if n, err = client.conn.Read(buf); err != nil {
  113. if err == io.EOF && n == 0 {
  114. if data == nil {
  115. err = common.ErrConnection
  116. return
  117. }
  118. break
  119. }
  120. return
  121. }
  122. data = append(data, buf[0:n]...)
  123. if n < common.BUFFER_SIZE {
  124. break
  125. }
  126. }
  127. }
  128. // split package
  129. tl := len(data)
  130. start, end := 0, 4
  131. for i := 0; i < tl; i++ {
  132. if string(data[start:end]) == common.RES_STR {
  133. l := int(common.BytesToUint32([4]byte{data[start+8], data[start+9], data[start+10], data[start+11]}))
  134. total := l + 12
  135. if total == tl {
  136. return
  137. } else {
  138. client.in <- data[total:]
  139. data = data[:total]
  140. return
  141. }
  142. } else {
  143. start++
  144. end++
  145. }
  146. }
  147. return nil, common.Errorf("Invalid data: %V", data)
  148. }
  149. // error handler
  150. func (client *Client) err (e error) {
  151. if client.ErrHandler != nil {
  152. client.ErrHandler(e)
  153. }
  154. }
  155. // job handler
  156. func (client *Client) handleJob(job *Job) {
  157. if client.JobHandler != nil {
  158. if err := client.JobHandler(job); err != nil {
  159. client.err(err)
  160. }
  161. }
  162. }
  163. // status handler
  164. func (client *Client) handleStatus(job *Job) {
  165. if client.StatusHandler != nil {
  166. data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
  167. if len(data) != 5 {
  168. client.err(common.Errorf("Invalid data: %V", job.Data))
  169. return
  170. }
  171. handle := string(data[0])
  172. known := (data[1][0] == '1')
  173. running := (data[2][0] == '1')
  174. numerator, err := strconv.ParseUint(string(data[3][0]), 10, 0)
  175. if err != nil {
  176. client.err(common.Errorf("Invalid handle: %s", data[3][0]))
  177. return
  178. }
  179. denominator, err := strconv.ParseUint(string(data[4][0]), 10, 0)
  180. if err != nil {
  181. client.err(common.Errorf("Invalid handle: %s", data[4][0]))
  182. return
  183. }
  184. client.StatusHandler(handle, known, running, numerator, denominator)
  185. }
  186. }
  187. // Do the function.
  188. // funcname is a string with function name.
  189. // data is encoding to byte array.
  190. // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
  191. // and if it is background job: JOB_BG.
  192. // JOB_LOW | JOB_BG means the job is running with low level in background.
  193. func (client *Client) Do(funcname string, data []byte, flag byte) (handle string, err error) {
  194. var datatype uint32
  195. if flag & JOB_LOW == JOB_LOW {
  196. if flag & JOB_BG == JOB_BG {
  197. datatype = common.SUBMIT_JOB_LOW_BG
  198. } else {
  199. datatype = common.SUBMIT_JOB_LOW
  200. }
  201. } else if flag & JOB_HIGH == JOB_HIGH {
  202. if flag & JOB_BG == JOB_BG {
  203. datatype = common.SUBMIT_JOB_HIGH_BG
  204. } else {
  205. datatype = common.SUBMIT_JOB_HIGH
  206. }
  207. } else if flag & JOB_BG == JOB_BG {
  208. datatype = common.SUBMIT_JOB_BG
  209. } else {
  210. datatype = common.SUBMIT_JOB
  211. }
  212. uid := strconv.Itoa(int(client.ai.Id()))
  213. l := len(funcname) + len(uid) + len(data) + 2
  214. rel := make([]byte, 0, l)
  215. rel = append(rel, []byte(funcname)...) // len(funcname)
  216. rel = append(rel, '\x00') // 1 Byte
  217. rel = append(rel, []byte(uid)...) // len(uid)
  218. rel = append(rel, '\x00') // 1 Byte
  219. rel = append(rel, data...) // len(data)
  220. client.writeJob(newJob(common.REQ, datatype, rel))
  221. // Waiting for JOB_CREATED
  222. select {
  223. case job := <-client.jobCreated:
  224. return string(job.Data), nil
  225. case <-time.After(client.TimeOut):
  226. return "", common.ErrJobTimeOut
  227. }
  228. return
  229. }
  230. // Get job status from job server.
  231. // !!!Not fully tested.!!!
  232. func (client *Client) Status(handle string) {
  233. job := newJob(common.REQ, common.GET_STATUS, []byte(handle))
  234. client.writeJob(job)
  235. }
  236. // Send a something out, get the samething back.
  237. func (client *Client) Echo(data []byte) {
  238. client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
  239. }
  240. // Send the job to job server.
  241. func (client *Client) writeJob(job *Job) {
  242. client.out <- job
  243. }
  244. // Internal write
  245. func (client *Client) write(buf []byte) (err error) {
  246. var n int
  247. for i := 0; i < len(buf); i += n {
  248. n, err = client.conn.Write(buf[i:])
  249. if err != nil {
  250. return
  251. }
  252. }
  253. return
  254. }
  255. // Close
  256. func (client *Client) Close() (err error) {
  257. close(client.jobCreated)
  258. close(client.in)
  259. close(client.out)
  260. return client.conn.Close();
  261. }