You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

338 lines
7.9 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "time"
  7. "bytes"
  8. "github.com/mikespook/gearman-go/common"
  9. )
  // Convenience values for this package's numeric arguments.
  const (
  	// Unlimited, passed to New, puts no cap on the number of jobs
  	// handled concurrently.
  	Unlimited = 0
  	// OneByOne, passed to New, lets a single job run at a time.
  	OneByOne = 1
  	// Immediately is zero. NOTE(review): presumably intended as the
  	// AddFunc timeout meaning "no timeout" - confirm against callers.
  	Immediately = 0
  )
  15. var (
  16. ErrConnection = common.ErrConnection
  17. )
  18. // Job handler
  19. type JobHandler func(*Job) error
  20. type JobFunc func(*Job) ([]byte, error)
  21. // The definition of the callback function.
  22. type jobFunc struct {
  23. f JobFunc
  24. timeout uint32
  25. }
  26. // Map for added function.
  27. type JobFuncs map[string]*jobFunc
  28. /*
  29. Worker side api for gearman
  30. usage:
  31. w = worker.New(worker.Unlimited)
  32. w.AddFunction("foobar", foobar)
  33. w.AddServer("127.0.0.1:4730")
  34. w.Work() // Enter the worker's main loop
  35. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  36. It looks like this:
  37. func foobar(job *Job) (data []byte, err os.Error) {
  38. //sth. here
  39. //plaplapla...
  40. return
  41. }
  42. */
  43. type Worker struct {
  44. agents []*agent
  45. funcs JobFuncs
  46. in chan *Job
  47. running bool
  48. limit chan bool
  49. Id string
  50. // assign a ErrFunc to handle errors
  51. ErrHandler common.ErrorHandler
  52. JobHandler JobHandler
  53. }
  54. // Get a new worker
  55. func New(l int) (worker *Worker) {
  56. worker = &Worker{
  57. agents: make([]*agent, 0),
  58. funcs: make(JobFuncs),
  59. in: make(chan *Job, common.QUEUE_SIZE),
  60. }
  61. if l != Unlimited {
  62. worker.limit = make(chan bool, l)
  63. for i := 0; i < l; i ++ {
  64. worker.limit <- true
  65. }
  66. }
  67. return
  68. }
  69. //
  70. func (worker *Worker)err(e error) {
  71. if worker.ErrHandler != nil {
  72. worker.ErrHandler(e)
  73. }
  74. }
  75. // Add a server. The addr should be 'host:port' format.
  76. // The connection is established at this time.
  77. func (worker *Worker) AddServer(addr string) (err error) {
  78. // Create a new job server's client as a agent of server
  79. server, err := newAgent(addr, worker)
  80. if err != nil {
  81. return err
  82. }
  83. worker.agents = append(worker.agents, server)
  84. return
  85. }
  86. // Write a job to job server.
  87. // Here, the job's mean is not the oraginal mean.
  88. // Just looks like a network package for job's result or tell job server, there was a fail.
  89. func (worker *Worker) broadcast(job *Job) {
  90. for _, v := range worker.agents {
  91. v.WriteJob(job)
  92. }
  93. }
  94. // Add a function.
  95. // Plz added job servers first, then functions.
  96. // The API will tell every connected job server that 'I can do this'
  97. func (worker *Worker) AddFunc(funcname string,
  98. f JobFunc, timeout uint32) (err error) {
  99. if _, ok := worker.funcs[funcname]; ok {
  100. return common.Errorf("The function already exists: %s", funcname)
  101. }
  102. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  103. if worker.running {
  104. worker.addFunc(funcname, timeout)
  105. }
  106. return
  107. }
  108. // inner add function
  109. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  110. var datatype uint32
  111. var data []byte
  112. if timeout == 0 {
  113. datatype = common.CAN_DO
  114. data = []byte(funcname)
  115. } else {
  116. datatype = common.CAN_DO_TIMEOUT
  117. data = []byte(funcname + "\x00")
  118. t := common.Uint32ToBytes(timeout)
  119. data = append(data, t[:]...)
  120. }
  121. job := newJob(common.REQ, datatype, data)
  122. worker.broadcast(job)
  123. }
  124. // Remove a function.
  125. // Tell job servers 'I can not do this now' at the same time.
  126. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  127. if _, ok := worker.funcs[funcname]; !ok {
  128. return common.Errorf("The function does not exist: %s", funcname)
  129. }
  130. delete(worker.funcs, funcname)
  131. if worker.running {
  132. worker.removeFunc(funcname)
  133. }
  134. return
  135. }
  136. // inner remove function
  137. func (worker *Worker) removeFunc(funcname string) {
  138. job := newJob(common.REQ, common.CANT_DO, []byte(funcname))
  139. worker.broadcast(job)
  140. }
  141. func (worker *Worker) dealJob(job *Job) {
  142. defer func() {
  143. job.Close()
  144. if worker.limit != nil {
  145. worker.limit <- true
  146. }
  147. }()
  148. switch job.DataType {
  149. case common.ERROR:
  150. _, err := common.GetError(job.Data)
  151. worker.err(err)
  152. case common.JOB_ASSIGN, common.JOB_ASSIGN_UNIQ:
  153. if err := worker.exec(job); err != nil {
  154. worker.err(err)
  155. }
  156. default:
  157. worker.handleJob(job)
  158. }
  159. }
  160. // Main loop
  161. func (worker *Worker) Work() {
  162. defer func() {
  163. worker.running = false
  164. for _, v := range worker.agents {
  165. v.Close()
  166. }
  167. }()
  168. worker.running = true
  169. for _, v := range worker.agents {
  170. go v.Work()
  171. }
  172. for funcname, f := range worker.funcs {
  173. worker.addFunc(funcname, f.timeout)
  174. }
  175. ok := true
  176. for ok {
  177. var job *Job
  178. if job, ok = <-worker.in; ok {
  179. if worker.limit != nil {
  180. <-worker.limit
  181. }
  182. go worker.dealJob(job)
  183. }
  184. }
  185. }
  186. // job handler
  187. func (worker *Worker) handleJob(job *Job) {
  188. if worker.JobHandler != nil {
  189. if err := worker.JobHandler(job); err != nil {
  190. worker.err(err)
  191. }
  192. }
  193. }
  194. // Close.
  195. func (worker *Worker) Close() {
  196. close(worker.in)
  197. if worker.limit != nil {
  198. close(worker.limit)
  199. }
  200. }
  201. // Send a something out, get the samething back.
  202. func (worker *Worker) Echo(data []byte) {
  203. job := newJob(common.REQ, common.ECHO_REQ, data)
  204. worker.broadcast(job)
  205. }
  206. // Remove all of functions.
  207. // Both from the worker or job servers.
  208. func (worker *Worker) Reset() {
  209. job := newJob(common.REQ, common.RESET_ABILITIES, nil)
  210. worker.broadcast(job)
  211. worker.funcs = make(JobFuncs)
  212. }
  213. // Set the worker's unique id.
  214. func (worker *Worker) SetId(id string) {
  215. worker.Id = id
  216. job := newJob(common.REQ, common.SET_CLIENT_ID, []byte(id))
  217. worker.broadcast(job)
  218. }
  219. // Execute the job. And send back the result.
  220. func (worker *Worker) exec(job *Job) (err error) {
  221. defer func() {
  222. if r := recover(); r != nil {
  223. if e, ok := r.(error); ok {
  224. err = e
  225. } else {
  226. err = common.ErrUnknown
  227. }
  228. }
  229. } ()
  230. var limit int
  231. if job.DataType == common.JOB_ASSIGN {
  232. limit = 3
  233. } else {
  234. limit = 4
  235. }
  236. jobdata := bytes.SplitN(job.Data, []byte{'\x00'}, limit)
  237. job.Handle = string(jobdata[0])
  238. funcname := string(jobdata[1])
  239. if job.DataType == common.JOB_ASSIGN {
  240. job.Data = jobdata[2]
  241. } else {
  242. job.UniqueId = string(jobdata[2])
  243. job.Data = jobdata[3]
  244. }
  245. f, ok := worker.funcs[funcname]
  246. if !ok {
  247. return common.Errorf("The function does not exist: %s", funcname)
  248. }
  249. var r *result
  250. if f.timeout == 0 {
  251. d, e := f.f(job)
  252. r = &result{data:d, err: e}
  253. } else {
  254. r = execTimeout(f.f, job, time.Duration(f.timeout) * time.Second)
  255. }
  256. var datatype uint32
  257. if r.err == nil {
  258. datatype = common.WORK_COMPLETE
  259. } else {
  260. if r.data == nil {
  261. datatype = common.WORK_FAIL
  262. } else {
  263. datatype = common.WORK_EXCEPTION
  264. }
  265. err = r.err
  266. }
  267. job.magicCode = common.REQ
  268. job.DataType = datatype
  269. job.Data = r.data
  270. job.agent.WriteJob(job)
  271. return
  272. }
  273. func (worker *Worker) removeAgent(a *agent) {
  274. for k, v := range worker.agents {
  275. if v == a {
  276. worker.agents = append(worker.agents[:k], worker.agents[k + 1:] ...)
  277. }
  278. }
  279. if len(worker.agents) == 0 {
  280. worker.err(common.ErrNoActiveAgent)
  281. }
  282. }
  // result carries what a job function returned: its data and its error.
  type result struct {
  	data []byte // bytes to send back to the job server
  	err  error  // non-nil when the job did not complete successfully
  }
  287. func execTimeout(f JobFunc, job *Job, timeout time.Duration) (r *result) {
  288. rslt := make(chan *result)
  289. defer close(rslt)
  290. go func() {
  291. defer func() {recover()}()
  292. d, e := f(job)
  293. rslt <- &result{data: d, err: e}
  294. }()
  295. select {
  296. case r = <-rslt:
  297. case <-time.After(timeout):
  298. go job.cancel()
  299. return &result{err:common.ErrExecTimeOut}
  300. }
  301. return r
  302. }