You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

155 lines
2.8 KiB

  1. package worker
  2. import (
  3. "io"
  4. "net"
  5. "strings"
  6. "sync"
  7. "bufio"
  8. )
  9. // The agent of job server.
  10. type agent struct {
  11. sync.Mutex
  12. conn net.Conn
  13. rw *bufio.ReadWriter
  14. worker *Worker
  15. in chan []byte
  16. net, addr string
  17. }
  18. // Create the agent of job server.
  19. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  20. a = &agent{
  21. net: net,
  22. addr: addr,
  23. worker: worker,
  24. in: make(chan []byte, queueSize),
  25. }
  26. return
  27. }
  28. func (a *agent) Connect() (err error) {
  29. a.Lock()
  30. defer a.Unlock()
  31. a.conn, err = net.Dial(a.net, a.addr)
  32. if err != nil {
  33. return
  34. }
  35. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  36. bufio.NewWriter(a.conn))
  37. go a.work()
  38. return
  39. }
  40. func (a *agent) work() {
  41. var inpack *inPack
  42. var l int
  43. var err error
  44. var data, leftdata []byte
  45. for {
  46. if data, err = a.read(bufferSize); err != nil {
  47. if err == ErrLostConn {
  48. break
  49. }
  50. a.worker.err(err)
  51. // If it is unexpected error and the connection wasn't
  52. // closed by Gearmand, the agent should close the conection
  53. // and reconnect to job server.
  54. a.Close()
  55. a.conn, err = net.Dial(a.net, a.addr)
  56. if err != nil {
  57. a.worker.err(err)
  58. break
  59. }
  60. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  61. bufio.NewWriter(a.conn))
  62. }
  63. if len(leftdata) > 0 { // some data left for processing
  64. data = append(leftdata, data...)
  65. }
  66. if len(data) < minPacketLength { // not enough data
  67. leftdata = data
  68. continue
  69. }
  70. if inpack, l, err = decodeInPack(data); err != nil {
  71. a.worker.err(err)
  72. continue
  73. }
  74. leftdata = nil
  75. inpack.a = a
  76. a.worker.in <- inpack
  77. if len(data) > l {
  78. leftdata = data[l:]
  79. }
  80. }
  81. }
  82. func (a *agent) Close() {
  83. a.Lock()
  84. defer a.Unlock()
  85. if a.conn != nil {
  86. a.conn.Close()
  87. a.conn = nil
  88. }
  89. }
  90. func (a *agent) Grab() {
  91. a.Lock()
  92. defer a.Unlock()
  93. outpack := getOutPack()
  94. outpack.dataType = dtGrabJobUniq
  95. a.write(outpack)
  96. }
  97. func (a *agent) PreSleep() {
  98. a.Lock()
  99. defer a.Unlock()
  100. outpack := getOutPack()
  101. outpack.dataType = dtPreSleep
  102. a.write(outpack)
  103. }
  104. func isClosed(err error) bool {
  105. switch {
  106. case err == io.EOF:
  107. fallthrough
  108. case strings.Contains(err.Error(), "use of closed network connection"):
  109. return true
  110. }
  111. return false
  112. }
  113. // read length bytes from the socket
  114. func (a *agent) read(length int) (data []byte, err error) {
  115. n := 0
  116. buf := getBuffer(bufferSize)
  117. // read until data can be unpacked
  118. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  119. if n, err = a.rw.Read(buf); err != nil {
  120. if isClosed(err) {
  121. err = ErrLostConn
  122. }
  123. return
  124. }
  125. data = append(data, buf[0:n]...)
  126. if n < bufferSize {
  127. break
  128. }
  129. }
  130. return
  131. }
  132. // Internal write the encoded job.
  133. func (a *agent) write(outpack *outPack) (err error) {
  134. var n int
  135. buf := outpack.Encode()
  136. for i := 0; i < len(buf); i += n {
  137. n, err = a.rw.Write(buf[i:])
  138. if err != nil {
  139. return err
  140. }
  141. }
  142. return a.rw.Flush()
  143. }