You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

156 lines
2.9 KiB

  1. package worker
  2. import (
  3. "io"
  4. "net"
  5. "strings"
  6. "sync"
  7. "bufio"
  8. )
  9. // The agent of job server.
  10. type agent struct {
  11. sync.Mutex
  12. conn net.Conn
  13. rw *bufio.ReadWriter
  14. worker *Worker
  15. in chan []byte
  16. net, addr string
  17. }
  18. // Create the agent of job server.
  19. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  20. a = &agent{
  21. net: net,
  22. addr: addr,
  23. worker: worker,
  24. in: make(chan []byte, queueSize),
  25. }
  26. return
  27. }
  28. func (a *agent) Connect() (err error) {
  29. a.Lock()
  30. defer a.Unlock()
  31. a.conn, err = net.Dial(a.net, a.addr)
  32. if err != nil {
  33. return
  34. }
  35. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  36. bufio.NewWriter(a.conn))
  37. go a.work()
  38. return
  39. }
  40. func (a *agent) work() {
  41. var inpack *inPack
  42. var l int
  43. var err error
  44. var data, leftdata []byte
  45. for {
  46. if data, err = a.read(bufferSize); err != nil {
  47. if err == ErrLostConn {
  48. break
  49. }
  50. a.worker.err(err)
  51. // If it is unexpected error and the connection wasn't
  52. // closed by Gearmand, the agent should close the conection
  53. // and reconnect to job server.
  54. a.Close()
  55. a.conn, err = net.Dial(a.net, a.addr)
  56. if err != nil {
  57. a.worker.err(err)
  58. break
  59. }
  60. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  61. bufio.NewWriter(a.conn))
  62. }
  63. if len(leftdata) > 0 { // some data left for processing
  64. data = append(leftdata, data...)
  65. }
  66. if len(data) < minPacketLength { // not enough data
  67. leftdata = data
  68. continue
  69. }
  70. if inpack, l, err = decodeInPack(data); err != nil {
  71. a.worker.err(err)
  72. leftdata = data
  73. continue
  74. }
  75. leftdata = nil
  76. inpack.a = a
  77. a.worker.in <- inpack
  78. if len(data) > l {
  79. leftdata = data[l:]
  80. }
  81. }
  82. }
  83. func (a *agent) Close() {
  84. a.Lock()
  85. defer a.Unlock()
  86. if a.conn != nil {
  87. a.conn.Close()
  88. a.conn = nil
  89. }
  90. }
  91. func (a *agent) Grab() {
  92. a.Lock()
  93. defer a.Unlock()
  94. outpack := getOutPack()
  95. outpack.dataType = dtGrabJobUniq
  96. a.write(outpack)
  97. }
  98. func (a *agent) PreSleep() {
  99. a.Lock()
  100. defer a.Unlock()
  101. outpack := getOutPack()
  102. outpack.dataType = dtPreSleep
  103. a.write(outpack)
  104. }
  105. func isClosed(err error) bool {
  106. switch {
  107. case err == io.EOF:
  108. fallthrough
  109. case strings.Contains(err.Error(), "use of closed network connection"):
  110. return true
  111. }
  112. return false
  113. }
  114. // read length bytes from the socket
  115. func (a *agent) read(length int) (data []byte, err error) {
  116. n := 0
  117. buf := getBuffer(bufferSize)
  118. // read until data can be unpacked
  119. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  120. if n, err = a.rw.Read(buf); err != nil {
  121. if isClosed(err) {
  122. err = ErrLostConn
  123. }
  124. return
  125. }
  126. data = append(data, buf[0:n]...)
  127. if n < bufferSize {
  128. break
  129. }
  130. }
  131. return
  132. }
  133. // Internal write the encoded job.
  134. func (a *agent) write(outpack *outPack) (err error) {
  135. var n int
  136. buf := outpack.Encode()
  137. for i := 0; i < len(buf); i += n {
  138. n, err = a.rw.Write(buf[i:])
  139. if err != nil {
  140. return err
  141. }
  142. }
  143. return a.rw.Flush()
  144. }