You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

163 lines
3.5 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com>.
  2. // All rights reserved.
  3. // Use of this source code is governed by a MIT
  4. // license that can be found in the LICENSE file.
  5. package client
  6. import (
  7. "sync"
  8. "time"
  9. "errors"
  10. "math/rand"
  11. "github.com/mikespook/gearman-go/common"
  12. )
  13. const (
  14. PoolSize = 10
  15. )
  16. var (
  17. ErrNotFound = errors.New("Server Not Found")
  18. )
  19. type poolClient struct {
  20. *Client
  21. Rate int
  22. }
  23. type SelectionHandler func(map[string]*poolClient, string) string
  24. func SelectWithRate(pool map[string]*poolClient,
  25. last string) (addr string) {
  26. total := 0
  27. for _, item := range pool {
  28. total += item.Rate
  29. if rand.Intn(total) < item.Rate {
  30. return item.addr
  31. }
  32. }
  33. return last
  34. }
  35. func SelectRandom(pool map[string]*poolClient,
  36. last string) (addr string) {
  37. r := rand.Intn(len(pool))
  38. i := 0
  39. for k, _ := range pool {
  40. if r == i {
  41. return k
  42. }
  43. i ++
  44. }
  45. return last
  46. }
  47. type Pool struct {
  48. SelectionHandler SelectionHandler
  49. ErrHandler common.ErrorHandler
  50. clients map[string]*poolClient
  51. last string
  52. mutex sync.Mutex
  53. }
  54. // Create a new pool.
  55. func NewPool() (pool *Pool) {
  56. return &Pool{
  57. clients: make(map[string]*poolClient, PoolSize),
  58. SelectionHandler: SelectWithRate,
  59. }
  60. }
  61. // Add a server with rate.
  62. func (pool *Pool) Add(addr string, rate int) (err error) {
  63. pool.mutex.Lock()
  64. defer pool.mutex.Unlock()
  65. var item *poolClient
  66. var ok bool
  67. if item, ok = pool.clients[addr]; ok {
  68. item.Rate = rate
  69. } else {
  70. var client *Client
  71. client, err = New(addr)
  72. item = &poolClient{Client: client, Rate: rate}
  73. pool.clients[addr] = item
  74. }
  75. return
  76. }
  77. // Remove a server.
  78. func (pool *Pool) Remove(addr string) {
  79. pool.mutex.Lock()
  80. defer pool.mutex.Unlock()
  81. delete(pool.clients, addr)
  82. }
  83. func (pool *Pool) Do(funcname string, data []byte,
  84. flag byte, h JobHandler) (addr, handle string) {
  85. client := pool.selectServer()
  86. handle = client.Do(funcname, data, flag, h)
  87. addr = client.addr
  88. return
  89. }
  90. func (pool *Pool) DoBg(funcname string, data []byte,
  91. flag byte) (addr, handle string) {
  92. client := pool.selectServer()
  93. handle = client.DoBg(funcname, data, flag)
  94. addr = client.addr
  95. return
  96. }
  97. // Get job status from job server.
  98. // !!!Not fully tested.!!!
  99. func (pool *Pool) Status(addr, handle string, timeout time.Duration) (status *Status, err error) {
  100. if client, ok := pool.clients[addr]; ok {
  101. status, err = client.Status(handle, timeout)
  102. } else {
  103. err = ErrNotFound
  104. }
  105. return
  106. }
  107. // Send a something out, get the samething back.
  108. func (pool *Pool) Echo(addr string, data []byte, timeout time.Duration) (r []byte, err error) {
  109. var client *poolClient
  110. if addr == "" {
  111. client = pool.selectServer()
  112. } else {
  113. var ok bool
  114. if client, ok = pool.clients[addr]; !ok {
  115. err = ErrNotFound
  116. return
  117. }
  118. }
  119. r, err = client.Echo(data, timeout)
  120. return
  121. }
  122. // Close
  123. func (pool *Pool) Close() (err map[string]error) {
  124. err = make(map[string]error)
  125. for _, c := range pool.clients {
  126. err[c.addr] = c.Close()
  127. }
  128. return
  129. }
  130. func (pool *Pool) selectServer() (client *poolClient) {
  131. for client == nil {
  132. addr := pool.SelectionHandler(pool.clients, pool.last)
  133. var ok bool
  134. if client, ok = pool.clients[addr]; ok {
  135. pool.last = addr
  136. break
  137. }
  138. }
  139. return
  140. }