You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

303 lines
6.2 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com> All rights reserved.
  2. // Use of this source code is governed by a MIT
  3. // license that can be found in the LICENSE file.
  4. package worker
  5. import (
  6. "fmt"
  7. "time"
  8. "sync"
  9. "encoding/binary"
  10. )
  11. const (
  12. Unlimited = 0
  13. OneByOne = 1
  14. Immediately = 0
  15. )
  16. /*
  17. Worker side api for gearman
  18. usage:
  19. w = worker.New(worker.Unlimited)
  20. w.AddFunction("foobar", foobar)
  21. w.AddServer("127.0.0.1:4730")
  22. w.Work() // Enter the worker's main loop
  23. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  24. It looks like this:
  25. func foobar(job *Job) (data []byte, err os.Error) {
  26. //sth. here
  27. //plaplapla...
  28. return
  29. }
  30. */
  31. type Worker struct {
  32. agents map[string]*agent
  33. funcs JobFuncs
  34. in chan *Response
  35. running bool
  36. limit chan bool
  37. Id string
  38. // assign a ErrFunc to handle errors
  39. ErrorHandler ErrorHandler
  40. JobHandler JobHandler
  41. mutex sync.Mutex
  42. }
  43. // Get a new worker
  44. func New(l int) (worker *Worker) {
  45. worker = &Worker{
  46. agents: make(map[string]*agent, QUEUE_SIZE),
  47. funcs: make(JobFuncs),
  48. in: make(chan *Response, QUEUE_SIZE),
  49. }
  50. if l != Unlimited {
  51. worker.limit = make(chan bool, l)
  52. }
  53. return
  54. }
  55. //
  56. func (worker *Worker) err(e error) {
  57. if worker.ErrorHandler != nil {
  58. worker.ErrorHandler(e)
  59. }
  60. }
  61. // Add a server. The addr should be 'host:port' format.
  62. // The connection is established at this time.
  63. func (worker *Worker) AddServer(net, addr string) (err error) {
  64. // Create a new job server's client as a agent of server
  65. a, err := newAgent(net, addr, worker)
  66. if err != nil {
  67. return err
  68. }
  69. worker.agents[net + addr] = a
  70. return
  71. }
  72. // Write a job to job server.
  73. // Here, the job's mean is not the oraginal mean.
  74. // Just looks like a network package for job's result or tell job server, there was a fail.
  75. func (worker *Worker) broadcast(req *request) {
  76. for _, v := range worker.agents {
  77. v.write(req)
  78. }
  79. }
  80. // Add a function.
  81. // Plz added job servers first, then functions.
  82. // The API will tell every connected job server that 'I can do this'
  83. func (worker *Worker) AddFunc(funcname string,
  84. f JobFunc, timeout uint32) (err error) {
  85. worker.mutex.Lock()
  86. defer worker.mutex.Unlock()
  87. if _, ok := worker.funcs[funcname]; ok {
  88. return fmt.Errorf("The function already exists: %s", funcname)
  89. }
  90. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  91. if worker.running {
  92. worker.addFunc(funcname, timeout)
  93. }
  94. return
  95. }
  96. // inner add function
  97. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  98. req := getRequest()
  99. if timeout == 0 {
  100. req.DataType = CAN_DO
  101. req.Data = []byte(funcname)
  102. } else {
  103. req.DataType = CAN_DO_TIMEOUT
  104. l := len(funcname)
  105. req.Data = getBuffer(l + 5)
  106. copy(req.Data, []byte(funcname))
  107. req.Data[l] = '\x00'
  108. binary.BigEndian.PutUint32(req.Data[l + 1:], timeout)
  109. }
  110. worker.broadcast(req)
  111. }
  112. // Remove a function.
  113. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  114. worker.mutex.Lock()
  115. defer worker.mutex.Unlock()
  116. if _, ok := worker.funcs[funcname]; !ok {
  117. return fmt.Errorf("The function does not exist: %s", funcname)
  118. }
  119. delete(worker.funcs, funcname)
  120. if worker.running {
  121. worker.removeFunc(funcname)
  122. }
  123. return
  124. }
  125. // inner remove function
  126. func (worker *Worker) removeFunc(funcname string) {
  127. req := getRequest()
  128. req.DataType = CANT_DO
  129. req.Data = []byte(funcname)
  130. worker.broadcast(req)
  131. }
  132. func (worker *Worker) dealResp(resp *Response) {
  133. defer func() {
  134. if worker.running && worker.limit != nil {
  135. <-worker.limit
  136. }
  137. }()
  138. switch resp.DataType {
  139. case ERROR:
  140. worker.err(GetError(resp.Data))
  141. case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
  142. if err := worker.exec(resp); err != nil {
  143. worker.err(err)
  144. }
  145. default:
  146. worker.handleResponse(resp)
  147. }
  148. }
  149. // Main loop
  150. func (worker *Worker) Work() {
  151. defer func() {
  152. for _, v := range worker.agents {
  153. v.Close()
  154. }
  155. }()
  156. worker.running = true
  157. for _, v := range worker.agents {
  158. v.Connect()
  159. go v.Work()
  160. }
  161. worker.Reset()
  162. for funcname, f := range worker.funcs {
  163. worker.addFunc(funcname, f.timeout)
  164. }
  165. var resp *Response
  166. for resp = range worker.in {
  167. go worker.dealResp(resp)
  168. }
  169. }
  170. // job handler
  171. func (worker *Worker) handleResponse(resp *Response) {
  172. if worker.JobHandler != nil {
  173. job := getJob()
  174. job.a = worker.agents[resp.agentId]
  175. job.Handle = resp.Handle
  176. if resp.DataType == ECHO_RES {
  177. job.data = resp.Data
  178. }
  179. if err := worker.JobHandler(job); err != nil {
  180. worker.err(err)
  181. }
  182. }
  183. }
  184. // Close.
  185. func (worker *Worker) Close() {
  186. worker.running = false
  187. close(worker.in)
  188. if worker.limit != nil {
  189. close(worker.limit)
  190. }
  191. }
  192. // Send a something out, get the samething back.
  193. func (worker *Worker) Echo(data []byte) {
  194. req := getRequest()
  195. req.DataType = ECHO_REQ
  196. req.Data = data
  197. worker.broadcast(req)
  198. }
  199. // Remove all of functions.
  200. // Both from the worker or job servers.
  201. func (worker *Worker) Reset() {
  202. req := getRequest()
  203. req.DataType = RESET_ABILITIES
  204. worker.broadcast(req)
  205. worker.funcs = make(JobFuncs)
  206. }
  207. // Set the worker's unique id.
  208. func (worker *Worker) SetId(id string) {
  209. worker.Id = id
  210. req := getRequest()
  211. req.DataType = SET_CLIENT_ID
  212. req.Data = []byte(id)
  213. worker.broadcast(req)
  214. }
  215. // Execute the job. And send back the result.
  216. func (worker *Worker) exec(resp *Response) (err error) {
  217. defer func() {
  218. if r := recover(); r != nil {
  219. if e, ok := r.(error); ok {
  220. err = e
  221. } else {
  222. err = ErrUnknown
  223. }
  224. }
  225. }()
  226. f, ok := worker.funcs[resp.Fn]
  227. if !ok {
  228. return fmt.Errorf("The function does not exist: %s", resp.Fn)
  229. }
  230. var r *result
  231. job := getJob()
  232. job.a = worker.agents[resp.agentId]
  233. job.Handle = resp.Handle
  234. if f.timeout == 0 {
  235. d, e := f.f(job)
  236. r = &result{data: d, err: e}
  237. } else {
  238. r = execTimeout(f.f, job, time.Duration(f.timeout)*time.Second)
  239. }
  240. req := getRequest()
  241. if r.err == nil {
  242. req.DataType = WORK_COMPLETE
  243. } else {
  244. if r.data == nil {
  245. req.DataType = WORK_FAIL
  246. } else {
  247. req.DataType = WORK_EXCEPTION
  248. }
  249. err = r.err
  250. }
  251. req.Data = r.data
  252. if worker.running {
  253. job.a.write(req)
  254. }
  255. return
  256. }
  257. type result struct {
  258. data []byte
  259. err error
  260. }
  261. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  262. rslt := make(chan *result)
  263. defer close(rslt)
  264. go func() {
  265. defer func() { recover() }()
  266. d, e := f(job)
  267. rslt <- &result{data: d, err: e}
  268. }()
  269. select {
  270. case r = <-rslt:
  271. case <-time.After(timeout):
  272. return &result{err: ErrTimeOut}
  273. }
  274. return r
  275. }