app/vmselect/graphite: add /tags/findSeries handler from Graphite Tags API

See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
This commit is contained in:
Aliaksandr Valialkin 2020-11-16 10:55:55 +02:00
parent 97100b1d42
commit eea1be0d5c
11 changed files with 468 additions and 17 deletions

View File

@ -207,6 +207,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json). - `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json).
- `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). - `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/<tag_name>` - returns tag values for the given `<tag_name>`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). - `tags/<tag_name>` - returns tag values for the given `<tag_name>`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/findSeries` - returns series matching the given `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
* URL for time series deletion: `http://<vmselect>:8481/delete/<accountID>/prometheus/api/v1/admin/tsdb/delete_series?match[]=<timeseries_selector_for_delete>`. * URL for time series deletion: `http://<vmselect>:8481/delete/<accountID>/prometheus/api/v1/admin/tsdb/delete_series?match[]=<timeseries_selector_for_delete>`.
Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't

View File

@ -3,16 +3,98 @@ package graphite
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
// TagsFindSeriesHandler implements /tags/findSeries endpoint from Graphite Tags API.
//
// See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
func TagsFindSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := searchutils.GetDeadlineForQuery(r, startTime)
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
limit, err := getInt(r, "limit")
if err != nil {
return err
}
exprs := r.Form["expr"]
if len(exprs) == 0 {
return fmt.Errorf("expecting at least one `expr` query arg")
}
// Convert exprs to []storage.TagFilter
tfs := make([]storage.TagFilter, 0, len(exprs))
for _, expr := range exprs {
tf, err := parseFilterExpr(expr)
if err != nil {
return fmt.Errorf("cannot parse `expr` query arg: %w", err)
}
tfs = append(tfs, *tf)
}
// Send the request to storage
ct := time.Now().UnixNano() / 1e6
sq := &storage.SearchQuery{
MinTimestamp: 0,
MaxTimestamp: ct,
TagFilterss: [][]storage.TagFilter{tfs},
}
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
mns, isPartial, err := netstorage.SearchMetricNames(at, denyPartialResponse, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
}
paths := getCanonicalPaths(mns)
if limit > 0 && limit < len(paths) {
paths = paths[:limit]
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteTagsFindSeriesResponse(bw, isPartial, paths)
if err := bw.Flush(); err != nil {
return err
}
tagsFindSeriesDuration.UpdateDuration(startTime)
return nil
}
func getCanonicalPaths(mns []storage.MetricName) []string {
paths := make([]string, 0, len(mns))
var b []byte
var tags []storage.Tag
for _, mn := range mns {
b = append(b[:0], mn.MetricGroup...)
tags = append(tags[:0], mn.Tags...)
sort.Slice(tags, func(i, j int) bool {
return string(tags[i].Key) < string(tags[j].Key)
})
for _, tag := range tags {
b = append(b, ';')
b = append(b, tag.Key...)
b = append(b, '=')
b = append(b, tag.Value...)
}
paths = append(paths, string(b))
}
sort.Strings(paths)
return paths
}
var tagsFindSeriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/tags/findSeries"}`)
// TagValuesHandler implements /tags/<tag_name> endpoint from Graphite Tags API. // TagValuesHandler implements /tags/<tag_name> endpoint from Graphite Tags API.
// //
// See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags // See https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags
@ -88,3 +170,31 @@ func getInt(r *http.Request, argName string) (int, error) {
} }
return n, nil return n, nil
} }
func parseFilterExpr(s string) (*storage.TagFilter, error) {
n := strings.Index(s, "=")
if n < 0 {
return nil, fmt.Errorf("missing tag value in filter expression %q", s)
}
tagName := s[:n]
tagValue := s[n+1:]
isNegative := false
if strings.HasSuffix(tagName, "!") {
isNegative = true
tagName = tagName[:len(tagName)-1]
}
if tagName == "name" {
tagName = ""
}
isRegexp := false
if strings.HasPrefix(tagValue, "~") {
isRegexp = true
tagValue = "^(?:" + tagValue[1:] + ").*"
}
return &storage.TagFilter{
Key: []byte(tagName),
Value: []byte(tagValue),
IsNegative: isNegative,
IsRegexp: isRegexp,
}, nil
}

View File

@ -0,0 +1,12 @@
{% stripspace %}
{% func TagsFindSeriesResponse(isPartial bool, paths []string) %}
[
{% for i, path := range paths %}
{%q= path %}
{% if i+1 < len(paths) %},{% endif %}
{% endfor %}
]
{% endfunc %}
{% endstripspace %}

View File

@ -0,0 +1,65 @@
// Code generated by qtc from "tags_find_series_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
package graphite
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
func StreamTagsFindSeriesResponse(qw422016 *qt422016.Writer, isPartial bool, paths []string) {
//line app/vmselect/graphite/tags_find_series_response.qtpl:3
qw422016.N().S(`[`)
//line app/vmselect/graphite/tags_find_series_response.qtpl:5
for i, path := range paths {
//line app/vmselect/graphite/tags_find_series_response.qtpl:6
qw422016.N().Q(path)
//line app/vmselect/graphite/tags_find_series_response.qtpl:7
if i+1 < len(paths) {
//line app/vmselect/graphite/tags_find_series_response.qtpl:7
qw422016.N().S(`,`)
//line app/vmselect/graphite/tags_find_series_response.qtpl:7
}
//line app/vmselect/graphite/tags_find_series_response.qtpl:8
}
//line app/vmselect/graphite/tags_find_series_response.qtpl:8
qw422016.N().S(`]`)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
}
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
func WriteTagsFindSeriesResponse(qq422016 qtio422016.Writer, isPartial bool, paths []string) {
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
StreamTagsFindSeriesResponse(qw422016, isPartial, paths)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
}
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
func TagsFindSeriesResponse(isPartial bool, paths []string) string {
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
WriteTagsFindSeriesResponse(qb422016, isPartial, paths)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qs422016 := string(qb422016.B)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
return qs422016
//line app/vmselect/graphite/tags_find_series_response.qtpl:10
}

View File

@ -209,7 +209,7 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
return true return true
} }
} }
if strings.HasPrefix(p.Suffix, "graphite/tags/") { if strings.HasPrefix(p.Suffix, "graphite/tags/") && p.Suffix != "graphite/tags/findSeries" {
tagName := p.Suffix[len("graphite/tags/"):] tagName := p.Suffix[len("graphite/tags/"):]
graphiteTagValuesRequests.Inc() graphiteTagValuesRequests.Inc()
if err := graphite.TagValuesHandler(startTime, at, tagName, w, r); err != nil { if err := graphite.TagValuesHandler(startTime, at, tagName, w, r); err != nil {
@ -354,6 +354,14 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
return true return true
} }
return true return true
case "graphite/tags/findSeries":
graphiteTagsFindSeriesRequests.Inc()
if err := graphite.TagsFindSeriesHandler(startTime, at, w, r); err != nil {
graphiteTagsFindSeriesErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "prometheus/api/v1/rules": case "prometheus/api/v1/rules":
// Return dumb placeholder // Return dumb placeholder
rulesRequests.Inc() rulesRequests.Inc()
@ -463,6 +471,9 @@ var (
graphiteTagValuesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags/<tag_name>"}`) graphiteTagValuesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags/<tag_name>"}`)
graphiteTagValuesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags/<tag_name>"}`) graphiteTagValuesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags/<tag_name>"}`)
graphiteTagsFindSeriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/graphite/tags/findSeries"}`)
graphiteTagsFindSeriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/graphite/tags/findSeries"}`)
rulesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/rules"}`) rulesRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/rules"}`)
alertsRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/alerts"}`) alertsRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/alerts"}`)
metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/metadata"}`) metadataRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/metadata"}`)

View File

@ -1151,7 +1151,6 @@ func GetSeriesCount(at *auth.Token, denyPartialResponse bool, deadline searchuti
} }
isPartial = true isPartial = true
} }
return n, isPartial, nil return n, isPartial, nil
} }
@ -1237,6 +1236,81 @@ func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.
return nil return nil
} }
// SearchMetricNames returns all the metric names matching sq until the given deadline.
func SearchMetricNames(at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]storage.MetricName, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting to search metric names: %s", deadline.String())
}
requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel.
type nodeResult struct {
metricNames [][]byte
err error
}
resultsCh := make(chan nodeResult, len(storageNodes))
for _, sn := range storageNodes {
go func(sn *storageNode) {
sn.searchMetricNamesRequests.Inc()
metricNames, err := sn.processSearchMetricNames(requestData, deadline)
if err != nil {
sn.searchMetricNamesRequestErrors.Inc()
err = fmt.Errorf("cannot search metric names on vmstorage %s: %w", sn.connPool.Addr(), err)
}
resultsCh <- nodeResult{
metricNames: metricNames,
err: err,
}
}(sn)
}
// Collect results.
var errors []error
metricNames := make(map[string]struct{})
for i := 0; i < len(storageNodes); i++ {
// There is no need in timer here, since all the goroutines executing
// sn.processSearchQuery must be finished until the deadline.
nr := <-resultsCh
if nr.err != nil {
errors = append(errors, nr.err)
continue
}
for _, metricName := range nr.metricNames {
metricNames[string(metricName)] = struct{}{}
}
}
isPartial := false
if len(errors) > 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since it has no sense in returning all errors.
return nil, false, errors[0]
}
// Just return partial results.
// This allows gracefully degrade vmselect in the case
// if certain storageNodes are temporarily unavailable.
// Do not return the error, since it may spam logs on busy vmselect
// serving high amounts of requests.
partialSearchResults.Inc()
if denyPartialResponse {
return nil, true, errors[0]
}
isPartial = true
}
// Unmarshal metricNames
mns := make([]storage.MetricName, len(metricNames))
i := 0
for metricName := range metricNames {
mn := &mns[i]
if err := mn.Unmarshal(bytesutil.ToUnsafeBytes(metricName)); err != nil {
return nil, false, fmt.Errorf("cannot unmarshal metric name obtained from vmstorage: %w; metricName=%q", err, metricName)
}
i++
}
return mns, isPartial, nil
}
// ProcessSearchQuery performs sq until the given deadline. // ProcessSearchQuery performs sq until the given deadline.
// //
// Results.RunParallel or Results.Cancel must be called on the returned Results. // Results.RunParallel or Results.Cancel must be called on the returned Results.
@ -1399,9 +1473,15 @@ type storageNode struct {
// The number of errors during requests to seriesCount. // The number of errors during requests to seriesCount.
seriesCountRequestErrors *metrics.Counter seriesCountRequestErrors *metrics.Counter
// The number of 'search metric names' requests to storageNode.
searchMetricNamesRequests *metrics.Counter
// The number of search requests to storageNode. // The number of search requests to storageNode.
searchRequests *metrics.Counter searchRequests *metrics.Counter
// The number of 'search metric names' errors to storageNode.
searchMetricNamesRequestErrors *metrics.Counter
// The number of search request errors to storageNode. // The number of search request errors to storageNode.
searchRequestErrors *metrics.Counter searchRequestErrors *metrics.Counter
@ -1593,6 +1673,25 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline sear
return n, nil return n, nil
} }
func (sn *storageNode) processSearchMetricNames(requestData []byte, deadline searchutils.Deadline) ([][]byte, error) {
var metricNames [][]byte
f := func(bc *handshake.BufferedConn) error {
mns, err := sn.processSearchMetricNamesOnConn(bc, requestData)
if err != nil {
return err
}
metricNames = mns
return nil
}
if err := sn.execOnConn("searchMetricNames_v1", f, deadline); err != nil {
// Try again before giving up.
if err = sn.execOnConn("searchMetricNames_v1", f, deadline); err != nil {
return nil, err
}
}
return metricNames, nil
}
func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error { func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error {
var blocksRead int var blocksRead int
f := func(bc *handshake.BufferedConn) error { f := func(bc *handshake.BufferedConn) error {
@ -2078,6 +2177,42 @@ const maxMetricBlockSize = 1024 * 1024
// from vmstorage. // from vmstorage.
const maxErrorMessageSize = 64 * 1024 const maxErrorMessageSize = 64 * 1024
func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn, requestData []byte) ([][]byte, error) {
// Send the requst to sn.
if err := writeBytes(bc, requestData); err != nil {
return nil, fmt.Errorf("cannot write requestData: %w", err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush requestData to conn: %w", err)
}
// Read response error.
buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil {
return nil, fmt.Errorf("cannot read error message: %w", err)
}
if len(buf) > 0 {
return nil, newErrRemote(buf)
}
// Read metricNames from response.
metricNamesCount, err := readUint64(bc)
if err != nil {
return nil, fmt.Errorf("cannot read metricNamesCount: %w", err)
}
metricNames := make([][]byte, 0, metricNamesCount)
for i := int64(0); i < int64(metricNamesCount); i++ {
buf, err = readBytes(buf[:0], bc, maxMetricNameSize)
if err != nil {
return nil, fmt.Errorf("cannot read metricName #%d: %w", i+1, err)
}
metricNames[i] = append(metricNames[i][:0], buf...)
}
return metricNames, nil
}
const maxMetricNameSize = 64 * 1024
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) (int, error) { func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) (int, error) {
// Send the request to sn. // Send the request to sn.
if err := writeBytes(bc, requestData); err != nil { if err := writeBytes(bc, requestData); err != nil {
@ -2090,11 +2225,8 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
return 0, fmt.Errorf("cannot flush requestData to conn: %w", err) return 0, fmt.Errorf("cannot flush requestData to conn: %w", err)
} }
var err error
var buf []byte
// Read response error. // Read response error.
buf, err = readBytes(buf[:0], bc, maxErrorMessageSize) buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil { if err != nil {
return 0, fmt.Errorf("cannot read error message: %w", err) return 0, fmt.Errorf("cannot read error message: %w", err)
} }
@ -2248,7 +2380,9 @@ func InitStorageNodes(addrs []string) {
tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)), tsdbStatusRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), seriesCountRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)), seriesCountRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), searchRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)), searchRequestErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)), metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)), metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),

View File

@ -952,6 +952,34 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
TagFilterss: tagFilterss, TagFilterss: tagFilterss,
} }
denyPartialResponse := searchutils.GetDenyPartialResponse(r) denyPartialResponse := searchutils.GetDenyPartialResponse(r)
if end-start > 24*3600*1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
mns, isPartial, err := netstorage.SearchMetricNames(at, denyPartialResponse, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer)
doneCh := make(chan struct{})
go func() {
for i := range mns {
bb := quicktemplate.AcquireByteBuffer()
writemetricNameObject(bb, &mns[i])
resultsCh <- bb
}
close(doneCh)
}()
// WriteSeriesResponse must consume all the data from resultsCh.
WriteSeriesResponse(bw, isPartial, resultsCh)
if err := bw.Flush(); err != nil {
return err
}
<-doneCh
seriesDuration.UpdateDuration(startTime)
return nil
}
rss, isPartial, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, false, deadline) rss, isPartial, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, false, deadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err) return fmt.Errorf("cannot fetch data for %q: %w", sq, err)

View File

@ -538,6 +538,20 @@ func (ctx *vmselectRequestCtx) readAccountIDProjectID() (uint32, uint32, error)
return accountID, projectID, nil return accountID, projectID, nil
} }
func (ctx *vmselectRequestCtx) readSearchQuery() error {
if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil {
return fmt.Errorf("cannot read searchQuery: %w", err)
}
tail, err := ctx.sq.Unmarshal(ctx.dataBuf)
if err != nil {
return fmt.Errorf("cannot unmarshal SearchQuery: %w", err)
}
if len(tail) > 0 {
return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail)
}
return nil
}
func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error { func (ctx *vmselectRequestCtx) readDataBufBytes(maxDataSize int) error {
ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 8) ctx.sizeBuf = bytesutil.Resize(ctx.sizeBuf, 8)
if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil { if _, err := io.ReadFull(ctx.bc, ctx.sizeBuf); err != nil {
@ -663,7 +677,9 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
switch rpcName { switch rpcName {
case "search_v4": case "search_v4":
return s.processVMSelectSearchQuery(ctx) return s.processVMSelectSearch(ctx)
case "searchMetricNames_v1":
return s.processVMSelectSearchMetricNames(ctx)
case "labelValuesOnTimeRange_v1": case "labelValuesOnTimeRange_v1":
return s.processVMSelectLabelValuesOnTimeRange(ctx) return s.processVMSelectLabelValuesOnTimeRange(ctx)
case "labelValues_v2": case "labelValues_v2":
@ -1061,19 +1077,52 @@ func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) erro
// maxSearchQuerySize is the maximum size of SearchQuery packet in bytes. // maxSearchQuerySize is the maximum size of SearchQuery packet in bytes.
const maxSearchQuerySize = 1024 * 1024 const maxSearchQuerySize = 1024 * 1024
func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error { func (s *Server) processVMSelectSearchMetricNames(ctx *vmselectRequestCtx) error {
vmselectSearchQueryRequests.Inc() vmselectSearchMetricNamesRequests.Inc()
// Read search query. // Read request.
if err := ctx.readDataBufBytes(maxSearchQuerySize); err != nil { if err := ctx.readSearchQuery(); err != nil {
return fmt.Errorf("cannot read searchQuery: %w", err) return err
} }
tail, err := ctx.sq.Unmarshal(ctx.dataBuf)
// Search metric names.
if err := ctx.setupTfss(); err != nil {
return ctx.writeErrorMessage(err)
}
tr := storage.TimeRange{
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
mns, err := s.storage.SearchMetricNames(ctx.sq.AccountID, ctx.sq.ProjectID, ctx.tfss, tr, *maxMetricsPerSearch, ctx.deadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot unmarshal SearchQuery: %w", err) return ctx.writeErrorMessage(err)
} }
if len(tail) > 0 {
return fmt.Errorf("unexpected non-zero tail left after unmarshaling SearchQuery: (len=%d) %q", len(tail), tail) // Send empty error message to vmselect.
if err := ctx.writeString(""); err != nil {
return fmt.Errorf("cannot send empty error message: %w", err)
}
// Send response.
metricNamesCount := len(mns)
if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil {
return fmt.Errorf("cannot send metricNamesCount: %w", err)
}
for i, mn := range mns {
ctx.dataBuf = mn.Marshal(ctx.dataBuf[:0])
if err := ctx.writeDataBufBytes(); err != nil {
return fmt.Errorf("cannot send metricName #%d: %w", i+1, err)
}
}
return nil
}
func (s *Server) processVMSelectSearch(ctx *vmselectRequestCtx) error {
vmselectSearchRequests.Inc()
// Read request.
if err := ctx.readSearchQuery(); err != nil {
return err
} }
fetchData, err := ctx.readBool() fetchData, err := ctx.readBool()
if err != nil { if err != nil {
@ -1153,7 +1202,8 @@ var (
vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total") vmselectLabelEntriesRequests = metrics.NewCounter("vm_vmselect_label_entries_requests_total")
vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total") vmselectSeriesCountRequests = metrics.NewCounter("vm_vmselect_series_count_requests_total")
vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total") vmselectTSDBStatusRequests = metrics.NewCounter("vm_vmselect_tsdb_status_requests_total")
vmselectSearchQueryRequests = metrics.NewCounter("vm_vmselect_search_query_requests_total") vmselectSearchMetricNamesRequests = metrics.NewCounter("vm_vmselect_search_metric_names_requests_total")
vmselectSearchRequests = metrics.NewCounter("vm_vmselect_search_requests_total")
vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total") vmselectMetricBlocksRead = metrics.NewCounter("vm_vmselect_metric_blocks_read_total")
vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total") vmselectMetricRowsRead = metrics.NewCounter("vm_vmselect_metric_rows_read_total")
) )

View File

@ -207,6 +207,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json). - `metrics/index.json` - returns all the metric names. See [these docs](https://graphite-api.readthedocs.io/en/latest/api.html#metrics-index-json).
- `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). - `tags` - returns tag names. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/<tag_name>` - returns tag values for the given `<tag_name>`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags). - `tags/<tag_name>` - returns tag values for the given `<tag_name>`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
- `tags/findSeries` - returns series matching the given `expr`. See [these docs](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags).
* URL for time series deletion: `http://<vmselect>:8481/delete/<accountID>/prometheus/api/v1/admin/tsdb/delete_series?match[]=<timeseries_selector_for_delete>`. * URL for time series deletion: `http://<vmselect>:8481/delete/<accountID>/prometheus/api/v1/admin/tsdb/delete_series?match[]=<timeseries_selector_for_delete>`.
Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't Note that the `delete_series` handler should be used only in exceptional cases such as deletion of accidentally ingested incorrect time series. It shouldn't

View File

@ -550,6 +550,7 @@ VictoriaMetrics supports the following handlers from [Graphite Tags API](https:/
* [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb) * [/tags/tagMultiSeries](https://graphite.readthedocs.io/en/stable/tags.html#adding-series-to-the-tagdb)
* [/tags](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) * [/tags](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/tag_name](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags) * [/tags/tag_name](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
* [/tags/findSeries](https://graphite.readthedocs.io/en/stable/tags.html#exploring-tags)
### How to build from sources ### How to build from sources

View File

@ -859,6 +859,44 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
return deadline.Sub(t) return deadline.Sub(t)
} }
// SearchMetricNames returns metric names matching the given tfss on the given tr.
func (s *Storage) SearchMetricNames(accountID, projectID uint32, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]MetricName, error) {
tsids, err := s.searchTSIDs(tfss, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
if len(tsids) == 0 {
return nil, nil
}
if err = s.prefetchMetricNames(tsids, deadline); err != nil {
return nil, err
}
idb := s.idb()
is := idb.getIndexSearch(accountID, projectID, deadline)
defer idb.putIndexSearch(is)
mns := make([]MetricName, 0, len(tsids))
var metricName []byte
for i := range tsids {
metricID := tsids[i].MetricID
var err error
metricName, err = is.searchMetricName(metricName[:0], metricID)
if err != nil {
if err == io.EOF {
// Skip missing metricName for metricID.
// It should be automatically fixed. See indexDB.searchMetricName for details.
continue
}
return nil, fmt.Errorf("error when searching metricName for metricID=%d: %w", metricID, err)
}
mns = mns[:len(mns)+1]
mn := &mns[len(mns)-1]
if err = mn.Unmarshal(metricName); err != nil {
return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
}
}
return mns, nil
}
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr. // searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
// Do not cache tfss -> tsids here, since the caching is performed // Do not cache tfss -> tsids here, since the caching is performed