You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

287 lines
7.2 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. gearman "bitbucket.org/mikespook/gearman-go"
  7. "bytes"
  8. "sync"
  9. )
  10. const (
  11. Unlimit = 0
  12. OneByOne = 1
  13. )
  14. // The definition of the callback function.
  15. type JobFunction func(job *WorkerJob) ([]byte, error)
  16. // Map for added function.
  17. type JobFunctionMap map[string]JobFunction
  18. // Error Function
  19. type ErrFunc func(e error)
  20. /*
  21. Worker side api for gearman.
  22. usage:
  23. w = worker.New(worker.Unlimit)
  24. w.AddFunction("foobar", foobar)
  25. w.AddServer("127.0.0.1:4730")
  26. w.Work() // Enter the worker's main loop
  27. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  28. It looks like this:
  29. func foobar(job *WorkerJob) (data []byte, err os.Error) {
  30. //sth. here
  31. //plaplapla...
  32. return
  33. }
  34. */
  35. type Worker struct {
  36. clients []*jobAgent
  37. functions JobFunctionMap
  38. running bool
  39. incoming chan *WorkerJob
  40. mutex sync.Mutex
  41. limit chan bool
  42. JobQueue chan *WorkerJob
  43. // assign a ErrFunc to handle errors
  44. // Must assign befor AddServer
  45. ErrFunc ErrFunc
  46. }
  47. // Get a new worker
  48. func New(l int) (worker *Worker) {
  49. worker = &Worker{
  50. // job server list
  51. clients: make([]*jobAgent, 0, gearman.WORKER_SERVER_CAP),
  52. // function list
  53. functions: make(JobFunctionMap),
  54. incoming: make(chan *WorkerJob, gearman.QUEUE_CAP),
  55. JobQueue: make(chan *WorkerJob, gearman.QUEUE_CAP),
  56. running: true,
  57. }
  58. if l != Unlimit {
  59. worker.limit = make(chan bool, l)
  60. for i := 0; i < l; i ++ {
  61. worker.limit <- true
  62. }
  63. }
  64. return
  65. }
  66. //
  67. func (worker *Worker)err(e error) {
  68. if worker.ErrFunc != nil {
  69. worker.ErrFunc(e)
  70. }
  71. }
  72. // Add a server. The addr should be 'host:port' format.
  73. // The connection is established at this time.
  74. func (worker *Worker) AddServer(addr string) (err error) {
  75. worker.mutex.Lock()
  76. defer worker.mutex.Unlock()
  77. if len(worker.clients) == cap(worker.clients) {
  78. return gearman.ErrOutOfCap
  79. }
  80. // Create a new job server's client as a agent of server
  81. server, err := newJobAgent(addr, worker)
  82. if err != nil {
  83. return err
  84. }
  85. n := len(worker.clients)
  86. worker.clients = worker.clients[0 : n+1]
  87. worker.clients[n] = server
  88. return
  89. }
  90. // Add a function.
  91. // Plz added job servers first, then functions.
  92. // The API will tell every connected job server that 'I can do this'
  93. func (worker *Worker) AddFunction(funcname string,
  94. f JobFunction, timeout uint32) (err error) {
  95. if len(worker.clients) < 1 {
  96. return gearman.ErrNotConn
  97. }
  98. worker.mutex.Lock()
  99. defer worker.mutex.Unlock()
  100. worker.functions[funcname] = f
  101. var datatype uint32
  102. var data []byte
  103. if timeout == 0 {
  104. datatype = gearman.CAN_DO
  105. data = []byte(funcname)
  106. } else {
  107. datatype = gearman.CAN_DO_TIMEOUT
  108. data = []byte(funcname + "\x00")
  109. t := gearman.Uint32ToBytes(timeout)
  110. data = append(data, t[:]...)
  111. }
  112. job := NewWorkerJob(gearman.REQ, datatype, data)
  113. worker.WriteJob(job)
  114. return
  115. }
  116. // Remove a function.
  117. // Tell job servers 'I can not do this now' at the same time.
  118. func (worker *Worker) RemoveFunction(funcname string) (err error) {
  119. worker.mutex.Lock()
  120. defer worker.mutex.Unlock()
  121. if worker.functions[funcname] == nil {
  122. return gearman.ErrFuncNotFound
  123. }
  124. delete(worker.functions, funcname)
  125. job := NewWorkerJob(gearman.REQ, gearman.CANT_DO, []byte(funcname))
  126. worker.WriteJob(job)
  127. return
  128. }
  129. // Main loop
  130. func (worker *Worker) Work() {
  131. for _, v := range worker.clients {
  132. go v.Work()
  133. }
  134. for worker.running || len(worker.incoming) > 0{
  135. select {
  136. case job := <-worker.incoming:
  137. if job == nil {
  138. break
  139. }
  140. switch job.DataType {
  141. case gearman.NO_JOB:
  142. // do nothing
  143. case gearman.ERROR:
  144. _, err := gearman.GetError(job.Data)
  145. worker.err(err)
  146. case gearman.JOB_ASSIGN, gearman.JOB_ASSIGN_UNIQ:
  147. go func() {
  148. if err := worker.exec(job); err != nil {
  149. worker.err(err)
  150. }
  151. }()
  152. default:
  153. worker.JobQueue <- job
  154. }
  155. }
  156. }
  157. close(worker.incoming)
  158. }
  159. // Get the last job in queue.
  160. // If there are more than one job in the queue,
  161. // the last one will be returned,
  162. // the others will be lost.
  163. func (worker *Worker) LastJob() (job *WorkerJob) {
  164. if l := len(worker.JobQueue); l != 1 {
  165. if l == 0 {
  166. return
  167. }
  168. for i := 0; i < l-1; i++ {
  169. <-worker.JobQueue
  170. }
  171. }
  172. return <-worker.JobQueue
  173. }
  174. // Close.
  175. func (worker *Worker) Close() (err error) {
  176. for _, v := range worker.clients {
  177. err = v.Close()
  178. }
  179. worker.running = false
  180. return err
  181. }
  182. // Write a job to job server.
  183. // Here, the job's mean is not the oraginal mean.
  184. // Just looks like a network package for job's result or tell job server, there was a fail.
  185. func (worker *Worker) WriteJob(job *WorkerJob) (err error) {
  186. e := make(chan error)
  187. for _, v := range worker.clients {
  188. go func() {
  189. e <- v.WriteJob(job)
  190. }()
  191. }
  192. return <-e
  193. }
  194. // Send a something out, get the samething back.
  195. func (worker *Worker) Echo(data []byte) (err error) {
  196. job := NewWorkerJob(gearman.REQ, gearman.ECHO_REQ, data)
  197. return worker.WriteJob(job)
  198. }
  199. // Remove all of functions.
  200. // Both from the worker or job servers.
  201. func (worker *Worker) Reset() (err error) {
  202. job := NewWorkerJob(gearman.REQ, gearman.RESET_ABILITIES, nil)
  203. err = worker.WriteJob(job)
  204. worker.functions = make(JobFunctionMap)
  205. return
  206. }
  207. // Set the worker's unique id.
  208. func (worker *Worker) SetId(id string) (err error) {
  209. job := NewWorkerJob(gearman.REQ, gearman.SET_CLIENT_ID, []byte(id))
  210. return worker.WriteJob(job)
  211. }
  212. // Execute the job. And send back the result.
  213. func (worker *Worker) exec(job *WorkerJob) (err error) {
  214. if worker.limit != nil {
  215. <- worker.limit
  216. defer func() {
  217. worker.limit <- true
  218. }()
  219. }
  220. var limit int
  221. if job.DataType == gearman.JOB_ASSIGN {
  222. limit = 3
  223. } else {
  224. limit = 4
  225. }
  226. jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
  227. job.Handle = string(jobdata[0])
  228. funcname := string(jobdata[1])
  229. if job.DataType == gearman.JOB_ASSIGN {
  230. job.Data = jobdata[2]
  231. } else {
  232. job.UniqueId = string(jobdata[2])
  233. job.Data = jobdata[3]
  234. }
  235. f, ok := worker.functions[funcname]
  236. if !ok {
  237. return gearman.ErrFuncNotFound
  238. }
  239. result, err := f(job)
  240. var datatype uint32
  241. if err == nil {
  242. datatype = gearman.WORK_COMPLETE
  243. } else {
  244. if result == nil {
  245. datatype = gearman.WORK_FAIL
  246. } else {
  247. datatype = gearman.WORK_EXCEPTION
  248. }
  249. }
  250. job.magicCode = gearman.REQ
  251. job.DataType = datatype
  252. job.Data = result
  253. worker.WriteJob(job)
  254. return
  255. }