VictoriaMetrics/lib/storage/dedup.go
Aliaksandr Valialkin 8ed0d5471a lib/storage: correctly handle -dedup.minScrapeInterval values smaller than 8ms
Such small values may be used for removing samples with duplicate timestamps.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/409 for details.
2020-04-10 16:40:41 +03:00

109 lines
3.0 KiB
Go

package storage
import (
"math"
"time"
)
// SetMinScrapeIntervalForDeduplication sets the minimum interval for data points during de-duplication.
//
// De-duplication is disabled if interval is 0.
//
// This function must be called before initializing the storage.
func SetMinScrapeIntervalForDeduplication(interval time.Duration) {
minScrapeInterval = interval
}
var minScrapeInterval = time.Duration(0)
func getMinDelta() int64 {
// Use 7/8 of minScrapeInterval in order to preserve proper data points.
// For instance, if minScrapeInterval=10, the following time series:
// 10 15 19 25 30 34 41
// Would be unexpectedly converted to if using 100% of minScrapeInterval:
// 10 25 41
// When using 7/8 of minScrapeInterval, it will be converted to the expected:
// 10 19 30 41
ms := minScrapeInterval.Milliseconds()
// Try calculating scrape interval via integer arithmetic.
d := (ms / 8) * 7
if d > 0 {
return d
}
// Too small scrape interval for integer arithmetic. Calculate d using floating-point arithmetic.
return int64(math.Round(float64(ms) / 8 * 7))
}
// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
if minScrapeInterval <= 0 {
return srcTimestamps, srcValues
}
minDelta := getMinDelta()
if !needsDedup(srcTimestamps, minDelta) {
// Fast path - nothing to deduplicate
return srcTimestamps, srcValues
}
// Slow path - dedup data points.
prevTimestamp := srcTimestamps[0]
dstTimestamps := srcTimestamps[:1]
dstValues := srcValues[:1]
for i := 1; i < len(srcTimestamps); i++ {
ts := srcTimestamps[i]
if ts-prevTimestamp < minDelta {
continue
}
dstTimestamps = append(dstTimestamps, ts)
dstValues = append(dstValues, srcValues[i])
prevTimestamp = ts
}
return dstTimestamps, dstValues
}
func deduplicateSamplesDuringMerge(srcTimestamps, srcValues []int64) ([]int64, []int64) {
if minScrapeInterval <= 0 {
return srcTimestamps, srcValues
}
if len(srcTimestamps) < 32 {
// Do not de-duplicate small number of samples during merge
// in order to improve deduplication accuracy on later stages.
return srcTimestamps, srcValues
}
minDelta := getMinDelta()
if !needsDedup(srcTimestamps, minDelta) {
// Fast path - nothing to deduplicate
return srcTimestamps, srcValues
}
// Slow path - dedup data points.
prevTimestamp := srcTimestamps[0]
dstTimestamps := srcTimestamps[:1]
dstValues := srcValues[:1]
for i := 1; i < len(srcTimestamps); i++ {
ts := srcTimestamps[i]
if ts-prevTimestamp < minDelta {
continue
}
dstTimestamps = append(dstTimestamps, ts)
dstValues = append(dstValues, srcValues[i])
prevTimestamp = ts
}
return dstTimestamps, dstValues
}
func needsDedup(timestamps []int64, minDelta int64) bool {
if len(timestamps) == 0 {
return false
}
prevTimestamp := timestamps[0]
for _, ts := range timestamps[1:] {
if ts-prevTimestamp < minDelta {
return true
}
prevTimestamp = ts
}
return false
}