From 157c02622bf5ed7e03d1564a0205823767b51b71 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 3 Feb 2021 00:24:05 +0200 Subject: [PATCH] app/vmselect: add ability to set Graphite-compatible filter via `{__graphite__="foo.*.bar"}` syntax --- README.md | 4 + app/vmselect/graphite/tags_api.go | 17 ++-- app/vmselect/netstorage/netstorage.go | 52 ++++++++---- app/vmselect/prometheus/prometheus.go | 6 +- app/vmstorage/main.go | 8 ++ docs/CHANGELOG.md | 2 + docs/MetricsQL.md | 1 + docs/Single-server-VictoriaMetrics.md | 4 + lib/storage/index_db.go | 37 +++++---- lib/storage/storage.go | 112 ++++++++++++++++++++++++++ lib/storage/storage_test.go | 22 +++++ lib/storage/tag_filters.go | 91 +++++++++++++++++---- lib/storage/tag_filters_test.go | 23 ++++++ 13 files changed, 322 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index d2e3d8798..5e2bea1b9 100644 --- a/README.md +++ b/README.md @@ -588,6 +588,10 @@ VictoriaMetrics supports the following Graphite APIs: * Metrics API - see [these docs](#graphite-metrics-api-usage). * Tags API - see [these docs](#graphite-tags-api-usage). +VictoriaMetrics supports `__graphite__` pseudo-label for filtering time series with Graphite-compatible filters in [MetricsQL](https://victoriametrics.github.io/MetricsQL.html). +For example, `{__graphite__="foo.*.bar"}` is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster +and it is easier to use when migrating from Graphite to VictoriaMetrics. 
+ ### Graphite Metrics API usage diff --git a/app/vmselect/graphite/tags_api.go b/app/vmselect/graphite/tags_api.go index e1728932f..5d80f1213 100644 --- a/app/vmselect/graphite/tags_api.go +++ b/app/vmselect/graphite/tags_api.go @@ -23,6 +23,7 @@ import ( // // See https://graphite.readthedocs.io/en/stable/tags.html#removing-series-from-the-tagdb func TagsDelSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { + deadline := searchutils.GetDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -30,7 +31,7 @@ func TagsDelSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Re totalDeleted := 0 var row graphiteparser.Row var tagsPool []graphiteparser.Tag - ct := time.Now().UnixNano() / 1e6 + ct := startTime.UnixNano() / 1e6 for _, path := range paths { var err error tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0]) @@ -49,7 +50,7 @@ func TagsDelSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Re }) } sq := storage.NewSearchQuery(0, ct, [][]storage.TagFilter{tfs}) - n, err := netstorage.DeleteSeries(sq) + n, err := netstorage.DeleteSeries(sq, deadline) if err != nil { return fmt.Errorf("cannot delete series for %q: %w", sq, err) } @@ -89,7 +90,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request var b []byte var tagsPool []graphiteparser.Tag mrs := make([]storage.MetricRow, len(paths)) - ct := time.Now().UnixNano() / 1e6 + ct := startTime.UnixNano() / 1e6 canonicalPaths := make([]string, len(paths)) for i, path := range paths { var err error @@ -186,7 +187,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, w http.ResponseWriter, r } } else { // Slow path: use netstorage.SearchMetricNames for applying `expr` filters. 
- sq, err := getSearchQueryForExprs(exprs) + sq, err := getSearchQueryForExprs(startTime, exprs) if err != nil { return err } @@ -268,7 +269,7 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, w http.ResponseWriter, r * } } else { // Slow path: use netstorage.SearchMetricNames for applying `expr` filters. - sq, err := getSearchQueryForExprs(exprs) + sq, err := getSearchQueryForExprs(startTime, exprs) if err != nil { return err } @@ -331,7 +332,7 @@ func TagsFindSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.R if len(exprs) == 0 { return fmt.Errorf("expecting at least one `expr` query arg") } - sq, err := getSearchQueryForExprs(exprs) + sq, err := getSearchQueryForExprs(startTime, exprs) if err != nil { return err } @@ -456,12 +457,12 @@ func getInt(r *http.Request, argName string) (int, error) { return n, nil } -func getSearchQueryForExprs(exprs []string) (*storage.SearchQuery, error) { +func getSearchQueryForExprs(startTime time.Time, exprs []string) (*storage.SearchQuery, error) { tfs, err := exprsToTagFilters(exprs) if err != nil { return nil, err } - ct := time.Now().UnixNano() / 1e6 + ct := startTime.UnixNano() / 1e6 sq := storage.NewSearchQuery(0, ct, [][]storage.TagFilter{tfs}) return sq, nil } diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 8e90a8fbd..46874de35 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -446,8 +446,12 @@ func (sbh *sortBlocksHeap) Pop() interface{} { } // DeleteSeries deletes time series matching the given tagFilterss. 
-func DeleteSeries(sq *storage.SearchQuery) (int, error) { - tfss, err := setupTfss(sq.TagFilterss) +func DeleteSeries(sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) { + tr := storage.TimeRange{ + MinTimestamp: sq.MinTimestamp, + MaxTimestamp: sq.MaxTimestamp, + } + tfss, err := setupTfss(tr, sq.TagFilterss, deadline) if err != nil { return 0, err } @@ -613,6 +617,11 @@ func GetTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix string, de return nil, fmt.Errorf("error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w", tagKey, tagValuePrefix, delimiter, tr.String(), err) } + if len(suffixes) >= *maxTagValueSuffixesPerSearch { + return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+ + "either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value", + *maxTagValueSuffixesPerSearch, tagKey, tagValuePrefix, delimiter, tr.String()) + } return suffixes, nil } @@ -695,10 +704,6 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func if deadline.Exceeded() { return fmt.Errorf("timeout exceeded before starting data export: %s", deadline.String()) } - tfss, err := setupTfss(sq.TagFilterss) - if err != nil { - return err - } tr := storage.TimeRange{ MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, @@ -706,6 +711,10 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func if err := vmstorage.CheckTimeRange(tr); err != nil { return err } + tfss, err := setupTfss(tr, sq.TagFilterss, deadline) + if err != nil { + return err + } vmstorage.WG.Add(1) defer vmstorage.WG.Done() @@ -801,10 +810,6 @@ func SearchMetricNames(sq *storage.SearchQuery, deadline searchutils.Deadline) ( } // Setup search. 
- tfss, err := setupTfss(sq.TagFilterss) - if err != nil { - return nil, err - } tr := storage.TimeRange{ MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, @@ -812,6 +817,10 @@ func SearchMetricNames(sq *storage.SearchQuery, deadline searchutils.Deadline) ( if err := vmstorage.CheckTimeRange(tr); err != nil { return nil, err } + tfss, err := setupTfss(tr, sq.TagFilterss, deadline) + if err != nil { + return nil, err + } mns, err := vmstorage.SearchMetricNames(tfss, tr, *maxMetricsPerSearch, deadline.Deadline()) if err != nil { @@ -829,10 +838,6 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search } // Setup search. - tfss, err := setupTfss(sq.TagFilterss) - if err != nil { - return nil, err - } tr := storage.TimeRange{ MinTimestamp: sq.MinTimestamp, MaxTimestamp: sq.MaxTimestamp, @@ -840,6 +845,10 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search if err := vmstorage.CheckTimeRange(tr); err != nil { return nil, err } + tfss, err := setupTfss(tr, sq.TagFilterss, deadline) + if err != nil { + return nil, err + } vmstorage.WG.Add(1) defer vmstorage.WG.Done() @@ -917,12 +926,25 @@ type blockRef struct { addr tmpBlockAddr } -func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) { +func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, deadline searchutils.Deadline) ([]*storage.TagFilters, error) { tfss := make([]*storage.TagFilters, 0, len(tagFilterss)) for _, tagFilters := range tagFilterss { tfs := storage.NewTagFilters() for i := range tagFilters { tf := &tagFilters[i] + if string(tf.Key) == "__graphite__" { + query := tf.Value + paths, err := vmstorage.SearchGraphitePaths(tr, query, *maxMetricsPerSearch, deadline.Deadline()) + if err != nil { + return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err) + } + if len(paths) >= *maxMetricsPerSearch { + return nil, fmt.Errorf("more than -search.maxUniqueTimeseries=%d 
time series match Graphite query %q; "+ + "either narrow down the query or increase -search.maxUniqueTimeseries command-line flag value", *maxMetricsPerSearch, query) + } + tfs.AddGraphiteQuery(query, paths, tf.IsNegative) + continue + } if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil { return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err) } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 248833aee..98080eca0 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -438,6 +438,7 @@ var exportBlockPool = &sync.Pool{ // // See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series func DeleteHandler(startTime time.Time, r *http.Request) error { + deadline := searchutils.GetDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } @@ -448,8 +449,9 @@ func DeleteHandler(startTime time.Time, r *http.Request) error { if err != nil { return err } - sq := storage.NewSearchQuery(0, 0, tagFilterss) - deletedCount, err := netstorage.DeleteSeries(sq) + ct := startTime.UnixNano() / 1e6 + sq := storage.NewSearchQuery(0, ct, tagFilterss) + deletedCount, err := netstorage.DeleteSeries(sq, deadline) if err != nil { return fmt.Errorf("cannot delete time series: %w", err) } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 2ced7346b..cca9d093a 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -186,6 +186,14 @@ func SearchTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix []byte, return suffixes, err } +// SearchGraphitePaths returns all the metric names matching the given Graphite query. 
+func SearchGraphitePaths(tr storage.TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) { + WG.Add(1) + paths, err := Storage.SearchGraphitePaths(tr, query, maxPaths, deadline) + WG.Done() + return paths, err +} + // SearchTagEntries searches for tag entries. func SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]storage.TagEntry, error) { WG.Add(1) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3d10ff8c8..8741e5514 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -5,6 +5,7 @@ * FEATURE: added [vmctl tool](https://victoriametrics.github.io/vmctl.html) to VictoriaMetrics release process. Now it is packaged in `vmutils-*.tar.gz` archive on [the releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). Source code for `vmctl` tool has been moved from [github.com/VictoriaMetrics/vmctl](https://github.com/VictoriaMetrics/vmctl) to [github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmctl). * FEATURE: added `-loggerTimezone` command-line flag for adjusting time zone for timestamps in log messages. By default UTC is used. * FEATURE: added `-search.maxStepForPointsAdjustment` command-line flag, which can be used for disabling adjustment for points returned by `/api/v1/query_range` handler if such points have timestamps closer than `-search.latencyOffset` to the current time. Such points may contain incomplete data, so they are substituted by the previous values for `step` query args smaller than one minute by default. +* FEATURE: vmselect: added ability to use Graphite-compatible filters in MetricsQL via `{__graphite__="foo.*.bar"}` syntax. This expression is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster and it is easier to use when migrating from Graphite to VictoriaMetrics. * FEATURE: vmselect: added ability to set additional label filters, which must be applied during queries. 
Such label filters can be set via optional `extra_label` query arg, which is accepted by [querying API](https://victoriametrics.github.io/#prometheus-querying-api-usage) handlers. For example, the request to `/api/v1/query_range?extra_label=tenant_id=123&query=` adds `{tenant_id="123"}` label filter to the given ``. It is expected that the `extra_label` query arg is automatically set by auth proxy sitting in front of VictoriaMetrics. [Contact us](mailto:sales@victoriametrics.com) if you need assistance with such a proxy. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1021 . * FEATURE: vmalert: added `-datasource.queryStep` command-line flag for passing optional `step` query arg to `/api/v1/query` endpoint. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1025 @@ -18,6 +19,7 @@ in front of VictoriaMetrics. [Contact us](mailto:sales@victoriametrics.com) if y - `vm_promscrape_discovery_retries_total` - `vm_promscrape_scrape_retries_total` - `vm_promscrape_service_discovery_duration_seconds` + * BUGFIX: vmagent: reduce HTTP reconnection rate for scrape targets. Previously vmagent could errorneusly close HTTP keep-alive connections more frequently than needed. * BUGFIX: vmagent: retry scrape and service discovery requests when the remote server closes HTTP keep-alive connection. Previously `disable_keepalive: true` option could be used under `scrape_configs` section when working with such servers. diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 474121c5d..65d75612c 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -29,6 +29,7 @@ Feel free [filing a feature request](https://github.com/VictoriaMetrics/Victoria This functionality can be tried at [an editable Grafana dashboard](http://play-grafana.victoriametrics.com:3000/d/4ome8yJmz/node-exporter-on-victoriametrics-demo). - [`WITH` templates](https://play.victoriametrics.com/promql/expand-with-exprs). This feature simplifies writing and managing complex queries. 
Go to [`WITH` templates playground](https://play.victoriametrics.com/promql/expand-with-exprs) and try it. +- Graphite-compatible filters can be passed via `{__graphite__="foo.*.bar"}` syntax. This is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but usually works faster and is easier to use when migrating from Graphite to VictoriaMetrics. - Range duration in functions such as [rate](https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()) may be omitted. VictoriaMetrics automatically selects range duration depending on the current step used for building the graph. For instance, the following query is valid in VictoriaMetrics: `rate(node_network_receive_bytes_total)`. - All the aggregate functions support optional `limit N` suffix in order to limit the number of output series. For example, `sum(x) by (y) limit 10` limits the number of output time series after the aggregation to 10. All the other time series are dropped. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index d2e3d8798..5e2bea1b9 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -588,6 +588,10 @@ VictoriaMetrics supports the following Graphite APIs: * Metrics API - see [these docs](#graphite-metrics-api-usage). * Tags API - see [these docs](#graphite-tags-api-usage). +VictoriaMetrics supports `__graphite__` pseudo-label for filtering time series with Graphite-compatible filters in [MetricsQL](https://victoriametrics.github.io/MetricsQL.html). +For example, `{__graphite__="foo.*.bar"}` is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster +and it is easier to use when migrating from Graphite to VictoriaMetrics. 
+ ### Graphite Metrics API usage diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index f9d0d3649..319441e66 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1101,6 +1101,8 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m // SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr. // // This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs. +// +// If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found. func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { // TODO: cache results? @@ -1111,13 +1113,15 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [ if err != nil { return nil, err } - ok := db.doExtDB(func(extDB *indexDB) { - is := extDB.getIndexSearch(deadline) - err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes) - extDB.putIndexSearch(is) - }) - if ok && err != nil { - return nil, err + if len(tvss) < maxTagValueSuffixes { + ok := db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(deadline) + err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes) + extDB.putIndexSearch(is) + }) + if ok && err != nil { + return nil, err + } } suffixes := make([]string, 0, len(tvss)) @@ -1125,6 +1129,9 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [ // Do not skip empty suffixes, since they may represent leaf tag values. suffixes = append(suffixes, suffix) } + if len(suffixes) > maxTagValueSuffixes { + suffixes = suffixes[:maxTagValueSuffixes] + } // Do not sort suffixes, since they must be sorted by vmselect. 
return suffixes, nil } @@ -1156,6 +1163,9 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct errGlobal = err return } + if len(tvss) > maxTagValueSuffixes { + return + } for k := range tvssLocal { tvss[k] = struct{}{} } @@ -1174,7 +1184,7 @@ func (is *indexSearch) searchTagValueSuffixesAll(tvss map[string]struct{}, tagKe kb.B = marshalTagValue(kb.B, tagValuePrefix) kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B prefix := append([]byte(nil), kb.B...) - return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes) + return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes) } func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, date uint64, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error { @@ -1186,10 +1196,10 @@ func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, d kb.B = marshalTagValue(kb.B, tagValuePrefix) kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B prefix := append([]byte(nil), kb.B...) 
- return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes) + return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes) } -func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error { +func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix []byte, tagValuePrefixLen int, delimiter byte, maxTagValueSuffixes int) error { kb := &is.kb ts := &is.ts mp := &is.mp @@ -1215,10 +1225,7 @@ func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, continue } tagValue := mp.Tag.Value - if !bytes.HasPrefix(tagValue, tagValuePrefix) { - continue - } - suffix := tagValue[len(tagValuePrefix):] + suffix := tagValue[tagValuePrefixLen:] n := bytes.IndexByte(suffix, delimiter) if n < 0 { // Found leaf tag value that doesn't have delimiters after the given tagValuePrefix. @@ -2118,7 +2125,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) for i, tf := range tfs { - if len(tf.key) == 0 { + if len(tf.key) == 0 || string(tf.key) == "__graphite__" { // Match against mn.MetricGroup. b := marshalTagValue(kb.B, nil) b = marshalTagValue(b, mn.MetricGroup) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 3cef89e86..fea119bfd 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -9,6 +9,7 @@ import ( "path/filepath" "regexp" "sort" + "strings" "sync" "sync/atomic" "time" @@ -979,10 +980,121 @@ func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint // SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr. 
// // This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs. +// +// If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned. func (s *Storage) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) { return s.idb().SearchTagValueSuffixes(tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline) } +// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr. +// +// If more than maxPaths paths is found, then only the first maxPaths paths is returned. +func (s *Storage) SearchGraphitePaths(tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) { + queryStr := string(query) + n := strings.IndexAny(queryStr, "*[{") + if n < 0 { + // Verify that the query matches a metric name. + suffixes, err := s.SearchTagValueSuffixes(tr, nil, query, '.', 1, deadline) + if err != nil { + return nil, err + } + if len(suffixes) == 0 { + // The query doesn't match anything. + return nil, nil + } + if len(suffixes[0]) > 0 { + // The query matches a metric name with additional suffix. 
+ return nil, nil + } + return []string{queryStr}, nil + } + suffixes, err := s.SearchTagValueSuffixes(tr, nil, query[:n], '.', maxPaths, deadline) + if err != nil { + return nil, err + } + if len(suffixes) == 0 { + return nil, nil + } + if len(suffixes) >= maxPaths { + return nil, fmt.Errorf("more than maxPaths=%d suffixes found", maxPaths) + } + qPrefixStr := queryStr[:n] + qNode := queryStr[n:] + qTail := "" + mustMatchLeafs := true + if m := strings.IndexByte(qNode, '.'); m >= 0 { + qTail = qNode[m+1:] + qNode = qNode[:m+1] + mustMatchLeafs = false + } + re, err := getRegexpForGraphiteNodeQuery(qNode) + if err != nil { + return nil, err + } + var paths []string + for _, suffix := range suffixes { + if len(paths) > maxPaths { + paths = paths[:maxPaths] + break + } + if !re.MatchString(suffix) { + continue + } + if mustMatchLeafs { + paths = append(paths, qPrefixStr+suffix) + continue + } + q := qPrefixStr + suffix + qTail + ps, err := s.SearchGraphitePaths(tr, []byte(q), maxPaths, deadline) + if err != nil { + return nil, err + } + paths = append(paths, ps...) 
+ } + return paths, nil +} + +func getRegexpForGraphiteNodeQuery(q string) (*regexp.Regexp, error) { + parts := getRegexpPartsForGraphiteNodeQuery(q) + reStr := "^" + strings.Join(parts, "") + "$" + return regexp.Compile(reStr) +} + +func getRegexpPartsForGraphiteNodeQuery(q string) []string { + var parts []string + for { + n := strings.IndexAny(q, "*{[") + if n < 0 { + return append(parts, regexp.QuoteMeta(q)) + } + parts = append(parts, regexp.QuoteMeta(q[:n])) + q = q[n:] + switch q[0] { + case '*': + parts = append(parts, "[^.]*") + q = q[1:] + case '{': + n := strings.IndexByte(q, '}') + if n < 0 { + return append(parts, regexp.QuoteMeta(q)) + } + var tmp []string + for _, x := range strings.Split(q[1:n], ",") { + tmp = append(tmp, strings.Join(getRegexpPartsForGraphiteNodeQuery(x), "")) + } + parts = append(parts, "(?:"+strings.Join(tmp, "|")+")") + q = q[n+1:] + case '[': + n := strings.IndexByte(q, ']') + if n < 0 { + return append(parts, regexp.QuoteMeta(q)) + } + parts = append(parts, q[:n+1]) + q = q[n+1:] + } + } +} + // SearchTagEntries returns a list of (tagName -> tagValues) func (s *Storage) SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) { idb := s.idb() diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index ecacd69ef..faa8cd8e5 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -14,6 +14,28 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) +func TestGetRegexpForGraphiteNodeQuery(t *testing.T) { + f := func(q, expectedRegexp string) { + t.Helper() + re, err := getRegexpForGraphiteNodeQuery(q) + if err != nil { + t.Fatalf("unexpected error for query=%q: %s", q, err) + } + reStr := re.String() + if reStr != expectedRegexp { + t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp) + } + } + f(``, `^$`) + f(`*`, `^[^.]*$`) + f(`foo.`, `^foo\.$`) + f(`foo.bar`, `^foo\.bar$`) + f(`{foo,b*ar,b[a-z]}`, 
`^(?:foo|b[^.]*ar|b[a-z])$`) + f(`[-a-zx.]`, `^[-a-zx.]$`) + f(`**`, `^[^.]*[^.]*$`) + f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`) +} + func TestDateMetricIDCacheSerial(t *testing.T) { c := newDateMetricIDCache() if err := testDateMetricIDCache(c, false); err != nil { diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index 5467b1f36..6d35beaf7 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -30,6 +30,12 @@ func NewTagFilters() *TagFilters { } } +// AddGraphiteQuery adds the given Graphite query that matches the given paths to tfs. +func (tfs *TagFilters) AddGraphiteQuery(query []byte, paths []string, isNegative bool) { + tf := tfs.addTagFilter() + tf.InitFromGraphiteQuery(tfs.commonPrefix, query, paths, isNegative) +} + // Add adds the given tag filter to tfs. // // MetricGroup must be encoded with nil key. @@ -52,7 +58,7 @@ func (tfs *TagFilters) Add(key, value []byte, isNegative, isRegexp bool) error { } // Substitute negative tag filter matching anything with negative tag filter matching non-empty value - // in order to out all the time series with the given key. + // in order to filter out all the time series with the given key. value = []byte(".+") } @@ -162,6 +168,8 @@ type tagFilter struct { prefix []byte // or values obtained from regexp suffix if it equals to "foo|bar|..." + // + // This array is also populated with matching Graphite metrics if key="__graphite__" orSuffixes []string // Matches regexp suffix. @@ -228,6 +236,49 @@ func (tf *tagFilter) Marshal(dst []byte) []byte { return dst } +// InitFromGraphiteQuery initializes tf from the given graphite query expanded to the given paths. +func (tf *tagFilter) InitFromGraphiteQuery(commonPrefix, query []byte, paths []string, isNegative bool) { + if len(paths) == 0 { + // explicitly add empty path in order match zero metric names. 
+ paths = []string{""} + } + prefix, orSuffixes := getCommonPrefix(paths) + if len(orSuffixes) == 0 { + orSuffixes = append(orSuffixes, "") + } + tf.key = append(tf.key[:0], "__graphite__"...) + tf.value = append(tf.value[:0], query...) + tf.isNegative = isNegative + tf.isRegexp = true // this is needed for tagFilter.matchSuffix + tf.prefix = append(tf.prefix[:0], commonPrefix...) + tf.prefix = marshalTagValue(tf.prefix, nil) + tf.prefix = marshalTagValueNoTrailingTagSeparator(tf.prefix, []byte(prefix)) + tf.orSuffixes = append(tf.orSuffixes[:0], orSuffixes...) + tf.reSuffixMatch, tf.matchCost = newMatchFuncForOrSuffixes(orSuffixes) +} + +func getCommonPrefix(ss []string) (string, []string) { + if len(ss) == 0 { + return "", nil + } + prefix := ss[0] + for _, s := range ss[1:] { + i := 0 + for i < len(s) && i < len(prefix) && s[i] == prefix[i] { + i++ + } + prefix = prefix[:i] + if len(prefix) == 0 { + return "", ss + } + } + result := make([]string, len(ss)) + for i, s := range ss { + result[i] = s[len(prefix):] + } + return prefix, result +} + // Init initializes the tag filter for the given commonPrefix, key and value. // // If isNegaitve is true, then the tag filter matches all the values @@ -242,6 +293,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp tf.value = append(tf.value[:0], value...) 
tf.isNegative = isNegative tf.isRegexp = isRegexp + tf.matchCost = 0 tf.prefix = tf.prefix[:0] @@ -345,22 +397,7 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) { var reCost uint64 var literalSuffix string if len(orValues) > 0 { - if len(orValues) == 1 { - v := orValues[0] - reMatch = func(b []byte) bool { - return string(b) == v - } - } else { - reMatch = func(b []byte) bool { - for _, v := range orValues { - if string(b) == v { - return true - } - } - return false - } - } - reCost = uint64(len(orValues)) * literalMatchCost + reMatch, reCost = newMatchFuncForOrSuffixes(orValues) } else { reMatch, literalSuffix, reCost = getOptimizedReMatchFunc(re.Match, sExpr) } @@ -388,6 +425,26 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) { return rcv, nil } +func newMatchFuncForOrSuffixes(orValues []string) (reMatch func(b []byte) bool, reCost uint64) { + if len(orValues) == 1 { + v := orValues[0] + reMatch = func(b []byte) bool { + return string(b) == v + } + } else { + reMatch = func(b []byte) bool { + for _, v := range orValues { + if string(b) == v { + return true + } + } + return false + } + } + reCost = uint64(len(orValues)) * literalMatchCost + return reMatch, reCost +} + // getOptimizedReMatchFunc tries returning optimized function for matching the given expr. 
// '.*' // '.+' diff --git a/lib/storage/tag_filters_test.go b/lib/storage/tag_filters_test.go index db40515c7..be312ff4a 100644 --- a/lib/storage/tag_filters_test.go +++ b/lib/storage/tag_filters_test.go @@ -2,9 +2,32 @@ package storage import ( "reflect" + "strings" "testing" ) +func TestGetCommonPrefix(t *testing.T) { + f := func(a []string, expectedPrefix string) { + t.Helper() + prefix, result := getCommonPrefix(a) + if prefix != expectedPrefix { + t.Fatalf("unexpected prefix; got %q; want %q", prefix, expectedPrefix) + } + for i, s := range a { + if !strings.HasPrefix(s, prefix) { + t.Fatalf("s=%q has no prefix %q", s, prefix) + } + if s[len(prefix):] != result[i] { + t.Fatalf("unexpected result[%d]; got %q; want %q", i, s[len(prefix):], result[i]) + } + } + } + f(nil, "") + f([]string{"foo"}, "foo") + f([]string{"foo", "bar"}, "") + f([]string{"foo1", "foo2", "foo34"}, "foo") +} + func TestExtractRegexpPrefix(t *testing.T) { f := func(s string, expectedPrefix, expectedSuffix string) { t.Helper()