all: add -dedup.minScrapeInterval command-line flag for data de-duplication

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/86
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/278
This commit is contained in:
Aliaksandr Valialkin 2020-01-31 01:09:44 +02:00
parent 2152f6f0cd
commit c3d86eef96
5 changed files with 137 additions and 2 deletions

View File

@ -95,6 +95,7 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM
- [Federation](#federation)
- [Capacity planning](#capacity-planning)
- [High availability](#high-availability)
- [Deduplication](#deduplication)
- [Retention](#retention)
- [Multiple retentions](#multiple-retentions)
- [Downsampling](#downsampling)
@ -701,6 +702,19 @@ kill -HUP `pidof prometheus`
If you have Prometheus HA pairs with replicas `r1` and `r2` in each pair, then configure each `r1`
to write data to `victoriametrics-addr-1`, while each `r2` should write data to `victoriametrics-addr-2`.
Another option is to write data simultaneously from Prometheus HA pair to a pair of VictoriaMetrics instances
with the enabled de-duplication. See [this section](#deduplication) for details.
### Deduplication
VictoriaMetrics de-duplicates data points if `-dedup.minScrapeInterval` command-line flag
is set to positive duration. For example, `-dedup.minScrapeInterval=60s` would de-duplicate data points
on the same time series if they are located closer than 60s to each other.
The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair
write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical
`external_labels` section in their configs, so they write data to the same time series.
### Retention

View File

@ -266,7 +266,7 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:]...)
dst.Values = append(dst.Values, top.Values[top.NextIdx:]...)
putSortBlock(top)
return
break
}
sbNext := sbh[0]
tsNext := sbNext.Timestamps[sbNext.NextIdx]
@ -287,6 +287,8 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
putSortBlock(top)
}
}
dst.Timestamps, dst.Values = storage.DeduplicateSamples(dst.Timestamps, dst.Values)
}
type sortBlock struct {

View File

@ -158,6 +158,25 @@ func (b *Block) tooBig() bool {
return false
}
func (b *Block) deduplicateSamplesDuringMerge() {
if len(b.values) == 0 {
// Nothing to dedup or the data is already marshaled.
return
}
srcTimestamps := b.timestamps[b.nextIdx:]
srcValues := b.values[b.nextIdx:]
timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues)
b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)]
b.values = b.values[:b.nextIdx+len(values)]
}
func (b *Block) rowsCount() int {
if len(b.values) == 0 {
return int(b.bh.RowsCount)
}
return len(b.values[b.nextIdx:])
}
// MarshalData marshals the block into binary representation.
func (b *Block) MarshalData(timestampsBlockOffset, valuesBlockOffset uint64) ([]byte, []byte, []byte) {
if len(b.values) == 0 {

View File

@ -171,6 +171,8 @@ func (bsw *blockStreamWriter) MustClose() {
// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
b.deduplicateSamplesDuringMerge()
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
bsw.indexData = append(bsw.indexData, headerData...)
@ -186,7 +188,6 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM
bsw.valuesBlockOffset += uint64(len(valuesData))
updatePartHeader(b, ph)
atomic.AddUint64(rowsMerged, uint64(b.bh.RowsCount))
}
func updatePartHeader(b *Block, ph *partHeader) {

99
lib/storage/dedup.go Normal file
View File

@ -0,0 +1,99 @@
package storage
import (
"flag"
"github.com/VictoriaMetrics/metrics"
)
var minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Remove superflouos samples from time series if they are located closer to each other than this duration. "+
"This may be useful for reducing overhead when multiple identically configured Prometheus instances write data to the same VictoriaMetrics. "+
"Deduplication is disabled if the `-dedup.minScrapeInterval` is 0")
func getMinDelta() int64 {
// Divide minScrapeInterval by 2 in order to preserve proper data points.
// For instance, if minScrapeInterval=10, the following time series:
// 10 15 19 25 30 34 41
// Would be unexpectedly converted to:
// 10 25 41
// When dividing minScrapeInterval by 2, it will be converted to the expected:
// 10 19 30 41
return minScrapeInterval.Milliseconds() / 2
}
// DeduplicateSamples removes samples from src* if they are closer to each other than minScrapeInterval.
func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []float64) {
if *minScrapeInterval <= 0 {
return srcTimestamps, srcValues
}
minDelta := getMinDelta()
if !needsDedup(srcTimestamps, minDelta) {
// Fast path - nothing to deduplicate
return srcTimestamps, srcValues
}
// Slow path - dedup data points.
prevTimestamp := srcTimestamps[0]
dstTimestamps := srcTimestamps[:1]
dstValues := srcValues[:1]
dedups := 0
for i := 1; i < len(srcTimestamps); i++ {
ts := srcTimestamps[i]
if ts-prevTimestamp < minDelta {
dedups++
continue
}
dstTimestamps = append(dstTimestamps, ts)
dstValues = append(dstValues, srcValues[i])
prevTimestamp = ts
}
dedupsDuringSelect.Add(dedups)
return dstTimestamps, dstValues
}
var dedupsDuringSelect = metrics.NewCounter(`deduplicated_samples_total{type="select"}`)
func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]int64, []int64) {
if *minScrapeInterval <= 0 {
return srcTimestamps, srcValues
}
minDelta := getMinDelta()
if !needsDedup(srcTimestamps, minDelta) {
// Fast path - nothing to deduplicate
return srcTimestamps, srcValues
}
// Slow path - dedup data points.
prevTimestamp := srcTimestamps[0]
dstTimestamps := srcTimestamps[:1]
dstValues := srcValues[:1]
dedups := 0
for i := 1; i < len(srcTimestamps); i++ {
ts := srcTimestamps[i]
if ts-prevTimestamp < minDelta {
dedups++
continue
}
dstTimestamps = append(dstTimestamps, ts)
dstValues = append(dstValues, srcValues[i])
prevTimestamp = ts
}
dedupsDuringMerge.Add(dedups)
return dstTimestamps, dstValues
}
var dedupsDuringMerge = metrics.NewCounter(`deduplicated_samples_total{type="merge"}`)
func needsDedup(timestamps []int64, minDelta int64) bool {
if len(timestamps) == 0 {
return false
}
prevTimestamp := timestamps[0]
for _, ts := range timestamps[1:] {
if ts-prevTimestamp < minDelta {
return true
}
prevTimestamp = ts
}
return false
}