From 4e6bf6f538e6b3e9da8945fe79a52cdc20c3f401 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 28 Oct 2019 12:30:50 +0200 Subject: [PATCH] app/vmselect: add `-search.latencyOffset` flag for tuning the time after data collection when data points become visible in query results Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/218 --- app/vmselect/prometheus/prometheus.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index e2f9b8d8e1..3284293214 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -23,6 +23,8 @@ import ( ) var ( + latencyOffset = flag.Duration("search.latencyOffset", time.Second*60, "The time when data points become visible in query results after the colection. "+ + "Too small value can result in incomplete last points for query results") maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution") maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to `-search.lookback-delta` from Prometheus. "+ @@ -34,10 +36,6 @@ var ( // Default step used if not set. const defaultStep = 5 * 60 * 1000 -// Latency for data processing pipeline, i.e. the time between data is ignested -// into the system and the time it becomes visible to search. -const latencyOffset = 60 * 1000 - // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { startTime := time.Now() @@ -542,7 +540,8 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error if err != nil { return err } - step, err := getDuration(r, "step", latencyOffset) + queryOffset := getLatencyOffsetMilliseconds() + step, err := getDuration(r, "step", queryOffset) if err != nil { return err } @@ -555,8 +554,8 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error if len(query) > *maxQueryLen { return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen) } - if ct-start < latencyOffset { - start -= latencyOffset + if ct-start < queryOffset { + start -= queryOffset } if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" { var window int64 @@ -667,7 +666,8 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e if err != nil { return fmt.Errorf("cannot execute %q: %s", query, err) } - if ct-end < latencyOffset { + queryOffset := getLatencyOffsetMilliseconds() + if ct-end < queryOffset { result = adjustLastPoints(result) } @@ -865,6 +865,14 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) return tagFilterss, nil } +func getLatencyOffsetMilliseconds() int64 { + d := int64(*latencyOffset / time.Millisecond) + if d <= 1000 { + d = 1000 + } + return d +} + func getDenyPartialResponse(r *http.Request) bool { if *denyPartialResponse { return true