app/vmselect/graphite: add /tags/findSeries handler from Graphite Tags API

See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
This commit is contained in:
Aliaksandr Valialkin 2020-11-16 10:55:55 +02:00
parent a1f3795b78
commit 465923b181
10 changed files with 297 additions and 1 deletions

View File

@ -550,6 +550,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/
* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
* [/tags](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/tag_name](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
### How to build from sources

View File

@ -3,15 +3,96 @@ package graphite
import (
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// TagsFindSeriesHandler serves the /tags/findSeries endpoint from Graphite Tags API.
//
// Every `expr` query arg is parsed into a tag filter; all the filters are put
// into a single filter group, which is searched on the time range from 0 to
// the current time. The matching series are written to w as a JSON array of
// sorted canonical paths. A positive `limit` query arg caps the number of
// returned paths.
//
// An error is returned if the form cannot be parsed, if no `expr` arg is given,
// if an `expr` cannot be parsed, if the search fails or if the response
// cannot be flushed.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
func TagsFindSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
	deadline := searchutils.GetDeadlineForQuery(r, startTime)
	if err := r.ParseForm(); err != nil {
		return fmt.Errorf("cannot parse form values: %w", err)
	}
	limit, err := getInt(r, "limit")
	if err != nil {
		return err
	}
	exprs := r.Form["expr"]
	if len(exprs) == 0 {
		return fmt.Errorf("expecting at least one `expr` query arg")
	}

	// Translate every expr into a storage.TagFilter.
	filters := make([]storage.TagFilter, len(exprs))
	for i := range exprs {
		filter, err := parseFilterExpr(exprs[i])
		if err != nil {
			return fmt.Errorf("cannot parse `expr` query arg: %w", err)
		}
		filters[i] = *filter
	}

	// Query the storage. MinTimestamp is left at its zero value,
	// so the search spans everything up to the current time in milliseconds.
	nowMsecs := time.Now().UnixNano() / 1e6
	sq := &storage.SearchQuery{
		MaxTimestamp: nowMsecs,
		TagFilterss:  [][]storage.TagFilter{filters},
	}
	metricNames, err := netstorage.SearchMetricNames(sq, deadline)
	if err != nil {
		return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
	}

	// Non-positive limit means "no limit".
	paths := getCanonicalPaths(metricNames)
	if limit > 0 && len(paths) > limit {
		paths = paths[:limit]
	}

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	bw := bufferedwriter.Get(w)
	defer bufferedwriter.Put(bw)
	WriteTagsFindSeriesResponse(bw, paths)
	if err := bw.Flush(); err != nil {
		return err
	}
	// The duration is recorded only for successfully served requests.
	tagsFindSeriesDuration.UpdateDuration(startTime)
	return nil
}
// getCanonicalPaths converts mns into a sorted list of paths.
//
// Each path has the form `metricGroup;key1=value1;...;keyN=valueN`,
// where tags are ordered by key. The tags in mns are left untouched,
// since sorting is performed on a scratch copy.
func getCanonicalPaths(mns []storage.MetricName) []string {
	result := make([]string, len(mns))
	// buf and sortedTags are scratch buffers reused across iterations.
	var buf []byte
	var sortedTags []storage.Tag
	for i := range mns {
		mn := &mns[i]
		sortedTags = append(sortedTags[:0], mn.Tags...)
		sort.Slice(sortedTags, func(a, b int) bool {
			return string(sortedTags[a].Key) < string(sortedTags[b].Key)
		})
		buf = append(buf[:0], mn.MetricGroup...)
		for j := range sortedTags {
			tag := &sortedTags[j]
			buf = append(buf, ';')
			buf = append(buf, tag.Key...)
			buf = append(buf, '=')
			buf = append(buf, tag.Value...)
		}
		result[i] = string(buf)
	}
	sort.Strings(result)
	return result
}
// tagsFindSeriesDuration is the summary of request durations for /tags/findSeries.
// TagsFindSeriesHandler updates it after the response has been flushed successfully.
var tagsFindSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/findSeries"}`)
// TagValuesHandler implements /tags/<tag_name> endpoint from Graphite Tags API.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
@ -85,3 +166,31 @@ func getInt(r *http.Request, argName string) (int, error) {
}
return n, nil
}
// parseFilterExpr parses a single Graphite tag filter expression s
// into storage.TagFilter.
//
// The expression is split at the first `=`:
//
//   - a `!` right before `=` marks the filter as negative;
//   - a `~` right after `=` marks the value as a regexp, which is wrapped
//     into `^(?:...).*`, i.e. anchored to the beginning of the tag value only;
//   - the `name` tag is translated into an empty key
//     (presumably the storage treats an empty key as the metric name - confirm).
//
// An error is returned if s contains no `=`.
func parseFilterExpr(s string) (*storage.TagFilter, error) {
	eq := strings.IndexByte(s, '=')
	if eq < 0 {
		return nil, fmt.Errorf("missing tag value in filter expression %q", s)
	}
	key, value := s[:eq], s[eq+1:]

	negative := strings.HasSuffix(key, "!")
	if negative {
		key = strings.TrimSuffix(key, "!")
	}
	if key == "name" {
		key = ""
	}

	regex := strings.HasPrefix(value, "~")
	if regex {
		value = "^(?:" + value[1:] + ").*"
	}

	tf := &storage.TagFilter{
		Key:        []byte(key),
		Value:      []byte(value),
		IsNegative: negative,
		IsRegexp:   regex,
	}
	return tf, nil
}

View File

@ -0,0 +1,12 @@
{% stripspace %}

TagsFindSeriesResponse generates the response for /tags/findSeries :
a JSON array with every path emitted as a quoted string.
See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
{% func TagsFindSeriesResponse(paths []string) %}
[
{% for i, path := range paths %}
{%q= path %}
{% if i+1 < len(paths) %},{% endif %}
{% endfor %}
]
{% endfunc %}
{% endstripspace %}

View File

@ -0,0 +1,65 @@
// Code generated by qtc from "tags_find_series_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
package graphite
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
// StreamTagsFindSeriesResponse writes paths to qw422016 as a bracketed,
// comma-separated list: `[`, then every path passed through the writer's
// Q method, then `]`. No trailing comma is written after the last path.
//
// This function is generated by qtc from tags_find_series_response.qtpl;
// change the template instead of editing it by hand.
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
func StreamTagsFindSeriesResponse(qw422016 *qt422016.Writer, paths []string) {
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
qw422016.N().S(`[`)
//line app/vmselect/graphite/tags_find_series_response.qtpl:5
for i, path := range paths {
//line app/vmselect/graphite/tags_find_series_response.qtpl:6
qw422016.N().Q(path)
//line app/vmselect/graphite/tags_find_series_response.qtpl:7
if i+1 < len(paths) {
//line app/vmselect/graphite/tags_find_series_response.qtpl:7
qw422016.N().S(`,`)
//line app/vmselect/graphite/tags_find_series_response.qtpl:7
}
//line app/vmselect/graphite/tags_find_series_response.qtpl:8
}
//line app/vmselect/graphite/tags_find_series_response.qtpl:8
qw422016.N().S(`]`)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
}
// WriteTagsFindSeriesResponse writes the response for paths to qq422016.
//
// It acquires a quicktemplate writer on top of qq422016, delegates to
// StreamTagsFindSeriesResponse and releases the writer afterwards.
//
// This function is generated by qtc from tags_find_series_response.qtpl;
// change the template instead of editing it by hand.
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
func WriteTagsFindSeriesResponse(qq422016 qtio422016.Writer, paths []string) {
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
StreamTagsFindSeriesResponse(qw422016, paths)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
}
// TagsFindSeriesResponse returns the response for paths as a string.
//
// The response is rendered into an acquired byte buffer via
// WriteTagsFindSeriesResponse; the buffer contents are copied into the
// returned string before the buffer is released.
//
// This function is generated by qtc from tags_find_series_response.qtpl;
// change the template instead of editing it by hand.
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
func TagsFindSeriesResponse(paths []string) string {
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
WriteTagsFindSeriesResponse(qb422016, paths)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qs422016 := string(qb422016.B)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
return qs422016
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
}

View File

@ -132,7 +132,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
}
if strings.HasPrefix(path, "/tags/") {
if strings.HasPrefix(path, "/tags/") && path != "/tags/findSeries" {
tagName := r.URL.Path[len("/tags/"):]
graphiteTagValuesRequests.Inc()
if err := graphite.TagValuesHandler(startTime, tagName, w, r); err != nil {
@ -277,6 +277,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
return true
case "/tags/findSeries":
graphiteTagsFindSeriesRequests.Inc()
if err := graphite.TagsFindSeriesHandler(startTime, w, r); err != nil {
graphiteTagsFindSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "/api/v1/rules":
// Return dumb placeholder
rulesRequests.Inc()
@ -384,6 +392,9 @@ var (
graphiteTagValuesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/<tag_name>"}`)
graphiteTagValuesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/<tag_name>"}`)
graphiteTagsFindSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/tags/findSeries"}`)
graphiteTagsFindSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/tags/findSeries"}`)
rulesRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/rules"}`)
alertsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/alerts"}`)
metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/metadata"}`)

View File

@ -777,6 +777,32 @@ var exportWorkPool = &sync.Pool{
},
}
// SearchMetricNames returns all the metric names matching sq until the given deadline.
func SearchMetricNames(sq *storage.SearchQuery, deadline searchutils.Deadline) ([]storage.MetricName, error) {
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting to search metric names: %s", deadline.String())
}
// Setup search.
tfss, err := setupTfss(sq.TagFilterss)
if err != nil {
return nil, err
}
tr := storage.TimeRange{
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
if err := vmstorage.CheckTimeRange(tr); err != nil {
return nil, err
}
mns, err := vmstorage.SearchMetricNames(tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
if err != nil {
return nil, fmt.Errorf("cannot find metric names: %w", err)
}
return mns, nil
}
// ProcessSearchQuery performs sq until the given deadline.
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.

View File

@ -878,6 +878,34 @@ func SeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
if end-start > 24*3600*1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
mns, err := netstorage.SearchMetricNames(sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer)
doneCh := make(chan struct{})
go func() {
for i := range mns {
bb := quicktemplate.AcquireByteBuffer()
writemetricNameObject(bb, &mns[i])
resultsCh <- bb
}
close(doneCh)
}()
// WriteSeriesResponse must consume all the data from resultsCh.
WriteSeriesResponse(bw, resultsCh)
if err := bw.Flush(); err != nil {
return err
}
<-doneCh
seriesDuration.UpdateDuration(startTime)
return nil
}
rss, err := netstorage.ProcessSearchQuery(sq, false, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)

View File

@ -131,6 +131,14 @@ func DeleteMetrics(tfss []*storage.TagFilters) (int, error) {
return n, err
}
// SearchMetricNames returns metric names for the given tfss on the given tr.
//
// The call is delegated to Storage.SearchMetricNames; WG is incremented
// for the duration of the call and decremented once it returns.
func SearchMetricNames(tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]storage.MetricName, error) {
	WG.Add(1)
	metricNames, err := Storage.SearchMetricNames(tfss, tr, maxMetrics, deadline)
	WG.Done()

	return metricNames, err
}
// SearchTagKeysOnTimeRange searches for tag keys on tr.
func SearchTagKeysOnTimeRange(tr storage.TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
WG.Add(1)

View File

@ -550,6 +550,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/
* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
* [/tags](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/tag_name](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
### How to build from sources

View File

@ -796,6 +796,41 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
return deadline.Sub(t)
}
// SearchMetricNames returns metric names matching the given tfss on the given tr.
//
// TSIDs are located first via searchTSIDs (which receives maxMetrics), then
// metric names are prefetched and resolved one by one through the index search.
// A metricID without a metric name (io.EOF from searchMetricName) is skipped
// silently, so the result may contain fewer entries than the found TSIDs.
// Any other lookup or unmarshal failure aborts the call with an error.
func (s *Storage) SearchMetricNames(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]MetricName, error) {
	tsids, err := s.searchTSIDs(tfss, tr, maxMetrics, deadline)
	if err != nil {
		return nil, err
	}
	if err := s.prefetchMetricNames(tsids, deadline); err != nil {
		return nil, err
	}

	idb := s.idb()
	is := idb.getIndexSearch(deadline)
	defer idb.putIndexSearch(is)

	result := make([]MetricName, 0, len(tsids))
	// buf holds the marshaled metric name; it is reused across iterations.
	var buf []byte
	for i := range tsids {
		metricID := tsids[i].MetricID
		var lookupErr error
		buf, lookupErr = is.searchMetricName(buf[:0], metricID)
		switch {
		case lookupErr == io.EOF:
			// Skip missing metricName for metricID.
			// It should be automatically fixed. See indexDB.searchMetricName for details.
			continue
		case lookupErr != nil:
			return nil, fmt.Errorf("error when searching metricName for metricID=%d: %w", metricID, lookupErr)
		}
		// Capacity is preallocated above, so this never reallocates;
		// the new element is unmarshaled in place.
		result = append(result, MetricName{})
		if err := result[len(result)-1].Unmarshal(buf); err != nil {
			return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", buf, err)
		}
	}
	return result, nil
}
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
// Do not cache tfss -> tsids here, since the caching is performed