You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

162 lines
3.1 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com>.
  2. // All rights reserved.
  3. // Use of this source code is governed by a MIT
  4. // license that can be found in the LICENSE file.
  5. package client
  6. import (
  7. "errors"
  8. "math/rand"
  9. "sync"
  10. )
  // PoolSize is the initial capacity hint given to the client map of a
  // newly created Pool.
  const PoolSize = 10
  // ErrNotFound is returned when a requested address does not match any
  // server registered in the pool.
  var ErrNotFound = errors.New("Server Not Found")
  17. type poolClient struct {
  18. *Client
  19. Rate int
  20. }
  21. type SelectionHandler func(map[string]*poolClient, string) string
  22. func SelectWithRate(pool map[string]*poolClient,
  23. last string) (addr string) {
  24. total := 0
  25. for _, item := range pool {
  26. total += item.Rate
  27. if rand.Intn(total) < item.Rate {
  28. return item.addr
  29. }
  30. }
  31. return last
  32. }
  33. func SelectRandom(pool map[string]*poolClient,
  34. last string) (addr string) {
  35. r := rand.Intn(len(pool))
  36. i := 0
  37. for k, _ := range pool {
  38. if r == i {
  39. return k
  40. }
  41. i++
  42. }
  43. return last
  44. }
  45. type Pool struct {
  46. SelectionHandler SelectionHandler
  47. ErrorHandler ErrorHandler
  48. clients map[string]*poolClient
  49. last string
  50. mutex sync.Mutex
  51. }
  52. // Create a new pool.
  53. func NewPool() (pool *Pool) {
  54. return &Pool{
  55. clients: make(map[string]*poolClient, PoolSize),
  56. SelectionHandler: SelectWithRate,
  57. }
  58. }
  59. // Add a server with rate.
  60. func (pool *Pool) Add(net, addr string, rate int) (err error) {
  61. pool.mutex.Lock()
  62. defer pool.mutex.Unlock()
  63. var item *poolClient
  64. var ok bool
  65. if item, ok = pool.clients[addr]; ok {
  66. item.Rate = rate
  67. } else {
  68. var client *Client
  69. client, err = New(net, addr)
  70. if err == nil {
  71. item = &poolClient{Client: client, Rate: rate}
  72. pool.clients[addr] = item
  73. }
  74. }
  75. return
  76. }
  77. // Remove a server.
  78. func (pool *Pool) Remove(addr string) {
  79. pool.mutex.Lock()
  80. defer pool.mutex.Unlock()
  81. delete(pool.clients, addr)
  82. }
  83. func (pool *Pool) Do(funcname string, data []byte,
  84. flag byte, h ResponseHandler) (addr, handle string, err error) {
  85. client := pool.selectServer()
  86. handle, err = client.Do(funcname, data, flag, h)
  87. addr = client.addr
  88. return
  89. }
  90. func (pool *Pool) DoBg(funcname string, data []byte,
  91. flag byte) (addr, handle string, err error) {
  92. client := pool.selectServer()
  93. handle, err = client.DoBg(funcname, data, flag)
  94. addr = client.addr
  95. return
  96. }
  97. // Get job status from job server.
  98. // !!!Not fully tested.!!!
  99. func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
  100. if client, ok := pool.clients[addr]; ok {
  101. status, err = client.Status(handle)
  102. } else {
  103. err = ErrNotFound
  104. }
  105. return
  106. }
  107. // Send a something out, get the samething back.
  108. func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
  109. var client *poolClient
  110. if addr == "" {
  111. client = pool.selectServer()
  112. } else {
  113. var ok bool
  114. if client, ok = pool.clients[addr]; !ok {
  115. err = ErrNotFound
  116. return
  117. }
  118. }
  119. echo, err = client.Echo(data)
  120. return
  121. }
  122. // Close
  123. func (pool *Pool) Close() (err map[string]error) {
  124. err = make(map[string]error)
  125. for _, c := range pool.clients {
  126. err[c.addr] = c.Close()
  127. }
  128. return
  129. }
  130. // selecting server
  131. func (pool *Pool) selectServer() (client *poolClient) {
  132. for client == nil {
  133. addr := pool.SelectionHandler(pool.clients, pool.last)
  134. var ok bool
  135. if client, ok = pool.clients[addr]; ok {
  136. pool.last = addr
  137. break
  138. }
  139. }
  140. return
  141. }