You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

352 lines
7.5 KiB

// Package worker helps developers build Gearman workers
// in an easy way.
  3. package worker
  4. import (
  5. "encoding/binary"
  6. "fmt"
  7. "sync"
  8. "time"
  9. )
  10. const (
  11. Unlimited = iota
  12. OneByOne
  13. )
  14. // Worker is the only structure needed by worker side developing.
  15. // It can connect to multi-server and grab jobs.
  16. type Worker struct {
  17. sync.Mutex
  18. agents []*agent
  19. funcs jobFuncs
  20. in chan *inPack
  21. running bool
  22. ready bool
  23. Id string
  24. ErrorHandler ErrorHandler
  25. JobHandler JobHandler
  26. limit chan bool
  27. }
  28. // Return a worker.
  29. //
  30. // If limit is set to Unlimited(=0), the worker will grab all jobs
  31. // and execute them parallelly.
  32. // If limit is greater than zero, the number of paralled executing
  33. // jobs are limited under the number. If limit is assgined to
  34. // OneByOne(=1), there will be only one job executed in a time.
  35. func New(limit int) (worker *Worker) {
  36. worker = &Worker{
  37. agents: make([]*agent, 0, limit),
  38. funcs: make(jobFuncs),
  39. in: make(chan *inPack, queueSize),
  40. }
  41. if limit != Unlimited {
  42. worker.limit = make(chan bool, limit-1)
  43. }
  44. return
  45. }
  46. // inner error handling
  47. func (worker *Worker) err(e error) {
  48. if worker.ErrorHandler != nil {
  49. worker.ErrorHandler(e)
  50. }
  51. }
  52. // Add a Gearman job server.
  53. //
  54. // addr should be formated as 'host:port'.
  55. func (worker *Worker) AddServer(net, addr string) (err error) {
  56. // Create a new job server's client as a agent of server
  57. a, err := newAgent(net, addr, worker)
  58. if err != nil {
  59. return err
  60. }
  61. worker.agents = append(worker.agents, a)
  62. return
  63. }
  64. // Broadcast an outpack to all Gearman server.
  65. func (worker *Worker) broadcast(outpack *outPack) {
  66. for _, v := range worker.agents {
  67. v.write(outpack)
  68. }
  69. }
  70. // Add a function.
  71. // Set timeout as Unlimited(=0) to disable executing timeout.
  72. func (worker *Worker) AddFunc(funcname string,
  73. f JobFunc, timeout uint32) (err error) {
  74. worker.Lock()
  75. defer worker.Unlock()
  76. if _, ok := worker.funcs[funcname]; ok {
  77. return fmt.Errorf("The function already exists: %s", funcname)
  78. }
  79. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  80. if worker.running {
  81. worker.addFunc(funcname, timeout)
  82. }
  83. return
  84. }
  85. // inner add
  86. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  87. outpack := prepFuncOutpack( funcname, timeout)
  88. worker.broadcast(outpack)
  89. }
  90. func prepFuncOutpack(funcname string, timeout uint32) (*outPack){
  91. outpack := getOutPack()
  92. if timeout == 0 {
  93. outpack.dataType = dtCanDo
  94. outpack.data = []byte(funcname)
  95. } else {
  96. outpack.dataType = dtCanDoTimeout
  97. l := len(funcname)
  98. outpack.data = getBuffer(l + 5)
  99. copy(outpack.data, []byte(funcname))
  100. outpack.data[l] = '\x00'
  101. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  102. }
  103. return outpack
  104. }
  105. // Remove a function.
  106. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  107. worker.Lock()
  108. defer worker.Unlock()
  109. if _, ok := worker.funcs[funcname]; !ok {
  110. return fmt.Errorf("The function does not exist: %s", funcname)
  111. }
  112. delete(worker.funcs, funcname)
  113. if worker.running {
  114. worker.removeFunc(funcname)
  115. }
  116. return
  117. }
  118. // inner remove
  119. func (worker *Worker) removeFunc(funcname string) {
  120. outpack := getOutPack()
  121. outpack.dataType = dtCantDo
  122. outpack.data = []byte(funcname)
  123. worker.broadcast(outpack)
  124. }
  125. // inner package handling
  126. func (worker *Worker) handleInPack(inpack *inPack) {
  127. switch inpack.dataType {
  128. case dtNoJob:
  129. inpack.a.PreSleep()
  130. case dtNoop:
  131. inpack.a.Grab()
  132. case dtJobAssign, dtJobAssignUniq:
  133. go func() {
  134. if err := worker.exec(inpack); err != nil {
  135. worker.err(err)
  136. }
  137. }()
  138. if worker.limit != nil {
  139. worker.limit <- true
  140. }
  141. inpack.a.Grab()
  142. case dtError:
  143. worker.err(inpack.Err())
  144. fallthrough
  145. case dtEchoRes:
  146. fallthrough
  147. default:
  148. worker.customeHandler(inpack)
  149. }
  150. }
  151. // Connect to Gearman server and tell every server
  152. // what can this worker do.
  153. func (worker *Worker) Ready() (err error) {
  154. if len(worker.agents) == 0 {
  155. return ErrNoneAgents
  156. }
  157. if len(worker.funcs) == 0 {
  158. return ErrNoneFuncs
  159. }
  160. for _, a := range worker.agents {
  161. if err = a.Connect(); err != nil {
  162. return
  163. }
  164. }
  165. for funcname, f := range worker.funcs {
  166. worker.addFunc(funcname, f.timeout)
  167. }
  168. worker.ready = true
  169. return
  170. }
  171. // Main loop, block here
  172. // Most of time, this should be evaluated in goroutine.
  173. func (worker *Worker) Work() {
  174. if ! worker.ready {
  175. // didn't run Ready beforehand, so we'll have to do it:
  176. err := worker.Ready()
  177. if err != nil {
  178. panic( err )
  179. }
  180. }
  181. defer func() {
  182. for _, a := range worker.agents {
  183. a.Close()
  184. }
  185. }()
  186. worker.running = true
  187. for _, a := range worker.agents {
  188. a.Grab()
  189. }
  190. var inpack *inPack
  191. for inpack = range worker.in {
  192. worker.handleInPack(inpack)
  193. }
  194. }
  195. // custome handling warper
  196. func (worker *Worker) customeHandler(inpack *inPack) {
  197. if worker.JobHandler != nil {
  198. if err := worker.JobHandler(inpack); err != nil {
  199. worker.err(err)
  200. }
  201. }
  202. }
  203. // Close connection and exit main loop
  204. func (worker *Worker) Close() {
  205. worker.Lock()
  206. worker.Unlock()
  207. if worker.running == true {
  208. worker.running = false
  209. close(worker.in)
  210. }
  211. }
  212. // Echo
  213. func (worker *Worker) Echo(data []byte) {
  214. outpack := getOutPack()
  215. outpack.dataType = dtEchoReq
  216. outpack.data = data
  217. worker.broadcast(outpack)
  218. }
  219. // Remove all of functions.
  220. // Both from the worker and job servers.
  221. func (worker *Worker) Reset() {
  222. outpack := getOutPack()
  223. outpack.dataType = dtResetAbilities
  224. worker.broadcast(outpack)
  225. worker.funcs = make(jobFuncs)
  226. }
  227. // Set the worker's unique id.
  228. func (worker *Worker) SetId(id string) {
  229. worker.Id = id
  230. outpack := getOutPack()
  231. outpack.dataType = dtSetClientId
  232. outpack.data = []byte(id)
  233. worker.broadcast(outpack)
  234. }
  235. // inner job executing
  236. func (worker *Worker) exec(inpack *inPack) (err error) {
  237. defer func() {
  238. if worker.limit != nil {
  239. <-worker.limit
  240. }
  241. if r := recover(); r != nil {
  242. if e, ok := r.(error); ok {
  243. err = e
  244. } else {
  245. err = ErrUnknown
  246. }
  247. }
  248. }()
  249. f, ok := worker.funcs[inpack.fn]
  250. if !ok {
  251. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  252. }
  253. var r *result
  254. if f.timeout == 0 {
  255. d, e := f.f(inpack)
  256. r = &result{data: d, err: e}
  257. } else {
  258. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  259. }
  260. if worker.running {
  261. outpack := getOutPack()
  262. if r.err == nil {
  263. outpack.dataType = dtWorkComplete
  264. } else {
  265. if len(r.data) == 0 {
  266. outpack.dataType = dtWorkFail
  267. } else {
  268. outpack.dataType = dtWorkException
  269. }
  270. err = r.err
  271. }
  272. outpack.handle = inpack.handle
  273. outpack.data = r.data
  274. inpack.a.write(outpack)
  275. }
  276. return
  277. }
  278. func (worker *Worker)reRegisterFuncsForAgent( a * agent ){
  279. worker.Lock()
  280. defer worker.Unlock()
  281. for funcname, f := range worker.funcs {
  282. outpack := prepFuncOutpack( funcname, f.timeout)
  283. a.write(outpack)
  284. }
  285. }
  286. // inner result
  287. type result struct {
  288. data []byte
  289. err error
  290. }
  291. // executing timer
  292. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  293. rslt := make(chan *result)
  294. defer close(rslt)
  295. go func() {
  296. defer func() { recover() }()
  297. d, e := f(job)
  298. rslt <- &result{data: d, err: e}
  299. }()
  300. select {
  301. case r = <-rslt:
  302. case <-time.After(timeout):
  303. return &result{err: ErrTimeOut}
  304. }
  305. return r
  306. }
  307. // Error type passed when a worker connection disconnects
  308. type WorkerDisconnectError struct{
  309. err error
  310. agent * agent
  311. }
  312. func (e *WorkerDisconnectError) Error() ( string){
  313. return e.err.Error();
  314. }
  315. // Responds to the error by asking the worker to reconnect
  316. func (e *WorkerDisconnectError) Reconnect() ( err error ){
  317. return e.agent.reconnect()
  318. }
  319. // Which server was this for?
  320. func(e *WorkerDisconnectError) Server() ( net string, addr string ){
  321. return e.agent.net, e.agent.addr
  322. }