You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

160 lines
2.9 KiB

  1. package worker
  2. import (
  3. "io"
  4. "net"
  5. "strings"
  6. "sync"
  7. "bufio"
  8. )
  9. // The agent of job server.
  10. type agent struct {
  11. sync.Mutex
  12. conn net.Conn
  13. rw *bufio.ReadWriter
  14. worker *Worker
  15. in chan []byte
  16. net, addr string
  17. }
  18. // Create the agent of job server.
  19. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  20. a = &agent{
  21. net: net,
  22. addr: addr,
  23. worker: worker,
  24. in: make(chan []byte, queueSize),
  25. }
  26. return
  27. }
  28. func (a *agent) Connect() (err error) {
  29. a.Lock()
  30. defer a.Unlock()
  31. a.conn, err = net.Dial(a.net, a.addr)
  32. if err != nil {
  33. return
  34. }
  35. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  36. bufio.NewWriter(a.conn))
  37. go a.work()
  38. return
  39. }
  40. func (a *agent) work() {
  41. defer func() {
  42. if err := recover(); err != nil {
  43. a.worker.err(err.(error))
  44. }
  45. }()
  46. var inpack *inPack
  47. var l int
  48. var err error
  49. var data, leftdata []byte
  50. for {
  51. if data, err = a.read(bufferSize); err != nil {
  52. if err == ErrLostConn {
  53. break
  54. }
  55. a.worker.err(err)
  56. // If it is unexpected error and the connection wasn't
  57. // closed by Gearmand, the agent should close the conection
  58. // and reconnect to job server.
  59. a.Close()
  60. a.conn, err = net.Dial(a.net, a.addr)
  61. if err != nil {
  62. a.worker.err(err)
  63. break
  64. }
  65. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  66. bufio.NewWriter(a.conn))
  67. }
  68. if len(leftdata) > 0 { // some data left for processing
  69. data = append(leftdata, data...)
  70. }
  71. if len(data) < minPacketLength { // not enough data
  72. leftdata = data
  73. continue
  74. }
  75. if inpack, l, err = decodeInPack(data); err != nil {
  76. a.worker.err(err)
  77. continue
  78. }
  79. leftdata = nil
  80. inpack.a = a
  81. a.worker.in <- inpack
  82. if len(data) > l {
  83. leftdata = data[l:]
  84. }
  85. }
  86. }
  87. func (a *agent) Close() {
  88. a.Lock()
  89. defer a.Unlock()
  90. if a.conn != nil {
  91. a.conn.Close()
  92. a.conn = nil
  93. }
  94. }
  95. func (a *agent) Grab() {
  96. a.Lock()
  97. defer a.Unlock()
  98. outpack := getOutPack()
  99. outpack.dataType = dtGrabJobUniq
  100. a.write(outpack)
  101. }
  102. func (a *agent) PreSleep() {
  103. a.Lock()
  104. defer a.Unlock()
  105. outpack := getOutPack()
  106. outpack.dataType = dtPreSleep
  107. a.write(outpack)
  108. }
  109. func isClosed(err error) bool {
  110. switch {
  111. case err == io.EOF:
  112. fallthrough
  113. case strings.Contains(err.Error(), "use of closed network connection"):
  114. return true
  115. }
  116. return false
  117. }
  118. // read length bytes from the socket
  119. func (a *agent) read(length int) (data []byte, err error) {
  120. n := 0
  121. buf := getBuffer(bufferSize)
  122. // read until data can be unpacked
  123. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  124. if n, err = a.rw.Read(buf); err != nil {
  125. if isClosed(err) {
  126. err = ErrLostConn
  127. }
  128. return
  129. }
  130. data = append(data, buf[0:n]...)
  131. if n < bufferSize {
  132. break
  133. }
  134. }
  135. return
  136. }
  137. // Internal write the encoded job.
  138. func (a *agent) write(outpack *outPack) (err error) {
  139. var n int
  140. buf := outpack.Encode()
  141. for i := 0; i < len(buf); i += n {
  142. n, err = a.rw.Write(buf[i:])
  143. if err != nil {
  144. return err
  145. }
  146. }
  147. return a.rw.Flush()
  148. }