VictoriaMetrics/app/vmctl/prometheus.go
Dmytro Kozlov 488c34f5e1
vmctl: fixed blocking when aborting import process (#2509)
vmctl: fix vmctl blocking on process interrupt

This change prevents vmctl from blocking indefinitely on
receiving the interrupt signal. The update touches all
import modes and is supposed to improve tool reliability.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2491
2022-05-03 07:03:41 +02:00

145 lines
3.3 KiB
Go

package main
import (
"fmt"
"log"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/prometheus/prometheus/tsdb"
)
// prometheusProcessor imports timeseries data from Prometheus
// snapshot blocks: blocks are discovered and read through cl
// and the resulting series are handed to im.
type prometheusProcessor struct {
// cl is the prometheus client used to
// explore and read snapshot blocks
cl *prometheus.Client
// im is the importer that receives the
// timeseries read from snapshot blocks
// and reports import errors and stats
im *vm.Importer
// cc stands for concurrency
// and defines the number of concurrently
// running snapshot block readers
cc int
}
// run imports all blocks of the Prometheus snapshot through the importer.
//
// It explores the snapshot via pp.cl, asks the user for confirmation unless
// silent is set, and then reads the blocks with pp.cc concurrent workers
// (at least one) which pass the decoded series to pp.im. The first error
// reported by a block reader or by the importer aborts the import. verbose is
// forwarded to wrapErr when importer errors are reported.
//
// It returns nil when the user declines the prompt or when every block was
// processed without errors; in the latter case the importer stats are logged.
func (pp *prometheusProcessor) run(silent, verbose bool) error {
	blocks, err := pp.cl.Explore()
	if err != nil {
		return fmt.Errorf("explore failed: %s", err)
	}
	if len(blocks) < 1 {
		return fmt.Errorf("found no blocks to import")
	}
	question := fmt.Sprintf("Found %d blocks to import. Continue?", len(blocks))
	if !silent && !prompt(question) {
		return nil
	}

	bar := barpool.AddWithTemplate(fmt.Sprintf(barTpl, "Processing blocks"), len(blocks))
	if err := barpool.Start(); err != nil {
		return err
	}

	// The import itself lives in a closure so that the progress bar is
	// stopped exactly once on every exit path (success and failure alike)
	// and always before anything else is logged.
	importBlocks := func() error {
		// A concurrency below 1 would start no workers (the dispatch loop
		// would block forever) or panic on a negative channel size and
		// WaitGroup counter, so fall back to a single worker.
		workers := pp.cc
		if workers < 1 {
			workers = 1
		}

		blockReadersCh := make(chan tsdb.BlockReader)
		// Every worker sends at most one error before exiting, so a buffer
		// of one slot per worker guarantees that the send never blocks.
		errCh := make(chan error, workers)
		pp.im.ResetStats()

		var wg sync.WaitGroup
		wg.Add(workers)
		for i := 0; i < workers; i++ {
			go func() {
				defer wg.Done()
				for br := range blockReadersCh {
					if err := pp.do(br); err != nil {
						errCh <- fmt.Errorf("read failed for block %q: %w", br.Meta().ULID, err)
						return
					}
					bar.Increment()
				}
			}()
		}

		// any error breaks the import
		//
		// On abort the workers are told to stop by closing the channel but
		// are deliberately not waited for, so the abort itself never blocks
		// on a block that is still being processed.
		for _, br := range blocks {
			select {
			case promErr := <-errCh:
				close(blockReadersCh)
				return fmt.Errorf("prometheus error: %w", promErr)
			case vmErr := <-pp.im.Errors():
				close(blockReadersCh)
				return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
			case blockReadersCh <- br:
			}
		}

		close(blockReadersCh)
		wg.Wait()
		// wait for all buffers to flush
		pp.im.Close()
		// All workers have exited, so nobody sends on errCh anymore.
		close(errCh)
		// drain import errors channel
		// NOTE(review): this loop only ends if Close() closes the channel
		// returned by Errors() - confirm against vm.Importer.
		for vmErr := range pp.im.Errors() {
			if vmErr.Err != nil {
				return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
			}
		}
		// errCh is closed, so this receive never blocks; ok is true only
		// when a worker failed on one of the last blocks.
		if readErr, ok := <-errCh; ok {
			return fmt.Errorf("import process failed: %w", readErr)
		}
		return nil
	}

	importErr := importBlocks()
	barpool.Stop()
	if importErr != nil {
		return importErr
	}
	log.Println("Import finished!")
	log.Print(pp.im.Stats())
	return nil
}
// do reads every series stored in block b and passes each of them, together
// with all of its samples, to the importer.
//
// The value of the `__name__` label becomes the series name; all remaining
// labels are forwarded as label pairs. An error is returned when the block
// cannot be read, when a series has no non-empty `__name__` label, when
// iterating over samples or series fails, or when the importer returns an
// error for a series.
func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
	seriesSet, readErr := pp.cl.Read(b)
	if readErr != nil {
		return fmt.Errorf("failed to read block: %s", readErr)
	}

	for seriesSet.Next() {
		cur := seriesSet.At()

		// Split the label set into the metric name and the remaining pairs.
		var (
			metricName string
			pairs      []vm.LabelPair
		)
		for _, l := range cur.Labels() {
			switch l.Name {
			case "__name__":
				metricName = l.Value
			default:
				pairs = append(pairs, vm.LabelPair{Name: l.Name, Value: l.Value})
			}
		}
		if len(metricName) == 0 {
			return fmt.Errorf("failed to find `__name__` label in labelset for block %v", b.Meta().ULID)
		}

		// Collect all samples of the current series.
		var (
			stamps  []int64
			samples []float64
		)
		sampleIt := cur.Iterator()
		for sampleIt.Next() {
			stamp, sample := sampleIt.At()
			stamps = append(stamps, stamp)
			samples = append(samples, sample)
		}
		if iterErr := sampleIt.Err(); iterErr != nil {
			return iterErr
		}

		inputErr := pp.im.Input(&vm.TimeSeries{
			Name:       metricName,
			LabelPairs: pairs,
			Timestamps: stamps,
			Values:     samples,
		})
		if inputErr != nil {
			return inputErr
		}
	}

	// Surface any error that made the series set stop early.
	return seriesSet.Err()
}