You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

203 lines
3.6 KiB

  1. package worker
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/binary"
  6. "net"
  7. "sync"
  8. "io"
  9. )
  10. // The agent of job server.
  11. type agent struct {
  12. sync.Mutex
  13. conn net.Conn
  14. rw *bufio.ReadWriter
  15. worker *Worker
  16. in chan []byte
  17. net, addr string
  18. }
  19. // Create the agent of job server.
  20. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  21. a = &agent{
  22. net: net,
  23. addr: addr,
  24. worker: worker,
  25. in: make(chan []byte, queueSize),
  26. }
  27. return
  28. }
  29. func (a *agent) Connect() (err error) {
  30. a.Lock()
  31. defer a.Unlock()
  32. a.conn, err = net.Dial(a.net, a.addr)
  33. if err != nil {
  34. return
  35. }
  36. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  37. bufio.NewWriter(a.conn))
  38. go a.work()
  39. return
  40. }
  41. func (a *agent) work() {
  42. defer func() {
  43. if err := recover(); err != nil {
  44. a.worker.err(err.(error))
  45. }
  46. }()
  47. var inpack *inPack
  48. var l int
  49. var err error
  50. var data, leftdata []byte
  51. for {
  52. if data, err = a.read(); err != nil {
  53. if opErr, ok := err.(*net.OpError); ok {
  54. if opErr.Temporary() {
  55. continue
  56. }else{
  57. a.disconnect_error(err)
  58. // else - we're probably dc'ing due to a Close()
  59. break
  60. }
  61. } else if( err == io.EOF ){
  62. a.disconnect_error(err)
  63. break
  64. }
  65. a.worker.err(err)
  66. // If it is unexpected error and the connection wasn't
  67. // closed by Gearmand, the agent should close the conection
  68. // and reconnect to job server.
  69. a.Close()
  70. a.conn, err = net.Dial(a.net, a.addr)
  71. if err != nil {
  72. a.worker.err(err)
  73. break
  74. }
  75. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  76. bufio.NewWriter(a.conn))
  77. }
  78. if len(leftdata) > 0 { // some data left for processing
  79. data = append(leftdata, data...)
  80. }
  81. if len(data) < minPacketLength { // not enough data
  82. leftdata = data
  83. continue
  84. }
  85. if inpack, l, err = decodeInPack(data); err != nil {
  86. a.worker.err(err)
  87. leftdata = data
  88. continue
  89. }
  90. leftdata = nil
  91. inpack.a = a
  92. a.worker.in <- inpack
  93. if len(data) > l {
  94. leftdata = data[l:]
  95. }
  96. }
  97. }
  98. func (a * agent) disconnect_error( err error ){
  99. if( a.conn != nil ){
  100. err = &WorkerDisconnectError{
  101. err : err,
  102. agent : a,
  103. }
  104. a.worker.err(err)
  105. }
  106. }
  107. func (a *agent) Close() {
  108. a.Lock()
  109. defer a.Unlock()
  110. if a.conn != nil {
  111. a.conn.Close()
  112. a.conn = nil
  113. }
  114. }
  115. func (a *agent) Grab() {
  116. a.Lock()
  117. defer a.Unlock()
  118. a.grab()
  119. }
  120. func (a *agent) grab(){
  121. outpack := getOutPack()
  122. outpack.dataType = dtGrabJobUniq
  123. a.write(outpack)
  124. }
  125. func (a *agent) PreSleep() {
  126. a.Lock()
  127. defer a.Unlock()
  128. outpack := getOutPack()
  129. outpack.dataType = dtPreSleep
  130. a.write(outpack)
  131. }
  132. func (a *agent) reconnect() (error){
  133. a.Lock()
  134. defer a.Unlock()
  135. conn, err := net.Dial(a.net, a.addr)
  136. if err != nil {
  137. return err;
  138. }
  139. a.conn = conn
  140. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  141. bufio.NewWriter(a.conn))
  142. a.grab()
  143. a.worker.reRegisterFuncsForAgent(a)
  144. go a.work()
  145. return nil
  146. }
  147. // read length bytes from the socket
  148. func (a *agent) read() (data []byte, err error) {
  149. n := 0
  150. tmp := getBuffer(bufferSize)
  151. var buf bytes.Buffer
  152. // read the header so we can get the length of the data
  153. if n, err = a.rw.Read(tmp); err != nil {
  154. return
  155. }
  156. dl := int(binary.BigEndian.Uint32(tmp[8:12]))
  157. // write what we read so far
  158. buf.Write(tmp[:n])
  159. // read until we receive all the data
  160. for buf.Len() < dl+minPacketLength {
  161. if n, err = a.rw.Read(tmp); err != nil {
  162. return buf.Bytes(), err
  163. }
  164. buf.Write(tmp[:n])
  165. }
  166. return buf.Bytes(), err
  167. }
  168. // Internal write the encoded job.
  169. func (a *agent) write(outpack *outPack) (err error) {
  170. var n int
  171. buf := outpack.Encode()
  172. for i := 0; i < len(buf); i += n {
  173. n, err = a.rw.Write(buf[i:])
  174. if err != nil {
  175. return err
  176. }
  177. }
  178. return a.rw.Flush()
  179. }