VictoriaMetrics/app/vmctl/prometheus.go

132 lines
2.9 KiB
Go
Raw Normal View History

package main
import (
"fmt"
"log"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/cheggaaa/pb/v3"
"github.com/prometheus/prometheus/tsdb"
)
// prometheusProcessor imports timeseries data from Prometheus snapshot
// blocks: blocks are discovered and read via cl, and the series read from
// them are handed over to im.
type prometheusProcessor struct {
// cl is the prometheus client used to explore (list) the snapshot
// blocks and to read the series stored in each of them.
cl *prometheus.Client
// im is the importer that performs import requests
// for the timeseries data returned from snapshot blocks.
// Series are pushed to it via its Input() channel and its
// failures are reported via its Errors() channel.
im *vm.Importer
// cc stands for concurrency and defines the number of
// concurrently running snapshot block readers, i.e. the
// number of worker goroutines started by run.
cc int
}
// run explores the Prometheus snapshot, asks for confirmation (unless silent
// is set) and imports every discovered block.
//
// Blocks are distributed among pp.cc concurrently running workers, each of
// which calls pp.do for the blocks it receives. The first error reported by
// a worker or by the importer aborts the import and is returned. An error is
// also returned when the snapshot cannot be explored or contains no blocks.
// Declining the confirmation prompt returns nil without importing anything.
func (pp *prometheusProcessor) run(silent bool) error {
	blocks, err := pp.cl.Explore()
	if err != nil {
		return fmt.Errorf("explore failed: %s", err)
	}
	if len(blocks) < 1 {
		return fmt.Errorf("found no blocks to import")
	}
	question := fmt.Sprintf("Found %d blocks to import. Continue?", len(blocks))
	if !silent && !prompt(question) {
		return nil
	}

	// With zero workers nobody would ever receive from blockReadersCh and the
	// dispatch loop below would block forever; a negative value would panic
	// when sizing errCh. Fall back to a single worker in both cases.
	workers := pp.cc
	if workers < 1 {
		workers = 1
	}

	bar := pb.StartNew(len(blocks))
	blockReadersCh := make(chan tsdb.BlockReader)
	// errCh is buffered with one slot per worker: a worker sends at most one
	// error before exiting, so the send never blocks.
	errCh := make(chan error, workers)
	pp.im.ResetStats()

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for br := range blockReadersCh {
				if err := pp.do(br); err != nil {
					errCh <- fmt.Errorf("read failed for block %q: %s", br.Meta().ULID, err)
					return
				}
				bar.Increment()
			}
		}()
	}

	// any error breaks the import
	for _, br := range blocks {
		select {
		case promErr := <-errCh:
			close(blockReadersCh)
			return fmt.Errorf("prometheus error: %s", promErr)
		case vmErr := <-pp.im.Errors():
			close(blockReadersCh)
			return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
		case blockReadersCh <- br:
		}
	}

	close(blockReadersCh)
	wg.Wait()
	// wait for all buffers to flush
	pp.im.Close()
	// Report the first pending import error, if any. The receive returns
	// ok == false once the errors channel is closed and empty.
	if vmErr, ok := <-pp.im.Errors(); ok {
		return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
	}
	// Workers may have failed after the last block was dispatched (or the
	// select above may have kept picking the send case), in which case their
	// errors are still sitting in errCh. All workers have exited by now, so a
	// non-blocking receive observes every error that was reported.
	select {
	case promErr := <-errCh:
		return fmt.Errorf("prometheus error: %s", promErr)
	default:
	}

	bar.Finish()
	log.Println("Import finished!")
	log.Print(pp.im.Stats())
	return nil
}
// do reads all series from the snapshot block b and sends each of them to
// the importer's input channel as a vm.TimeSeries.
//
// For every series the value of the `__name__` label is used as the metric
// name, the remaining labels are kept as label pairs, and the samples are
// collected into parallel timestamp and value slices. A series without a
// non-empty `__name__` label, a failing sample iterator or a failing series
// set stops processing and the corresponding error is returned.
func (pp *prometheusProcessor) do(b tsdb.BlockReader) error {
	seriesSet, err := pp.cl.Read(b)
	if err != nil {
		return fmt.Errorf("failed to read block: %s", err)
	}
	for seriesSet.Next() {
		var (
			metricName string
			pairs      []vm.LabelPair
		)
		cur := seriesSet.At()
		for _, l := range cur.Labels() {
			switch l.Name {
			case "__name__":
				// the metric name is carried separately from the other labels
				metricName = l.Value
			default:
				pairs = append(pairs, vm.LabelPair{Name: l.Name, Value: l.Value})
			}
		}
		if len(metricName) == 0 {
			return fmt.Errorf("failed to find `__name__` label in labelset for block %v", b.Meta().ULID)
		}

		var (
			ts   []int64
			vals []float64
		)
		samples := cur.Iterator()
		for samples.Next() {
			t, v := samples.At()
			ts = append(ts, t)
			vals = append(vals, v)
		}
		if iterErr := samples.Err(); iterErr != nil {
			return iterErr
		}

		out := &vm.TimeSeries{
			Name:       metricName,
			LabelPairs: pairs,
			Timestamps: ts,
			Values:     vals,
		}
		pp.im.Input() <- out
	}
	return seriesSet.Err()
}