app/vmctl: break explore phase in vm-native mode by time intervals

When `--vm-native-step-interval` is specified, the explore phase is executed
within the specified intervals. Discovered metric names are associated with
the time intervals at which they were discovered. This is supposed to reduce
the number of requests vmctl makes per metric name, since intervals in which
a metric name didn't exist are skipped.

This should also reduce the probability of exceeding the complexity limit
on the number of selected series per request during the explore phase.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369
Signed-off-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 5f8b91186a)
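For illustration only, here is a self-contained sketch of the idea described above. splitRange and the discover callback are hypothetical stand-ins for the range splitting and explore request used by vmctl; this is not the code added by the commit, just the shape of the metric-name-to-intervals mapping it introduces:

package main

import (
	"fmt"
	"time"
)

// splitRange is a hypothetical stand-in for vmctl's step splitting:
// it cuts [start, end) into consecutive intervals of the given step.
func splitRange(start, end time.Time, step time.Duration) [][]time.Time {
	var ranges [][]time.Time
	for s := start; s.Before(end); s = s.Add(step) {
		e := s.Add(step)
		if e.After(end) {
			e = end
		}
		ranges = append(ranges, []time.Time{s, e})
	}
	return ranges
}

func main() {
	start := time.Date(2023, 2, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(72 * time.Hour)

	// discover is a hypothetical stand-in for one explore request:
	// it returns the metric names that exist within [s, e).
	discover := func(s, e time.Time) []string {
		if s.Before(start.Add(24 * time.Hour)) {
			return []string{"vm_app_uptime_seconds", "vm_rows"}
		}
		return []string{"vm_app_uptime_seconds"}
	}

	// Associate every discovered metric name with the intervals it was
	// seen in; intervals where a name is absent are skipped later
	// during export/import.
	found := make(map[string][][]time.Time)
	for _, r := range splitRange(start, end, 24*time.Hour) {
		for _, name := range discover(r[0], r[1]) {
			found[name] = append(found[name], r)
		}
	}
	for name, rs := range found {
		fmt.Printf("%s: %d interval(s)\n", name, len(rs))
	}
}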


@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/auth"
)
@@ -34,7 +35,7 @@ type Response struct {
}
// Explore finds metric names by provided filter from api/v1/label/__name__/values
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]string, error) {
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string, start, end time.Time) ([]string, error) {
url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr)
if tenantID != "" {
url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr)
@@ -45,12 +46,8 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]stri
}
params := req.URL.Query()
if f.TimeStart != "" {
params.Set("start", f.TimeStart)
}
if f.TimeEnd != "" {
params.Set("end", f.TimeEnd)
}
params.Set("start", start.Format(time.RFC3339))
params.Set("end", end.Format(time.RFC3339))
params.Set("match[]", f.Match)
req.URL.RawQuery = params.Encode()
@@ -63,11 +60,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]stri
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, fmt.Errorf("cannot decode series response: %s", err)
}
if err := resp.Body.Close(); err != nil {
return nil, fmt.Errorf("cannot close series response body: %s", err)
}
return response.MetricNames, nil
return response.MetricNames, resp.Body.Close()
}
// ImportPipe uses pipe reader in request to process data


@@ -1,4 +1,4 @@
package main
package utils
import (
"fmt"
@@ -13,7 +13,9 @@ const (
maxTimeMsecs = int64(1<<63-1) / 1e6
)
func parseTime(s string) (time.Time, error) {
// ParseTime parses time in s string and returns time.Time object
// if parse correctly or error if not
func ParseTime(s string) (time.Time, error) {
msecs, err := promutils.ParseTimeMsec(s)
if err != nil {
return time.Time{}, fmt.Errorf("cannot parse %s: %w", s, err)
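Since the helper is now exported from the utils package (which the processor changes further down import as github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils), a minimal usage sketch follows; the timestamp value is borrowed from the docs example at the end of this commit:

package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils"
)

func main() {
	// Same value the docs example passes via --vm-native-filter-time-start.
	start, err := utils.ParseTime("2023-02-01T00:00:00Z")
	if err != nil {
		log.Fatalf("cannot parse time: %s", err)
	}
	fmt.Println(start.UTC())
}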


@@ -1,4 +1,4 @@
package main
package utils
import (
"testing"
@@ -165,7 +165,7 @@ func TestGetTime(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseTime(tt.s)
got, err := ParseTime(tt.s)
if (err != nil) != tt.wantErr {
t.Errorf("ParseTime() error = %v, wantErr %v", err, tt.wantErr)
return


@@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/limiter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -53,14 +54,14 @@ func (p *vmNativeProcessor) run(ctx context.Context) error {
startTime: time.Now(),
}
start, err := parseTime(p.filter.TimeStart)
start, err := utils.ParseTime(p.filter.TimeStart)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeStart, p.filter.TimeStart, err)
}
end := time.Now().In(start.Location())
if p.filter.TimeEnd != "" {
end, err = parseTime(p.filter.TimeEnd)
end, err = utils.ParseTime(p.filter.TimeEnd)
if err != nil {
return fmt.Errorf("failed to parse %s, provided: %s, error: %w", vmNativeFilterTimeEnd, p.filter.TimeEnd, err)
}
@@ -174,28 +175,29 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
dstURL = fmt.Sprintf("%s/insert/%s/prometheus/%s", p.dst.Addr, tenantID, importAddr)
}
barPrefix := "Requests to make"
initMessage := "Initing import process from %q to %q with filter %s"
initParams := []interface{}{srcURL, dstURL, p.filter.String()}
if p.interCluster {
barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID)
initMessage = "Initing import process from %q to %q with filter %s for tenant %s"
initParams = []interface{}{srcURL, dstURL, p.filter.String(), tenantID}
}
fmt.Println("") // extra line for better output formatting
log.Printf(initMessage, initParams...)
if len(ranges) > 1 {
log.Printf("Selected time range will be split into %d ranges according to %q step", len(ranges), p.filter.Chunk)
}
var foundSeriesMsg string
metrics := []string{p.filter.Match}
var requestsToMake int
var metrics = map[string][][]time.Time{
"": ranges,
}
if !p.disablePerMetricRequests {
log.Printf("Exploring metrics...")
metrics, err = p.src.Explore(ctx, p.filter, tenantID)
metrics, err = p.explore(ctx, p.src, tenantID, ranges, silent)
if err != nil {
return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err)
return fmt.Errorf("failed to explore metric names: %s", err)
}
if len(metrics) == 0 {
errMsg := "no metrics found"
if tenantID != "" {
@@ -204,7 +206,10 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
log.Println(errMsg)
return nil
}
foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics))
for _, m := range metrics {
requestsToMake += len(m)
}
foundSeriesMsg = fmt.Sprintf("Found %d unique metric names to import. Total import/export requests to make %d", len(metrics), requestsToMake)
}
if !p.interCluster {
@@ -218,15 +223,13 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
log.Print(foundSeriesMsg)
}
processingMsg := fmt.Sprintf("Requests to make: %d", len(metrics)*len(ranges))
if len(ranges) > 1 {
processingMsg = fmt.Sprintf("Selected time range will be split into %d ranges according to %q step. %s", len(ranges), p.filter.Chunk, processingMsg)
}
log.Print(processingMsg)
var bar *pb.ProgressBar
barPrefix := "Requests to make"
if p.interCluster {
barPrefix = fmt.Sprintf("Requests to make for tenant %s", tenantID)
}
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), len(metrics)*len(ranges))
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, barPrefix), requestsToMake)
if p.disablePerMetricRequests {
bar = barpool.NewSingleProgress(nativeSingleProcessTpl, 0)
}
@@ -262,20 +265,19 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
}
// any error breaks the import
for _, s := range metrics {
match, err := buildMatchWithFilter(p.filter.Match, s)
for mName, mRanges := range metrics {
match, err := buildMatchWithFilter(p.filter.Match, mName)
if err != nil {
logger.Errorf("failed to build export filters: %s", err)
logger.Errorf("failed to build filter %q for metric name %q: %s", p.filter.Match, mName, err)
continue
}
for _, times := range ranges {
for _, times := range mRanges {
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
case infErr := <-errCh:
return fmt.Errorf("native error: %s", infErr)
return fmt.Errorf("export/import error: %s", infErr)
case filterCh <- native.Filter{
Match: match,
TimeStart: times[0].Format(time.RFC3339),
@@ -296,6 +298,32 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
return nil
}
func (p *vmNativeProcessor) explore(ctx context.Context, src *native.Client, tenantID string, ranges [][]time.Time, silent bool) (map[string][][]time.Time, error) {
log.Printf("Exploring metrics...")
var bar *pb.ProgressBar
if !silent {
bar = barpool.NewSingleProgress(fmt.Sprintf(nativeWithBackoffTpl, "Explore requests to make"), len(ranges))
bar.Start()
defer bar.Finish()
}
metrics := make(map[string][][]time.Time)
for _, r := range ranges {
ms, err := src.Explore(ctx, p.filter, tenantID, r[0], r[1])
if err != nil {
return nil, fmt.Errorf("cannot get metrics from %s on interval %v-%v: %w", src.Addr, r[0], r[1], err)
}
for i := range ms {
metrics[ms[i]] = append(metrics[ms[i]], r)
}
if bar != nil {
bar.Increment()
}
}
return metrics, nil
}
// stats represents client statistic
// when processing data
type stats struct {


@@ -942,7 +942,8 @@ To use this mode you need to set `--vm-intercluster` flag to `true`, `--vm-nativ
--vm-native-filter-match='{__name__="vm_app_uptime_seconds"}' \
--vm-native-filter-time-start='2023-02-01T00:00:00Z' \
--vm-native-step-interval=day \
--vm-intercluster
--vm-intercluster
VictoriaMetrics Native import mode
2023/02/28 10:41:42 Discovering tenants...
2023/02/28 10:41:42 The following tenants were discovered: [0:0 1:0 2:0 3:0 4:0]