You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 10 година
пре 10 година
пре 12 година
пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
пре 10 година
пре 10 година
пре 10 година
пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 10 година
пре 10 година
пре 10 година
пре 10 година
пре 10 година
пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 10 година
пре 10 година
пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 10 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 10 година
пре 11 година
пре 11 година
пре 11 година
пре 11 година
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. // The worker package helps developers to develop Gearman's worker
  2. // in an easy way.
  3. package worker
  4. import (
  5. "encoding/binary"
  6. "fmt"
  7. "sync"
  8. "time"
  9. )
  10. const (
  11. Unlimited = iota
  12. OneByOne
  13. Immediately = iota
  14. )
  15. // Worker is the only structure needed by worker side developing.
  16. // It can connect to multi-server and grab jobs.
  17. type Worker struct {
  18. sync.Mutex
  19. agents []*agent
  20. funcs jobFuncs
  21. in chan *inPack
  22. running bool
  23. Id string
  24. ErrorHandler ErrorHandler
  25. JobHandler JobHandler
  26. limit chan bool
  27. }
  28. // Return a worker.
  29. //
  30. // If limit is set to Unlimited(=0), the worker will grab all jobs
  31. // and execute them parallelly.
  32. // If limit is greater than zero, the number of paralled executing
  33. // jobs are limited under the number. If limit is assgined to
  34. // OneByOne(=1), there will be only one job executed in a time.
  35. func New(limit int) (worker *Worker) {
  36. worker = &Worker{
  37. agents: make([]*agent, 0, limit),
  38. funcs: make(jobFuncs),
  39. in: make(chan *inPack, queueSize),
  40. }
  41. if limit != Unlimited {
  42. worker.limit = make(chan bool, limit-1)
  43. }
  44. return
  45. }
  46. // inner error handling
  47. func (worker *Worker) err(e error) {
  48. if worker.ErrorHandler != nil {
  49. worker.ErrorHandler(e)
  50. }
  51. }
  52. // Add a Gearman job server.
  53. //
  54. // addr should be formated as 'host:port'.
  55. func (worker *Worker) AddServer(net, addr string) (err error) {
  56. // Create a new job server's client as a agent of server
  57. a, err := newAgent(net, addr, worker)
  58. if err != nil {
  59. return err
  60. }
  61. worker.agents = append(worker.agents, a)
  62. return
  63. }
  64. // Broadcast an outpack to all Gearman server.
  65. func (worker *Worker) broadcast(outpack *outPack) {
  66. for _, v := range worker.agents {
  67. v.write(outpack)
  68. }
  69. }
  70. // Add a function.
  71. // Set timeout as Unlimited(=0) to disable executing timeout.
  72. func (worker *Worker) AddFunc(funcname string,
  73. f JobFunc, timeout uint32) (err error) {
  74. worker.Lock()
  75. defer worker.Unlock()
  76. if _, ok := worker.funcs[funcname]; ok {
  77. return fmt.Errorf("The function already exists: %s", funcname)
  78. }
  79. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  80. if worker.running {
  81. worker.addFunc(funcname, timeout)
  82. }
  83. return
  84. }
  85. // inner add
  86. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  87. outpack := getOutPack()
  88. if timeout == 0 {
  89. outpack.dataType = dtCanDo
  90. outpack.data = []byte(funcname)
  91. } else {
  92. outpack.dataType = dtCanDoTimeout
  93. l := len(funcname)
  94. outpack.data = getBuffer(l + 5)
  95. copy(outpack.data, []byte(funcname))
  96. outpack.data[l] = '\x00'
  97. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  98. }
  99. worker.broadcast(outpack)
  100. }
  101. // Remove a function.
  102. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  103. worker.Lock()
  104. defer worker.Unlock()
  105. if _, ok := worker.funcs[funcname]; !ok {
  106. return fmt.Errorf("The function does not exist: %s", funcname)
  107. }
  108. delete(worker.funcs, funcname)
  109. if worker.running {
  110. worker.removeFunc(funcname)
  111. }
  112. return
  113. }
  114. // inner remove
  115. func (worker *Worker) removeFunc(funcname string) {
  116. outpack := getOutPack()
  117. outpack.dataType = dtCantDo
  118. outpack.data = []byte(funcname)
  119. worker.broadcast(outpack)
  120. }
  121. // inner package handling
  122. func (worker *Worker) handleInPack(inpack *inPack) {
  123. switch inpack.dataType {
  124. case dtNoJob:
  125. inpack.a.PreSleep()
  126. case dtNoop:
  127. inpack.a.Grab()
  128. case dtJobAssign, dtJobAssignUniq:
  129. go func() {
  130. if err := worker.exec(inpack); err != nil {
  131. worker.err(err)
  132. }
  133. }()
  134. if worker.limit != nil {
  135. worker.limit <- true
  136. }
  137. inpack.a.Grab()
  138. case dtError:
  139. worker.err(inpack.Err())
  140. fallthrough
  141. case dtEchoRes:
  142. fallthrough
  143. default:
  144. worker.customeHandler(inpack)
  145. }
  146. }
  147. // Connect to Gearman server and tell every server
  148. // what can this worker do.
  149. func (worker *Worker) Ready() (err error) {
  150. if len(worker.agents) == 0 {
  151. return ErrNoneAgents
  152. }
  153. if len(worker.funcs) == 0 {
  154. return ErrNoneFuncs
  155. }
  156. for _, a := range worker.agents {
  157. if err = a.Connect(); err != nil {
  158. return
  159. }
  160. }
  161. for funcname, f := range worker.funcs {
  162. worker.addFunc(funcname, f.timeout)
  163. }
  164. return
  165. }
  166. // Main loop, block here
  167. // Most of time, this should be evaluated in goroutine.
  168. func (worker *Worker) Work() {
  169. defer func() {
  170. for _, a := range worker.agents {
  171. a.Close()
  172. }
  173. }()
  174. worker.running = true
  175. for _, a := range worker.agents {
  176. a.Grab()
  177. }
  178. var inpack *inPack
  179. for inpack = range worker.in {
  180. worker.handleInPack(inpack)
  181. }
  182. }
  183. // custome handling warper
  184. func (worker *Worker) customeHandler(inpack *inPack) {
  185. if worker.JobHandler != nil {
  186. if err := worker.JobHandler(inpack); err != nil {
  187. worker.err(err)
  188. }
  189. }
  190. }
  191. // Close connection and exit main loop
  192. func (worker *Worker) Close() {
  193. worker.Lock()
  194. worker.Unlock()
  195. if worker.running == true {
  196. worker.running = false
  197. close(worker.in)
  198. }
  199. }
  200. // Echo
  201. func (worker *Worker) Echo(data []byte) {
  202. outpack := getOutPack()
  203. outpack.dataType = dtEchoReq
  204. outpack.data = data
  205. worker.broadcast(outpack)
  206. }
  207. // Remove all of functions.
  208. // Both from the worker and job servers.
  209. func (worker *Worker) Reset() {
  210. outpack := getOutPack()
  211. outpack.dataType = dtResetAbilities
  212. worker.broadcast(outpack)
  213. worker.funcs = make(jobFuncs)
  214. }
  215. // Set the worker's unique id.
  216. func (worker *Worker) SetId(id string) {
  217. worker.Id = id
  218. outpack := getOutPack()
  219. outpack.dataType = dtSetClientId
  220. outpack.data = []byte(id)
  221. worker.broadcast(outpack)
  222. }
  223. // inner job executing
  224. func (worker *Worker) exec(inpack *inPack) (err error) {
  225. defer func() {
  226. if worker.limit != nil {
  227. <-worker.limit
  228. }
  229. if r := recover(); r != nil {
  230. if e, ok := r.(error); ok {
  231. err = e
  232. } else {
  233. err = ErrUnknown
  234. }
  235. }
  236. }()
  237. f, ok := worker.funcs[inpack.fn]
  238. if !ok {
  239. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  240. }
  241. var r *result
  242. if f.timeout == 0 {
  243. d, e := f.f(inpack)
  244. r = &result{data: d, err: e}
  245. } else {
  246. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  247. }
  248. if worker.running {
  249. outpack := getOutPack()
  250. if r.err == nil {
  251. outpack.dataType = dtWorkComplete
  252. } else {
  253. if len(r.data) == 0 {
  254. outpack.dataType = dtWorkFail
  255. } else {
  256. outpack.dataType = dtWorkException
  257. }
  258. err = r.err
  259. }
  260. outpack.handle = inpack.handle
  261. outpack.data = r.data
  262. inpack.a.write(outpack)
  263. }
  264. return
  265. }
  266. // inner result
  267. type result struct {
  268. data []byte
  269. err error
  270. }
  271. // executing timer
  272. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  273. rslt := make(chan *result)
  274. defer close(rslt)
  275. go func() {
  276. defer func() { recover() }()
  277. d, e := f(job)
  278. rslt <- &result{data: d, err: e}
  279. }()
  280. select {
  281. case r = <-rslt:
  282. case <-time.After(timeout):
  283. return &result{err: ErrTimeOut}
  284. }
  285. return r
  286. }