You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

worker.go 6.6 KiB

10 years ago
10 years ago
10 years ago
12 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
11 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
10 years ago
10 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago

  // Package worker helps developers write Gearman workers
  // in an easy way.
  3. package worker
  4. import (
  5. "encoding/binary"
  6. "fmt"
  7. "sync"
  8. "time"
  9. )
  // Special values for the limit argument of New. Unlimited is also the
  // timeout value that disables the execution timeout in AddFunc.
  const (
  	// Unlimited (0) places no bound.
  	Unlimited = 0
  	// OneByOne (1) lets only one job execute at a time.
  	OneByOne = 1
  )
  14. // Worker is the only structure needed by worker side developing.
  15. // It can connect to multi-server and grab jobs.
  16. type Worker struct {
  17. sync.Mutex
  18. agents []*agent
  19. funcs jobFuncs
  20. in chan *inPack
  21. running bool
  22. ready bool
  23. Id string
  24. ErrorHandler ErrorHandler
  25. JobHandler JobHandler
  26. limit chan bool
  27. }
  28. // Return a worker.
  29. //
  30. // If limit is set to Unlimited(=0), the worker will grab all jobs
  31. // and execute them parallelly.
  32. // If limit is greater than zero, the number of paralled executing
  33. // jobs are limited under the number. If limit is assgined to
  34. // OneByOne(=1), there will be only one job executed in a time.
  35. func New(limit int) (worker *Worker) {
  36. worker = &Worker{
  37. agents: make([]*agent, 0, limit),
  38. funcs: make(jobFuncs),
  39. in: make(chan *inPack, queueSize),
  40. }
  41. if limit != Unlimited {
  42. worker.limit = make(chan bool, limit-1)
  43. }
  44. return
  45. }
  46. // inner error handling
  47. func (worker *Worker) err(e error) {
  48. if worker.ErrorHandler != nil {
  49. worker.ErrorHandler(e)
  50. }
  51. }
  52. // Add a Gearman job server.
  53. //
  54. // addr should be formated as 'host:port'.
  55. func (worker *Worker) AddServer(net, addr string) (err error) {
  56. // Create a new job server's client as a agent of server
  57. a, err := newAgent(net, addr, worker)
  58. if err != nil {
  59. return err
  60. }
  61. worker.agents = append(worker.agents, a)
  62. return
  63. }
  64. // Broadcast an outpack to all Gearman server.
  65. func (worker *Worker) broadcast(outpack *outPack) {
  66. for _, v := range worker.agents {
  67. v.write(outpack)
  68. }
  69. }
  70. // Add a function.
  71. // Set timeout as Unlimited(=0) to disable executing timeout.
  72. func (worker *Worker) AddFunc(funcname string,
  73. f JobFunc, timeout uint32) (err error) {
  74. worker.Lock()
  75. defer worker.Unlock()
  76. if _, ok := worker.funcs[funcname]; ok {
  77. return fmt.Errorf("The function already exists: %s", funcname)
  78. }
  79. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  80. if worker.running {
  81. worker.addFunc(funcname, timeout)
  82. }
  83. return
  84. }
  85. // inner add
  86. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  87. outpack := getOutPack()
  88. if timeout == 0 {
  89. outpack.dataType = dtCanDo
  90. outpack.data = []byte(funcname)
  91. } else {
  92. outpack.dataType = dtCanDoTimeout
  93. l := len(funcname)
  94. outpack.data = getBuffer(l + 5)
  95. copy(outpack.data, []byte(funcname))
  96. outpack.data[l] = '\x00'
  97. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  98. }
  99. worker.broadcast(outpack)
  100. }
  101. // Remove a function.
  102. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  103. worker.Lock()
  104. defer worker.Unlock()
  105. if _, ok := worker.funcs[funcname]; !ok {
  106. return fmt.Errorf("The function does not exist: %s", funcname)
  107. }
  108. delete(worker.funcs, funcname)
  109. if worker.running {
  110. worker.removeFunc(funcname)
  111. }
  112. return
  113. }
  114. // inner remove
  115. func (worker *Worker) removeFunc(funcname string) {
  116. outpack := getOutPack()
  117. outpack.dataType = dtCantDo
  118. outpack.data = []byte(funcname)
  119. worker.broadcast(outpack)
  120. }
  121. // inner package handling
  122. func (worker *Worker) handleInPack(inpack *inPack) {
  123. switch inpack.dataType {
  124. case dtNoJob:
  125. inpack.a.PreSleep()
  126. case dtNoop:
  127. inpack.a.Grab()
  128. case dtJobAssign, dtJobAssignUniq:
  129. go func() {
  130. if err := worker.exec(inpack); err != nil {
  131. worker.err(err)
  132. }
  133. }()
  134. if worker.limit != nil {
  135. worker.limit <- true
  136. }
  137. inpack.a.Grab()
  138. case dtError:
  139. worker.err(inpack.Err())
  140. fallthrough
  141. case dtEchoRes:
  142. fallthrough
  143. default:
  144. worker.customeHandler(inpack)
  145. }
  146. }
  147. // Connect to Gearman server and tell every server
  148. // what can this worker do.
  149. func (worker *Worker) Ready() (err error) {
  150. if len(worker.agents) == 0 {
  151. return ErrNoneAgents
  152. }
  153. if len(worker.funcs) == 0 {
  154. return ErrNoneFuncs
  155. }
  156. for _, a := range worker.agents {
  157. if err = a.Connect(); err != nil {
  158. return
  159. }
  160. }
  161. for funcname, f := range worker.funcs {
  162. worker.addFunc(funcname, f.timeout)
  163. }
  164. worker.ready = true
  165. return
  166. }
  167. // Main loop, block here
  168. // Most of time, this should be evaluated in goroutine.
  169. func (worker *Worker) Work() {
  170. if ! worker.ready {
  171. // didn't run Ready beforehand, so we'll have to do it:
  172. err := worker.Ready()
  173. if err != nil {
  174. panic( err )
  175. }
  176. }
  177. defer func() {
  178. for _, a := range worker.agents {
  179. a.Close()
  180. }
  181. }()
  182. worker.running = true
  183. for _, a := range worker.agents {
  184. a.Grab()
  185. }
  186. var inpack *inPack
  187. for inpack = range worker.in {
  188. worker.handleInPack(inpack)
  189. }
  190. }
  191. // custome handling warper
  192. func (worker *Worker) customeHandler(inpack *inPack) {
  193. if worker.JobHandler != nil {
  194. if err := worker.JobHandler(inpack); err != nil {
  195. worker.err(err)
  196. }
  197. }
  198. }
  199. // Close connection and exit main loop
  200. func (worker *Worker) Close() {
  201. worker.Lock()
  202. worker.Unlock()
  203. if worker.running == true {
  204. worker.running = false
  205. close(worker.in)
  206. }
  207. }
  208. // Echo
  209. func (worker *Worker) Echo(data []byte) {
  210. outpack := getOutPack()
  211. outpack.dataType = dtEchoReq
  212. outpack.data = data
  213. worker.broadcast(outpack)
  214. }
  215. // Remove all of functions.
  216. // Both from the worker and job servers.
  217. func (worker *Worker) Reset() {
  218. outpack := getOutPack()
  219. outpack.dataType = dtResetAbilities
  220. worker.broadcast(outpack)
  221. worker.funcs = make(jobFuncs)
  222. }
  223. // Set the worker's unique id.
  224. func (worker *Worker) SetId(id string) {
  225. worker.Id = id
  226. outpack := getOutPack()
  227. outpack.dataType = dtSetClientId
  228. outpack.data = []byte(id)
  229. worker.broadcast(outpack)
  230. }
  231. // inner job executing
  232. func (worker *Worker) exec(inpack *inPack) (err error) {
  233. defer func() {
  234. if worker.limit != nil {
  235. <-worker.limit
  236. }
  237. if r := recover(); r != nil {
  238. if e, ok := r.(error); ok {
  239. err = e
  240. } else {
  241. err = ErrUnknown
  242. }
  243. }
  244. }()
  245. f, ok := worker.funcs[inpack.fn]
  246. if !ok {
  247. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  248. }
  249. var r *result
  250. if f.timeout == 0 {
  251. d, e := f.f(inpack)
  252. r = &result{data: d, err: e}
  253. } else {
  254. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  255. }
  256. if worker.running {
  257. outpack := getOutPack()
  258. if r.err == nil {
  259. outpack.dataType = dtWorkComplete
  260. } else {
  261. if len(r.data) == 0 {
  262. outpack.dataType = dtWorkFail
  263. } else {
  264. outpack.dataType = dtWorkException
  265. }
  266. err = r.err
  267. }
  268. outpack.handle = inpack.handle
  269. outpack.data = r.data
  270. inpack.a.write(outpack)
  271. }
  272. return
  273. }
  // result carries what a job function returned.
  type result struct {
  	// data is the payload returned by the job function.
  	data []byte
  	// err is the error returned by the job function, or ErrTimeOut
  	// when execTimeout gave up waiting.
  	err error
  }
  279. // executing timer
  280. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  281. rslt := make(chan *result)
  282. defer close(rslt)
  283. go func() {
  284. defer func() { recover() }()
  285. d, e := f(job)
  286. rslt <- &result{data: d, err: e}
  287. }()
  288. select {
  289. case r = <-rslt:
  290. case <-time.After(timeout):
  291. return &result{err: ErrTimeOut}
  292. }
  293. return r
  294. }