You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

149 lines
2.6 KiB

  1. package worker
  2. import (
  3. "io"
  4. "net"
  5. "strings"
  6. "sync"
  7. )
  8. // The agent of job server.
  9. type agent struct {
  10. sync.Mutex
  11. conn net.Conn
  12. worker *Worker
  13. in chan []byte
  14. net, addr string
  15. }
  16. // Create the agent of job server.
  17. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  18. a = &agent{
  19. net: net,
  20. addr: addr,
  21. worker: worker,
  22. in: make(chan []byte, queueSize),
  23. }
  24. return
  25. }
  26. func (a *agent) Connect() (err error) {
  27. a.Lock()
  28. defer a.Unlock()
  29. a.conn, err = net.Dial(a.net, a.addr)
  30. if err != nil {
  31. return
  32. }
  33. go a.work()
  34. return
  35. }
  36. func (a *agent) work() {
  37. var inpack *inPack
  38. var l int
  39. var err error
  40. var data, leftdata []byte
  41. for {
  42. if data, err = a.read(bufferSize); err != nil {
  43. if err == ErrLostConn {
  44. break
  45. }
  46. a.worker.err(err)
  47. // If it is unexpected error and the connection wasn't
  48. // closed by Gearmand, the agent should close the conection
  49. // and reconnect to job server.
  50. a.Close()
  51. a.conn, err = net.Dial(a.net, a.addr)
  52. if err != nil {
  53. a.worker.err(err)
  54. break
  55. }
  56. }
  57. if len(leftdata) > 0 { // some data left for processing
  58. data = append(leftdata, data...)
  59. }
  60. if len(data) < minPacketLength { // not enough data
  61. leftdata = data
  62. continue
  63. }
  64. if inpack, l, err = decodeInPack(data); err != nil {
  65. a.worker.err(err)
  66. continue
  67. }
  68. leftdata = nil
  69. inpack.a = a
  70. a.worker.in <- inpack
  71. if len(data) > l {
  72. leftdata = data[l:]
  73. }
  74. }
  75. }
  76. func (a *agent) Close() {
  77. a.Lock()
  78. defer a.Unlock()
  79. if a.conn != nil {
  80. a.conn.Close()
  81. a.conn = nil
  82. }
  83. }
  84. func (a *agent) Grab() {
  85. a.Lock()
  86. defer a.Unlock()
  87. outpack := getOutPack()
  88. outpack.dataType = dtGrabJobUniq
  89. a.write(outpack)
  90. }
  91. func (a *agent) PreSleep() {
  92. a.Lock()
  93. defer a.Unlock()
  94. outpack := getOutPack()
  95. outpack.dataType = dtPreSleep
  96. a.write(outpack)
  97. }
  // isClosed reports whether err means the connection is gone: either the
  // peer ended the stream (io.EOF) or the local side already closed the
  // socket, which is recognised by its message text. A nil error is not a
  // closed connection.
  func isClosed(err error) bool {
  	if err == nil {
  		// Guard first: calling Error on a nil error would panic.
  		return false
  	}
  	return err == io.EOF ||
  		strings.Contains(err.Error(), "use of closed network connection")
  }
  107. // read length bytes from the socket
  108. func (a *agent) read(length int) (data []byte, err error) {
  109. n := 0
  110. buf := getBuffer(bufferSize)
  111. // read until data can be unpacked
  112. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  113. if n, err = a.conn.Read(buf); err != nil {
  114. if isClosed(err) {
  115. err = ErrLostConn
  116. }
  117. return
  118. }
  119. data = append(data, buf[0:n]...)
  120. if n < bufferSize {
  121. break
  122. }
  123. }
  124. return
  125. }
  126. // Internal write the encoded job.
  127. func (a *agent) write(outpack *outPack) (err error) {
  128. var n int
  129. buf := outpack.Encode()
  130. for i := 0; i < len(buf); i += n {
  131. n, err = a.conn.Write(buf[i:])
  132. if err != nil {
  133. return err
  134. }
  135. }
  136. return
  137. }