diff --git a/app/vmstorage/servers/vmselect.go b/app/vmstorage/servers/vmselect.go index c8a64f81e7..fcfdc05588 100644 --- a/app/vmstorage/servers/vmselect.go +++ b/app/vmstorage/servers/vmselect.go @@ -32,7 +32,6 @@ func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, er s: s, } limits := vmselectapi.Limits{ - MaxMetrics: *maxUniqueTimeseries, MaxLabelNames: *maxTagKeys, MaxLabelValues: *maxTagValues, MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch, @@ -40,15 +39,24 @@ func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, er return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression) } -// vmstorageAPI impelemnts vmselectapi.API +// vmstorageAPI impelements vmselectapi.API type vmstorageAPI struct { s *storage.Storage } -func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (vmselectapi.BlockIterator, error) { +func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) { + tr := sq.GetTimeRange() if err := checkTimeRange(api.s, tr); err != nil { return nil, err } + maxMetrics := getMaxMetrics(sq) + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + if len(tfss) == 0 { + return nil, fmt.Errorf("missing tag filters") + } bi := getBlockIterator() bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline) if err := bi.sr.Error(); err != nil { @@ -58,45 +66,114 @@ func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, tfss []*storage.TagF return bi, nil } -func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error) { +func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) { + tr := sq.GetTimeRange() + maxMetrics := getMaxMetrics(sq) + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + if len(tfss) == 0 { + return nil, fmt.Errorf("missing tag filters") + } return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline) } -func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string, - maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) { - return api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, accountID, projectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline) +func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) { + tr := sq.GetTimeRange() + maxMetrics := getMaxMetrics(sq) + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + return api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline) } func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) { - return api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline) + suffixes, err := api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline) + if err != nil { + return nil, err + } + if len(suffixes) >= maxSuffixes { + return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+ + "either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes) + } + return suffixes, nil } -func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, maxLabelNames, - maxMetrics int, deadline uint64) ([]string, error) { - return api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, accountID, projectID, tfss, tr, maxLabelNames, maxMetrics, deadline) +func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) { + tr := sq.GetTimeRange() + maxMetrics := getMaxMetrics(sq) + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + return api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, deadline) } func (api *vmstorageAPI) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) { return api.s.GetSeriesCount(accountID, projectID, deadline) } -func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, date uint64, focusLabel string, - topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) { - return api.s.GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline) +func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) { + tr := sq.GetTimeRange() + maxMetrics := getMaxMetrics(sq) + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return nil, err + } + date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000) + return api.s.GetTSDBStatus(qt, sq.AccountID, sq.ProjectID, tfss, date, focusLabel, topN, maxMetrics, deadline) } -func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int, deadline uint64) (int, error) { +func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) { + tr := sq.GetTimeRange() + maxMetrics := getMaxMetrics(sq) + tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline) + if err != nil { + return 0, err + } + if len(tfss) == 0 { + return 0, fmt.Errorf("missing tag filters") + } return api.s.DeleteSeries(qt, tfss) } -func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error { +func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error { return api.s.RegisterMetricNames(qt, mrs) } -func (api *vmstorageAPI) SearchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, query []byte, - maxMetrics int, deadline uint64) ([]string, error) { - return api.s.SearchGraphitePaths(qt, accountID, projectID, tr, query, maxMetrics, deadline) +func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) { + tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss)) + accountID := sq.AccountID + projectID := sq.ProjectID + for _, tagFilters := range sq.TagFilterss { + tfs := storage.NewTagFilters(accountID, projectID) + for i := range tagFilters { + tf := &tagFilters[i] + if string(tf.Key) == "__graphite__" { + query := tf.Value + qtChild := qt.NewChild("searching for series matching __graphite__=%q", query) + paths, err := api.s.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline) + qtChild.Donef("found %d series", len(paths)) + if err != nil { + return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err) + } + if len(paths) >= maxMetrics { + return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+ + "either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes", maxMetrics, query) + } + tfs.AddGraphiteQuery(query, paths, tf.IsNegative) + continue + } + if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil { + return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err) + } + } + tfss = append(tfss, tfs) + } + return tfss, nil } // blockIterator implements vmselectapi.BlockIterator @@ -148,3 +225,15 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error { StatusCode: http.StatusServiceUnavailable, } } + +func getMaxMetrics(sq *storage.SearchQuery) int { + maxMetrics := sq.MaxMetrics + maxMetricsLimit := *maxUniqueTimeseries + if maxMetricsLimit <= 0 { + maxMetricsLimit = 2e9 + } + if maxMetrics <= 0 || maxMetrics > maxMetricsLimit { + maxMetrics = maxMetricsLimit + } + return maxMetrics +} diff --git a/lib/storage/search.go b/lib/storage/search.go index 13b6461e29..2ceb319676 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -269,6 +269,14 @@ type SearchQuery struct { MaxMetrics int } +// GetTimeRange returns time range for the given sq. +func (sq *SearchQuery) GetTimeRange() TimeRange { + return TimeRange{ + MinTimestamp: sq.MinTimestamp, + MaxTimestamp: sq.MaxTimestamp, + } +} + // NewSearchQuery creates new search query for the given args. func NewSearchQuery(accountID, projectID uint32, start, end int64, tagFilterss [][]TagFilter, maxMetrics int) *SearchQuery { if maxMetrics <= 0 { diff --git a/lib/vmselectapi/api.go b/lib/vmselectapi/api.go index 9b78980118..3e0bdf917b 100644 --- a/lib/vmselectapi/api.go +++ b/lib/vmselectapi/api.go @@ -7,37 +7,34 @@ import ( // API must implement vmselect API. type API interface { - // InitSearch initialize series search for the given tfss. + // InitSearch initialize series search for the given sq. // // The returned BlockIterator must be closed with MustClose to free up resources when it is no longer needed. - InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (BlockIterator, error) + InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (BlockIterator, error) - // SearchMetricNames returns metric names matching the given tfss. - SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error) + // SearchMetricNames returns metric names matching the given sq. + SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) - // LabelValues returns values for labelName label acorss series matching the given tfss. - LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) + // LabelValues returns values for labelName label acorss series matching the given sq. + LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) // TagValueSuffixes returns tag value suffixes for the given args. TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error) - // LabelNames returns lable names for series matching the given tfss. - LabelNames(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, maxLableNames, maxMetrics int, deadline uint64) ([]string, error) + // LabelNames returns lable names for series matching the given sq. + LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLableNames int, deadline uint64) ([]string, error) // SeriesCount returns the number of series for the given (accountID, projectID). SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) // TSDBStatus returns tsdb status for the given sq. - TSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) + TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) - // DeleteSeries deletes series matching the given tfss. - DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int, deadline uint64) (int, error) + // DeleteSeries deletes series matching the given sq. + DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) // RegisterMetricNames registers the given mrs in the storage. - RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error - - // SearchGraphitePaths searches for Graphite paths for the given query. - SearchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, query []byte, maxMetrics int, deadline uint64) ([]string, error) + RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error } // BlockIterator must iterate through series blocks found by VMSelect.InitSearch. diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go index f9e389d01a..2eec19fb5f 100644 --- a/lib/vmselectapi/server.go +++ b/lib/vmselectapi/server.go @@ -66,9 +66,6 @@ type Server struct { // Limits contains various limits for Server. type Limits struct { - // MaxMetrics is the maximum number of time series, which may be returned from various API calls. - MaxMetrics int - // MaxLabelNames is the maximum label names, which may be returned from labelNames request. MaxLabelNames int @@ -526,7 +523,7 @@ func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error { } // Register metric names from mrs. - if err := s.api.RegisterMetricNames(ctx.qt, mrs); err != nil { + if err := s.api.RegisterMetricNames(ctx.qt, mrs, ctx.deadline); err != nil { return ctx.writeErrorMessage(err) } @@ -546,16 +543,7 @@ func (s *Server) processDeleteSeries(ctx *vmselectRequestCtx) error { } // Execute the request. - tr := storage.TimeRange{ - MinTimestamp: 0, - MaxTimestamp: time.Now().UnixNano() / 1e6, - } - maxMetrics := s.getMaxMetrics(ctx) - tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - deletedCount, err := s.api.DeleteSeries(ctx.qt, tfss, maxMetrics, ctx.deadline) + deletedCount, err := s.api.DeleteSeries(ctx.qt, &ctx.sq, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -587,16 +575,7 @@ func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error { } // Execute the request - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - maxMetrics := s.getMaxMetrics(ctx) - tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - labelNames, err := s.api.LabelNames(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, ctx.deadline) + labelNames, err := s.api.LabelNames(ctx.qt, &ctx.sq, maxLabelNames, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -641,16 +620,7 @@ func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error { } // Execute the request - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - maxMetrics := s.getMaxMetrics(ctx) - tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - labelValues, err := s.api.LabelValues(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, tr, labelName, maxLabelValues, maxMetrics, ctx.deadline) + labelValues, err := s.api.LabelValues(ctx.qt, &ctx.sq, labelName, maxLabelValues, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -785,17 +755,7 @@ func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error { } // Execute the request - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - maxMetrics := s.getMaxMetrics(ctx) - tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000) - status, err := s.api.TSDBStatus(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, date, focusLabel, int(topN), maxMetrics, ctx.deadline) + status, err := s.api.TSDBStatus(ctx.qt, &ctx.sq, focusLabel, int(topN), ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -858,16 +818,7 @@ func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error { } // Execute request. - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - maxMetrics := s.getMaxMetrics(ctx) - tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - metricNames, err := s.api.SearchMetricNames(ctx.qt, tfss, tr, maxMetrics, ctx.deadline) + metricNames, err := s.api.SearchMetricNames(ctx.qt, &ctx.sq, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -901,16 +852,7 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error { // Initiaialize the search. startTime := time.Now() - tr := storage.TimeRange{ - MinTimestamp: ctx.sq.MinTimestamp, - MaxTimestamp: ctx.sq.MaxTimestamp, - } - maxMetrics := s.getMaxMetrics(ctx) - tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline) - if err != nil { - return ctx.writeErrorMessage(err) - } - bi, err := s.api.InitSearch(ctx.qt, tfss, tr, maxMetrics, ctx.deadline) + bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -945,47 +887,3 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error { } return nil } - -func (s *Server) getMaxMetrics(ctx *vmselectRequestCtx) int { - maxMetrics := ctx.sq.MaxMetrics - maxMetricsLimit := s.limits.MaxMetrics - if maxMetricsLimit <= 0 { - maxMetricsLimit = 2e9 - } - if maxMetrics <= 0 || maxMetrics > maxMetricsLimit { - maxMetrics = maxMetricsLimit - } - return maxMetrics -} - -func (s *Server) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) { - tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss)) - accountID := sq.AccountID - projectID := sq.ProjectID - for _, tagFilters := range sq.TagFilterss { - tfs := storage.NewTagFilters(accountID, projectID) - for i := range tagFilters { - tf := &tagFilters[i] - if string(tf.Key) == "__graphite__" { - query := tf.Value - qtChild := qt.NewChild("searching for series matching __graphite__=%q", query) - paths, err := s.api.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline) - qtChild.Donef("found %d series", len(paths)) - if err != nil { - return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err) - } - if len(paths) >= maxMetrics { - return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+ - "either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes", maxMetrics, query) - } - tfs.AddGraphiteQuery(query, paths, tf.IsNegative) - continue - } - if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil { - return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err) - } - } - tfss = append(tfss, tfs) - } - return tfss, nil -}