From 465923b1815dc590cff16990168cf00a08eb20d8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 16 Nov 2020 10:55:55 +0200 Subject: [PATCH] app/vmselect/graphite: add /tags/findSeries handler from Graphite Tags API See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags --- README.md | 1 + app/vmselect/graphite/tags_api.go | 109 ++++++++++++++++++ .../graphite/tags_find_series_response.qtpl | 12 ++ .../tags_find_series_response.qtpl.go | 65 +++++++++++ app/vmselect/main.go | 13 ++- app/vmselect/netstorage/netstorage.go | 26 +++++ app/vmselect/prometheus/prometheus.go | 28 +++++ app/vmstorage/main.go | 8 ++ docs/Single-server-VictoriaMetrics.md | 1 + lib/storage/storage.go | 35 ++++++ 10 files changed, 297 insertions(+), 1 deletion(-) create mode 100644 app/vmselect/graphite/tags_find_series_response.qtpl create mode 100644 app/vmselect/graphite/tags_find_series_response.qtpl.go diff --git a/README.md b/README.md index 859d44967..0c722fc4f 100644 --- a/README.md +++ b/README.md @@ -550,6 +550,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/ * [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb) * [/tags](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) * [/tags/tag_name](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) +* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) ### How to build from sources diff --git a/app/vmselect/graphite/tags_api.go b/app/vmselect/graphite/tags_api.go index d7efd35fd..8c76b6031 100644 --- a/app/vmselect/graphite/tags_api.go +++ b/app/vmselect/graphite/tags_api.go @@ -3,15 +3,96 @@ package graphite import ( "fmt" "net/http" + "sort" "strconv" + "strings" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" 
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
 )
 
+// TagsFindSeriesHandler implements /tags/findSeries endpoint from Graphite Tags API.
+//
+// See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
+func TagsFindSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
+	deadline := searchutils.GetDeadlineForQuery(r, startTime)
+	if err := r.ParseForm(); err != nil {
+		return fmt.Errorf("cannot parse form values: %w", err)
+	}
+	limit, err := getInt(r, "limit")
+	if err != nil {
+		return err
+	}
+	exprs := r.Form["expr"]
+	if len(exprs) == 0 {
+		return fmt.Errorf("expecting at least one `expr` query arg")
+	}
+
+	// Convert exprs to []storage.TagFilter
+	tfs := make([]storage.TagFilter, 0, len(exprs))
+	for _, expr := range exprs {
+		tf, err := parseFilterExpr(expr)
+		if err != nil {
+			return fmt.Errorf("cannot parse `expr` query arg: %w", err)
+		}
+		tfs = append(tfs, *tf)
+	}
+
+	// Send the request to storage
+	ct := time.Now().UnixNano() / 1e6
+	sq := &storage.SearchQuery{
+		MinTimestamp: 0,
+		MaxTimestamp: ct,
+		TagFilterss:  [][]storage.TagFilter{tfs},
+	}
+	mns, err := netstorage.SearchMetricNames(sq, deadline)
+	if err != nil {
+		return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
+	}
+	paths := getCanonicalPaths(mns)
+	if limit > 0 && limit < len(paths) {
+		paths = paths[:limit]
+	}
+
+	w.Header().Set("Content-Type", "application/json; charset=utf-8")
+	bw := bufferedwriter.Get(w)
+	defer bufferedwriter.Put(bw)
+	WriteTagsFindSeriesResponse(bw, paths)
+	if err := bw.Flush(); err != nil {
+		return err
+	}
+	tagsFindSeriesDuration.UpdateDuration(startTime)
+	return nil
+}
+
+func getCanonicalPaths(mns []storage.MetricName) []string {
+	paths := make([]string, 0, len(mns))
+	var b []byte
+	var tags []storage.Tag
+	for _, mn := range mns {
+		b = append(b[:0], mn.MetricGroup...)
+		tags = append(tags[:0], mn.Tags...)
+		sort.Slice(tags, func(i, j int) bool {
+			return string(tags[i].Key) < string(tags[j].Key)
+		})
+		for _, tag := range tags {
+			b = append(b, ';')
+			b = append(b, tag.Key...)
+			b = append(b, '=')
+			b = append(b, tag.Value...)
+		}
+		paths = append(paths, string(b))
+	}
+	sort.Strings(paths)
+	return paths
+}
+
+var tagsFindSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/findSeries"}`)
+
 // TagValuesHandler implements /tags/ endpoint from Graphite Tags API.
 //
 // See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
@@ -85,3 +166,40 @@ func getInt(r *http.Request, argName string) (int, error) {
 	}
 	return n, nil
 }
+
+// parseFilterExpr converts a Graphite tag filter expression s into storage.TagFilter.
+//
+// s is split at the first '=': the left part is the tag name, the right part is the tag value.
+// A '!' suffix on the tag name makes the filter negative; a '~' prefix on the tag value
+// makes it a regexp filter. Both markers are stripped from the stored key and value.
+//
+// An error is returned if s contains no '='.
+func parseFilterExpr(s string) (*storage.TagFilter, error) {
+	n := strings.Index(s, "=")
+	if n < 0 {
+		return nil, fmt.Errorf("missing tag value in filter expression %q", s)
+	}
+	tagName := s[:n]
+	tagValue := s[n+1:]
+	isNegative := false
+	if strings.HasSuffix(tagName, "!") {
+		isNegative = true
+		tagName = tagName[:len(tagName)-1]
+	}
+	// The `name` tag is passed to storage as an empty key.
+	if tagName == "name" {
+		tagName = ""
+	}
+	isRegexp := false
+	if strings.HasPrefix(tagValue, "~") {
+		isRegexp = true
+		// Wrap the regexp so it is matched from the start of the value with an arbitrary suffix allowed.
+		tagValue = "^(?:" + tagValue[1:] + ").*"
+	}
+	return &storage.TagFilter{
+		Key:        []byte(tagName),
+		Value:      []byte(tagValue),
+		IsNegative: isNegative,
+		IsRegexp:   isRegexp,
+	}, nil
+}
diff --git a/app/vmselect/graphite/tags_find_series_response.qtpl b/app/vmselect/graphite/tags_find_series_response.qtpl
new file mode 100644
index 000000000..a22df4359
--- /dev/null
+++ b/app/vmselect/graphite/tags_find_series_response.qtpl
@@ -0,0 +1,12 @@
+{% stripspace %}
+
+{% func TagsFindSeriesResponse(paths []string) %}
+[
+	{% for i, path := range paths %}
+		{%q= path %}
+		{% if i+1 < len(paths) %},{% endif %}
+	{% endfor %}
+]
+{% endfunc %}
+
+{% endstripspace %}
diff --git a/app/vmselect/graphite/tags_find_series_response.qtpl.go b/app/vmselect/graphite/tags_find_series_response.qtpl.go
new file mode 100644
index
000000000..670f1f530 --- /dev/null +++ b/app/vmselect/graphite/tags_find_series_response.qtpl.go @@ -0,0 +1,65 @@ +// Code generated by qtc from "tags_find_series_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +package graphite + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 +func StreamTagsFindSeriesResponse(qw422016 *qt422016.Writer, paths []string) { +//line app/vmselect/graphite/tags_find_series_response.qtpl:3 + qw422016.N().S(`[`) +//line app/vmselect/graphite/tags_find_series_response.qtpl:5 + for i, path := range paths { +//line app/vmselect/graphite/tags_find_series_response.qtpl:6 + qw422016.N().Q(path) +//line app/vmselect/graphite/tags_find_series_response.qtpl:7 + if i+1 < len(paths) { +//line app/vmselect/graphite/tags_find_series_response.qtpl:7 + qw422016.N().S(`,`) +//line app/vmselect/graphite/tags_find_series_response.qtpl:7 + } +//line app/vmselect/graphite/tags_find_series_response.qtpl:8 + } +//line app/vmselect/graphite/tags_find_series_response.qtpl:8 + qw422016.N().S(`]`) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +} + +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +func WriteTagsFindSeriesResponse(qq422016 qtio422016.Writer, paths []string) { +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + StreamTagsFindSeriesResponse(qw422016, paths) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qt422016.ReleaseWriter(qw422016) +//line 
app/vmselect/graphite/tags_find_series_response.qtpl:10 +} + +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +func TagsFindSeriesResponse(paths []string) string { +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + WriteTagsFindSeriesResponse(qb422016, paths) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qs422016 := string(qb422016.B) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 + return qs422016 +//line app/vmselect/graphite/tags_find_series_response.qtpl:10 +} diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 270c3aaca..1f3f92d40 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -132,7 +132,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true } } - if strings.HasPrefix(path, "/tags/") { + if strings.HasPrefix(path, "/tags/") && path != "/tags/findSeries" { tagName := r.URL.Path[len("/tags/"):] graphiteTagValuesRequests.Inc() if err := graphite.TagValuesHandler(startTime, tagName, w, r); err != nil { @@ -277,6 +277,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true } return true + case "/tags/findSeries": + graphiteTagsFindSeriesRequests.Inc() + if err := graphite.TagsFindSeriesHandler(startTime, w, r); err != nil { + graphiteTagsFindSeriesErrors.Inc() + httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err) + return true + } + return true case "/api/v1/rules": // Return dumb placeholder rulesRequests.Inc() @@ -384,6 +392,9 @@ var ( graphiteTagValuesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/"}`) graphiteTagValuesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/"}`) + graphiteTagsFindSeriesRequests = 
metrics.NewCounter(`vm_http_requests_total{path="/tags/findSeries"}`)
+	graphiteTagsFindSeriesErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/findSeries"}`)
+
 	rulesRequests    = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/rules"}`)
 	alertsRequests   = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/alerts"}`)
 	metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/metadata"}`)
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index eb7ed9634..a672b6d14 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -777,6 +777,35 @@ var exportWorkPool = &sync.Pool{
 	},
 }
 
+// SearchMetricNames returns all the metric names matching sq until the given deadline.
+//
+// The search is not started if the deadline is already exceeded.
+// *maxMetricsPerSearch is passed to vmstorage.SearchMetricNames as the maxMetrics argument.
+func SearchMetricNames(sq *storage.SearchQuery, deadline searchutils.Deadline) ([]storage.MetricName, error) {
+	if deadline.Exceeded() {
+		return nil, fmt.Errorf("timeout exceeded before starting to search metric names: %s", deadline.String())
+	}
+
+	// Setup search.
+	tfss, err := setupTfss(sq.TagFilterss)
+	if err != nil {
+		return nil, err
+	}
+	tr := storage.TimeRange{
+		MinTimestamp: sq.MinTimestamp,
+		MaxTimestamp: sq.MaxTimestamp,
+	}
+	if err := vmstorage.CheckTimeRange(tr); err != nil {
+		return nil, err
+	}
+
+	mns, err := vmstorage.SearchMetricNames(tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
+	if err != nil {
+		return nil, fmt.Errorf("cannot find metric names: %w", err)
+	}
+	return mns, nil
+}
+
 // ProcessSearchQuery performs sq until the given deadline.
 //
 // Results.RunParallel or Results.Cancel must be called on the returned Results.
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index 05f316f28..889ab7e29 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -878,6 +878,37 @@ func SeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
 		MaxTimestamp: end,
 		TagFilterss:  tagFilterss,
 	}
+	if end-start > 24*3600*1000 {
+		// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
+		mns, err := netstorage.SearchMetricNames(sq, deadline)
+		if err != nil {
+			return fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
+		}
+		w.Header().Set("Content-Type", "application/json; charset=utf-8")
+		bw := bufferedwriter.Get(w)
+		defer bufferedwriter.Put(bw)
+		resultsCh := make(chan *quicktemplate.ByteBuffer)
+		doneCh := make(chan struct{})
+		go func() {
+			for i := range mns {
+				bb := quicktemplate.AcquireByteBuffer()
+				writemetricNameObject(bb, &mns[i])
+				resultsCh <- bb
+			}
+			// Close resultsCh to signal the end of the stream: WriteSeriesResponse receives
+			// no item count, so (presumably ranging over the channel) it would block forever otherwise.
+			close(resultsCh)
+			close(doneCh)
+		}()
+		// WriteSeriesResponse must consume all the data from resultsCh.
+		WriteSeriesResponse(bw, resultsCh)
+		if err := bw.Flush(); err != nil {
+			return err
+		}
+		<-doneCh
+		seriesDuration.UpdateDuration(startTime)
+		return nil
+	}
 	rss, err := netstorage.ProcessSearchQuery(sq, false, deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index af461eb93..cee1db78c 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -131,6 +131,14 @@ func DeleteMetrics(tfss []*storage.TagFilters) (int, error) {
 	return n, err
 }
 
+// SearchMetricNames returns metric names for the given tfss on the given tr.
+func SearchMetricNames(tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]storage.MetricName, error) {
+	WG.Add(1)
+	mns, err := Storage.SearchMetricNames(tfss, tr, maxMetrics, deadline)
+	WG.Done()
+	return mns, err
+}
+
 // SearchTagKeysOnTimeRange searches for tag keys on tr.
func SearchTagKeysOnTimeRange(tr storage.TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
 	WG.Add(1)
diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md
index 859d44967..0c722fc4f 100644
--- a/docs/Single-server-VictoriaMetrics.md
+++ b/docs/Single-server-VictoriaMetrics.md
@@ -550,6 +550,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/
 * [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
 * [/tags](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
 * [/tags/tag_name](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
+* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
 
 ### How to build from sources
 
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 4b6c85ea0..e474a0d20 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -796,6 +796,48 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
 	return deadline.Sub(t)
 }
 
+// SearchMetricNames returns metric names matching the given tfss on the given tr.
+//
+// TSIDs are found via searchTSIDs; the metric name for every found MetricID
+// is then read with searchMetricName and unmarshaled into the result.
+// MetricIDs, for which searchMetricName returns io.EOF, are skipped,
+// so the result may contain fewer items than the number of found TSIDs.
+func (s *Storage) SearchMetricNames(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]MetricName, error) {
+	tsids, err := s.searchTSIDs(tfss, tr, maxMetrics, deadline)
+	if err != nil {
+		return nil, err
+	}
+	if err = s.prefetchMetricNames(tsids, deadline); err != nil {
+		return nil, err
+	}
+	idb := s.idb()
+	is := idb.getIndexSearch(deadline)
+	defer idb.putIndexSearch(is)
+	mns := make([]MetricName, 0, len(tsids))
+	var metricName []byte
+	for i := range tsids {
+		metricID := tsids[i].MetricID
+		var err error
+		metricName, err = is.searchMetricName(metricName[:0], metricID)
+		if err != nil {
+			if err == io.EOF {
+				// Skip missing metricName for metricID.
+				// It should be automatically fixed. See indexDB.searchMetricName for details.
+				continue
+			}
+			return nil, fmt.Errorf("error when searching metricName for metricID=%d: %w", metricID, err)
+		}
+		// mns is allocated with capacity len(tsids) and grows by at most one item per TSID,
+		// so extending it by reslicing stays within the capacity.
+		mns = mns[:len(mns)+1]
+		mn := &mns[len(mns)-1]
+		if err = mn.Unmarshal(metricName); err != nil {
+			return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
+		}
+	}
+	return mns, nil
+}
+
 // searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
 func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
 	// Do not cache tfss -> tsids here, since the caching is performed