You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

290 lines
7.0 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "bytes"
  7. "bitbucket.org/mikespook/gearman-go/common"
  8. )
  // Concurrency limits that can be passed to New.
  const (
  	// Unlimited puts no cap on the number of jobs executed at the same time.
  	Unlimited = iota
  	// OneByOne allows a single job to be executed at a time.
  	OneByOne
  )
  13. // Job handler
  14. type JobHandler func(*Job) error
  15. type JobFunc func(job *Job) ([]byte, error)
  16. // The definition of the callback function.
  17. type jobFunc struct {
  18. f JobFunc
  19. timeout uint32
  20. }
  21. // Map for added function.
  22. type JobFuncs map[string]*jobFunc
  23. /*
  24. Worker side api for gearman
  25. usage:
  26. w = worker.New(worker.Unlimited)
  27. w.AddFunction("foobar", foobar)
  28. w.AddServer("127.0.0.1:4730")
  29. w.Work() // Enter the worker's main loop
  30. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  31. It looks like this:
  32. func foobar(job *Job) (data []byte, err os.Error) {
  33. //sth. here
  34. //plaplapla...
  35. return
  36. }
  37. */
  38. type Worker struct {
  39. agents []*agent
  40. funcs JobFuncs
  41. in chan *Job
  42. running bool
  43. limit chan bool
  44. Id string
  45. // assign a ErrFunc to handle errors
  46. ErrHandler common.ErrorHandler
  47. JobHandler JobHandler
  48. }
  49. // Get a new worker
  50. func New(l int) (worker *Worker) {
  51. worker = &Worker{
  52. agents: make([]*agent, 0),
  53. funcs: make(JobFuncs),
  54. in: make(chan *Job, common.QUEUE_SIZE),
  55. }
  56. if l != Unlimited {
  57. worker.limit = make(chan bool, l)
  58. for i := 0; i < l; i ++ {
  59. worker.limit <- true
  60. }
  61. }
  62. return
  63. }
  64. //
  65. func (worker *Worker)err(e error) {
  66. if worker.ErrHandler != nil {
  67. worker.ErrHandler(e)
  68. }
  69. }
  70. // Add a server. The addr should be 'host:port' format.
  71. // The connection is established at this time.
  72. func (worker *Worker) AddServer(addr string) (err error) {
  73. // Create a new job server's client as a agent of server
  74. server, err := newAgent(addr, worker)
  75. if err != nil {
  76. return err
  77. }
  78. worker.agents = append(worker.agents, server)
  79. return
  80. }
  81. // Write a job to job server.
  82. // Here, the job's mean is not the oraginal mean.
  83. // Just looks like a network package for job's result or tell job server, there was a fail.
  84. func (worker *Worker) broadcast(job *Job) {
  85. for _, v := range worker.agents {
  86. v.WriteJob(job)
  87. }
  88. }
  89. // Add a function.
  90. // Plz added job servers first, then functions.
  91. // The API will tell every connected job server that 'I can do this'
  92. func (worker *Worker) AddFunc(funcname string,
  93. f JobFunc, timeout uint32) (err error) {
  94. if _, ok := worker.funcs[funcname]; ok {
  95. return common.Errorf("The function already exists: %s", funcname)
  96. }
  97. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  98. if worker.running {
  99. worker.addFunc(funcname, timeout)
  100. }
  101. return
  102. }
  103. // inner add function
  104. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  105. var datatype uint32
  106. var data []byte
  107. if timeout == 0 {
  108. datatype = common.CAN_DO
  109. data = []byte(funcname)
  110. } else {
  111. datatype = common.CAN_DO_TIMEOUT
  112. data = []byte(funcname + "\x00")
  113. t := common.Uint32ToBytes(timeout)
  114. data = append(data, t[:]...)
  115. }
  116. job := newJob(common.REQ, datatype, data)
  117. worker.broadcast(job)
  118. }
  119. // Remove a function.
  120. // Tell job servers 'I can not do this now' at the same time.
  121. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  122. if _, ok := worker.funcs[funcname]; !ok {
  123. return common.Errorf("The function does not exist: %s", funcname)
  124. }
  125. delete(worker.funcs, funcname)
  126. if worker.running {
  127. worker.removeFunc(funcname)
  128. }
  129. return
  130. }
  131. // inner remove function
  132. func (worker *Worker) removeFunc(funcname string) {
  133. job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
  134. worker.broadcast(job)
  135. }
  136. // Main loop
  137. func (worker *Worker) Work() {
  138. defer func() {
  139. worker.running = false
  140. for _, v := range worker.agents {
  141. v.Close()
  142. }
  143. }()
  144. for funcname, f := range worker.funcs {
  145. worker.addFunc(funcname, f.timeout)
  146. }
  147. worker.running = true
  148. for _, v := range worker.agents {
  149. go v.Work()
  150. }
  151. ok := true
  152. var job *Job
  153. for ok {
  154. if job, ok = <-worker.in; ok {
  155. switch job.DataType {
  156. case common.ERROR:
  157. go func() {
  158. _, err := common.GetError(job.Data)
  159. worker.err(err)
  160. }()
  161. case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
  162. go func() {
  163. if err := worker.exec(job); err != nil {
  164. worker.err(err)
  165. }
  166. }()
  167. default:
  168. go worker.handleJob(job)
  169. }
  170. }
  171. }
  172. }
  173. // job handler
  174. func (worker *Worker) handleJob(job *Job) {
  175. if worker.JobHandler != nil {
  176. if err := worker.JobHandler(job); err != nil {
  177. worker.err(err)
  178. }
  179. }
  180. }
  181. // Close.
  182. func (worker *Worker) Close() {
  183. close(worker.in)
  184. if worker.limit != nil {
  185. close(worker.limit)
  186. }
  187. }
  188. // Send a something out, get the samething back.
  189. func (worker *Worker) Echo(data []byte) {
  190. job := newJob(common.REQ, common.ECHO_REQ, data)
  191. worker.broadcast(job)
  192. }
  193. // Remove all of functions.
  194. // Both from the worker or job servers.
  195. func (worker *Worker) Reset() {
  196. job := newJob(common.REQ, common.RESET_ABILITIES, nil)
  197. worker.broadcast(job)
  198. worker.funcs = make(JobFuncs)
  199. }
  200. // Set the worker's unique id.
  201. func (worker *Worker) SetId(id string) {
  202. worker.Id = id
  203. job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
  204. worker.broadcast(job)
  205. }
  206. // Execute the job. And send back the result.
  207. func (worker *Worker) exec(job *Job) (err error) {
  208. if worker.limit != nil {
  209. <-worker.limit
  210. defer func() {
  211. worker.limit <- true
  212. }()
  213. }
  214. var limit int
  215. if job.DataType == common.JOB_ASSIGN {
  216. limit = 3
  217. } else {
  218. limit = 4
  219. }
  220. jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
  221. job.Handle = string(jobdata[0])
  222. funcname := string(jobdata[1])
  223. if job.DataType == common.JOB_ASSIGN {
  224. job.Data = jobdata[2]
  225. } else {
  226. job.UniqueId = string(jobdata[2])
  227. job.Data = jobdata[3]
  228. }
  229. f, ok := worker.funcs[funcname]
  230. if !ok {
  231. return common.Errorf("The function does not exist: %s", funcname)
  232. }
  233. result, err := f.f(job)
  234. var datatype uint32
  235. if err == nil {
  236. datatype = common.WORK_COMPLETE
  237. } else {
  238. if result == nil {
  239. datatype = common.WORK_FAIL
  240. } else {
  241. datatype = common.WORK_EXCEPTION
  242. }
  243. }
  244. job.magicCode = common.REQ
  245. job.DataType = datatype
  246. job.Data = result
  247. job.agent.WriteJob(job)
  248. return
  249. }
  250. func (worker *Worker) removeAgent(a *agent) {
  251. for k, v := range worker.agents {
  252. if v == a {
  253. worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
  254. }
  255. }
  256. if len(worker.agents) == 0 {
  257. worker.err(common.ErrNoActiveAgent)
  258. }
  259. }