You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

166 lines
3.6 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com>.
  2. // All rights reserved.
  3. // Use of this source code is governed by a MIT
  4. // license that can be found in the LICENSE file.
  5. package client
  6. import (
  7. "fmt"
  8. "time"
  9. "errors"
  10. "math/rand"
  11. "github.com/mikespook/gearman-go/common"
  12. )
  13. const (
  14. PoolSize = 10
  15. DefaultRetry = 5
  16. DefaultTimeout = 30 * time.Second
  17. )
  18. var (
  19. ErrTooMany = errors.New("Too many errors occurred.")
  20. )
  21. type poolItem struct {
  22. *Client
  23. Rate int
  24. Addr string
  25. }
  26. func (item *poolItem) connect(pool *Pool) (err error) {
  27. if item.Client, err = New(item.Addr); err != nil {
  28. return
  29. }
  30. if pool.ErrHandler != nil {
  31. item.ErrHandler = pool.ErrHandler
  32. }
  33. if pool.JobHandler != nil {
  34. item.JobHandler = pool.JobHandler
  35. }
  36. if pool.StatusHandler != nil {
  37. item.StatusHandler = pool.StatusHandler
  38. }
  39. item.TimeOut = pool.TimeOut
  40. return
  41. }
  42. type SelectionHandler func(map[string]*poolItem, string) string
  43. func SelectWithRate(pool map[string]*poolItem,
  44. last string) (addr string) {
  45. total := 0
  46. for _, item := range pool {
  47. total += item.Rate
  48. if rand.Intn(total) < item.Rate {
  49. return item.Addr
  50. }
  51. }
  52. return last
  53. }
  54. func SelectRandom(pool map[string]*poolItem,
  55. last string) (addr string) {
  56. r := rand.Intn(len(pool))
  57. i := 0
  58. for k, _ := range pool {
  59. if r == i {
  60. return k
  61. }
  62. i ++
  63. }
  64. return last
  65. }
  66. type Pool struct {
  67. SelectionHandler SelectionHandler
  68. ErrHandler common.ErrorHandler
  69. JobHandler JobHandler
  70. StatusHandler StatusHandler
  71. TimeOut time.Duration
  72. Retry int
  73. items map[string]*poolItem
  74. last string
  75. handles map[string]string
  76. }
  77. // Create a new pool.
  78. func NewPool() (pool *Pool) {
  79. return &Pool{
  80. items: make(map[string]*poolItem, PoolSize),
  81. Retry: DefaultRetry,
  82. SelectionHandler: SelectWithRate,
  83. TimeOut: DefaultTimeout,
  84. }
  85. }
  86. // Add a server with rate.
  87. func (pool *Pool) Add(addr string, rate int) (err error) {
  88. var item *poolItem
  89. var ok bool
  90. if item, ok = pool.items[addr]; ok {
  91. item.Rate = rate
  92. } else {
  93. item = &poolItem{Rate: rate, Addr: addr}
  94. if err = item.connect(pool); err != nil {
  95. return
  96. }
  97. pool.items[addr] = item
  98. }
  99. return
  100. }
  101. func (pool *Pool) Do(funcname string, data []byte,
  102. flag byte) (addr, handle string, err error) {
  103. for i := 0; i < pool.Retry; i ++ {
  104. addr = pool.SelectionHandler(pool.items, pool.last)
  105. item, ok := pool.items[addr]
  106. if ok {
  107. pool.last = addr
  108. handle, err = item.Do(funcname, data, flag)
  109. // error handling
  110. // mapping the handle to the server
  111. return
  112. }
  113. }
  114. err = ErrTooMany
  115. return
  116. }
  117. // Get job status from job server.
  118. // !!!Not fully tested.!!!
  119. func (pool *Pool) Status(addr, handle string) {
  120. if item, ok := pool.items[addr]; ok {
  121. item.Status(handle)
  122. }
  123. }
  124. // Send a something out, get the samething back.
  125. func (pool *Pool) Echo(data []byte) {
  126. for i := 0; i < pool.Retry; i ++ {
  127. addr := pool.SelectionHandler(pool.items, pool.last)
  128. item, ok := pool.items[addr]
  129. if ok {
  130. pool.last = addr
  131. item.Echo(data)
  132. }
  133. }
  134. }
  135. // Close
  136. func (pool *Pool) Close() (err map[string]error) {
  137. err = make(map[string]error)
  138. for _, c := range pool.items {
  139. fmt.Printf("begin")
  140. err[c.Addr] = c.Close()
  141. fmt.Printf("end")
  142. }
  143. fmt.Print("end-for")
  144. return
  145. }