You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

368 lines
9.5 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com>.
  2. // All rights reserved.
  3. // Use of this source code is governed by a MIT
  4. // license that can be found in the LICENSE file.
  5. package client
  6. import (
  7. "io"
  8. "net"
  9. "sync"
  10. "time"
  11. "bytes"
  12. "strconv"
  13. "github.com/mikespook/golib/autoinc"
  14. "github.com/mikespook/gearman-go/common"
  15. )
  16. var (
  17. ai *autoinc.AutoInc
  18. )
  19. func init() {
  20. ai = autoinc.New(0, 1)
  21. }
  // StatusHandler is a callback that receives a job status report.
  // Its arguments are, in order: the job handle, whether the job is known,
  // whether it is running, and the numerator and denominator of its progress.
  type StatusHandler func(handle string, known, running bool, numerator, denominator uint64)
  25. /*
  26. The client side api for gearman
  27. usage:
  28. c := client.New("tcp4", "127.0.0.1:4730")
  29. handle := c.Do("foobar", []byte("data here"), JOB_LOW | JOB_BG)
  30. */
  31. type Client struct {
  32. ErrHandler common.ErrorHandler
  33. TimeOut time.Duration
  34. in chan []byte
  35. out chan *Job
  36. created chan string
  37. echo chan []byte
  38. status chan *Status
  39. jobhandlers map[string]JobHandler
  40. conn net.Conn
  41. addr string
  42. mutex sync.RWMutex
  43. }
  44. // Create a new client.
  45. // Connect to "addr" through "network"
  46. // Eg.
  47. // client, err := client.New("127.0.0.1:4730")
  48. func New(addr string) (client *Client, err error) {
  49. client = &Client{
  50. created: make(chan string, common.QUEUE_SIZE),
  51. echo: make(chan []byte, common.QUEUE_SIZE),
  52. status: make(chan *Status, common.QUEUE_SIZE),
  53. jobhandlers: make(map[string]JobHandler, common.QUEUE_SIZE),
  54. in: make(chan []byte, common.QUEUE_SIZE),
  55. out: make(chan *Job, common.QUEUE_SIZE),
  56. addr: addr,
  57. TimeOut: time.Second,
  58. }
  59. if err = client.connect(); err != nil {
  60. return
  61. }
  62. go client.inLoop()
  63. go client.outLoop()
  64. return
  65. }
  66. // {{{ private functions
  67. //
  68. func (client *Client) connect() (err error) {
  69. client.mutex.Lock()
  70. defer client.mutex.Unlock()
  71. client.conn, err = net.Dial(common.NETWORK, client.addr)
  72. return
  73. }
  74. // Internal write
  75. func (client *Client) write(buf []byte) (err error) {
  76. client.mutex.RLock()
  77. defer client.mutex.RUnlock()
  78. var n int
  79. for i := 0; i < len(buf); i += n {
  80. n, err = client.conn.Write(buf[i:])
  81. if err != nil {
  82. return
  83. }
  84. }
  85. return
  86. }
  87. // read length bytes from the socket
  88. func (client *Client) readData(length int) (data []byte, err error) {
  89. client.mutex.RLock()
  90. defer client.mutex.RUnlock()
  91. n := 0
  92. buf := make([]byte, common.BUFFER_SIZE)
  93. // read until data can be unpacked
  94. for i := length; i > 0 || len(data) < common.PACKET_LEN; i -= n {
  95. if n, err = client.conn.Read(buf); err != nil {
  96. if err == io.EOF && n == 0 {
  97. if data == nil {
  98. err = common.ErrConnection
  99. return
  100. }
  101. return data, nil
  102. }
  103. return
  104. }
  105. data = append(data, buf[0:n]...)
  106. if n < common.BUFFER_SIZE {
  107. break
  108. }
  109. }
  110. return
  111. }
  112. // unpack data
  113. func (client *Client) unpack(data []byte) ([]byte, int, bool) {
  114. tl := len(data)
  115. start := 0
  116. for i := 0; i < tl+1-common.PACKET_LEN; i++ {
  117. if start+common.PACKET_LEN > tl { // too few data to unpack, read more
  118. return nil, common.PACKET_LEN, false
  119. }
  120. if string(data[start:start+4]) == common.RES_STR {
  121. l := int(common.BytesToUint32([4]byte{data[start+8],
  122. data[start+9], data[start+10], data[start+11]}))
  123. total := l + common.PACKET_LEN
  124. if total == tl { // data is what we want
  125. return data, common.PACKET_LEN, true
  126. } else if total < tl { // data[:total] is what we want, data[total:] is the more
  127. client.in <- data[total:]
  128. data = data[start:total]
  129. return data, common.PACKET_LEN, true
  130. } else { // ops! It won't be possible.
  131. return nil, total - tl, false
  132. }
  133. } else { // flag was not found, move to next step
  134. start++
  135. }
  136. }
  137. return nil, common.PACKET_LEN, false
  138. }
  139. // Internal read
  140. func (client *Client) read() (rel []byte, err error) {
  141. var data []byte
  142. ok := false
  143. l := common.PACKET_LEN
  144. for !ok {
  145. inlen := len(client.in)
  146. if inlen > 0 {
  147. // in queue is not empty
  148. for i := 0; i < inlen; i++ {
  149. data = append(data, <-client.in...)
  150. }
  151. } else {
  152. var d []byte
  153. d, err = client.readData(l)
  154. if err != nil {
  155. return
  156. }
  157. data = append(data, d...)
  158. }
  159. rel, l, ok = client.unpack(data)
  160. }
  161. return
  162. }
  163. // out loop
  164. func (client *Client) outLoop() {
  165. for job := range client.out {
  166. if err := client.write(job.Encode()); err != nil {
  167. client.err(err)
  168. }
  169. }
  170. }
  171. // in loop
  172. func (client *Client) inLoop() {
  173. defer common.DisablePanic()
  174. for {
  175. rel, err := client.read()
  176. if err != nil {
  177. if err == common.ErrConnection {
  178. client.Close()
  179. }
  180. client.err(err)
  181. break
  182. }
  183. job, err := decodeJob(rel)
  184. if err != nil {
  185. client.err(err)
  186. continue
  187. //break
  188. }
  189. switch job.DataType {
  190. case common.ERROR:
  191. _, err := common.GetError(job.Data)
  192. client.err(err)
  193. case common.WORK_DATA, common.WORK_WARNING, common.WORK_STATUS,
  194. common.WORK_COMPLETE, common.WORK_FAIL, common.WORK_EXCEPTION:
  195. client.handleJob(job)
  196. case common.ECHO_RES:
  197. client.handleEcho(job)
  198. case common.JOB_CREATED:
  199. client.handleCreated(job)
  200. case common.STATUS_RES:
  201. client.handleStatus(job)
  202. default:
  203. break
  204. }
  205. }
  206. }
  207. // error handler
  208. func (client *Client) err (e error) {
  209. if client.ErrHandler != nil {
  210. client.ErrHandler(e)
  211. }
  212. }
  213. // job handler
  214. func (client *Client) handleJob(job *Job) {
  215. if h, ok := client.jobhandlers[job.UniqueId]; ok {
  216. h(job)
  217. delete(client.jobhandlers, job.UniqueId)
  218. }
  219. }
  220. func (client *Client) handleEcho(job *Job) {
  221. client.echo <- job.Data
  222. }
  223. func (client *Client) handleCreated(job *Job) {
  224. client.created <- string(job.Data)
  225. }
  226. // status handler
  227. func (client *Client) handleStatus(job *Job) {
  228. data := bytes.SplitN(job.Data, []byte{'\x00'}, 5)
  229. if len(data) != 5 {
  230. client.err(common.Errorf("Invalid data: %V", job.Data))
  231. return
  232. }
  233. status := &Status{}
  234. status.Handle = string(data[0])
  235. status.Known = (data[1][0] == '1')
  236. status.Running = (data[2][0] == '1')
  237. var err error
  238. status.Numerator, err = strconv.ParseUint(string(data[3][0]), 10, 0)
  239. if err != nil {
  240. client.err(common.Errorf("Invalid handle: %s", data[3][0]))
  241. return
  242. }
  243. status.Denominator, err = strconv.ParseUint(string(data[4][0]), 10, 0)
  244. if err != nil {
  245. client.err(common.Errorf("Invalid handle: %s", data[4][0]))
  246. return
  247. }
  248. client.status <- status
  249. }
  250. // Send the job to job server.
  251. func (client *Client) writeJob(job *Job) {
  252. client.out <- job
  253. }
  254. // Internal do
  255. func (client *Client) do(funcname string, data []byte,
  256. flag uint32, id string) (handle string) {
  257. l := len(funcname) + len(id) + len(data) + 2
  258. rel := make([]byte, 0, l)
  259. rel = append(rel, []byte(funcname)...) // len(funcname)
  260. rel = append(rel, '\x00') // 1 Byte
  261. rel = append(rel, []byte(id)...) // len(uid)
  262. rel = append(rel, '\x00') // 1 Byte
  263. rel = append(rel, data...) // len(data)
  264. client.writeJob(newJob(common.REQ, flag, rel))
  265. // Waiting for JOB_CREATED
  266. handle = <-client.created
  267. return
  268. }
  269. // }}}
  270. // Do the function.
  271. // funcname is a string with function name.
  272. // data is encoding to byte array.
  273. // flag set the job type, include running level: JOB_LOW, JOB_NORMAL, JOB_HIGH,
  274. // and if it is background job: JOB_BG.
  275. // JOB_LOW | JOB_BG means the job is running with low level in background.
  276. func (client *Client) Do(funcname string, data []byte,
  277. flag byte, jobhandler JobHandler) (handle string) {
  278. var datatype uint32
  279. switch flag {
  280. case JOB_LOW :
  281. datatype = common.SUBMIT_JOB_LOW
  282. case JOB_HIGH :
  283. datatype = common.SUBMIT_JOB_HIGH
  284. default:
  285. datatype = common.SUBMIT_JOB
  286. }
  287. id := strconv.Itoa(int(ai.Id()))
  288. handle = client.do(funcname, data, datatype, id)
  289. if jobhandler != nil {
  290. client.jobhandlers[id] = jobhandler
  291. }
  292. return
  293. }
  294. func (client *Client) DoBg(funcname string, data []byte,
  295. flag byte) (handle string) {
  296. var datatype uint32
  297. switch flag {
  298. case JOB_LOW :
  299. datatype = common.SUBMIT_JOB_LOW_BG
  300. case JOB_HIGH :
  301. datatype = common.SUBMIT_JOB_HIGH_BG
  302. default:
  303. datatype = common.SUBMIT_JOB_BG
  304. }
  305. handle = client.do(funcname, data, datatype, "")
  306. return
  307. }
  308. // Get job status from job server.
  309. // !!!Not fully tested.!!!
  310. func (client *Client) Status(handle string) (status *Status) {
  311. client.writeJob(newJob(common.REQ, common.GET_STATUS, []byte(handle)))
  312. status = <-client.status
  313. return
  314. }
  315. // Send a something out, get the samething back.
  316. func (client *Client) Echo(data []byte) (r []byte) {
  317. client.writeJob(newJob(common.REQ, common.ECHO_REQ, data))
  318. r = <-client.echo
  319. return
  320. }
  321. // Close
  322. func (client *Client) Close() (err error) {
  323. close(client.in)
  324. close(client.out)
  325. close(client.echo)
  326. close(client.created)
  327. close(client.status)
  328. return client.conn.Close();
  329. }