You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

294 lines
7.1 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "bytes"
  7. "bitbucket.org/mikespook/gearman-go/common"
  8. )
  9. const (
  10. Unlimited = 0
  11. OneByOne = 1
  12. )
  13. var (
  14. ErrConnection = common.ErrConnection
  15. )
  16. // Job handler
  17. type JobHandler func(*Job) error
  18. type JobFunc func(job *Job) ([]byte, error)
  19. // The definition of the callback function.
  20. type jobFunc struct {
  21. f JobFunc
  22. timeout uint32
  23. }
  24. // Map for added function.
  25. type JobFuncs map[string]*jobFunc
  // Worker is the worker side API for gearman.
  //
  // Usage:
  //
  //	w := worker.New(worker.Unlimited)
  //	w.AddServer("127.0.0.1:4730")
  //	w.AddFunc("foobar", foobar, 0)
  //	w.Work() // Enter the worker's main loop
  //
  // The callback function 'foobar' must match the type JobFunc.
  // It looks like this:
  //
  //	func foobar(job *Job) (data []byte, err error) {
  //		// do something here
  //		return
  //	}
  type Worker struct {
  	// agents holds one entry per job server added with AddServer.
  	agents []*agent
  	// funcs holds the registered functions, keyed by function name.
  	funcs JobFuncs
  	// in is the queue of incoming jobs consumed by Work.
  	in chan *Job
  	// running is true while Work is executing its main loop.
  	running bool
  	// limit is a pool of tokens bounding concurrent job execution;
  	// it is nil when the worker was created without a limit.
  	limit chan bool

  	// Id is the worker's id as last set through SetId.
  	Id string
  	// ErrHandler, when set, receives every error raised by the worker.
  	ErrHandler common.ErrorHandler
  	// JobHandler, when set, receives jobs that are neither error packets
  	// nor job assignments.
  	JobHandler JobHandler
  }
  52. // Get a new worker
  53. func New(l int) (worker *Worker) {
  54. worker = &Worker{
  55. agents: make([]*agent, 0),
  56. funcs: make(JobFuncs),
  57. in: make(chan *Job, common.QUEUE_SIZE),
  58. }
  59. if l != Unlimited {
  60. worker.limit = make(chan bool, l)
  61. for i := 0; i < l; i ++ {
  62. worker.limit <- true
  63. }
  64. }
  65. return
  66. }
  67. //
  68. func (worker *Worker)err(e error) {
  69. if worker.ErrHandler != nil {
  70. worker.ErrHandler(e)
  71. }
  72. }
  73. // Add a server. The addr should be 'host:port' format.
  74. // The connection is established at this time.
  75. func (worker *Worker) AddServer(addr string) (err error) {
  76. // Create a new job server's client as a agent of server
  77. server, err := newAgent(addr, worker)
  78. if err != nil {
  79. return err
  80. }
  81. worker.agents = append(worker.agents, server)
  82. return
  83. }
  84. // Write a job to job server.
  85. // Here, the job's mean is not the oraginal mean.
  86. // Just looks like a network package for job's result or tell job server, there was a fail.
  87. func (worker *Worker) broadcast(job *Job) {
  88. for _, v := range worker.agents {
  89. v.WriteJob(job)
  90. }
  91. }
  92. // Add a function.
  93. // Plz added job servers first, then functions.
  94. // The API will tell every connected job server that 'I can do this'
  95. func (worker *Worker) AddFunc(funcname string,
  96. f JobFunc, timeout uint32) (err error) {
  97. if _, ok := worker.funcs[funcname]; ok {
  98. return common.Errorf("The function already exists: %s", funcname)
  99. }
  100. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  101. if worker.running {
  102. worker.addFunc(funcname, timeout)
  103. }
  104. return
  105. }
  106. // inner add function
  107. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  108. var datatype uint32
  109. var data []byte
  110. if timeout == 0 {
  111. datatype = common.CAN_DO
  112. data = []byte(funcname)
  113. } else {
  114. datatype = common.CAN_DO_TIMEOUT
  115. data = []byte(funcname + "\x00")
  116. t := common.Uint32ToBytes(timeout)
  117. data = append(data, t[:]...)
  118. }
  119. job := newJob(common.REQ, datatype, data)
  120. worker.broadcast(job)
  121. }
  122. // Remove a function.
  123. // Tell job servers 'I can not do this now' at the same time.
  124. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  125. if _, ok := worker.funcs[funcname]; !ok {
  126. return common.Errorf("The function does not exist: %s", funcname)
  127. }
  128. delete(worker.funcs, funcname)
  129. if worker.running {
  130. worker.removeFunc(funcname)
  131. }
  132. return
  133. }
  134. // inner remove function
  135. func (worker *Worker) removeFunc(funcname string) {
  136. job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
  137. worker.broadcast(job)
  138. }
  139. // Main loop
  140. func (worker *Worker) Work() {
  141. defer func() {
  142. worker.running = false
  143. for _, v := range worker.agents {
  144. v.Close()
  145. }
  146. }()
  147. for funcname, f := range worker.funcs {
  148. worker.addFunc(funcname, f.timeout)
  149. }
  150. worker.running = true
  151. for _, v := range worker.agents {
  152. go v.Work()
  153. }
  154. ok := true
  155. var job *Job
  156. for ok {
  157. if job, ok = <-worker.in; ok {
  158. switch job.DataType {
  159. case common.ERROR:
  160. go func() {
  161. _, err := common.GetError(job.Data)
  162. worker.err(err)
  163. }()
  164. case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
  165. go func() {
  166. if err := worker.exec(job); err != nil {
  167. worker.err(err)
  168. }
  169. }()
  170. default:
  171. go worker.handleJob(job)
  172. }
  173. }
  174. }
  175. }
  176. // job handler
  177. func (worker *Worker) handleJob(job *Job) {
  178. if worker.JobHandler != nil {
  179. if err := worker.JobHandler(job); err != nil {
  180. worker.err(err)
  181. }
  182. }
  183. }
  184. // Close.
  185. func (worker *Worker) Close() {
  186. close(worker.in)
  187. if worker.limit != nil {
  188. close(worker.limit)
  189. }
  190. }
  191. // Send a something out, get the samething back.
  192. func (worker *Worker) Echo(data []byte) {
  193. job := newJob(common.REQ, common.ECHO_REQ, data)
  194. worker.broadcast(job)
  195. }
  196. // Remove all of functions.
  197. // Both from the worker or job servers.
  198. func (worker *Worker) Reset() {
  199. job := newJob(common.REQ, common.RESET_ABILITIES, nil)
  200. worker.broadcast(job)
  201. worker.funcs = make(JobFuncs)
  202. }
  203. // Set the worker's unique id.
  204. func (worker *Worker) SetId(id string) {
  205. worker.Id = id
  206. job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
  207. worker.broadcast(job)
  208. }
  209. // Execute the job. And send back the result.
  210. func (worker *Worker) exec(job *Job) (err error) {
  211. if worker.limit != nil {
  212. <-worker.limit
  213. defer func() {
  214. worker.limit <- true
  215. }()
  216. }
  217. var limit int
  218. if job.DataType == common.JOB_ASSIGN {
  219. limit = 3
  220. } else {
  221. limit = 4
  222. }
  223. jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
  224. job.Handle = string(jobdata[0])
  225. funcname := string(jobdata[1])
  226. if job.DataType == common.JOB_ASSIGN {
  227. job.Data = jobdata[2]
  228. } else {
  229. job.UniqueId = string(jobdata[2])
  230. job.Data = jobdata[3]
  231. }
  232. f, ok := worker.funcs[funcname]
  233. if !ok {
  234. return common.Errorf("The function does not exist: %s", funcname)
  235. }
  236. result, err := f.f(job)
  237. var datatype uint32
  238. if err == nil {
  239. datatype = common.WORK_COMPLETE
  240. } else {
  241. if result == nil {
  242. datatype = common.WORK_FAIL
  243. } else {
  244. datatype = common.WORK_EXCEPTION
  245. }
  246. }
  247. job.magicCode = common.REQ
  248. job.DataType = datatype
  249. job.Data = result
  250. job.agent.WriteJob(job)
  251. return
  252. }
  253. func (worker *Worker) removeAgent(a *agent) {
  254. for k, v := range worker.agents {
  255. if v == a {
  256. worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
  257. }
  258. }
  259. if len(worker.agents) == 0 {
  260. worker.err(common.ErrNoActiveAgent)
  261. }
  262. }