You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

300 lines
6.2 KiB

  1. package worker
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. const (
  9. Unlimited = 0
  10. OneByOne = 1
  11. Immediately = 0
  12. )
  13. /*
  14. Worker side api for gearman
  15. usage:
  16. w = worker.New(worker.Unlimited)
  17. w.AddFunction("foobar", foobar)
  18. w.AddServer("127.0.0.1:4730")
  19. w.Work() // Enter the worker's main loop
  20. The definition of the callback function 'foobar' should suit for the type 'JobFunction'.
  21. It looks like this:
  22. func foobar(job *Job) (data []byte, err os.Error) {
  23. //sth. here
  24. //plaplapla...
  25. return
  26. }
  27. */
  28. type Worker struct {
  29. sync.Mutex
  30. agents []*agent
  31. funcs JobFuncs
  32. in chan *inPack
  33. running bool
  34. Id string
  35. // assign a ErrFunc to handle errors
  36. ErrorHandler ErrorHandler
  37. JobHandler JobHandler
  38. limit chan bool
  39. }
  40. // Get a new worker
  41. func New(limit int) (worker *Worker) {
  42. worker = &Worker{
  43. agents: make([]*agent, 0, limit),
  44. funcs: make(JobFuncs),
  45. in: make(chan *inPack, QUEUE_SIZE),
  46. }
  47. if limit != Unlimited {
  48. worker.limit = make(chan bool, limit-1)
  49. }
  50. return
  51. }
  52. //
  53. func (worker *Worker) err(e error) {
  54. if worker.ErrorHandler != nil {
  55. worker.ErrorHandler(e)
  56. }
  57. }
  58. // Add a server. The addr should be 'host:port' format.
  59. // The connection is established at this time.
  60. func (worker *Worker) AddServer(net, addr string) (err error) {
  61. // Create a new job server's client as a agent of server
  62. a, err := newAgent(net, addr, worker)
  63. if err != nil {
  64. return err
  65. }
  66. worker.agents = append(worker.agents, a)
  67. return
  68. }
  69. // Write a job to job server.
  70. // Here, the job's mean is not the oraginal mean.
  71. // Just looks like a network package for job's result or tell job server, there was a fail.
  72. func (worker *Worker) broadcast(outpack *outPack) {
  73. for _, v := range worker.agents {
  74. v.write(outpack)
  75. }
  76. }
  77. // Add a function.
  78. // Plz added job servers first, then functions.
  79. // The API will tell every connected job server that 'I can do this'
  80. func (worker *Worker) AddFunc(funcname string,
  81. f JobFunc, timeout uint32) (err error) {
  82. worker.Lock()
  83. defer worker.Unlock()
  84. if _, ok := worker.funcs[funcname]; ok {
  85. return fmt.Errorf("The function already exists: %s", funcname)
  86. }
  87. worker.funcs[funcname] = &jobFunc{f: f, timeout: timeout}
  88. if worker.running {
  89. worker.addFunc(funcname, timeout)
  90. }
  91. return
  92. }
  93. // inner add function
  94. func (worker *Worker) addFunc(funcname string, timeout uint32) {
  95. outpack := getOutPack()
  96. if timeout == 0 {
  97. outpack.dataType = CAN_DO
  98. outpack.data = []byte(funcname)
  99. } else {
  100. outpack.dataType = CAN_DO_TIMEOUT
  101. l := len(funcname)
  102. outpack.data = getBuffer(l + 5)
  103. copy(outpack.data, []byte(funcname))
  104. outpack.data[l] = '\x00'
  105. binary.BigEndian.PutUint32(outpack.data[l+1:], timeout)
  106. }
  107. worker.broadcast(outpack)
  108. }
  109. // Remove a function.
  110. func (worker *Worker) RemoveFunc(funcname string) (err error) {
  111. worker.Lock()
  112. defer worker.Unlock()
  113. if _, ok := worker.funcs[funcname]; !ok {
  114. return fmt.Errorf("The function does not exist: %s", funcname)
  115. }
  116. delete(worker.funcs, funcname)
  117. if worker.running {
  118. worker.removeFunc(funcname)
  119. }
  120. return
  121. }
  122. // inner remove function
  123. func (worker *Worker) removeFunc(funcname string) {
  124. outpack := getOutPack()
  125. outpack.dataType = CANT_DO
  126. outpack.data = []byte(funcname)
  127. worker.broadcast(outpack)
  128. }
  129. func (worker *Worker) handleInPack(inpack *inPack) {
  130. switch inpack.dataType {
  131. case NO_JOB:
  132. inpack.a.PreSleep()
  133. case NOOP:
  134. inpack.a.Grab()
  135. case ERROR:
  136. worker.err(GetError(inpack.data))
  137. case JOB_ASSIGN, JOB_ASSIGN_UNIQ:
  138. go func() {
  139. if err := worker.exec(inpack); err != nil {
  140. worker.err(err)
  141. }
  142. }()
  143. if worker.limit != nil {
  144. worker.limit <- true
  145. }
  146. inpack.a.Grab()
  147. case ECHO_RES:
  148. fallthrough
  149. default:
  150. worker.customeHandler(inpack)
  151. }
  152. }
  153. func (worker *Worker) Ready() (err error) {
  154. for _, v := range worker.agents {
  155. if err = v.Connect(); err != nil {
  156. return
  157. }
  158. }
  159. for funcname, f := range worker.funcs {
  160. worker.addFunc(funcname, f.timeout)
  161. }
  162. return
  163. }
  164. // Main loop
  165. func (worker *Worker) Work() {
  166. worker.running = true
  167. for _, v := range worker.agents {
  168. v.Grab()
  169. }
  170. var inpack *inPack
  171. for inpack = range worker.in {
  172. worker.handleInPack(inpack)
  173. }
  174. }
  175. // job handler
  176. func (worker *Worker) customeHandler(inpack *inPack) {
  177. if worker.JobHandler != nil {
  178. if err := worker.JobHandler(inpack); err != nil {
  179. worker.err(err)
  180. }
  181. }
  182. }
  183. // Close.
  184. func (worker *Worker) Close() {
  185. worker.running = false
  186. close(worker.in)
  187. }
  188. // Send a something out, get the samething back.
  189. func (worker *Worker) Echo(data []byte) {
  190. outpack := getOutPack()
  191. outpack.dataType = ECHO_REQ
  192. outpack.data = data
  193. worker.broadcast(outpack)
  194. }
  195. // Remove all of functions.
  196. // Both from the worker or job servers.
  197. func (worker *Worker) Reset() {
  198. outpack := getOutPack()
  199. outpack.dataType = RESET_ABILITIES
  200. worker.broadcast(outpack)
  201. worker.funcs = make(JobFuncs)
  202. }
  203. // Set the worker's unique id.
  204. func (worker *Worker) SetId(id string) {
  205. worker.Id = id
  206. outpack := getOutPack()
  207. outpack.dataType = SET_CLIENT_ID
  208. outpack.data = []byte(id)
  209. worker.broadcast(outpack)
  210. }
  211. // Execute the job. And send back the result.
  212. func (worker *Worker) exec(inpack *inPack) (err error) {
  213. defer func() {
  214. if worker.limit != nil {
  215. <-worker.limit
  216. }
  217. if r := recover(); r != nil {
  218. if e, ok := r.(error); ok {
  219. err = e
  220. } else {
  221. err = ErrUnknown
  222. }
  223. }
  224. }()
  225. f, ok := worker.funcs[inpack.fn]
  226. if !ok {
  227. return fmt.Errorf("The function does not exist: %s", inpack.fn)
  228. }
  229. var r *result
  230. if f.timeout == 0 {
  231. d, e := f.f(inpack)
  232. r = &result{data: d, err: e}
  233. } else {
  234. r = execTimeout(f.f, inpack, time.Duration(f.timeout)*time.Second)
  235. }
  236. if worker.running {
  237. outpack := getOutPack()
  238. if r.err == nil {
  239. outpack.dataType = WORK_COMPLETE
  240. } else {
  241. if len(r.data) == 0 {
  242. outpack.dataType = WORK_FAIL
  243. } else {
  244. outpack.dataType = WORK_EXCEPTION
  245. }
  246. err = r.err
  247. }
  248. outpack.handle = inpack.handle
  249. outpack.data = r.data
  250. inpack.a.write(outpack)
  251. }
  252. return
  253. }
  254. type result struct {
  255. data []byte
  256. err error
  257. }
  258. func execTimeout(f JobFunc, job Job, timeout time.Duration) (r *result) {
  259. rslt := make(chan *result)
  260. defer close(rslt)
  261. go func() {
  262. defer func() { recover() }()
  263. d, e := f(job)
  264. rslt <- &result{data: d, err: e}
  265. }()
  266. select {
  267. case r = <-rslt:
  268. case <-time.After(timeout):
  269. return &result{err: ErrTimeOut}
  270. }
  271. return r
  272. }