mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
lib/storage: properly determine when the deduplication is needed in needsDedup
Previously needsDedup() could return true if the de-duplication wasn't needed for the following case: d < interval / \ | v | v | interval interval Now it properly returns false for this case
This commit is contained in:
parent
f539772ca6
commit
8ca2799478
@ -24,9 +24,11 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []
|
|||||||
// Fast path - nothing to deduplicate
|
// Fast path - nothing to deduplicate
|
||||||
return srcTimestamps, srcValues
|
return srcTimestamps, srcValues
|
||||||
}
|
}
|
||||||
|
return deduplicateInternal(minScrapeInterval, srcTimestamps, srcValues)
|
||||||
|
}
|
||||||
|
|
||||||
// Slow path - dedup data points.
|
func deduplicateInternal(interval int64, srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
|
||||||
tsNext := (srcTimestamps[0] - srcTimestamps[0]%minScrapeInterval) + minScrapeInterval
|
tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
|
||||||
dstTimestamps := srcTimestamps[:1]
|
dstTimestamps := srcTimestamps[:1]
|
||||||
dstValues := srcValues[:1]
|
dstValues := srcValues[:1]
|
||||||
for i := 1; i < len(srcTimestamps); i++ {
|
for i := 1; i < len(srcTimestamps); i++ {
|
||||||
@ -38,10 +40,10 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []
|
|||||||
dstValues = append(dstValues, srcValues[i])
|
dstValues = append(dstValues, srcValues[i])
|
||||||
|
|
||||||
// Update tsNext
|
// Update tsNext
|
||||||
tsNext += minScrapeInterval
|
tsNext += interval
|
||||||
if ts >= tsNext {
|
if ts >= tsNext {
|
||||||
// Slow path for updating ts.
|
// Slow path for updating ts.
|
||||||
tsNext = (ts - ts%minScrapeInterval) + minScrapeInterval
|
tsNext = (ts - ts%interval) + interval
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return dstTimestamps, dstValues
|
return dstTimestamps, dstValues
|
||||||
@ -55,9 +57,11 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, [
|
|||||||
// Fast path - nothing to deduplicate
|
// Fast path - nothing to deduplicate
|
||||||
return srcTimestamps, srcValues
|
return srcTimestamps, srcValues
|
||||||
}
|
}
|
||||||
|
return deduplicateDuringMergeInternal(minScrapeInterval, srcTimestamps, srcValues)
|
||||||
|
}
|
||||||
|
|
||||||
// Slow path - dedup data points.
|
func deduplicateDuringMergeInternal(interval int64, srcTimestamps, srcValues []int64) ([]int64, []int64) {
|
||||||
tsNext := (srcTimestamps[0] - srcTimestamps[0]%minScrapeInterval) + minScrapeInterval
|
tsNext := (srcTimestamps[0] - srcTimestamps[0]%interval) + interval
|
||||||
dstTimestamps := srcTimestamps[:1]
|
dstTimestamps := srcTimestamps[:1]
|
||||||
dstValues := srcValues[:1]
|
dstValues := srcValues[:1]
|
||||||
for i := 1; i < len(srcTimestamps); i++ {
|
for i := 1; i < len(srcTimestamps); i++ {
|
||||||
@ -69,25 +73,28 @@ func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, [
|
|||||||
dstValues = append(dstValues, srcValues[i])
|
dstValues = append(dstValues, srcValues[i])
|
||||||
|
|
||||||
// Update tsNext
|
// Update tsNext
|
||||||
tsNext += minScrapeInterval
|
tsNext += interval
|
||||||
if ts >= tsNext {
|
if ts >= tsNext {
|
||||||
// Slow path for updating ts.
|
// Slow path for updating ts.
|
||||||
tsNext = (ts - ts%minScrapeInterval) + minScrapeInterval
|
tsNext = (ts - ts%interval) + interval
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return dstTimestamps, dstValues
|
return dstTimestamps, dstValues
|
||||||
}
|
}
|
||||||
|
|
||||||
func needsDedup(timestamps []int64, minDelta int64) bool {
|
func needsDedup(timestamps []int64, interval int64) bool {
|
||||||
if len(timestamps) == 0 {
|
if len(timestamps) == 0 || interval <= 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
prevTimestamp := timestamps[0]
|
tsNext := (timestamps[0] - timestamps[0]%interval) + interval
|
||||||
for _, ts := range timestamps[1:] {
|
for _, ts := range timestamps[1:] {
|
||||||
if ts-prevTimestamp < minDelta {
|
if ts < tsNext {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
prevTimestamp = ts
|
tsNext += interval
|
||||||
|
if ts >= tsNext {
|
||||||
|
tsNext = (ts - ts%interval) + interval
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,28 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestNeedsDedup(t *testing.T) {
|
||||||
|
f := func(interval int64, timestamps []int64, expectedResult bool) {
|
||||||
|
t.Helper()
|
||||||
|
result := needsDedup(timestamps, interval)
|
||||||
|
if result != expectedResult {
|
||||||
|
t.Fatalf("unexpected result for needsDedup(%d, %d); got %v; want %v", timestamps, interval, result, expectedResult)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f(-1, nil, false)
|
||||||
|
f(-1, []int64{1}, false)
|
||||||
|
f(0, []int64{1, 2}, false)
|
||||||
|
f(10, []int64{1}, false)
|
||||||
|
f(10, []int64{1, 2}, true)
|
||||||
|
f(10, []int64{9, 10}, false)
|
||||||
|
f(10, []int64{9, 10, 19}, true)
|
||||||
|
f(10, []int64{9, 19}, false)
|
||||||
|
f(10, []int64{0, 9, 19}, true)
|
||||||
|
f(10, []int64{0, 19}, false)
|
||||||
|
f(10, []int64{0, 35, 40}, false)
|
||||||
|
f(10, []int64{0, 35, 40, 41}, true)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeduplicateSamples(t *testing.T) {
|
func TestDeduplicateSamples(t *testing.T) {
|
||||||
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
|
// Disable deduplication before exit, since the rest of tests expect disabled dedup.
|
||||||
defer SetMinScrapeIntervalForDeduplication(0)
|
defer SetMinScrapeIntervalForDeduplication(0)
|
||||||
|
Loading…
Reference in New Issue
Block a user