You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

309 lines
6.5 KiB

// Package worker helps developers build Gearman workers
// in an easy way.
  3. package worker
  4. import (
  5. "encoding/binary"
  6. "fmt"
  7. "sync"
  8. "time"
  9. )
  10. const (
  11. Unlimited = iota
  12. OneByOne
  13. )
// Worker is the only structure needed for worker-side development.
// It can connect to multiple servers and grab jobs.
type Worker struct {
	// Mutex is held by AddFunc and RemoveFunc while they change funcs.
	sync.Mutex
	// agents holds one entry per server registered through AddServer.
	agents []*agent
	// funcs maps a function name to its registered job function.
	funcs jobFuncs
	// in carries the incoming packets consumed by the Work loop;
	// Close closes it to end that loop.
	in chan *inPack
	// running is set by Work and cleared by Close.
	running bool
	// Id is the worker's unique id as assigned by SetId.
	Id string
	// ErrorHandler, when non-nil, receives every error reported internally.
	ErrorHandler ErrorHandler
	// JobHandler, when non-nil, receives packets that the worker does not
	// handle itself (errors, echo responses and unknown packet types).
	JobHandler JobHandler
	// limit is nil when New was given Unlimited; otherwise it throttles
	// the number of jobs executed in parallel.
	limit chan bool
}
  27. // Return a worker.
  28. //
  29. // If limit is set to Unlimited(=0), the worker will grab all jobs
  30. // and execute them parallelly.
  31. // If limit is greater than zero, the number of paralled executing
  32. // jobs are limited under the number. If limit is assgined to
  33. // OneByOne(=1), there will be only one job executed in a time.
  34. func New(limit int) (worker *Worker) {
  35. worker = &Worker{
  36. agents: make([]*agent, 0, limit),
  37. funcs: make(jobFuncs),
  38. in: make(chan *inPack, queueSize),
  39. }
  40. if limit != Unlimited {
  41. worker.limit = make(chan bool, limit-1)
  42. }
  43. return
  44. }
  45. // inner error handling
  46. func (worker *Worker) err(e error) {
  47. if worker.ErrorHandler != nil {
  48. worker.ErrorHandler(e)
  49. }
  50. }
  51. // Add a Gearman job server.
  52. //
  53. // addr should be formated as 'host:port'.
  54. func (worker *Worker) AddServer(net, addr string) (err error) {
  55. // Create a new job server's client as a agent of server
  56. a, err := newAgent(net, addr, worker)
  57. if err != nil {
  58. return err
  59. }
  60. worker.agents = append(worker.agents, a)
  61. return
  62. }
  63. // Broadcast an outpack to all Gearman server.
  64. func (worker *Worker) broadcast(outpack *outPack) {
  65. for _, v := range worker.agents {
  66. v.write(outpack)
  67. }
  68. }
  69. // Add a function.
  70. // Set timeout as Unlimited(=0) to disable executing timeout.
  71. func (worker *Worker) AddFunc(funcname string,
  72. f JobFunc, timeout uint32) (err error) {
  73. worker.Lock()
  74. defer worker.Unlock()
  75. if _, ok := worker.funcs[funcname]; ok {
  76. return fmt.Errorf("The function already exists: %s", funcname)
  77. }
  78. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  79. if worker.running {
  80. worker.addFunc(funcname, timeout)
  81. }
  82. return
  83. }
  84. // inner add
  85. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  86. outpack := getOutPack()
  87. if timeout == 0 {
  88. outpack.dataType = dtCanDo
  89. outpack.data = []byte(funcname)
  90. } else {
  91. outpack.dataType = dtCanDoTimeout
  92. l := len(funcname)
  93. outpack.data = getBuffer(l + 5)
  94. copy(outpack.data, []byte(funcname))
  95. outpack.data[l] = '\x00'
  96. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  97. }
  98. worker.broadcast(outpack)
  99. }
  100. // Remove a function.
  101. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  102. worker.Lock()
  103. defer worker.Unlock()
  104. if _, ok := worker.funcs[funcname]; !ok {
  105. return fmt.Errorf("The function does not exist: %s", funcname)
  106. }
  107. delete(worker.funcs, funcname)
  108. if worker.running {
  109. worker.removeFunc(funcname)
  110. }
  111. return
  112. }
  113. // inner remove
  114. func (worker *Worker) removeFunc(funcname string) {
  115. outpack := getOutPack()
  116. outpack.dataType = dtCantDo
  117. outpack.data = []byte(funcname)
  118. worker.broadcast(outpack)
  119. }
  120. // inner package handling
  121. func (worker *Worker) handleInPack(inpack *inPack) {
  122. switch inpack.dataType {
  123. case dtNoJob:
  124. inpack.a.PreSleep()
  125. case dtNoop:
  126. inpack.a.Grab()
  127. case dtJobAssign, dtJobAssignUniq:
  128. go func() {
  129. if err := worker.exec(inpack); err != nil {
  130. worker.err(err)
  131. }
  132. }()
  133. if worker.limit != nil {
  134. worker.limit <- true
  135. }
  136. inpack.a.Grab()
  137. case dtError:
  138. worker.err(inpack.Err())
  139. fallthrough
  140. case dtEchoRes:
  141. fallthrough
  142. default:
  143. worker.customeHandler(inpack)
  144. }
  145. }
  146. // Connect to Gearman server and tell every server
  147. // what can this worker do.
  148. func (worker *Worker) Ready() (err error) {
  149. if len(worker.agents) == 0 {
  150. return ErrNoneAgents
  151. }
  152. if len(worker.funcs) == 0 {
  153. return ErrNoneFuncs
  154. }
  155. for _, a := range worker.agents {
  156. if err = a.Connect(); err != nil {
  157. return
  158. }
  159. }
  160. for funcname, f := range worker.funcs {
  161. worker.addFunc(funcname, f.timeout)
  162. }
  163. return
  164. }
  165. // Main loop, block here
  166. // Most of time, this should be evaluated in goroutine.
  167. func (worker *Worker) Work() {
  168. defer func() {
  169. for _, a := range worker.agents {
  170. a.Close()
  171. }
  172. }()
  173. worker.running = true
  174. for _, a := range worker.agents {
  175. a.Grab()
  176. }
  177. var inpack *inPack
  178. for inpack = range worker.in {
  179. worker.handleInPack(inpack)
  180. }
  181. }
  182. // custome handling warper
  183. func (worker *Worker) customeHandler(inpack *inPack) {
  184. if worker.JobHandler != nil {
  185. if err := worker.JobHandler(inpack); err != nil {
  186. worker.err(err)
  187. }
  188. }
  189. }
  190. // Close connection and exit main loop
  191. func (worker *Worker) Close() {
  192. worker.Lock()
  193. worker.Unlock()
  194. if worker.running == true {
  195. worker.running = false
  196. close(worker.in)
  197. }
  198. }
  199. // Echo
  200. func (worker *Worker) Echo(data []byte) {
  201. outpack := getOutPack()
  202. outpack.dataType = dtEchoReq
  203. outpack.data = data
  204. worker.broadcast(outpack)
  205. }
  206. // Remove all of functions.
  207. // Both from the worker and job servers.
  208. func (worker *Worker) Reset() {
  209. outpack := getOutPack()
  210. outpack.dataType = dtResetAbilities
  211. worker.broadcast(outpack)
  212. worker.funcs = make(jobFuncs)
  213. }
  214. // Set the worker's unique id.
  215. func (worker *Worker) SetId(id string) {
  216. worker.Id = id
  217. outpack := getOutPack()
  218. outpack.dataType = dtSetClientId
  219. outpack.data = []byte(id)
  220. worker.broadcast(outpack)
  221. }
  222. // inner job executing
  223. func (worker *Worker) exec(inpack *inPack) (err error) {
  224. defer func() {
  225. if worker.limit != nil {
  226. <-worker.limit
  227. }
  228. if r := recover(); r != nil {
  229. if e, ok := r.(error); ok {
  230. err = e
  231. } else {
  232. err = ErrUnknown
  233. }
  234. }
  235. }()
  236. f, ok := worker.funcs[inpack.fn]
  237. if !ok {
  238. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  239. }
  240. var r *result
  241. if f.timeout == 0 {
  242. d, e := f.f(inpack)
  243. r = &result{data: d, err: e}
  244. } else {
  245. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  246. }
  247. if worker.running {
  248. outpack := getOutPack()
  249. if r.err == nil {
  250. outpack.dataType = dtWorkComplete
  251. } else {
  252. if len(r.data) == 0 {
  253. outpack.dataType = dtWorkFail
  254. } else {
  255. outpack.dataType = dtWorkException
  256. }
  257. err = r.err
  258. }
  259. outpack.handle = inpack.handle
  260. outpack.data = r.data
  261. inpack.a.write(outpack)
  262. }
  263. return
  264. }
// result carries the outcome of one job function call.
type result struct {
	// data is the payload returned by the job function.
	data []byte
	// err is the error returned by the job function, or ErrTimeOut when
	// execTimeout stopped waiting for it.
	err error
}
  270. // executing timer
  271. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  272. rslt := make(chan *result)
  273. defer close(rslt)
  274. go func() {
  275. defer func() { recover() }()
  276. d, e := f(job)
  277. rslt <- &result{data: d, err: e}
  278. }()
  279. select {
  280. case r = <-rslt:
  281. case <-time.After(timeout):
  282. return &result{err: ErrTimeOut}
  283. }
  284. return r
  285. }