You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

127 lines
2.8 KiB

  1. package worker
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "strconv"
  7. )
  8. // Worker side job
  9. type inPack struct {
  10. dataType uint32
  11. data []byte
  12. handle, uniqueId, fn string
  13. a *agent
  14. }
  15. // Create a new job
  16. func getInPack() *inPack {
  17. return &inPack{}
  18. }
  19. func (inpack *inPack) Data() []byte {
  20. return inpack.data
  21. }
  22. func (inpack *inPack) Fn() string {
  23. return inpack.fn
  24. }
  25. func (inpack *inPack) Handle() string {
  26. return inpack.handle
  27. }
  28. func (inpack *inPack) UniqueId() string {
  29. return inpack.uniqueId
  30. }
  31. func (inpack *inPack) Err() error {
  32. if inpack.dataType == dtError {
  33. return getError(inpack.data)
  34. }
  35. return nil
  36. }
  37. // Send some datas to client.
  38. // Using this in a job's executing.
  39. func (inpack *inPack) SendData(data []byte) {
  40. outpack := getOutPack()
  41. outpack.dataType = dtWorkData
  42. hl := len(inpack.handle)
  43. l := hl + len(data) + 1
  44. outpack.data = getBuffer(l)
  45. copy(outpack.data, []byte(inpack.handle))
  46. copy(outpack.data[hl+1:], data)
  47. inpack.a.write(outpack)
  48. }
  49. func (inpack *inPack) SendWarning(data []byte) {
  50. outpack := getOutPack()
  51. outpack.dataType = dtWorkWarning
  52. hl := len(inpack.handle)
  53. l := hl + len(data) + 1
  54. outpack.data = getBuffer(l)
  55. copy(outpack.data, []byte(inpack.handle))
  56. copy(outpack.data[hl+1:], data)
  57. inpack.a.write(outpack)
  58. }
  59. // Update status.
  60. // Tall client how many percent job has been executed.
  61. func (inpack *inPack) UpdateStatus(numerator, denominator int) {
  62. n := []byte(strconv.Itoa(numerator))
  63. d := []byte(strconv.Itoa(denominator))
  64. outpack := getOutPack()
  65. outpack.dataType = dtWorkStatus
  66. hl := len(inpack.handle)
  67. nl := len(n)
  68. dl := len(d)
  69. outpack.data = getBuffer(hl + nl + dl + 2)
  70. copy(outpack.data, []byte(inpack.handle))
  71. copy(outpack.data[hl+1:], n)
  72. copy(outpack.data[hl+nl+2:], d)
  73. inpack.a.write(outpack)
  74. }
  75. // Decode job from byte slice
  76. func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
  77. if len(data) < minPacketLength { // valid package should not less 12 bytes
  78. err = fmt.Errorf("Invalid data: %v", data)
  79. return
  80. }
  81. dl := int(binary.BigEndian.Uint32(data[8:12]))
  82. if len(data) < (dl + minPacketLength) {
  83. err = fmt.Errorf("Not enough data: %v", data)
  84. return
  85. }
  86. dt := data[minPacketLength : dl+minPacketLength]
  87. if len(dt) != int(dl) { // length not equal
  88. err = fmt.Errorf("Invalid data: %v", data)
  89. return
  90. }
  91. inpack = getInPack()
  92. inpack.dataType = binary.BigEndian.Uint32(data[4:8])
  93. switch inpack.dataType {
  94. case dtJobAssign:
  95. s := bytes.SplitN(dt, []byte{'\x00'}, 3)
  96. if len(s) == 3 {
  97. inpack.handle = string(s[0])
  98. inpack.fn = string(s[1])
  99. inpack.data = s[2]
  100. }
  101. case dtJobAssignUniq:
  102. s := bytes.SplitN(dt, []byte{'\x00'}, 4)
  103. if len(s) == 4 {
  104. inpack.handle = string(s[0])
  105. inpack.fn = string(s[1])
  106. inpack.uniqueId = string(s[2])
  107. inpack.data = s[3]
  108. }
  109. default:
  110. inpack.data = dt
  111. }
  112. l = dl + minPacketLength
  113. return
  114. }