mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-12 13:32:25 +01:00
85f60237e2
### Describe Your Changes Add support for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6930 Calculate `-search.maxUniqueTimeseries` from `-search.maxConcurrentRequests` and remaining memory if it is **not set** or **less than or equal to 0**. The remaining memory is affected by `-memory.allowedPercent`, `-memory.allowedBytes` and the cgroup memory limit. ### Checklist The following checks are **mandatory**: - [x] My change adheres to the [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
295 lines
12 KiB
Go
295 lines
12 KiB
Go
package servers
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
|
|
)
|
|
|
|
var (
|
|
maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
|
|
"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). See also -search.max* command-line flags at vmselect")
|
|
maxTagKeys = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
|
|
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
|
maxTagValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
|
|
"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
|
|
maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
|
maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+
|
|
"the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests "+
|
|
"may require high amounts of memory. See also -search.maxQueueDuration")
|
|
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
|
|
"when -search.maxConcurrentRequests limit is reached")
|
|
|
|
disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
|
|
"This reduces CPU usage at the cost of higher network bandwidth usage")
|
|
denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. "+
|
|
"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+
|
|
"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee")
|
|
)
|
|
|
|
var (
	// maxUniqueTimeseriesValue holds the limit returned by GetMaxUniqueTimeSeries.
	// It is written only inside maxUniqueTimeseriesValueOnce.Do.
	maxUniqueTimeseriesValue int

	// maxUniqueTimeseriesValueOnce makes sure maxUniqueTimeseriesValue is initialized a single time.
	maxUniqueTimeseriesValueOnce sync.Once
)
|
|
|
|
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s.
|
|
func NewVMSelectServer(addr string, s *storage.Storage) (*vmselectapi.Server, error) {
|
|
api := &vmstorageAPI{
|
|
s: s,
|
|
}
|
|
limits := vmselectapi.Limits{
|
|
MaxLabelNames: *maxTagKeys,
|
|
MaxLabelValues: *maxTagValues,
|
|
MaxTagValueSuffixes: *maxTagValueSuffixesPerSearch,
|
|
MaxConcurrentRequests: *maxConcurrentRequests,
|
|
MaxConcurrentRequestsFlagName: "search.maxConcurrentRequests",
|
|
MaxQueueDuration: *maxQueueDuration,
|
|
MaxQueueDurationFlagName: "search.maxQueueDuration",
|
|
}
|
|
return vmselectapi.NewServer(addr, api, limits, *disableRPCCompression)
|
|
}
|
|
|
|
// vmstorageAPI impelements vmselectapi.API
|
|
type vmstorageAPI struct {
|
|
s *storage.Storage
|
|
}
|
|
|
|
func (api *vmstorageAPI) InitSearch(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (vmselectapi.BlockIterator, error) {
|
|
tr := sq.GetTimeRange()
|
|
if err := checkTimeRange(api.s, tr); err != nil {
|
|
return nil, err
|
|
}
|
|
maxMetrics := getMaxMetrics(sq)
|
|
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(tfss) == 0 {
|
|
return nil, fmt.Errorf("missing tag filters")
|
|
}
|
|
bi := getBlockIterator()
|
|
bi.sr.Init(qt, api.s, tfss, tr, maxMetrics, deadline)
|
|
if err := bi.sr.Error(); err != nil {
|
|
bi.MustClose()
|
|
return nil, err
|
|
}
|
|
return bi, nil
|
|
}
|
|
|
|
func (api *vmstorageAPI) SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) ([]string, error) {
|
|
tr := sq.GetTimeRange()
|
|
maxMetrics := getMaxMetrics(sq)
|
|
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(tfss) == 0 {
|
|
return nil, fmt.Errorf("missing tag filters")
|
|
}
|
|
return api.s.SearchMetricNames(qt, tfss, tr, maxMetrics, deadline)
|
|
}
|
|
|
|
func (api *vmstorageAPI) LabelValues(qt *querytracer.Tracer, sq *storage.SearchQuery, labelName string, maxLabelValues int, deadline uint64) ([]string, error) {
|
|
tr := sq.GetTimeRange()
|
|
maxMetrics := getMaxMetrics(sq)
|
|
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return api.s.SearchLabelValuesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, labelName, tfss, tr, maxLabelValues, maxMetrics, deadline)
|
|
}
|
|
|
|
func (api *vmstorageAPI) TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte,
|
|
maxSuffixes int, deadline uint64) ([]string, error) {
|
|
suffixes, err := api.s.SearchTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(suffixes) >= maxSuffixes {
|
|
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; "+
|
|
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value", maxSuffixes)
|
|
}
|
|
return suffixes, nil
|
|
}
|
|
|
|
func (api *vmstorageAPI) LabelNames(qt *querytracer.Tracer, sq *storage.SearchQuery, maxLabelNames int, deadline uint64) ([]string, error) {
|
|
tr := sq.GetTimeRange()
|
|
maxMetrics := getMaxMetrics(sq)
|
|
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return api.s.SearchLabelNamesWithFiltersOnTimeRange(qt, sq.AccountID, sq.ProjectID, tfss, tr, maxLabelNames, maxMetrics, deadline)
|
|
}
|
|
|
|
func (api *vmstorageAPI) SeriesCount(_ *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
|
|
return api.s.GetSeriesCount(accountID, projectID, deadline)
|
|
}
|
|
|
|
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
|
|
return api.s.SearchTenants(qt, tr, deadline)
|
|
}
|
|
|
|
func (api *vmstorageAPI) TSDBStatus(qt *querytracer.Tracer, sq *storage.SearchQuery, focusLabel string, topN int, deadline uint64) (*storage.TSDBStatus, error) {
|
|
tr := sq.GetTimeRange()
|
|
maxMetrics := getMaxMetrics(sq)
|
|
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
date := uint64(sq.MinTimestamp) / (24 * 3600 * 1000)
|
|
return api.s.GetTSDBStatus(qt, sq.AccountID, sq.ProjectID, tfss, date, focusLabel, topN, maxMetrics, deadline)
|
|
}
|
|
|
|
func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline uint64) (int, error) {
|
|
tr := sq.GetTimeRange()
|
|
maxMetrics := getMaxMetrics(sq)
|
|
tfss, err := api.setupTfss(qt, sq, tr, maxMetrics, deadline)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if len(tfss) == 0 {
|
|
return 0, fmt.Errorf("missing tag filters")
|
|
}
|
|
return api.s.DeleteSeries(qt, tfss, maxMetrics)
|
|
}
|
|
|
|
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
|
|
api.s.RegisterMetricNames(qt, mrs)
|
|
return nil
|
|
}
|
|
|
|
func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
|
|
tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
|
|
accountID := sq.AccountID
|
|
projectID := sq.ProjectID
|
|
for _, tagFilters := range sq.TagFilterss {
|
|
tfs := storage.NewTagFilters(accountID, projectID)
|
|
for i := range tagFilters {
|
|
tf := &tagFilters[i]
|
|
if string(tf.Key) == "__graphite__" {
|
|
query := tf.Value
|
|
qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
|
|
paths, err := api.s.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline)
|
|
qtChild.Donef("found %d series", len(paths))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
|
|
}
|
|
if len(paths) >= maxMetrics {
|
|
return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
|
|
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+
|
|
"see https://docs.victoriametrics.com/#resource-usage-limits", maxMetrics, query)
|
|
}
|
|
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
|
|
continue
|
|
}
|
|
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
|
|
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
|
|
}
|
|
}
|
|
tfss = append(tfss, tfs)
|
|
}
|
|
return tfss, nil
|
|
}
|
|
|
|
// blockIterator implements vmselectapi.BlockIterator
|
|
type blockIterator struct {
|
|
sr storage.Search
|
|
}
|
|
|
|
// blockIteratorsPool holds blockIterator objects released by blockIterator.MustClose
// for reuse in getBlockIterator. It has no New func, so Get may return nil.
var blockIteratorsPool sync.Pool
|
|
|
|
func (bi *blockIterator) MustClose() {
|
|
bi.sr.MustClose()
|
|
blockIteratorsPool.Put(bi)
|
|
}
|
|
|
|
func getBlockIterator() *blockIterator {
|
|
v := blockIteratorsPool.Get()
|
|
if v == nil {
|
|
v = &blockIterator{}
|
|
}
|
|
return v.(*blockIterator)
|
|
}
|
|
|
|
func (bi *blockIterator) NextBlock(mb *storage.MetricBlock) bool {
|
|
if !bi.sr.NextMetricBlock() {
|
|
return false
|
|
}
|
|
mb.MetricName = append(mb.MetricName[:0], bi.sr.MetricBlockRef.MetricName...)
|
|
bi.sr.MetricBlockRef.BlockRef.MustReadBlock(&mb.Block)
|
|
return true
|
|
}
|
|
|
|
func (bi *blockIterator) Error() error {
|
|
return bi.sr.Error()
|
|
}
|
|
|
|
// checkTimeRange returns true if the given tr is denied for querying.
|
|
func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
|
|
if !*denyQueriesOutsideRetention {
|
|
return nil
|
|
}
|
|
retentionMsecs := s.RetentionMsecs()
|
|
minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionMsecs
|
|
if tr.MinTimestamp > minAllowedTimestamp {
|
|
return nil
|
|
}
|
|
return &httpserver.ErrorWithStatusCode{
|
|
Err: fmt.Errorf("the given time range %s is outside the allowed retention %.3f days according to -denyQueriesOutsideRetention",
|
|
&tr, float64(retentionMsecs)/(24*3600*1000)),
|
|
StatusCode: http.StatusServiceUnavailable,
|
|
}
|
|
}
|
|
|
|
func getMaxMetrics(sq *storage.SearchQuery) int {
|
|
maxMetrics := sq.MaxMetrics
|
|
maxMetricsLimit := *maxUniqueTimeseries
|
|
if maxMetricsLimit <= 0 {
|
|
maxMetricsLimit = GetMaxUniqueTimeSeries()
|
|
}
|
|
if maxMetrics <= 0 || maxMetrics > maxMetricsLimit {
|
|
maxMetrics = maxMetricsLimit
|
|
}
|
|
return maxMetrics
|
|
}
|
|
|
|
// GetMaxUniqueTimeSeries returns the max metrics limit calculated by available resources.
|
|
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
|
|
func GetMaxUniqueTimeSeries() int {
|
|
maxUniqueTimeseriesValueOnce.Do(func() {
|
|
maxUniqueTimeseriesValue = *maxUniqueTimeseries
|
|
if maxUniqueTimeseriesValue <= 0 {
|
|
maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(*maxConcurrentRequests, memory.Remaining())
|
|
}
|
|
})
|
|
return maxUniqueTimeseriesValue
|
|
}
|
|
|
|
// calculateMaxUniqueTimeSeriesForResource calculate the max metrics limit calculated by available resources.
|
|
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
|
|
if maxConcurrentRequests <= 0 {
|
|
// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
|
|
// In such cases, fallback to unlimited.
|
|
logger.Warnf("limiting -search.maxUniqueTimeseries to %v because -search.maxConcurrentRequests=%d.", 2e9, maxConcurrentRequests)
|
|
return 2e9
|
|
}
|
|
|
|
// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
|
|
// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
|
|
mts := remainingMemory / 200 / maxConcurrentRequests
|
|
logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
|
|
return mts
|
|
}
|