You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

302 lines
6.3 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "encoding/binary"
  7. "fmt"
  8. "sync"
  9. "time"
  10. )
  11. const (
  12. Unlimited = 0
  13. OneByOne = 1
  14. Immediately = 0
  15. )
  16. /*
  17. Worker side api for gearman
  18. usage:
  19. w = worker.New(worker.Unlimited)
  20. w.AddFunction("foobar", foobar)
  21. w.AddServer("127.0.0.1:4730")
  22. w.Work() // Enter the worker's main loop
  23. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  24. It looks like this:
  25. func foobar(job *Job) (data []byte, err os.Error) {
  26. //sth. here
  27. //plaplapla...
  28. return
  29. }
  30. */
  31. type Worker struct {
  32. agents map[string]*agent
  33. funcs JobFuncs
  34. in chan *inPack
  35. running bool
  36. limit chan bool
  37. Id string
  38. // assign a ErrFunc to handle errors
  39. ErrorHandler ErrorHandler
  40. JobHandler JobHandler
  41. mutex sync.Mutex
  42. }
  43. // Get a new worker
  44. func New(l int) (worker *Worker) {
  45. worker = &Worker{
  46. agents: make(map[string]*agent, QUEUE_SIZE),
  47. funcs: make(JobFuncs),
  48. in: make(chan *inPack, QUEUE_SIZE),
  49. }
  50. if l != Unlimited {
  51. worker.limit = make(chan bool, l)
  52. }
  53. return
  54. }
  55. //
  56. func (worker *Worker) err(e error) {
  57. if worker.ErrorHandler != nil {
  58. worker.ErrorHandler(e)
  59. }
  60. }
  61. // Add a server. The addr should be 'host:port' format.
  62. // The connection is established at this time.
  63. func (worker *Worker) AddServer(net, addr string) (err error) {
  64. // Create a new job server's client as a agent of server
  65. a, err := newAgent(net, addr, worker)
  66. if err != nil {
  67. return err
  68. }
  69. worker.agents[net+addr] = a
  70. return
  71. }
  72. // Write a job to job server.
  73. // Here, the job's mean is not the oraginal mean.
  74. // Just looks like a network package for job's result or tell job server, there was a fail.
  75. func (worker *Worker) broadcast(outpack *outPack) {
  76. for _, v := range worker.agents {
  77. v.write(outpack)
  78. }
  79. }
  80. // Add a function.
  81. // Plz added job servers first, then functions.
  82. // The API will tell every connected job server that 'I can do this'
  83. func (worker *Worker) AddFunc(funcname string,
  84. f JobFunc, timeout uint32) (err error) {
  85. worker.mutex.Lock()
  86. defer worker.mutex.Unlock()
  87. if _, ok := worker.funcs[funcname]; ok {
  88. return fmt.Errorf("The function already exists: %s", funcname)
  89. }
  90. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  91. if worker.running {
  92. worker.addFunc(funcname, timeout)
  93. }
  94. return
  95. }
  96. // inner add function
  97. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  98. outpack := getOutPack()
  99. if timeout == 0 {
  100. outpack.dataType = CAN_DO
  101. outpack.data = []byte(funcname)
  102. } else {
  103. outpack.dataType = CAN_DO_TIMEOUT
  104. l := len(funcname)
  105. outpack.data = getBuffer(l + 5)
  106. copy(outpack.data, []byte(funcname))
  107. outpack.data[l] = '\x00'
  108. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  109. }
  110. worker.broadcast(outpack)
  111. }
  112. // Remove a function.
  113. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  114. worker.mutex.Lock()
  115. defer worker.mutex.Unlock()
  116. if _, ok := worker.funcs[funcname]; !ok {
  117. return fmt.Errorf("The function does not exist: %s", funcname)
  118. }
  119. delete(worker.funcs, funcname)
  120. if worker.running {
  121. worker.removeFunc(funcname)
  122. }
  123. return
  124. }
  125. // inner remove function
  126. func (worker *Worker) removeFunc(funcname string) {
  127. outpack := getOutPack()
  128. outpack.dataType = CANT_DO
  129. outpack.data = []byte(funcname)
  130. worker.broadcast(outpack)
  131. }
  132. func (worker *Worker) handleInPack(inpack *inPack) {
  133. defer func() {
  134. if worker.running && worker.limit != nil {
  135. <-worker.limit
  136. }
  137. }()
  138. switch inpack.dataType {
  139. case NO_JOB:
  140. inpack.a.PreSleep()
  141. case NOOP:
  142. inpack.a.Grab()
  143. case ERROR:
  144. worker.err(GetError(inpack.data))
  145. case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
  146. if err := worker.exec(inpack); err != nil {
  147. worker.err(err)
  148. }
  149. default:
  150. worker.customeHandler(inpack)
  151. }
  152. }
  153. func (worker *Worker) Ready() (err error) {
  154. for _, v := range worker.agents {
  155. if err = v.Connect(); err != nil {
  156. return
  157. }
  158. }
  159. for funcname, f := range worker.funcs {
  160. worker.addFunc(funcname, f.timeout)
  161. }
  162. return
  163. }
  164. // Main loop
  165. func (worker *Worker) Work() {
  166. worker.running = true
  167. for _, v := range worker.agents {
  168. v.Grab()
  169. }
  170. var inpack *inPack
  171. for inpack = range worker.in {
  172. go worker.handleInPack(inpack)
  173. }
  174. }
  175. // job handler
  176. func (worker *Worker) customeHandler(inpack *inPack) {
  177. if worker.JobHandler != nil {
  178. if err := worker.JobHandler(inpack); err != nil {
  179. worker.err(err)
  180. }
  181. }
  182. }
  183. // Close.
  184. func (worker *Worker) Close() {
  185. worker.running = false
  186. close(worker.in)
  187. if worker.limit != nil {
  188. close(worker.limit)
  189. }
  190. }
  191. // Send a something out, get the samething back.
  192. func (worker *Worker) Echo(data []byte) {
  193. outpack := getOutPack()
  194. outpack.dataType = ECHO_REQ
  195. outpack.data = data
  196. worker.broadcast(outpack)
  197. }
  198. // Remove all of functions.
  199. // Both from the worker or job servers.
  200. func (worker *Worker) Reset() {
  201. outpack := getOutPack()
  202. outpack.dataType = RESET_ABILITIES
  203. worker.broadcast(outpack)
  204. worker.funcs = make(JobFuncs)
  205. }
  206. // Set the worker's unique id.
  207. func (worker *Worker) SetId(id string) {
  208. worker.Id = id
  209. outpack := getOutPack()
  210. outpack.dataType = SET_CLIENT_ID
  211. outpack.data = []byte(id)
  212. worker.broadcast(outpack)
  213. }
  214. // Execute the job. And send back the result.
  215. func (worker *Worker) exec(inpack *inPack) (err error) {
  216. defer func() {
  217. if r := recover(); r != nil {
  218. if e, ok := r.(error); ok {
  219. err = e
  220. } else {
  221. err = ErrUnknown
  222. }
  223. }
  224. }()
  225. f, ok := worker.funcs[inpack.fn]
  226. if !ok {
  227. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  228. }
  229. var r *result
  230. if f.timeout == 0 {
  231. d, e := f.f(inpack)
  232. r = &result{data: d, err: e}
  233. } else {
  234. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  235. }
  236. if worker.running {
  237. outpack := getOutPack()
  238. if r.err == nil {
  239. outpack.dataType = WORK_COMPLETE
  240. } else {
  241. if len(r.data) == 0 {
  242. outpack.dataType = WORK_FAIL
  243. } else {
  244. outpack.dataType = WORK_EXCEPTION
  245. }
  246. err = r.err
  247. }
  248. outpack.handle = inpack.handle
  249. outpack.data = r.data
  250. inpack.a.write(outpack)
  251. inpack.a.Grab()
  252. }
  253. return
  254. }
  255. type result struct {
  256. data []byte
  257. err error
  258. }
  259. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  260. rslt := make(chan *result)
  261. defer close(rslt)
  262. go func() {
  263. defer func() { recover() }()
  264. d, e := f(job)
  265. rslt <- &result{data: d, err: e}
  266. }()
  267. select {
  268. case r = <-rslt:
  269. case <-time.After(timeout):
  270. return &result{err: ErrTimeOut}
  271. }
  272. return r
  273. }