mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
lib/storage: return marshaled metric names from SearchMetricNames
Previously SearchMetricNames was returning unmarshaled metric names. This wasn't great for vmstorage, which had to spend additional CPU time on marshaling the metric names before sending them to vmselect. While at it, remove possible duplicate metric names, which could occur when multiple samples for new time series are ingested via concurrent requests. Also sort the metric names before returning them to the client. This simplifies debugging of the returned metric names across repeated requests to /api/v1/series
This commit is contained in:
parent
69bbdf7304
commit
7d5d33fd71
@ -193,7 +193,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, at *auth.Token, w http.R
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mns, isPartialResponse, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
|
metricNames, isPartialResponse, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
|
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
|
||||||
}
|
}
|
||||||
@ -202,7 +202,11 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, at *auth.Token, w http.R
|
|||||||
if tag == "name" {
|
if tag == "name" {
|
||||||
tag = "__name__"
|
tag = "__name__"
|
||||||
}
|
}
|
||||||
for _, mn := range mns {
|
var mn storage.MetricName
|
||||||
|
for _, metricName := range metricNames {
|
||||||
|
if err := mn.UnmarshalString(metricName); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
|
||||||
|
}
|
||||||
tagValue := mn.GetTagValue(tag)
|
tagValue := mn.GetTagValue(tag)
|
||||||
if len(tagValue) == 0 {
|
if len(tagValue) == 0 {
|
||||||
continue
|
continue
|
||||||
@ -279,13 +283,17 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, at *auth.Token, w http.Res
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mns, isPartialResponse, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
|
metricNames, isPartialResponse, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
|
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
|
||||||
}
|
}
|
||||||
isPartial = isPartialResponse
|
isPartial = isPartialResponse
|
||||||
m := make(map[string]struct{})
|
m := make(map[string]struct{})
|
||||||
for _, mn := range mns {
|
var mn storage.MetricName
|
||||||
|
for _, metricName := range metricNames {
|
||||||
|
if err := mn.UnmarshalString(metricName); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
|
||||||
|
}
|
||||||
m["name"] = struct{}{}
|
m["name"] = struct{}{}
|
||||||
for _, tag := range mn.Tags {
|
for _, tag := range mn.Tags {
|
||||||
m[string(tag.Key)] = struct{}{}
|
m[string(tag.Key)] = struct{}{}
|
||||||
@ -345,11 +353,14 @@ func TagsFindSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseW
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
|
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
|
||||||
mns, isPartial, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
|
metricNames, isPartial, err := netstorage.SearchMetricNames(nil, at, denyPartialResponse, sq, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
|
return fmt.Errorf("cannot fetch metric names for %q: %w", sq, err)
|
||||||
}
|
}
|
||||||
paths := getCanonicalPaths(mns)
|
paths, err := getCanonicalPaths(metricNames)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot obtain canonical paths: %w", err)
|
||||||
|
}
|
||||||
if limit > 0 && limit < len(paths) {
|
if limit > 0 && limit < len(paths) {
|
||||||
paths = paths[:limit]
|
paths = paths[:limit]
|
||||||
}
|
}
|
||||||
@ -365,14 +376,18 @@ func TagsFindSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseW
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCanonicalPaths(mns []storage.MetricName) []string {
|
func getCanonicalPaths(metricNames []string) ([]string, error) {
|
||||||
paths := make([]string, 0, len(mns))
|
paths := make([]string, 0, len(metricNames))
|
||||||
for _, mn := range mns {
|
var mn storage.MetricName
|
||||||
|
for _, metricName := range metricNames {
|
||||||
|
if err := mn.UnmarshalString(metricName); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
|
||||||
|
}
|
||||||
path := getCanonicalPath(&mn)
|
path := getCanonicalPath(&mn)
|
||||||
paths = append(paths, path)
|
paths = append(paths, path)
|
||||||
}
|
}
|
||||||
sort.Strings(paths)
|
sort.Strings(paths)
|
||||||
return paths
|
return paths, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCanonicalPath(mn *storage.MetricName) string {
|
func getCanonicalPath(mn *storage.MetricName) string {
|
||||||
|
@ -1176,7 +1176,9 @@ func ExportBlocks(qt *querytracer.Tracer, at *auth.Token, sq *storage.SearchQuer
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SearchMetricNames returns all the metric names matching sq until the given deadline.
|
// SearchMetricNames returns all the metric names matching sq until the given deadline.
|
||||||
func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]storage.MetricName, bool, error) {
|
//
|
||||||
|
// The returned metric names must be unmarshaled via storage.MetricName.UnmarshalString().
|
||||||
|
func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutils.Deadline) ([]string, bool, error) {
|
||||||
qt = qt.NewChild("fetch metric names: %s", sq)
|
qt = qt.NewChild("fetch metric names: %s", sq)
|
||||||
defer qt.Done()
|
defer qt.Done()
|
||||||
if deadline.Exceeded() {
|
if deadline.Exceeded() {
|
||||||
@ -1186,7 +1188,7 @@ func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialRespon
|
|||||||
|
|
||||||
// Send the query to all the storage nodes in parallel.
|
// Send the query to all the storage nodes in parallel.
|
||||||
type nodeResult struct {
|
type nodeResult struct {
|
||||||
metricNames [][]byte
|
metricNames []string
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
|
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
|
||||||
@ -1203,14 +1205,14 @@ func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialRespon
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Collect results.
|
// Collect results.
|
||||||
metricNames := make(map[string]struct{})
|
metricNamesMap := make(map[string]struct{})
|
||||||
isPartial, err := snr.collectResults(partialSearchMetricNamesResults, func(result interface{}) error {
|
isPartial, err := snr.collectResults(partialSearchMetricNamesResults, func(result interface{}) error {
|
||||||
nr := result.(*nodeResult)
|
nr := result.(*nodeResult)
|
||||||
if nr.err != nil {
|
if nr.err != nil {
|
||||||
return nr.err
|
return nr.err
|
||||||
}
|
}
|
||||||
for _, metricName := range nr.metricNames {
|
for _, metricName := range nr.metricNames {
|
||||||
metricNames[string(metricName)] = struct{}{}
|
metricNamesMap[metricName] = struct{}{}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@ -1218,17 +1220,13 @@ func SearchMetricNames(qt *querytracer.Tracer, at *auth.Token, denyPartialRespon
|
|||||||
return nil, isPartial, fmt.Errorf("cannot fetch metric names from vmstorage nodes: %w", err)
|
return nil, isPartial, fmt.Errorf("cannot fetch metric names from vmstorage nodes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal metricNames
|
metricNames := make([]string, len(metricNamesMap))
|
||||||
mns := make([]storage.MetricName, len(metricNames))
|
for metricName := range metricNamesMap {
|
||||||
i := 0
|
metricNames = append(metricNames, metricName)
|
||||||
for metricName := range metricNames {
|
|
||||||
mn := &mns[i]
|
|
||||||
if err := mn.Unmarshal(bytesutil.ToUnsafeBytes(metricName)); err != nil {
|
|
||||||
return nil, false, fmt.Errorf("cannot unmarshal metric name obtained from vmstorage: %w; metricName=%q", err, metricName)
|
|
||||||
}
|
|
||||||
i++
|
|
||||||
}
|
}
|
||||||
return mns, isPartial, nil
|
sort.Strings(metricNames)
|
||||||
|
qt.Printf("sort %d metric names", len(metricNames))
|
||||||
|
return metricNames, isPartial, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessSearchQuery performs sq until the given deadline.
|
// ProcessSearchQuery performs sq until the given deadline.
|
||||||
@ -1592,8 +1590,8 @@ func (sn *storageNode) getSeriesCount(qt *querytracer.Tracer, accountID, project
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestData []byte, deadline searchutils.Deadline) ([][]byte, error) {
|
func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestData []byte, deadline searchutils.Deadline) ([]string, error) {
|
||||||
var metricNames [][]byte
|
var metricNames []string
|
||||||
f := func(bc *handshake.BufferedConn) error {
|
f := func(bc *handshake.BufferedConn) error {
|
||||||
mns, err := sn.processSearchMetricNamesOnConn(bc, requestData)
|
mns, err := sn.processSearchMetricNamesOnConn(bc, requestData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2071,7 +2069,7 @@ const maxMetricBlockSize = 1024 * 1024
|
|||||||
// from vmstorage.
|
// from vmstorage.
|
||||||
const maxErrorMessageSize = 64 * 1024
|
const maxErrorMessageSize = 64 * 1024
|
||||||
|
|
||||||
func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn, requestData []byte) ([][]byte, error) {
|
func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn, requestData []byte) ([]string, error) {
|
||||||
// Send the request to sn.
|
// Send the request to sn.
|
||||||
if err := writeBytes(bc, requestData); err != nil {
|
if err := writeBytes(bc, requestData); err != nil {
|
||||||
return nil, fmt.Errorf("cannot write requestData: %w", err)
|
return nil, fmt.Errorf("cannot write requestData: %w", err)
|
||||||
@ -2094,13 +2092,13 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot read metricNamesCount: %w", err)
|
return nil, fmt.Errorf("cannot read metricNamesCount: %w", err)
|
||||||
}
|
}
|
||||||
metricNames := make([][]byte, metricNamesCount)
|
metricNames := make([]string, metricNamesCount)
|
||||||
for i := int64(0); i < int64(metricNamesCount); i++ {
|
for i := int64(0); i < int64(metricNamesCount); i++ {
|
||||||
buf, err = readBytes(buf[:0], bc, maxMetricNameSize)
|
buf, err = readBytes(buf[:0], bc, maxMetricNameSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot read metricName #%d: %w", i+1, err)
|
return nil, fmt.Errorf("cannot read metricName #%d: %w", i+1, err)
|
||||||
}
|
}
|
||||||
metricNames[i] = append(metricNames[i][:0], buf...)
|
metricNames[i] = string(buf)
|
||||||
}
|
}
|
||||||
return metricNames, nil
|
return metricNames, nil
|
||||||
}
|
}
|
||||||
|
@ -675,7 +675,7 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token,
|
|||||||
}
|
}
|
||||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxSeriesLimit)
|
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxSeriesLimit)
|
||||||
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
|
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
|
||||||
mns, isPartial, err := netstorage.SearchMetricNames(qt, at, denyPartialResponse, sq, cp.deadline)
|
metricNames, isPartial, err := netstorage.SearchMetricNames(qt, at, denyPartialResponse, sq, cp.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
|
return fmt.Errorf("cannot fetch time series for %q: %w", sq, err)
|
||||||
}
|
}
|
||||||
@ -685,7 +685,7 @@ func SeriesHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token,
|
|||||||
qtDone := func() {
|
qtDone := func() {
|
||||||
qt.Donef("start=%d, end=%d", cp.start, cp.end)
|
qt.Donef("start=%d, end=%d", cp.start, cp.end)
|
||||||
}
|
}
|
||||||
WriteSeriesResponse(bw, isPartial, mns, qt, qtDone)
|
WriteSeriesResponse(bw, isPartial, metricNames, qt, qtDone)
|
||||||
if err := bw.Flush(); err != nil {
|
if err := bw.Flush(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -6,18 +6,24 @@
|
|||||||
{% stripspace %}
|
{% stripspace %}
|
||||||
SeriesResponse generates response for /api/v1/series.
|
SeriesResponse generates response for /api/v1/series.
|
||||||
See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
|
See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
|
||||||
{% func SeriesResponse(isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) %}
|
{% func SeriesResponse(isPartial bool, metricNames []string, qt *querytracer.Tracer, qtDone func()) %}
|
||||||
{
|
{
|
||||||
"status":"success",
|
"status":"success",
|
||||||
"isPartial":{% if isPartial %}true{% else %}false{% endif %},
|
"isPartial":{% if isPartial %}true{% else %}false{% endif %},
|
||||||
"data":[
|
"data":[
|
||||||
{% for i := range mns %}
|
{% code var mn storage.MetricName %}
|
||||||
{%= metricNameObject(&mns[i]) %}
|
{% for i, metricName := range metricNames %}
|
||||||
{% if i+1 < len(mns) %},{% endif %}
|
{% code err := mn.UnmarshalString(metricName) %}
|
||||||
|
{% if err != nil %}
|
||||||
|
{%q= err.Error() %}
|
||||||
|
{% else %}
|
||||||
|
{%= metricNameObject(&mn) %}
|
||||||
|
{% endif %}
|
||||||
|
{% if i+1 < len(metricNames) %},{% endif %}
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
]
|
]
|
||||||
{% code
|
{% code
|
||||||
qt.Printf("generate response: series=%d", len(mns))
|
qt.Printf("generate response: series=%d", len(metricNames))
|
||||||
qtDone()
|
qtDone()
|
||||||
%}
|
%}
|
||||||
{%= dumpQueryTrace(qt) %}
|
{%= dumpQueryTrace(qt) %}
|
||||||
|
@ -26,7 +26,7 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:9
|
//line app/vmselect/prometheus/series_response.qtpl:9
|
||||||
func StreamSeriesResponse(qw422016 *qt422016.Writer, isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) {
|
func StreamSeriesResponse(qw422016 *qt422016.Writer, isPartial bool, metricNames []string, qt *querytracer.Tracer, qtDone func()) {
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:9
|
//line app/vmselect/prometheus/series_response.qtpl:9
|
||||||
qw422016.N().S(`{"status":"success","isPartial":`)
|
qw422016.N().S(`{"status":"success","isPartial":`)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:12
|
//line app/vmselect/prometheus/series_response.qtpl:12
|
||||||
@ -42,52 +42,66 @@ func StreamSeriesResponse(qw422016 *qt422016.Writer, isPartial bool, mns []stora
|
|||||||
//line app/vmselect/prometheus/series_response.qtpl:12
|
//line app/vmselect/prometheus/series_response.qtpl:12
|
||||||
qw422016.N().S(`,"data":[`)
|
qw422016.N().S(`,"data":[`)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:14
|
//line app/vmselect/prometheus/series_response.qtpl:14
|
||||||
for i := range mns {
|
var mn storage.MetricName
|
||||||
|
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:15
|
//line app/vmselect/prometheus/series_response.qtpl:15
|
||||||
streammetricNameObject(qw422016, &mns[i])
|
for i, metricName := range metricNames {
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:16
|
//line app/vmselect/prometheus/series_response.qtpl:16
|
||||||
if i+1 < len(mns) {
|
err := mn.UnmarshalString(metricName)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:16
|
|
||||||
qw422016.N().S(`,`)
|
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:16
|
|
||||||
}
|
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:17
|
//line app/vmselect/prometheus/series_response.qtpl:17
|
||||||
}
|
if err != nil {
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:17
|
//line app/vmselect/prometheus/series_response.qtpl:18
|
||||||
qw422016.N().S(`]`)
|
qw422016.N().Q(err.Error())
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:19
|
||||||
|
} else {
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:20
|
//line app/vmselect/prometheus/series_response.qtpl:20
|
||||||
qt.Printf("generate response: series=%d", len(mns))
|
streammetricNameObject(qw422016, &mn)
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:21
|
||||||
|
}
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:22
|
||||||
|
if i+1 < len(metricNames) {
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:22
|
||||||
|
qw422016.N().S(`,`)
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:22
|
||||||
|
}
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:23
|
||||||
|
}
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:23
|
||||||
|
qw422016.N().S(`]`)
|
||||||
|
//line app/vmselect/prometheus/series_response.qtpl:26
|
||||||
|
qt.Printf("generate response: series=%d", len(metricNames))
|
||||||
qtDone()
|
qtDone()
|
||||||
|
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:23
|
//line app/vmselect/prometheus/series_response.qtpl:29
|
||||||
streamdumpQueryTrace(qw422016, qt)
|
streamdumpQueryTrace(qw422016, qt)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:23
|
//line app/vmselect/prometheus/series_response.qtpl:29
|
||||||
qw422016.N().S(`}`)
|
qw422016.N().S(`}`)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
}
|
}
|
||||||
|
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
func WriteSeriesResponse(qq422016 qtio422016.Writer, isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) {
|
func WriteSeriesResponse(qq422016 qtio422016.Writer, isPartial bool, metricNames []string, qt *querytracer.Tracer, qtDone func()) {
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
StreamSeriesResponse(qw422016, isPartial, mns, qt, qtDone)
|
StreamSeriesResponse(qw422016, isPartial, metricNames, qt, qtDone)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
qt422016.ReleaseWriter(qw422016)
|
qt422016.ReleaseWriter(qw422016)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
}
|
}
|
||||||
|
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
func SeriesResponse(isPartial bool, mns []storage.MetricName, qt *querytracer.Tracer, qtDone func()) string {
|
func SeriesResponse(isPartial bool, metricNames []string, qt *querytracer.Tracer, qtDone func()) string {
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
qb422016 := qt422016.AcquireByteBuffer()
|
qb422016 := qt422016.AcquireByteBuffer()
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
WriteSeriesResponse(qb422016, isPartial, mns, qt, qtDone)
|
WriteSeriesResponse(qb422016, isPartial, metricNames, qt, qtDone)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
qs422016 := string(qb422016.B)
|
qs422016 := string(qb422016.B)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
qt422016.ReleaseByteBuffer(qb422016)
|
qt422016.ReleaseByteBuffer(qb422016)
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
return qs422016
|
return qs422016
|
||||||
//line app/vmselect/prometheus/series_response.qtpl:25
|
//line app/vmselect/prometheus/series_response.qtpl:31
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,7 @@ func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, tfss []*storage.TagF
|
|||||||
return bi, nil
|
return bi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]storage.MetricName, error) {
|
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
|
||||||
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
|
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,6 +33,7 @@ scrape_configs:
|
|||||||
```
|
```
|
||||||
|
|
||||||
* FEATURE: [query tracing](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#query-tracing): show timestamps in query traces in human-readable format (aka `RFC3339` in UTC timezone) instead of milliseconds since Unix epoch. For example, `2022-06-27T10:32:54.506Z` instead of `1656325974506`.
|
* FEATURE: [query tracing](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#query-tracing): show timestamps in query traces in human-readable format (aka `RFC3339` in UTC timezone) instead of milliseconds since Unix epoch. For example, `2022-06-27T10:32:54.506Z` instead of `1656325974506`.
|
||||||
|
* FEATURE: improve performance of [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) requests, which return big number of time series.
|
||||||
|
|
||||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): allow using `__name__` label (aka [metric name](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)) in alerting annotations. For example `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}`.
|
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): allow using `__name__` label (aka [metric name](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)) in alerting annotations. For example `{{ $labels.__name__ }}: Too high connection number for "{{ $labels.instance }}`.
|
||||||
* BUGFIX: limit max memory occupied by the cache, which stores parsed regular expressions. Previously too long regular expressions passed in [MetricsQL queries](https://docs.victoriametrics.com/MetricsQL.html) could result in big amounts of used memory (e.g. multiple of gigabytes). Now the max cache size for parsed regexps is limited to a few megabytes.
|
* BUGFIX: limit max memory occupied by the cache, which stores parsed regular expressions. Previously too long regular expressions passed in [MetricsQL queries](https://docs.victoriametrics.com/MetricsQL.html) could result in big amounts of used memory (e.g. multiple of gigabytes). Now the max cache size for parsed regexps is limited to a few megabytes.
|
||||||
|
@ -3,6 +3,7 @@ package storage
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -392,6 +393,14 @@ func (mn *MetricName) Marshal(dst []byte) []byte {
|
|||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnmarshalString unmarshals mn from s
|
||||||
|
func (mn *MetricName) UnmarshalString(s string) error {
|
||||||
|
b := bytesutil.ToUnsafeBytes(s)
|
||||||
|
err := mn.Unmarshal(b)
|
||||||
|
runtime.KeepAlive(s)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Unmarshal unmarshals mn from src.
|
// Unmarshal unmarshals mn from src.
|
||||||
func (mn *MetricName) Unmarshal(src []byte) error {
|
func (mn *MetricName) Unmarshal(src []byte) error {
|
||||||
if len(src) < 8 {
|
if len(src) < 8 {
|
||||||
|
@ -1144,8 +1144,10 @@ func nextRetentionDuration(retentionMsecs int64) time.Duration {
|
|||||||
return time.Duration(deadline-t) * time.Millisecond
|
return time.Duration(deadline-t) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchMetricNames returns metric names matching the given tfss on the given tr.
|
// SearchMetricNames returns marshaled metric names matching the given tfss on the given tr.
|
||||||
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]MetricName, error) {
|
//
|
||||||
|
// The marshaled metric names must be unmarshaled via MetricName.UnmarshalString().
|
||||||
|
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
|
||||||
qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
|
qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
|
||||||
defer qt.Done()
|
defer qt.Done()
|
||||||
tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
|
tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
|
||||||
@ -1161,7 +1163,8 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
|
|||||||
accountID := tsids[0].AccountID
|
accountID := tsids[0].AccountID
|
||||||
projectID := tsids[0].ProjectID
|
projectID := tsids[0].ProjectID
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
mns := make([]MetricName, 0, len(tsids))
|
metricNames := make([]string, 0, len(tsids))
|
||||||
|
metricNamesSeen := make(map[string]struct{}, len(tsids))
|
||||||
var metricName []byte
|
var metricName []byte
|
||||||
for i := range tsids {
|
for i := range tsids {
|
||||||
if i&paceLimiterSlowIterationsMask == 0 {
|
if i&paceLimiterSlowIterationsMask == 0 {
|
||||||
@ -1180,14 +1183,15 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
|
|||||||
}
|
}
|
||||||
return nil, fmt.Errorf("error when searching metricName for metricID=%d: %w", metricID, err)
|
return nil, fmt.Errorf("error when searching metricName for metricID=%d: %w", metricID, err)
|
||||||
}
|
}
|
||||||
mns = mns[:len(mns)+1]
|
if _, ok := metricNamesSeen[string(metricName)]; ok {
|
||||||
mn := &mns[len(mns)-1]
|
// The given metric name was already seen; skip it
|
||||||
if err = mn.Unmarshal(metricName); err != nil {
|
continue
|
||||||
return nil, fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
|
|
||||||
}
|
}
|
||||||
|
metricNames = append(metricNames, string(metricName))
|
||||||
|
metricNamesSeen[metricNames[len(metricNames)-1]] = struct{}{}
|
||||||
}
|
}
|
||||||
qt.Printf("loaded %d metric names", len(mns))
|
qt.Printf("loaded %d metric names", len(metricNames))
|
||||||
return mns, nil
|
return metricNames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
|
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
|
||||||
|
@ -934,14 +934,21 @@ func testStorageRegisterMetricNames(s *Storage) error {
|
|||||||
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
|
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
|
||||||
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
|
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
|
||||||
}
|
}
|
||||||
mns, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
|
metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchMetricNames: %w", err)
|
return fmt.Errorf("error in SearchMetricNames: %w", err)
|
||||||
}
|
}
|
||||||
if len(mns) < metricsPerAdd {
|
if err != nil {
|
||||||
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(mns), int(metricsPerAdd))
|
return fmt.Errorf("cannot unmarshal metric names: %w", err)
|
||||||
}
|
}
|
||||||
for i, mn := range mns {
|
if len(metricNames) < metricsPerAdd {
|
||||||
|
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(metricNames), int(metricsPerAdd))
|
||||||
|
}
|
||||||
|
var mn MetricName
|
||||||
|
for i, metricName := range metricNames {
|
||||||
|
if err := mn.UnmarshalString(metricName); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
|
||||||
|
}
|
||||||
addID := mn.GetTagValue("add_id")
|
addID := mn.GetTagValue("add_id")
|
||||||
if string(addID) != "0" {
|
if string(addID) != "0" {
|
||||||
return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")
|
return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")
|
||||||
@ -957,12 +964,12 @@ func testStorageRegisterMetricNames(s *Storage) error {
|
|||||||
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
|
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
|
||||||
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
|
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
|
||||||
}
|
}
|
||||||
mns, err = s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
|
metricNames, err = s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchMetricNames for incorrect accountID, projectID: %w", err)
|
return fmt.Errorf("error in SearchMetricNames for incorrect accountID, projectID: %w", err)
|
||||||
}
|
}
|
||||||
if len(mns) > 0 {
|
if len(metricNames) > 0 {
|
||||||
return fmt.Errorf("SearchMetricNames with incorrect accountID, projectID returns unexpected non-empty result:\n%+v", mns)
|
return fmt.Errorf("SearchMetricNames with incorrect accountID, projectID returns unexpected non-empty result:\n%+v", metricNames)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -13,7 +13,7 @@ type API interface {
|
|||||||
InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (BlockIterator, error)
|
InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (BlockIterator, error)
|
||||||
|
|
||||||
// SearchMetricNames returns metric names matching the given tfss.
|
// SearchMetricNames returns metric names matching the given tfss.
|
||||||
SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]storage.MetricName, error)
|
SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error)
|
||||||
|
|
||||||
// LabelValues returns values for labelName label across series matching the given tfss.
|
// LabelValues returns values for labelName label across series matching the given tfss.
|
||||||
LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error)
|
LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error)
|
||||||
|
@ -475,7 +475,7 @@ func (s *Server) processRequest(ctx *vmselectRequestCtx) error {
|
|||||||
func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
|
func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
|
||||||
switch rpcName {
|
switch rpcName {
|
||||||
case "search_v7":
|
case "search_v7":
|
||||||
return s.processSeriesSearch(ctx)
|
return s.processSearch(ctx)
|
||||||
case "searchMetricNames_v3":
|
case "searchMetricNames_v3":
|
||||||
return s.processSearchMetricNames(ctx)
|
return s.processSearchMetricNames(ctx)
|
||||||
case "labelValues_v5":
|
case "labelValues_v5":
|
||||||
@ -860,7 +860,7 @@ func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
mns, err := s.api.SearchMetricNames(ctx.qt, tfss, tr, maxMetrics, ctx.deadline)
|
metricNames, err := s.api.SearchMetricNames(ctx.qt, tfss, tr, maxMetrics, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
@ -871,21 +871,20 @@ func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send response.
|
// Send response.
|
||||||
metricNamesCount := len(mns)
|
metricNamesCount := len(metricNames)
|
||||||
if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil {
|
if err := ctx.writeUint64(uint64(metricNamesCount)); err != nil {
|
||||||
return fmt.Errorf("cannot send metricNamesCount: %w", err)
|
return fmt.Errorf("cannot send metricNamesCount: %w", err)
|
||||||
}
|
}
|
||||||
for i, mn := range mns {
|
for i, metricName := range metricNames {
|
||||||
ctx.dataBuf = mn.Marshal(ctx.dataBuf[:0])
|
if err := ctx.writeString(metricName); err != nil {
|
||||||
if err := ctx.writeDataBufBytes(); err != nil {
|
|
||||||
return fmt.Errorf("cannot send metricName #%d: %w", i+1, err)
|
return fmt.Errorf("cannot send metricName #%d: %w", i+1, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ctx.qt.Printf("sent %d series to vmselect", len(mns))
|
ctx.qt.Printf("sent %d series to vmselect", len(metricNames))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) processSeriesSearch(ctx *vmselectRequestCtx) error {
|
func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
|
||||||
s.searchRequests.Inc()
|
s.searchRequests.Inc()
|
||||||
|
|
||||||
// Read request.
|
// Read request.
|
||||||
|
Loading…
Reference in New Issue
Block a user