diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 6cdb67ed2c..87d1c02067 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -264,7 +264,7 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) { dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...) dst.Values = append(dst.Values, top.Values[top.NextIdx:]...) putSortBlock(top) - return + break } sbNext := sbh[0] tsNext := sbNext.Timestamps[sbNext.NextIdx] @@ -285,6 +285,8 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) { putSortBlock(top) } } + + dst.Timestamps, dst.Values = storage.DeduplicateSamples(dst.Timestamps, dst.Values) } type sortBlock struct { diff --git a/lib/storage/block.go b/lib/storage/block.go index 252a10ddf6..8cbf090cb0 100644 --- a/lib/storage/block.go +++ b/lib/storage/block.go @@ -158,6 +158,25 @@ func (b *Block) tooBig() bool { return false } +func (b *Block) deduplicateSamplesDuringMerge() { + if len(b.values) == 0 { + // Nothing to dedup or the data is already marshaled. + return + } + srcTimestamps := b.timestamps[b.nextIdx:] + srcValues := b.values[b.nextIdx:] + timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues) + b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)] + b.values = b.values[:b.nextIdx+len(values)] +} + +func (b *Block) rowsCount() int { + if len(b.values) == 0 { + return int(b.bh.RowsCount) + } + return len(b.values[b.nextIdx:]) +} + // MarshalData marshals the block into binary representation. 
func (b *Block) MarshalData(timestampsBlockOffset, valuesBlockOffset uint64) ([]byte, []byte, []byte) { if len(b.values) == 0 { diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index ee9292251b..ab68e27c8a 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -171,6 +171,8 @@ func (bsw *blockStreamWriter) MustClose() { // WriteExternalBlock writes b to bsw and updates ph and rowsMerged. func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) { + atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) + b.deduplicateSamplesDuringMerge() headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) bsw.indexData = append(bsw.indexData, headerData...) @@ -186,7 +188,6 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM bsw.valuesBlockOffset += uint64(len(valuesData)) updatePartHeader(b, ph) - atomic.AddUint64(rowsMerged, uint64(b.bh.RowsCount)) } func updatePartHeader(b *Block, ph *partHeader) { diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go new file mode 100644 index 0000000000..04e9929a64 --- /dev/null +++ b/lib/storage/dedup.go @@ -0,0 +1,99 @@ +package storage + +import ( + "flag" + + "github.com/VictoriaMetrics/metrics" +) + +var minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superfluous samples from time series if they are located closer to each other than this duration. "+ + "This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+ + "Deduplication is disabled if the `-dedup.minScrapeInterval` is 0") + +func getMinDelta() int64 { + // Divide minScrapeInterval by 2 in order to preserve proper data points. 
+ // For instance, if minScrapeInterval=10, the following time series: + // 10 15 19 25 30 34 41 + // Would be unexpectedly converted to: + // 10 25 41 + // When dividing minScrapeInterval by 2, it will be converted to the expected: + // 10 19 30 41 + return minScrapeInterval.Milliseconds() / 2 +} + +// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval. +func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) { + if *minScrapeInterval <= 0 { + return srcTimestamps, srcValues + } + minDelta := getMinDelta() + if !needsDedup(srcTimestamps, minDelta) { + // Fast path - nothing to deduplicate + return srcTimestamps, srcValues + } + + // Slow path - dedup data points. + prevTimestamp := srcTimestamps[0] + dstTimestamps := srcTimestamps[:1] + dstValues := srcValues[:1] + dedups := 0 + for i := 1; i < len(srcTimestamps); i++ { + ts := srcTimestamps[i] + if ts-prevTimestamp < minDelta { + dedups++ + continue + } + dstTimestamps = append(dstTimestamps, ts) + dstValues = append(dstValues, srcValues[i]) + prevTimestamp = ts + } + dedupsDuringSelect.Add(dedups) + return dstTimestamps, dstValues +} + +var dedupsDuringSelect = metrics.NewCounter(`deduplicated_samples_total{type="select"}`) + +func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]int64, []int64) { + if *minScrapeInterval <= 0 { + return srcTimestamps, srcValues + } + minDelta := getMinDelta() + if !needsDedup(srcTimestamps, minDelta) { + // Fast path - nothing to deduplicate + return srcTimestamps, srcValues + } + + // Slow path - dedup data points. 
+ prevTimestamp := srcTimestamps[0] + dstTimestamps := srcTimestamps[:1] + dstValues := srcValues[:1] + dedups := 0 + for i := 1; i < len(srcTimestamps); i++ { + ts := srcTimestamps[i] + if ts-prevTimestamp < minDelta { + dedups++ + continue + } + dstTimestamps = append(dstTimestamps, ts) + dstValues = append(dstValues, srcValues[i]) + prevTimestamp = ts + } + dedupsDuringMerge.Add(dedups) + return dstTimestamps, dstValues +} + +var dedupsDuringMerge = metrics.NewCounter(`deduplicated_samples_total{type="merge"}`) + +func needsDedup(timestamps []int64, minDelta int64) bool { + if len(timestamps) == 0 { + return false + } + prevTimestamp := timestamps[0] + for _, ts := range timestamps[1:] { + if ts-prevTimestamp < minDelta { + return true + } + prevTimestamp = ts + } + return false +}