You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

286 lines
6.9 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "bytes"
  7. "bitbucket.org/mikespook/gearman-go/common"
  8. )
  9. const (
  10. Unlimited = 0
  11. OneByOne = 1
  12. )
  13. // Job handler
  14. type JobHandler func(*Job) error
  15. type JobFunc func(job *Job) ([]byte, error)
  16. // The definition of the callback function.
  17. type jobFunc struct {
  18. f JobFunc
  19. timeout uint32
  20. }
  21. // Map for added function.
  22. type JobFuncs map[string]*jobFunc
  23. /*
  24. Worker side api for gearman
  25. usage:
  26. w = worker.New(worker.Unlimited)
  27. w.AddFunction("foobar", foobar)
  28. w.AddServer("127.0.0.1:4730")
  29. w.Work() // Enter the worker's main loop
  30. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  31. It looks like this:
  32. func foobar(job *Job) (data []byte, err os.Error) {
  33. //sth. here
  34. //plaplapla...
  35. return
  36. }
  37. */
  38. type Worker struct {
  39. agents []*agent
  40. funcs JobFuncs
  41. in chan *Job
  42. running bool
  43. limit chan bool
  44. Id string
  45. // assign a ErrFunc to handle errors
  46. ErrHandler common.ErrorHandler
  47. JobHandler JobHandler
  48. }
  49. // Get a new worker
  50. func New(l int) (worker *Worker) {
  51. worker = &Worker{
  52. agents: make([]*agent, 0),
  53. funcs: make(JobFuncs),
  54. in: make(chan *Job, common.QUEUE_SIZE),
  55. }
  56. if l != Unlimited {
  57. worker.limit = make(chan bool, l)
  58. for i := 0; i < l; i ++ {
  59. worker.limit <- true
  60. }
  61. }
  62. return
  63. }
  64. //
  65. func (worker *Worker)err(e error) {
  66. if worker.ErrHandler != nil {
  67. worker.ErrHandler(e)
  68. }
  69. }
  70. // Add a server. The addr should be 'host:port' format.
  71. // The connection is established at this time.
  72. func (worker *Worker) AddServer(addr string) (err error) {
  73. // Create a new job server's client as a agent of server
  74. server, err := newAgent(addr, worker)
  75. if err != nil {
  76. return err
  77. }
  78. worker.agents = append(worker.agents, server)
  79. return
  80. }
  81. // Write a job to job server.
  82. // Here, the job's mean is not the oraginal mean.
  83. // Just looks like a network package for job's result or tell job server, there was a fail.
  84. func (worker *Worker) broadcast(job *Job) {
  85. for _, v := range worker.agents {
  86. v.WriteJob(job)
  87. }
  88. }
  89. // Add a function.
  90. // Plz added job servers first, then functions.
  91. // The API will tell every connected job server that 'I can do this'
  92. func (worker *Worker) AddFunc(funcname string,
  93. f JobFunc, timeout uint32) (err error) {
  94. if _, ok := worker.funcs[funcname]; ok {
  95. return common.Errorf("The function already exists: %s", funcname)
  96. }
  97. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  98. if worker.running {
  99. worker.addFunc(funcname, timeout)
  100. }
  101. return
  102. }
  103. // inner add function
  104. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  105. var datatype uint32
  106. var data []byte
  107. if timeout == 0 {
  108. datatype = common.CAN_DO
  109. data = []byte(funcname)
  110. } else {
  111. datatype = common.CAN_DO_TIMEOUT
  112. data = []byte(funcname + "\x00")
  113. t := common.Uint32ToBytes(timeout)
  114. data = append(data, t[:]...)
  115. }
  116. job := newJob(common.REQ, datatype, data)
  117. worker.broadcast(job)
  118. }
  119. // Remove a function.
  120. // Tell job servers 'I can not do this now' at the same time.
  121. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  122. if _, ok := worker.funcs[funcname]; !ok {
  123. return common.Errorf("The function does not exist: %s", funcname)
  124. }
  125. delete(worker.funcs, funcname)
  126. if worker.running {
  127. worker.removeFunc(funcname)
  128. }
  129. return
  130. }
  131. // inner remove function
  132. func (worker *Worker) removeFunc(funcname string) {
  133. job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
  134. worker.broadcast(job)
  135. }
  136. // Main loop
  137. func (worker *Worker) Work() {
  138. defer func() {
  139. worker.running = false
  140. }()
  141. for funcname, f := range worker.funcs {
  142. worker.addFunc(funcname, f.timeout)
  143. }
  144. worker.running = true
  145. for _, v := range worker.agents {
  146. go v.Work()
  147. }
  148. ok := true
  149. for ok {
  150. if job, ok := <-worker.in; ok {
  151. switch job.DataType {
  152. case common.ERROR:
  153. go func() {
  154. _, err := common.GetError(job.Data)
  155. worker.err(err)
  156. }()
  157. case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
  158. go func() {
  159. if err := worker.exec(job); err != nil {
  160. worker.err(err)
  161. }
  162. }()
  163. default:
  164. go worker.handleJob(job)
  165. }
  166. }
  167. }
  168. }
  169. // job handler
  170. func (worker *Worker) handleJob(job *Job) {
  171. if worker.JobHandler != nil {
  172. if err := worker.JobHandler(job); err != nil {
  173. worker.err(err)
  174. }
  175. }
  176. }
  177. // Close.
  178. func (worker *Worker) Close() {
  179. close(worker.in)
  180. if worker.limit != nil {
  181. close(worker.limit)
  182. }
  183. }
  184. // Send a something out, get the samething back.
  185. func (worker *Worker) Echo(data []byte) {
  186. job := newJob(common.REQ, common.ECHO_REQ, data)
  187. worker.broadcast(job)
  188. }
  189. // Remove all of functions.
  190. // Both from the worker or job servers.
  191. func (worker *Worker) Reset() {
  192. job := newJob(common.REQ, common.RESET_ABILITIES, nil)
  193. worker.broadcast(job)
  194. worker.funcs = make(JobFuncs)
  195. }
  196. // Set the worker's unique id.
  197. func (worker *Worker) SetId(id string) {
  198. worker.Id = id
  199. job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
  200. worker.broadcast(job)
  201. }
  202. // Execute the job. And send back the result.
  203. func (worker *Worker) exec(job *Job) (err error) {
  204. if worker.limit != nil {
  205. <-worker.limit
  206. defer func() {
  207. worker.limit <- true
  208. }()
  209. }
  210. var limit int
  211. if job.DataType == common.JOB_ASSIGN {
  212. limit = 3
  213. } else {
  214. limit = 4
  215. }
  216. jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
  217. job.Handle = string(jobdata[0])
  218. funcname := string(jobdata[1])
  219. if job.DataType == common.JOB_ASSIGN {
  220. job.Data = jobdata[2]
  221. } else {
  222. job.UniqueId = string(jobdata[2])
  223. job.Data = jobdata[3]
  224. }
  225. f, ok := worker.funcs[funcname]
  226. if !ok {
  227. return common.Errorf("The function does not exist: %s", funcname)
  228. }
  229. result, err := f.f(job)
  230. var datatype uint32
  231. if err == nil {
  232. datatype = common.WORK_COMPLETE
  233. } else {
  234. if result == nil {
  235. datatype = common.WORK_FAIL
  236. } else {
  237. datatype = common.WORK_EXCEPTION
  238. }
  239. }
  240. job.magicCode = common.REQ
  241. job.DataType = datatype
  242. job.Data = result
  243. job.agent.WriteJob(job)
  244. return
  245. }
  246. func (worker *Worker) removeAgent(a *agent) {
  247. for k, v := range worker.agents {
  248. if v == a {
  249. worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
  250. }
  251. }
  252. if len(worker.agents) == 0 {
  253. worker.Close()
  254. }
  255. }