You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

168 lines
3.2 KiB

  1. package client
  2. import (
  3. "errors"
  4. "math/rand"
  5. "sync"
  6. )
  7. const (
  8. poolSize = 10
  9. )
  10. var (
  11. ErrNotFound = errors.New("Server Not Found")
  12. SelectWithRate = selectWithRate
  13. SelectRandom = selectRandom
  14. )
  15. type poolClient struct {
  16. *Client
  17. Rate int
  18. mutex sync.Mutex
  19. }
  20. type SelectionHandler func(map[string]*poolClient, string) string
  21. func selectWithRate(pool map[string]*poolClient,
  22. last string) (addr string) {
  23. total := 0
  24. for _, item := range pool {
  25. total += item.Rate
  26. if rand.Intn(total) < item.Rate {
  27. return item.addr
  28. }
  29. }
  30. return last
  31. }
  32. func selectRandom(pool map[string]*poolClient,
  33. last string) (addr string) {
  34. r := rand.Intn(len(pool))
  35. i := 0
  36. for k, _ := range pool {
  37. if r == i {
  38. return k
  39. }
  40. i++
  41. }
  42. return last
  43. }
  44. type Pool struct {
  45. SelectionHandler SelectionHandler
  46. ErrorHandler ErrorHandler
  47. clients map[string]*poolClient
  48. last string
  49. mutex sync.Mutex
  50. }
  51. // Return a new pool.
  52. func NewPool() (pool *Pool) {
  53. return &Pool{
  54. clients: make(map[string]*poolClient, poolSize),
  55. SelectionHandler: SelectWithRate,
  56. }
  57. }
  58. // Add a server with rate.
  59. func (pool *Pool) Add(net, addr string, rate int) (err error) {
  60. pool.mutex.Lock()
  61. defer pool.mutex.Unlock()
  62. var item *poolClient
  63. var ok bool
  64. if item, ok = pool.clients[addr]; ok {
  65. item.Rate = rate
  66. } else {
  67. var client *Client
  68. client, err = New(net, addr)
  69. if err == nil {
  70. item = &poolClient{Client: client, Rate: rate}
  71. pool.clients[addr] = item
  72. }
  73. }
  74. return
  75. }
  76. // Remove a server.
  77. func (pool *Pool) Remove(addr string) {
  78. pool.mutex.Lock()
  79. defer pool.mutex.Unlock()
  80. delete(pool.clients, addr)
  81. }
  82. func (pool *Pool) Do(funcname string, data []byte,
  83. flag byte, h ResponseHandler) (addr, handle string, err error) {
  84. client := pool.selectServer()
  85. client.Lock()
  86. defer client.Unlock()
  87. handle, err = client.Do(funcname, data, flag, h)
  88. addr = client.addr
  89. return
  90. }
  91. func (pool *Pool) DoBg(funcname string, data []byte,
  92. flag byte) (addr, handle string, err error) {
  93. client := pool.selectServer()
  94. client.Lock()
  95. defer client.Unlock()
  96. handle, err = client.DoBg(funcname, data, flag)
  97. addr = client.addr
  98. return
  99. }
  100. // Get job status from job server.
  101. // !!!Not fully tested.!!!
  102. func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
  103. if client, ok := pool.clients[addr]; ok {
  104. client.Lock()
  105. defer client.Unlock()
  106. status, err = client.Status(handle)
  107. } else {
  108. err = ErrNotFound
  109. }
  110. return
  111. }
  112. // Send a something out, get the samething back.
  113. func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
  114. var client *poolClient
  115. if addr == "" {
  116. client = pool.selectServer()
  117. } else {
  118. var ok bool
  119. if client, ok = pool.clients[addr]; !ok {
  120. err = ErrNotFound
  121. return
  122. }
  123. }
  124. client.Lock()
  125. defer client.Unlock()
  126. echo, err = client.Echo(data)
  127. return
  128. }
  129. // Close
  130. func (pool *Pool) Close() (err map[string]error) {
  131. err = make(map[string]error)
  132. for _, c := range pool.clients {
  133. err[c.addr] = c.Close()
  134. }
  135. return
  136. }
  137. // selecting server
  138. func (pool *Pool) selectServer() (client *poolClient) {
  139. for client == nil {
  140. addr := pool.SelectionHandler(pool.clients, pool.last)
  141. var ok bool
  142. if client, ok = pool.clients[addr]; ok {
  143. pool.last = addr
  144. break
  145. }
  146. }
  147. return
  148. }