2022-06-28 13:04:14 +02:00
package servers
2019-05-22 23:23:23 +02:00
import (
"flag"
"fmt"
2020-06-30 23:58:26 +02:00
"net/http"
2019-05-22 23:23:23 +02:00
"sync"
2023-01-07 03:38:02 +01:00
"time"
2019-05-22 23:23:23 +02:00
2023-01-07 03:38:02 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-05-15 14:42:30 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-06-30 23:58:26 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2024-10-18 13:41:43 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2022-06-01 01:31:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2022-06-27 13:20:39 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
2019-05-22 23:23:23 +02:00
)
// Command-line flags limiting vmselect request processing on vmstorage.
var (
	maxUniqueTimeseries = flag.Int("search.maxUniqueTimeseries", 0, "The maximum number of unique time series, which can be scanned during every query. "+
		"This allows protecting against heavy queries, which select unexpectedly high number of series. When set to zero, the limit is automatically calculated based on -search.maxConcurrentRequests (inversely proportional) and memory available to the process (proportional). See also -search.max* command-line flags at vmselect")
	maxTagKeys = flag.Int("search.maxTagKeys", 100e3, "The maximum number of tag keys returned per search. "+
		"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
	maxTagValues = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search. "+
		"See also -search.maxLabelsAPISeries and -search.maxLabelsAPIDuration")
	maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
	maxConcurrentRequests        = flag.Int("search.maxConcurrentRequests", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent vmselect requests "+
		"the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests "+
		"may require high amounts of memory. See also -search.maxQueueDuration")
	maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the incoming vmselect request waits for execution "+
		"when -search.maxConcurrentRequests limit is reached")

	disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Whether to disable compression of the data sent from vmstorage to vmselect. "+
		"This reduces CPU usage at the cost of higher network bandwidth usage")
	denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. "+
		"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+
		"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee")
)
2024-10-18 13:41:43 +02:00
// maxUniqueTimeseriesValue caches the effective -search.maxUniqueTimeseries limit.
// It is computed lazily exactly once by GetMaxUniqueTimeSeries.
var (
	maxUniqueTimeseriesValue     int
	maxUniqueTimeseriesValueOnce sync.Once
)
2022-06-27 13:20:39 +02:00
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s.
func NewVMSelectServer ( addr string , s * storage . Storage ) ( * vmselectapi . Server , error ) {
api := & vmstorageAPI {
s : s ,
2020-11-16 09:55:55 +01:00
}
2022-06-27 13:20:39 +02:00
limits := vmselectapi . Limits {
2023-01-07 03:38:02 +01:00
MaxLabelNames : * maxTagKeys ,
MaxLabelValues : * maxTagValues ,
MaxTagValueSuffixes : * maxTagValueSuffixesPerSearch ,
MaxConcurrentRequests : * maxConcurrentRequests ,
MaxConcurrentRequestsFlagName : "search.maxConcurrentRequests" ,
MaxQueueDuration : * maxQueueDuration ,
MaxQueueDurationFlagName : "search.maxQueueDuration" ,
2020-11-16 09:55:55 +01:00
}
2022-06-27 13:20:39 +02:00
return vmselectapi . NewServer ( addr , api , limits , * disableRPCCompression )
2020-11-16 09:55:55 +01:00
}
2022-07-05 23:53:03 +02:00
// vmstorageAPI implements vmselectapi.API
type vmstorageAPI struct {
	// s is the underlying storage queried by all the API methods.
	s *storage.Storage
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) InitSearch ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline uint64 ) ( vmselectapi . BlockIterator , error ) {
tr := sq . GetTimeRange ( )
2022-06-27 13:20:39 +02:00
if err := checkTimeRange ( api . s , tr ) ; err != nil {
return nil , err
2019-08-04 21:15:33 +02:00
}
2022-07-05 23:53:03 +02:00
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
if len ( tfss ) == 0 {
return nil , fmt . Errorf ( "missing tag filters" )
}
2022-06-27 13:20:39 +02:00
bi := getBlockIterator ( )
bi . sr . Init ( qt , api . s , tfss , tr , maxMetrics , deadline )
if err := bi . sr . Error ( ) ; err != nil {
bi . MustClose ( )
return nil , err
2020-09-10 23:29:26 +02:00
}
2022-06-27 13:20:39 +02:00
return bi , nil
2020-09-10 23:29:26 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) SearchMetricNames ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline uint64 ) ( [ ] string , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
if len ( tfss ) == 0 {
return nil , fmt . Errorf ( "missing tag filters" )
}
2022-06-27 13:20:39 +02:00
return api . s . SearchMetricNames ( qt , tfss , tr , maxMetrics , deadline )
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) LabelValues ( qt * querytracer . Tracer , sq * storage . SearchQuery , labelName string , maxLabelValues int , deadline uint64 ) ( [ ] string , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
return api . s . SearchLabelValuesWithFiltersOnTimeRange ( qt , sq . AccountID , sq . ProjectID , labelName , tfss , tr , maxLabelValues , maxMetrics , deadline )
2020-02-13 16:32:54 +01:00
}
2022-07-05 22:47:46 +02:00
func ( api * vmstorageAPI ) TagValueSuffixes ( qt * querytracer . Tracer , accountID , projectID uint32 , tr storage . TimeRange , tagKey , tagValuePrefix string , delimiter byte ,
2022-06-27 13:20:39 +02:00
maxSuffixes int , deadline uint64 ) ( [ ] string , error ) {
2022-07-05 23:53:03 +02:00
suffixes , err := api . s . SearchTagValueSuffixes ( qt , accountID , projectID , tr , tagKey , tagValuePrefix , delimiter , maxSuffixes , deadline )
if err != nil {
return nil , err
}
if len ( suffixes ) >= maxSuffixes {
return nil , fmt . Errorf ( "more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; " +
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value" , maxSuffixes )
}
return suffixes , nil
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) LabelNames ( qt * querytracer . Tracer , sq * storage . SearchQuery , maxLabelNames int , deadline uint64 ) ( [ ] string , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
return api . s . SearchLabelNamesWithFiltersOnTimeRange ( qt , sq . AccountID , sq . ProjectID , tfss , tr , maxLabelNames , maxMetrics , deadline )
2019-05-22 23:23:23 +02:00
}
2023-09-01 09:34:16 +02:00
// SeriesCount returns the series count for the given (accountID, projectID).
// The tracer argument is unused.
func (api *vmstorageAPI) SeriesCount(_ *querytracer.Tracer, accountID, projectID uint32, deadline uint64) (uint64, error) {
	return api.s.GetSeriesCount(accountID, projectID, deadline)
}
2022-11-25 19:32:45 +01:00
// Tenants returns the tenants found in the storage on the given tr.
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
	return api.s.SearchTenants(qt, tr, deadline)
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) TSDBStatus ( qt * querytracer . Tracer , sq * storage . SearchQuery , focusLabel string , topN int , deadline uint64 ) ( * storage . TSDBStatus , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
date := uint64 ( sq . MinTimestamp ) / ( 24 * 3600 * 1000 )
return api . s . GetTSDBStatus ( qt , sq . AccountID , sq . ProjectID , tfss , date , focusLabel , topN , maxMetrics , deadline )
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) DeleteSeries ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline uint64 ) ( int , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return 0 , err
}
if len ( tfss ) == 0 {
return 0 , fmt . Errorf ( "missing tag filters" )
}
2024-09-30 12:43:11 +02:00
return api . s . DeleteSeries ( qt , tfss , maxMetrics )
2019-05-22 23:23:23 +02:00
}
2023-09-01 09:34:16 +02:00
// RegisterMetricNames registers the given mrs in the storage.
// The trailing deadline argument is unused.
// The returned error is always nil; it exists to satisfy vmselectapi.API.
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, _ uint64) error {
	api.s.RegisterMetricNames(qt, mrs)
	return nil
}
2022-07-05 23:53:03 +02:00
// setupTfss converts sq.TagFilterss into a slice of storage.TagFilters,
// which can be passed to storage search functions.
//
// Filters on the special __graphite__ label are expanded into explicit
// Graphite path matches via api.s.SearchGraphitePaths.
func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
	tfss := make([]*storage.TagFilters, 0, len(sq.TagFilterss))
	accountID := sq.AccountID
	projectID := sq.ProjectID
	for _, tagFilters := range sq.TagFilterss {
		tfs := storage.NewTagFilters(accountID, projectID)
		for i := range tagFilters {
			tf := &tagFilters[i]
			if string(tf.Key) == "__graphite__" {
				// Expand the __graphite__ pseudo-filter into the list of matching Graphite paths.
				query := tf.Value
				qtChild := qt.NewChild("searching for series matching __graphite__=%q", query)
				paths, err := api.s.SearchGraphitePaths(qtChild, accountID, projectID, tr, query, maxMetrics, deadline)
				qtChild.Donef("found %d series", len(paths))
				if err != nil {
					return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
				}
				if len(paths) >= maxMetrics {
					// The paths list may be truncated at maxMetrics - refuse to continue with incomplete results.
					return nil, fmt.Errorf("more than %d time series match Graphite query %q; "+
						"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes; "+
						"see https://docs.victoriametrics.com/#resource-usage-limits", maxMetrics, query)
				}
				tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
				continue
			}
			if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
				return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
			}
		}
		tfss = append(tfss, tfs)
	}
	return tfss, nil
}
2022-06-27 13:20:39 +02:00
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
	// sr is the underlying storage search the blocks are read from.
	sr storage.Search
}
2022-06-27 13:20:39 +02:00
// blockIteratorsPool reduces allocations by re-using blockIterator objects across searches.
var blockIteratorsPool sync.Pool
2020-04-22 18:57:36 +02:00
2022-06-27 13:20:39 +02:00
// MustClose stops the underlying search and returns bi to the pool for re-use.
func (bi *blockIterator) MustClose() {
	bi.sr.MustClose()
	blockIteratorsPool.Put(bi)
}
2022-06-27 13:20:39 +02:00
func getBlockIterator ( ) * blockIterator {
v := blockIteratorsPool . Get ( )
if v == nil {
v = & blockIterator { }
2022-06-16 09:44:29 +02:00
}
2022-06-27 13:20:39 +02:00
return v . ( * blockIterator )
2020-04-22 18:57:36 +02:00
}
2022-06-28 11:55:20 +02:00
func ( bi * blockIterator ) NextBlock ( mb * storage . MetricBlock ) bool {
2022-06-27 13:20:39 +02:00
if ! bi . sr . NextMetricBlock ( ) {
return false
2020-04-22 18:57:36 +02:00
}
2022-10-07 01:56:49 +02:00
mb . MetricName = append ( mb . MetricName [ : 0 ] , bi . sr . MetricBlockRef . MetricName ... )
2022-06-28 11:55:20 +02:00
bi . sr . MetricBlockRef . BlockRef . MustReadBlock ( & mb . Block )
2022-06-27 13:20:39 +02:00
return true
2020-04-22 18:57:36 +02:00
}
2022-06-27 13:20:39 +02:00
// Error returns the error occurred during the search, if any.
func (bi *blockIterator) Error() error {
	return bi.sr.Error()
}
2020-06-30 23:58:26 +02:00
// checkTimeRange returns a non-nil error if querying the given tr is denied,
// i.e. -denyQueriesOutsideRetention is set and tr starts outside the configured retention.
func checkTimeRange(s *storage.Storage, tr storage.TimeRange) error {
	if !*denyQueriesOutsideRetention {
		return nil
	}
	retentionMsecs := s.RetentionMsecs()
	minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionMsecs
	// Only tr.MinTimestamp is checked: the query is allowed as long as its start
	// falls within the retention window ending now.
	if tr.MinTimestamp > minAllowedTimestamp {
		return nil
	}
	return &httpserver.ErrorWithStatusCode{
		Err: fmt.Errorf("the given time range %s is outside the allowed retention %.3f days according to -denyQueriesOutsideRetention",
			&tr, float64(retentionMsecs)/(24*3600*1000)),
		StatusCode: http.StatusServiceUnavailable,
	}
}
2022-07-05 23:53:03 +02:00
func getMaxMetrics ( sq * storage . SearchQuery ) int {
maxMetrics := sq . MaxMetrics
maxMetricsLimit := * maxUniqueTimeseries
if maxMetricsLimit <= 0 {
2024-10-18 13:41:43 +02:00
maxMetricsLimit = GetMaxUniqueTimeSeries ( )
2022-07-05 23:53:03 +02:00
}
if maxMetrics <= 0 || maxMetrics > maxMetricsLimit {
maxMetrics = maxMetricsLimit
}
return maxMetrics
}
2024-10-18 13:41:43 +02:00
// GetMaxUniqueTimeSeries returns the max metrics limit calculated by available resources.
// The calculation is split into calculateMaxUniqueTimeSeriesForResource for unit testing.
//
// The value is computed once and cached in maxUniqueTimeseriesValue.
func GetMaxUniqueTimeSeries() int {
	maxUniqueTimeseriesValueOnce.Do(func() {
		maxUniqueTimeseriesValue = *maxUniqueTimeseries
		if maxUniqueTimeseriesValue <= 0 {
			// Derive the limit from -search.maxConcurrentRequests and the remaining memory.
			maxUniqueTimeseriesValue = calculateMaxUniqueTimeSeriesForResource(*maxConcurrentRequests, memory.Remaining())
		}
	})
	return maxUniqueTimeseriesValue
}
// calculateMaxUniqueTimeSeriesForResource calculates the max metrics limit for a single
// request from the remaining memory and the -search.maxConcurrentRequests value.
func calculateMaxUniqueTimeSeriesForResource(maxConcurrentRequests, remainingMemory int) int {
	if maxConcurrentRequests <= 0 {
		// This line should NOT be reached unless the user has set an incorrect `search.maxConcurrentRequests`.
		// In such cases, fallback to unlimited.
		//
		// Use an int constant with %d so the logged value matches the returned limit
		// (the former `%v` on the untyped constant 2e9 logged the confusing "2e+09").
		const unlimitedSeries = 2_000_000_000
		logger.Warnf("limiting -search.maxUniqueTimeseries to %d because -search.maxConcurrentRequests=%d.", unlimitedSeries, maxConcurrentRequests)
		return unlimitedSeries
	}
	// Calculate the max metrics limit for a single request in the worst-case concurrent scenario.
	// The approximate size of 1 unique series that could occupy in the vmstorage is 200 bytes.
	mts := remainingMemory / 200 / maxConcurrentRequests
	logger.Infof("limiting -search.maxUniqueTimeseries to %d according to -search.maxConcurrentRequests=%d and remaining memory=%d bytes. To increase the limit, reduce -search.maxConcurrentRequests or increase memory available to the process.", mts, maxConcurrentRequests, remainingMemory)
	return mts
}