app/vmselect: add -search.latencyOffset flag for tuning the time after data collection when data points become visible in query results

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/218
This commit is contained in:
Aliaksandr Valialkin 2019-10-28 12:30:50 +02:00
parent 3cea200309
commit b0295dbf2e

View File

@ -21,6 +21,8 @@ import (
)
var (
latencyOffset = flag.Duration("search.latencyOffset", time.Second*60, "The time when data points become visible in query results after the colection. "+
"Too small value can result in incomplete last points for query results")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to `-search.lookback-delta` from Prometheus. "+
@ -30,10 +32,6 @@ var (
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// Latency for data processing pipeline, i.e. the time between data is ignested
// into the system and the time it becomes visible to search.
const latencyOffset = 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler(w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
@ -468,7 +466,8 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error {
if err != nil {
return err
}
step, err := getDuration(r, "step", latencyOffset)
queryOffset := getLatencyOffsetMilliseconds()
step, err := getDuration(r, "step", queryOffset)
if err != nil {
return err
}
@ -481,8 +480,8 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error {
if len(query) > *maxQueryLen {
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
}
if ct-start < latencyOffset {
start -= latencyOffset
if ct-start < queryOffset {
start -= queryOffset
}
if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
var window int64
@ -587,7 +586,8 @@ func QueryRangeHandler(w http.ResponseWriter, r *http.Request) error {
if err != nil {
return fmt.Errorf("cannot execute %q: %s", query, err)
}
if ct-end < latencyOffset {
queryOffset := getLatencyOffsetMilliseconds()
if ct-end < queryOffset {
result = adjustLastPoints(result)
}
@ -784,3 +784,11 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error)
}
return tagFilterss, nil
}
func getLatencyOffsetMilliseconds() int64 {
d := int64(*latencyOffset / time.Millisecond)
if d <= 1000 {
d = 1000
}
return d
}