You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

agent.go 2.8 KiB

11 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package worker
  2. import (
  3. "bufio"
  4. "net"
  5. "sync"
  6. )
  7. // The agent of job server.
  8. type agent struct {
  9. sync.Mutex
  10. conn net.Conn
  11. rw *bufio.ReadWriter
  12. worker *Worker
  13. in chan []byte
  14. net, addr string
  15. }
  16. // Create the agent of job server.
  17. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  18. a = &agent{
  19. net: net,
  20. addr: addr,
  21. worker: worker,
  22. in: make(chan []byte, queueSize),
  23. }
  24. return
  25. }
  26. func (a *agent) Connect() (err error) {
  27. a.Lock()
  28. defer a.Unlock()
  29. a.conn, err = net.Dial(a.net, a.addr)
  30. if err != nil {
  31. return
  32. }
  33. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  34. bufio.NewWriter(a.conn))
  35. go a.work()
  36. return
  37. }
  38. func (a *agent) work() {
  39. defer func() {
  40. if err := recover(); err != nil {
  41. a.worker.err(err.(error))
  42. }
  43. }()
  44. var inpack *inPack
  45. var l int
  46. var err error
  47. var data, leftdata []byte
  48. for {
  49. if data, err = a.read(bufferSize); err != nil {
  50. if opErr, ok := err.(*net.OpError); ok {
  51. if opErr.Timeout() {
  52. a.worker.err(err)
  53. }
  54. if opErr.Temporary() {
  55. continue
  56. }
  57. break
  58. }
  59. a.worker.err(err)
  60. // If it is unexpected error and the connection wasn't
  61. // closed by Gearmand, the agent should close the conection
  62. // and reconnect to job server.
  63. a.Close()
  64. a.conn, err = net.Dial(a.net, a.addr)
  65. if err != nil {
  66. a.worker.err(err)
  67. break
  68. }
  69. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  70. bufio.NewWriter(a.conn))
  71. }
  72. if len(leftdata) > 0 { // some data left for processing
  73. data = append(leftdata, data...)
  74. }
  75. if len(data) < minPacketLength { // not enough data
  76. leftdata = data
  77. continue
  78. }
  79. if inpack, l, err = decodeInPack(data); err != nil {
  80. a.worker.err(err)
  81. leftdata = data
  82. continue
  83. }
  84. leftdata = nil
  85. inpack.a = a
  86. a.worker.in <- inpack
  87. if len(data) > l {
  88. leftdata = data[l:]
  89. }
  90. }
  91. }
  92. func (a *agent) Close() {
  93. a.Lock()
  94. defer a.Unlock()
  95. if a.conn != nil {
  96. a.conn.Close()
  97. a.conn = nil
  98. }
  99. }
  100. func (a *agent) Grab() {
  101. a.Lock()
  102. defer a.Unlock()
  103. outpack := getOutPack()
  104. outpack.dataType = dtGrabJobUniq
  105. a.write(outpack)
  106. }
  107. func (a *agent) PreSleep() {
  108. a.Lock()
  109. defer a.Unlock()
  110. outpack := getOutPack()
  111. outpack.dataType = dtPreSleep
  112. a.write(outpack)
  113. }
  114. // read length bytes from the socket
  115. func (a *agent) read(length int) (data []byte, err error) {
  116. n := 0
  117. buf := getBuffer(bufferSize)
  118. // read until data can be unpacked
  119. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  120. if n, err = a.rw.Read(buf); err != nil {
  121. return
  122. }
  123. data = append(data, buf[0:n]...)
  124. if n < bufferSize {
  125. break
  126. }
  127. }
  128. return
  129. }
  130. // Internal write the encoded job.
  131. func (a *agent) write(outpack *outPack) (err error) {
  132. var n int
  133. buf := outpack.Encode()
  134. for i := 0; i < len(buf); i += n {
  135. n, err = a.rw.Write(buf[i:])
  136. if err != nil {
  137. return err
  138. }
  139. }
  140. return a.rw.Flush()
  141. }