You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

156 lines
3.7 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "io"
  7. "net"
  8. "bitbucket.org/mikespook/gearman-go/common"
  9. )
  10. // The agent of job server.
  11. type agent struct {
  12. conn net.Conn
  13. worker *Worker
  14. in chan []byte
  15. out chan *Job
  16. }
  17. // Create the agent of job server.
  18. func newAgent(addr string, worker *Worker) (a *agent, err error) {
  19. conn, err := net.Dial(common.NETWORK, addr)
  20. if err != nil {
  21. return
  22. }
  23. a = &agent{
  24. conn: conn,
  25. worker: worker,
  26. in: make(chan []byte, common.QUEUE_SIZE),
  27. out: make(chan *Job, common.QUEUE_SIZE),
  28. }
  29. return
  30. }
  31. // outputing loop
  32. func (a *agent) outLoop() {
  33. ok := true
  34. for ok {
  35. if job, ok := <-a.out; ok {
  36. if err := a.write(job.Encode()); err != nil {
  37. a.worker.err(err)
  38. }
  39. }
  40. }
  41. }
  42. // inputing loop
  43. func (a *agent) inLoop() {
  44. defer func() {
  45. a.conn.Close()
  46. close(a.in)
  47. close(a.out)
  48. a.worker.removeAgent(a)
  49. }()
  50. noop := true
  51. for a.worker.running {
  52. // got noop msg and in queue is zero, grab job
  53. if noop && len(a.in) == 0 {
  54. a.WriteJob(newJob(common.REQ, common.GRAB_JOB, nil))
  55. }
  56. rel, err := a.read()
  57. if err != nil {
  58. if err == common.ErrEmptyReading {
  59. break
  60. }
  61. a.worker.err(err)
  62. continue
  63. }
  64. job, err := decodeJob(rel)
  65. if err != nil {
  66. a.worker.err(err)
  67. continue
  68. }
  69. switch job.DataType {
  70. case common.NOOP:
  71. noop = true
  72. case common.NO_JOB:
  73. noop = false
  74. a.WriteJob(newJob(common.REQ, common.PRE_SLEEP, nil))
  75. case common.ERROR, common.ECHO_RES, common.JOB_ASSIGN_UNIQ, common.JOB_ASSIGN:
  76. job.agent = a
  77. a.worker.in <- job
  78. }
  79. }
  80. }
  81. func (a *agent) Work() {
  82. go a.outLoop()
  83. go a.inLoop()
  84. }
  85. // Internal read
  86. func (a *agent) read() (data []byte, err error) {
  87. if len(a.in) > 0 {
  88. // in queue is not empty
  89. data = <-a.in
  90. } else {
  91. for {
  92. buf := make([]byte, common.BUFFER_SIZE)
  93. var n int
  94. if n, err = a.conn.Read(buf); err != nil {
  95. if err == io.EOF && n == 0 {
  96. if data == nil {
  97. err = common.ErrEmptyReading
  98. return
  99. }
  100. break
  101. }
  102. return
  103. }
  104. data = append(data, buf[0:n]...)
  105. if n < common.BUFFER_SIZE {
  106. break
  107. }
  108. }
  109. }
  110. // split package
  111. tl := len(data)
  112. start := 0
  113. for i := 0; i < tl; i++ {
  114. if string(data[start:start+4]) == common.RES_STR {
  115. l := int(common.BytesToUint32([4]byte{data[start+8],
  116. data[start+9], data[start+10], data[start+11]}))
  117. total := l + 12
  118. if total == tl {
  119. return
  120. } else {
  121. a.in <- data[total:]
  122. data = data[:total]
  123. return
  124. }
  125. } else {
  126. start++
  127. }
  128. }
  129. return nil, common.Errorf("Invalid data: %V", data)
  130. }
  131. // Send a job to the job server.
  132. func (a *agent) WriteJob(job *Job) {
  133. a.out <- job
  134. }
  135. // Internal write the encoded job.
  136. func (a *agent) write(buf []byte) (err error) {
  137. var n int
  138. for i := 0; i < len(buf); i += n {
  139. n, err = a.conn.Write(buf[i:])
  140. if err != nil {
  141. return err
  142. }
  143. }
  144. return
  145. }