You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

pool.go 3.1 KiB

11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package client
  2. import (
  3. "errors"
  4. "math/rand"
  5. "sync"
  6. )
  // poolSize is the initial capacity hint used when NewPool allocates the
  // client map.
  const poolSize = 10

  // ErrNotFound is returned by pool operations that target a server address
  // which is not present in the pool.
  var ErrNotFound = errors.New("Server Not Found")
  13. type PoolClient struct {
  14. *Client
  15. Rate int
  16. mutex sync.Mutex
  17. }
  18. type SelectionHandler func(map[string]*PoolClient, string) string
  19. func SelectWithRate(pool map[string]*PoolClient,
  20. last string) (addr string) {
  21. total := 0
  22. for _, item := range pool {
  23. total += item.Rate
  24. if rand.Intn(total) < item.Rate {
  25. return item.addr
  26. }
  27. }
  28. return last
  29. }
  30. func SelectRandom(pool map[string]*PoolClient,
  31. last string) (addr string) {
  32. r := rand.Intn(len(pool))
  33. i := 0
  34. for k, _ := range pool {
  35. if r == i {
  36. return k
  37. }
  38. i++
  39. }
  40. return last
  41. }
  42. type Pool struct {
  43. SelectionHandler SelectionHandler
  44. ErrorHandler ErrorHandler
  45. Clients map[string]*PoolClient
  46. last string
  47. mutex sync.Mutex
  48. }
  49. // NewPool returns a new pool.
  50. func NewPool() (pool *Pool) {
  51. return &Pool{
  52. Clients: make(map[string]*PoolClient, poolSize),
  53. SelectionHandler: SelectWithRate,
  54. }
  55. }
  56. // Add a server with rate.
  57. func (pool *Pool) Add(net, addr string, rate int) (err error) {
  58. pool.mutex.Lock()
  59. defer pool.mutex.Unlock()
  60. var item *PoolClient
  61. var ok bool
  62. if item, ok = pool.Clients[addr]; ok {
  63. item.Rate = rate
  64. } else {
  65. var client *Client
  66. client, err = New(net, addr)
  67. if err == nil {
  68. item = &PoolClient{Client: client, Rate: rate}
  69. pool.Clients[addr] = item
  70. }
  71. }
  72. return
  73. }
  74. // Remove a server.
  75. func (pool *Pool) Remove(addr string) {
  76. pool.mutex.Lock()
  77. defer pool.mutex.Unlock()
  78. delete(pool.Clients, addr)
  79. }
  80. func (pool *Pool) Do(funcname string, data []byte,
  81. flag byte, h ResponseHandler) (addr, handle string, err error) {
  82. client := pool.selectServer()
  83. client.Lock()
  84. defer client.Unlock()
  85. handle, err = client.Do(funcname, data, flag, h)
  86. addr = client.addr
  87. return
  88. }
  89. func (pool *Pool) DoBg(funcname string, data []byte,
  90. flag byte) (addr, handle string, err error) {
  91. client := pool.selectServer()
  92. client.Lock()
  93. defer client.Unlock()
  94. handle, err = client.DoBg(funcname, data, flag)
  95. addr = client.addr
  96. return
  97. }
  98. // Status gets job status from job server.
  99. // !!!Not fully tested.!!!
  100. func (pool *Pool) Status(addr, handle string) (status *Status, err error) {
  101. if client, ok := pool.Clients[addr]; ok {
  102. client.Lock()
  103. defer client.Unlock()
  104. status, err = client.Status(handle)
  105. } else {
  106. err = ErrNotFound
  107. }
  108. return
  109. }
  110. // Send a something out, get the samething back.
  111. func (pool *Pool) Echo(addr string, data []byte) (echo []byte, err error) {
  112. var client *PoolClient
  113. if addr == "" {
  114. client = pool.selectServer()
  115. } else {
  116. var ok bool
  117. if client, ok = pool.Clients[addr]; !ok {
  118. err = ErrNotFound
  119. return
  120. }
  121. }
  122. client.Lock()
  123. defer client.Unlock()
  124. echo, err = client.Echo(data)
  125. return
  126. }
  127. // Close
  128. func (pool *Pool) Close() (err map[string]error) {
  129. err = make(map[string]error)
  130. for _, c := range pool.Clients {
  131. err[c.addr] = c.Close()
  132. }
  133. return
  134. }
  135. // selecting server
  136. func (pool *Pool) selectServer() (client *PoolClient) {
  137. for client == nil {
  138. addr := pool.SelectionHandler(pool.Clients, pool.last)
  139. var ok bool
  140. if client, ok = pool.Clients[addr]; ok {
  141. pool.last = addr
  142. break
  143. }
  144. }
  145. return
  146. }