all: improve query tracing coverage for indexdb search
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1403
commit 2bcb960f17 (parent a30333a79e)
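The change threads a `*querytracer.Tracer` from the /api/v1/status/tsdb HTTP handler all the way down to the indexdb search code, so the hunks below mostly add a `qt` argument and annotate the work with child spans. A minimal sketch of the tracing pattern used throughout — only the tracer calls visible in this diff (NewChild, Printf, Done) appear here; how the root tracer is created per request is not shown in this commit and is assumed to happen in the vmselect request handler, with a nil tracer acting as a no-op:

```go
package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"

// searchSomething is a hypothetical callee illustrating the shape the diff
// gives to real functions: accept qt as the first argument, open a child
// span for the whole call, and annotate intermediate results with Printf.
func searchSomething(qt *querytracer.Tracer, n int) {
	qt = qt.NewChild("search for something: n=%d", n)
	defer qt.Done()
	// ... perform the actual search here ...
	qt.Printf("found %d items", n)
}
```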
@@ -269,7 +269,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	case "/api/v1/status/tsdb":
 		statusTSDBRequests.Inc()
 		httpserver.EnableCORS(w, r)
-		if err := prometheus.TSDBStatusHandler(startTime, w, r); err != nil {
+		if err := prometheus.TSDBStatusHandler(qt, startTime, w, r); err != nil {
 			statusTSDBErrors.Inc()
 			sendPrometheusError(w, r, err)
 			return true

@@ -841,7 +841,7 @@ func GetTSDBStatusForDate(qt *querytracer.Tracer, deadline searchutils.Deadline,
 	if deadline.Exceeded() {
 		return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
 	}
-	status, err := vmstorage.GetTSDBStatusForDate(date, topN, maxMetrics, deadline.Deadline())
+	status, err := vmstorage.GetTSDBStatusForDate(qt, date, topN, maxMetrics, deadline.Deadline())
 	if err != nil {
 		return nil, fmt.Errorf("error during tsdb status request: %w", err)
 	}

@@ -866,7 +866,7 @@ func GetTSDBStatusWithFilters(qt *querytracer.Tracer, deadline searchutils.Deadl
 		return nil, err
 	}
 	date := uint64(tr.MinTimestamp) / (3600 * 24 * 1000)
-	status, err := vmstorage.GetTSDBStatusWithFiltersForDate(tfss, date, topN, sq.MaxMetrics, deadline.Deadline())
+	status, err := vmstorage.GetTSDBStatusWithFiltersForDate(qt, tfss, date, topN, sq.MaxMetrics, deadline.Deadline())
 	if err != nil {
 		return nil, fmt.Errorf("error during tsdb status with filters request: %w", err)
 	}

@@ -624,7 +624,7 @@ const secsPerDay = 3600 * 24
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
 //
 // It can accept `match[]` filters in order to narrow down the search.
-func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
+func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, r *http.Request) error {
 	defer tsdbStatusDuration.UpdateDuration(startTime)
 
 	deadline := searchutils.GetDeadlineForStatusRequest(r, startTime)

@@ -660,12 +660,12 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
 	}
 	var status *storage.TSDBStatus
 	if len(matches) == 0 && len(etfs) == 0 {
-		status, err = netstorage.GetTSDBStatusForDate(nil, deadline, date, topN, *maxTSDBStatusSeries)
+		status, err = netstorage.GetTSDBStatusForDate(qt, deadline, date, topN, *maxTSDBStatusSeries)
 		if err != nil {
 			return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %w`, date, topN, err)
 		}
 	} else {
-		status, err = tsdbStatusWithMatches(matches, etfs, date, topN, *maxTSDBStatusSeries, deadline)
+		status, err = tsdbStatusWithMatches(qt, matches, etfs, date, topN, *maxTSDBStatusSeries, deadline)
 		if err != nil {
 			return fmt.Errorf("cannot obtain tsdb status with matches for date=%d, topN=%d: %w", date, topN, err)
 		}

@@ -673,14 +673,14 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque
 	w.Header().Set("Content-Type", "application/json")
 	bw := bufferedwriter.Get(w)
 	defer bufferedwriter.Put(bw)
-	WriteTSDBStatusResponse(bw, status)
+	WriteTSDBStatusResponse(bw, status, qt)
 	if err := bw.Flush(); err != nil {
 		return fmt.Errorf("cannot send tsdb status response to remote client: %w", err)
 	}
 	return nil
 }
 
-func tsdbStatusWithMatches(matches []string, etfs [][]storage.TagFilter, date uint64, topN, maxMetrics int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
+func tsdbStatusWithMatches(qt *querytracer.Tracer, matches []string, etfs [][]storage.TagFilter, date uint64, topN, maxMetrics int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
 	tagFilterss, err := getTagFilterssFromMatches(matches)
 	if err != nil {
 		return nil, err

@@ -692,7 +692,7 @@ func tsdbStatusWithMatches(matches []string, etfs [][]storage.TagFilter, date ui
 	start := int64(date*secsPerDay) * 1000
 	end := int64(date*secsPerDay+secsPerDay) * 1000
 	sq := storage.NewSearchQuery(start, end, tagFilterss, maxMetrics)
-	status, err := netstorage.GetTSDBStatusWithFilters(nil, deadline, sq, topN)
+	status, err := netstorage.GetTSDBStatusWithFilters(qt, deadline, sq, topN)
 	if err != nil {
 		return nil, err
 	}
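tsdbStatusWithMatches above converts a day index into a millisecond time range. A worked example of that arithmetic (secsPerDay = 3600 * 24, as in the hunk header above; `date` counts whole days since the Unix epoch):

```go
package example

const secsPerDay = 3600 * 24 // seconds in a day, as in the code above

// dayWindow returns the [start, end) millisecond range for a given day index.
// For date = 19000 (2022-01-08) this yields start = 1641600000000 and
// end = 1641686400000, i.e. exactly one day of 86400000 ms.
func dayWindow(date uint64) (startMs, endMs int64) {
	startMs = int64(date*secsPerDay) * 1000
	endMs = int64(date*secsPerDay+secsPerDay) * 1000
	return startMs, endMs
}
```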
@@ -1,8 +1,11 @@
-{% import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" %}
+{% import (
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+) %}
 
 {% stripspace %}
 TSDBStatusResponse generates response for /api/v1/status/tsdb .
-{% func TSDBStatusResponse(status *storage.TSDBStatus) %}
+{% func TSDBStatusResponse(status *storage.TSDBStatus, qt *querytracer.Tracer) %}
 {
 	"status":"success",
 	"data":{

@@ -12,6 +15,8 @@ TSDBStatusResponse generates response for /api/v1/status/tsdb .
 		"seriesCountByLabelValuePair":{%= tsdbStatusEntries(status.SeriesCountByLabelValuePair) %},
 		"labelValueCountByLabelName":{%= tsdbStatusEntries(status.LabelValueCountByLabelName) %}
 	}
+	{% code qt.Done() %}
+	{%= dumpQueryTrace(qt) %}
 }
 {% endfunc %}
 
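The next hunk is the file qtc regenerates from the template above. The substantive change is the same qt parameter on the Stream/Write/String trio plus the new qt.Done() and streamdumpQueryTrace calls; everything else is //line-directive churn — the template's header block grew from 8 to 11 lines and the body gained two lines, so every //line reference shifts (for example :5 becomes :8 and :28 becomes :33).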
@@ -5,127 +5,137 @@
 package prometheus
 
 //line app/vmselect/prometheus/tsdb_status_response.qtpl:1
-import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+import (
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+)
 
 // TSDBStatusResponse generates response for /api/v1/status/tsdb .
 
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:8
 import (
 	qtio422016 "io"
 
 	qt422016 "github.com/valyala/quicktemplate"
 )
 
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:8
 var (
 	_ = qtio422016.Copy
 	_ = qt422016.AcquireByteBuffer
 )
 
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
-func StreamTSDBStatusResponse(qw422016 *qt422016.Writer, status *storage.TSDBStatus) {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:5
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:8
+func StreamTSDBStatusResponse(qw422016 *qt422016.Writer, status *storage.TSDBStatus, qt *querytracer.Tracer) {
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:8
 	qw422016.N().S(`{"status":"success","data":{"totalSeries":`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:9
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:12
 	qw422016.N().DUL(status.TotalSeries)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:9
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:12
 	qw422016.N().S(`,"totalLabelValuePairs":`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:10
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:13
 	qw422016.N().DUL(status.TotalLabelValuePairs)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:10
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:13
 	qw422016.N().S(`,"seriesCountByMetricName":`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:11
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
 	streamtsdbStatusEntries(qw422016, status.SeriesCountByMetricName)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:11
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:14
 	qw422016.N().S(`,"seriesCountByLabelValuePair":`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:12
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:15
 	streamtsdbStatusEntries(qw422016, status.SeriesCountByLabelValuePair)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:12
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:15
 	qw422016.N().S(`,"labelValueCountByLabelName":`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:13
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
 	streamtsdbStatusEntries(qw422016, status.LabelValueCountByLabelName)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:13
-	qw422016.N().S(`}}`)
 //line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-}
+	qw422016.N().S(`}`)
 
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-func WriteTSDBStatusResponse(qq422016 qtio422016.Writer, status *storage.TSDBStatus) {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	qw422016 := qt422016.AcquireWriter(qq422016)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	StreamTSDBStatusResponse(qw422016, status)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	qt422016.ReleaseWriter(qw422016)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-}
-
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-func TSDBStatusResponse(status *storage.TSDBStatus) string {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	qb422016 := qt422016.AcquireByteBuffer()
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	WriteTSDBStatusResponse(qb422016, status)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	qs422016 := string(qb422016.B)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	qt422016.ReleaseByteBuffer(qb422016)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-	return qs422016
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:16
-}
-
 //line app/vmselect/prometheus/tsdb_status_response.qtpl:18
+	qt.Done()
+
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:19
+	streamdumpQueryTrace(qw422016, qt)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:19
+	qw422016.N().S(`}`)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+}
+
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+func WriteTSDBStatusResponse(qq422016 qtio422016.Writer, status *storage.TSDBStatus, qt *querytracer.Tracer) {
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	qw422016 := qt422016.AcquireWriter(qq422016)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	StreamTSDBStatusResponse(qw422016, status, qt)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	qt422016.ReleaseWriter(qw422016)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+}
+
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+func TSDBStatusResponse(status *storage.TSDBStatus, qt *querytracer.Tracer) string {
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	qb422016 := qt422016.AcquireByteBuffer()
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	WriteTSDBStatusResponse(qb422016, status, qt)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	qs422016 := string(qb422016.B)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	qt422016.ReleaseByteBuffer(qb422016)
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+	return qs422016
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:21
+}
+
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
 func streamtsdbStatusEntries(qw422016 *qt422016.Writer, a []storage.TopHeapEntry) {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:18
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
 	qw422016.N().S(`[`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:20
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:25
 	for i, e := range a {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:20
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:25
 		qw422016.N().S(`{"name":`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:22
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:27
 		qw422016.N().Q(e.Name)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:22
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:27
 		qw422016.N().S(`,"value":`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
 		qw422016.N().D(int(e.Count))
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:23
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
 		qw422016.N().S(`}`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:25
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:30
 		if i+1 < len(a) {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:25
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:30
 			qw422016.N().S(`,`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:25
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:30
 		}
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:31
 	}
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:26
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:31
 	qw422016.N().S(`]`)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 }
 
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 func writetsdbStatusEntries(qq422016 qtio422016.Writer, a []storage.TopHeapEntry) {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	qw422016 := qt422016.AcquireWriter(qq422016)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	streamtsdbStatusEntries(qw422016, a)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	qt422016.ReleaseWriter(qw422016)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 }
 
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 func tsdbStatusEntries(a []storage.TopHeapEntry) string {
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	qb422016 := qt422016.AcquireByteBuffer()
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	writetsdbStatusEntries(qb422016, a)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	qs422016 := string(qb422016.B)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	qt422016.ReleaseByteBuffer(qb422016)
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 	return qs422016
-//line app/vmselect/prometheus/tsdb_status_response.qtpl:28
+//line app/vmselect/prometheus/tsdb_status_response.qtpl:33
 }
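As the generated code above shows, qtc emits three entry points per template function: a Stream* function writing to a *quicktemplate.Writer, a Write* wrapper that acquires and releases a pooled writer around any io.Writer, and a string-returning variant backed by a pooled byte buffer. A self-contained sketch of that pooling pattern (toy names; the real functions are the ones in the diff):

```go
package example

import (
	"io"

	qt422016 "github.com/valyala/quicktemplate"
)

// streamToy mirrors the shape of StreamTSDBStatusResponse above.
func streamToy(qw *qt422016.Writer, totalSeries int) {
	qw.N().S(`{"totalSeries":`)
	qw.N().D(totalSeries)
	qw.N().S(`}`)
}

// writeToy mirrors WriteTSDBStatusResponse: acquire a pooled writer,
// delegate to the stream variant, then release the writer.
func writeToy(w io.Writer, totalSeries int) {
	qw := qt422016.AcquireWriter(w)
	streamToy(qw, totalSeries)
	qt422016.ReleaseWriter(qw)
}
```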
@@ -213,12 +213,16 @@ func GetExtraTagFilters(r *http.Request) ([][]storage.TagFilter, error) {
 		if len(tmp) != 2 {
 			return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", match)
 		}
+		if tmp[0] == "__name__" {
+			// This is required for storage.Search.
+			tmp[0] = ""
+		}
 		tagFilters = append(tagFilters, storage.TagFilter{
 			Key:   []byte(tmp[0]),
 			Value: []byte(tmp[1]),
 		})
 	}
-	extraFilters := r.Form["extra_filters"]
+	extraFilters := append([]string{}, r.Form["extra_filters"]...)
 	extraFilters = append(extraFilters, r.Form["extra_filters[]"]...)
 	if len(extraFilters) == 0 {
 		if len(tagFilters) == 0 {
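The GetExtraTagFilters hunk above maps an `extra_label=__name__=value` query arg onto a tag filter with an empty key, which is how storage.Search addresses the metric name (per the comment in the diff). A hedged sketch of that name=value handling — the actual splitting code sits outside this diff, so strings.SplitN here is an assumption:

```go
package example

import (
	"fmt"
	"strings"
)

// splitExtraLabel turns an `extra_label` query arg into a (key, value) pair,
// applying the __name__ -> "" mapping added in the diff above.
func splitExtraLabel(match string) (key, value string, err error) {
	tmp := strings.SplitN(match, "=", 2)
	if len(tmp) != 2 {
		return "", "", fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", match)
	}
	if tmp[0] == "__name__" {
		// This is required for storage.Search.
		tmp[0] = ""
	}
	return tmp[0], tmp[1], nil
}
```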
@@ -239,17 +239,17 @@ func SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]storage.
 }
 
 // GetTSDBStatusForDate returns TSDB status for the given date.
-func GetTSDBStatusForDate(date uint64, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
+func GetTSDBStatusForDate(qt *querytracer.Tracer, date uint64, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
 	WG.Add(1)
-	status, err := Storage.GetTSDBStatusWithFiltersForDate(nil, date, topN, maxMetrics, deadline)
+	status, err := Storage.GetTSDBStatusWithFiltersForDate(qt, nil, date, topN, maxMetrics, deadline)
 	WG.Done()
 	return status, err
 }
 
 // GetTSDBStatusWithFiltersForDate returns TSDB status for given filters on the given date.
-func GetTSDBStatusWithFiltersForDate(tfss []*storage.TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
+func GetTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, tfss []*storage.TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
 	WG.Add(1)
-	status, err := Storage.GetTSDBStatusWithFiltersForDate(tfss, date, topN, maxMetrics, deadline)
+	status, err := Storage.GetTSDBStatusWithFiltersForDate(qt, tfss, date, topN, maxMetrics, deadline)
 	WG.Done()
 	return status, err
 }
@@ -312,13 +312,17 @@ func (db *indexDB) decRef() {
 	logger.Infof("indexDB %q has been dropped", tbPath)
 }
 
-func (db *indexDB) getFromTagFiltersCache(key []byte) ([]TSID, bool) {
+func (db *indexDB) getFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]TSID, bool) {
+	qt = qt.NewChild("search for tsids in tag filters cache")
+	defer qt.Done()
 	compressedBuf := tagBufPool.Get()
 	defer tagBufPool.Put(compressedBuf)
 	compressedBuf.B = db.tagFiltersCache.GetBig(compressedBuf.B[:0], key)
 	if len(compressedBuf.B) == 0 {
+		qt.Printf("cache miss")
 		return nil, false
 	}
+	qt.Printf("found tsids with compressed size: %d bytes", len(compressedBuf.B))
 	buf := tagBufPool.Get()
 	defer tagBufPool.Put(buf)
 	var err error
@@ -326,22 +330,29 @@ func (db *indexDB) getFromTagFiltersCache(key []byte) ([]TSID, bool) {
 	if err != nil {
 		logger.Panicf("FATAL: cannot decompress tsids from tagFiltersCache: %s", err)
 	}
+	qt.Printf("decompressed tsids to %d bytes", len(buf.B))
 	tsids, err := unmarshalTSIDs(nil, buf.B)
 	if err != nil {
 		logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
 	}
+	qt.Printf("unmarshaled %d tsids", len(tsids))
 	return tsids, true
 }
 
 var tagBufPool bytesutil.ByteBufferPool
 
-func (db *indexDB) putToTagFiltersCache(tsids []TSID, key []byte) {
+func (db *indexDB) putToTagFiltersCache(qt *querytracer.Tracer, tsids []TSID, key []byte) {
+	qt = qt.NewChild("put %d tsids in cache", len(tsids))
+	defer qt.Done()
 	buf := tagBufPool.Get()
 	buf.B = marshalTSIDs(buf.B[:0], tsids)
+	qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B))
 	compressedBuf := tagBufPool.Get()
 	compressedBuf.B = encoding.CompressZSTDLevel(compressedBuf.B[:0], buf.B, 1)
+	qt.Printf("compressed %d tsids into %d bytes", len(tsids), len(compressedBuf.B))
 	tagBufPool.Put(buf)
 	db.tagFiltersCache.SetBig(key, compressedBuf.B)
+	qt.Printf("store %d compressed tsids into cache", len(tsids))
 	tagBufPool.Put(compressedBuf)
 }
 
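putToTagFiltersCache and getFromTagFiltersCache above now trace each stage of the cache round trip: marshal the tsids, zstd-compress them at level 1, store them under the tag-filters key, and decompress/unmarshal on the way back. A sketch of just the compression round trip — CompressZSTDLevel appears in the diff, while the matching DecompressZSTD call is an assumption based on the surrounding code:

```go
package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"

// roundTrip compresses a marshaled payload the way the cache writer does and
// decompresses it back the way the cache reader does.
func roundTrip(payload []byte) ([]byte, error) {
	compressed := encoding.CompressZSTDLevel(nil, payload, 1) // level 1, as in the diff
	return encoding.DecompressZSTD(nil, compressed)
}
```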
@@ -1333,9 +1344,11 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
 }
 
 // GetTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
-func (db *indexDB) GetTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
+func (db *indexDB) GetTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
+	qtChild := qt.NewChild("collect tsdb stats in the current indexdb")
 	is := db.getIndexSearch(deadline)
-	status, err := is.getTSDBStatusWithFiltersForDate(tfss, date, topN, maxMetrics)
+	status, err := is.getTSDBStatusWithFiltersForDate(qtChild, tfss, date, topN, maxMetrics)
+	qtChild.Done()
 	db.putIndexSearch(is)
 	if err != nil {
 		return nil, err
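GetTSDBStatusWithFiltersForDate above (and searchTSIDs further down) follow a two-generation pattern: query the current indexdb under one child span, then the previous indexdb via doExtDB under another, so the rendered trace attributes time to each generation. Reduced to its shape — function values stand in for the real indexSearch calls, and the real code only consults the previous generation when the current one yields nothing useful:

```go
package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"

// twoGenerations runs a search against the current indexdb and then against
// the previous one, giving each its own trace span.
func twoGenerations(qt *querytracer.Tracer, searchCurrent, searchPrev func(*querytracer.Tracer) error) error {
	qtChild := qt.NewChild("collect tsdb stats in the current indexdb")
	err := searchCurrent(qtChild)
	qtChild.Done()
	if err != nil {
		return err
	}
	qtChild = qt.NewChild("collect tsdb stats in the previous indexdb")
	err = searchPrev(qtChild)
	qtChild.Done()
	return err
}
```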
@ -1344,8 +1357,10 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint
|
|||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
ok := db.doExtDB(func(extDB *indexDB) {
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
|
qtChild := qt.NewChild("collect tsdb stats in the previous indexdb")
|
||||||
is := extDB.getIndexSearch(deadline)
|
is := extDB.getIndexSearch(deadline)
|
||||||
status, err = is.getTSDBStatusWithFiltersForDate(tfss, date, topN, maxMetrics)
|
status, err = is.getTSDBStatusWithFiltersForDate(qtChild, tfss, date, topN, maxMetrics)
|
||||||
|
qtChild.Done()
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
})
|
})
|
||||||
if ok && err != nil {
|
if ok && err != nil {
|
||||||
@ -1355,14 +1370,14 @@ func (db *indexDB) GetTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
|
// getTSDBStatusWithFiltersForDate returns topN entries for tsdb status for the given tfss and the given date.
|
||||||
func (is *indexSearch) getTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN, maxMetrics int) (*TSDBStatus, error) {
|
func (is *indexSearch) getTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, topN, maxMetrics int) (*TSDBStatus, error) {
|
||||||
var filter *uint64set.Set
|
var filter *uint64set.Set
|
||||||
if len(tfss) > 0 {
|
if len(tfss) > 0 {
|
||||||
tr := TimeRange{
|
tr := TimeRange{
|
||||||
MinTimestamp: int64(date) * msecPerDay,
|
MinTimestamp: int64(date) * msecPerDay,
|
||||||
MaxTimestamp: int64(date+1)*msecPerDay - 1,
|
MaxTimestamp: int64(date+1)*msecPerDay - 1,
|
||||||
}
|
}
|
||||||
metricIDs, err := is.searchMetricIDsInternal(tfss, tr, maxMetrics)
|
metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -1748,43 +1763,49 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
|
|||||||
tfss = convertToCompositeTagFilterss(tfss)
|
tfss = convertToCompositeTagFilterss(tfss)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qtChild := qt.NewChild("search for tsids in the current indexdb")
|
||||||
|
|
||||||
tfKeyBuf := tagFiltersKeyBufPool.Get()
|
tfKeyBuf := tagFiltersKeyBufPool.Get()
|
||||||
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
|
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
|
||||||
|
|
||||||
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
|
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
|
||||||
tsids, ok := db.getFromTagFiltersCache(tfKeyBuf.B)
|
tsids, ok := db.getFromTagFiltersCache(qtChild, tfKeyBuf.B)
|
||||||
if ok {
|
if ok {
|
||||||
// Fast path - tsids found in the cache
|
// Fast path - tsids found in the cache
|
||||||
qt.Printf("found %d matching series ids in the cache; they occupy %d bytes of memory", len(tsids), memorySizeForTSIDs(tsids))
|
qtChild.Done()
|
||||||
return tsids, nil
|
return tsids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path - search for tsids in the db and extDB.
|
// Slow path - search for tsids in the db and extDB.
|
||||||
is := db.getIndexSearch(deadline)
|
is := db.getIndexSearch(deadline)
|
||||||
localTSIDs, err := is.searchTSIDs(qt, tfss, tr, maxMetrics)
|
localTSIDs, err := is.searchTSIDs(qtChild, tfss, tr, maxMetrics)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
qtChild.Done()
|
||||||
|
|
||||||
var extTSIDs []TSID
|
var extTSIDs []TSID
|
||||||
if db.doExtDB(func(extDB *indexDB) {
|
if db.doExtDB(func(extDB *indexDB) {
|
||||||
|
qtChild := qt.NewChild("search for tsids in the previous indexdb")
|
||||||
|
defer qtChild.Done()
|
||||||
|
|
||||||
tfKeyExtBuf := tagFiltersKeyBufPool.Get()
|
tfKeyExtBuf := tagFiltersKeyBufPool.Get()
|
||||||
defer tagFiltersKeyBufPool.Put(tfKeyExtBuf)
|
defer tagFiltersKeyBufPool.Put(tfKeyExtBuf)
|
||||||
|
|
||||||
// Data in extDB cannot be changed, so use unversioned keys for tag cache.
|
// Data in extDB cannot be changed, so use unversioned keys for tag cache.
|
||||||
tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
|
tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
|
||||||
tsids, ok := extDB.getFromTagFiltersCache(tfKeyExtBuf.B)
|
tsids, ok := extDB.getFromTagFiltersCache(qtChild, tfKeyExtBuf.B)
|
||||||
if ok {
|
if ok {
|
||||||
extTSIDs = tsids
|
extTSIDs = tsids
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
is := extDB.getIndexSearch(deadline)
|
is := extDB.getIndexSearch(deadline)
|
||||||
extTSIDs, err = is.searchTSIDs(qt, tfss, tr, maxMetrics)
|
extTSIDs, err = is.searchTSIDs(qtChild, tfss, tr, maxMetrics)
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
|
|
||||||
sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) })
|
sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) })
|
||||||
extDB.putToTagFiltersCache(extTSIDs, tfKeyExtBuf.B)
|
extDB.putToTagFiltersCache(qtChild, extTSIDs, tfKeyExtBuf.B)
|
||||||
}) {
|
}) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -1793,23 +1814,19 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
|
|||||||
|
|
||||||
// Merge localTSIDs with extTSIDs.
|
// Merge localTSIDs with extTSIDs.
|
||||||
tsids = mergeTSIDs(localTSIDs, extTSIDs)
|
tsids = mergeTSIDs(localTSIDs, extTSIDs)
|
||||||
|
qt.Printf("merge %d tsids from the current indexdb with %d tsids from the previous indexdb; result: %d tsids", len(localTSIDs), len(extTSIDs), len(tsids))
|
||||||
|
|
||||||
// Sort the found tsids, since they must be passed to TSID search
|
// Sort the found tsids, since they must be passed to TSID search
|
||||||
// in the sorted order.
|
// in the sorted order.
|
||||||
sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
|
sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
|
||||||
qt.Printf("sort the found %d series ids", len(tsids))
|
qt.Printf("sort %d tsids", len(tsids))
|
||||||
|
|
||||||
// Store TSIDs in the cache.
|
// Store TSIDs in the cache.
|
||||||
db.putToTagFiltersCache(tsids, tfKeyBuf.B)
|
db.putToTagFiltersCache(qt, tsids, tfKeyBuf.B)
|
||||||
qt.Printf("store the found %d series ids in cache; they occupy %d bytes of memory", len(tsids), memorySizeForTSIDs(tsids))
|
|
||||||
|
|
||||||
return tsids, err
|
return tsids, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func memorySizeForTSIDs(tsids []TSID) int {
|
|
||||||
return len(tsids) * int(unsafe.Sizeof(TSID{}))
|
|
||||||
}
|
|
||||||
|
|
||||||
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
|
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
|
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
|
||||||
@ -1988,7 +2005,7 @@ func (is *indexSearch) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, t
|
|||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
tsids = tsids[:i]
|
tsids = tsids[:i]
|
||||||
qt.Printf("load %d series ids from %d metric ids", len(tsids), len(metricIDs))
|
qt.Printf("load %d tsids from %d metric ids", len(tsids), len(metricIDs))
|
||||||
|
|
||||||
// Do not sort the found tsids, since they will be sorted later.
|
// Do not sort the found tsids, since they will be sorted later.
|
||||||
return tsids, nil
|
return tsids, nil
|
||||||
@ -2020,9 +2037,13 @@ func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
|
|||||||
|
|
||||||
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
|
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
|
||||||
// and adds matching metrics to metricIDs.
|
// and adds matching metrics to metricIDs.
|
||||||
func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter) error {
|
func (is *indexSearch) updateMetricIDsByMetricNameMatch(qt *querytracer.Tracer, metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter) error {
|
||||||
|
qt = qt.NewChild("filter out %d metric ids with filters=%s", srcMetricIDs.Len(), tfs)
|
||||||
|
defer qt.Done()
|
||||||
|
|
||||||
// sort srcMetricIDs in order to speed up Seek below.
|
// sort srcMetricIDs in order to speed up Seek below.
|
||||||
sortedMetricIDs := srcMetricIDs.AppendTo(nil)
|
sortedMetricIDs := srcMetricIDs.AppendTo(nil)
|
||||||
|
qt.Printf("sort %d metric ids", len(sortedMetricIDs))
|
||||||
|
|
||||||
kb := &is.kb
|
kb := &is.kb
|
||||||
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
|
||||||
@ -2062,6 +2083,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
|
|||||||
}
|
}
|
||||||
metricIDs.Add(metricID)
|
metricIDs.Add(metricID)
|
||||||
}
|
}
|
||||||
|
qt.Printf("apply filters %s; resulting metric ids: %d", tfs, metricIDs.Len())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2222,11 +2244,10 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
|
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
|
||||||
metricIDs, err := is.searchMetricIDsInternal(tfss, tr, maxMetrics)
|
metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
qt.Printf("found %d matching metric ids", metricIDs.Len())
|
|
||||||
if metricIDs.Len() == 0 {
|
if metricIDs.Len() == 0 {
|
||||||
// Nothing found
|
// Nothing found
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@ -2244,14 +2265,16 @@ func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilter
|
|||||||
metricIDsFiltered = append(metricIDsFiltered, metricID)
|
metricIDsFiltered = append(metricIDsFiltered, metricID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
qt.Printf("%d metric ids after removing deleted metric ids", len(metricIDsFiltered))
|
qt.Printf("left %d metric ids after removing deleted metric ids", len(metricIDsFiltered))
|
||||||
sortedMetricIDs = metricIDsFiltered
|
sortedMetricIDs = metricIDsFiltered
|
||||||
}
|
}
|
||||||
|
|
||||||
return sortedMetricIDs, nil
|
return sortedMetricIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
|
func (is *indexSearch) searchMetricIDsInternal(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
|
||||||
|
qt = qt.NewChild("search for metric ids: filters=%s, timeRange=%s, maxMetrics=%d", tfss, &tr, maxMetrics)
|
||||||
|
defer qt.Done()
|
||||||
metricIDs := &uint64set.Set{}
|
metricIDs := &uint64set.Set{}
|
||||||
for _, tfs := range tfss {
|
for _, tfs := range tfss {
|
||||||
if len(tfs.tfs) == 0 {
|
if len(tfs.tfs) == 0 {
|
||||||
@ -2261,7 +2284,11 @@ func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange,
|
|||||||
logger.Panicf(`BUG: cannot add {__name__!=""} filter: %s`, err)
|
logger.Panicf(`BUG: cannot add {__name__!=""} filter: %s`, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
|
qtChild := qt.NewChild("update metric ids: filters=%s, timeRange=%s", tfs, &tr)
|
||||||
|
prevMetricIDsLen := metricIDs.Len()
|
||||||
|
err := is.updateMetricIDsForTagFilters(qtChild, metricIDs, tfs, tr, maxMetrics+1)
|
||||||
|
qtChild.Donef("updated %d metric ids", metricIDs.Len()-prevMetricIDsLen)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if metricIDs.Len() > maxMetrics {
|
if metricIDs.Len() > maxMetrics {
|
||||||
@ -2272,8 +2299,8 @@ func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange,
|
|||||||
return metricIDs, nil
|
return metricIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
|
func (is *indexSearch) updateMetricIDsForTagFilters(qt *querytracer.Tracer, metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
|
||||||
err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
|
err := is.tryUpdatingMetricIDsForDateRange(qt, metricIDs, tfs, tr, maxMetrics)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// Fast path: found metricIDs by date range.
|
// Fast path: found metricIDs by date range.
|
||||||
return nil
|
return nil
|
||||||
@ -2283,8 +2310,9 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Slow path - fall back to search in the global inverted index.
|
// Slow path - fall back to search in the global inverted index.
|
||||||
|
qt.Printf("cannot find metric ids in per-day index; fall back to global index")
|
||||||
atomic.AddUint64(&is.db.globalSearchCalls, 1)
|
atomic.AddUint64(&is.db.globalSearchCalls, 1)
|
||||||
m, err := is.getMetricIDsForDateAndFilters(0, tfs, maxMetrics)
|
m, err := is.getMetricIDsForDateAndFilters(qt, 0, tfs, maxMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errFallbackToGlobalSearch) {
|
if errors.Is(err, errFallbackToGlobalSearch) {
|
||||||
return fmt.Errorf("the number of matching timeseries exceeds %d; either narrow down the search "+
|
return fmt.Errorf("the number of matching timeseries exceeds %d; either narrow down the search "+
|
||||||
@ -2296,7 +2324,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
|
func (is *indexSearch) getMetricIDsForTagFilter(qt *querytracer.Tracer, tf *tagFilter, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
|
||||||
if tf.isNegative {
|
if tf.isNegative {
|
||||||
logger.Panicf("BUG: isNegative must be false")
|
logger.Panicf("BUG: isNegative must be false")
|
||||||
}
|
}
|
||||||
@ -2304,6 +2332,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m
|
|||||||
if len(tf.orSuffixes) > 0 {
|
if len(tf.orSuffixes) > 0 {
|
||||||
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
|
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
|
||||||
loopsCount, err := is.updateMetricIDsForOrSuffixes(tf, metricIDs, maxMetrics, maxLoopsCount)
|
loopsCount, err := is.updateMetricIDsForOrSuffixes(tf, metricIDs, maxMetrics, maxLoopsCount)
|
||||||
|
qt.Printf("found %d metric ids for filter={%s} using exact search; spent %d loops", metricIDs.Len(), tf, loopsCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
|
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
|
||||||
}
|
}
|
||||||
@ -2312,6 +2341,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m
|
|||||||
|
|
||||||
// Slow path - scan for all the rows with the given prefix.
|
// Slow path - scan for all the rows with the given prefix.
|
||||||
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, metricIDs.Add, maxLoopsCount)
|
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, metricIDs.Add, maxLoopsCount)
|
||||||
|
qt.Printf("found %d metric ids for filter={%s} using prefix search; spent %d loops", metricIDs.Len(), tf, loopsCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
|
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
|
||||||
}
|
}
|
||||||
@ -2472,7 +2502,7 @@ var errFallbackToGlobalSearch = errors.New("fall back from per-day index search
|
|||||||
|
|
||||||
const maxDaysForPerDaySearch = 40
|
const maxDaysForPerDaySearch = 40
|
||||||
|
|
||||||
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
|
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer, metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
|
||||||
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
|
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
|
||||||
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
||||||
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
|
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
|
||||||
@ -2482,7 +2512,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
|||||||
}
|
}
|
||||||
if minDate == maxDate {
|
if minDate == maxDate {
|
||||||
// Fast path - query only a single date.
|
// Fast path - query only a single date.
|
||||||
m, err := is.getMetricIDsForDateAndFilters(minDate, tfs, maxMetrics)
|
m, err := is.getMetricIDsForDateAndFilters(qt, minDate, tfs, maxMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -2492,15 +2522,21 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Slower path - search for metricIDs for each day in parallel.
|
// Slower path - search for metricIDs for each day in parallel.
|
||||||
|
qt = qt.NewChild("parallel search for metric ids in per-day index: filters=%s, dayRange=[%d..%d]", tfs, minDate, maxDate)
|
||||||
|
defer qt.Done()
|
||||||
wg := getWaitGroup()
|
wg := getWaitGroup()
|
||||||
var errGlobal error
|
var errGlobal error
|
||||||
var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
|
var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
|
||||||
for minDate <= maxDate {
|
for minDate <= maxDate {
|
||||||
|
qtChild := qt.NewChild("parallel thread for date=%d", minDate)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(date uint64) {
|
go func(date uint64) {
|
||||||
defer wg.Done()
|
defer func() {
|
||||||
|
qtChild.Done()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
isLocal := is.db.getIndexSearch(is.deadline)
|
isLocal := is.db.getIndexSearch(is.deadline)
|
||||||
m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
|
m, err := isLocal.getMetricIDsForDateAndFilters(qtChild, date, tfs, maxMetrics)
|
||||||
is.db.putIndexSearch(isLocal)
|
is.db.putIndexSearch(isLocal)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
@ -2527,7 +2563,9 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
|
func (is *indexSearch) getMetricIDsForDateAndFilters(qt *querytracer.Tracer, date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
|
||||||
|
qt = qt.NewChild("search for metric ids on a particular day: filters=%s, date=%d, maxMetrics=%d", tfs, date, maxMetrics)
|
||||||
|
defer qt.Done()
|
||||||
// Sort tfs by loopsCount needed for performing each filter.
|
// Sort tfs by loopsCount needed for performing each filter.
|
||||||
// This stats is usually collected from the previous queries.
|
// This stats is usually collected from the previous queries.
|
||||||
// This way we limit the amount of work below by applying fast filters at first.
|
// This way we limit the amount of work below by applying fast filters at first.
|
||||||
@ -2579,7 +2617,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Populate metricIDs for the first non-negative filter with the cost smaller than maxLoopsCount.
|
// Populate metricIDs for the first non-negative filter with the smallest cost.
|
||||||
|
qtChild := qt.NewChild("search for the first non-negative filter with the smallest cost")
|
||||||
var metricIDs *uint64set.Set
|
var metricIDs *uint64set.Set
|
||||||
tfwsRemaining := tfws[:0]
|
tfwsRemaining := tfws[:0]
|
||||||
maxDateMetrics := intMax
|
maxDateMetrics := intMax
|
||||||
@ -2593,10 +2632,11 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
maxLoopsCount := getFirstPositiveLoopsCount(tfws[i+1:])
|
maxLoopsCount := getFirstPositiveLoopsCount(tfws[i+1:])
|
||||||
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, maxLoopsCount)
|
m, loopsCount, err := is.getMetricIDsForDateTagFilter(qtChild, tf, date, tfs.commonPrefix, maxDateMetrics, maxLoopsCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errTooManyLoops) {
|
if errors.Is(err, errTooManyLoops) {
|
||||||
// The tf took too many loops compared to the next filter. Postpone applying this filter.
|
// The tf took too many loops compared to the next filter. Postpone applying this filter.
|
||||||
|
qtChild.Printf("the filter={%s} took more than %d loops; postpone it", tf, maxLoopsCount)
|
||||||
storeLoopsCount(&tfw, 2*loopsCount)
|
storeLoopsCount(&tfw, 2*loopsCount)
|
||||||
tfwsRemaining = append(tfwsRemaining, tfw)
|
tfwsRemaining = append(tfwsRemaining, tfw)
|
||||||
continue
|
continue
|
||||||
@ -2607,6 +2647,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||||||
}
|
}
|
||||||
if m.Len() >= maxDateMetrics {
|
if m.Len() >= maxDateMetrics {
|
||||||
// Too many time series found by a single tag filter. Move the filter to the end of list.
|
// Too many time series found by a single tag filter. Move the filter to the end of list.
|
||||||
|
qtChild.Printf("the filter={%s} matches at least %d series; postpone it", tf, maxDateMetrics)
|
||||||
storeLoopsCount(&tfw, int64Max-1)
|
storeLoopsCount(&tfw, int64Max-1)
|
||||||
tfwsRemaining = append(tfwsRemaining, tfw)
|
tfwsRemaining = append(tfwsRemaining, tfw)
|
||||||
continue
|
continue
|
||||||
@ -2614,14 +2655,17 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||||||
storeLoopsCount(&tfw, loopsCount)
|
storeLoopsCount(&tfw, loopsCount)
|
||||||
metricIDs = m
|
metricIDs = m
|
||||||
tfwsRemaining = append(tfwsRemaining, tfws[i+1:]...)
|
tfwsRemaining = append(tfwsRemaining, tfws[i+1:]...)
|
||||||
|
qtChild.Printf("the filter={%s} matches less than %d series (actually %d series); use it", tf, maxDateMetrics, metricIDs.Len())
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
qtChild.Done()
|
||||||
tfws = tfwsRemaining
|
tfws = tfwsRemaining
|
||||||
|
|
||||||
if metricIDs == nil {
|
if metricIDs == nil {
|
||||||
// All the filters in tfs are negative or match too many time series.
|
// All the filters in tfs are negative or match too many time series.
|
||||||
// Populate all the metricIDs for the given (date),
|
// Populate all the metricIDs for the given (date),
|
||||||
// so later they can be filtered out with negative filters.
|
// so later they can be filtered out with negative filters.
|
||||||
|
qt.Printf("all the filters are negative or match more than %d time series; fall back to searching for all the metric ids", maxDateMetrics)
|
||||||
m, err := is.getMetricIDsForDate(date, maxDateMetrics)
|
m, err := is.getMetricIDsForDate(date, maxDateMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err)
|
return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err)
|
||||||
@ -2631,6 +2675,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||||||
return nil, errFallbackToGlobalSearch
|
return nil, errFallbackToGlobalSearch
|
||||||
}
|
}
|
||||||
metricIDs = m
|
metricIDs = m
|
||||||
|
qt.Printf("found %d metric ids", metricIDs.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(tfws, func(i, j int) bool {
|
@@ -2660,6 +2705,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 	// when the initial tag filters significantly reduce the number of found metricIDs,
 	// so the remaining filters could be performed via much faster metricName matching instead
 	// of slow selecting of matching metricIDs.
+	qtChild = qt.NewChild("intersect the remaining %d filters with the found %d metric ids", len(tfws), metricIDs.Len())
 	var tfsPostponed []*tagFilter
 	for i, tfw := range tfws {
 		tf := tfw.tf
@@ -2680,10 +2726,11 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 		if maxLoopsCount == int64Max {
 			maxLoopsCount = int64(metricIDsLen) * loopsCountPerMetricNameMatch
 		}
-		m, filterLoopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, intMax, maxLoopsCount)
+		m, filterLoopsCount, err := is.getMetricIDsForDateTagFilter(qtChild, tf, date, tfs.commonPrefix, intMax, maxLoopsCount)
 		if err != nil {
 			if errors.Is(err, errTooManyLoops) {
 				// Postpone tf, since it took more loops than the next filter may need.
+				qtChild.Printf("postpone filter={%s}, since it took more than %d loops", tf, maxLoopsCount)
 				storeFilterLoopsCount(&tfw, 2*filterLoopsCount)
 				tfsPostponed = append(tfsPostponed, tf)
 				continue
@@ -2695,22 +2742,28 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 		storeFilterLoopsCount(&tfw, filterLoopsCount)
 		if tf.isNegative || tf.isEmptyMatch {
 			metricIDs.Subtract(m)
+			qtChild.Printf("subtract %d metric ids from the found %d metric ids for filter={%s}; resulting metric ids: %d", m.Len(), metricIDsLen, tf, metricIDs.Len())
 		} else {
 			metricIDs.Intersect(m)
+			qtChild.Printf("intersect %d metric ids with the found %d metric ids for filter={%s}; resulting metric ids: %d", m.Len(), metricIDsLen, tf, metricIDs.Len())
 		}
 	}
+	qtChild.Done()
 	if metricIDs.Len() == 0 {
 		// There is no need in applying tfsPostponed, since the result is empty.
+		qt.Printf("found zero metric ids")
 		return nil, nil
 	}
 	if len(tfsPostponed) > 0 {
 		// Apply the postponed filters via metricName match.
+		qt.Printf("apply postponed filters=%s to %d metric ids", tfsPostponed, metricIDs.Len())
 		var m uint64set.Set
 		if err := is.updateMetricIDsByMetricNameMatch(qt, &m, metricIDs, tfsPostponed); err != nil {
 			return nil, err
 		}
 		return &m, nil
 	}
+	qt.Printf("found %d metric ids", metricIDs.Len())
 	return metricIDs, nil
 }
 
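The hunks above all follow one tracing pattern: open a child span with qt.NewChild, annotate intermediate results with Printf, and close the span with Done before returning. Below is a minimal, self-contained sketch of how such spans nest; the Tracer type here is a hypothetical stand-in for lib/querytracer, kept only detailed enough to show the three calls used in this diff.

package main

import (
	"fmt"
	"strings"
)

// Tracer is a hypothetical stand-in for lib/querytracer.Tracer.
type Tracer struct {
	depth int
}

// NewChild opens a nested span whose label is printed at the parent's depth.
func (qt *Tracer) NewChild(format string, args ...interface{}) *Tracer {
	fmt.Printf("%s> %s\n", strings.Repeat("  ", qt.depth), fmt.Sprintf(format, args...))
	return &Tracer{depth: qt.depth + 1}
}

// Printf records an event inside the current span.
func (qt *Tracer) Printf(format string, args ...interface{}) {
	fmt.Printf("%s- %s\n", strings.Repeat("  ", qt.depth), fmt.Sprintf(format, args...))
}

// Done closes the current span.
func (qt *Tracer) Done() {
	fmt.Printf("%s< done\n", strings.Repeat("  ", qt.depth-1))
}

func main() {
	qt := &Tracer{depth: 1} // pretend we are already inside a request span
	qtChild := qt.NewChild("intersect the remaining %d filters with the found %d metric ids", 2, 1000)
	qtChild.Printf("postpone filter={%s}, since it took more than %d loops", `env=~"prod.*"`, 50)
	qtChild.Done()
	qt.Printf("found %d metric ids", 42)
}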
@@ -2819,6 +2872,27 @@ func marshalCompositeTagKey(dst, name, key []byte) []byte {
 	return dst
 }
 
+func unmarshalCompositeTagKey(src []byte) ([]byte, []byte, error) {
+	if len(src) == 0 {
+		return nil, nil, fmt.Errorf("composite tag key cannot be empty")
+	}
+	if src[0] != compositeTagKeyPrefix {
+		return nil, nil, fmt.Errorf("missing composite tag key prefix in %q", src)
+	}
+	src = src[1:]
+	tail, n, err := encoding.UnmarshalVarUint64(src)
+	if err != nil {
+		return nil, nil, fmt.Errorf("cannot unmarshal metric name length from composite tag key: %w", err)
+	}
+	src = tail
+	if uint64(len(src)) < n {
+		return nil, nil, fmt.Errorf("missing metric name with length %d in composite tag key %q", n, src)
+	}
+	name := src[:n]
+	key := src[n:]
+	return name, key, nil
+}
+
 func reverseBytes(dst, src []byte) []byte {
 	for i := len(src) - 1; i >= 0; i-- {
 		dst = append(dst, src[i])
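The new unmarshalCompositeTagKey reverses the marshalCompositeTagKey shown in the hunk context: a composite key is a prefix byte, then the metric name length as a varint, then the metric name, then the original tag key. Below is a hedged round-trip sketch of that layout, with the standard library's binary varints standing in for VictoriaMetrics' lib/encoding helpers and an assumed prefix value.

package main

import (
	"encoding/binary"
	"fmt"
)

// compositeTagKeyPrefix is an assumed value; the real constant lives in index_db.go.
const compositeTagKeyPrefix = byte(1)

// marshalCompositeTagKey mirrors the layout implied by the diff:
// prefix byte | varint(len(name)) | name | key.
func marshalCompositeTagKey(dst, name, key []byte) []byte {
	dst = append(dst, compositeTagKeyPrefix)
	dst = binary.AppendUvarint(dst, uint64(len(name)))
	dst = append(dst, name...)
	dst = append(dst, key...)
	return dst
}

// unmarshalCompositeTagKey decodes the same layout back into (name, key).
func unmarshalCompositeTagKey(src []byte) ([]byte, []byte, error) {
	if len(src) == 0 || src[0] != compositeTagKeyPrefix {
		return nil, nil, fmt.Errorf("missing composite tag key prefix in %q", src)
	}
	src = src[1:]
	n, size := binary.Uvarint(src)
	if size <= 0 {
		return nil, nil, fmt.Errorf("cannot unmarshal metric name length")
	}
	src = src[size:]
	if uint64(len(src)) < n {
		return nil, nil, fmt.Errorf("missing metric name with length %d", n)
	}
	return src[:n], src[n:], nil
}

func main() {
	b := marshalCompositeTagKey(nil, []byte("http_requests_total"), []byte("path"))
	name, key, err := unmarshalCompositeTagKey(b)
	fmt.Println(string(name), string(key), err) // http_requests_total path <nil>
}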
@@ -2844,7 +2918,10 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
 	return true, nil
 }
 
-func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
+func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,
+	maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
+	qt = qt.NewChild("get metric ids for filter and date: filter={%s}, date=%d, maxMetrics=%d, maxLoopsCount=%d", tf, date, maxMetrics, maxLoopsCount)
+	defer qt.Done()
 	if !bytes.HasPrefix(tf.prefix, commonPrefix) {
 		logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
 	}
@@ -2863,7 +2940,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
 	tfNew := *tf
 	tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
 	tfNew.prefix = kb.B
-	metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics, maxLoopsCount)
+	metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(qt, &tfNew, maxMetrics, maxLoopsCount)
 	if err != nil {
 		return nil, loopsCount, err
 	}
@@ -2875,16 +2952,19 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
 	// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601
 	// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/395
 	maxLoopsCount -= loopsCount
-	tfNew = tagFilter{}
-	if err := tfNew.Init(prefix, tf.key, []byte(".+"), false, true); err != nil {
+	var tfGross tagFilter
+	if err := tfGross.Init(prefix, tf.key, []byte(".+"), false, true); err != nil {
 		logger.Panicf(`BUG: cannot init tag filter: {%q=~".+"}: %s`, tf.key, err)
 	}
-	m, lc, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics, maxLoopsCount)
+	m, lc, err := is.getMetricIDsForTagFilter(qt, &tfGross, maxMetrics, maxLoopsCount)
 	loopsCount += lc
 	if err != nil {
 		return nil, loopsCount, err
 	}
+	mLen := m.Len()
 	m.Subtract(metricIDs)
+	qt.Printf("subtract %d metric ids for filter={%s} from %d metric ids for filter={%s}", metricIDs.Len(), &tfNew, mLen, &tfGross)
+	qt.Printf("found %d metric ids, spent %d loops", m.Len(), loopsCount)
 	return m, loopsCount, nil
 }
 
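The tfGross rename above keeps the existing trick for negative and empty-match filters: fetch the ids for the catch-all filter {key=~".+"} (tfGross), then subtract the ids matched by the positive form of the filter (tfNew); what remains is the set the negative filter selects. A minimal sketch of that set arithmetic, with a plain Go map standing in for uint64set.Set:

package main

import "fmt"

type idSet map[uint64]struct{}

// subtract removes every id in other from s, like uint64set.Set.Subtract.
func (s idSet) subtract(other idSet) {
	for id := range other {
		delete(s, id)
	}
}

func main() {
	// Ids of all series that have the tag key at all, i.e. {key=~".+"}
	// (tfGross in the diff); the values are illustrative.
	gross := idSet{1: {}, 2: {}, 3: {}, 4: {}}
	// Ids matched by the positive form of the filter (tfNew in the diff).
	matched := idSet{2: {}, 4: {}}

	// gross - matched = ids selected by the negative filter.
	gross.subtract(matched)
	fmt.Println(len(gross)) // 2 (ids 1 and 3)
}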
@@ -1770,7 +1770,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 	}
 
 	// Check GetTSDBStatusWithFiltersForDate with nil filters.
-	status, err := db.GetTSDBStatusWithFiltersForDate(nil, baseDate, 5, 1e6, noDeadline)
+	status, err := db.GetTSDBStatusWithFiltersForDate(nil, nil, baseDate, 5, 1e6, noDeadline)
 	if err != nil {
 		t.Fatalf("error in GetTSDBStatusWithFiltersForDate with nil filters: %s", err)
 	}
@@ -1846,7 +1846,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 	if err := tfs.Add([]byte("day"), []byte("0"), false, false); err != nil {
 		t.Fatalf("cannot add filter: %s", err)
 	}
-	status, err = db.GetTSDBStatusWithFiltersForDate([]*TagFilters{tfs}, baseDate, 5, 1e6, noDeadline)
+	status, err = db.GetTSDBStatusWithFiltersForDate(nil, []*TagFilters{tfs}, baseDate, 5, 1e6, noDeadline)
 	if err != nil {
 		t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err)
 	}
@@ -1875,7 +1875,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 	if err := tfs.Add([]byte("uniqueid"), []byte("0|1|3"), false, true); err != nil {
 		t.Fatalf("cannot add filter: %s", err)
 	}
-	status, err = db.GetTSDBStatusWithFiltersForDate([]*TagFilters{tfs}, baseDate, 5, 1e6, noDeadline)
+	status, err = db.GetTSDBStatusWithFiltersForDate(nil, []*TagFilters{tfs}, baseDate, 5, 1e6, noDeadline)
 	if err != nil {
 		t.Fatalf("error in GetTSDBStatusWithFiltersForDate: %s", err)
 	}
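Note that the updated tests pass nil for the new qt argument. That only works if every tracer method is safe to call on a nil receiver, a common Go pattern for optional instrumentation; the sketch below shows the pattern with a hypothetical tracer type, and is not the real querytracer implementation.

package main

import "fmt"

// tracer demonstrates nil-receiver safety: Go allows calling methods on a
// nil pointer, so every method can become a no-op when tracing is disabled.
type tracer struct {
	events []string
}

func (t *tracer) Printf(format string, args ...interface{}) {
	if t == nil {
		return // tracing disabled: drop the event
	}
	t.events = append(t.events, fmt.Sprintf(format, args...))
}

func main() {
	var t *tracer // nil tracer, as in GetTSDBStatusWithFiltersForDate(nil, ...)
	t.Printf("this event is dropped without panicking")
	fmt.Println("ok, no panic")
}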
@@ -1061,7 +1061,7 @@ func nextRetentionDuration(retentionMsecs int64) time.Duration {
 
 // SearchMetricNames returns metric names matching the given tfss on the given tr.
 func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]MetricName, error) {
-	qt = qt.NewChild("search for matching metric names")
+	qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
 	defer qt.Done()
 	tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
 	if err != nil {
@@ -1104,7 +1104,7 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
 
 // searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
 func (s *Storage) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
-	qt = qt.NewChild("search for matching series ids")
+	qt = qt.NewChild("search for matching tsids: filters=%s, timeRange=%s", tfss, &tr)
 	defer qt.Done()
 	// Do not cache tfss -> tsids here, since the caching is performed
 	// on idb level.
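The richer span names above rely on %s formatting of tfss and &tr, which in turn relies on those types implementing fmt.Stringer (the TagFilters.String rewrite appears later in this diff). A small sketch of how a Stringer-backed value flows into a trace label; the TimeRange type here is a simplified stand-in for storage.TimeRange:

package main

import "fmt"

// TimeRange is a simplified stand-in for storage.TimeRange.
type TimeRange struct {
	MinTimestamp int64
	MaxTimestamp int64
}

// String makes *TimeRange usable with %s in trace labels.
func (tr *TimeRange) String() string {
	return fmt.Sprintf("[%d..%d]", tr.MinTimestamp, tr.MaxTimestamp)
}

func main() {
	tr := TimeRange{MinTimestamp: 0, MaxTimestamp: 86400000}
	label := fmt.Sprintf("search for matching tsids: timeRange=%s", &tr)
	fmt.Println(label) // search for matching tsids: timeRange=[0..86400000]
}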
@@ -1154,7 +1154,7 @@ var (
 //
 // This should speed-up further searchMetricNameWithCache calls for metricIDs from tsids.
 func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, tsids []TSID, deadline uint64) error {
-	qt = qt.NewChild("prefetch metric names for %d series ids", len(tsids))
+	qt = qt.NewChild("prefetch metric names for %d tsids", len(tsids))
 	defer qt.Done()
 	if len(tsids) == 0 {
 		qt.Printf("nothing to prefetch")
@@ -1510,8 +1510,8 @@ func (s *Storage) GetSeriesCount(deadline uint64) (uint64, error) {
 }
 
 // GetTSDBStatusWithFiltersForDate returns TSDB status data for /api/v1/status/tsdb with match[] filters.
-func (s *Storage) GetTSDBStatusWithFiltersForDate(tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
-	return s.idb().GetTSDBStatusWithFiltersForDate(tfss, date, topN, maxMetrics, deadline)
+func (s *Storage) GetTSDBStatusWithFiltersForDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
+	return s.idb().GetTSDBStatusWithFiltersForDate(qt, tfss, date, topN, maxMetrics, deadline)
 }
 
 // MetricRow is a metric to insert into storage.
@@ -211,16 +211,11 @@ func (tfs *TagFilters) addTagFilter() *tagFilter {
 
 // String returns human-readable value for tfs.
 func (tfs *TagFilters) String() string {
-	if len(tfs.tfs) == 0 {
-		return "{}"
+	a := make([]string, 0, len(tfs.tfs))
+	for _, tf := range tfs.tfs {
+		a = append(a, tf.String())
 	}
-	var bb bytes.Buffer
-	fmt.Fprintf(&bb, "{%s", tfs.tfs[0].String())
-	for i := range tfs.tfs[1:] {
-		fmt.Fprintf(&bb, ", %s", tfs.tfs[i+1].String())
-	}
-	fmt.Fprintf(&bb, "}")
-	return bb.String()
+	return fmt.Sprintf("{%s}", strings.Join(a, ","))
 }
 
 // Reset resets the tf
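The rewritten TagFilters.String collects each filter's string form into a slice and joins with a bare comma, which is what changes the expected output in the test at the end of this diff (no space after the comma). It also makes the old empty-set branch unnecessary, since joining an empty slice naturally yields "{}". A runnable sketch of the same approach on a simplified filter type:

package main

import (
	"fmt"
	"strings"
)

// filter is a simplified stand-in for the unexported tagFilter type.
type filter struct {
	key, op, value string
}

func (f filter) String() string {
	return fmt.Sprintf("%s%s%q", f.key, f.op, f.value)
}

type filters []filter

// String mirrors the rewritten TagFilters.String: collect each filter's
// string form, join with ",", and wrap in braces. An empty set prints "{}".
func (fs filters) String() string {
	a := make([]string, 0, len(fs))
	for _, f := range fs {
		a = append(a, f.String())
	}
	return fmt.Sprintf("{%s}", strings.Join(a, ","))
}

func main() {
	fs := filters{
		{key: "__name__", op: "=", value: "metric_name"},
		{key: "tag_re", op: "=~", value: "re.value"},
	}
	fmt.Println(fs) // {__name__="metric_name",tag_re=~"re.value"}
}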
@@ -305,6 +300,16 @@ func (tf *tagFilter) String() string {
 	} else if tf.isRegexp {
 		op = "=~"
 	}
+	if bytes.Equal(tf.key, graphiteReverseTagKey) {
+		return fmt.Sprintf("__graphite_reverse__%s%q", op, tf.value)
+	}
+	if tf.isComposite() {
+		metricName, key, err := unmarshalCompositeTagKey(tf.key)
+		if err != nil {
+			logger.Panicf("BUG: cannot unmarshal composite tag key: %s", err)
+		}
+		return fmt.Sprintf("composite(%s,%s)%s%q", metricName, key, op, tf.value)
+	}
 	key := tf.key
 	if len(key) == 0 {
 		key = []byte("__name__")
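These two new branches give special keys a readable rendering instead of dumping raw index bytes into trace messages: a Graphite-reversed key prints as __graphite_reverse__, and a composite key is unpacked back into its metric name and tag key via the unmarshalCompositeTagKey helper added earlier in this diff. A sketch of the resulting label shapes, with illustrative inputs:

package main

import "fmt"

func main() {
	op, value := "=~", "foo.bar"

	// Rendering for a Graphite-reversed key.
	fmt.Printf("__graphite_reverse__%s%q\n", op, value) // __graphite_reverse__=~"foo.bar"

	// Rendering for a composite key, after unpacking it into (metricName, key).
	metricName, key := "metric_name", "tag_re"
	fmt.Printf("composite(%s,%s)%s%q\n", metricName, key, op, value) // composite(metric_name,tag_re)=~"foo.bar"
}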
@@ -1263,7 +1263,7 @@ func TestTagFiltersString(t *testing.T) {
 	mustAdd("tag_n", "n_value", true, false)
 	mustAdd("tag_re_graphite", "foo\\.bar", false, true)
 	s := tfs.String()
-	sExpected := `{__name__="metric_name", tag_re=~"re.value", tag_nre!~"nre.value", tag_n!="n_value", tag_re_graphite="foo.bar"}`
+	sExpected := `{__name__="metric_name",tag_re=~"re.value",tag_nre!~"nre.value",tag_n!="n_value",tag_re_graphite="foo.bar"}`
 	if s != sExpected {
 		t.Fatalf("unexpected TagFilters.String(); got %q; want %q", s, sExpected)
 	}