grsync/task.go

141 lines
3.0 KiB
Go
Raw Permalink Normal View History

2020-04-03 20:01:47 +08:00
package grsync
import (
"bufio"
"io"
"math"
"strconv"
"strings"
)
// Task is the high-level API on top of rsync. It bundles the rsync command
// wrapper with the progress state and the captured output of its run.
type Task struct {
// rsync is the underlying rsync wrapper that Run executes.
rsync *Rsync
// state holds the progress figures parsed from rsync's stdout.
state *State
// log accumulates the raw stdout and stderr text of the run.
log *Log
}
// State contains information about the rsync process, as parsed from the
// progress lines rsync prints on stdout.
type State struct {
// Remain is the first number of the "-chk=remain/total" progress field.
Remain int `json:"remain"`
// Total is the second number of the "-chk=remain/total" progress field.
Total int `json:"total"`
// Speed is the transfer speed text captured from the output, e.g. "999.99kB/s".
Speed string `json:"speed"`
// Progress is a percentage computed as (Total-Remain)/Total*100.
Progress float64 `json:"progress"`
}
// Log contains the raw stderr and stdout outputs of the rsync process.
// Each captured line is stored followed by a newline character.
type Log struct {
// Stderr is everything read from rsync's standard error so far.
Stderr string `json:"stderr"`
// Stdout is everything read from rsync's standard output so far.
Stdout string `json:"stdout"`
}
// State returns information about the rsync task being processed.
// The result is a copy of the task's current state, so later progress
// updates do not modify the returned value.
func (t Task) State() State {
	current := *t.state
	return current
}
// Log returns a structure holding the raw stderr and stdout outputs
// captured so far. The result is a copy; it is not updated afterwards.
func (t Task) Log() Log {
	var captured Log
	captured.Stderr = t.log.Stderr
	captured.Stdout = t.log.Stdout
	return captured
}
// Run starts the rsync process with the configured options and blocks until
// the process has finished and both output readers have stopped.
//
// Standard output and standard error are consumed concurrently: stdout feeds
// the progress state and the stdout log, stderr feeds the stderr log. Run
// waits for both readers before returning, so nothing is still writing to
// the task's state or log once Run has returned.
//
// It returns the error from creating either pipe, or otherwise the error
// reported by the underlying rsync run.
func (t *Task) Run() error {
	stderr, err := t.rsync.StderrPipe()
	if err != nil {
		return err
	}

	stdout, err := t.rsync.StdoutPipe()
	if err != nil {
		// The stderr pipe was already created; release it before bailing out.
		stderr.Close()
		return err
	}

	stdoutDone := make(chan struct{})
	stderrDone := make(chan struct{})

	go func() {
		defer close(stdoutDone)
		processStdout(t, stdout)
	}()
	go func() {
		defer close(stderrDone)
		processStderr(t, stderr)
	}()

	runErr := t.rsync.Run()

	// Close the pipes first (as the original deferred closes did on return),
	// so a reader that is still blocked gets unblocked, then wait for both
	// readers so they cannot touch the task after Run has returned.
	stdout.Close()
	stderr.Close()
	<-stdoutDone
	<-stderrDone

	return runErr
}
// NewTask returns a new rsync task that syncs source to destination.
//
// The HumanReadable, Partial, Progress and Archive options are required by
// the task and are always switched on, whatever the caller passed in
// rsyncOptions.
func NewTask(source, destination string, rsyncOptions RsyncOptions) *Task {
	// Force the required options on.
	rsyncOptions.Archive = true
	rsyncOptions.Progress = true
	rsyncOptions.Partial = true
	rsyncOptions.HumanReadable = true

	task := new(Task)
	task.rsync = NewRsync(source, destination, rsyncOptions)
	task.state = new(State)
	task.log = new(Log)
	return task
}
// processStdout consumes rsync's standard output line by line. For every
// line it updates the task's progress state when the line carries progress
// or speed information, and appends the line (plus a newline) to the
// captured stdout log.
//
// Progress lines look like:
//
//	999,999 99% 999.99kB/s 0:00:59 (xfr#9, to-chk=999/9999)
//
// The function returns once the reader is exhausted or fails. If scanning
// stops because of an error (for example a line longer than the scanner's
// token limit), the rest of the stream is read and discarded so that the
// writing side is not left blocked on a pipe nobody reads.
func processStdout(task *Task, stdout io.Reader) {
	const maxPercents = float64(100)
	const minDivider = 1

	progressMatcher := newMatcher(`\(.+-chk=(\d+.\d+)`)
	speedMatcher := newMatcher(`(\d+\.\d+.{2}\/s)`)

	scanner := bufio.NewScanner(stdout)
	for scanner.Scan() {
		logStr := scanner.Text()

		if progressMatcher.Match(logStr) {
			task.state.Remain, task.state.Total = getTaskProgress(progressMatcher.Extract(logStr))

			copiedCount := float64(task.state.Total - task.state.Remain)
			// Guard the division: a total of zero is treated as one.
			task.state.Progress = copiedCount / math.Max(float64(task.state.Total), float64(minDivider)) * maxPercents
		}

		if speedMatcher.Match(logStr) {
			task.state.Speed = getTaskSpeed(speedMatcher.ExtractAllStringSubmatch(logStr, 2))
		}

		task.log.Stdout += logStr + "\n"
	}

	if scanner.Err() == nil {
		return
	}

	// The scanner gave up before EOF. Keep reading until the stream ends or
	// fails so the producer cannot stall on a full pipe.
	discard := make([]byte, 4096)
	for {
		if _, err := stdout.Read(discard); err != nil {
			return
		}
	}
}
// processStderr consumes rsync's standard error line by line and appends
// every line (plus a newline) to the captured stderr log.
//
// The function returns once the reader is exhausted or fails. If scanning
// stops because of an error (for example a line longer than the scanner's
// token limit), the rest of the stream is read and discarded so that the
// writing side is not left blocked on a pipe nobody reads.
func processStderr(task *Task, stderr io.Reader) {
	scanner := bufio.NewScanner(stderr)
	for scanner.Scan() {
		task.log.Stderr += scanner.Text() + "\n"
	}

	if scanner.Err() == nil {
		return
	}

	// The scanner gave up before EOF. Keep reading until the stream ends or
	// fails so the producer cannot stall on a full pipe.
	discard := make([]byte, 4096)
	for {
		if _, err := stderr.Read(discard); err != nil {
			return
		}
	}
}
// getTaskProgress parses a "remain/total" pair, as found in rsync's
// "to-chk=remain/total" progress field, and returns the two numbers in that
// order.
//
// It returns (0, 0) when the input has no "/" separator or when either of
// the first two parts is not a valid int. Rejecting the whole pair keeps a
// malformed value from being reported as a half-parsed result such as
// (0, total), which would read as a finished transfer.
func getTaskProgress(remTotalString string) (int, int) {
	const remTotalSeparator = "/"
	const numbersCount = 2
	const (
		indexRem = iota
		indexTotal
	)

	info := strings.Split(remTotalString, remTotalSeparator)
	if len(info) < numbersCount {
		return 0, 0
	}

	remain, err := strconv.Atoi(info[indexRem])
	if err != nil {
		return 0, 0
	}

	total, err := strconv.Atoi(info[indexTotal])
	if err != nil {
		return 0, 0
	}

	return remain, total
}
// getTaskSpeed picks the speed text out of a list of regexp submatches.
// It returns the element at index 1 of the entry at index 1, i.e. the first
// capture group of the second match, or an empty string when data does not
// contain that element.
func getTaskSpeed(data [][]string) string {
	if len(data) > 1 {
		if second := data[1]; len(second) > 1 {
			return second[1]
		}
	}
	return ""
}