You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

318 lines
7.4 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "time"
  7. "github.com/mikespook/gearman-go/common"
  8. )
  9. const (
  10. Unlimited = 0
  11. OneByOne = 1
  12. Immediately = 0
  13. )
  14. var (
  15. ErrConnection = common.ErrConnection
  16. )
  17. // Job handler
  18. type JobHandler func(*Job) error
  19. type JobFunc func(*Job) ([]byte, error)
  20. // The definition of the callback function.
  21. type jobFunc struct {
  22. f JobFunc
  23. timeout uint32
  24. }
  25. // Map for added function.
  26. type JobFuncs map[string]*jobFunc
  27. /*
  28. Worker side api for gearman
  29. usage:
  30. w = worker.New(worker.Unlimited)
  31. w.AddFunction("foobar", foobar)
  32. w.AddServer("127.0.0.1:4730")
  33. w.Work() // Enter the worker's main loop
  34. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  35. It looks like this:
  36. func foobar(job *Job) (data []byte, err os.Error) {
  37. //sth. here
  38. //plaplapla...
  39. return
  40. }
  41. */
  42. type Worker struct {
  43. agents []*agent
  44. funcs JobFuncs
  45. in chan *Job
  46. running bool
  47. limit chan bool
  48. Id string
  49. // assign a ErrFunc to handle errors
  50. ErrHandler common.ErrorHandler
  51. JobHandler JobHandler
  52. }
  53. // Get a new worker
  54. func New(l int) (worker *Worker) {
  55. worker = &Worker{
  56. agents: make([]*agent, 0),
  57. funcs: make(JobFuncs),
  58. in: make(chan *Job, common.QUEUE_SIZE),
  59. }
  60. if l != Unlimited {
  61. worker.limit = make(chan bool, l)
  62. }
  63. return
  64. }
  65. //
  66. func (worker *Worker)err(e error) {
  67. if worker.ErrHandler != nil {
  68. worker.ErrHandler(e)
  69. }
  70. }
  71. // Add a server. The addr should be 'host:port' format.
  72. // The connection is established at this time.
  73. func (worker *Worker) AddServer(addr string) (err error) {
  74. // Create a new job server's client as a agent of server
  75. server, err := newAgent(addr, worker)
  76. if err != nil {
  77. return err
  78. }
  79. worker.agents = append(worker.agents, server)
  80. return
  81. }
  82. // Write a job to job server.
  83. // Here, the job's mean is not the oraginal mean.
  84. // Just looks like a network package for job's result or tell job server, there was a fail.
  85. func (worker *Worker) broadcast(job *Job) {
  86. for _, v := range worker.agents {
  87. v.WriteJob(job)
  88. }
  89. }
  90. // Add a function.
  91. // Plz added job servers first, then functions.
  92. // The API will tell every connected job server that 'I can do this'
  93. func (worker *Worker) AddFunc(funcname string,
  94. f JobFunc, timeout uint32) (err error) {
  95. if _, ok := worker.funcs[funcname]; ok {
  96. return common.Errorf("The function already exists: %s", funcname)
  97. }
  98. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  99. if worker.running {
  100. worker.addFunc(funcname, timeout)
  101. }
  102. return
  103. }
  104. // inner add function
  105. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  106. var datatype uint32
  107. var data []byte
  108. if timeout == 0 {
  109. datatype = common.CAN_DO
  110. data = []byte(funcname)
  111. } else {
  112. datatype = common.CAN_DO_TIMEOUT
  113. data = []byte(funcname + "\x00")
  114. t := common.Uint32ToBytes(timeout)
  115. data = append(data, t[:]...)
  116. }
  117. job := newJob(common.REQ, datatype, data)
  118. worker.broadcast(job)
  119. }
  120. // Remove a function.
  121. // Tell job servers 'I can not do this now' at the same time.
  122. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  123. if _, ok := worker.funcs[funcname]; !ok {
  124. return common.Errorf("The function does not exist: %s", funcname)
  125. }
  126. delete(worker.funcs, funcname)
  127. if worker.running {
  128. worker.removeFunc(funcname)
  129. }
  130. return
  131. }
  132. // inner remove function
  133. func (worker *Worker) removeFunc(funcname string) {
  134. job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
  135. worker.broadcast(job)
  136. }
  137. func (worker *Worker) dealJob(job *Job) {
  138. defer func() {
  139. job.Close()
  140. if worker.running && worker.limit != nil {
  141. <-worker.limit
  142. }
  143. }()
  144. switch job.DataType {
  145. case common.ERROR:
  146. _, err := common.GetError(job.Data)
  147. worker.err(err)
  148. case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
  149. if err := worker.exec(job); err != nil {
  150. worker.err(err)
  151. }
  152. default:
  153. worker.handleJob(job)
  154. }
  155. }
  156. // Main loop
  157. func (worker *Worker) Work() {
  158. defer func() {
  159. for _, v := range worker.agents {
  160. v.Close()
  161. }
  162. }()
  163. worker.running = true
  164. for _, v := range worker.agents {
  165. go v.Work()
  166. }
  167. for funcname, f := range worker.funcs {
  168. worker.addFunc(funcname, f.timeout)
  169. }
  170. ok := true
  171. for ok {
  172. var job *Job
  173. if job, ok = <-worker.in; ok {
  174. go worker.dealJob(job)
  175. }
  176. }
  177. }
  178. // job handler
  179. func (worker *Worker) handleJob(job *Job) {
  180. if worker.JobHandler != nil {
  181. if err := worker.JobHandler(job); err != nil {
  182. worker.err(err)
  183. }
  184. }
  185. }
  186. // Close.
  187. func (worker *Worker) Close() {
  188. worker.running = false
  189. close(worker.in)
  190. if worker.limit != nil {
  191. close(worker.limit)
  192. }
  193. }
  194. // Send a something out, get the samething back.
  195. func (worker *Worker) Echo(data []byte) {
  196. job := newJob(common.REQ, common.ECHO_REQ, data)
  197. worker.broadcast(job)
  198. }
  199. // Remove all of functions.
  200. // Both from the worker or job servers.
  201. func (worker *Worker) Reset() {
  202. job := newJob(common.REQ, common.RESET_ABILITIES, nil)
  203. worker.broadcast(job)
  204. worker.funcs = make(JobFuncs)
  205. }
  206. // Set the worker's unique id.
  207. func (worker *Worker) SetId(id string) {
  208. worker.Id = id
  209. job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
  210. worker.broadcast(job)
  211. }
  212. // Execute the job. And send back the result.
  213. func (worker *Worker) exec(job *Job) (err error) {
  214. defer func() {
  215. if r := recover(); r != nil {
  216. if e, ok := r.(error); ok {
  217. err = e
  218. } else {
  219. err = common.ErrUnknown
  220. }
  221. }
  222. } ()
  223. f, ok := worker.funcs[job.Fn]
  224. if !ok {
  225. return common.Errorf("The function does not exist: %s", job.Fn)
  226. }
  227. var r *result
  228. if f.timeout == 0 {
  229. d, e := f.f(job)
  230. r = &result{data:d, err: e}
  231. } else {
  232. r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second)
  233. }
  234. var datatype uint32
  235. if r.err == nil {
  236. datatype = common.WORK_COMPLETE
  237. } else {
  238. if r.data == nil {
  239. datatype = common.WORK_FAIL
  240. } else {
  241. datatype = common.WORK_EXCEPTION
  242. }
  243. err = r.err
  244. }
  245. job.magicCode = common.REQ
  246. job.DataType = datatype
  247. job.Data = r.data
  248. if worker.running {
  249. job.agent.WriteJob(job)
  250. }
  251. return
  252. }
  253. func (worker *Worker) removeAgent(a *agent) {
  254. for k, v := range worker.agents {
  255. if v == a {
  256. worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
  257. }
  258. }
  259. if len(worker.agents) == 0 {
  260. worker.err(common.ErrNoActiveAgent)
  261. }
  262. }
  263. type result struct {
  264. data []byte
  265. err error
  266. }
  267. func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
  268. rslt := make(chan *result)
  269. defer close(rslt)
  270. go func() {
  271. defer func() {recover()}()
  272. d, e := f(job)
  273. rslt <- &result{data: d, err: e}
  274. }()
  275. select {
  276. case r = <-rslt:
  277. case <-time.After(timeout):
  278. go job.cancel()
  279. return &result{err:common.ErrTimeOut}
  280. }
  281. return r
  282. }