2022-06-28 13:04:14 +02:00
package servers
2019-05-22 23:23:23 +02:00
import (
"flag"
"fmt"
2020-06-30 23:58:26 +02:00
"net/http"
2019-05-22 23:23:23 +02:00
"sync"
2023-01-07 03:38:02 +01:00
"time"
2019-05-22 23:23:23 +02:00
2023-01-07 03:38:02 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-05-15 14:42:30 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-06-30 23:58:26 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2022-06-01 01:31:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2022-06-27 13:20:39 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/vmselectapi"
2019-05-22 23:23:23 +02:00
)
var (
2022-06-27 13:20:39 +02:00
maxUniqueTimeseries = flag . Int ( "search.maxUniqueTimeseries" , 0 , "The maximum number of unique time series, which can be scanned during every query. This allows protecting against heavy queries, which select unexpectedly high number of series. Zero means 'no limit'. See also -search.max* command-line flags at vmselect" )
maxTagKeys = flag . Int ( "search.maxTagKeys" , 100e3 , "The maximum number of tag keys returned per search" )
maxTagValues = flag . Int ( "search.maxTagValues" , 100e3 , "The maximum number of tag values returned per search" )
2020-09-10 23:29:26 +02:00
maxTagValueSuffixesPerSearch = flag . Int ( "search.maxTagValueSuffixesPerSearch" , 100e3 , "The maximum number of tag value suffixes returned from /metrics/find" )
2023-01-07 03:38:02 +01:00
maxConcurrentRequests = flag . Int ( "search.maxConcurrentRequests" , 2 * cgroup . AvailableCPUs ( ) , "The maximum number of concurrent vmselect requests " +
"the vmstorage can process at -vmselectAddr. It shouldn't be high, since a single request usually saturates a CPU core, and many concurrently executed requests " +
"may require high amounts of memory. See also -search.maxQueueDuration" )
maxQueueDuration = flag . Duration ( "search.maxQueueDuration" , 10 * time . Second , "The maximum time the incoming vmselect request waits for execution " +
"when -search.maxConcurrentRequests limit is reached" )
2019-05-22 23:23:23 +02:00
2022-06-23 18:19:36 +02:00
disableRPCCompression = flag . Bool ( ` rpc.disableCompression ` , false , "Whether to disable compression of the data sent from vmstorage to vmselect. " +
"This reduces CPU usage at the cost of higher network bandwidth usage" )
2020-06-30 23:58:26 +02:00
denyQueriesOutsideRetention = flag . Bool ( "denyQueriesOutsideRetention" , false , "Whether to deny queries outside of the configured -retentionPeriod. " +
"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. " +
"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee" )
2019-05-22 23:23:23 +02:00
)
2022-06-27 13:20:39 +02:00
// NewVMSelectServer starts new server at the given addr, which serves vmselect requests from the given s.
func NewVMSelectServer ( addr string , s * storage . Storage ) ( * vmselectapi . Server , error ) {
api := & vmstorageAPI {
s : s ,
2020-11-16 09:55:55 +01:00
}
2022-06-27 13:20:39 +02:00
limits := vmselectapi . Limits {
2023-01-07 03:38:02 +01:00
MaxLabelNames : * maxTagKeys ,
MaxLabelValues : * maxTagValues ,
MaxTagValueSuffixes : * maxTagValueSuffixesPerSearch ,
MaxConcurrentRequests : * maxConcurrentRequests ,
MaxConcurrentRequestsFlagName : "search.maxConcurrentRequests" ,
MaxQueueDuration : * maxQueueDuration ,
MaxQueueDurationFlagName : "search.maxQueueDuration" ,
2020-11-16 09:55:55 +01:00
}
2022-06-27 13:20:39 +02:00
return vmselectapi . NewServer ( addr , api , limits , * disableRPCCompression )
2020-11-16 09:55:55 +01:00
}
2022-07-05 23:53:03 +02:00
// vmstorageAPI impelements vmselectapi.API
2022-06-27 13:20:39 +02:00
type vmstorageAPI struct {
s * storage . Storage
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) InitSearch ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline uint64 ) ( vmselectapi . BlockIterator , error ) {
tr := sq . GetTimeRange ( )
2022-06-27 13:20:39 +02:00
if err := checkTimeRange ( api . s , tr ) ; err != nil {
return nil , err
2019-08-04 21:15:33 +02:00
}
2022-07-05 23:53:03 +02:00
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
if len ( tfss ) == 0 {
return nil , fmt . Errorf ( "missing tag filters" )
}
2022-06-27 13:20:39 +02:00
bi := getBlockIterator ( )
bi . sr . Init ( qt , api . s , tfss , tr , maxMetrics , deadline )
if err := bi . sr . Error ( ) ; err != nil {
bi . MustClose ( )
return nil , err
2020-09-10 23:29:26 +02:00
}
2022-06-27 13:20:39 +02:00
return bi , nil
2020-09-10 23:29:26 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) SearchMetricNames ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline uint64 ) ( [ ] string , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
if len ( tfss ) == 0 {
return nil , fmt . Errorf ( "missing tag filters" )
}
2022-06-27 13:20:39 +02:00
return api . s . SearchMetricNames ( qt , tfss , tr , maxMetrics , deadline )
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) LabelValues ( qt * querytracer . Tracer , sq * storage . SearchQuery , labelName string , maxLabelValues int , deadline uint64 ) ( [ ] string , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
return api . s . SearchLabelValuesWithFiltersOnTimeRange ( qt , sq . AccountID , sq . ProjectID , labelName , tfss , tr , maxLabelValues , maxMetrics , deadline )
2020-02-13 16:32:54 +01:00
}
2022-07-05 22:47:46 +02:00
func ( api * vmstorageAPI ) TagValueSuffixes ( qt * querytracer . Tracer , accountID , projectID uint32 , tr storage . TimeRange , tagKey , tagValuePrefix string , delimiter byte ,
2022-06-27 13:20:39 +02:00
maxSuffixes int , deadline uint64 ) ( [ ] string , error ) {
2022-07-05 23:53:03 +02:00
suffixes , err := api . s . SearchTagValueSuffixes ( qt , accountID , projectID , tr , tagKey , tagValuePrefix , delimiter , maxSuffixes , deadline )
if err != nil {
return nil , err
}
if len ( suffixes ) >= maxSuffixes {
return nil , fmt . Errorf ( "more than -search.maxTagValueSuffixesPerSearch=%d suffixes returned; " +
"either narrow down the search or increase -search.maxTagValueSuffixesPerSearch command-line flag value" , maxSuffixes )
}
return suffixes , nil
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) LabelNames ( qt * querytracer . Tracer , sq * storage . SearchQuery , maxLabelNames int , deadline uint64 ) ( [ ] string , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
return api . s . SearchLabelNamesWithFiltersOnTimeRange ( qt , sq . AccountID , sq . ProjectID , tfss , tr , maxLabelNames , maxMetrics , deadline )
2019-05-22 23:23:23 +02:00
}
2022-06-27 13:20:39 +02:00
func ( api * vmstorageAPI ) SeriesCount ( qt * querytracer . Tracer , accountID , projectID uint32 , deadline uint64 ) ( uint64 , error ) {
return api . s . GetSeriesCount ( accountID , projectID , deadline )
2019-05-22 23:23:23 +02:00
}
2022-11-25 19:32:45 +01:00
// Tenants returns the tenants found by Storage.SearchTenants within tr.
func (api *vmstorageAPI) Tenants(qt *querytracer.Tracer, tr storage.TimeRange, deadline uint64) ([]string, error) {
	tenants, err := api.s.SearchTenants(qt, tr, deadline)
	return tenants, err
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) TSDBStatus ( qt * querytracer . Tracer , sq * storage . SearchQuery , focusLabel string , topN int , deadline uint64 ) ( * storage . TSDBStatus , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return nil , err
}
date := uint64 ( sq . MinTimestamp ) / ( 24 * 3600 * 1000 )
return api . s . GetTSDBStatus ( qt , sq . AccountID , sq . ProjectID , tfss , date , focusLabel , topN , maxMetrics , deadline )
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) DeleteSeries ( qt * querytracer . Tracer , sq * storage . SearchQuery , deadline uint64 ) ( int , error ) {
tr := sq . GetTimeRange ( )
maxMetrics := getMaxMetrics ( sq )
tfss , err := api . setupTfss ( qt , sq , tr , maxMetrics , deadline )
if err != nil {
return 0 , err
}
if len ( tfss ) == 0 {
return 0 , fmt . Errorf ( "missing tag filters" )
}
2022-07-05 22:56:31 +02:00
return api . s . DeleteSeries ( qt , tfss )
2019-05-22 23:23:23 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) RegisterMetricNames ( qt * querytracer . Tracer , mrs [ ] storage . MetricRow , deadline uint64 ) error {
2022-06-27 13:20:39 +02:00
return api . s . RegisterMetricNames ( qt , mrs )
2019-06-10 17:55:20 +02:00
}
2022-07-05 23:53:03 +02:00
func ( api * vmstorageAPI ) setupTfss ( qt * querytracer . Tracer , sq * storage . SearchQuery , tr storage . TimeRange , maxMetrics int , deadline uint64 ) ( [ ] * storage . TagFilters , error ) {
tfss := make ( [ ] * storage . TagFilters , 0 , len ( sq . TagFilterss ) )
accountID := sq . AccountID
projectID := sq . ProjectID
for _ , tagFilters := range sq . TagFilterss {
tfs := storage . NewTagFilters ( accountID , projectID )
for i := range tagFilters {
tf := & tagFilters [ i ]
if string ( tf . Key ) == "__graphite__" {
query := tf . Value
qtChild := qt . NewChild ( "searching for series matching __graphite__=%q" , query )
paths , err := api . s . SearchGraphitePaths ( qtChild , accountID , projectID , tr , query , maxMetrics , deadline )
qtChild . Donef ( "found %d series" , len ( paths ) )
if err != nil {
return nil , fmt . Errorf ( "error when searching for Graphite paths for query %q: %w" , query , err )
}
if len ( paths ) >= maxMetrics {
return nil , fmt . Errorf ( "more than %d time series match Graphite query %q; " +
"either narrow down the query or increase the corresponding -search.max* command-line flag value at vmselect nodes" , maxMetrics , query )
}
tfs . AddGraphiteQuery ( query , paths , tf . IsNegative )
continue
}
if err := tfs . Add ( tf . Key , tf . Value , tf . IsNegative , tf . IsRegexp ) ; err != nil {
return nil , fmt . Errorf ( "cannot parse tag filter %s: %w" , tf , err )
}
}
tfss = append ( tfss , tfs )
}
return tfss , nil
2020-09-10 23:29:26 +02:00
}
2022-06-27 13:20:39 +02:00
// blockIterator implements vmselectapi.BlockIterator
type blockIterator struct {
sr storage . Search
2019-05-22 23:23:23 +02:00
}
2022-06-27 13:20:39 +02:00
var blockIteratorsPool sync . Pool
2020-04-22 18:57:36 +02:00
2022-06-27 13:20:39 +02:00
func ( bi * blockIterator ) MustClose ( ) {
bi . sr . MustClose ( )
blockIteratorsPool . Put ( bi )
2022-06-08 18:25:59 +02:00
}
2022-06-27 13:20:39 +02:00
func getBlockIterator ( ) * blockIterator {
v := blockIteratorsPool . Get ( )
if v == nil {
v = & blockIterator { }
2022-06-16 09:44:29 +02:00
}
2022-06-27 13:20:39 +02:00
return v . ( * blockIterator )
2020-04-22 18:57:36 +02:00
}
2022-06-28 11:55:20 +02:00
func ( bi * blockIterator ) NextBlock ( mb * storage . MetricBlock ) bool {
2022-06-27 13:20:39 +02:00
if ! bi . sr . NextMetricBlock ( ) {
return false
2020-04-22 18:57:36 +02:00
}
2022-10-07 01:56:49 +02:00
mb . MetricName = append ( mb . MetricName [ : 0 ] , bi . sr . MetricBlockRef . MetricName ... )
2022-06-28 11:55:20 +02:00
bi . sr . MetricBlockRef . BlockRef . MustReadBlock ( & mb . Block )
2022-06-27 13:20:39 +02:00
return true
2020-04-22 18:57:36 +02:00
}
2022-06-27 13:20:39 +02:00
func ( bi * blockIterator ) Error ( ) error {
return bi . sr . Error ( )
2020-11-16 09:55:55 +01:00
}
2020-06-30 23:58:26 +02:00
// checkTimeRange returns true if the given tr is denied for querying.
func checkTimeRange ( s * storage . Storage , tr storage . TimeRange ) error {
if ! * denyQueriesOutsideRetention {
return nil
}
2020-10-20 15:10:46 +02:00
retentionMsecs := s . RetentionMsecs ( )
minAllowedTimestamp := int64 ( fasttime . UnixTimestamp ( ) * 1000 ) - retentionMsecs
2020-06-30 23:58:26 +02:00
if tr . MinTimestamp > minAllowedTimestamp {
return nil
}
return & httpserver . ErrorWithStatusCode {
2020-10-20 15:10:46 +02:00
Err : fmt . Errorf ( "the given time range %s is outside the allowed retention %.3f days according to -denyQueriesOutsideRetention" ,
& tr , float64 ( retentionMsecs ) / ( 24 * 3600 * 1000 ) ) ,
2020-06-30 23:58:26 +02:00
StatusCode : http . StatusServiceUnavailable ,
}
}
2022-07-05 23:53:03 +02:00
// getMaxMetrics returns the series limit to apply to sq.
//
// The limit is taken from -search.maxUniqueTimeseries. A non-positive flag
// value means "no limit" and is represented as 2e9. The per-query
// sq.MaxMetrics wins only when it is positive and does not exceed that limit.
func getMaxMetrics(sq *storage.SearchQuery) int {
	limit := *maxUniqueTimeseries
	if limit <= 0 {
		limit = 2e9
	}
	if requested := sq.MaxMetrics; requested > 0 && requested <= limit {
		return requested
	}
	return limit
}