You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

304 lines
6.3 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "encoding/binary"
  7. "fmt"
  8. "sync"
  9. "time"
  10. )
  11. const (
  12. Unlimited = 0
  13. OneByOne = 1
  14. Immediately = 0
  15. )
  16. /*
  17. Worker side api for gearman
  18. usage:
  19. w = worker.New(worker.Unlimited)
  20. w.AddFunction("foobar", foobar)
  21. w.AddServer("127.0.0.1:4730")
  22. w.Work() // Enter the worker's main loop
  23. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  24. It looks like this:
  25. func foobar(job *Job) (data []byte, err os.Error) {
  26. //sth. here
  27. //plaplapla...
  28. return
  29. }
  30. */
  31. type Worker struct {
  32. sync.Mutex
  33. agents []*agent
  34. funcs JobFuncs
  35. in chan *inPack
  36. running bool
  37. Id string
  38. // assign a ErrFunc to handle errors
  39. ErrorHandler ErrorHandler
  40. JobHandler JobHandler
  41. limit chan bool
  42. }
  43. // Get a new worker
  44. func New(limit int) (worker *Worker) {
  45. worker = &Worker{
  46. agents: make([]*agent, 0),
  47. funcs: make(JobFuncs),
  48. in: make(chan *inPack, QUEUE_SIZE),
  49. }
  50. if limit != Unlimited {
  51. worker.limit = make(chan bool, limit - 1)
  52. }
  53. return
  54. }
  55. //
  56. func (worker *Worker) err(e error) {
  57. if worker.ErrorHandler != nil {
  58. worker.ErrorHandler(e)
  59. }
  60. }
  61. // Add a server. The addr should be 'host:port' format.
  62. // The connection is established at this time.
  63. func (worker *Worker) AddServer(net, addr string) (err error) {
  64. // Create a new job server's client as a agent of server
  65. a, err := newAgent(net, addr, worker)
  66. if err != nil {
  67. return err
  68. }
  69. worker.agents = append(worker.agents, a)
  70. return
  71. }
  72. // Write a job to job server.
  73. // Here, the job's mean is not the oraginal mean.
  74. // Just looks like a network package for job's result or tell job server, there was a fail.
  75. func (worker *Worker) broadcast(outpack *outPack) {
  76. for _, v := range worker.agents {
  77. v.write(outpack)
  78. }
  79. }
  80. // Add a function.
  81. // Plz added job servers first, then functions.
  82. // The API will tell every connected job server that 'I can do this'
  83. func (worker *Worker) AddFunc(funcname string,
  84. f JobFunc, timeout uint32) (err error) {
  85. worker.Lock()
  86. defer worker.Unlock()
  87. if _, ok := worker.funcs[funcname]; ok {
  88. return fmt.Errorf("The function already exists: %s", funcname)
  89. }
  90. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  91. if worker.running {
  92. worker.addFunc(funcname, timeout)
  93. }
  94. return
  95. }
  96. // inner add function
  97. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  98. outpack := getOutPack()
  99. if timeout == 0 {
  100. outpack.dataType = CAN_DO
  101. outpack.data = []byte(funcname)
  102. } else {
  103. outpack.dataType = CAN_DO_TIMEOUT
  104. l := len(funcname)
  105. outpack.data = getBuffer(l + 5)
  106. copy(outpack.data, []byte(funcname))
  107. outpack.data[l] = '\x00'
  108. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  109. }
  110. worker.broadcast(outpack)
  111. }
  112. // Remove a function.
  113. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  114. worker.Lock()
  115. defer worker.Unlock()
  116. if _, ok := worker.funcs[funcname]; !ok {
  117. return fmt.Errorf("The function does not exist: %s", funcname)
  118. }
  119. delete(worker.funcs, funcname)
  120. if worker.running {
  121. worker.removeFunc(funcname)
  122. }
  123. return
  124. }
  125. // inner remove function
  126. func (worker *Worker) removeFunc(funcname string) {
  127. outpack := getOutPack()
  128. outpack.dataType = CANT_DO
  129. outpack.data = []byte(funcname)
  130. worker.broadcast(outpack)
  131. }
  132. func (worker *Worker) handleInPack(inpack *inPack) {
  133. switch inpack.dataType {
  134. case NO_JOB:
  135. inpack.a.PreSleep()
  136. case NOOP:
  137. inpack.a.Grab()
  138. case ERROR:
  139. worker.err(GetError(inpack.data))
  140. case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
  141. go func() {
  142. if err := worker.exec(inpack); err != nil {
  143. worker.err(err)
  144. }
  145. }()
  146. if (worker.limit != nil) {
  147. worker.limit <- true
  148. }
  149. inpack.a.Grab()
  150. case ECHO_RES:
  151. fallthrough
  152. default:
  153. worker.customeHandler(inpack)
  154. }
  155. }
  156. func (worker *Worker) Ready() (err error) {
  157. for _, v := range worker.agents {
  158. if err = v.Connect(); err != nil {
  159. return
  160. }
  161. }
  162. for funcname, f := range worker.funcs {
  163. worker.addFunc(funcname, f.timeout)
  164. }
  165. return
  166. }
  167. // Main loop
  168. func (worker *Worker) Work() {
  169. worker.running = true
  170. for _, v := range worker.agents {
  171. v.Grab()
  172. }
  173. var inpack *inPack
  174. for inpack = range worker.in {
  175. worker.handleInPack(inpack)
  176. }
  177. }
  178. // job handler
  179. func (worker *Worker) customeHandler(inpack *inPack) {
  180. if worker.JobHandler != nil {
  181. if err := worker.JobHandler(inpack); err != nil {
  182. worker.err(err)
  183. }
  184. }
  185. }
  186. // Close.
  187. func (worker *Worker) Close() {
  188. worker.running = false
  189. close(worker.in)
  190. }
  191. // Send a something out, get the samething back.
  192. func (worker *Worker) Echo(data []byte) {
  193. outpack := getOutPack()
  194. outpack.dataType = ECHO_REQ
  195. outpack.data = data
  196. worker.broadcast(outpack)
  197. }
  198. // Remove all of functions.
  199. // Both from the worker or job servers.
  200. func (worker *Worker) Reset() {
  201. outpack := getOutPack()
  202. outpack.dataType = RESET_ABILITIES
  203. worker.broadcast(outpack)
  204. worker.funcs = make(JobFuncs)
  205. }
  206. // Set the worker's unique id.
  207. func (worker *Worker) SetId(id string) {
  208. worker.Id = id
  209. outpack := getOutPack()
  210. outpack.dataType = SET_CLIENT_ID
  211. outpack.data = []byte(id)
  212. worker.broadcast(outpack)
  213. }
  214. // Execute the job. And send back the result.
  215. func (worker *Worker) exec(inpack *inPack) (err error) {
  216. defer func() {
  217. if (worker.limit != nil) {
  218. <-worker.limit
  219. }
  220. if r := recover(); r != nil {
  221. if e, ok := r.(error); ok {
  222. err = e
  223. } else {
  224. err = ErrUnknown
  225. }
  226. }
  227. }()
  228. f, ok := worker.funcs[inpack.fn]
  229. if !ok {
  230. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  231. }
  232. var r *result
  233. if f.timeout == 0 {
  234. d, e := f.f(inpack)
  235. r = &result{data: d, err: e}
  236. } else {
  237. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  238. }
  239. if worker.running {
  240. outpack := getOutPack()
  241. if r.err == nil {
  242. outpack.dataType = WORK_COMPLETE
  243. } else {
  244. if len(r.data) == 0 {
  245. outpack.dataType = WORK_FAIL
  246. } else {
  247. outpack.dataType = WORK_EXCEPTION
  248. }
  249. err = r.err
  250. }
  251. outpack.handle = inpack.handle
  252. outpack.data = r.data
  253. inpack.a.write(outpack)
  254. }
  255. return
  256. }
  257. type result struct {
  258. data []byte
  259. err error
  260. }
  261. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  262. rslt := make(chan *result)
  263. defer close(rslt)
  264. go func() {
  265. defer func() { recover() }()
  266. d, e := f(job)
  267. rslt <- &result{data: d, err: e}
  268. }()
  269. select {
  270. case r = <-rslt:
  271. case <-time.After(timeout):
  272. return &result{err: ErrTimeOut}
  273. }
  274. return r
  275. }