You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

116 lines
3.0 KiB

  1. // Copyright 2011 Xing Xing <mikespook@gmail.com>
  2. // All rights reserved.
  3. // Use of this source code is governed by a MIT
  4. // license that can be found in the LICENSE file.
  5. package worker
  6. import (
  7. "strconv"
  8. "github.com/mikespook/gearman-go/common"
  9. )
  10. // Worker side job
  11. type Job struct {
  12. Data []byte
  13. Handle, UniqueId string
  14. agent *agent
  15. magicCode, DataType uint32
  16. c chan bool
  17. }
  18. // Create a new job
  19. func newJob(magiccode, datatype uint32, data []byte) (job *Job) {
  20. return &Job{magicCode: magiccode,
  21. DataType: datatype,
  22. Data: data,
  23. c: make(chan bool),
  24. }
  25. }
  26. // Decode job from byte slice
  27. func decodeJob(data []byte) (job *Job, err error) {
  28. if len(data) < 12 {
  29. return nil, common.Errorf("Invalid data: %V", data)
  30. }
  31. datatype := common.BytesToUint32([4]byte{data[4], data[5], data[6], data[7]})
  32. l := common.BytesToUint32([4]byte{data[8], data[9], data[10], data[11]})
  33. if len(data[12:]) != int(l) {
  34. return nil, common.Errorf("Invalid data: %V", data)
  35. }
  36. data = data[12:]
  37. job = newJob(common.RES, datatype, data)
  38. return
  39. }
  40. // Encode a job to byte slice
  41. func (job *Job) Encode() (data []byte) {
  42. var l int
  43. if job.DataType == common.WORK_FAIL {
  44. l = len(job.Handle)
  45. } else {
  46. l = len(job.Data)
  47. if job.Handle != "" {
  48. l += len(job.Handle) + 1
  49. }
  50. }
  51. data = make([]byte, 0, l + 12)
  52. magiccode := common.Uint32ToBytes(job.magicCode)
  53. datatype := common.Uint32ToBytes(job.DataType)
  54. datalength := common.Uint32ToBytes(uint32(l))
  55. data = append(data, magiccode[:]...)
  56. data = append(data, datatype[:]...)
  57. data = append(data, datalength[:]...)
  58. if job.Handle != "" {
  59. data = append(data, []byte(job.Handle)...)
  60. if job.DataType != common.WORK_FAIL {
  61. data = append(data, 0)
  62. }
  63. }
  64. data = append(data, job.Data...)
  65. return
  66. }
  67. // Send some datas to client.
  68. // Using this in a job's executing.
  69. func (job *Job) UpdateData(data []byte, iswarning bool) {
  70. result := append([]byte(job.Handle), 0)
  71. result = append(result, data...)
  72. var datatype uint32
  73. if iswarning {
  74. datatype = common.WORK_WARNING
  75. } else {
  76. datatype = common.WORK_DATA
  77. }
  78. job.agent.WriteJob(newJob(common.REQ, datatype, result))
  79. }
  80. // Update status.
  81. // Tall client how many percent job has been executed.
  82. func (job *Job) UpdateStatus(numerator, denominator int) {
  83. n := []byte(strconv.Itoa(numerator))
  84. d := []byte(strconv.Itoa(denominator))
  85. result := append([]byte(job.Handle), 0)
  86. result = append(result, n...)
  87. result = append(result, d...)
  88. job.agent.WriteJob(newJob(common.REQ, common.WORK_STATUS, result))
  89. }
  90. // close the job
  91. func (job *Job) Close() {
  92. close(job.c)
  93. }
  94. // cancel the job executing
  95. func (job *Job) cancel() {
  96. defer func() {recover()}()
  97. job.c <- true
  98. }
  99. // When a job was canceled, return a true form a channel
  100. func (job *Job) Canceled() <-chan bool {
  101. return job.c
  102. }