Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 

166 lines
3.1 KiB

  1. package client
  2. import (
  3. "errors"
  4. "math/rand"
  5. "sync"
  6. )
  7. const (
  8. poolSize = 10
  9. )
  10. var (
  11. ErrNotFound = errors.New("Server Not Found")
  12. )
  13. type PoolClient struct {
  14. *Client
  15. Rate int
  16. mutex sync.Mutex
  17. }
  18. type SelectionHandler func(map[string]*PoolClient, string) string
  19. func SelectWithRate(pool map[string]*PoolClient,
  20. last string) (addr string) {
  21. total := 0
  22. for _, item := range pool {
  23. total += item.Rate
  24. if rand.Intn(total) < item.Rate {
  25. return item.addr
  26. }
  27. }
  28. return last
  29. }
  30. func SelectRandom(pool map[string]*PoolClient,
  31. last string) (addr string) {
  32. r := rand.Intn(len(pool))
  33. i := 0
  34. for k, _ := range pool {
  35. if r == i {
  36. return k
  37. }
  38. i++
  39. }
  40. return last
  41. }
  42. type Pool struct {
  43. SelectionHandler SelectionHandler
  44. ErrorHandler ErrorHandler
  45. Clients map[string]*PoolClient
  46. last string
  47. mutex sync.Mutex
  48. }
  49. // NewPool returns a new pool.
  50. func NewPool() (pool *Pool) {
  51. return &Pool{
  52. Clients: make(map[string]*PoolClient, poolSize),
  53. SelectionHandler: SelectWithRate,
  54. }
  55. }
  56. // Add a server with rate.
  57. func (pool *Pool) Add(net, addr string, rate int) (err error) {
  58. pool.mutex.Lock()
  59. defer pool.mutex.Unlock()
  60. var item *PoolClient
  61. var ok bool
  62. if item, ok = pool.Clients[addr]; ok {
  63. item.Rate = rate
  64. } else {
  65. var client *Client
  66. client, err = New(net, addr)
  67. if err == nil {
  68. item = &PoolClient{Client: client, Rate: rate}
  69. pool.Clients[addr] = item
  70. }
  71. }
  72. return
  73. }
  74. // Remove a server.
  75. func (pool *Pool) Remove(addr string) {
  76. pool.mutex.Lock()
  77. defer pool.mutex.Unlock()
  78. delete(pool.Clients, addr)
  79. }
  80. func (pool *Pool) Do(funcname string, data []byte,
  81. flag byte, h ResponseHandler) (addr, handle string, err error) {
  82. client := pool.selectServer()
  83. client.Lock()
  84. defer client.Unlock()
  85. handle, err = client.Do(funcname, data, flag, h)
  86. addr = client.addr
  87. return
  88. }
  89. func (pool *Pool) DoBg(funcname string, data []byte,
  90. flag byte) (addr, handle string, err error) {
  91. client := pool.selectServer()
  92. client.Lock()
  93. defer client.Unlock()
  94. handle, err = client.DoBg(funcname, data, flag)
  95. addr = client.addr
  96. return
  97. }
  98. // Status gets job status from job server.
  99. // !!!Not fully tested.!!!
  100. func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
  101. if client, ok := pool.Clients[addr]; ok {
  102. client.Lock()
  103. defer client.Unlock()
  104. status, err = client.Status(handle)
  105. } else {
  106. err = ErrNotFound
  107. }
  108. return
  109. }
  110. // Send a something out, get the samething back.
  111. func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
  112. var client *PoolClient
  113. if addr == "" {
  114. client = pool.selectServer()
  115. } else {
  116. var ok bool
  117. if client, ok = pool.Clients[addr]; !ok {
  118. err = ErrNotFound
  119. return
  120. }
  121. }
  122. client.Lock()
  123. defer client.Unlock()
  124. echo, err = client.Echo(data)
  125. return
  126. }
  127. // Close
  128. func (pool *Pool) Close() (err map[string]error) {
  129. err = make(map[string]error)
  130. for _, c := range pool.Clients {
  131. err[c.addr] = c.Close()
  132. }
  133. return
  134. }
  135. // selecting server
  136. func (pool *Pool) selectServer() (client *PoolClient) {
  137. for client == nil {
  138. addr := pool.SelectionHandler(pool.Clients, pool.last)
  139. var ok bool
  140. if client, ok = pool.Clients[addr]; ok {
  141. pool.last = addr
  142. break
  143. }
  144. }
  145. return
  146. }