mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 22:32:20 +01:00
lib/storage: add querytracer to more contexts
querytracer has been added to the following storage.Storage methods: - RegisterMetricNames - DeleteMetrics - SearchTagValueSuffixes - SearchGraphitePaths
This commit is contained in:
parent
6c66804fd3
commit
926fccbb8d
@ -489,7 +489,7 @@ func (s *VMSelectServer) processRegisterMetricNames(ctx *vmselectRequestCtx) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Register metric names from mrs.
|
// Register metric names from mrs.
|
||||||
if err := s.storage.RegisterMetricNames(mrs); err != nil {
|
if err := s.storage.RegisterMetricNames(ctx.qt, mrs); err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -518,7 +518,7 @@ func (s *VMSelectServer) processDeleteMetrics(ctx *vmselectRequestCtx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete the given metrics.
|
// Delete the given metrics.
|
||||||
deletedCount, err := s.storage.DeleteMetrics(ctx.tfss)
|
deletedCount, err := s.storage.DeleteMetrics(ctx.qt, ctx.tfss)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
@ -665,7 +665,7 @@ func (s *VMSelectServer) processTagValueSuffixes(ctx *vmselectRequestCtx) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Search for tag value suffixes
|
// Search for tag value suffixes
|
||||||
suffixes, err := s.storage.SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, *maxTagValueSuffixesPerSearch, ctx.deadline)
|
suffixes, err := s.storage.SearchTagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, *maxTagValueSuffixesPerSearch, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
@ -965,7 +965,7 @@ func (ctx *vmselectRequestCtx) setupTfss(s *storage.Storage, tr storage.TimeRang
|
|||||||
if string(tf.Key) == "__graphite__" {
|
if string(tf.Key) == "__graphite__" {
|
||||||
query := tf.Value
|
query := tf.Value
|
||||||
maxMetrics := ctx.getMaxMetrics()
|
maxMetrics := ctx.getMaxMetrics()
|
||||||
paths, err := s.SearchGraphitePaths(accountID, projectID, tr, query, maxMetrics, ctx.deadline)
|
paths, err := s.SearchGraphitePaths(ctx.qt, accountID, projectID, tr, query, maxMetrics, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
|
return fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
|
||||||
}
|
}
|
||||||
|
@ -1061,8 +1061,12 @@ func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer
|
|||||||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||||
//
|
//
|
||||||
// If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found.
|
// If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found.
|
||||||
func (db *indexDB) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
|
func (db *indexDB) SearchTagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
|
||||||
delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||||
|
qt = qt.NewChild("search tag value suffixes for accountID=%d, projectID=%d, timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c, maxTagValueSuffixes=%d",
|
||||||
|
accountID, projectID, &tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||||
|
defer qt.Done()
|
||||||
|
|
||||||
// TODO: cache results?
|
// TODO: cache results?
|
||||||
|
|
||||||
tvss := make(map[string]struct{})
|
tvss := make(map[string]struct{})
|
||||||
@ -1075,7 +1079,9 @@ func (db *indexDB) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRa
|
|||||||
if len(tvss) < maxTagValueSuffixes {
|
if len(tvss) < maxTagValueSuffixes {
|
||||||
ok := db.doExtDB(func(extDB *indexDB) {
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||||
|
qtChild := qt.NewChild("search tag value suffixes in the previous indexdb")
|
||||||
err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||||
|
qtChild.Done()
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
})
|
})
|
||||||
if ok && err != nil {
|
if ok && err != nil {
|
||||||
@ -1092,6 +1098,7 @@ func (db *indexDB) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRa
|
|||||||
suffixes = suffixes[:maxTagValueSuffixes]
|
suffixes = suffixes[:maxTagValueSuffixes]
|
||||||
}
|
}
|
||||||
// Do not sort suffixes, since they must be sorted by vmselect.
|
// Do not sort suffixes, since they must be sorted by vmselect.
|
||||||
|
qt.Printf("found %d suffixes", len(suffixes))
|
||||||
return suffixes, nil
|
return suffixes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1581,7 +1588,9 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64, accoun
|
|||||||
// The caller must reset all the caches which may contain the deleted TSIDs.
|
// The caller must reset all the caches which may contain the deleted TSIDs.
|
||||||
//
|
//
|
||||||
// Returns the number of metrics deleted.
|
// Returns the number of metrics deleted.
|
||||||
func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
|
||||||
|
qt = qt.NewChild("deleting series for %s", tfss)
|
||||||
|
defer qt.Done()
|
||||||
if len(tfss) == 0 {
|
if len(tfss) == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
@ -1592,7 +1601,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
|||||||
MaxTimestamp: (1 << 63) - 1,
|
MaxTimestamp: (1 << 63) - 1,
|
||||||
}
|
}
|
||||||
is := db.getIndexSearch(tfss[0].accountID, tfss[0].projectID, noDeadline)
|
is := db.getIndexSearch(tfss[0].accountID, tfss[0].projectID, noDeadline)
|
||||||
metricIDs, err := is.searchMetricIDs(nil, tfss, tr, 2e9)
|
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -1605,7 +1614,9 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
|||||||
deletedCount := len(metricIDs)
|
deletedCount := len(metricIDs)
|
||||||
if db.doExtDB(func(extDB *indexDB) {
|
if db.doExtDB(func(extDB *indexDB) {
|
||||||
var n int
|
var n int
|
||||||
n, err = extDB.DeleteTSIDs(tfss)
|
qtChild := qt.NewChild("deleting series from the previos indexdb")
|
||||||
|
n, err = extDB.DeleteTSIDs(qtChild, tfss)
|
||||||
|
qtChild.Donef("deleted %d series", n)
|
||||||
deletedCount += n
|
deletedCount += n
|
||||||
}) {
|
}) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1343,8 +1343,8 @@ var ErrDeadlineExceeded = fmt.Errorf("deadline exceeded")
|
|||||||
// DeleteMetrics deletes all the metrics matching the given tfss.
|
// DeleteMetrics deletes all the metrics matching the given tfss.
|
||||||
//
|
//
|
||||||
// Returns the number of metrics deleted.
|
// Returns the number of metrics deleted.
|
||||||
func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) {
|
func (s *Storage) DeleteMetrics(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
|
||||||
deletedCount, err := s.idb().DeleteTSIDs(tfss)
|
deletedCount, err := s.idb().DeleteTSIDs(qt, tfss)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
||||||
}
|
}
|
||||||
@ -1374,15 +1374,15 @@ func (s *Storage) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer
|
|||||||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||||
//
|
//
|
||||||
// If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned.
|
// If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned.
|
||||||
func (s *Storage) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
|
func (s *Storage) SearchTagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
|
||||||
delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||||
return s.idb().SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
return s.idb().SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr.
|
// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr.
|
||||||
func (s *Storage) SearchGraphitePaths(accountID, projectID uint32, tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
func (s *Storage) SearchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||||
query = replaceAlternateRegexpsWithGraphiteWildcards(query)
|
query = replaceAlternateRegexpsWithGraphiteWildcards(query)
|
||||||
return s.searchGraphitePaths(accountID, projectID, tr, nil, query, maxPaths, deadline)
|
return s.searchGraphitePaths(qt, accountID, projectID, tr, nil, query, maxPaths, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
// replaceAlternateRegexpsWithGraphiteWildcards replaces (foo|..|bar) with {foo,...,bar} in b and returns the new value.
|
// replaceAlternateRegexpsWithGraphiteWildcards replaces (foo|..|bar) with {foo,...,bar} in b and returns the new value.
|
||||||
@ -1427,12 +1427,12 @@ func replaceAlternateRegexpsWithGraphiteWildcards(b []byte) []byte {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) searchGraphitePaths(accountID, projectID uint32, tr TimeRange, qHead, qTail []byte, maxPaths int, deadline uint64) ([]string, error) {
|
func (s *Storage) searchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr TimeRange, qHead, qTail []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||||
n := bytes.IndexAny(qTail, "*[{")
|
n := bytes.IndexAny(qTail, "*[{")
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
// Verify that qHead matches a metric name.
|
// Verify that qHead matches a metric name.
|
||||||
qHead = append(qHead, qTail...)
|
qHead = append(qHead, qTail...)
|
||||||
suffixes, err := s.SearchTagValueSuffixes(accountID, projectID, tr, nil, qHead, '.', 1, deadline)
|
suffixes, err := s.SearchTagValueSuffixes(qt, accountID, projectID, tr, nil, qHead, '.', 1, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -1447,7 +1447,7 @@ func (s *Storage) searchGraphitePaths(accountID, projectID uint32, tr TimeRange,
|
|||||||
return []string{string(qHead)}, nil
|
return []string{string(qHead)}, nil
|
||||||
}
|
}
|
||||||
qHead = append(qHead, qTail[:n]...)
|
qHead = append(qHead, qTail[:n]...)
|
||||||
suffixes, err := s.SearchTagValueSuffixes(accountID, projectID, tr, nil, qHead, '.', maxPaths, deadline)
|
suffixes, err := s.SearchTagValueSuffixes(qt, accountID, projectID, tr, nil, qHead, '.', maxPaths, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -1484,7 +1484,7 @@ func (s *Storage) searchGraphitePaths(accountID, projectID uint32, tr TimeRange,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
qHead = append(qHead[:qHeadLen], suffix...)
|
qHead = append(qHead[:qHeadLen], suffix...)
|
||||||
ps, err := s.searchGraphitePaths(accountID, projectID, tr, qHead, qTail, maxPaths, deadline)
|
ps, err := s.searchGraphitePaths(qt, accountID, projectID, tr, qHead, qTail, maxPaths, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -1774,7 +1774,9 @@ var (
|
|||||||
//
|
//
|
||||||
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
|
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
|
||||||
// Th MetricRow.Value field is ignored.
|
// Th MetricRow.Value field is ignored.
|
||||||
func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
|
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error {
|
||||||
|
qt = qt.NewChild("registering %d series", len(mrs))
|
||||||
|
defer qt.Done()
|
||||||
var metricName []byte
|
var metricName []byte
|
||||||
var genTSID generationTSID
|
var genTSID generationTSID
|
||||||
mn := GetMetricName()
|
mn := GetMetricName()
|
||||||
|
@ -694,7 +694,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
|||||||
if n := metricBlocksCount(tfs); n == 0 {
|
if n := metricBlocksCount(tfs); n == 0 {
|
||||||
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
|
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
|
||||||
}
|
}
|
||||||
deletedCount, err := s.DeleteMetrics([]*TagFilters{tfs})
|
deletedCount, err := s.DeleteMetrics(nil, []*TagFilters{tfs})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot delete metrics: %w", err)
|
return fmt.Errorf("cannot delete metrics: %w", err)
|
||||||
}
|
}
|
||||||
@ -706,7 +706,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Try deleting empty tfss
|
// Try deleting empty tfss
|
||||||
deletedCount, err = s.DeleteMetrics(nil)
|
deletedCount, err = s.DeleteMetrics(nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot delete empty tfss: %w", err)
|
return fmt.Errorf("cannot delete empty tfss: %w", err)
|
||||||
}
|
}
|
||||||
@ -827,8 +827,8 @@ func testStorageRegisterMetricNames(s *Storage) error {
|
|||||||
}
|
}
|
||||||
mrs = append(mrs, mr)
|
mrs = append(mrs, mr)
|
||||||
}
|
}
|
||||||
if err := s.RegisterMetricNames(mrs); err != nil {
|
if err := s.RegisterMetricNames(nil, mrs); err != nil {
|
||||||
return fmt.Errorf("unexpected error in AddMetrics: %w", err)
|
return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var addIDsExpected []string
|
var addIDsExpected []string
|
||||||
|
Loading…
Reference in New Issue
Block a user