app/vmselect/promql: optimize queries, which join on _info metrics.

Automatically add common filters from one side of binary operation
to the other side before sending the query to storage subsystem.

See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
and https://www.robustperception.io/exposing-the-software-version-to-prometheus
This commit is contained in:
Aliaksandr Valialkin 2022-01-31 19:32:36 +02:00
parent 15475a9d1f
commit e7f1ceeb84
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
7 changed files with 259 additions and 76 deletions

View File

@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"math"
"regexp"
"sort"
"strings"
"sync"
@ -275,55 +277,29 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
return rv, nil
}
if be, ok := e.(*metricsql.BinaryOpExpr); ok {
// Execute left and right sides of the binary operation in parallel.
// This should reduce execution times for heavy queries.
// On the other side this can increase CPU and RAM usage when executing heavy queries.
// TODO: think on how to limit CPU and RAM usage while leaving short execution times.
var left, right []*timeseries
var mu sync.Mutex
var wg sync.WaitGroup
var errGlobal error
wg.Add(1)
go func() {
defer wg.Done()
ecCopy := newEvalConfig(ec)
tss, err := evalExpr(ecCopy, be.Left)
mu.Lock()
if err != nil {
if errGlobal == nil {
errGlobal = err
}
}
left = tss
mu.Unlock()
}()
wg.Add(1)
go func() {
defer wg.Done()
ecCopy := newEvalConfig(ec)
tss, err := evalExpr(ecCopy, be.Right)
mu.Lock()
if err != nil {
if errGlobal == nil {
errGlobal = err
}
}
right = tss
mu.Unlock()
}()
wg.Wait()
if errGlobal != nil {
return nil, errGlobal
}
bf := getBinaryOpFunc(be.Op)
if bf == nil {
return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
}
var err error
var tssLeft, tssRight []*timeseries
switch be.Op {
case "and", "if":
// Fetch right-side series at first, since the left side of `and` and `if` operator
// usually contains lower number of time series. This should produce more specific label filters
// for the left side of the query. This, in turn, should reduce the time to select series
// for the left side of the query.
tssRight, tssLeft, err = execBinaryOpArgs(ec, be.Right, be.Left, be)
default:
tssLeft, tssRight, err = execBinaryOpArgs(ec, be.Left, be.Right, be)
}
if err != nil {
return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
}
bfa := &binaryOpFuncArg{
be: be,
left: left,
right: right,
left: tssLeft,
right: tssRight,
}
rv, err := bf(bfa)
if err != nil {
@ -348,6 +324,100 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
}
func execBinaryOpArgs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
// Execute binary operation in the following way:
//
// 1) execute the exprFirst
// 2) get common label filters for series returned at step 1
// 3) push down the found common label filters to exprSecond. This filters out unneeded series
// during exprSecond exection instead of spending compute resources on extracting and processing these series
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
// 4) execute the exprSecond with possible additional filters found at step 3
//
// Typical use cases:
// - Kubernetes-related: show pod creation time with the node name:
//
// kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
//
// Without the optimization `kube_pod_info` would select and spend compute resources
// for more time series than needed. The selected time series would be dropped later
// when matching time series on the right and left sides of binary operand.
//
// - Generic alerting queries, which rely on `info` metrics.
// See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
//
// - Queries, which get additional labels from `info` metrics.
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus
tssFirst, err := evalExpr(ec, exprFirst)
if err != nil {
return nil, nil, err
}
lfs := getCommonLabelFilters(tssFirst)
lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
tssSecond, err := evalExpr(ec, exprSecond)
if err != nil {
return nil, nil, err
}
return tssFirst, tssSecond, nil
}
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
m := make(map[string][]string)
for _, ts := range tss {
for _, tag := range ts.MetricName.Tags {
m[string(tag.Key)] = append(m[string(tag.Key)], string(tag.Value))
}
}
lfs := make([]metricsql.LabelFilter, 0, len(m))
for key, values := range m {
if len(values) != len(tss) {
// Skip the tag, since it doesn't belong to all the time series.
continue
}
values = getUniqueValues(values)
lf := metricsql.LabelFilter{
Label: key,
}
if len(values) == 1 {
lf.Value = values[0]
} else {
lf.Value = joinRegexpValues(values)
lf.IsRegexp = true
}
lfs = append(lfs, lf)
}
sort.Slice(lfs, func(i, j int) bool {
return lfs[i].Label < lfs[j].Label
})
return lfs
}
func getUniqueValues(a []string) []string {
m := make(map[string]struct{}, len(a))
results := make([]string, 0, len(a))
for _, s := range a {
if _, ok := m[s]; !ok {
results = append(results, s)
m[s] = struct{}{}
}
}
sort.Strings(results)
return results
}
func joinRegexpValues(a []string) string {
var b []byte
for i, s := range a {
sQuoted := regexp.QuoteMeta(s)
b = append(b, sQuoted...)
if i < len(a)-1 {
b = append(b, '|')
}
}
return string(b)
}
func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
if len(ae.Args) != 1 {
return nil, nil

View File

@ -0,0 +1,50 @@
package promql
import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metricsql"
)
func TestGetCommonLabelFilters(t *testing.T) {
f := func(metrics string, lfsExpected string) {
t.Helper()
var tss []*timeseries
var rows prometheus.Rows
rows.UnmarshalWithErrLogger(metrics, func(errStr string) {
t.Fatalf("unexpected error when parsing %s: %s", metrics, errStr)
})
for _, row := range rows.Rows {
var tags []storage.Tag
for _, tag := range row.Tags {
tags = append(tags, storage.Tag{
Key: []byte(tag.Key),
Value: []byte(tag.Value),
})
}
var ts timeseries
ts.MetricName.Tags = tags
tss = append(tss, &ts)
}
lfs := getCommonLabelFilters(tss)
me := &metricsql.MetricExpr{
LabelFilters: lfs,
}
lfsMarshaled := me.AppendString(nil)
if string(lfsMarshaled) != lfsExpected {
t.Fatalf("unexpected common label filters;\ngot\n%s\nwant\n%s", lfsMarshaled, lfsExpected)
}
}
f(``, `{}`)
f(`m 1`, `{}`)
f(`m{a="b"} 1`, `{a="b"}`)
f(`m{c="d",a="b"} 1`, `{a="b", c="d"}`)
f(`m1{a="foo"} 1
m2{a="bar"} 1`, `{a=~"bar|foo"}`)
f(`m1{a="foo"} 1
m2{b="bar"} 1`, `{}`)
f(`m1{a="foo",b="bar"} 1
m2{b="bar",c="x"} 1`, `{b="bar"}`)
}

View File

@ -11,6 +11,7 @@ sort: 15
* Binary operations with `on()`, `without()`, `group_left()` and `group_right()` modifiers. For example, `foo{a="b"} on (a) + bar` is now optimized to `foo{a="b"} on (a) + bar{a="b"}`
* Multi-level binary operations. For example, `foo{a="b"} + bar{x="y"} + baz{z="q"}` is now optimized to `foo{a="b",x="y",z="q"} + bar{a="b",x="y",z="q"} + baz{a="b",x="y",z="q"}`
* Aggregate functions. For example, `sum(foo{a="b"}) by (c) + bar{c="d"}` is now optimized to `sum(foo{a="b",c="d"}) by (c) + bar{c="d"}`
* FEATURE [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): optimize joining with `*_info` labels. For example: `kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info` now automatically adds the needed filters on `uid` label to `kube_pod_info` before selecting series for the right side of `*` operation. This may save CPU, RAM and disk IO resources. See [this article](https://www.robustperception.io/exposing-the-software-version-to-prometheus) for details on `*_info` labels.
* BUGFIX: return proper results from `highestMax()` function at [Graphite render API](https://docs.victoriametrics.com/#graphite-render-api-usage). Previously it was incorrectly returning timeseries with min peaks instead of max peaks.
* BUGFIX: properly limit indexdb cache sizes. Previously they could exceed values set via `-memory.allowedPercent` and/or `-memory.allowedBytes` when `indexdb` contained many data parts. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007).

2
go.mod
View File

@ -10,7 +10,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.1.0
github.com/VictoriaMetrics/metrics v1.18.1
github.com/VictoriaMetrics/metricsql v0.38.0
github.com/VictoriaMetrics/metricsql v0.39.0
github.com/aws/aws-sdk-go v1.42.44
github.com/cespare/xxhash/v2 v2.1.2
github.com/cheggaaa/pb/v3 v3.0.8

4
go.sum
View File

@ -115,8 +115,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a
github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ=
github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0=
github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/VictoriaMetrics/metricsql v0.38.0 h1:YBAzxKyr2QLFXYap8Nd0bxIr0e8mE/aUIyBYgDFMpK4=
github.com/VictoriaMetrics/metricsql v0.38.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0=
github.com/VictoriaMetrics/metricsql v0.39.0 h1:tm1hneyxVhm1oeJ/1T4007Y5Bn+LKN+Aw3l6XGwvgRM=
github.com/VictoriaMetrics/metricsql v0.39.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=

View File

@ -1,6 +1,7 @@
package metricsql
import (
"fmt"
"sort"
"strings"
)
@ -13,26 +14,64 @@ import (
// according to https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization
// I.e. such query is converted to `foo{filters1, filters2} op bar{filters1, filters2}`
func Optimize(e Expr) Expr {
switch t := e.(type) {
case *RollupExpr:
t.Expr = Optimize(t.Expr)
t.At = Optimize(t.At)
case *FuncExpr:
optimizeFuncArgs(t.Args)
case *AggrFuncExpr:
optimizeFuncArgs(t.Args)
case *BinaryOpExpr:
t.Left = Optimize(t.Left)
t.Right = Optimize(t.Right)
lfs := getCommonLabelFilters(t)
pushdownLabelFilters(t, lfs)
if !canOptimize(e) {
return e
}
return e
eCopy := Clone(e)
optimizeInplace(eCopy)
return eCopy
}
func optimizeFuncArgs(args []Expr) {
for i := range args {
args[i] = Optimize(args[i])
func canOptimize(e Expr) bool {
switch t := e.(type) {
case *RollupExpr:
return canOptimize(t.Expr) || canOptimize(t.At)
case *FuncExpr:
for _, arg := range t.Args {
if canOptimize(arg) {
return true
}
}
case *AggrFuncExpr:
for _, arg := range t.Args {
if canOptimize(arg) {
return true
}
}
case *BinaryOpExpr:
return true
}
return false
}
// Clone clones the given expression e and returns the cloned copy.
func Clone(e Expr) Expr {
s := e.AppendString(nil)
eCopy, err := Parse(string(s))
if err != nil {
panic(fmt.Errorf("BUG: cannot parse the expression %q: %w", s, err))
}
return eCopy
}
func optimizeInplace(e Expr) {
switch t := e.(type) {
case *RollupExpr:
optimizeInplace(t.Expr)
optimizeInplace(t.At)
case *FuncExpr:
for _, arg := range t.Args {
optimizeInplace(arg)
}
case *AggrFuncExpr:
for _, arg := range t.Args {
optimizeInplace(arg)
}
case *BinaryOpExpr:
optimizeInplace(t.Left)
optimizeInplace(t.Right)
lfs := getCommonLabelFilters(t)
pushdownBinaryOpFiltersInplace(t, lfs)
}
}
@ -54,7 +93,7 @@ func getCommonLabelFilters(e Expr) []LabelFilter {
return nil
}
lfs := getCommonLabelFilters(arg)
return filterLabelFiltersByAggrModifier(lfs, t)
return trimFiltersByAggrModifier(lfs, t)
case *BinaryOpExpr:
if !canOptimizeBinaryOp(t) {
return nil
@ -62,13 +101,13 @@ func getCommonLabelFilters(e Expr) []LabelFilter {
lfsLeft := getCommonLabelFilters(t.Left)
lfsRight := getCommonLabelFilters(t.Right)
lfs := unionLabelFilters(lfsLeft, lfsRight)
return filterLabelFiltersByGroupModifier(lfs, t)
return TrimFiltersByGroupModifier(lfs, t)
default:
return nil
}
}
func filterLabelFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []LabelFilter {
func trimFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []LabelFilter {
switch strings.ToLower(afe.Modifier.Op) {
case "by":
return filterLabelFiltersOn(lfs, afe.Modifier.Args)
@ -79,7 +118,13 @@ func filterLabelFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []La
}
}
func filterLabelFiltersByGroupModifier(lfs []LabelFilter, be *BinaryOpExpr) []LabelFilter {
// TrimFiltersByGroupModifier trims lfs by the specified be.GroupModifier.Op (e.g. on() or ignoring()).
//
// The following cases are possible:
// - It returns lfs as is if be doesn't contain any group modifier
// - It returns only filters specified in on()
// - It drops filters specified inside ignoring()
func TrimFiltersByGroupModifier(lfs []LabelFilter, be *BinaryOpExpr) []LabelFilter {
switch strings.ToLower(be.GroupModifier.Op) {
case "on":
return filterLabelFiltersOn(lfs, be.GroupModifier.Args)
@ -100,7 +145,24 @@ func getLabelFiltersWithoutMetricName(lfs []LabelFilter) []LabelFilter {
return lfsNew
}
func pushdownLabelFilters(e Expr, lfs []LabelFilter) {
// PushdownBinaryOpFilters pushes down the given commonFilters to e if possible.
//
// e must be a part of binary operation - either left or right.
//
// For example, if e contains `foo + sum(bar)` and commonFilters={x="y"},
// then the returned expression will contain `foo{x="y"} + sum(bar)`.
// The `{x="y"}` cannot be pusehd down to `sum(bar)`, since this may change binary operation results.
func PushdownBinaryOpFilters(e Expr, commonFilters []LabelFilter) Expr {
if len(commonFilters) == 0 {
// Fast path - nothing to push down.
return e
}
eCopy := Clone(e)
pushdownBinaryOpFiltersInplace(eCopy, commonFilters)
return eCopy
}
func pushdownBinaryOpFiltersInplace(e Expr, lfs []LabelFilter) {
if len(lfs) == 0 {
return
}
@ -109,23 +171,23 @@ func pushdownLabelFilters(e Expr, lfs []LabelFilter) {
t.LabelFilters = unionLabelFilters(t.LabelFilters, lfs)
sortLabelFilters(t.LabelFilters)
case *RollupExpr:
pushdownLabelFilters(t.Expr, lfs)
pushdownBinaryOpFiltersInplace(t.Expr, lfs)
case *FuncExpr:
arg := getFuncArgForOptimization(t.Name, t.Args)
if arg != nil {
pushdownLabelFilters(arg, lfs)
pushdownBinaryOpFiltersInplace(arg, lfs)
}
case *AggrFuncExpr:
lfs = filterLabelFiltersByAggrModifier(lfs, t)
lfs = trimFiltersByAggrModifier(lfs, t)
arg := getFuncArgForOptimization(t.Name, t.Args)
if arg != nil {
pushdownLabelFilters(arg, lfs)
pushdownBinaryOpFiltersInplace(arg, lfs)
}
case *BinaryOpExpr:
if canOptimizeBinaryOp(t) {
lfs = filterLabelFiltersByGroupModifier(lfs, t)
pushdownLabelFilters(t.Left, lfs)
pushdownLabelFilters(t.Right, lfs)
lfs = TrimFiltersByGroupModifier(lfs, t)
pushdownBinaryOpFiltersInplace(t.Left, lfs)
pushdownBinaryOpFiltersInplace(t.Right, lfs)
}
}
}

2
vendor/modules.txt vendored
View File

@ -26,7 +26,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.18.1
## explicit; go 1.12
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.38.0
# github.com/VictoriaMetrics/metricsql v0.39.0
## explicit; go 1.13
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop