You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

agent.go 2.9 KiB

11 years ago
10 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package worker
  2. import (
  3. "io"
  4. "net"
  5. "strings"
  6. "sync"
  7. "bufio"
  8. )
  9. // The agent of job server.
  10. type agent struct {
  11. sync.Mutex
  12. conn net.Conn
  13. rw *bufio.ReadWriter
  14. worker *Worker
  15. in chan []byte
  16. net, addr string
  17. }
  18. // Create the agent of job server.
  19. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  20. a = &agent{
  21. net: net,
  22. addr: addr,
  23. worker: worker,
  24. in: make(chan []byte, queueSize),
  25. }
  26. return
  27. }
  28. func (a *agent) Connect() (err error) {
  29. a.Lock()
  30. defer a.Unlock()
  31. a.conn, err = net.Dial(a.net, a.addr)
  32. if err != nil {
  33. return
  34. }
  35. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  36. bufio.NewWriter(a.conn))
  37. go a.work()
  38. return
  39. }
  40. func (a *agent) work() {
  41. var inpack *inPack
  42. var l int
  43. var err error
  44. var data, leftdata []byte
  45. for {
  46. if data, err = a.read(bufferSize); err != nil {
  47. if err == ErrLostConn {
  48. break
  49. }
  50. a.worker.err(err)
  51. // If it is unexpected error and the connection wasn't
  52. // closed by Gearmand, the agent should close the conection
  53. // and reconnect to job server.
  54. a.Close()
  55. a.conn, err = net.Dial(a.net, a.addr)
  56. if err != nil {
  57. a.worker.err(err)
  58. break
  59. }
  60. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  61. bufio.NewWriter(a.conn))
  62. }
  63. if len(leftdata) > 0 { // some data left for processing
  64. data = append(leftdata, data...)
  65. }
  66. if len(data) < minPacketLength { // not enough data
  67. leftdata = data
  68. continue
  69. }
  70. if inpack, l, err = decodeInPack(data); err != nil {
  71. a.worker.err(err)
  72. leftdata = data
  73. continue
  74. }
  75. leftdata = nil
  76. inpack.a = a
  77. a.worker.in <- inpack
  78. if len(data) > l {
  79. leftdata = data[l:]
  80. }
  81. }
  82. }
  83. func (a *agent) Close() {
  84. a.Lock()
  85. defer a.Unlock()
  86. if a.conn != nil {
  87. a.conn.Close()
  88. a.conn = nil
  89. }
  90. }
  91. func (a *agent) Grab() {
  92. a.Lock()
  93. defer a.Unlock()
  94. outpack := getOutPack()
  95. outpack.dataType = dtGrabJobUniq
  96. a.write(outpack)
  97. }
  98. func (a *agent) PreSleep() {
  99. a.Lock()
  100. defer a.Unlock()
  101. outpack := getOutPack()
  102. outpack.dataType = dtPreSleep
  103. a.write(outpack)
  104. }
  105. func isClosed(err error) bool {
  106. switch {
  107. case err == io.EOF:
  108. fallthrough
  109. case strings.Contains(err.Error(), "use of closed network connection"):
  110. return true
  111. }
  112. return false
  113. }
  114. // read length bytes from the socket
  115. func (a *agent) read(length int) (data []byte, err error) {
  116. n := 0
  117. buf := getBuffer(bufferSize)
  118. // read until data can be unpacked
  119. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  120. if n, err = a.rw.Read(buf); err != nil {
  121. if isClosed(err) {
  122. err = ErrLostConn
  123. }
  124. return
  125. }
  126. data = append(data, buf[0:n]...)
  127. if n < bufferSize {
  128. break
  129. }
  130. }
  131. return
  132. }
  133. // Internal write the encoded job.
  134. func (a *agent) write(outpack *outPack) (err error) {
  135. var n int
  136. buf := outpack.Encode()
  137. for i := 0; i < len(buf); i += n {
  138. n, err = a.rw.Write(buf[i:])
  139. if err != nil {
  140. return err
  141. }
  142. }
  143. return a.rw.Flush()
  144. }