141 lines
3.0 KiB
Go
141 lines
3.0 KiB
Go
|
package grsync
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"io"
|
||
|
"math"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
)
|
||
|
|
||
|
// Task is high-level API under rsync
|
||
|
type Task struct {
|
||
|
rsync *Rsync
|
||
|
|
||
|
state *State
|
||
|
log *Log
|
||
|
}
|
||
|
|
||
|
// State contains information about rsync process
|
||
|
type State struct {
|
||
|
Remain int `json:"remain"`
|
||
|
Total int `json:"total"`
|
||
|
Speed string `json:"speed"`
|
||
|
Progress float64 `json:"progress"`
|
||
|
}
|
||
|
|
||
|
// Log contains raw stderr and stdout outputs
|
||
|
type Log struct {
|
||
|
Stderr string `json:"stderr"`
|
||
|
Stdout string `json:"stdout"`
|
||
|
}
|
||
|
|
||
|
// State returns inforation about rsync processing task
|
||
|
func (t Task) State() State {
|
||
|
return *t.state
|
||
|
}
|
||
|
|
||
|
// Log return structure which contains raw stderr and stdout outputs
|
||
|
func (t Task) Log() Log {
|
||
|
return Log{
|
||
|
Stderr: t.log.Stderr,
|
||
|
Stdout: t.log.Stdout,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Run starts rsync process with options
|
||
|
func (t *Task) Run() error {
|
||
|
stderr, err := t.rsync.StderrPipe()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer stderr.Close()
|
||
|
|
||
|
stdout, err := t.rsync.StdoutPipe()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer stdout.Close()
|
||
|
|
||
|
go processStdout(t, stdout)
|
||
|
go processStderr(t, stderr)
|
||
|
|
||
|
return t.rsync.Run()
|
||
|
}
|
||
|
|
||
|
// NewTask returns new rsync task
|
||
|
func NewTask(source, destination string, rsyncOptions RsyncOptions) *Task {
|
||
|
// Force set required options
|
||
|
rsyncOptions.HumanReadable = true
|
||
|
rsyncOptions.Partial = true
|
||
|
rsyncOptions.Progress = true
|
||
|
rsyncOptions.Archive = true
|
||
|
|
||
|
return &Task{
|
||
|
rsync: NewRsync(source, destination, rsyncOptions),
|
||
|
state: &State{},
|
||
|
log: &Log{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func processStdout(task *Task, stdout io.Reader) {
|
||
|
const maxPercents = float64(100)
|
||
|
const minDivider = 1
|
||
|
|
||
|
progressMatcher := newMatcher(`\(.+-chk=(\d+.\d+)`)
|
||
|
speedMatcher := newMatcher(`(\d+\.\d+.{2}\/s)`)
|
||
|
|
||
|
// Extract data from strings:
|
||
|
// 999,999 99% 999.99kB/s 0:00:59 (xfr#9, to-chk=999/9999)
|
||
|
scanner := bufio.NewScanner(stdout)
|
||
|
for scanner.Scan() {
|
||
|
logStr := scanner.Text()
|
||
|
if progressMatcher.Match(logStr) {
|
||
|
task.state.Remain, task.state.Total = getTaskProgress(progressMatcher.Extract(logStr))
|
||
|
|
||
|
copiedCount := float64(task.state.Total - task.state.Remain)
|
||
|
task.state.Progress = copiedCount / math.Max(float64(task.state.Total), float64(minDivider)) * maxPercents
|
||
|
}
|
||
|
|
||
|
if speedMatcher.Match(logStr) {
|
||
|
task.state.Speed = getTaskSpeed(speedMatcher.ExtractAllStringSubmatch(logStr, 2))
|
||
|
}
|
||
|
|
||
|
task.log.Stdout += logStr + "\n"
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func processStderr(task *Task, stderr io.Reader) {
|
||
|
scanner := bufio.NewScanner(stderr)
|
||
|
for scanner.Scan() {
|
||
|
task.log.Stderr += scanner.Text() + "\n"
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func getTaskProgress(remTotalString string) (int, int) {
|
||
|
const remTotalSeparator = "/"
|
||
|
const numbersCount = 2
|
||
|
const (
|
||
|
indexRem = iota
|
||
|
indexTotal
|
||
|
)
|
||
|
|
||
|
info := strings.Split(remTotalString, remTotalSeparator)
|
||
|
if len(info) < numbersCount {
|
||
|
return 0, 0
|
||
|
}
|
||
|
|
||
|
remain, _ := strconv.Atoi(info[indexRem])
|
||
|
total, _ := strconv.Atoi(info[indexTotal])
|
||
|
|
||
|
return remain, total
|
||
|
}
|
||
|
|
||
|
func getTaskSpeed(data [][]string) string {
|
||
|
if len(data) < 2 || len(data[1]) < 2 {
|
||
|
return ""
|
||
|
}
|
||
|
|
||
|
return data[1][1]
|
||
|
}
|