app/vmselect: optimize /api/v1/labels and /api/v1/label/.../values handlers when match[] query arg is passed to them

Aliaksandr Valialkin 2022-06-12 04:32:13 +03:00
parent 89b778902b
commit 374beb350e
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
15 changed files with 443 additions and 918 deletions
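
The gist of the change: `match[]` (plus `extra_label`/`extra_filters[]`) handling for these endpoints moves from fetching and scanning all matching series into filter-aware inverted-index searches. A minimal sketch of exercising the optimized endpoints; the address assumes a single-node VictoriaMetrics on the default `localhost:8428`, and the selector is illustrative:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// /api/v1/labels restricted to series matching the selector; this is the
	// code path optimized by this commit (filters are pushed into the index search).
	params := url.Values{}
	params.Add("match[]", `up{job="node_exporter"}`) // illustrative selector
	resp, err := http.Get("http://localhost:8428/api/v1/labels?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))

	// The same query args apply to /api/v1/label/<name>/values.
	resp2, err := http.Get("http://localhost:8428/api/v1/label/instance/values?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp2.Body.Close()
	body2, _ := io.ReadAll(resp2.Body)
	fmt.Println(string(body2))
}
```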

View File

@ -275,6 +275,8 @@ By default cardinality explorer analyzes time series for the current date. It pr
By default all the time series for the selected date are analyzed. It is possible to narrow down the analysis to series
matching the specified [series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors).
+Cardinality explorer takes into account [deleted time series](#how-to-delete-time-series), because they stay in the inverted index for up to [-retentionPeriod](#retention). This means that the deleted time series take RAM, CPU, disk IO and disk space for the inverted index in the same way as other time series.
Cardinality explorer is built on top of [/api/v1/status/tsdb](#tsdb-stats).
See [cardinality explorer playground](https://play.victoriametrics.com/select/accounting/1/6a716b0f-38bc-4856-90ce-448fd713e3fe/prometheus/graph/#/cardinality).
@ -617,7 +619,6 @@ Additionally, VictoriaMetrics provides the following handlers:
* `/api/v1/series/count` - returns the total number of time series in the database. Some notes:
  * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series;
  * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions;
-* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - returns a list of currently running queries.
* `/api/v1/status/top_queries` - returns the following query lists:
  * the most frequently executed queries - `topByCount`
@ -1245,11 +1246,12 @@ and [clustered VictoriaMetrics](https://grafana.com/grafana/dashboards/11176) Gr
See more details in [monitoring docs](#monitoring).
The `merge` process is usually named "compaction", because the resulting `part` size is usually smaller than
-the sum of the source `parts`. There are following benefits of doing the merge process:
-* it improves query performance, since lower number of `parts` are inspected with each query;
-* it reduces the number of data files, since each `part`contains fixed number of files;
-* better compression rate for the resulting part.
+the sum of the source `parts` because of better compression rate. The merge process provides the following additional benefits:
+* it improves query performance, since lower number of `parts` are inspected with each query
+* it reduces the number of data files, since each `part` contains fixed number of files
+* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
+and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge.

Newly added `parts` either appear in the storage or fail to appear.
Storage never contains partially created parts. The same applies to merge process — `parts` are either fully
@ -1411,7 +1413,7 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
VictoriaMetrics returns TSDB stats at `/api/v1/status/tsdb` page in the way similar to Prometheus - see [these Prometheus docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). VictoriaMetrics accepts the following optional query args at `/api/v1/status/tsdb` page:
* `topN=N` where `N` is the number of top entries to return in the response. By default top 10 entries are returned.
-* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day.
+* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day. Pass `date=1970-01-01` in order to collect global stats across all the days.
* `match[]=SELECTOR` where `SELECTOR` is an arbitrary [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for series to take into account during stats calculation. By default all the series are taken into account.
* `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details.
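
For illustration, the args above can be combined in a single request. A hedged sketch (the address and selector are assumptions; `net/http` and `net/url` imports as in a typical Go client):

```go
// tsdbStatsURL builds a /api/v1/status/tsdb request combining the query args
// documented above: top 5 entries, global stats across all days, and only
// series matching an illustrative selector.
func tsdbStatsURL(vmAddr string) string {
	params := url.Values{}
	params.Set("topN", "5")
	params.Set("date", "1970-01-01") // global stats across all the days
	params.Add("match[]", `{job="node_exporter"}`)
	return vmAddr + "/api/v1/status/tsdb?" + params.Encode()
}
```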

View File

@ -197,7 +197,8 @@ func MetricsExpandHandler(startTime time.Time, w http.ResponseWriter, r *http.Re
func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
	deadline := searchutils.GetDeadlineForQuery(r, startTime)
	jsonp := r.FormValue("jsonp")
-	metricNames, err := netstorage.GetLabelValues(nil, "__name__", 0, deadline)
+	sq := storage.NewSearchQuery(0, 0, nil, 0)
+	metricNames, err := netstorage.GetLabelValues(nil, "__name__", sq, 0, deadline)
	if err != nil {
		return fmt.Errorf(`cannot obtain metric names: %w`, err)
	}

View File

@ -257,15 +257,6 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
			return true
		}
		return true
-	case "/api/v1/labels/count":
-		labelsCountRequests.Inc()
-		httpserver.EnableCORS(w, r)
-		if err := prometheus.LabelsCountHandler(startTime, w, r); err != nil {
-			labelsCountErrors.Inc()
-			sendPrometheusError(w, r, err)
-			return true
-		}
-		return true
	case "/api/v1/status/tsdb":
		statusTSDBRequests.Inc()
		httpserver.EnableCORS(w, r)
@ -502,9 +493,6 @@ var (
	labelsRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/labels"}`)
	labelsErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/labels"}`)

-	labelsCountRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/labels/count"}`)
-	labelsCountErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/labels/count"}`)

	statusTSDBRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/tsdb"}`)
	statusTSDBErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/tsdb"}`)

View File

@ -611,27 +611,28 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
	return vmstorage.DeleteMetrics(tfss)
}

-// GetLabelsOnTimeRange returns labels for the given tr until the given deadline.
-func GetLabelsOnTimeRange(qt *querytracer.Tracer, tr storage.TimeRange, limit int, deadline searchutils.Deadline) ([]string, error) {
-	qt = qt.NewChild("get labels on timeRange=%s", &tr)
+// GetLabelNames returns label names matching the given sq until the given deadline.
+func GetLabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline searchutils.Deadline) ([]string, error) {
+	qt = qt.NewChild("get labels: %s", sq)
	defer qt.Done()
	if deadline.Exceeded() {
		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
	}
-	if limit > *maxTagKeysPerSearch || limit <= 0 {
-		limit = *maxTagKeysPerSearch
+	if maxLabelNames > *maxTagKeysPerSearch || maxLabelNames <= 0 {
+		maxLabelNames = *maxTagKeysPerSearch
	}
-	labels, err := vmstorage.SearchTagKeysOnTimeRange(tr, limit, deadline.Deadline())
-	qt.Printf("get %d labels", len(labels))
+	tr := storage.TimeRange{
+		MinTimestamp: sq.MinTimestamp,
+		MaxTimestamp: sq.MaxTimestamp,
+	}
+	tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
+	if err != nil {
+		return nil, err
+	}
+	labels, err := vmstorage.SearchLabelNamesWithFiltersOnTimeRange(qt, tfss, tr, maxLabelNames, sq.MaxMetrics, deadline.Deadline())
	if err != nil {
		return nil, fmt.Errorf("error during labels search on time range: %w", err)
	}
-	// Substitute "" with "__name__"
-	for i := range labels {
-		if labels[i] == "" {
-			labels[i] = "__name__"
-		}
-	}
	// Sort labels like Prometheus does
	sort.Strings(labels)
	qt.Printf("sort %d labels", len(labels))
@ -645,7 +646,8 @@ func GetGraphiteTags(qt *querytracer.Tracer, filter string, limit int, deadline
	if deadline.Exceeded() {
		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
	}
-	labels, err := GetLabels(nil, 0, deadline)
+	sq := storage.NewSearchQuery(0, 0, nil, 0)
+	labels, err := GetLabelNames(qt, sq, 0, deadline)
	if err != nil {
		return nil, err
	}
@ -685,50 +687,25 @@ func hasString(a []string, s string) bool {
	return false
}

-// GetLabels returns labels until the given deadline.
-func GetLabels(qt *querytracer.Tracer, limit int, deadline searchutils.Deadline) ([]string, error) {
-	qt = qt.NewChild("get labels")
+// GetLabelValues returns label values matching the given labelName and sq until the given deadline.
+func GetLabelValues(qt *querytracer.Tracer, labelName string, sq *storage.SearchQuery, maxLabelValues int, deadline searchutils.Deadline) ([]string, error) {
+	qt = qt.NewChild("get values for label %s: %s", labelName, sq)
	defer qt.Done()
	if deadline.Exceeded() {
		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
	}
-	if limit > *maxTagKeysPerSearch || limit <= 0 {
-		limit = *maxTagKeysPerSearch
+	if maxLabelValues > *maxTagValuesPerSearch || maxLabelValues <= 0 {
+		maxLabelValues = *maxTagValuesPerSearch
	}
-	labels, err := vmstorage.SearchTagKeys(limit, deadline.Deadline())
-	qt.Printf("get %d labels from global index", len(labels))
+	tr := storage.TimeRange{
+		MinTimestamp: sq.MinTimestamp,
+		MaxTimestamp: sq.MaxTimestamp,
+	}
+	tfss, err := setupTfss(tr, sq.TagFilterss, sq.MaxMetrics, deadline)
	if err != nil {
-		return nil, fmt.Errorf("error during labels search: %w", err)
+		return nil, err
	}
-	// Substitute "" with "__name__"
-	for i := range labels {
-		if labels[i] == "" {
-			labels[i] = "__name__"
-		}
-	}
-	// Sort labels like Prometheus does
-	sort.Strings(labels)
-	qt.Printf("sort %d labels", len(labels))
-	return labels, nil
-}
-
-// GetLabelValuesOnTimeRange returns label values for the given labelName on the given tr
-// until the given deadline.
-func GetLabelValuesOnTimeRange(qt *querytracer.Tracer, labelName string, tr storage.TimeRange, limit int, deadline searchutils.Deadline) ([]string, error) {
-	qt = qt.NewChild("get values for label %s on a timeRange %s", labelName, &tr)
-	defer qt.Done()
-	if deadline.Exceeded() {
-		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
-	}
-	if labelName == "__name__" {
-		labelName = ""
-	}
-	// Search for tag values
-	if limit > *maxTagValuesPerSearch || limit <= 0 {
-		limit = *maxTagValuesPerSearch
-	}
-	labelValues, err := vmstorage.SearchTagValuesOnTimeRange([]byte(labelName), tr, limit, deadline.Deadline())
-	qt.Printf("get %d label values", len(labelValues))
+	labelValues, err := vmstorage.SearchLabelValuesWithFiltersOnTimeRange(qt, labelName, tfss, tr, maxLabelValues, sq.MaxMetrics, deadline.Deadline())
	if err != nil {
		return nil, fmt.Errorf("error during label values search on time range for labelName=%q: %w", labelName, err)
	}
@ -748,7 +725,8 @@ func GetGraphiteTagValues(qt *querytracer.Tracer, tagName, filter string, limit
	if tagName == "name" {
		tagName = ""
	}
-	tagValues, err := GetLabelValues(nil, tagName, 0, deadline)
+	sq := storage.NewSearchQuery(0, 0, nil, 0)
+	tagValues, err := GetLabelValues(qt, tagName, sq, 0, deadline)
	if err != nil {
		return nil, err
	}
@ -764,32 +742,6 @@ func GetGraphiteTagValues(qt *querytracer.Tracer, tagName, filter string, limit
	return tagValues, nil
}
-// GetLabelValues returns label values for the given labelName
-// until the given deadline.
-func GetLabelValues(qt *querytracer.Tracer, labelName string, limit int, deadline searchutils.Deadline) ([]string, error) {
-	qt = qt.NewChild("get values for label %s", labelName)
-	defer qt.Done()
-	if deadline.Exceeded() {
-		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
-	}
-	if labelName == "__name__" {
-		labelName = ""
-	}
-	// Search for tag values
-	if limit > *maxTagValuesPerSearch || limit <= 0 {
-		limit = *maxTagValuesPerSearch
-	}
-	labelValues, err := vmstorage.SearchTagValues([]byte(labelName), limit, deadline.Deadline())
-	qt.Printf("get %d label values", len(labelValues))
-	if err != nil {
-		return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err)
-	}
-	// Sort labelValues like Prometheus does
-	sort.Strings(labelValues)
-	qt.Printf("sort %d label values", len(labelValues))
-	return labelValues, nil
-}

// GetTagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
@ -812,40 +764,6 @@ func GetTagValueSuffixes(qt *querytracer.Tracer, tr storage.TimeRange, tagKey, t
	return suffixes, nil
}

-// GetLabelEntries returns all the label entries until the given deadline.
-func GetLabelEntries(qt *querytracer.Tracer, deadline searchutils.Deadline) ([]storage.TagEntry, error) {
-	qt = qt.NewChild("get label entries")
-	defer qt.Done()
-	if deadline.Exceeded() {
-		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
-	}
-	labelEntries, err := vmstorage.SearchTagEntries(*maxTagKeysPerSearch, *maxTagValuesPerSearch, deadline.Deadline())
-	if err != nil {
-		return nil, fmt.Errorf("error during label entries request: %w", err)
-	}
-	qt.Printf("get %d label entries", len(labelEntries))
-	// Substitute "" with "__name__"
-	for i := range labelEntries {
-		e := &labelEntries[i]
-		if e.Key == "" {
-			e.Key = "__name__"
-		}
-	}
-	// Sort labelEntries by the number of label values in each entry.
-	sort.Slice(labelEntries, func(i, j int) bool {
-		a, b := labelEntries[i].Values, labelEntries[j].Values
-		if len(a) != len(b) {
-			return len(a) > len(b)
-		}
-		return labelEntries[i].Key > labelEntries[j].Key
-	})
-	qt.Printf("sort %d label entries", len(labelEntries))
-	return labelEntries, nil
-}

// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func GetTSDBStatusForDate(qt *querytracer.Tracer, deadline searchutils.Deadline, date uint64, topN, maxMetrics int) (*storage.TSDBStatus, error) {
	qt = qt.NewChild("get tsdb stats for date=%d, topN=%d", date, topN)
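
Taken together, the new flow is: build a `storage.SearchQuery` carrying the time range and the `match[]` filters, then call the filter-aware search. A hypothetical caller sketch under stated assumptions (`startMillis`, `endMillis`, `tagFilterss` are placeholders, and `100000` is an arbitrary series cap for illustration):

```go
// labelNamesForSelector shows the reworked API shape: the time range and tag
// filters travel inside the SearchQuery instead of taking separate code paths.
func labelNamesForSelector(qt *querytracer.Tracer, tagFilterss [][]storage.TagFilter,
	startMillis, endMillis int64, deadline searchutils.Deadline) ([]string, error) {
	sq := storage.NewSearchQuery(startMillis, endMillis, tagFilterss, 100000)
	// maxLabelNames=0 falls back to the maxTagKeysPerSearch limit,
	// as seen in GetLabelNames above.
	return netstorage.GetLabelNames(qt, sq, 0, deadline)
}
```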

View File

@ -1,17 +0,0 @@
{% import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" %}
{% stripspace %}
LabelsCountResponse generates response for /api/v1/labels/count .
{% func LabelsCountResponse(labelEntries []storage.TagEntry) %}
{
"status":"success",
"data":{
{% for i, e := range labelEntries %}
{%q= e.Key %}:{%d= len(e.Values) %}
{% if i+1 < len(labelEntries) %},{% endif %}
{% endfor %}
}
}
{% endfunc %}
{% endstripspace %}

View File

@ -1,74 +0,0 @@
// Code generated by qtc from "labels_count_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/prometheus/labels_count_response.qtpl:1
package prometheus
//line app/vmselect/prometheus/labels_count_response.qtpl:1
import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
// LabelsCountResponse generates response for /api/v1/labels/count .
//line app/vmselect/prometheus/labels_count_response.qtpl:5
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/labels_count_response.qtpl:5
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/labels_count_response.qtpl:5
func StreamLabelsCountResponse(qw422016 *qt422016.Writer, labelEntries []storage.TagEntry) {
//line app/vmselect/prometheus/labels_count_response.qtpl:5
qw422016.N().S(`{"status":"success","data":{`)
//line app/vmselect/prometheus/labels_count_response.qtpl:9
for i, e := range labelEntries {
//line app/vmselect/prometheus/labels_count_response.qtpl:10
qw422016.N().Q(e.Key)
//line app/vmselect/prometheus/labels_count_response.qtpl:10
qw422016.N().S(`:`)
//line app/vmselect/prometheus/labels_count_response.qtpl:10
qw422016.N().D(len(e.Values))
//line app/vmselect/prometheus/labels_count_response.qtpl:11
if i+1 < len(labelEntries) {
//line app/vmselect/prometheus/labels_count_response.qtpl:11
qw422016.N().S(`,`)
//line app/vmselect/prometheus/labels_count_response.qtpl:11
}
//line app/vmselect/prometheus/labels_count_response.qtpl:12
}
//line app/vmselect/prometheus/labels_count_response.qtpl:12
qw422016.N().S(`}}`)
//line app/vmselect/prometheus/labels_count_response.qtpl:15
}
//line app/vmselect/prometheus/labels_count_response.qtpl:15
func WriteLabelsCountResponse(qq422016 qtio422016.Writer, labelEntries []storage.TagEntry) {
//line app/vmselect/prometheus/labels_count_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/labels_count_response.qtpl:15
StreamLabelsCountResponse(qw422016, labelEntries)
//line app/vmselect/prometheus/labels_count_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/labels_count_response.qtpl:15
}
//line app/vmselect/prometheus/labels_count_response.qtpl:15
func LabelsCountResponse(labelEntries []storage.TagEntry) string {
//line app/vmselect/prometheus/labels_count_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/labels_count_response.qtpl:15
WriteLabelsCountResponse(qb422016, labelEntries)
//line app/vmselect/prometheus/labels_count_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/labels_count_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/labels_count_response.qtpl:15
return qs422016
//line app/vmselect/prometheus/labels_count_response.qtpl:15
}

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -454,38 +453,10 @@ func LabelValuesHandler(qt *querytracer.Tracer, startTime time.Time, labelName s
	if err != nil {
		return err
	}
-	var labelValues []string
-	if len(cp.filterss) == 0 {
-		if cp.IsDefaultTimeRange() {
-			labelValues, err = netstorage.GetLabelValues(qt, labelName, limit, cp.deadline)
-			if err != nil {
-				return fmt.Errorf(`cannot obtain label values for %q: %w`, labelName, err)
-			}
-		} else {
-			if cp.start == 0 {
-				cp.start = cp.end - defaultStep
-			}
-			tr := storage.TimeRange{
-				MinTimestamp: cp.start,
-				MaxTimestamp: cp.end,
-			}
-			labelValues, err = netstorage.GetLabelValuesOnTimeRange(qt, labelName, tr, limit, cp.deadline)
-			if err != nil {
-				return fmt.Errorf(`cannot obtain label values on time range for %q: %w`, labelName, err)
-			}
-		}
-	} else {
-		// Extended functionality that allows filtering by label filters and time range
-		// i.e. /api/v1/label/foo/values?match[]=foobar{baz="abc"}&start=...&end=...
-		// is equivalent to `label_values(foobar{baz="abc"}, foo)` call on the selected
-		// time range in Grafana templating.
-		if cp.start == 0 {
-			cp.start = cp.end - defaultStep
-		}
-		labelValues, err = labelValuesWithMatches(qt, labelName, cp, limit)
-		if err != nil {
-			return fmt.Errorf("cannot obtain label values for %q on time range [%d...%d]: %w", labelName, cp.start, cp.end, err)
-		}
-	}
+	sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxUniqueTimeseries)
+	labelValues, err := netstorage.GetLabelValues(qt, labelName, sq, limit, cp.deadline)
+	if err != nil {
+		return fmt.Errorf("cannot obtain values for label %q: %w", labelName, err)
+	}

	w.Header().Set("Content-Type", "application/json")
@ -498,89 +469,8 @@ func LabelValuesHandler(qt *querytracer.Tracer, startTime time.Time, labelName s
	return nil
}
-func labelValuesWithMatches(qt *querytracer.Tracer, labelName string, cp *commonParams, limit int) ([]string, error) {
-	// Add `labelName!=''` tag filter in order to filter out series without the labelName.
-	// There is no need in adding `__name__!=''` filter, since all the time series should
-	// already have non-empty name.
-	if labelName != "__name__" {
-		key := []byte(labelName)
-		for i, tfs := range cp.filterss {
-			cp.filterss[i] = append(tfs, storage.TagFilter{
-				Key:        key,
-				IsNegative: true,
-			})
-		}
-	}
-	sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxSeriesLimit)
-	m := make(map[string]struct{})
-	if cp.end-cp.start > 24*3600*1000 {
-		// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
-		mns, err := netstorage.SearchMetricNames(qt, sq, cp.deadline)
-		if err != nil {
-			return nil, fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
-		}
-		for _, mn := range mns {
-			labelValue := mn.GetTagValue(labelName)
-			if len(labelValue) == 0 {
-				continue
-			}
-			m[string(labelValue)] = struct{}{}
-		}
-	} else {
-		rss, err := netstorage.ProcessSearchQuery(qt, sq, false, cp.deadline)
-		if err != nil {
-			return nil, fmt.Errorf("cannot fetch data for %q: %w", sq, err)
-		}
-		var mLock sync.Mutex
-		err = rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
-			labelValue := rs.MetricName.GetTagValue(labelName)
-			if len(labelValue) == 0 {
-				return nil
-			}
-			mLock.Lock()
-			m[string(labelValue)] = struct{}{}
-			mLock.Unlock()
-			return nil
-		})
-		if err != nil {
-			return nil, fmt.Errorf("cannot fetch label values from storage: %w", err)
-		}
-	}
-	labelValues := make([]string, 0, len(m))
-	for labelValue := range m {
-		labelValues = append(labelValues, labelValue)
-	}
-	if limit > 0 && len(labelValues) > limit {
-		labelValues = labelValues[:limit]
-	}
-	sort.Strings(labelValues)
-	qt.Printf("sort %d label values", len(labelValues))
-	return labelValues, nil
-}

var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`)
-// LabelsCountHandler processes /api/v1/labels/count request.
-func LabelsCountHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
-	defer labelsCountDuration.UpdateDuration(startTime)
-	deadline := searchutils.GetDeadlineForStatusRequest(r, startTime)
-	labelEntries, err := netstorage.GetLabelEntries(nil, deadline)
-	if err != nil {
-		return fmt.Errorf(`cannot obtain label entries: %w`, err)
-	}
-	w.Header().Set("Content-Type", "application/json")
-	bw := bufferedwriter.Get(w)
-	defer bufferedwriter.Put(bw)
-	WriteLabelsCountResponse(bw, labelEntries)
-	if err := bw.Flush(); err != nil {
-		return fmt.Errorf("cannot send labels count response to remote client: %w", err)
-	}
-	return nil
-}
-
-var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels/count"}`)

const secsPerDay = 3600 * 24

// TSDBStatusHandler processes /api/v1/status/tsdb request.
@ -670,37 +560,11 @@ func LabelsHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
	if err != nil {
		return err
	}
-	var labels []string
-	if len(cp.filterss) == 0 {
-		if cp.IsDefaultTimeRange() {
-			labels, err = netstorage.GetLabels(qt, limit, cp.deadline)
-			if err != nil {
-				return fmt.Errorf("cannot obtain labels: %w", err)
-			}
-		} else {
-			if cp.start == 0 {
-				cp.start = cp.end - defaultStep
-			}
-			tr := storage.TimeRange{
-				MinTimestamp: cp.start,
-				MaxTimestamp: cp.end,
-			}
-			labels, err = netstorage.GetLabelsOnTimeRange(qt, tr, limit, cp.deadline)
-			if err != nil {
-				return fmt.Errorf("cannot obtain labels on time range: %w", err)
-			}
-		}
-	} else {
-		// Extended functionality that allows filtering by label filters and time range
-		// i.e. /api/v1/labels?match[]=foobar{baz="abc"}&start=...&end=...
-		if cp.start == 0 {
-			cp.start = cp.end - defaultStep
-		}
-		labels, err = labelsWithMatches(qt, cp, limit)
-		if err != nil {
-			return fmt.Errorf("cannot obtain labels for timeRange=[%d..%d]: %w", cp.start, cp.end, err)
-		}
-	}
+	sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxUniqueTimeseries)
+	labels, err := netstorage.GetLabelNames(qt, sq, limit, cp.deadline)
+	if err != nil {
+		return fmt.Errorf("cannot obtain labels: %w", err)
+	}

	w.Header().Set("Content-Type", "application/json")
	bw := bufferedwriter.Get(w)
@ -712,54 +576,6 @@ func LabelsHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
	return nil
}

-func labelsWithMatches(qt *querytracer.Tracer, cp *commonParams, limit int) ([]string, error) {
-	sq := storage.NewSearchQuery(cp.start, cp.end, cp.filterss, *maxSeriesLimit)
-	m := make(map[string]struct{})
-	if cp.end-cp.start > 24*3600*1000 {
-		// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
-		mns, err := netstorage.SearchMetricNames(qt, sq, cp.deadline)
-		if err != nil {
-			return nil, fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
-		}
-		for _, mn := range mns {
-			for _, tag := range mn.Tags {
-				m[string(tag.Key)] = struct{}{}
-			}
-		}
-		if len(mns) > 0 {
-			m["__name__"] = struct{}{}
-		}
-	} else {
-		rss, err := netstorage.ProcessSearchQuery(qt, sq, false, cp.deadline)
-		if err != nil {
-			return nil, fmt.Errorf("cannot fetch data for %q: %w", sq, err)
-		}
-		var mLock sync.Mutex
-		err = rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
-			mLock.Lock()
-			for _, tag := range rs.MetricName.Tags {
-				m[string(tag.Key)] = struct{}{}
-			}
-			m["__name__"] = struct{}{}
-			mLock.Unlock()
-			return nil
-		})
-		if err != nil {
-			return nil, fmt.Errorf("cannot fetch labels from storage: %w", err)
-		}
-	}
-	labels := make([]string, 0, len(m))
-	for label := range m {
-		labels = append(labels, label)
-	}
-	if limit > 0 && limit < len(labels) {
-		labels = labels[:limit]
-	}
-	sort.Strings(labels)
-	qt.Printf("sort %d labels", len(labels))
-	return labels, nil
-}

var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`)

// SeriesCountHandler processes /api/v1/series/count request.

View File

@ -180,36 +180,21 @@ func SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr st
	return mns, err
}

-// SearchTagKeysOnTimeRange searches for tag keys on tr.
-func SearchTagKeysOnTimeRange(tr storage.TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
+// SearchLabelNamesWithFiltersOnTimeRange searches for tag keys matching the given tfss on tr.
+func SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxTagKeys, maxMetrics int, deadline uint64) ([]string, error) {
	WG.Add(1)
-	keys, err := Storage.SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)
+	labelNames, err := Storage.SearchLabelNamesWithFiltersOnTimeRange(qt, tfss, tr, maxTagKeys, maxMetrics, deadline)
	WG.Done()
-	return keys, err
+	return labelNames, err
}

-// SearchTagKeys searches for tag keys
-func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
+// SearchLabelValuesWithFiltersOnTimeRange searches for label values for the given labelName, tfss and tr.
+func SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName string, tfss []*storage.TagFilters,
+	tr storage.TimeRange, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
	WG.Add(1)
-	keys, err := Storage.SearchTagKeys(maxTagKeys, deadline)
+	labelValues, err := Storage.SearchLabelValuesWithFiltersOnTimeRange(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
	WG.Done()
-	return keys, err
-}
-
-// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
-func SearchTagValuesOnTimeRange(tagKey []byte, tr storage.TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
-	WG.Add(1)
-	values, err := Storage.SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline)
-	WG.Done()
-	return values, err
-}
-
-// SearchTagValues searches for tag values for the given tagKey
-func SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
-	WG.Add(1)
-	values, err := Storage.SearchTagValues(tagKey, maxTagValues, deadline)
-	WG.Done()
-	return values, err
+	return labelValues, err
}

// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
@ -230,14 +215,6 @@ func SearchGraphitePaths(tr storage.TimeRange, query []byte, maxPaths int, deadl
	return paths, err
}

-// SearchTagEntries searches for tag entries.
-func SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]storage.TagEntry, error) {
-	WG.Add(1)
-	tagEntries, err := Storage.SearchTagEntries(maxTagKeys, maxTagValues, deadline)
-	WG.Done()
-	return tagEntries, err
-}

// GetTSDBStatusForDate returns TSDB status for the given date.
func GetTSDBStatusForDate(qt *querytracer.Tracer, date uint64, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
	WG.Add(1)

View File

@ -23,7 +23,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: add support of `lowercase` and `uppercase` relabeling actions in the same way as [Prometheus 2.36.0 does](https://github.com/prometheus/prometheus/releases/tag/v2.36.0). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2664).
* FEATURE: add ability to change the `indexdb` rotation timezone offset via `-retentionTimezoneOffset` command-line flag. Previously it was performed at 4am UTC time. This could lead to performance degradation in the middle of the day when VictoriaMetrics runs in time zones located too far from UTC. Thanks to @cnych for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574).
* FEATURE: limit the number of background merge threads on systems with big number of CPU cores by default. This increases the max size of parts, which can be created during background merge when `-storageDataPath` directory has limited free disk space. This may improve on-disk data compression efficiency and query performance. The limits can be tuned if needed with `-smallMergeConcurrency` and `-bigMergeConcurrency` command-line flags. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2673).
-* FEATURE: accept optional `limit` query arg at [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names) and [/api/v1/label_values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values) for limiting the number of sample entries returned from these endpoints. See [these docs](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements).
+* FEATURE: accept optional `limit` query arg at [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names) and [/api/v1/label/.../values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values) for limiting the number of sample entries returned from these endpoints. See [these docs](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements).
+* FEATURE: optimize performance for [/api/v1/labels](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names) and [/api/v1/label/.../values](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values) endpoints when `match[]`, `extra_label` or `extra_filters[]` query args are passed to these endpoints.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `limit` param per-group for limiting number of produced samples per each rule. Thanks to @Howie59 for [implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2676).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): remove dependency on Internet access at [web API pages](https://docs.victoriametrics.com/vmalert.html#web). Previously the functionality and the layout of these pages was broken without Internet access. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2594).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): implement the `http://vmagent:8429/service-discovery` page in the same way as Prometheus does. This page shows the original labels for all the discovered targets alongside the resulting labels after the relabeling. This simplifies service discovery debugging.
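
A hedged sketch of the `limit` query arg from the entries above, combined with `match[]` (the host, label name and selector are illustrative assumptions; imports as in a typical Go `net/http`/`net/url` client):

```go
// topInstanceValues asks /api/v1/label/instance/values for at most 100 values
// among series matching the selector.
func topInstanceValues(vmAddr string) (*http.Response, error) {
	params := url.Values{}
	params.Set("limit", "100")
	params.Add("match[]", `up{job="node_exporter"}`)
	return http.Get(vmAddr + "/api/v1/label/instance/values?" + params.Encode())
}
```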

View File

@ -275,6 +275,8 @@ By default cardinality explorer analyzes time series for the current date. It pr
By default all the time series for the selected date are analyzed. It is possible to narrow down the analysis to series
matching the specified [series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors).
+Cardinality explorer takes into account [deleted time series](#how-to-delete-time-series), because they stay in the inverted index for up to [-retentionPeriod](#retention). This means that the deleted time series take RAM, CPU, disk IO and disk space for the inverted index in the same way as other time series.
Cardinality explorer is built on top of [/api/v1/status/tsdb](#tsdb-stats).
See [cardinality explorer playground](https://play.victoriametrics.com/select/accounting/1/6a716b0f-38bc-4856-90ce-448fd713e3fe/prometheus/graph/#/cardinality).
@ -617,7 +619,6 @@ Additionally, VictoriaMetrics provides the following handlers:
* `/api/v1/series/count` - returns the total number of time series in the database. Some notes:
  * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series;
  * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions;
-* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - returns a list of currently running queries.
* `/api/v1/status/top_queries` - returns the following query lists:
  * the most frequently executed queries - `topByCount`
@ -1245,11 +1246,12 @@ and [clustered VictoriaMetrics](https://grafana.com/grafana/dashboards/11176) Gr
See more details in [monitoring docs](#monitoring).
The `merge` process is usually named "compaction", because the resulting `part` size is usually smaller than
-the sum of the source `parts`. There are following benefits of doing the merge process:
-* it improves query performance, since lower number of `parts` are inspected with each query;
-* it reduces the number of data files, since each `part`contains fixed number of files;
-* better compression rate for the resulting part.
+the sum of the source `parts` because of better compression rate. The merge process provides the following additional benefits:
+* it improves query performance, since lower number of `parts` are inspected with each query
+* it reduces the number of data files, since each `part` contains fixed number of files
+* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
+and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge.

Newly added `parts` either appear in the storage or fail to appear.
Storage never contains partially created parts. The same applies to merge process — `parts` are either fully
@ -1411,7 +1413,7 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
VictoriaMetrics returns TSDB stats at `/api/v1/status/tsdb` page in the way similar to Prometheus - see [these Prometheus docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). VictoriaMetrics accepts the following optional query args at `/api/v1/status/tsdb` page:
* `topN=N` where `N` is the number of top entries to return in the response. By default top 10 entries are returned.
-* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day.
+* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day. Pass `date=1970-01-01` in order to collect global stats across all the days.
* `match[]=SELECTOR` where `SELECTOR` is an arbitrary [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for series to take into account during stats calculation. By default all the series are taken into account.
* `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details.

View File

@ -279,6 +279,8 @@ By default cardinality explorer analyzes time series for the current date. It pr
By default all the time series for the selected date are analyzed. It is possible to narrow down the analysis to series
matching the specified [series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors).
+Cardinality explorer takes into account [deleted time series](#how-to-delete-time-series), because they stay in the inverted index for up to [-retentionPeriod](#retention). This means that the deleted time series take RAM, CPU, disk IO and disk space for the inverted index in the same way as other time series.
Cardinality explorer is built on top of [/api/v1/status/tsdb](#tsdb-stats).
See [cardinality explorer playground](https://play.victoriametrics.com/select/accounting/1/6a716b0f-38bc-4856-90ce-448fd713e3fe/prometheus/graph/#/cardinality).
@ -621,7 +623,6 @@ Additionally, VictoriaMetrics provides the following handlers:
* `/api/v1/series/count` - returns the total number of time series in the database. Some notes:
  * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series;
  * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions;
-* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - returns a list of currently running queries.
* `/api/v1/status/top_queries` - returns the following query lists:
  * the most frequently executed queries - `topByCount`
@ -1249,11 +1250,12 @@ and [clustered VictoriaMetrics](https://grafana.com/grafana/dashboards/11176) Gr
See more details in [monitoring docs](#monitoring).
The `merge` process is usually named "compaction", because the resulting `part` size is usually smaller than
-the sum of the source `parts`. There are following benefits of doing the merge process:
-* it improves query performance, since lower number of `parts` are inspected with each query;
-* it reduces the number of data files, since each `part`contains fixed number of files;
-* better compression rate for the resulting part.
+the sum of the source `parts` because of better compression rate. The merge process provides the following additional benefits:
+* it improves query performance, since lower number of `parts` are inspected with each query
+* it reduces the number of data files, since each `part` contains fixed number of files
+* various background maintenance tasks such as [de-duplication](#deduplication), [downsampling](#downsampling)
+and [freeing up disk space for the deleted time series](#how-to-delete-time-series) are performed during the merge.

Newly added `parts` either appear in the storage or fail to appear.
Storage never contains partially created parts. The same applies to merge process — `parts` are either fully
@ -1415,7 +1417,7 @@ See the example of alerting rules for VM components [here](https://github.com/Vi
VictoriaMetrics returns TSDB stats at `/api/v1/status/tsdb` page in the way similar to Prometheus - see [these Prometheus docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats). VictoriaMetrics accepts the following optional query args at `/api/v1/status/tsdb` page:
* `topN=N` where `N` is the number of top entries to return in the response. By default top 10 entries are returned.
-* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day.
+* `date=YYYY-MM-DD` where `YYYY-MM-DD` is the date for collecting the stats. By default the stats is collected for the current day. Pass `date=1970-01-01` in order to collect global stats across all the days.
* `match[]=SELECTOR` where `SELECTOR` is an arbitrary [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for series to take into account during stats calculation. By default all the series are taken into account.
* `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details.

View File

@ -425,7 +425,7 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
	}
	// Round start and end times to per-day granularity according to per-day inverted index.
	startDate := uint64(tr.MinTimestamp) / msecPerDay
-	endDate := uint64(tr.MaxTimestamp) / msecPerDay
+	endDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	dst = encoding.MarshalUint64(dst, prefix)
	dst = encoding.MarshalUint64(dst, startDate)
	dst = encoding.MarshalUint64(dst, endDate)
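
The `tr.MaxTimestamp-1` adjustment above (mirrored in `searchLabelNamesWithFiltersOnTimeRange` below) appears to handle time ranges ending exactly on a day boundary: without it, such a range would be attributed one extra day. A self-contained sketch of the arithmetic, reusing the `msecPerDay` granularity from this file (the day numbers are arbitrary):

```go
package main

import "fmt"

const msecPerDay = 24 * 3600 * 1000

func main() {
	// A range covering exactly one day, ending at the next midnight.
	minTs := uint64(19155 * msecPerDay)
	maxTs := uint64(19156 * msecPerDay)
	fmt.Println(minTs / msecPerDay)       // 19155
	fmt.Println(maxTs / msecPerDay)       // 19156: would wrongly cover an extra day
	fmt.Println((maxTs - 1) / msecPerDay) // 19155: the last day actually covered
}
```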
@ -715,50 +715,65 @@ func putIndexItems(ii *indexItems) {
var indexItemsPool sync.Pool

-// SearchTagKeysOnTimeRange returns all the tag keys on the given tr.
-func (db *indexDB) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
-	tks := make(map[string]struct{})
+// SearchLabelNamesWithFiltersOnTimeRange returns all the label names, which match the given tfss on the given tr.
+func (db *indexDB) SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int, deadline uint64) ([]string, error) {
+	qt = qt.NewChild("search for label names: filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", tfss, &tr, maxLabelNames, maxMetrics)
+	defer qt.Done()
+	lns := make(map[string]struct{})
+	qtChild := qt.NewChild("search for label names in the current indexdb")
	is := db.getIndexSearch(deadline)
-	err := is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
+	err := is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics)
	db.putIndexSearch(is)
+	qtChild.Donef("found %d label names", len(lns))
	if err != nil {
		return nil, err
	}
	ok := db.doExtDB(func(extDB *indexDB) {
+		qtChild := qt.NewChild("search for label names in the previous indexdb")
+		lnsLen := len(lns)
		is := extDB.getIndexSearch(deadline)
-		err = is.searchTagKeysOnTimeRange(tks, tr, maxTagKeys)
+		err = is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics)
		extDB.putIndexSearch(is)
+		qtChild.Donef("found %d additional label names", len(lns)-lnsLen)
	})
	if ok && err != nil {
		return nil, err
	}
-	keys := make([]string, 0, len(tks))
-	for key := range tks {
-		// Do not skip empty keys, since they are converted to __name__
-		keys = append(keys, key)
+	labelNames := make([]string, 0, len(lns))
+	for labelName := range lns {
+		labelNames = append(labelNames, labelName)
	}
-	// Do not sort keys, since they must be sorted by vmselect.
-	return keys, nil
+	// Do not sort label names, since they must be sorted by vmselect.
+	qt.Printf("found %d label names in the current and the previous indexdb", len(labelNames))
+	return labelNames, nil
}

-func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr TimeRange, maxTagKeys int) error {
+func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int) error {
	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
-	if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
-		return is.searchTagKeys(tks, maxTagKeys)
+	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
+	if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
+		qtChild := qt.NewChild("search for label names in global index: filters=%s", tfss)
+		err := is.searchLabelNamesWithFiltersOnDate(qtChild, lns, tfss, 0, maxLabelNames, maxMetrics)
+		qtChild.Done()
+		return err
	}
	var mu sync.Mutex
	wg := getWaitGroup()
	var errGlobal error
+	qt = qt.NewChild("parallel search for label names: filters=%s, timeRange=%s", tfss, &tr)
	for date := minDate; date <= maxDate; date++ {
		wg.Add(1)
+		qtChild := qt.NewChild("search for label names: filters=%s, date=%d", tfss, date)
		go func(date uint64) {
-			defer wg.Done()
-			tksLocal := make(map[string]struct{})
+			defer func() {
+				qtChild.Done()
+				wg.Done()
+			}()
+			lnsLocal := make(map[string]struct{})
			isLocal := is.db.getIndexSearch(is.deadline)
-			err := isLocal.searchTagKeysOnDate(tksLocal, date, maxTagKeys)
+			err := isLocal.searchLabelNamesWithFiltersOnDate(qtChild, lnsLocal, tfss, date, maxLabelNames, maxMetrics)
			is.db.putIndexSearch(isLocal)
			mu.Lock()
			defer mu.Unlock()
@ -769,31 +784,43 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
errGlobal = err errGlobal = err
return return
} }
if len(tks) >= maxTagKeys { if len(lns) >= maxLabelNames {
return return
} }
for k := range tksLocal { for k := range lnsLocal {
tks[k] = struct{}{} lns[k] = struct{}{}
} }
}(date) }(date)
} }
wg.Wait() wg.Wait()
putWaitGroup(wg) putWaitGroup(wg)
qt.Done()
return errGlobal return errGlobal
} }
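A note on the date arithmetic above: maxDate is derived from MaxTimestamp-1 so that a time range ending exactly on a midnight boundary does not spill into an extra, empty day, and maxDate == 0 doubles as the signal to fall back to the global index (date 0). A minimal, self-contained sketch of this bucketing; msecPerDay and the sample ranges here are illustrative assumptions, not taken from the commit:

package main

import "fmt"

const msecPerDay = 24 * 3600 * 1000

// dayRange mirrors the per-day bucketing above: the -1 keeps a range
// that ends exactly at midnight inside the previous day bucket.
func dayRange(minTs, maxTs int64) (minDate, maxDate uint64) {
	return uint64(minTs) / msecPerDay, uint64(maxTs-1) / msecPerDay
}

func main() {
	// Exactly one full day stays a single bucket thanks to the -1 ...
	fmt.Println(dayRange(0, msecPerDay)) // 0 0
	// ... while one extra millisecond starts a second bucket.
	fmt.Println(dayRange(0, msecPerDay+1)) // 0 1
}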
-func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64, maxTagKeys int) error {
+func (is *indexSearch) searchLabelNamesWithFiltersOnDate(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, date uint64, maxLabelNames, maxMetrics int) error {
+	filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
+	if err != nil {
+		return err
+	}
+	if filter != nil && filter.Len() == 0 {
+		qt.Printf("found zero label names for filter=%s", tfss)
+		return nil
+	}
+	var prevLabelName []byte
	ts := &is.ts
	kb := &is.kb
	mp := &is.mp
-	mp.Reset()
	dmis := is.db.s.getDeletedMetricIDs()
	loopsPaceLimiter := 0
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-	kb.B = encoding.MarshalUint64(kb.B, date)
+	nsPrefixExpected := byte(nsPrefixDateTagToMetricIDs)
+	if date == 0 {
+		nsPrefixExpected = nsPrefixTagToMetricIDs
+	}
+	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	prefix := kb.B
	ts.Seek(prefix)
-	for len(tks) < maxTagKeys && ts.NextItem() {
+	for len(lns) < maxLabelNames && ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
@@ -804,30 +831,36 @@ func (is *indexSearch) searchTagKeysOnDate
		if !bytes.HasPrefix(item, prefix) {
			break
		}
-		if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
+		if err := mp.Init(item, nsPrefixExpected); err != nil {
			return err
		}
		if mp.IsDeletedTag(dmis) {
			continue
		}
-		key := mp.Tag.Key
-		if !isArtificialTagKey(key) {
-			tks[string(key)] = struct{}{}
-		}
+		if mp.GetMatchingSeriesCount(filter) == 0 {
+			continue
+		}
+		labelName := mp.Tag.Key
+		if len(labelName) == 0 {
+			labelName = []byte("__name__")
+		}
+		if isArtificialTagKey(labelName) || string(labelName) == string(prevLabelName) {
			// Search for the next tag key.
			// The last char in kb.B must be tagSeparatorChar.
			// Just increment it in order to jump to the next tag key.
-			kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-			kb.B = encoding.MarshalUint64(kb.B, date)
-			if len(key) > 0 && key[0] == compositeTagKeyPrefix {
+			kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
+			if len(labelName) > 0 && labelName[0] == compositeTagKeyPrefix {
				// skip composite tag entries
				kb.B = append(kb.B, compositeTagKeyPrefix)
			} else {
-				kb.B = marshalTagValue(kb.B, key)
+				kb.B = marshalTagValue(kb.B, labelName)
			}
			kb.B[len(kb.B)-1]++
			ts.Seek(kb.B)
+			continue
+		}
+		lns[string(labelName)] = struct{}{}
+		prevLabelName = append(prevLabelName[:0], labelName...)
	}
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error during search for prefix %q: %w", prefix, err)
@@ -835,133 +868,71 @@ func (is *indexSearch) searchTagKeysOnDate
	return nil
}
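The "increment the last byte and re-seek" trick above is what keeps this scan fast: once a label name has been recorded (or rejected as artificial), the next row carrying the same name rebuilds the seek key up to the name's trailing tagSeparatorChar, bumps the final byte, and jumps past every remaining row of that name. A toy sketch of the same idea over a sorted string slice, with sort.SearchStrings standing in for ts.Seek (the rows and separators are illustrative):

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Sorted "index rows": labelName + '\x00' + labelValue
	// (single-byte values, so the name is everything before the last 2 bytes).
	rows := []string{
		"host\x00a", "host\x00b", "host\x00c",
		"job\x00x", "job\x00y",
	}
	i := 0
	for i < len(rows) {
		name := rows[i][:len(rows[i])-2]
		fmt.Println(name)
		// Seek to name + (separator+1): this lands past every
		// remaining row that shares the current label name.
		i = sort.SearchStrings(rows, name+"\x01")
	}
}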
-// SearchTagKeys returns all the tag keys.
-func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
-	tks := make(map[string]struct{})
+// SearchLabelValuesWithFiltersOnTimeRange returns label values for the given labelName, tfss and tr.
+func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName string, tfss []*TagFilters, tr TimeRange,
+	maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
+	qt = qt.NewChild("search for label values: labelName=%q, filters=%s, timeRange=%s, maxLabelValues=%d, maxMetrics=%d", labelName, tfss, &tr, maxLabelValues, maxMetrics)
+	defer qt.Done()
+	lvs := make(map[string]struct{})
+	qtChild := qt.NewChild("search for label values in the current indexdb")
	is := db.getIndexSearch(deadline)
-	err := is.searchTagKeys(tks, maxTagKeys)
+	err := is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics)
	db.putIndexSearch(is)
+	qtChild.Donef("found %d label values", len(lvs))
	if err != nil {
		return nil, err
	}
	ok := db.doExtDB(func(extDB *indexDB) {
+		qtChild := qt.NewChild("search for label values in the previous indexdb")
+		lvsLen := len(lvs)
		is := extDB.getIndexSearch(deadline)
-		err = is.searchTagKeys(tks, maxTagKeys)
+		err = is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics)
		extDB.putIndexSearch(is)
+		qtChild.Donef("found %d additional label values", len(lvs)-lvsLen)
	})
	if ok && err != nil {
		return nil, err
	}
-	keys := make([]string, 0, len(tks))
-	for key := range tks {
-		// Do not skip empty keys, since they are converted to __name__
-		keys = append(keys, key)
-	}
-	// Do not sort keys, since they must be sorted by vmselect.
-	return keys, nil
-}
-
-func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) error {
-	ts := &is.ts
-	kb := &is.kb
-	mp := &is.mp
-	mp.Reset()
-	dmis := is.db.s.getDeletedMetricIDs()
-	loopsPaceLimiter := 0
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
-	prefix := kb.B
-	ts.Seek(prefix)
-	for len(tks) < maxTagKeys && ts.NextItem() {
-		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
-			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
-				return err
-			}
-		}
-		loopsPaceLimiter++
-		item := ts.Item
-		if !bytes.HasPrefix(item, prefix) {
-			break
-		}
-		if err := mp.Init(item, nsPrefixTagToMetricIDs); err != nil {
-			return err
-		}
-		if mp.IsDeletedTag(dmis) {
-			continue
-		}
-		key := mp.Tag.Key
-		if !isArtificialTagKey(key) {
-			tks[string(key)] = struct{}{}
-		}
-		// Search for the next tag key.
-		// The last char in kb.B must be tagSeparatorChar.
-		// Just increment it in order to jump to the next tag key.
-		kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
-		if len(key) > 0 && key[0] == compositeTagKeyPrefix {
-			// skip composite tag entries
-			kb.B = append(kb.B, compositeTagKeyPrefix)
-		} else {
-			kb.B = marshalTagValue(kb.B, key)
-		}
-		kb.B[len(kb.B)-1]++
-		ts.Seek(kb.B)
-	}
-	if err := ts.Error(); err != nil {
-		return fmt.Errorf("error during search for prefix %q: %w", prefix, err)
-	}
-	return nil
-}
-
-// SearchTagValuesOnTimeRange returns all the tag values for the given tagKey on tr.
-func (db *indexDB) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
-	tvs := make(map[string]struct{})
-	is := db.getIndexSearch(deadline)
-	err := is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
-	db.putIndexSearch(is)
-	if err != nil {
-		return nil, err
-	}
-	ok := db.doExtDB(func(extDB *indexDB) {
-		is := extDB.getIndexSearch(deadline)
-		err = is.searchTagValuesOnTimeRange(tvs, tagKey, tr, maxTagValues)
-		extDB.putIndexSearch(is)
-	})
-	if ok && err != nil {
-		return nil, err
-	}
-	tagValues := make([]string, 0, len(tvs))
-	for tv := range tvs {
-		if len(tv) == 0 {
+	labelValues := make([]string, 0, len(lvs))
+	for labelValue := range lvs {
+		if len(labelValue) == 0 {
			// Skip empty values, since they have no any meaning.
			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
			continue
		}
-		tagValues = append(tagValues, tv)
+		labelValues = append(labelValues, labelValue)
	}
-	// Do not sort tagValues, since they must be sorted by vmselect.
-	return tagValues, nil
+	// Do not sort labelValues, since they must be sorted by vmselect.
+	qt.Printf("found %d label values in the current and the previous indexdb", len(labelValues))
+	return labelValues, nil
}
-func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKey []byte, tr TimeRange, maxTagValues int) error {
+func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters,
+	tr TimeRange, maxLabelValues, maxMetrics int) error {
	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
-	if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
-		return is.searchTagValues(tvs, tagKey, maxTagValues)
+	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
+	if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
+		qtChild := qt.NewChild("search for label values in global index: labelName=%q, filters=%s", labelName, tfss)
+		err := is.searchLabelValuesWithFiltersOnDate(qtChild, lvs, labelName, tfss, 0, maxLabelValues, maxMetrics)
+		qtChild.Done()
+		return err
	}
	var mu sync.Mutex
	wg := getWaitGroup()
	var errGlobal error
+	qt = qt.NewChild("parallel search for label values: labelName=%q, filters=%s, timeRange=%s", labelName, tfss, &tr)
	for date := minDate; date <= maxDate; date++ {
		wg.Add(1)
+		qtChild := qt.NewChild("search for label values: filters=%s, date=%d", tfss, date)
		go func(date uint64) {
-			defer wg.Done()
-			tvsLocal := make(map[string]struct{})
+			defer func() {
+				qtChild.Done()
+				wg.Done()
+			}()
+			lvsLocal := make(map[string]struct{})
			isLocal := is.db.getIndexSearch(is.deadline)
-			err := isLocal.searchTagValuesOnDate(tvsLocal, tagKey, date, maxTagValues)
+			err := isLocal.searchLabelValuesWithFiltersOnDate(qtChild, lvsLocal, labelName, tfss, date, maxLabelValues, maxMetrics)
			is.db.putIndexSearch(isLocal)
			mu.Lock()
			defer mu.Unlock()
@@ -972,117 +943,50 @@ func (is *indexSearch) searchTagValuesOnTimeRange
				errGlobal = err
				return
			}
-			if len(tvs) >= maxTagValues {
+			if len(lvs) >= maxLabelValues {
				return
			}
-			for v := range tvsLocal {
-				tvs[v] = struct{}{}
+			for v := range lvsLocal {
+				lvs[v] = struct{}{}
			}
		}(date)
	}
	wg.Wait()
	putWaitGroup(wg)
+	qt.Done()
	return errGlobal
}
-func (is *indexSearch) searchTagValuesOnDate(tvs map[string]struct{}, tagKey []byte, date uint64, maxTagValues int) error {
-	ts := &is.ts
-	kb := &is.kb
-	mp := &is.mp
-	mp.Reset()
-	dmis := is.db.s.getDeletedMetricIDs()
-	loopsPaceLimiter := 0
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-	kb.B = encoding.MarshalUint64(kb.B, date)
-	kb.B = marshalTagValue(kb.B, tagKey)
-	prefix := kb.B
-	ts.Seek(prefix)
-	for len(tvs) < maxTagValues && ts.NextItem() {
-		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
-			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
-				return err
-			}
-		}
-		loopsPaceLimiter++
-		item := ts.Item
-		if !bytes.HasPrefix(item, prefix) {
-			break
-		}
-		if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
-			return err
-		}
-		if mp.IsDeletedTag(dmis) {
-			continue
-		}
-		if string(mp.Tag.Key) != string(tagKey) {
-			break
-		}
-		tvs[string(mp.Tag.Value)] = struct{}{}
-		if mp.MetricIDsLen() < maxMetricIDsPerRow/2 {
-			// There is no need in searching for the next tag value,
-			// since it is likely it is located in the next row,
-			// because the current row contains incomplete metricIDs set.
-			continue
-		}
-		// Search for the next tag value.
-		// The last char in kb.B must be tagSeparatorChar.
-		// Just increment it in order to jump to the next tag value.
-		kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-		kb.B = encoding.MarshalUint64(kb.B, date)
-		kb.B = marshalTagValue(kb.B, mp.Tag.Key)
-		kb.B = marshalTagValue(kb.B, mp.Tag.Value)
-		kb.B[len(kb.B)-1]++
-		ts.Seek(kb.B)
-	}
-	if err := ts.Error(); err != nil {
-		return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err)
-	}
-	return nil
-}
-
-// SearchTagValues returns all the tag values for the given tagKey
-func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
-	tvs := make(map[string]struct{})
-	is := db.getIndexSearch(deadline)
-	err := is.searchTagValues(tvs, tagKey, maxTagValues)
-	db.putIndexSearch(is)
-	if err != nil {
-		return nil, err
-	}
-	ok := db.doExtDB(func(extDB *indexDB) {
-		is := extDB.getIndexSearch(deadline)
-		err = is.searchTagValues(tvs, tagKey, maxTagValues)
-		extDB.putIndexSearch(is)
-	})
-	if ok && err != nil {
-		return nil, err
-	}
-	tagValues := make([]string, 0, len(tvs))
-	for tv := range tvs {
-		if len(tv) == 0 {
-			// Skip empty values, since they have no any meaning.
-			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/600
-			continue
-		}
-		tagValues = append(tagValues, tv)
-	}
-	// Do not sort tagValues, since they must be sorted by vmselect.
-	return tagValues, nil
-}
-
-func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, maxTagValues int) error {
+func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters,
+	date uint64, maxLabelValues, maxMetrics int) error {
+	filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
+	if err != nil {
+		return err
+	}
+	if filter != nil && filter.Len() == 0 {
+		qt.Printf("found zero label values for filter=%s", tfss)
+		return nil
+	}
+	if labelName == "__name__" {
+		// __name__ label is encoded as empty string in indexdb.
+		labelName = ""
+	}
+	labelNameBytes := bytesutil.ToUnsafeBytes(labelName)
+	var prevLabelValue []byte
	ts := &is.ts
	kb := &is.kb
	mp := &is.mp
-	mp.Reset()
	dmis := is.db.s.getDeletedMetricIDs()
	loopsPaceLimiter := 0
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
-	kb.B = marshalTagValue(kb.B, tagKey)
+	nsPrefixExpected := byte(nsPrefixDateTagToMetricIDs)
+	if date == 0 {
+		nsPrefixExpected = nsPrefixTagToMetricIDs
+	}
+	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
+	kb.B = marshalTagValue(kb.B, labelNameBytes)
	prefix := kb.B
	ts.Seek(prefix)
-	for len(tvs) < maxTagValues && ts.NextItem() {
+	for len(lvs) < maxLabelValues && ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
@@ -1093,30 +997,29 @@ func (is *indexSearch) searchTagValues
		if !bytes.HasPrefix(item, prefix) {
			break
		}
-		if err := mp.Init(item, nsPrefixTagToMetricIDs); err != nil {
+		if err := mp.Init(item, nsPrefixExpected); err != nil {
			return err
		}
		if mp.IsDeletedTag(dmis) {
			continue
		}
-		if string(mp.Tag.Key) != string(tagKey) {
-			break
-		}
-		tvs[string(mp.Tag.Value)] = struct{}{}
-		if mp.MetricIDsLen() < maxMetricIDsPerRow/2 {
-			// There is no need in searching for the next tag value,
-			// since it is likely it is located in the next row,
-			// because the current row contains incomplete metricIDs set.
+		if mp.GetMatchingSeriesCount(filter) == 0 {
			continue
		}
+		labelValue := mp.Tag.Value
+		if string(labelValue) == string(prevLabelValue) {
			// Search for the next tag value.
			// The last char in kb.B must be tagSeparatorChar.
			// Just increment it in order to jump to the next tag value.
-			kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
-			kb.B = marshalTagValue(kb.B, mp.Tag.Key)
-			kb.B = marshalTagValue(kb.B, mp.Tag.Value)
+			kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
+			kb.B = marshalTagValue(kb.B, labelNameBytes)
+			kb.B = marshalTagValue(kb.B, labelValue)
			kb.B[len(kb.B)-1]++
			ts.Seek(kb.B)
+			continue
+		}
+		lvs[string(labelValue)] = struct{}{}
+		prevLabelValue = append(prevLabelValue[:0], labelValue...)
	}
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err)
@@ -1164,7 +1067,7 @@ func (db *indexDB) SearchTagValueSuffixes
func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct{}, tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
+	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
		return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
	}
@@ -1230,7 +1133,6 @@ func (is *indexSearch) searchTagValueSuffixesForPrefix
	kb := &is.kb
	ts := &is.ts
	mp := &is.mp
-	mp.Reset()
	dmis := is.db.s.getDeletedMetricIDs()
	loopsPaceLimiter := 0
	ts.Seek(prefix)
@@ -1371,23 +1273,14 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate
// getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
func (is *indexSearch) getTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, topN, maxMetrics int) (*TSDBStatus, error) {
-	var filter *uint64set.Set
-	if len(tfss) > 0 {
-		tr := TimeRange{
-			MinTimestamp: int64(date) * msecPerDay,
-			MaxTimestamp: int64(date+1)*msecPerDay - 1,
-		}
-		metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
-		if err != nil {
-			return nil, err
-		}
-		if metricIDs.Len() == 0 {
-			// Nothing found.
-			return &TSDBStatus{}, nil
-		}
-		filter = metricIDs
-	}
+	filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
+	if err != nil {
+		return nil, err
+	}
+	if filter != nil && filter.Len() == 0 {
+		qt.Printf("no matching series for filter=%s", tfss)
+		return &TSDBStatus{}, nil
+	}
	ts := &is.ts
	kb := &is.kb
	mp := &is.mp
@@ -1400,8 +1293,11 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate
	nameEqualBytes := []byte("__name__=")
	loopsPaceLimiter := 0
-	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-	kb.B = encoding.MarshalUint64(kb.B, date)
+	nsPrefixExpected := byte(nsPrefixDateTagToMetricIDs)
+	if date == 0 {
+		nsPrefixExpected = nsPrefixTagToMetricIDs
+	}
+	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	prefix := kb.B
	ts.Seek(prefix)
	for ts.NextItem() {
@@ -1415,28 +1311,15 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate
		if !bytes.HasPrefix(item, prefix) {
			break
		}
-		matchingSeriesCount := 0
-		if filter != nil {
-			if err := mp.Init(item, nsPrefixDateTagToMetricIDs); err != nil {
-				return nil, err
-			}
-			mp.ParseMetricIDs()
-			for _, metricID := range mp.MetricIDs {
-				if filter.Has(metricID) {
-					matchingSeriesCount++
-				}
-			}
-			if matchingSeriesCount == 0 {
-				// Skip rows without matching metricIDs.
-				continue
-			}
-		}
-		tail := item[len(prefix):]
-		var err error
-		tail, tmp, err = unmarshalTagValue(tmp[:0], tail)
-		if err != nil {
-			return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %w", item, err)
-		}
+		if err := mp.Init(item, nsPrefixExpected); err != nil {
+			return nil, err
+		}
+		matchingSeriesCount := mp.GetMatchingSeriesCount(filter)
+		if matchingSeriesCount == 0 {
+			// Skip rows without matching metricIDs.
+			continue
+		}
+		tmp = append(tmp[:0], mp.Tag.Key...)
		tagKey := tmp
		if isArtificialTagKey(tagKey) {
			// Skip artificially created tag keys.
@@ -1455,17 +1338,8 @@ func (is *indexSearch) getTSDBStatusWithFiltersForDate
			tmp = tagKey
		}
		tmp = append(tmp, '=')
-		tail, tmp, err = unmarshalTagValue(tmp, tail)
-		if err != nil {
-			return nil, fmt.Errorf("cannot unmarshal tag value from line %q: %w", item, err)
-		}
+		tmp = append(tmp, mp.Tag.Value...)
		tagKeyValue := tmp
-		if filter == nil {
-			if err := mp.InitOnlyTail(item, tail); err != nil {
-				return nil, err
-			}
-			matchingSeriesCount = mp.MetricIDsLen()
-		}
		if string(tagKey) == "__name__" {
			totalSeries += uint64(matchingSeriesCount)
		}
@@ -2243,6 +2117,25 @@ func matchTagFilters
	return true, nil
}

+func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, maxMetrics int) (*uint64set.Set, error) {
+	if len(tfss) == 0 {
+		return nil, nil
+	}
+	tr := TimeRange{
+		MinTimestamp: int64(date) * msecPerDay,
+		MaxTimestamp: int64(date+1)*msecPerDay - 1,
+	}
+	if date == 0 {
+		// Search for metricIDs on the whole time range.
+		tr.MaxTimestamp = timestampFromTime(time.Now())
+	}
+	metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
+	if err != nil {
+		return nil, err
+	}
+	return metricIDs, nil
+}
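searchMetricIDsWithFiltersOnDate reuses the ordinary metricID search by widening a single day index back into a TimeRange, with date 0 acting as the global sentinel whose range stretches to "now". A standalone sketch of that conversion; the local TimeRange type and msecPerDay stand in for the package's own definitions:

package main

import (
	"fmt"
	"time"
)

const msecPerDay = 24 * 3600 * 1000

type TimeRange struct {
	MinTimestamp, MaxTimestamp int64 // milliseconds since epoch
}

// timeRangeForDate mirrors the conversion above: a day index becomes
// [start of day, end of day], while date==0 means a global search.
func timeRangeForDate(date uint64) TimeRange {
	tr := TimeRange{
		MinTimestamp: int64(date) * msecPerDay,
		MaxTimestamp: int64(date+1)*msecPerDay - 1,
	}
	if date == 0 {
		// Global index: search the whole time range up to now.
		tr.MaxTimestamp = time.Now().UnixMilli()
	}
	return tr
}

func main() {
	fmt.Println(timeRangeForDate(19155)) // one concrete day
	fmt.Println(timeRangeForDate(0))     // global search up to now
}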
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
	metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
	if err != nil {
@@ -2359,7 +2252,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow
	ts := &is.ts
	kb := &is.kb
	mp := &is.mp
-	mp.Reset()
	var prevMatchingSuffix []byte
	var prevMatch bool
	var loopsCount int64
@@ -2467,7 +2359,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixes
func (is *indexSearch) updateMetricIDsForOrSuffix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
	ts := &is.ts
	mp := &is.mp
-	mp.Reset()
	var loopsCount int64
	loopsPaceLimiter := 0
	ts.Seek(prefix)
@@ -2505,7 +2396,7 @@ const maxDaysForPerDaySearch = 40
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer, metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
	atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
+	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
		// Too much dates must be covered. Give up, since it may be slow.
		return errFallbackToGlobalSearch
@@ -2927,14 +2818,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter
	}
	kb := kbPool.Get()
	defer kbPool.Put(kb)
-	if date != 0 {
-		// Use per-date search.
-		kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-		kb.B = encoding.MarshalUint64(kb.B, date)
-	} else {
-		// Use global search if date isn't set.
-		kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
-	}
+	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	prefix := kb.B
	kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
	tfNew := *tf
@@ -3004,14 +2888,7 @@ func (is *indexSearch) getMetricIDsForDate
	// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
	kb := kbPool.Get()
	defer kbPool.Put(kb)
-	if date != 0 {
-		// Use per-date search
-		kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
-		kb.B = encoding.MarshalUint64(kb.B, date)
-	} else {
-		// Use global search
-		kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
-	}
+	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	kb.B = marshalTagValue(kb.B, nil)
	var metricIDs uint64set.Set
	if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
@@ -3085,6 +2962,16 @@ func (is *indexSearch) marshalCommonPrefix
	return marshalCommonPrefix(dst, nsPrefix)
}

+func (is *indexSearch) marshalCommonPrefixForDate(dst []byte, date uint64) []byte {
+	if date == 0 {
+		// Global index
+		return is.marshalCommonPrefix(dst, nsPrefixTagToMetricIDs)
+	}
+	// Per-day index
+	dst = is.marshalCommonPrefix(dst, nsPrefixDateTagToMetricIDs)
+	return encoding.MarshalUint64(dst, date)
+}
func unmarshalCommonPrefix(src []byte) ([]byte, byte, error) {
	if len(src) < commonPrefixLen {
		return nil, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src)
@@ -3107,6 +2994,9 @@ type tagToMetricIDsRowParser struct
	// MetricIDs contains parsed MetricIDs after ParseMetricIDs call
	MetricIDs []uint64

+	// metricIDsParsed is set to true after ParseMetricIDs call
+	metricIDsParsed bool
+
	// Tag contains parsed tag after Init call
	Tag Tag
@@ -3118,6 +3008,7 @@ func (mp *tagToMetricIDsRowParser) Reset
	mp.NSPrefix = 0
	mp.Date = 0
	mp.MetricIDs = mp.MetricIDs[:0]
+	mp.metricIDsParsed = false
	mp.Tag.Reset()
	mp.tail = nil
}
@@ -3171,6 +3062,7 @@ func (mp *tagToMetricIDsRowParser) InitOnlyTail
		return fmt.Errorf("invalid tail length in the tag->metricIDs row; got %d bytes; must be multiple of 8 bytes", len(tail))
	}
	mp.tail = tail
+	mp.metricIDsParsed = false
	return nil
}
@@ -3191,6 +3083,9 @@ func (mp *tagToMetricIDsRowParser) MetricIDsLen
// ParseMetricIDs parses MetricIDs from mp.tail into mp.MetricIDs.
func (mp *tagToMetricIDsRowParser) ParseMetricIDs() {
+	if mp.metricIDsParsed {
+		return
+	}
	tail := mp.tail
	mp.MetricIDs = mp.MetricIDs[:0]
	n := len(tail) / 8
@@ -3210,6 +3105,24 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs
		metricIDs[i] = metricID
		tail = tail[8:]
	}
+	mp.metricIDsParsed = true
+}
+
+// GetMatchingSeriesCount returns the number of series in mp, which match metricIDs from the given filter.
+//
+// If filter is nil, then all series in mp are taken into account.
+func (mp *tagToMetricIDsRowParser) GetMatchingSeriesCount(filter *uint64set.Set) int {
+	if filter == nil {
+		return mp.MetricIDsLen()
+	}
+	mp.ParseMetricIDs()
+	n := 0
+	for _, metricID := range mp.MetricIDs {
+		if filter.Has(metricID) {
+			n++
+		}
+	}
+	return n
+}

// IsDeletedTag verifies whether the tag from mp is deleted according to dmis.
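The metricIDsParsed flag added above turns ParseMetricIDs into a memoized operation, so GetMatchingSeriesCount can be called for every scanned row without re-decoding the same 8-byte IDs, and a nil filter short-circuits to the length derivable from the tail size alone. A stripped-down sketch of the same mechanics, with map[uint64]struct{} standing in for uint64set.Set:

package main

import (
	"encoding/binary"
	"fmt"
)

type rowParser struct {
	tail      []byte // packed big-endian 8-byte metricIDs
	MetricIDs []uint64
	parsed    bool
}

func (p *rowParser) ParseMetricIDs() {
	if p.parsed {
		return // already decoded for this row
	}
	p.MetricIDs = p.MetricIDs[:0]
	for t := p.tail; len(t) >= 8; t = t[8:] {
		p.MetricIDs = append(p.MetricIDs, binary.BigEndian.Uint64(t))
	}
	p.parsed = true
}

// GetMatchingSeriesCount counts metricIDs present in filter;
// a nil filter means "count everything" without decoding.
func (p *rowParser) GetMatchingSeriesCount(filter map[uint64]struct{}) int {
	if filter == nil {
		return len(p.tail) / 8
	}
	p.ParseMetricIDs()
	n := 0
	for _, id := range p.MetricIDs {
		if _, ok := filter[id]; ok {
			n++
		}
	}
	return n
}

func main() {
	var tail []byte
	for _, id := range []uint64{1, 2, 3} {
		var b [8]byte
		binary.BigEndian.PutUint64(b[:], id)
		tail = append(tail, b[:]...)
	}
	p := &rowParser{tail: tail}
	fmt.Println(p.GetMatchingSeriesCount(nil))                        // 3
	fmt.Println(p.GetMatchingSeriesCount(map[uint64]struct{}{2: {}})) // 1
}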

@@ -703,9 +703,9 @@ func testIndexDBGetOrCreateTSIDByName
}

func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isConcurrent bool) error {
-	hasValue := func(tvs []string, v []byte) bool {
-		for _, tv := range tvs {
-			if string(v) == tv {
+	hasValue := func(lvs []string, v []byte) bool {
+		for _, lv := range lvs {
+			if string(v) == lv {
				return true
			}
		}
@@ -715,7 +715,7 @@ func testIndexDBCheckTSIDByName
	timeseriesCounters := make(map[uint64]bool)
	var tsidCopy TSID
	var metricNameCopy []byte
-	allKeys := make(map[string]bool)
+	allLabelNames := make(map[string]bool)
	for i := range mns {
		mn := &mns[i]
		tsid := &tsids[i]
@@ -757,38 +757,38 @@ func testIndexDBCheckTSIDByName
			return fmt.Errorf("expecting empty buf when searching for non-existent metricID; got %X", buf)
		}

-		// Test SearchTagValues
-		tvs, err := db.SearchTagValues(nil, 1e5, noDeadline)
+		// Test SearchLabelValuesWithFiltersOnTimeRange
+		lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, "__name__", nil, TimeRange{}, 1e5, 1e9, noDeadline)
		if err != nil {
-			return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
+			return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange(labelName=%q): %w", "__name__", err)
		}
-		if !hasValue(tvs, mn.MetricGroup) {
-			return fmt.Errorf("SearchTagValues couldn't find %q; found %q", mn.MetricGroup, tvs)
+		if !hasValue(lvs, mn.MetricGroup) {
+			return fmt.Errorf("SearchLabelValuesWithFiltersOnTimeRange(labelName=%q): couldn't find %q; found %q", "__name__", mn.MetricGroup, lvs)
		}
		for i := range mn.Tags {
			tag := &mn.Tags[i]
-			tvs, err := db.SearchTagValues(tag.Key, 1e5, noDeadline)
+			lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, string(tag.Key), nil, TimeRange{}, 1e5, 1e9, noDeadline)
			if err != nil {
-				return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
+				return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange(labelName=%q): %w", tag.Key, err)
			}
-			if !hasValue(tvs, tag.Value) {
-				return fmt.Errorf("SearchTagValues couldn't find %q=%q; found %q", tag.Key, tag.Value, tvs)
+			if !hasValue(lvs, tag.Value) {
+				return fmt.Errorf("SearchLabelValuesWithFiltersOnTimeRange(labelName=%q): couldn't find %q; found %q", tag.Key, tag.Value, lvs)
			}
-			allKeys[string(tag.Key)] = true
+			allLabelNames[string(tag.Key)] = true
		}
	}

-	// Test SearchTagKeys
-	tks, err := db.SearchTagKeys(1e5, noDeadline)
+	// Test SearchLabelNamesWithFiltersOnTimeRange (empty filters, global time range)
+	lns, err := db.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagKeys: %w", err)
+		return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange(empty filter, global time range): %w", err)
	}
-	if !hasValue(tks, nil) {
-		return fmt.Errorf("cannot find __name__ in %q", tks)
+	if !hasValue(lns, []byte("__name__")) {
+		return fmt.Errorf("cannot find __name__ in %q", lns)
	}
-	for key := range allKeys {
-		if !hasValue(tks, []byte(key)) {
-			return fmt.Errorf("cannot find %q in %q", key, tks)
+	for labelName := range allLabelNames {
+		if !hasValue(lns, []byte(labelName)) {
+			return fmt.Errorf("cannot find %q in %q", labelName, lns)
		}
	}
@@ -1633,13 +1633,13 @@ func TestSearchTSIDWithTimeRange
	var metricNameBuf []byte
	perDayMetricIDs := make(map[uint64]*uint64set.Set)
	var allMetricIDs uint64set.Set
-	tagKeys := []string{
-		"", "constant", "day", "uniqueid",
+	labelNames := []string{
+		"__name__", "constant", "day", "uniqueid",
	}
-	tagValues := []string{
+	labelValues := []string{
		"testMetric",
	}
-	sort.Strings(tagKeys)
+	sort.Strings(labelNames)
	for day := 0; day < days; day++ {
		var tsids []TSID
		var mns []MetricName
@@ -1709,30 +1709,28 @@ func TestSearchTSIDWithTimeRange
		t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
	}

-	// Check SearchTagKeysOnTimeRange.
-	tks, err := db.SearchTagKeysOnTimeRange(TimeRange{
+	// Check SearchLabelNamesWithFiltersOnTimeRange with the specified time range.
+	tr := TimeRange{
		MinTimestamp: int64(now) - msecPerDay,
		MaxTimestamp: int64(now),
-	}, 10000, noDeadline)
-	if err != nil {
-		t.Fatalf("unexpected error in SearchTagKeysOnTimeRange: %s", err)
	}
-	sort.Strings(tks)
-	if !reflect.DeepEqual(tks, tagKeys) {
-		t.Fatalf("unexpected tagKeys; got\n%s\nwant\n%s", tks, tagKeys)
+	lns, err := db.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, tr, 10000, 1e9, noDeadline)
+	if err != nil {
+		t.Fatalf("unexpected error in SearchLabelNamesWithFiltersOnTimeRange(timeRange=%s): %s", &tr, err)
+	}
+	sort.Strings(lns)
+	if !reflect.DeepEqual(lns, labelNames) {
+		t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", lns, labelNames)
	}

-	// Check SearchTagValuesOnTimeRange.
-	tvs, err := db.SearchTagValuesOnTimeRange([]byte(""), TimeRange{
-		MinTimestamp: int64(now) - msecPerDay,
-		MaxTimestamp: int64(now),
-	}, 10000, noDeadline)
+	// Check SearchLabelValuesWithFiltersOnTimeRange with the specified time range.
+	lvs, err := db.SearchLabelValuesWithFiltersOnTimeRange(nil, "", nil, tr, 10000, 1e9, noDeadline)
	if err != nil {
-		t.Fatalf("unexpected error in SearchTagValuesOnTimeRange: %s", err)
+		t.Fatalf("unexpected error in SearchLabelValuesWithFiltersOnTimeRange(timeRange=%s): %s", &tr, err)
	}
-	sort.Strings(tvs)
-	if !reflect.DeepEqual(tvs, tagValues) {
-		t.Fatalf("unexpected tagValues; got\n%s\nwant\n%s", tvs, tagValues)
+	sort.Strings(lvs)
+	if !reflect.DeepEqual(lvs, labelValues) {
+		t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", lvs, labelValues)
	}

	// Create a filter that will match series that occur across multiple days
@@ -1743,7 +1741,7 @@ func TestSearchTSIDWithTimeRange
	// Perform a search within a day.
	// This should return the metrics for the day
-	tr := TimeRange{
+	tr = TimeRange{
		MinTimestamp: int64(now - 2*msecPerHour - 1),
		MaxTimestamp: int64(now),
	}
@@ -1755,6 +1753,46 @@ func TestSearchTSIDWithTimeRange
		t.Fatalf("expected %d time series for current day, got %d time series", metricsPerDay, len(matchedTSIDs))
	}

+	// Check SearchLabelNamesWithFiltersOnTimeRange with the specified filter.
+	lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline)
+	if err != nil {
+		t.Fatalf("unexpected error in SearchLabelNamesWithFiltersOnTimeRange(filters=%s): %s", tfs, err)
+	}
+	sort.Strings(lns)
+	if !reflect.DeepEqual(lns, labelNames) {
+		t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", lns, labelNames)
+	}
+
+	// Check SearchLabelNamesWithFiltersOnTimeRange with the specified filter and time range.
+	lns, err = db.SearchLabelNamesWithFiltersOnTimeRange(nil, []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline)
+	if err != nil {
+		t.Fatalf("unexpected error in SearchLabelNamesWithFiltersOnTimeRange(filters=%s, timeRange=%s): %s", tfs, &tr, err)
+	}
+	sort.Strings(lns)
+	if !reflect.DeepEqual(lns, labelNames) {
+		t.Fatalf("unexpected labelNames; got\n%s\nwant\n%s", lns, labelNames)
+	}
+
+	// Check SearchLabelValuesWithFiltersOnTimeRange with the specified filter.
+	lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, "", []*TagFilters{tfs}, TimeRange{}, 10000, 1e9, noDeadline)
+	if err != nil {
+		t.Fatalf("unexpected error in SearchLabelValuesWithFiltersOnTimeRange(filters=%s): %s", tfs, err)
+	}
+	sort.Strings(lvs)
+	if !reflect.DeepEqual(lvs, labelValues) {
+		t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", lvs, labelValues)
+	}
+
+	// Check SearchLabelValuesWithFiltersOnTimeRange with the specified filter and time range.
+	lvs, err = db.SearchLabelValuesWithFiltersOnTimeRange(nil, "", []*TagFilters{tfs}, tr, 10000, 1e9, noDeadline)
+	if err != nil {
+		t.Fatalf("unexpected error in SearchLabelValuesWithFiltersOnTimeRange(filters=%s, timeRange=%s): %s", tfs, &tr, err)
+	}
+	sort.Strings(lvs)
+	if !reflect.DeepEqual(lvs, labelValues) {
+		t.Fatalf("unexpected labelValues; got\n%s\nwant\n%s", lvs, labelValues)
+	}
+
	// Perform a search across all the days, should match all metrics
	tr = TimeRange{
		MinTimestamp: int64(now - msecPerDay*days),

@@ -1263,24 +1263,15 @@ func (s *Storage) DeleteMetrics
	return deletedCount, nil
}

-// SearchTagKeysOnTimeRange searches for tag keys on tr.
-func (s *Storage) SearchTagKeysOnTimeRange(tr TimeRange, maxTagKeys int, deadline uint64) ([]string, error) {
-	return s.idb().SearchTagKeysOnTimeRange(tr, maxTagKeys, deadline)
-}
-
-// SearchTagKeys searches for tag keys
-func (s *Storage) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
-	return s.idb().SearchTagKeys(maxTagKeys, deadline)
-}
-
-// SearchTagValuesOnTimeRange searches for tag values for the given tagKey on tr.
-func (s *Storage) SearchTagValuesOnTimeRange(tagKey []byte, tr TimeRange, maxTagValues int, deadline uint64) ([]string, error) {
-	return s.idb().SearchTagValuesOnTimeRange(tagKey, tr, maxTagValues, deadline)
-}
-
-// SearchTagValues searches for tag values for the given tagKey
-func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
-	return s.idb().SearchTagValues(tagKey, maxTagValues, deadline)
-}
+// SearchLabelNamesWithFiltersOnTimeRange searches for label names matching the given tfss on tr.
+func (s *Storage) SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int, deadline uint64) ([]string, error) {
+	return s.idb().SearchLabelNamesWithFiltersOnTimeRange(qt, tfss, tr, maxLabelNames, maxMetrics, deadline)
+}
+
+// SearchLabelValuesWithFiltersOnTimeRange searches for label values for the given labelName, filters and tr.
+func (s *Storage) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName string, tfss []*TagFilters,
+	tr TimeRange, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
+	return s.idb().SearchLabelValuesWithFiltersOnTimeRange(qt, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
+}

// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
@@ -1468,39 +1459,6 @@ func getRegexpPartsForGraphiteQuery
	}
}

-// SearchTagEntries returns a list of (tagName -> tagValues)
-func (s *Storage) SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) {
-	idb := s.idb()
-	keys, err := idb.SearchTagKeys(maxTagKeys, deadline)
-	if err != nil {
-		return nil, fmt.Errorf("cannot search tag keys: %w", err)
-	}
-	// Sort keys for faster seeks below
-	sort.Strings(keys)
-	tes := make([]TagEntry, len(keys))
-	for i, key := range keys {
-		values, err := idb.SearchTagValues([]byte(key), maxTagValues, deadline)
-		if err != nil {
-			return nil, fmt.Errorf("cannot search values for tag %q: %w", key, err)
-		}
-		te := &tes[i]
-		te.Key = key
-		te.Values = values
-	}
-	return tes, nil
-}
-
-// TagEntry contains (tagName -> tagValues) mapping
-type TagEntry struct {
-	// Key is tagName
-	Key string
-
-	// Values contains all the values for Key.
-	Values []string
-}

// GetSeriesCount returns the approximate number of unique time series.
//
// It includes the deleted series too and may count the same series
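For callers, the two wrappers above replace four narrower entry points with a filter-aware pair. A hypothetical call site in the spirit of the tests below; it assumes it runs inside the storage package's tests, where s (*Storage), noDeadline and timestampFromTime are available, and the filter and limits are illustrative:

// List up to 100 values of the "job" label for series matching
// {env="prod"} over the last hour.
tfs := NewTagFilters()
if err := tfs.Add([]byte("env"), []byte("prod"), false, false); err != nil {
	t.Fatalf("cannot add tag filter: %s", err)
}
now := timestampFromTime(time.Now())
tr := TimeRange{MinTimestamp: now - 3600*1000, MaxTimestamp: now}
lvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, "job", []*TagFilters{tfs}, tr, 100, 1e6, noDeadline)
if err != nil {
	t.Fatalf("unexpected error: %s", err)
}
sort.Strings(lvs) // results arrive unsorted; vmselect normally sorts them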

@@ -502,13 +502,13 @@ func TestStorageDeleteMetrics
		t.Fatalf("cannot open storage: %s", err)
	}

-	// Verify no tag keys exist
-	tks, err := s.SearchTagKeys(1e5, noDeadline)
+	// Verify no label names exist
+	lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
-		t.Fatalf("error in SearchTagKeys at the start: %s", err)
+		t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
	}
-	if len(tks) != 0 {
-		t.Fatalf("found non-empty tag keys at the start: %q", tks)
+	if len(lns) != 0 {
+		t.Fatalf("found non-empty label names at the start: %q", lns)
	}

	t.Run("serial", func(t *testing.T) {
@@ -554,12 +554,12 @@ func TestStorageDeleteMetrics
	})

-	// Verify no more tag keys exist
-	tks, err = s.SearchTagKeys(1e5, noDeadline)
+	// Verify no more label names exist
+	lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
-		t.Fatalf("error in SearchTagKeys after the test: %s", err)
+		t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange after the test: %s", err)
	}
-	if len(tks) != 0 {
-		t.Fatalf("found non-empty tag keys after the test: %q", tks)
+	if len(lns) != 0 {
+		t.Fatalf("found non-empty label names after the test: %q", lns)
	}

	s.MustClose()
@@ -574,8 +574,8 @@ func testStorageDeleteMetrics
	workerTag := []byte(fmt.Sprintf("workerTag_%d", workerNum))

-	tksAll := make(map[string]bool)
-	tksAll[""] = true // __name__
+	lnsAll := make(map[string]bool)
+	lnsAll["__name__"] = true
	for i := 0; i < metricsCount; i++ {
		var mrs []MetricRow
		var mn MetricName
@@ -587,7 +587,7 @@ func testStorageDeleteMetrics
			{workerTag, []byte("foobar")},
		}
		for i := range mn.Tags {
-			tksAll[string(mn.Tags[i].Key)] = true
+			lnsAll[string(mn.Tags[i].Key)] = true
		}
		mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", i, workerNum))
		metricNameRaw := mn.marshalRaw(nil)
@@ -610,21 +610,21 @@ func testStorageDeleteMetrics
	s.DebugFlush()

	// Verify tag values exist
-	tvs, err := s.SearchTagValues(workerTag, 1e5, noDeadline)
+	tvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagValues before metrics removal: %w", err)
+		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange before metrics removal: %w", err)
	}
	if len(tvs) == 0 {
		return fmt.Errorf("unexpected empty number of tag values for workerTag")
	}

-	// Verify tag keys exist
-	tks, err := s.SearchTagKeys(1e5, noDeadline)
+	// Verify label names exist
+	lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagKeys before metrics removal: %w", err)
+		return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange before metrics removal: %w", err)
	}
-	if err := checkTagKeys(tks, tksAll); err != nil {
-		return fmt.Errorf("unexpected tag keys before metrics removal: %w", err)
+	if err := checkLabelNames(lns, lnsAll); err != nil {
+		return fmt.Errorf("unexpected label names before metrics removal: %w", err)
	}

	var sr Search
@@ -683,9 +683,9 @@ func testStorageDeleteMetrics
	if n := metricBlocksCount(tfs); n != 0 {
		return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
	}
-	tvs, err = s.SearchTagValues(workerTag, 1e5, noDeadline)
+	tvs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagValues after all the metrics are removed: %w", err)
+		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange after all the metrics are removed: %w", err)
	}
	if len(tvs) != 0 {
		return fmt.Errorf("found non-empty tag values for %q after metrics removal: %q", workerTag, tvs)
@@ -694,21 +694,21 @@ func testStorageDeleteMetrics
	return nil
}

-func checkTagKeys(tks []string, tksExpected map[string]bool) error {
-	if len(tks) < len(tksExpected) {
-		return fmt.Errorf("unexpected number of tag keys found; got %d; want at least %d; tks=%q, tksExpected=%v", len(tks), len(tksExpected), tks, tksExpected)
+func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
+	if len(lns) < len(lnsExpected) {
+		return fmt.Errorf("unexpected number of label names found; got %d; want at least %d; lns=%q, lnsExpected=%v", len(lns), len(lnsExpected), lns, lnsExpected)
	}
-	hasItem := func(k string, tks []string) bool {
-		for _, kk := range tks {
-			if k == kk {
+	hasItem := func(s string, lns []string) bool {
+		for _, labelName := range lns {
+			if s == labelName {
				return true
			}
		}
		return false
	}
-	for k := range tksExpected {
-		if !hasItem(k, tks) {
-			return fmt.Errorf("cannot find %q in tag keys %q", k, tks)
+	for labelName := range lnsExpected {
+		if !hasItem(labelName, lns) {
+			return fmt.Errorf("cannot find %q in label names %q", labelName, lns)
		}
	}
	return nil
@@ -796,23 +796,23 @@ func testStorageRegisterMetricNames
	// Verify the storage contains the added metric names.
	s.DebugFlush()

-	// Verify that SearchTagKeys returns correct result.
-	tksExpected := []string{
-		"",
+	// Verify that SearchLabelNamesWithFiltersOnTimeRange returns correct result.
+	lnsExpected := []string{
+		"__name__",
		"add_id",
		"instance",
		"job",
	}
-	tks, err := s.SearchTagKeys(100, noDeadline)
+	lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 100, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagKeys: %w", err)
+		return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
	}
-	sort.Strings(tks)
-	if !reflect.DeepEqual(tks, tksExpected) {
-		return fmt.Errorf("unexpected tag keys returned from SearchTagKeys;\ngot\n%q\nwant\n%q", tks, tksExpected)
+	sort.Strings(lns)
+	if !reflect.DeepEqual(lns, lnsExpected) {
+		return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
	}

-	// Verify that SearchTagKeysOnTimeRange returns correct result.
+	// Verify that SearchLabelNamesWithFiltersOnTimeRange with the specified time range returns correct result.
	now := timestampFromTime(time.Now())
	start := now - msecPerDay
	end := now + 60*1000
@@ -820,33 +820,33 @@ func testStorageRegisterMetricNames
		MinTimestamp: start,
		MaxTimestamp: end,
	}
-	tks, err = s.SearchTagKeysOnTimeRange(tr, 100, noDeadline)
+	lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, tr, 100, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagKeysOnTimeRange: %w", err)
+		return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
	}
-	sort.Strings(tks)
-	if !reflect.DeepEqual(tks, tksExpected) {
-		return fmt.Errorf("unexpected tag keys returned from SearchTagKeysOnTimeRange;\ngot\n%q\nwant\n%q", tks, tksExpected)
+	sort.Strings(lns)
+	if !reflect.DeepEqual(lns, lnsExpected) {
+		return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
	}

-	// Verify that SearchTagValues returns correct result.
-	addIDs, err := s.SearchTagValues([]byte("add_id"), addsCount+100, noDeadline)
+	// Verify that SearchLabelValuesWithFiltersOnTimeRange returns correct result.
+	addIDs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagValues: %w", err)
+		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
	}
	sort.Strings(addIDs)
	if !reflect.DeepEqual(addIDs, addIDsExpected) {
-		return fmt.Errorf("unexpected tag values returned from SearchTagValues;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
+		return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
	}

-	// Verify that SearchTagValuesOnTimeRange returns correct result.
-	addIDs, err = s.SearchTagValuesOnTimeRange([]byte("add_id"), tr, addsCount+100, noDeadline)
+	// Verify that SearchLabelValuesWithFiltersOnTimeRange with the specified time range returns correct result.
+	addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, tr, addsCount+100, 1e9, noDeadline)
	if err != nil {
-		return fmt.Errorf("error in SearchTagValuesOnTimeRange: %w", err)
+		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
	}
	sort.Strings(addIDs)
	if !reflect.DeepEqual(addIDs, addIDsExpected) {
-		return fmt.Errorf("unexpected tag values returned from SearchTagValuesOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
+		return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
	}

	// Verify that SearchMetricNames returns correct result.