VictoriaMetrics/app/vmctl/vm_native.go
Dmytro Kozlov 3da856af0c
app/vmctl: add support for the different time format in the native binary protocol (#4189)
* app/vmctl: add support for the different time format in the native binary protocol

* app/vmctl: update flag description, update CHANGELOG.md

* app/vmctl: add comment to exported function
2023-05-08 14:21:59 -07:00

361 lines
9.3 KiB
Go

package main
import (
"context"
"fmt"
"io"
"log"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/cheggaaa/pb/v3"
)
// vmNativeProcessor migrates data from a source to a destination by streaming
// the source's native export endpoint into the destination's native import
// endpoint.
type vmNativeProcessor struct {
// filter carries the series selector, the time range and the optional chunk
// step used to split the time range into smaller requests.
filter native.Filter
// dst is the client for the destination (import) side.
dst *native.Client
// src is the client for the source (export) side.
src *native.Client
// backoff retries failed export/import rounds when retries are enabled.
backoff *backoff.Backoff
// s accumulates transfer statistics; run re-initializes it on every call.
s *stats
// rateLimit throttles writes to the destination when it is greater than zero.
// NOTE(review): presumably bytes per second — confirm against the limiter package.
rateLimit int64
// interCluster enables tenant discovery and per-tenant export/import URLs.
interCluster bool
// cc is the number of concurrent workers; run treats zero as one.
cc int
// disableRetries skips metric exploration and runs every request exactly
// once, without going through backoff.
disableRetries bool
}
const (
// nativeExportAddr is the export path appended to the source address.
nativeExportAddr = "api/v1/export/native"
// nativeImportAddr is the import path appended to the destination address
// (after extra labels have been added to it).
nativeImportAddr = "api/v1/import/native"
// nativeWithBackoffTpl is the progress bar template used when retries are
// enabled; the %s placeholder is filled with the bar prefix via fmt.Sprintf.
nativeWithBackoffTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
// nativeSingleProcessTpl is the progress bar template used when retries are
// disabled and the bar is attached to the export reader.
nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
)
// run executes the whole migration.
//
// It resets the statistics, parses the time range from the filter (the end
// defaults to "now" in the start time's location), optionally splits the range
// into chunks, optionally discovers tenants in inter-cluster mode and then
// runs the backfilling for every tenant in turn. The first failing tenant
// aborts the migration.
//
// When silent is false the user is prompted before the migration of the
// discovered tenants starts; declining the prompt returns nil.
func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
	// A non-positive concurrency would start no workers at all and a negative
	// one makes the creation of the buffered error channel panic, so fall
	// back to a single worker.
	if p.cc < 1 {
		p.cc = 1
	}
	p.s = &stats{
		startTime: time.Now(),
	}

	start, err := utils.GetTime(p.filter.TimeStart)
	if err != nil {
		return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
	}

	end := time.Now().In(start.Location())
	if p.filter.TimeEnd != "" {
		end, err = utils.GetTime(p.filter.TimeEnd)
		if err != nil {
			return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
		}
	}

	// Without a chunk step the whole interval is migrated as a single range.
	ranges := [][]time.Time{{start, end}}
	if p.filter.Chunk != "" {
		ranges, err = stepper.SplitDateRange(start, end, p.filter.Chunk)
		if err != nil {
			return fmt.Errorf("failed to create date ranges for the given time filters: %w", err)
		}
	}

	// The empty tenant stands for the non-cluster (single tenant) mode.
	tenants := []string{""}
	if p.interCluster {
		log.Printf("Discovering tenants...")
		tenants, err = p.src.GetSourceTenants(ctx, p.filter)
		if err != nil {
			return fmt.Errorf("failed to get tenants: %w", err)
		}
		question := fmt.Sprintf("The following tenants were discovered: %s.\n Continue?", tenants)
		if !silent && !prompt(question) {
			return nil
		}
	}

	for _, tenantID := range tenants {
		if err := p.runBackfilling(ctx, tenantID, ranges, silent); err != nil {
			// %w keeps the error chain available to errors.Is / errors.As.
			return fmt.Errorf("migration failed: %w", err)
		}
	}

	log.Println("Import finished!")
	log.Print(p.s)
	return nil
}
// do runs a single export/import round for the filter f through the backoff
// policy and adds the number of attempts reported by the backoff to the
// retries statistic.
//
// It returns nil on success; otherwise the last error is wrapped together
// with the source and destination URLs, the number of attempts and the filter.
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
	retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL, bar) }
	attempts, err := p.backoff.Retry(ctx, retryableFunc)

	// The attempts are accounted for regardless of the final outcome.
	p.s.Lock()
	p.s.retries += attempts
	p.s.Unlock()

	if err != nil {
		return fmt.Errorf("failed to migrate from %s to %s (retry attempts: %d): %w\nwith filter %s", srcURL, dstURL, attempts, err, f)
	}
	return nil
}
// runSingle performs one export/import round for the filter f: it opens the
// export stream on srcURL and copies it through an in-memory pipe into an
// import request on dstURL that runs in a separate goroutine.
//
// When retries are disabled and bar is not nil, the export stream is wrapped
// into the progress bar's proxy reader. When rateLimit is greater than zero,
// writes into the pipe go through a write limiter.
//
// An error is returned when the export cannot be started, when copying fails
// or when the import request fails, so that callers (and the backoff policy)
// can react to import failures as well.
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
	reader, err := p.src.ExportPipe(ctx, srcURL, f)
	if err != nil {
		return fmt.Errorf("failed to init export pipe: %w", err)
	}
	if p.disableRetries && bar != nil {
		fmt.Printf("Continue import process with filter %s:\n", f.String())
		reader = bar.NewProxyReader(reader)
	}

	pr, pw := io.Pipe()
	// Buffered, so the import goroutine can always deliver its result and exit.
	importErrCh := make(chan error, 1)
	go func() {
		importErrCh <- p.dst.ImportPipe(ctx, dstURL, pr)
	}()

	w := io.Writer(pw)
	if p.rateLimit > 0 {
		rl := limiter.NewLimiter(p.rateLimit)
		w = limiter.NewWriteLimiter(pw, rl)
	}

	written, err := io.Copy(w, reader)
	if err != nil {
		// Close the write side with the copy error so that the importer's
		// pending read on the pipe fails instead of blocking forever.
		_ = pw.CloseWithError(err)
		if importErr := <-importErrCh; importErr != nil {
			return fmt.Errorf("failed to write into %q: %w; import error: %s", p.dst.Addr, err, importErr)
		}
		return fmt.Errorf("failed to write into %q: %w", p.dst.Addr, err)
	}

	p.s.Lock()
	p.s.bytes += uint64(written)
	p.s.requests++
	p.s.Unlock()

	// Closing the writer signals EOF to the importer.
	if err := pw.Close(); err != nil {
		return err
	}
	// Wait for the import request to finish and surface its error instead of
	// only logging it; otherwise a failed import would be reported as success.
	if err := <-importErrCh; err != nil {
		return fmt.Errorf("failed to import data into %q: %w", p.dst.Addr, err)
	}
	return nil
}
// runBackfilling migrates the data of a single tenant for the given time
// ranges.
//
// It builds the export/import URLs (tenant-aware in inter-cluster mode),
// optionally explores the metric names matching the filter (skipped when
// retries are disabled), asks for confirmation unless silent is set or
// inter-cluster mode is used, and then distributes one request per
// (metric, time range) pair across the worker goroutines.
//
// It returns nil when the user declines the prompt or when every request
// succeeded; the first worker error or a context cancellation aborts the
// process.
func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string, ranges [][]time.Time, silent bool) error {
	exportAddr := nativeExportAddr
	srcURL := fmt.Sprintf("%s/%s", p.src.Addr, exportAddr)

	importAddr, err := vm.AddExtraLabelsToImportPath(nativeImportAddr, p.dst.ExtraLabels)
	if err != nil {
		return fmt.Errorf("failed to add labels to import path: %w", err)
	}
	dstURL := fmt.Sprintf("%s/%s", p.dst.Addr, importAddr)

	if p.interCluster {
		srcURL = fmt.Sprintf("%s/select/%s/prometheus/%s", p.src.Addr, tenantID, exportAddr)
		dstURL = fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.Addr, tenantID, importAddr)
	}

	barPrefix := "Requests to make"
	initMessage := "Initing import process from %q to %q with filter %s"
	initParams := []interface{}{srcURL, dstURL, p.filter.String()}
	if p.interCluster {
		barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID)
		initMessage = "Initing import process from %q to %q with filter %s for tenant %s"
		initParams = []interface{}{srcURL, dstURL, p.filter.String(), tenantID}
	}

	fmt.Println("") // extra line for better output formatting
	log.Printf(initMessage, initParams...)

	// Without exploration the original match expression is used as is.
	var foundSeriesMsg string
	metrics := []string{p.filter.Match}
	if !p.disableRetries {
		log.Printf("Exploring metrics...")
		metrics, err = p.src.Explore(ctx, p.filter, tenantID)
		if err != nil {
			return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err)
		}
		if len(metrics) == 0 {
			return fmt.Errorf("no metrics found")
		}
		foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics))
	}

	if !p.interCluster {
		// do not prompt for intercluster because there could be many tenants,
		// and we don't want to interrupt the process when moving to the next tenant.
		// foundSeriesMsg is empty when exploration was skipped; avoid a
		// prompt that starts with a dangling period in that case.
		question := "Continue?"
		if foundSeriesMsg != "" {
			question = foundSeriesMsg + ". Continue?"
		}
		if !silent && !prompt(question) {
			return nil
		}
	} else if foundSeriesMsg != "" {
		log.Print(foundSeriesMsg)
	}

	processingMsg := fmt.Sprintf("Requests to make: %d", len(metrics)*len(ranges))
	if len(ranges) > 1 {
		processingMsg = fmt.Sprintf("Selected time range will be split into %d ranges according to %q step. %s", len(ranges), p.filter.Chunk, processingMsg)
	}
	log.Print(processingMsg)

	var bar *pb.ProgressBar
	if !silent {
		bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeWithBackoffTpl, barPrefix)).New(len(metrics) * len(ranges))
		if p.disableRetries {
			bar = pb.ProgressBarTemplate(nativeSingleProcessTpl).New(0)
		}
		bar.Start()
		defer bar.Finish()
	}

	// Guard against a non-positive concurrency: zero workers would block the
	// producer below forever and a negative channel capacity panics.
	workers := p.cc
	if workers < 1 {
		workers = 1
	}

	filterCh := make(chan native.Filter)
	// Every worker sends at most one error before exiting, so a buffer of
	// `workers` guarantees that no worker ever blocks on errCh.
	errCh := make(chan error, workers)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for f := range filterCh {
				if p.disableRetries {
					// The bar is fed by the proxy reader inside runSingle.
					if err := p.runSingle(ctx, f, srcURL, dstURL, bar); err != nil {
						errCh <- err
						return
					}
					continue
				}
				if err := p.do(ctx, f, srcURL, dstURL, nil); err != nil {
					errCh <- err
					return
				}
				if bar != nil {
					bar.Increment()
				}
			}
		}()
	}

	// feed hands one filter per (metric, time range) pair to the workers.
	// Any worker error or a context cancellation breaks the import.
	feed := func() error {
		// Closing filterCh on every exit path lets idle workers leave their
		// range loop instead of staying blocked after an early return.
		defer close(filterCh)
		for _, s := range metrics {
			match, err := buildMatchWithFilter(p.filter.Match, s)
			if err != nil {
				// A metric whose selector cannot be built is skipped, not fatal.
				logger.Errorf("failed to build export filters: %s", err)
				continue
			}
			for _, times := range ranges {
				select {
				case <-ctx.Done():
					return fmt.Errorf("context canceled")
				case infErr := <-errCh:
					return fmt.Errorf("native error: %w", infErr)
				case filterCh <- native.Filter{
					Match:     match,
					TimeStart: times[0].Format(time.RFC3339),
					TimeEnd:   times[1].Format(time.RFC3339),
				}:
				}
			}
		}
		return nil
	}
	if err := feed(); err != nil {
		return err
	}

	// All filters were handed out; wait for in-flight requests and report the
	// first error left in the channel, if any.
	wg.Wait()
	close(errCh)
	if err, ok := <-errCh; ok {
		return fmt.Errorf("import process failed: %w", err)
	}
	return nil
}
// stats holds the transfer statistics collected while data is processed.
// The embedded mutex guards the counters, which are updated from
// concurrently running workers.
type stats struct {
sync.Mutex
// startTime is the moment the processing started; String reports the time
// elapsed since then.
startTime time.Time
// bytes is the total number of bytes copied from the source.
bytes uint64
// requests is the number of export/import rounds whose copy completed.
requests uint64
// retries accumulates the attempt counts reported by the backoff policy.
retries uint64
}
// String renders a human-readable summary of the collected statistics:
// elapsed time, total bytes, average throughput, number of requests and
// number of retries. The stats are locked while the summary is built.
func (s *stats) String() string {
	s.Lock()
	defer s.Unlock()

	elapsed := time.Since(s.startTime)
	seconds := elapsed.Seconds()

	// The throughput stays at zero unless both the byte count and the
	// elapsed time are positive.
	var perSecond int64
	if s.bytes > 0 && seconds > 0 {
		perSecond = int64(float64(s.bytes) / seconds)
	}

	const layout = "VictoriaMetrics importer stats:\n" +
		"  time spent while importing: %v;\n" +
		"  total bytes: %s;\n" +
		"  bytes/s: %s;\n" +
		"  requests: %d;\n" +
		"  requests retries: %d;"

	total := byteCountSI(int64(s.bytes))
	speed := byteCountSI(perSecond)
	return fmt.Sprintf(layout, elapsed, total, speed, s.requests, s.retries)
}
// byteCountSI formats b as a human-readable size using decimal (SI) units.
// Values below 1000 are printed as plain bytes ("999 B"); larger values are
// scaled to the biggest fitting unit and printed with one decimal digit
// ("1.5 kB", "1.2 GB").
func byteCountSI(b int64) string {
	const (
		base     = 1000
		prefixes = "kMGTPE"
	)
	if b < base {
		return fmt.Sprintf("%d B", b)
	}

	// Grow the divisor until the scaled value drops below one unit of the
	// next magnitude; idx tracks the matching SI prefix.
	divisor := int64(base)
	idx := 0
	for b/divisor >= base {
		divisor *= base
		idx++
	}

	scaled := float64(b) / float64(divisor)
	return fmt.Sprintf("%.1f %cB", scaled, prefixes[idx])
}
// buildMatchWithFilter builds the series selector used to export a single
// metric.
//
// When filter equals metricName it is returned unchanged. Otherwise filter is
// parsed as a metric selector, every parsed label filter with a non-empty key
// is kept, and a `__name__="<metricName>"` matcher is appended. The result is
// a brace-enclosed, comma-separated selector. A parse failure is returned as
// is together with an empty selector.
func buildMatchWithFilter(filter string, metricName string) (string, error) {
	if filter == metricName {
		return filter, nil
	}

	tagFilters, err := searchutils.ParseMetricSelector(filter)
	if err != nil {
		return "", err
	}

	parts := make([]string, 0, len(tagFilters))
	for _, tf := range tagFilters {
		// Filters with an empty key are dropped; the name matcher is added
		// explicitly below.
		if len(tf.Key) != 0 {
			parts = append(parts, tf.String())
		}
	}
	parts = append(parts, fmt.Sprintf("__name__=%q", metricName))

	return fmt.Sprintf("{%s}", strings.Join(parts, ",")), nil
}