VictoriaMetrics/app/vmctl/influx.go
Roman Khavronenko ac47733044
vmctl: improve logging during import cancels/errors (#2006)
On import process interruption `vmctl` now prints the max and min timestamps of:
* last failed batch if import ended with error;
* last sent batch if import was cancelled by user.

To get more details for each timeseries in batch user needs to specify `--verbose` flag.

The change does not relate to `vm-native` mode, since `vmctl` has no control over
transferred data in this mode.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1236
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2022-01-03 21:12:01 +02:00

149 lines
2.9 KiB
Go

package main
import (
"fmt"
"io"
"log"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
)
type influxProcessor struct {
ic *influx.Client
im *vm.Importer
cc int
separator string
}
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string) *influxProcessor {
if cc < 1 {
cc = 1
}
return &influxProcessor{
ic: ic,
im: im,
cc: cc,
separator: separator,
}
}
func (ip *influxProcessor) run(silent, verbose bool) error {
series, err := ip.ic.Explore()
if err != nil {
return fmt.Errorf("explore query failed: %s", err)
}
if len(series) < 1 {
return fmt.Errorf("found no timeseries to import")
}
question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series))
if !silent && !prompt(question) {
return nil
}
bar := pb.StartNew(len(series))
seriesCh := make(chan *influx.Series)
errCh := make(chan error)
ip.im.ResetStats()
var wg sync.WaitGroup
wg.Add(ip.cc)
for i := 0; i < ip.cc; i++ {
go func() {
defer wg.Done()
for s := range seriesCh {
if err := ip.do(s); err != nil {
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
return
}
bar.Increment()
}
}()
}
// any error breaks the import
for _, s := range series {
select {
case infErr := <-errCh:
return fmt.Errorf("influx error: %s", infErr)
case vmErr := <-ip.im.Errors():
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
case seriesCh <- s:
}
}
close(seriesCh)
wg.Wait()
ip.im.Close()
// drain import errors channel
for vmErr := range ip.im.Errors() {
if vmErr.Err != nil {
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
}
}
bar.Finish()
log.Println("Import finished!")
log.Print(ip.im.Stats())
return nil
}
const dbLabel = "db"
func (ip *influxProcessor) do(s *influx.Series) error {
cr, err := ip.ic.FetchDataPoints(s)
if err != nil {
return fmt.Errorf("failed to fetch datapoints: %s", err)
}
defer func() {
_ = cr.Close()
}()
var name string
if s.Measurement != "" {
name = fmt.Sprintf("%s%s%s", s.Measurement, ip.separator, s.Field)
} else {
name = s.Field
}
labels := make([]vm.LabelPair, len(s.LabelPairs))
var containsDBLabel bool
for i, lp := range s.LabelPairs {
if lp.Name == dbLabel {
containsDBLabel = true
break
}
labels[i] = vm.LabelPair{
Name: lp.Name,
Value: lp.Value,
}
}
if !containsDBLabel {
labels = append(labels, vm.LabelPair{
Name: dbLabel,
Value: ip.ic.Database(),
})
}
for {
time, values, err := cr.Next()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
// skip empty results
if len(time) < 1 {
continue
}
ip.im.Input() <- &vm.TimeSeries{
Name: name,
LabelPairs: labels,
Timestamps: time,
Values: values,
}
}
}