You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
3.0 KiB

  1. package grsync
  2. import (
  3. "bufio"
  4. "io"
  5. "math"
  6. "strconv"
  7. "strings"
  8. )
  9. // Task is high-level API under rsync
  10. type Task struct {
  11. rsync *Rsync
  12. state *State
  13. log *Log
  14. }
  15. // State contains information about rsync process
  16. type State struct {
  17. Remain int `json:"remain"`
  18. Total int `json:"total"`
  19. Speed string `json:"speed"`
  20. Progress float64 `json:"progress"`
  21. }
  22. // Log contains raw stderr and stdout outputs
  23. type Log struct {
  24. Stderr string `json:"stderr"`
  25. Stdout string `json:"stdout"`
  26. }
  27. // State returns inforation about rsync processing task
  28. func (t Task) State() State {
  29. return *t.state
  30. }
  31. // Log return structure which contains raw stderr and stdout outputs
  32. func (t Task) Log() Log {
  33. return Log{
  34. Stderr: t.log.Stderr,
  35. Stdout: t.log.Stdout,
  36. }
  37. }
  38. // Run starts rsync process with options
  39. func (t *Task) Run() error {
  40. stderr, err := t.rsync.StderrPipe()
  41. if err != nil {
  42. return err
  43. }
  44. defer stderr.Close()
  45. stdout, err := t.rsync.StdoutPipe()
  46. if err != nil {
  47. return err
  48. }
  49. defer stdout.Close()
  50. go processStdout(t, stdout)
  51. go processStderr(t, stderr)
  52. return t.rsync.Run()
  53. }
  54. // NewTask returns new rsync task
  55. func NewTask(source, destination string, rsyncOptions RsyncOptions) *Task {
  56. // Force set required options
  57. rsyncOptions.HumanReadable = true
  58. rsyncOptions.Partial = true
  59. rsyncOptions.Progress = true
  60. rsyncOptions.Archive = true
  61. return &Task{
  62. rsync: NewRsync(source, destination, rsyncOptions),
  63. state: &State{},
  64. log: &Log{},
  65. }
  66. }
  67. func processStdout(task *Task, stdout io.Reader) {
  68. const maxPercents = float64(100)
  69. const minDivider = 1
  70. progressMatcher := newMatcher(`\(.+-chk=(\d+.\d+)`)
  71. speedMatcher := newMatcher(`(\d+\.\d+.{2}\/s)`)
  72. // Extract data from strings:
  73. // 999,999 99% 999.99kB/s 0:00:59 (xfr#9, to-chk=999/9999)
  74. scanner := bufio.NewScanner(stdout)
  75. for scanner.Scan() {
  76. logStr := scanner.Text()
  77. if progressMatcher.Match(logStr) {
  78. task.state.Remain, task.state.Total = getTaskProgress(progressMatcher.Extract(logStr))
  79. copiedCount := float64(task.state.Total - task.state.Remain)
  80. task.state.Progress = copiedCount / math.Max(float64(task.state.Total), float64(minDivider)) * maxPercents
  81. }
  82. if speedMatcher.Match(logStr) {
  83. task.state.Speed = getTaskSpeed(speedMatcher.ExtractAllStringSubmatch(logStr, 2))
  84. }
  85. task.log.Stdout += logStr + "\n"
  86. }
  87. }
  88. func processStderr(task *Task, stderr io.Reader) {
  89. scanner := bufio.NewScanner(stderr)
  90. for scanner.Scan() {
  91. task.log.Stderr += scanner.Text() + "\n"
  92. }
  93. }
  94. func getTaskProgress(remTotalString string) (int, int) {
  95. const remTotalSeparator = "/"
  96. const numbersCount = 2
  97. const (
  98. indexRem = iota
  99. indexTotal
  100. )
  101. info := strings.Split(remTotalString, remTotalSeparator)
  102. if len(info) < numbersCount {
  103. return 0, 0
  104. }
  105. remain, _ := strconv.Atoi(info[indexRem])
  106. total, _ := strconv.Atoi(info[indexTotal])
  107. return remain, total
  108. }
  109. func getTaskSpeed(data [][]string) string {
  110. if len(data) < 2 || len(data[1]) < 2 {
  111. return ""
  112. }
  113. return data[1][1]
  114. }