app/vmselect/promql: return matrix instead of vector on subqueries to /api/v1/query like Prometheus does

This commit is contained in:
Aliaksandr Valialkin 2019-12-11 00:42:44 +02:00
parent 5d2ff573aa
commit f79b61e2a1
2 changed files with 69 additions and 15 deletions

View File

@ -560,21 +560,13 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
start = ct - queryOffset start = ct - queryOffset
} }
if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" { if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
var window int64 window, err := parsePositiveDuration(windowStr, step)
if len(windowStr) > 0 {
var err error
window, err = promql.PositiveDurationValue(windowStr, step)
if err != nil { if err != nil {
return err return fmt.Errorf("cannot parse window: %s", err)
} }
} offset, err := parseDuration(offsetStr, step)
var offset int64
if len(offsetStr) > 0 {
var err error
offset, err = promql.DurationValue(offsetStr, step)
if err != nil { if err != nil {
return err return fmt.Errorf("cannot parse offset: %s", err)
}
} }
start -= offset start -= offset
end := start end := start
@ -585,6 +577,31 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
queryDuration.UpdateDuration(startTime) queryDuration.UpdateDuration(startTime)
return nil return nil
} }
if childQuery, windowStr, stepStr, offsetStr := promql.IsRollup(query); childQuery != "" {
newStep, err := parsePositiveDuration(stepStr, step)
if err != nil {
return fmt.Errorf("cannot parse step: %s", err)
}
if newStep > 0 {
step = newStep
}
window, err := parsePositiveDuration(windowStr, step)
if err != nil {
return fmt.Errorf("cannot parse window: %s", err)
}
offset, err := parseDuration(offsetStr, step)
if err != nil {
return fmt.Errorf("cannot parse offset: %s", err)
}
start -= offset
end := start
start = end - window
if err := queryRangeHandler(at, w, childQuery, start, end, step, r, ct); err != nil {
return err
}
queryDuration.UpdateDuration(startTime)
return nil
}
ec := promql.EvalConfig{ ec := promql.EvalConfig{
AuthToken: at, AuthToken: at,
@ -609,6 +626,20 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
var queryDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query"}`) var queryDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query"}`)
func parseDuration(s string, step int64) (int64, error) {
if len(s) == 0 {
return 0, nil
}
return promql.DurationValue(s, step)
}
func parsePositiveDuration(s string, step int64) (int64, error) {
if len(s) == 0 {
return 0, nil
}
return promql.PositiveDurationValue(s, step)
}
// QueryRangeHandler processes /api/v1/query_range request. // QueryRangeHandler processes /api/v1/query_range request.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries // See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
@ -632,6 +663,14 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e
if err != nil { if err != nil {
return err return err
} }
if err := queryRangeHandler(at, w, query, start, end, step, r, ct); err != nil {
return err
}
queryRangeDuration.UpdateDuration(startTime)
return nil
}
func queryRangeHandler(at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error {
deadline := getDeadline(r) deadline := getDeadline(r)
mayCache := !getBool(r, "nocache") mayCache := !getBool(r, "nocache")
lookbackDelta, err := getMaxLookback(r) lookbackDelta, err := getMaxLookback(r)
@ -679,7 +718,6 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
WriteQueryRangeResponse(w, result) WriteQueryRangeResponse(w, result)
queryRangeDuration.UpdateDuration(startTime)
return nil return nil
} }

View File

@ -1290,6 +1290,22 @@ func (p *parser) parseIdentExpr() (expr, error) {
} }
} }
// IsRollup verifies whether s is a rollup with non-empty window.
//
// It returns the wrapped query with the corresponding window, step and offset.
func IsRollup(s string) (childQuery string, window, step, offset string) {
expr, err := parsePromQLWithCache(s)
if err != nil {
return
}
re, ok := expr.(*rollupExpr)
if !ok || len(re.Window) == 0 {
return
}
wrappedQuery := re.Expr.AppendString(nil)
return string(wrappedQuery), re.Window, re.Step, re.Offset
}
// IsMetricSelectorWithRollup verifies whether s contains PromQL metric selector // IsMetricSelectorWithRollup verifies whether s contains PromQL metric selector
// wrapped into rollup. // wrapped into rollup.
// //