diff --git a/README.md b/README.md index 14327e474..3ce398561 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,7 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM - [Federation](#federation) - [Capacity planning](#capacity-planning) - [High availability](#high-availability) + - [Deduplication](#deduplication) - [Retention](#retention) - [Multiple retentions](#multiple-retentions) - [Downsampling](#downsampling) @@ -701,6 +702,19 @@ kill -HUP `pidof prometheus` If you have Prometheus HA pairs with replicas `r1` and `r2` in each pair, then configure each `r1` to write data to `victoriametrics-addr-1`, while each `r2` should write data to `victoriametrics-addr-2`. +Another option is to write data simultaneously from Prometheus HA pair to a pair of VictoriaMetrics instances +with the enabled de-duplication. See [this section](#deduplication) for details. + + +### Deduplication + +VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-line flag +is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points +on the same time series if they are located closer than 60s to each other. +The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair +write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical +`external_labels` section in their configs, so they write data to the same time series. + ### Retention diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index fbb4e8c7d..77e057875 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -266,7 +266,7 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) { dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...) dst.Values = append(dst.Values, top.Values[top.NextIdx:]...) putSortBlock(top) - return + break } sbNext := sbh[0] tsNext := sbNext.Timestamps[sbNext.NextIdx] @@ -287,6 +287,8 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) { putSortBlock(top) } } + + dst.Timestamps, dst.Values = storage.DeduplicateSamples(dst.Timestamps, dst.Values) } type sortBlock struct { diff --git a/lib/storage/block.go b/lib/storage/block.go index 252a10ddf..8cbf090cb 100644 --- a/lib/storage/block.go +++ b/lib/storage/block.go @@ -158,6 +158,25 @@ func (b *Block) tooBig() bool { return false } +func (b *Block) deduplicateSamplesDuringMerge() { + if len(b.values) == 0 { + // Nothing to dedup or the data is already marshaled. + return + } + srcTimestamps := b.timestamps[b.nextIdx:] + srcValues := b.values[b.nextIdx:] + timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues) + b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)] + b.values = b.values[:b.nextIdx+len(values)] +} + +func (b *Block) rowsCount() int { + if len(b.values) == 0 { + return int(b.bh.RowsCount) + } + return len(b.values[b.nextIdx:]) +} + // MarshalData marshals the block into binary representation. func (b *Block) MarshalData(timestampsBlockOffset, valuesBlockOffset uint64) ([]byte, []byte, []byte) { if len(b.values) == 0 { diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index ee9292251..ab68e27c8 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -171,6 +171,8 @@ func (bsw *blockStreamWriter) MustClose() { // WriteExternalBlock writes b to bsw and updates ph and rowsMerged. func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) { + atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) + b.deduplicateSamplesDuringMerge() headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) bsw.indexData = append(bsw.indexData, headerData...) @@ -186,7 +188,6 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM bsw.valuesBlockOffset += uint64(len(valuesData)) updatePartHeader(b, ph) - atomic.AddUint64(rowsMerged, uint64(b.bh.RowsCount)) } func updatePartHeader(b *Block, ph *partHeader) { diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go new file mode 100644 index 000000000..04e9929a6 --- /dev/null +++ b/lib/storage/dedup.go @@ -0,0 +1,99 @@ +package storage + +import ( + "flag" + + "github.com/VictoriaMetrics/metrics" +) + +var minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+ + "This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+ + "Deduplication is disabled if the `-dedup.minScrapeInterval` is 0") + +func getMinDelta() int64 { + // Divide minScrapeInterval by 2 in order to preserve proper data points. + // For instance, if minScrapeInterval=10, the following time series: + // 10 15 19 25 30 34 41 + // Would be unexpectedly converted to: + // 10 25 41 + // When dividing minScrapeInterval by 2, it will be converted to the expected: + // 10 19 30 41 + return minScrapeInterval.Milliseconds() / 2 +} + +// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval. +func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) { + if *minScrapeInterval <= 0 { + return srcTimestamps, srcValues + } + minDelta := getMinDelta() + if !needsDedup(srcTimestamps, minDelta) { + // Fast path - nothing to deduplicate + return srcTimestamps, srcValues + } + + // Slow path - dedup data points. + prevTimestamp := srcTimestamps[0] + dstTimestamps := srcTimestamps[:1] + dstValues := srcValues[:1] + dedups := 0 + for i := 1; i < len(srcTimestamps); i++ { + ts := srcTimestamps[i] + if ts-prevTimestamp < minDelta { + dedups++ + continue + } + dstTimestamps = append(dstTimestamps, ts) + dstValues = append(dstValues, srcValues[i]) + prevTimestamp = ts + } + dedupsDuringSelect.Add(dedups) + return dstTimestamps, dstValues +} + +var dedupsDuringSelect = metrics.NewCounter(`deduplicated_samples_total{type="select"}`) + +func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]int64, []int64) { + if *minScrapeInterval <= 0 { + return srcTimestamps, srcValues + } + minDelta := getMinDelta() + if !needsDedup(srcTimestamps, minDelta) { + // Fast path - nothing to deduplicate + return srcTimestamps, srcValues + } + + // Slow path - dedup data points. + prevTimestamp := srcTimestamps[0] + dstTimestamps := srcTimestamps[:1] + dstValues := srcValues[:1] + dedups := 0 + for i := 1; i < len(srcTimestamps); i++ { + ts := srcTimestamps[i] + if ts-prevTimestamp < minDelta { + dedups++ + continue + } + dstTimestamps = append(dstTimestamps, ts) + dstValues = append(dstValues, srcValues[i]) + prevTimestamp = ts + } + dedupsDuringMerge.Add(dedups) + return dstTimestamps, dstValues +} + +var dedupsDuringMerge = metrics.NewCounter(`deduplicated_samples_total{type="merge"}`) + +func needsDedup(timestamps []int64, minDelta int64) bool { + if len(timestamps) == 0 { + return false + } + prevTimestamp := timestamps[0] + for _, ts := range timestamps[1:] { + if ts-prevTimestamp < minDelta { + return true + } + prevTimestamp = ts + } + return false +}