app/vmselect: add -search.latencyOffset flag for tuning the time after data collection when data points become visible in query results

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/218
This commit is contained in:
Aliaksandr Valialkin 2019-10-28 12:30:50 +02:00
parent 121be98325
commit 4e6bf6f538

View File

@ -23,6 +23,8 @@ import (
) )
var ( var (
latencyOffset = flag.Duration("search.latencyOffset", time.Second*60, "The time when data points become visible in query results after the colection. "+
"Too small value can result in incomplete last points for query results")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution") maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to `-search.lookback-delta` from Prometheus. "+ maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to `-search.lookback-delta` from Prometheus. "+
@ -34,10 +36,6 @@ var (
// Default step used if not set. // Default step used if not set.
const defaultStep = 5 * 60 * 1000 const defaultStep = 5 * 60 * 1000
// Latency for data processing pipeline, i.e. the time between data is ignested
// into the system and the time it becomes visible to search.
const latencyOffset = 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now() startTime := time.Now()
@ -542,7 +540,8 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
if err != nil { if err != nil {
return err return err
} }
step, err := getDuration(r, "step", latencyOffset) queryOffset := getLatencyOffsetMilliseconds()
step, err := getDuration(r, "step", queryOffset)
if err != nil { if err != nil {
return err return err
} }
@ -555,8 +554,8 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
if len(query) > *maxQueryLen { if len(query) > *maxQueryLen {
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen) return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
} }
if ct-start < latencyOffset { if ct-start < queryOffset {
start -= latencyOffset start -= queryOffset
} }
if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" { if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
var window int64 var window int64
@ -667,7 +666,8 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e
if err != nil { if err != nil {
return fmt.Errorf("cannot execute %q: %s", query, err) return fmt.Errorf("cannot execute %q: %s", query, err)
} }
if ct-end < latencyOffset { queryOffset := getLatencyOffsetMilliseconds()
if ct-end < queryOffset {
result = adjustLastPoints(result) result = adjustLastPoints(result)
} }
@ -865,6 +865,14 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error)
return tagFilterss, nil return tagFilterss, nil
} }
func getLatencyOffsetMilliseconds() int64 {
d := int64(*latencyOffset / time.Millisecond)
if d <= 1000 {
d = 1000
}
return d
}
func getDenyPartialResponse(r *http.Request) bool { func getDenyPartialResponse(r *http.Request) bool {
if *denyPartialResponse { if *denyPartialResponse {
return true return true