mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
b9ff61811d
vmctl: fix vmctl blocking on process interrupt. This change prevents vmctl from blocking indefinitely on receiving the interrupt signal. The update touches all import modes and is supposed to improve tool reliability. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2491
171 lines
4.7 KiB
Go
171 lines
4.7 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
|
"github.com/cheggaaa/pb/v3"
|
|
)
|
|
|
|
type otsdbProcessor struct {
|
|
oc *opentsdb.Client
|
|
im *vm.Importer
|
|
otsdbcc int
|
|
}
|
|
|
|
type queryObj struct {
|
|
Series opentsdb.Meta
|
|
Rt opentsdb.RetentionMeta
|
|
Tr opentsdb.TimeRange
|
|
StartTime int64
|
|
}
|
|
|
|
func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdbProcessor {
|
|
if otsdbcc < 1 {
|
|
otsdbcc = 1
|
|
}
|
|
return &otsdbProcessor{
|
|
oc: oc,
|
|
im: im,
|
|
otsdbcc: otsdbcc,
|
|
}
|
|
}
|
|
|
|
func (op *otsdbProcessor) run(silent, verbose bool) error {
|
|
log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters)
|
|
var metrics []string
|
|
for _, filter := range op.oc.Filters {
|
|
q := fmt.Sprintf("%s/api/suggest?type=metrics&q=%s&max=%d", op.oc.Addr, filter, op.oc.Limit)
|
|
m, err := op.oc.FindMetrics(q)
|
|
if err != nil {
|
|
return fmt.Errorf("metric discovery failed for %q: %s", q, err)
|
|
}
|
|
metrics = append(metrics, m...)
|
|
}
|
|
if len(metrics) < 1 {
|
|
return fmt.Errorf("found no timeseries to import with filters %q", op.oc.Filters)
|
|
}
|
|
|
|
question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics))
|
|
if !silent && !prompt(question) {
|
|
return nil
|
|
}
|
|
op.im.ResetStats()
|
|
var startTime int64
|
|
if op.oc.HardTS != 0 {
|
|
startTime = op.oc.HardTS
|
|
} else {
|
|
startTime = time.Now().Unix()
|
|
}
|
|
queryRanges := 0
|
|
// pre-calculate the number of query ranges we'll be processing
|
|
for _, rt := range op.oc.Retentions {
|
|
queryRanges += len(rt.QueryRanges)
|
|
}
|
|
for _, metric := range metrics {
|
|
log.Println(fmt.Sprintf("Starting work on %s", metric))
|
|
serieslist, err := op.oc.FindSeries(metric)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't retrieve series list for %s : %s", metric, err)
|
|
}
|
|
/*
|
|
Create channels for collecting/processing series and errors
|
|
We'll create them per metric to reduce pressure against OpenTSDB
|
|
|
|
Limit the size of seriesCh so we can't get too far ahead of actual processing
|
|
*/
|
|
seriesCh := make(chan queryObj, op.otsdbcc)
|
|
errCh := make(chan error)
|
|
// we're going to make serieslist * queryRanges queries, so we should represent that in the progress bar
|
|
bar := pb.StartNew(len(serieslist) * queryRanges)
|
|
var wg sync.WaitGroup
|
|
wg.Add(op.otsdbcc)
|
|
for i := 0; i < op.otsdbcc; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for s := range seriesCh {
|
|
if err := op.do(s); err != nil {
|
|
errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err)
|
|
return
|
|
}
|
|
bar.Increment()
|
|
}
|
|
}()
|
|
}
|
|
/*
|
|
Loop through all series for this metric, processing all retentions and time ranges
|
|
requested. This loop is our primary "collect data from OpenTSDB loop" and should
|
|
be async, sending data to VictoriaMetrics over time.
|
|
|
|
The idea with having the select at the inner-most loop is to ensure quick
|
|
short-circuiting on error.
|
|
*/
|
|
for _, series := range serieslist {
|
|
for _, rt := range op.oc.Retentions {
|
|
for _, tr := range rt.QueryRanges {
|
|
select {
|
|
case otsdbErr := <-errCh:
|
|
return fmt.Errorf("opentsdb error: %s", otsdbErr)
|
|
case vmErr := <-op.im.Errors():
|
|
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
|
|
case seriesCh <- queryObj{
|
|
Tr: tr, StartTime: startTime,
|
|
Series: series, Rt: opentsdb.RetentionMeta{
|
|
FirstOrder: rt.FirstOrder, SecondOrder: rt.SecondOrder, AggTime: rt.AggTime}}:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Drain channels per metric
|
|
close(seriesCh)
|
|
wg.Wait()
|
|
close(errCh)
|
|
// check for any lingering errors on the query side
|
|
for otsdbErr := range errCh {
|
|
return fmt.Errorf("Import process failed: \n%s", otsdbErr)
|
|
}
|
|
bar.Finish()
|
|
log.Print(op.im.Stats())
|
|
}
|
|
op.im.Close()
|
|
for vmErr := range op.im.Errors() {
|
|
if vmErr.Err != nil {
|
|
return fmt.Errorf("import process failed: %s", wrapErr(vmErr, verbose))
|
|
}
|
|
}
|
|
log.Println("Import finished!")
|
|
log.Print(op.im.Stats())
|
|
return nil
|
|
}
|
|
|
|
func (op *otsdbProcessor) do(s queryObj) error {
|
|
start := s.StartTime - s.Tr.Start
|
|
end := s.StartTime - s.Tr.End
|
|
data, err := op.oc.GetData(s.Series, s.Rt, start, end, op.oc.MsecsTime)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err)
|
|
}
|
|
if len(data.Timestamps) < 1 || len(data.Values) < 1 {
|
|
return nil
|
|
}
|
|
labels := make([]vm.LabelPair, len(data.Tags))
|
|
for k, v := range data.Tags {
|
|
labels = append(labels, vm.LabelPair{Name: k, Value: v})
|
|
}
|
|
ts := vm.TimeSeries{
|
|
Name: data.Metric,
|
|
LabelPairs: labels,
|
|
Timestamps: data.Timestamps,
|
|
Values: data.Values,
|
|
}
|
|
if err := op.im.Input(&ts); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|