diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 9c39c7064..8264781af 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -21,6 +21,8 @@ import ( ) var ( + latencyOffset = flag.Duration("search.latencyOffset", time.Second*60, "The time when data points become visible in query results after the colection. "+ + "Too small value can result in incomplete last points for query results") maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution") maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to `-search.lookback-delta` from Prometheus. "+ @@ -30,10 +32,6 @@ var ( // Default step used if not set. const defaultStep = 5 * 60 * 1000 -// Latency for data processing pipeline, i.e. the time between data is ignested -// into the system and the time it becomes visible to search. -const latencyOffset = 60 * 1000 - // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ func FederateHandler(w http.ResponseWriter, r *http.Request) error { startTime := time.Now() @@ -468,7 +466,8 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error { if err != nil { return err } - step, err := getDuration(r, "step", latencyOffset) + queryOffset := getLatencyOffsetMilliseconds() + step, err := getDuration(r, "step", queryOffset) if err != nil { return err } @@ -481,8 +480,8 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error { if len(query) > *maxQueryLen { return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen) } - if ct-start < latencyOffset { - start -= latencyOffset + if ct-start < queryOffset { + start -= queryOffset } if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" { var window int64 @@ -587,7 +586,8 @@ func QueryRangeHandler(w http.ResponseWriter, r *http.Request) error { if err != nil { return fmt.Errorf("cannot execute %q: %s", query, err) } - if ct-end < latencyOffset { + queryOffset := getLatencyOffsetMilliseconds() + if ct-end < queryOffset { result = adjustLastPoints(result) } @@ -784,3 +784,11 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) } return tagFilterss, nil } + +func getLatencyOffsetMilliseconds() int64 { + d := int64(*latencyOffset / time.Millisecond) + if d <= 1000 { + d = 1000 + } + return d +}