From 0d86644d65de5f30be3c592da4a88ecd02a7a41b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 2 May 2022 21:35:14 +0300 Subject: [PATCH] lib/storage: leave the last sample per each discrete interval during the deduplicaton This aligns better with staleness logic in Prometheus - https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness --- README.md | 4 +- app/victoria-metrics/main.go | 2 +- docs/CHANGELOG.md | 1 + docs/README.md | 4 +- docs/Single-server-VictoriaMetrics.md | 4 +- lib/storage/dedup.go | 68 ++++++++-------- lib/storage/dedup_test.go | 110 ++++++++------------------ 7 files changed, 72 insertions(+), 121 deletions(-) diff --git a/README.md b/README.md index 5a6171473..3e76dc0e7 100644 --- a/README.md +++ b/README.md @@ -1097,7 +1097,7 @@ with the enabled de-duplication. See [this section](#deduplication) for details. ## Deduplication -VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-line flag is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2112#issuecomment-1032587618) for more details on how downsampling works. +VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single raw sample with the biggest timestamp per each discrete 60s interval. If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then an arbitrary sample out of these samples is left. This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness). The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. It is safe to use deduplication and downsampling simultaneously. @@ -1622,7 +1622,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864) -dedup.minScrapeInterval duration - Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling + Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling -deleteAuthKey string authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries -denyQueriesOutsideRetention diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index 6cdd4d957..44e90005d 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -24,7 +24,7 @@ import ( var ( httpListenAddr = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections") - minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the first sample in every time series per each discrete interval "+ + minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+ "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling") dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+ "Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag") diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index dd4977dbc..f2dfa1cf7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -33,6 +33,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): show data pocessing speed during data migration. * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `drop_common_labels()` function, which drops common `label="name"` pairs from the passed time series. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#drop_common_labels). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add `tlast_change_over_time(m[d])` function, which returns the timestamp of the last change of `m` on the given lookbehind window `d`. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#tlast_change_over_time). +* FEATURE: leave the last raw sample per each `-dedup.minScrapeInterval` discrete interval when the [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. This aligns better with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness) comparing to the previous behaviour when the first sample per each `-dedup.minScrapeInterval` was left. * FEATURE: add a handler for `/api/v1/status/buildinfo` endpoint, which is used by Grafana starting from v8.5.0 . See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2515). * BUGFIX: export staleness markers as `null` values from [JSON export API](https://docs.victoriametrics.com/#how-to-export-data-in-json-line-format). Previously they were exported as `NaN` values. This could break the exported JSON parsing, since `NaN` values aren't supported by [JSON specification](https://www.json.org/). diff --git a/docs/README.md b/docs/README.md index 5a6171473..3e76dc0e7 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1097,7 +1097,7 @@ with the enabled de-duplication. See [this section](#deduplication) for details. ## Deduplication -VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-line flag is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2112#issuecomment-1032587618) for more details on how downsampling works. +VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single raw sample with the biggest timestamp per each discrete 60s interval. If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then an arbitrary sample out of these samples is left. This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness). The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. It is safe to use deduplication and downsampling simultaneously. @@ -1622,7 +1622,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864) -dedup.minScrapeInterval duration - Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling + Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling -deleteAuthKey string authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries -denyQueriesOutsideRetention diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index f53ce929d..47da9c441 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1101,7 +1101,7 @@ with the enabled de-duplication. See [this section](#deduplication) for details. ## Deduplication -VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-line flag is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points on the same time series if they fall within the same discrete 60s bucket. The earliest data point will be kept. In the case of equal timestamps, an arbitrary data point will be kept. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2112#issuecomment-1032587618) for more details on how downsampling works. +VictoriaMetrics leaves a single raw sample with the biggest timestamp per each `-dedup.minScrapeInterval` discrete interval if `-dedup.minScrapeInterval` is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would leave a single raw sample with the biggest timestamp per each discrete 60s interval. If multiple raw samples have the same biggest timestamp on the given `-dedup.minScrapeInterval` discrete interval, then an arbitrary sample out of these samples is left. This aligns with the [staleness rules in Prometheus](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness). The `-dedup.minScrapeInterval=D` is equivalent to `-downsampling.period=0s:D` if [downsampling](#downsampling) is enabled. It is safe to use deduplication and downsampling simultaneously. @@ -1626,7 +1626,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum size in bytes of a single DataDog POST request to /api/v1/series Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 67108864) -dedup.minScrapeInterval duration - Leave only the first sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling + Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling -deleteAuthKey string authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries -denyQueriesOutsideRetention diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go index c92cc6221..05e1d2ec6 100644 --- a/lib/storage/dedup.go +++ b/lib/storage/dedup.go @@ -30,28 +30,24 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterva // Fast path - nothing to deduplicate return srcTimestamps, srcValues } - return deduplicateInternal(srcTimestamps, srcValues, dedupInterval) -} - -func deduplicateInternal(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) { - tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval - dstTimestamps := srcTimestamps[:1] - dstValues := srcValues[:1] - for i := 1; i < len(srcTimestamps); i++ { - ts := srcTimestamps[i] - if ts < tsNext { + tsNext := srcTimestamps[0] + dedupInterval - 1 + tsNext -= tsNext % dedupInterval + dstTimestamps := srcTimestamps[:0] + dstValues := srcValues[:0] + for i, ts := range srcTimestamps[1:] { + if ts <= tsNext { continue } - dstTimestamps = append(dstTimestamps, ts) + dstTimestamps = append(dstTimestamps, srcTimestamps[i]) dstValues = append(dstValues, srcValues[i]) - - // Update tsNext tsNext += dedupInterval - if ts >= tsNext { - // Slow path for updating ts. - tsNext = (ts - ts%dedupInterval) + dedupInterval + if tsNext < ts { + tsNext = ts + dedupInterval - 1 + tsNext -= tsNext % dedupInterval } } + dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1]) + dstValues = append(dstValues, srcValues[len(srcValues)-1]) return dstTimestamps, dstValues } @@ -60,43 +56,41 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64, dedupInterv // Fast path - nothing to deduplicate return srcTimestamps, srcValues } - return deduplicateDuringMergeInternal(srcTimestamps, srcValues, dedupInterval) -} - -func deduplicateDuringMergeInternal(srcTimestamps, srcValues []int64, dedupInterval int64) ([]int64, []int64) { - tsNext := (srcTimestamps[0] - srcTimestamps[0]%dedupInterval) + dedupInterval - dstTimestamps := srcTimestamps[:1] - dstValues := srcValues[:1] - for i := 1; i < len(srcTimestamps); i++ { - ts := srcTimestamps[i] - if ts < tsNext { + tsNext := srcTimestamps[0] + dedupInterval - 1 + tsNext -= tsNext % dedupInterval + dstTimestamps := srcTimestamps[:0] + dstValues := srcValues[:0] + for i, ts := range srcTimestamps[1:] { + if ts <= tsNext { continue } - dstTimestamps = append(dstTimestamps, ts) + dstTimestamps = append(dstTimestamps, srcTimestamps[i]) dstValues = append(dstValues, srcValues[i]) - - // Update tsNext tsNext += dedupInterval - if ts >= tsNext { - // Slow path for updating ts. - tsNext = (ts - ts%dedupInterval) + dedupInterval + if tsNext < ts { + tsNext = ts + dedupInterval - 1 + tsNext -= tsNext % dedupInterval } } + dstTimestamps = append(dstTimestamps, srcTimestamps[len(srcTimestamps)-1]) + dstValues = append(dstValues, srcValues[len(srcValues)-1]) return dstTimestamps, dstValues } func needsDedup(timestamps []int64, dedupInterval int64) bool { - if len(timestamps) == 0 || dedupInterval <= 0 { + if len(timestamps) < 2 || dedupInterval <= 0 { return false } - tsNext := (timestamps[0] - timestamps[0]%dedupInterval) + dedupInterval + tsNext := timestamps[0] + dedupInterval - 1 + tsNext -= tsNext % dedupInterval for _, ts := range timestamps[1:] { - if ts < tsNext { + if ts <= tsNext { return true } tsNext += dedupInterval - if ts >= tsNext { - tsNext = (ts - ts%dedupInterval) + dedupInterval + if tsNext < ts { + tsNext = ts + dedupInterval - 1 + tsNext -= tsNext % dedupInterval } } return false diff --git a/lib/storage/dedup_test.go b/lib/storage/dedup_test.go index b206adaa2..b5ba85ba0 100644 --- a/lib/storage/dedup_test.go +++ b/lib/storage/dedup_test.go @@ -19,19 +19,26 @@ func TestNeedsDedup(t *testing.T) { f(0, []int64{1, 2}, false) f(10, []int64{1}, false) f(10, []int64{1, 2}, true) - f(10, []int64{9, 10}, false) - f(10, []int64{9, 10, 19}, true) + f(10, []int64{9, 11}, false) + f(10, []int64{10, 11}, false) + f(10, []int64{0, 10, 11}, false) + f(10, []int64{9, 10}, true) + f(10, []int64{0, 10, 19}, false) f(10, []int64{9, 19}, false) - f(10, []int64{0, 9, 19}, true) + f(10, []int64{0, 11, 19}, true) + f(10, []int64{0, 11, 20}, true) + f(10, []int64{0, 11, 21}, false) f(10, []int64{0, 19}, false) - f(10, []int64{0, 35, 40}, false) - f(10, []int64{0, 35, 40, 41}, true) + f(10, []int64{0, 30, 40}, false) + f(10, []int64{0, 31, 40}, true) + f(10, []int64{0, 31, 41}, false) + f(10, []int64{0, 31, 49}, false) } func TestDeduplicateSamples(t *testing.T) { // Disable deduplication before exit, since the rest of tests expect disabled dedup. - f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) { + f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64, valuesExpected []float64) { t.Helper() timestampsCopy := make([]int64, len(timestamps)) values := make([]float64, len(timestamps)) @@ -42,30 +49,10 @@ func TestDeduplicateSamples(t *testing.T) { dedupInterval := scrapeInterval.Milliseconds() timestampsCopy, values = DeduplicateSamples(timestampsCopy, values, dedupInterval) if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { - t.Fatalf("invalid DeduplicateSamples(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) + t.Fatalf("invalid DeduplicateSamples(%v) timestamps;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) } - // Verify values - if len(timestampsCopy) == 0 { - if len(values) != 0 { - t.Fatalf("values must be empty; got %v", values) - } - return - } - j := 0 - for i, ts := range timestamps { - if ts != timestampsCopy[j] { - continue - } - if values[j] != float64(i) { - t.Fatalf("unexpected value at index %d; got %v; want %v; values: %v", j, values[j], i, values) - } - j++ - if j == len(timestampsCopy) { - break - } - } - if j != len(timestampsCopy) { - t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:]) + if !reflect.DeepEqual(values, valuesExpected) { + t.Fatalf("invalid DeduplicateSamples(%v) values;\ngot\n%v\nwant\n%v", timestamps, values, valuesExpected) } // Verify that the second call to DeduplicateSamples doesn't modify samples. @@ -78,19 +65,19 @@ func TestDeduplicateSamples(t *testing.T) { t.Fatalf("invalid DeduplicateSamples(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy) } } - f(time.Millisecond, nil, []int64{}) - f(time.Millisecond, []int64{123}, []int64{123}) - f(time.Millisecond, []int64{123, 456}, []int64{123, 456}) - f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}) - f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}) - f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 205, 300, 1000}) - f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3}) + f(time.Millisecond, nil, []int64{}, []float64{}) + f(time.Millisecond, []int64{123}, []int64{123}, []float64{0}) + f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []float64{0, 1}) + f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}, []float64{2, 4, 5, 8, 9}) + f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 180, 300, 1000}, []float64{0, 2, 5, 7, 8}) + f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 30e3, 39e3, 45e3}, []float64{0, 1, 4, 6, 7}) } func TestDeduplicateSamplesDuringMerge(t *testing.T) { // Disable deduplication before exit, since the rest of tests expect disabled dedup. - f := func(scrapeInterval time.Duration, timestamps, timestampsExpected []int64) { + f := func(scrapeInterval time.Duration, timestamps, timestampsExpected, valuesExpected []int64) { t.Helper() timestampsCopy := make([]int64, len(timestamps)) values := make([]int64, len(timestamps)) @@ -101,30 +88,10 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) { dedupInterval := scrapeInterval.Milliseconds() timestampsCopy, values = deduplicateSamplesDuringMerge(timestampsCopy, values, dedupInterval) if !reflect.DeepEqual(timestampsCopy, timestampsExpected) { - t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) result;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) + t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) timestamps;\ngot\n%v\nwant\n%v", timestamps, timestampsCopy, timestampsExpected) } - // Verify values - if len(timestampsCopy) == 0 { - if len(values) != 0 { - t.Fatalf("values must be empty; got %v", values) - } - return - } - j := 0 - for i, ts := range timestamps { - if ts != timestampsCopy[j] { - continue - } - if values[j] != int64(i) { - t.Fatalf("unexpected value at index %d; got %v; want %v; values: %v", j, values[j], i, values) - } - j++ - if j == len(timestampsCopy) { - break - } - } - if j != len(timestampsCopy) { - t.Fatalf("superfluous timestamps found starting from index %d: %v", j, timestampsCopy[j:]) + if !reflect.DeepEqual(values, valuesExpected) { + t.Fatalf("invalid DeduplicateSamples(%v) values;\ngot\n%v\nwant\n%v", timestamps, values, valuesExpected) } // Verify that the second call to DeduplicateSamples doesn't modify samples. @@ -137,21 +104,10 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) { t.Fatalf("invalid deduplicateSamplesDuringMerge(%v) values for the second call;\ngot\n%v\nwant\n%v", timestamps, values, valuesCopy) } } - f(time.Millisecond, nil, []int64{}) - f(time.Millisecond, []int64{123}, []int64{123}) - f(time.Millisecond, []int64{123, 456}, []int64{123, 456}) - f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}) - f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000}) - f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3}) - - var timestamps, timestampsExpected []int64 - for i := 0; i < 40; i++ { - timestamps = append(timestamps, int64(i*1000)) - if i%2 == 0 { - timestampsExpected = append(timestampsExpected, int64(i*1000)) - } - } - f(0, timestamps, timestamps) - f(time.Second, timestamps, timestamps) - f(2*time.Second, timestamps, timestampsExpected) + f(time.Millisecond, nil, []int64{}, []int64{}) + f(time.Millisecond, []int64{123}, []int64{123}, []int64{0}) + f(time.Millisecond, []int64{123, 456}, []int64{123, 456}, []int64{0, 1}) + f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4}, []int64{2, 4, 5, 8, 9}) + f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000}, []int64{0, 2, 6, 7, 8}) + f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 30e3, 39e3, 45e3}, []int64{0, 1, 4, 6, 7}) }