You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

319 lines
6.6 KiB

  // Package worker helps developers write Gearman workers
  // in an easy way.
  3. package worker
  4. import (
  5. "encoding/binary"
  6. "fmt"
  7. "sync"
  8. "time"
  9. )
  // Job-concurrency limits accepted by New.
  const (
  	// Unlimited (0) puts no cap on the number of jobs executed in parallel.
  	// AddFunc also accepts it as a timeout to disable the execution timeout.
  	Unlimited = 0
  	// OneByOne (1) makes the worker execute a single job at a time.
  	OneByOne = 1
  )
  14. // Worker is the only structure needed by worker side developing.
  15. // It can connect to multi-server and grab jobs.
  16. type Worker struct {
  17. sync.Mutex
  18. agents []*agent
  19. funcs jobFuncs
  20. in chan *inPack
  21. running bool
  22. ready bool
  23. Id string
  24. ErrorHandler ErrorHandler
  25. JobHandler JobHandler
  26. limit chan bool
  27. }
  28. // Return a worker.
  29. //
  30. // If limit is set to Unlimited(=0), the worker will grab all jobs
  31. // and execute them parallelly.
  32. // If limit is greater than zero, the number of paralled executing
  33. // jobs are limited under the number. If limit is assgined to
  34. // OneByOne(=1), there will be only one job executed in a time.
  35. func New(limit int) (worker *Worker) {
  36. worker = &Worker{
  37. agents: make([]*agent, 0, limit),
  38. funcs: make(jobFuncs),
  39. in: make(chan *inPack, queueSize),
  40. }
  41. if limit != Unlimited {
  42. worker.limit = make(chan bool, limit-1)
  43. }
  44. return
  45. }
  46. // inner error handling
  47. func (worker *Worker) err(e error) {
  48. if worker.ErrorHandler != nil {
  49. worker.ErrorHandler(e)
  50. }
  51. }
  52. // Add a Gearman job server.
  53. //
  54. // addr should be formated as 'host:port'.
  55. func (worker *Worker) AddServer(net, addr string) (err error) {
  56. // Create a new job server's client as a agent of server
  57. a, err := newAgent(net, addr, worker)
  58. if err != nil {
  59. return err
  60. }
  61. worker.agents = append(worker.agents, a)
  62. return
  63. }
  64. // Broadcast an outpack to all Gearman server.
  65. func (worker *Worker) broadcast(outpack *outPack) {
  66. for _, v := range worker.agents {
  67. v.write(outpack)
  68. }
  69. }
  70. // Add a function.
  71. // Set timeout as Unlimited(=0) to disable executing timeout.
  72. func (worker *Worker) AddFunc(funcname string,
  73. f JobFunc, timeout uint32) (err error) {
  74. worker.Lock()
  75. defer worker.Unlock()
  76. if _, ok := worker.funcs[funcname]; ok {
  77. return fmt.Errorf("The function already exists: %s", funcname)
  78. }
  79. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  80. if worker.running {
  81. worker.addFunc(funcname, timeout)
  82. }
  83. return
  84. }
  85. // inner add
  86. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  87. outpack := getOutPack()
  88. if timeout == 0 {
  89. outpack.dataType = dtCanDo
  90. outpack.data = []byte(funcname)
  91. } else {
  92. outpack.dataType = dtCanDoTimeout
  93. l := len(funcname)
  94. outpack.data = getBuffer(l + 5)
  95. copy(outpack.data, []byte(funcname))
  96. outpack.data[l] = '\x00'
  97. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  98. }
  99. worker.broadcast(outpack)
  100. }
  101. // Remove a function.
  102. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  103. worker.Lock()
  104. defer worker.Unlock()
  105. if _, ok := worker.funcs[funcname]; !ok {
  106. return fmt.Errorf("The function does not exist: %s", funcname)
  107. }
  108. delete(worker.funcs, funcname)
  109. if worker.running {
  110. worker.removeFunc(funcname)
  111. }
  112. return
  113. }
  114. // inner remove
  115. func (worker *Worker) removeFunc(funcname string) {
  116. outpack := getOutPack()
  117. outpack.dataType = dtCantDo
  118. outpack.data = []byte(funcname)
  119. worker.broadcast(outpack)
  120. }
  121. // inner package handling
  122. func (worker *Worker) handleInPack(inpack *inPack) {
  123. switch inpack.dataType {
  124. case dtNoJob:
  125. inpack.a.PreSleep()
  126. case dtNoop:
  127. inpack.a.Grab()
  128. case dtJobAssign, dtJobAssignUniq:
  129. go func() {
  130. if err := worker.exec(inpack); err != nil {
  131. worker.err(err)
  132. }
  133. }()
  134. if worker.limit != nil {
  135. worker.limit <- true
  136. }
  137. inpack.a.Grab()
  138. case dtError:
  139. worker.err(inpack.Err())
  140. fallthrough
  141. case dtEchoRes:
  142. fallthrough
  143. default:
  144. worker.customeHandler(inpack)
  145. }
  146. }
  147. // Connect to Gearman server and tell every server
  148. // what can this worker do.
  149. func (worker *Worker) Ready() (err error) {
  150. if len(worker.agents) == 0 {
  151. return ErrNoneAgents
  152. }
  153. if len(worker.funcs) == 0 {
  154. return ErrNoneFuncs
  155. }
  156. for _, a := range worker.agents {
  157. if err = a.Connect(); err != nil {
  158. return
  159. }
  160. }
  161. for funcname, f := range worker.funcs {
  162. worker.addFunc(funcname, f.timeout)
  163. }
  164. worker.ready = true
  165. return
  166. }
  167. // Main loop, block here
  168. // Most of time, this should be evaluated in goroutine.
  169. func (worker *Worker) Work() {
  170. if ! worker.ready {
  171. // didn't run Ready beforehand, so we'll have to do it:
  172. err := worker.Ready()
  173. if err != nil {
  174. panic( err )
  175. }
  176. }
  177. defer func() {
  178. for _, a := range worker.agents {
  179. a.Close()
  180. }
  181. }()
  182. worker.running = true
  183. for _, a := range worker.agents {
  184. a.Grab()
  185. }
  186. var inpack *inPack
  187. for inpack = range worker.in {
  188. worker.handleInPack(inpack)
  189. }
  190. }
  191. // custome handling warper
  192. func (worker *Worker) customeHandler(inpack *inPack) {
  193. if worker.JobHandler != nil {
  194. if err := worker.JobHandler(inpack); err != nil {
  195. worker.err(err)
  196. }
  197. }
  198. }
  199. // Close connection and exit main loop
  200. func (worker *Worker) Close() {
  201. worker.Lock()
  202. worker.Unlock()
  203. if worker.running == true {
  204. worker.running = false
  205. close(worker.in)
  206. }
  207. }
  208. // Echo
  209. func (worker *Worker) Echo(data []byte) {
  210. outpack := getOutPack()
  211. outpack.dataType = dtEchoReq
  212. outpack.data = data
  213. worker.broadcast(outpack)
  214. }
  215. // Remove all of functions.
  216. // Both from the worker and job servers.
  217. func (worker *Worker) Reset() {
  218. outpack := getOutPack()
  219. outpack.dataType = dtResetAbilities
  220. worker.broadcast(outpack)
  221. worker.funcs = make(jobFuncs)
  222. }
  223. // Set the worker's unique id.
  224. func (worker *Worker) SetId(id string) {
  225. worker.Id = id
  226. outpack := getOutPack()
  227. outpack.dataType = dtSetClientId
  228. outpack.data = []byte(id)
  229. worker.broadcast(outpack)
  230. }
  231. // inner job executing
  232. func (worker *Worker) exec(inpack *inPack) (err error) {
  233. defer func() {
  234. if worker.limit != nil {
  235. <-worker.limit
  236. }
  237. if r := recover(); r != nil {
  238. if e, ok := r.(error); ok {
  239. err = e
  240. } else {
  241. err = ErrUnknown
  242. }
  243. }
  244. }()
  245. f, ok := worker.funcs[inpack.fn]
  246. if !ok {
  247. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  248. }
  249. var r *result
  250. if f.timeout == 0 {
  251. d, e := f.f(inpack)
  252. r = &result{data: d, err: e}
  253. } else {
  254. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  255. }
  256. if worker.running {
  257. outpack := getOutPack()
  258. if r.err == nil {
  259. outpack.dataType = dtWorkComplete
  260. } else {
  261. if len(r.data) == 0 {
  262. outpack.dataType = dtWorkFail
  263. } else {
  264. outpack.dataType = dtWorkException
  265. }
  266. err = r.err
  267. }
  268. outpack.handle = inpack.handle
  269. outpack.data = r.data
  270. inpack.a.write(outpack)
  271. }
  272. return
  273. }
  // result carries what a job function produced: its payload and its error.
  type result struct {
  	data []byte // payload to report back for the job
  	err  error  // nil when the job function succeeded
  }
  279. // executing timer
  280. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  281. rslt := make(chan *result)
  282. defer close(rslt)
  283. go func() {
  284. defer func() { recover() }()
  285. d, e := f(job)
  286. rslt <- &result{data: d, err: e}
  287. }()
  288. select {
  289. case r = <-rslt:
  290. case <-time.After(timeout):
  291. return &result{err: ErrTimeOut}
  292. }
  293. return r
  294. }