From ea66212c9350c7b1e67b7cf27085b515f17ba0d3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 10 Feb 2020 13:03:52 +0200 Subject: [PATCH] lib/storage: move `-dedup.minScrapeInterval` flag outside lib/storage, so it doesnt show up in `vminsert` in cluster version --- app/vmselect/main.go | 9 +++++++-- app/vmstorage/main.go | 4 ++++ lib/storage/dedup.go | 19 +++++++++++++------ 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 701a2f4225..f6e34c17ee 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" ) @@ -27,8 +28,11 @@ var ( cacheDataPath = flag.String("cacheDataPath", "", "Path to directory for cache files. Cache isn't saved if empty") maxConcurrentRequests = flag.Int("search.maxConcurrentRequests", getDefaultMaxConcurrentRequests(), "The maximum number of concurrent search requests. "+ "It shouldn't be high, since a single request can saturate all the CPU cores. See also -search.maxQueueDuration") - maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests limit is reached") - storageNodes = flagutil.NewArray("storageNode", "Addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1:8401 -storageNode=vmstorage-host2:8401") + maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the request waits for execution when -search.maxConcurrentRequests limit is reached") + minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+ + "This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+ + "Deduplication is disabled if the -dedup.minScrapeInterval is 0") + storageNodes = flagutil.NewArray("storageNode", "Addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1:8401 -storageNode=vmstorage-host2:8401") ) func getDefaultMaxConcurrentRequests() int { @@ -52,6 +56,7 @@ func main() { logger.Infof("starting netstorage at storageNodes %s", *storageNodes) startTime := time.Now() + storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) if len(*storageNodes) == 0 { logger.Fatalf("missing -storageNode arg") } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 938d57c018..83f5e3f348 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -28,6 +28,9 @@ var ( bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") + minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+ + "This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+ + "Deduplication is disabled if the -dedup.minScrapeInterval is 0") ) func main() { @@ -35,6 +38,7 @@ func main() { buildinfo.Init() logger.Init() + storage.SetMinScrapeIntervalForDeduplication(*minScrapeInterval) storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go index 1d0765f072..3d4dc3c74f 100644 --- a/lib/storage/dedup.go +++ b/lib/storage/dedup.go @@ -1,14 +1,21 @@ package storage import ( - "flag" + "time" "github.com/VictoriaMetrics/metrics" ) -var minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+ - "This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+ - "Deduplication is disabled if the -dedup.minScrapeInterval is 0") +// SetMinScrapeIntervalForDeduplication sets the minimum interval for data points during de-duplication. +// +// De-duplication is disabled if interval is 0. +// +// This function must be called before initializing the storage. +func SetMinScrapeIntervalForDeduplication(interval time.Duration) { + minScrapeInterval = interval +} + +var minScrapeInterval = time.Duration(0) func getMinDelta() int64 { // Divide minScrapeInterval by 2 in order to preserve proper data points. @@ -23,7 +30,7 @@ func getMinDelta() int64 { // DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval. func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) { - if *minScrapeInterval <= 0 { + if minScrapeInterval <= 0 { return srcTimestamps, srcValues } minDelta := getMinDelta() @@ -54,7 +61,7 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, [] var dedupsDuringSelect = metrics.NewCounter(`deduplicated_samples_total{type="select"}`) func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]int64, []int64) { - if *minScrapeInterval <= 0 { + if minScrapeInterval <= 0 { return srcTimestamps, srcValues } if len(srcTimestamps) < 32 {