mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
lib/storage: improve deduplication algorithm
Now it leaves only the first data point on each `-dedup.minScrapeInterval` interval. Previously it may leave two data points on the interval. This could lead to unexpected results for `histogram_quantile(phi, sum(rate(buckets)) by (le))` query.
This commit is contained in:
parent
d9bdda408c
commit
a0000c3a6e
@ -1,7 +1,6 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -11,53 +10,39 @@ import (
|
||||
//
|
||||
// This function must be called before initializing the storage.
|
||||
func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
|
||||
minScrapeInterval = interval
|
||||
minScrapeInterval = interval.Milliseconds()
|
||||
}
|
||||
|
||||
var minScrapeInterval = time.Duration(0)
|
||||
|
||||
func getMinDelta() int64 {
|
||||
// Use 7/8 of minScrapeInterval in order to preserve proper data points.
|
||||
// For instance, if minScrapeInterval=10, the following time series:
|
||||
// 10 15 19 25 30 34 41
|
||||
// Would be unexpectedly converted to if using 100% of minScrapeInterval:
|
||||
// 10 25 41
|
||||
// When using 7/8 of minScrapeInterval, it will be converted to the expected:
|
||||
// 10 19 30 41
|
||||
ms := minScrapeInterval.Milliseconds()
|
||||
|
||||
// Try calculating scrape interval via integer arithmetic.
|
||||
d := (ms / 8) * 7
|
||||
if d > 0 {
|
||||
return d
|
||||
}
|
||||
// Too small scrape interval for integer arithmetic. Calculate d using floating-point arithmetic.
|
||||
return int64(math.Round(float64(ms) / 8 * 7))
|
||||
}
|
||||
var minScrapeInterval = int64(0)
|
||||
|
||||
// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
|
||||
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
|
||||
if minScrapeInterval <= 0 {
|
||||
return srcTimestamps, srcValues
|
||||
}
|
||||
minDelta := getMinDelta()
|
||||
if !needsDedup(srcTimestamps, minDelta) {
|
||||
if !needsDedup(srcTimestamps, minScrapeInterval) {
|
||||
// Fast path - nothing to deduplicate
|
||||
return srcTimestamps, srcValues
|
||||
}
|
||||
|
||||
// Slow path - dedup data points.
|
||||
prevTimestamp := srcTimestamps[0]
|
||||
tsNext := (srcTimestamps[0] - srcTimestamps[0] % minScrapeInterval) + minScrapeInterval
|
||||
dstTimestamps := srcTimestamps[:1]
|
||||
dstValues := srcValues[:1]
|
||||
for i := 1; i < len(srcTimestamps); i++ {
|
||||
ts := srcTimestamps[i]
|
||||
if ts-prevTimestamp < minDelta {
|
||||
if ts < tsNext {
|
||||
continue
|
||||
}
|
||||
dstTimestamps = append(dstTimestamps, ts)
|
||||
dstValues = append(dstValues, srcValues[i])
|
||||
prevTimestamp = ts
|
||||
|
||||
// Update tsNext
|
||||
tsNext += minScrapeInterval
|
||||
if ts >= tsNext {
|
||||
// Slow path for updating ts.
|
||||
tsNext = (ts - ts % minScrapeInterval) + minScrapeInterval
|
||||
}
|
||||
}
|
||||
return dstTimestamps, dstValues
|
||||
}
|
||||
@ -66,29 +51,29 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, [
|
||||
if minScrapeInterval <= 0 {
|
||||
return srcTimestamps, srcValues
|
||||
}
|
||||
if len(srcTimestamps) < 32 {
|
||||
// Do not de-duplicate small number of samples during merge
|
||||
// in order to improve deduplication accuracy on later stages.
|
||||
return srcTimestamps, srcValues
|
||||
}
|
||||
minDelta := getMinDelta()
|
||||
if !needsDedup(srcTimestamps, minDelta) {
|
||||
if !needsDedup(srcTimestamps, minScrapeInterval) {
|
||||
// Fast path - nothing to deduplicate
|
||||
return srcTimestamps, srcValues
|
||||
}
|
||||
|
||||
// Slow path - dedup data points.
|
||||
prevTimestamp := srcTimestamps[0]
|
||||
tsNext := (srcTimestamps[0] - srcTimestamps[0] % minScrapeInterval) + minScrapeInterval
|
||||
dstTimestamps := srcTimestamps[:1]
|
||||
dstValues := srcValues[:1]
|
||||
for i := 1; i < len(srcTimestamps); i++ {
|
||||
ts := srcTimestamps[i]
|
||||
if ts-prevTimestamp < minDelta {
|
||||
if ts < tsNext {
|
||||
continue
|
||||
}
|
||||
dstTimestamps = append(dstTimestamps, ts)
|
||||
dstValues = append(dstValues, srcValues[i])
|
||||
prevTimestamp = ts
|
||||
|
||||
// Update tsNext
|
||||
tsNext += minScrapeInterval
|
||||
if ts >= tsNext {
|
||||
// Slow path for updating ts.
|
||||
tsNext = (ts - ts % minScrapeInterval) + minScrapeInterval
|
||||
}
|
||||
}
|
||||
return dstTimestamps, dstValues
|
||||
}
|
||||
|
@ -6,27 +6,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestGetMinDelta(t *testing.T) {
|
||||
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
|
||||
defer SetMinScrapeIntervalForDeduplication(0)
|
||||
|
||||
f := func(scrapeInterval time.Duration, dExpected int64) {
|
||||
t.Helper()
|
||||
SetMinScrapeIntervalForDeduplication(scrapeInterval)
|
||||
d := getMinDelta()
|
||||
if d != dExpected {
|
||||
t.Fatalf("unexpected getMinDelta(%s); got %d; want %d", scrapeInterval, d, dExpected)
|
||||
}
|
||||
}
|
||||
f(0, 0)
|
||||
f(time.Millisecond, 1)
|
||||
f(5*time.Millisecond, 4)
|
||||
f(8*time.Millisecond, 7)
|
||||
f(100*time.Millisecond, 84)
|
||||
f(time.Second, 875)
|
||||
f(10*time.Second, 8750)
|
||||
}
|
||||
|
||||
func TestDeduplicateSamples(t *testing.T) {
|
||||
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
|
||||
defer SetMinScrapeIntervalForDeduplication(0)
|
||||
@ -73,8 +52,8 @@ func TestDeduplicateSamples(t *testing.T) {
|
||||
f(time.Millisecond, []int64{123, 456}, []int64{123, 456})
|
||||
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4})
|
||||
f(0, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4})
|
||||
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000})
|
||||
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 39e3})
|
||||
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 205, 300, 1000}, []int64{0, 100, 205, 300, 1000})
|
||||
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3})
|
||||
}
|
||||
|
||||
func TestDeduplicateSamplesDuringMerge(t *testing.T) {
|
||||
@ -121,9 +100,9 @@ func TestDeduplicateSamplesDuringMerge(t *testing.T) {
|
||||
f(time.Millisecond, nil, []int64{})
|
||||
f(time.Millisecond, []int64{123}, []int64{123})
|
||||
f(time.Millisecond, []int64{123, 456}, []int64{123, 456})
|
||||
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4})
|
||||
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000})
|
||||
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3})
|
||||
f(time.Millisecond, []int64{0, 0, 0, 1, 1, 2, 3, 3, 3, 4}, []int64{0, 1, 2, 3, 4})
|
||||
f(100*time.Millisecond, []int64{0, 100, 100, 101, 150, 180, 200, 300, 1000}, []int64{0, 100, 200, 300, 1000})
|
||||
f(10*time.Second, []int64{10e3, 13e3, 21e3, 22e3, 30e3, 33e3, 39e3, 45e3}, []int64{10e3, 21e3, 30e3, 45e3})
|
||||
|
||||
var timestamps, timestampsExpected []int64
|
||||
for i := 0; i < 40; i++ {
|
||||
|
36
lib/storage/dedup_timing_test.go
Normal file
36
lib/storage/dedup_timing_test.go
Normal file
@ -0,0 +1,36 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func BenchmarkDeduplicateSamples(b *testing.B) {
|
||||
const blockSize = 8192
|
||||
timestamps := make([]int64, blockSize)
|
||||
values := make([]float64, blockSize)
|
||||
for i := 0; i < len(timestamps); i++ {
|
||||
timestamps[i] = int64(i) * 1e3
|
||||
}
|
||||
for _, minScrapeInterval := range []time.Duration{time.Second, 2 * time.Second, 5 * time.Second, 10 * time.Second} {
|
||||
b.Run(fmt.Sprintf("minScrapeInterval=%s", minScrapeInterval), func(b *testing.B) {
|
||||
SetMinScrapeIntervalForDeduplication(minScrapeInterval)
|
||||
defer SetMinScrapeIntervalForDeduplication(0)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(blockSize)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
timestampsCopy := make([]int64, 0, blockSize)
|
||||
valuesCopy := make([]float64, 0, blockSize)
|
||||
for pb.Next() {
|
||||
timestampsCopy := append(timestampsCopy[:0], timestamps...)
|
||||
valuesCopy := append(valuesCopy[:0], values...)
|
||||
ts, vs := DeduplicateSamples(timestampsCopy, valuesCopy)
|
||||
if len(ts) == 0 || len(vs) == 0 {
|
||||
panic(fmt.Errorf("expecting non-empty results; got\nts=%v\nvs=%v", ts, vs))
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user