VictoriaMetrics/app/vmselect/searchutils/searchutils.go
Aliaksandr Valialkin a7fdc3fcc7
all: add support for or filters in series selectors
This commit adds ability to select series matching distinct filters via a single series selector.
For example, the following selector selects series with either {env="prod",job="a"}
or {env="dev",job="b"} labels:

  {env="prod",job="a" or env="dev",job="b"}

The `or` filter is supported in all the VictoriaMetrics tools now.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3997
Uses https://github.com/VictoriaMetrics/metricsql/pull/14
2023-07-15 23:56:18 -07:00

222 lines
6.9 KiB
Go

package searchutils
import (
"flag"
"fmt"
"net/http"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metricsql"
)
var (
maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
maxStatusRequestDuration = flag.Duration("search.maxStatusRequestDuration", time.Minute*5, "The maximum duration for /api/v1/status/* requests")
)
// GetMaxQueryDuration returns the maximum duration for query from r.
func GetMaxQueryDuration(r *http.Request) time.Duration {
dms, err := httputils.GetDuration(r, "timeout", 0)
if err != nil {
dms = 0
}
d := time.Duration(dms) * time.Millisecond
if d <= 0 || d > *maxQueryDuration {
d = *maxQueryDuration
}
return d
}
// GetDeadlineForQuery returns deadline for the given query r.
func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline {
dMax := maxQueryDuration.Milliseconds()
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration")
}
// GetDeadlineForStatusRequest returns deadline for the given request to /api/v1/status/*.
func GetDeadlineForStatusRequest(r *http.Request, startTime time.Time) Deadline {
dMax := maxStatusRequestDuration.Milliseconds()
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxStatusRequestDuration")
}
// GetDeadlineForExport returns deadline for the given request to /api/v1/export.
func GetDeadlineForExport(r *http.Request, startTime time.Time) Deadline {
dMax := maxExportDuration.Milliseconds()
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration")
}
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) Deadline {
d, err := httputils.GetDuration(r, "timeout", 0)
if err != nil {
d = 0
}
if d <= 0 || d > dMax {
d = dMax
}
timeout := time.Duration(d) * time.Millisecond
return NewDeadline(startTime, timeout, flagHint)
}
// Deadline contains deadline with the corresponding timeout for pretty error messages.
type Deadline struct {
deadline uint64
timeout time.Duration
flagHint string
}
// NewDeadline returns deadline for the given timeout.
//
// flagHint must contain a hit for command-line flag, which could be used
// in order to increase timeout.
func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
return Deadline{
deadline: uint64(startTime.Add(timeout).Unix()),
timeout: timeout,
flagHint: flagHint,
}
}
// DeadlineFromTimestamp returns deadline from the given timestamp in seconds.
func DeadlineFromTimestamp(timestamp uint64) Deadline {
startTime := time.Now()
timeout := time.Unix(int64(timestamp), 0).Sub(startTime)
return NewDeadline(startTime, timeout, "")
}
// Exceeded returns true if deadline is exceeded.
func (d *Deadline) Exceeded() bool {
return fasttime.UnixTimestamp() > d.deadline
}
// Deadline returns deadline in unix timestamp seconds.
func (d *Deadline) Deadline() uint64 {
return d.deadline
}
// String returns human-readable string representation for d.
func (d *Deadline) String() string {
startTime := time.Unix(int64(d.deadline), 0).Add(-d.timeout)
elapsed := time.Since(startTime)
msg := fmt.Sprintf("%.3f seconds (elapsed %.3f seconds)", d.timeout.Seconds(), elapsed.Seconds())
if float64(elapsed)/float64(d.timeout) > 0.9 && d.flagHint != "" {
msg += fmt.Sprintf("; the timeout can be adjusted with `%s` command-line flag", d.flagHint)
}
return msg
}
// GetExtraTagFilters returns additional label filters from request.
//
// Label filters can be present in extra_label and extra_filters[] query args.
// They are combined. For example, the following query args:
//
// extra_label=t1=v1&extra_label=t2=v2&extra_filters[]={env="prod",team="devops"}&extra_filters={env=~"dev|staging",team!="devops"}
//
// should be translated to the following filters joined with "or":
//
// {env="prod",team="devops",t1="v1",t2="v2"}
// {env=~"dev|staging",team!="devops",t1="v1",t2="v2"}
func GetExtraTagFilters(r *http.Request) ([][]storage.TagFilter, error) {
var tagFilters []storage.TagFilter
for _, match := range r.Form["extra_label"] {
tmp := strings.SplitN(match, "=", 2)
if len(tmp) != 2 {
return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", match)
}
if tmp[0] == "__name__" {
// This is required for storage.Search.
tmp[0] = ""
}
tagFilters = append(tagFilters, storage.TagFilter{
Key: []byte(tmp[0]),
Value: []byte(tmp[1]),
})
}
extraFilters := append([]string{}, r.Form["extra_filters"]...)
extraFilters = append(extraFilters, r.Form["extra_filters[]"]...)
if len(extraFilters) == 0 {
if len(tagFilters) == 0 {
return nil, nil
}
return [][]storage.TagFilter{tagFilters}, nil
}
var etfs [][]storage.TagFilter
for _, extraFilter := range extraFilters {
tfss, err := ParseMetricSelector(extraFilter)
if err != nil {
return nil, fmt.Errorf("cannot parse extra_filters=%s: %w", extraFilter, err)
}
for i := range tfss {
tfss[i] = append(tfss[i], tagFilters...)
}
etfs = append(etfs, tfss...)
}
return etfs, nil
}
// JoinTagFilterss adds etfs to every src filter and returns the result.
func JoinTagFilterss(src, etfs [][]storage.TagFilter) [][]storage.TagFilter {
if len(src) == 0 {
return etfs
}
if len(etfs) == 0 {
return src
}
var dst [][]storage.TagFilter
for _, tf := range src {
for _, etf := range etfs {
tfs := append([]storage.TagFilter{}, tf...)
tfs = append(tfs, etf...)
dst = append(dst, tfs)
}
}
return dst
}
// ParseMetricSelector parses s containing PromQL metric selector and returns the corresponding LabelFilters.
func ParseMetricSelector(s string) ([][]storage.TagFilter, error) {
expr, err := metricsql.Parse(s)
if err != nil {
return nil, err
}
me, ok := expr.(*metricsql.MetricExpr)
if !ok {
return nil, fmt.Errorf("expecting metricSelector; got %q", expr.AppendString(nil))
}
if len(me.LabelFilterss) == 0 {
return nil, fmt.Errorf("labelFilterss cannot be empty")
}
tfss := ToTagFilterss(me.LabelFilterss)
return tfss, nil
}
// ToTagFilterss converts lfss to or-delimited slices of storage.TagFilter
func ToTagFilterss(lfss [][]metricsql.LabelFilter) [][]storage.TagFilter {
tfss := make([][]storage.TagFilter, len(lfss))
for i, lfs := range lfss {
tfs := make([]storage.TagFilter, len(lfs))
for j := range lfs {
toTagFilter(&tfs[j], &lfs[j])
}
tfss[i] = tfs
}
return tfss
}
func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) {
if src.Label != "__name__" {
dst.Key = []byte(src.Label)
} else {
// This is required for storage.Search.
dst.Key = nil
}
dst.Value = []byte(src.Value)
dst.IsRegexp = src.IsRegexp
dst.IsNegative = src.IsNegative
}