lib/vmselectapi: pass storage.SearchQuery to API calls instead of []*storage.TagFilters + storage.TimeRange + maxMetrics

This reduces the number of args to vmselectapi calls
This commit is contained in:
Aliaksandr Valialkin 2022-07-06 00:53:03 +03:00
parent 2e721f7d16
commit 1ec4dfd678
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
4 changed files with 135 additions and 143 deletions

View File

@ -32,7 +32,6 @@ func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, er
s: s,
}
limits := vmselectapi.Limits{
MaxMetrics: *maxUniqueTimeseries,
MaxLabelNames: *maxTagKeys,
MaxLabelValues: *maxTagValues,
MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch,
@ -40,15 +39,24 @@ func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, er
return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression)
}
// vmstorageAPI impelemnts vmselectapi.API
// vmstorageAPI impelements vmselectapi.API
type vmstorageAPI struct {
s *storage.Storage
}
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (vmselectapi.BlockIterator, error) {
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
tr := sq.GetTimeRange()
if err := checkTimeRange(api.s, tr); err != nil {
return nil, err
}
maxMetrics := getMaxMetrics(sq)
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
if len(tfss) == 0 {
return nil, fmt.Errorf("missing tag filters")
}
bi := getBlockIterator()
bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline)
if err := bi.sr.Error(); err != nil {
@ -58,45 +66,114 @@ func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, tfss []*storage.TagF
return bi, nil
}
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := getMaxMetrics(sq)
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
if len(tfss) == 0 {
return nil, fmt.Errorf("missing tag filters")
}
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
}
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string,
maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
return api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, accountID, projectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := getMaxMetrics(sq)
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
}
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
maxSuffixes int, deadline uint64) ([]string, error) {
return api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
suffixes, err := api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil {
return nil, err
}
if len(suffixes) >= maxSuffixes {
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes)
}
return suffixes, nil
}
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, maxLabelNames,
maxMetrics int, deadline uint64) ([]string, error) {
return api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, accountID, projectID, tfss, tr, maxLabelNames, maxMetrics, deadline)
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
tr := sq.GetTimeRange()
maxMetrics := getMaxMetrics(sq)
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
return api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, deadline)
}
func (api *vmstorageAPI) SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
return api.s.GetSeriesCount(accountID, projectID, deadline)
}
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, date uint64, focusLabel string,
topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error) {
return api.s.GetTSDBStatus(qt, accountID, projectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
tr := sq.GetTimeRange()
maxMetrics := getMaxMetrics(sq)
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000)
return api.s.GetTSDBStatus(qt, sq.AccountID, sq.ProjectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
}
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int, deadline uint64) (int, error) {
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
tr := sq.GetTimeRange()
maxMetrics := getMaxMetrics(sq)
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
if err != nil {
return 0, err
}
if len(tfss) == 0 {
return 0, fmt.Errorf("missing tag filters")
}
return api.s.DeleteSeries(qt, tfss)
}
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error {
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
return api.s.RegisterMetricNames(qt, mrs)
}
func (api *vmstorageAPI) SearchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, query []byte,
maxMetrics int, deadline uint64) ([]string, error) {
return api.s.SearchGraphitePaths(qt, accountID, projectID, tr, query, maxMetrics, deadline)
func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
accountID := sq.AccountID
projectID := sq.ProjectID
for _, tagFilters := range sq.TagFilterss {
tfs := storage.NewTagFilters(accountID, projectID)
for i := range tagFilters {
tf := &tagFilters[i]
if string(tf.Key) == "__graphite__" {
query := tf.Value
qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
paths, err := api.s.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline)
qtChild.Donef("found %d series", len(paths))
if err != nil {
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
}
if len(paths) >= maxMetrics {
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes", maxMetrics, query)
}
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
continue
}
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
}
}
tfss = append(tfss, tfs)
}
return tfss, nil
}
// blockIterator implements vmselectapi.BlockIterator
@ -148,3 +225,15 @@ func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
StatusCode: http.StatusServiceUnavailable,
}
}
func getMaxMetrics(sq *storage.SearchQuery) int {
maxMetrics := sq.MaxMetrics
maxMetricsLimit := *maxUniqueTimeseries
if maxMetricsLimit <= 0 {
maxMetricsLimit = 2e9
}
if maxMetrics <= 0 || maxMetrics > maxMetricsLimit {
maxMetrics = maxMetricsLimit
}
return maxMetrics
}

View File

@ -269,6 +269,14 @@ type SearchQuery struct {
MaxMetrics int
}
// GetTimeRange returns time range for the given sq.
func (sq *SearchQuery) GetTimeRange() TimeRange {
return TimeRange{
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
}
// NewSearchQuery creates new search query for the given args.
func NewSearchQuery(accountID, projectID uint32, start, end int64, tagFilterss [][]TagFilter, maxMetrics int) *SearchQuery {
if maxMetrics <= 0 {

View File

@ -7,37 +7,34 @@ import (
// API must implement vmselect API.
type API interface {
// InitSearch initialize series search for the given tfss.
// InitSearch initialize series search for the given sq.
//
// The returned BlockIterator must be closed with MustClose to free up resources when it is no longer needed.
InitSearch(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) (BlockIterator, error)
InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (BlockIterator, error)
// SearchMetricNames returns metric names matching the given tfss.
SearchMetricNames(qt *querytracer.Tracer, tfss []*storage.TagFilters, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]string, error)
// SearchMetricNames returns metric names matching the given sq.
SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error)
// LabelValues returns values for labelName label acorss series matching the given tfss.
LabelValues(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, labelName string, maxLabelValues, maxMetrics int, deadline uint64) ([]string, error)
// LabelValues returns values for labelName label acorss series matching the given sq.
LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error)
// TagValueSuffixes returns tag value suffixes for the given args.
TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxSuffixes int, deadline uint64) ([]string, error)
// LabelNames returns lable names for series matching the given tfss.
LabelNames(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, tr storage.TimeRange, maxLableNames, maxMetrics int, deadline uint64) ([]string, error)
// LabelNames returns lable names for series matching the given sq.
LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLableNames int, deadline uint64) ([]string, error)
// SeriesCount returns the number of series for the given (accountID, projectID).
SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error)
// TSDBStatus returns tsdb status for the given sq.
TSDBStatus(qt *querytracer.Tracer, accountID, projectID uint32, tfss []*storage.TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*storage.TSDBStatus, error)
TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error)
// DeleteSeries deletes series matching the given tfss.
DeleteSeries(qt *querytracer.Tracer, tfss []*storage.TagFilters, maxMetrics int, deadline uint64) (int, error)
// DeleteSeries deletes series matching the given sq.
DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error)
// RegisterMetricNames registers the given mrs in the storage.
RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error
// SearchGraphitePaths searches for Graphite paths for the given query.
SearchGraphitePaths(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, query []byte, maxMetrics int, deadline uint64) ([]string, error)
RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error
}
// BlockIterator must iterate through series blocks found by VMSelect.InitSearch.

View File

@ -66,9 +66,6 @@ type Server struct {
// Limits contains various limits for Server.
type Limits struct {
// MaxMetrics is the maximum number of time series, which may be returned from various API calls.
MaxMetrics int
// MaxLabelNames is the maximum label names, which may be returned from labelNames request.
MaxLabelNames int
@ -526,7 +523,7 @@ func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error {
}
// Register metric names from mrs.
if err := s.api.RegisterMetricNames(ctx.qt, mrs); err != nil {
if err := s.api.RegisterMetricNames(ctx.qt, mrs, ctx.deadline); err != nil {
return ctx.writeErrorMessage(err)
}
@ -546,16 +543,7 @@ func (s *Server) processDeleteSeries(ctx *vmselectRequestCtx) error {
}
// Execute the request.
tr := storage.TimeRange{
MinTimestamp: 0,
MaxTimestamp: time.Now().UnixNano() / 1e6,
}
maxMetrics := s.getMaxMetrics(ctx)
tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
deletedCount, err := s.api.DeleteSeries(ctx.qt, tfss, maxMetrics, ctx.deadline)
deletedCount, err := s.api.DeleteSeries(ctx.qt, &ctx.sq, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
@ -587,16 +575,7 @@ func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error {
}
// Execute the request
tr := storage.TimeRange{
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
maxMetrics := s.getMaxMetrics(ctx)
tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
labelNames, err := s.api.LabelNames(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, ctx.deadline)
labelNames, err := s.api.LabelNames(ctx.qt, &ctx.sq, maxLabelNames, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
@ -641,16 +620,7 @@ func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error {
}
// Execute the request
tr := storage.TimeRange{
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
maxMetrics := s.getMaxMetrics(ctx)
tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
labelValues, err := s.api.LabelValues(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, tr, labelName, maxLabelValues, maxMetrics, ctx.deadline)
labelValues, err := s.api.LabelValues(ctx.qt, &ctx.sq, labelName, maxLabelValues, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
@ -785,17 +755,7 @@ func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error {
}
// Execute the request
tr := storage.TimeRange{
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
maxMetrics := s.getMaxMetrics(ctx)
tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
date := uint64(ctx.sq.MinTimestamp) / (24 * 3600 * 1000)
status, err := s.api.TSDBStatus(ctx.qt, ctx.sq.AccountID, ctx.sq.ProjectID, tfss, date, focusLabel, int(topN), maxMetrics, ctx.deadline)
status, err := s.api.TSDBStatus(ctx.qt, &ctx.sq, focusLabel, int(topN), ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
@ -858,16 +818,7 @@ func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error {
}
// Execute request.
tr := storage.TimeRange{
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
maxMetrics := s.getMaxMetrics(ctx)
tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
metricNames, err := s.api.SearchMetricNames(ctx.qt, tfss, tr, maxMetrics, ctx.deadline)
metricNames, err := s.api.SearchMetricNames(ctx.qt, &ctx.sq, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
@ -901,16 +852,7 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
// Initiaialize the search.
startTime := time.Now()
tr := storage.TimeRange{
MinTimestamp: ctx.sq.MinTimestamp,
MaxTimestamp: ctx.sq.MaxTimestamp,
}
maxMetrics := s.getMaxMetrics(ctx)
tfss, err := s.setupTfss(ctx.qt, &ctx.sq, tr, maxMetrics, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
bi, err := s.api.InitSearch(ctx.qt, tfss, tr, maxMetrics, ctx.deadline)
bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline)
if err != nil {
return ctx.writeErrorMessage(err)
}
@ -945,47 +887,3 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
}
return nil
}
func (s *Server) getMaxMetrics(ctx *vmselectRequestCtx) int {
maxMetrics := ctx.sq.MaxMetrics
maxMetricsLimit := s.limits.MaxMetrics
if maxMetricsLimit <= 0 {
maxMetricsLimit = 2e9
}
if maxMetrics <= 0 || maxMetrics > maxMetricsLimit {
maxMetrics = maxMetricsLimit
}
return maxMetrics
}
func (s *Server) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
accountID := sq.AccountID
projectID := sq.ProjectID
for _, tagFilters := range sq.TagFilterss {
tfs := storage.NewTagFilters(accountID, projectID)
for i := range tagFilters {
tf := &tagFilters[i]
if string(tf.Key) == "__graphite__" {
query := tf.Value
qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
paths, err := s.api.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline)
qtChild.Donef("found %d series", len(paths))
if err != nil {
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
}
if len(paths) >= maxMetrics {
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes", maxMetrics, query)
}
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
continue
}
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
}
}
tfss = append(tfss, tfs)
}
return tfss, nil
}