You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

inpack.go 2.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package worker
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "strconv"
  7. )
  8. // Worker side job
  9. type inPack struct {
  10. dataType uint32
  11. data []byte
  12. handle, uniqueId, fn string
  13. a *agent
  14. }
  15. // Create a new job
  16. func getInPack() *inPack {
  17. return &inPack{}
  18. }
  19. func (inpack *inPack) Data() []byte {
  20. return inpack.data
  21. }
  22. func (inpack *inPack) Fn() string {
  23. return inpack.fn
  24. }
  25. func (inpack *inPack) Handle() string {
  26. return inpack.handle
  27. }
  28. func (inpack *inPack) UniqueId() string {
  29. return inpack.uniqueId
  30. }
  31. func (inpack *inPack) Err() error {
  32. if inpack.dataType == dtError {
  33. return getError(inpack.data)
  34. }
  35. return nil
  36. }
  37. // Send some datas to client.
  38. // Using this in a job's executing.
  39. func (inpack *inPack) SendData(data []byte) {
  40. outpack := getOutPack()
  41. outpack.dataType = dtWorkData
  42. hl := len(inpack.handle)
  43. l := hl + len(data) + 1
  44. outpack.data = getBuffer(l)
  45. copy(outpack.data, []byte(inpack.handle))
  46. copy(outpack.data[hl+1:], data)
  47. inpack.a.write(outpack)
  48. }
  49. func (inpack *inPack) SendWarning(data []byte) {
  50. outpack := getOutPack()
  51. outpack.dataType = dtWorkWarning
  52. hl := len(inpack.handle)
  53. l := hl + len(data) + 1
  54. outpack.data = getBuffer(l)
  55. copy(outpack.data, []byte(inpack.handle))
  56. copy(outpack.data[hl+1:], data)
  57. inpack.a.write(outpack)
  58. }
  59. // Update status.
  60. // Tall client how many percent job has been executed.
  61. func (inpack *inPack) UpdateStatus(numerator, denominator int) {
  62. n := []byte(strconv.Itoa(numerator))
  63. d := []byte(strconv.Itoa(denominator))
  64. outpack := getOutPack()
  65. outpack.dataType = dtWorkStatus
  66. hl := len(inpack.handle)
  67. nl := len(n)
  68. dl := len(d)
  69. outpack.data = getBuffer(hl + nl + dl + 2)
  70. copy(outpack.data, []byte(inpack.handle))
  71. copy(outpack.data[hl+1:], n)
  72. copy(outpack.data[hl+nl+2:], d)
  73. inpack.a.write(outpack)
  74. }
  75. // Decode job from byte slice
  76. func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
  77. if len(data) < minPacketLength { // valid package should not less 12 bytes
  78. err = fmt.Errorf("Invalid data: %v", data)
  79. return
  80. }
  81. dl := int(binary.BigEndian.Uint32(data[8:12]))
  82. if len(data) < (dl + minPacketLength) {
  83. err = fmt.Errorf("Not enough data: %v", data)
  84. return
  85. }
  86. dt := data[minPacketLength : dl+minPacketLength]
  87. if len(dt) != int(dl) { // length not equal
  88. err = fmt.Errorf("Invalid data: %v", data)
  89. return
  90. }
  91. inpack = getInPack()
  92. inpack.dataType = binary.BigEndian.Uint32(data[4:8])
  93. switch inpack.dataType {
  94. case dtJobAssign:
  95. s := bytes.SplitN(dt, []byte{'\x00'}, 3)
  96. if len(s) == 3 {
  97. inpack.handle = string(s[0])
  98. inpack.fn = string(s[1])
  99. inpack.data = s[2]
  100. }
  101. case dtJobAssignUniq:
  102. s := bytes.SplitN(dt, []byte{'\x00'}, 4)
  103. if len(s) == 4 {
  104. inpack.handle = string(s[0])
  105. inpack.fn = string(s[1])
  106. inpack.uniqueId = string(s[2])
  107. inpack.data = s[3]
  108. }
  109. default:
  110. inpack.data = dt
  111. }
  112. l = dl + minPacketLength
  113. return
  114. }