You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

334 lines
7.9 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "time"
  7. "bytes"
  8. "github.com/mikespook/gearman-go/common"
  9. )
  10. const (
  11. Unlimited = 0
  12. OneByOne = 1
  13. Immediately = 0
  14. )
  15. var (
  16. ErrConnection = common.ErrConnection
  17. )
  18. // Job handler
  19. type JobHandler func(*Job) error
  20. type JobFunc func(*Job) ([]byte, error)
  21. // The definition of the callback function.
  22. type jobFunc struct {
  23. f JobFunc
  24. timeout uint32
  25. }
  26. // Map for added function.
  27. type JobFuncs map[string]*jobFunc
  28. /*
  29. Worker side api for gearman
  30. usage:
  31. w = worker.New(worker.Unlimited)
  32. w.AddFunction("foobar", foobar)
  33. w.AddServer("127.0.0.1:4730")
  34. w.Work() // Enter the worker's main loop
  35. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  36. It looks like this:
  37. func foobar(job *Job) (data []byte, err os.Error) {
  38. //sth. here
  39. //plaplapla...
  40. return
  41. }
  42. */
  43. type Worker struct {
  44. agents []*agent
  45. funcs JobFuncs
  46. in chan *Job
  47. running bool
  48. limit chan bool
  49. Id string
  50. // assign a ErrFunc to handle errors
  51. ErrHandler common.ErrorHandler
  52. JobHandler JobHandler
  53. }
  54. // Get a new worker
  55. func New(l int) (worker *Worker) {
  56. worker = &Worker{
  57. agents: make([]*agent, 0),
  58. funcs: make(JobFuncs),
  59. in: make(chan *Job, common.QUEUE_SIZE),
  60. }
  61. if l != Unlimited {
  62. worker.limit = make(chan bool, l)
  63. for i := 0; i < l; i ++ {
  64. worker.limit <- true
  65. }
  66. }
  67. return
  68. }
  69. //
  70. func (worker *Worker)err(e error) {
  71. if worker.ErrHandler != nil {
  72. worker.ErrHandler(e)
  73. }
  74. }
  75. // Add a server. The addr should be 'host:port' format.
  76. // The connection is established at this time.
  77. func (worker *Worker) AddServer(addr string) (err error) {
  78. // Create a new job server's client as a agent of server
  79. server, err := newAgent(addr, worker)
  80. if err != nil {
  81. return err
  82. }
  83. worker.agents = append(worker.agents, server)
  84. return
  85. }
  86. // Write a job to job server.
  87. // Here, the job's mean is not the oraginal mean.
  88. // Just looks like a network package for job's result or tell job server, there was a fail.
  89. func (worker *Worker) broadcast(job *Job) {
  90. for _, v := range worker.agents {
  91. v.WriteJob(job)
  92. }
  93. }
  94. // Add a function.
  95. // Plz added job servers first, then functions.
  96. // The API will tell every connected job server that 'I can do this'
  97. func (worker *Worker) AddFunc(funcname string,
  98. f JobFunc, timeout uint32) (err error) {
  99. if _, ok := worker.funcs[funcname]; ok {
  100. return common.Errorf("The function already exists: %s", funcname)
  101. }
  102. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  103. if worker.running {
  104. worker.addFunc(funcname, timeout)
  105. }
  106. return
  107. }
  108. // inner add function
  109. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  110. var datatype uint32
  111. var data []byte
  112. if timeout == 0 {
  113. datatype = common.CAN_DO
  114. data = []byte(funcname)
  115. } else {
  116. datatype = common.CAN_DO_TIMEOUT
  117. data = []byte(funcname + "\x00")
  118. t := common.Uint32ToBytes(timeout)
  119. data = append(data, t[:]...)
  120. }
  121. job := newJob(common.REQ, datatype, data)
  122. worker.broadcast(job)
  123. }
  124. // Remove a function.
  125. // Tell job servers 'I can not do this now' at the same time.
  126. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  127. if _, ok := worker.funcs[funcname]; !ok {
  128. return common.Errorf("The function does not exist: %s", funcname)
  129. }
  130. delete(worker.funcs, funcname)
  131. if worker.running {
  132. worker.removeFunc(funcname)
  133. }
  134. return
  135. }
  136. // inner remove function
  137. func (worker *Worker) removeFunc(funcname string) {
  138. job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
  139. worker.broadcast(job)
  140. }
  141. // Main loop
  142. func (worker *Worker) Work() {
  143. defer func() {
  144. worker.running = false
  145. for _, v := range worker.agents {
  146. v.Close()
  147. }
  148. }()
  149. for funcname, f := range worker.funcs {
  150. worker.addFunc(funcname, f.timeout)
  151. }
  152. worker.running = true
  153. for _, v := range worker.agents {
  154. go v.Work()
  155. }
  156. ok := true
  157. for ok {
  158. var job *Job
  159. if job, ok = <-worker.in; ok {
  160. go func() {
  161. defer job.Close()
  162. switch job.DataType {
  163. case common.ERROR:
  164. _, err := common.GetError(job.Data)
  165. worker.err(err)
  166. case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
  167. if err := worker.exec(job); err != nil {
  168. worker.err(err)
  169. }
  170. default:
  171. worker.handleJob(job)
  172. }
  173. }()
  174. }
  175. }
  176. }
  177. // job handler
  178. func (worker *Worker) handleJob(job *Job) {
  179. if worker.JobHandler != nil {
  180. if err := worker.JobHandler(job); err != nil {
  181. worker.err(err)
  182. }
  183. }
  184. }
  185. // Close.
  186. func (worker *Worker) Close() {
  187. close(worker.in)
  188. if worker.limit != nil {
  189. close(worker.limit)
  190. }
  191. }
  192. // Send a something out, get the samething back.
  193. func (worker *Worker) Echo(data []byte) {
  194. job := newJob(common.REQ, common.ECHO_REQ, data)
  195. worker.broadcast(job)
  196. }
  197. // Remove all of functions.
  198. // Both from the worker or job servers.
  199. func (worker *Worker) Reset() {
  200. job := newJob(common.REQ, common.RESET_ABILITIES, nil)
  201. worker.broadcast(job)
  202. worker.funcs = make(JobFuncs)
  203. }
  204. // Set the worker's unique id.
  205. func (worker *Worker) SetId(id string) {
  206. worker.Id = id
  207. job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
  208. worker.broadcast(job)
  209. }
  210. // Execute the job. And send back the result.
  211. func (worker *Worker) exec(job *Job) (err error) {
  212. defer func() {
  213. if r := recover(); r != nil {
  214. if e, ok := r.(error); ok {
  215. err = e
  216. } else {
  217. err = common.ErrUnknown
  218. }
  219. }
  220. } ()
  221. if worker.limit != nil {
  222. <-worker.limit
  223. defer func() {
  224. worker.limit <- true
  225. }()
  226. }
  227. var limit int
  228. if job.DataType == common.JOB_ASSIGN {
  229. limit = 3
  230. } else {
  231. limit = 4
  232. }
  233. jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
  234. job.Handle = string(jobdata[0])
  235. funcname := string(jobdata[1])
  236. if job.DataType == common.JOB_ASSIGN {
  237. job.Data = jobdata[2]
  238. } else {
  239. job.UniqueId = string(jobdata[2])
  240. job.Data = jobdata[3]
  241. }
  242. f, ok := worker.funcs[funcname]
  243. if !ok {
  244. return common.Errorf("The function does not exist: %s", funcname)
  245. }
  246. var r *result
  247. if f.timeout == 0 {
  248. d, e := f.f(job)
  249. r = &result{data:d, err: e}
  250. } else {
  251. r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second)
  252. }
  253. var datatype uint32
  254. if r.err == nil {
  255. datatype = common.WORK_COMPLETE
  256. } else {
  257. if r.data == nil {
  258. datatype = common.WORK_FAIL
  259. } else {
  260. datatype = common.WORK_EXCEPTION
  261. }
  262. err = r.err
  263. }
  264. job.magicCode = common.REQ
  265. job.DataType = datatype
  266. job.Data = r.data
  267. job.agent.WriteJob(job)
  268. return
  269. }
  270. func (worker *Worker) removeAgent(a *agent) {
  271. for k, v := range worker.agents {
  272. if v == a {
  273. worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
  274. }
  275. }
  276. if len(worker.agents) == 0 {
  277. worker.err(common.ErrNoActiveAgent)
  278. }
  279. }
  280. type result struct {
  281. data []byte
  282. err error
  283. }
  284. func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
  285. rslt := make(chan *result)
  286. defer close(rslt)
  287. go func() {
  288. defer func() {recover()}()
  289. d, e := f(job)
  290. rslt <- &result{data: d, err: e}
  291. }()
  292. select {
  293. case r = <-rslt:
  294. case <-time.After(timeout):
  295. go job.cancel()
  296. return &result{err:common.ErrExecTimeOut}
  297. }
  298. return r
  299. }