You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

152 lines
2.8 KiB

  1. package worker
  2. import (
  3. "bufio"
  4. "net"
  5. "sync"
  6. )
  7. // The agent of job server.
  8. type agent struct {
  9. sync.Mutex
  10. conn net.Conn
  11. rw *bufio.ReadWriter
  12. worker *Worker
  13. in chan []byte
  14. net, addr string
  15. }
  16. // Create the agent of job server.
  17. func newAgent(net, addr string, worker *Worker) (a *agent, err error) {
  18. a = &agent{
  19. net: net,
  20. addr: addr,
  21. worker: worker,
  22. in: make(chan []byte, queueSize),
  23. }
  24. return
  25. }
  26. func (a *agent) Connect() (err error) {
  27. a.Lock()
  28. defer a.Unlock()
  29. a.conn, err = net.Dial(a.net, a.addr)
  30. if err != nil {
  31. return
  32. }
  33. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  34. bufio.NewWriter(a.conn))
  35. go a.work()
  36. return
  37. }
  38. func (a *agent) work() {
  39. defer func() {
  40. if err := recover(); err != nil {
  41. a.worker.err(err.(error))
  42. }
  43. }()
  44. var inpack *inPack
  45. var l int
  46. var err error
  47. var data, leftdata []byte
  48. for {
  49. if data, err = a.read(bufferSize); err != nil {
  50. if opErr, ok := err.(*net.OpError); ok {
  51. if opErr.Temporary() {
  52. continue
  53. }else{
  54. a.worker.err(err)
  55. break
  56. }
  57. }
  58. a.worker.err(err)
  59. // If it is unexpected error and the connection wasn't
  60. // closed by Gearmand, the agent should close the conection
  61. // and reconnect to job server.
  62. a.Close()
  63. a.conn, err = net.Dial(a.net, a.addr)
  64. if err != nil {
  65. a.worker.err(err)
  66. break
  67. }
  68. a.rw = bufio.NewReadWriter(bufio.NewReader(a.conn),
  69. bufio.NewWriter(a.conn))
  70. }
  71. if len(leftdata) > 0 { // some data left for processing
  72. data = append(leftdata, data...)
  73. }
  74. if len(data) < minPacketLength { // not enough data
  75. leftdata = data
  76. continue
  77. }
  78. if inpack, l, err = decodeInPack(data); err != nil {
  79. a.worker.err(err)
  80. leftdata = data
  81. continue
  82. }
  83. leftdata = nil
  84. inpack.a = a
  85. a.worker.in <- inpack
  86. if len(data) > l {
  87. leftdata = data[l:]
  88. }
  89. }
  90. }
  91. func (a *agent) Close() {
  92. a.Lock()
  93. defer a.Unlock()
  94. if a.conn != nil {
  95. a.conn.Close()
  96. a.conn = nil
  97. }
  98. }
  99. func (a *agent) Grab() {
  100. a.Lock()
  101. defer a.Unlock()
  102. outpack := getOutPack()
  103. outpack.dataType = dtGrabJobUniq
  104. a.write(outpack)
  105. }
  106. func (a *agent) PreSleep() {
  107. a.Lock()
  108. defer a.Unlock()
  109. outpack := getOutPack()
  110. outpack.dataType = dtPreSleep
  111. a.write(outpack)
  112. }
  113. // read length bytes from the socket
  114. func (a *agent) read(length int) (data []byte, err error) {
  115. n := 0
  116. buf := getBuffer(bufferSize)
  117. // read until data can be unpacked
  118. for i := length; i > 0 || len(data) < minPacketLength; i -= n {
  119. if n, err = a.rw.Read(buf); err != nil {
  120. return
  121. }
  122. data = append(data, buf[0:n]...)
  123. if n < bufferSize {
  124. break
  125. }
  126. }
  127. return
  128. }
  129. // Internal write the encoded job.
  130. func (a *agent) write(outpack *outPack) (err error) {
  131. var n int
  132. buf := outpack.Encode()
  133. for i := 0; i < len(buf); i += n {
  134. n, err = a.rw.Write(buf[i:])
  135. if err != nil {
  136. return err
  137. }
  138. }
  139. return a.rw.Flush()
  140. }