lib/storage: deduplicate samples more thoroughly

Previously, some duplicate samples could be left on disk for time series with a high churn rate.
This could result in higher disk space usage.
Aliaksandr Valialkin 2021-12-15 15:58:27 +02:00
parent 92070cbb67
commit 4ff647137a
10 changed files with 173 additions and 17 deletions

docs/CHANGELOG.md

@@ -20,6 +20,7 @@ sort: 15
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): restore the ability to use `$labels.alertname` in labels templating. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1921).
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add missing `query` caption to the input field for the query. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1900).
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix navigation over query history with `Ctrl+up/down` and fix zoom relatively to the cursor position. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1936).
+* BUGFIX: deduplicate samples more thoroughly if [deduplication](https://docs.victoriametrics.com/#deduplication) is enabled. Previously some duplicate samples may be left on disk for time series with high churn rate. This may result in bigger storage space requirements.
 
 ## [v1.70.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.70.0)

lib/storage/block.go

@@ -147,12 +147,25 @@ func (b *Block) tooBig() bool {
     return false
 }
 
-func (b *Block) deduplicateSamplesDuringMerge(dedupInterval int64) {
-    if len(b.values) == 0 {
-        // Nothing to dedup or the data is already marshaled.
+func (b *Block) deduplicateSamplesDuringMerge() {
+    if !isDedupEnabled() {
+        // Deduplication is disabled
         return
     }
+    // Unmarshal block if it isn't unmarshaled yet in order to apply the de-duplication to unmarshaled samples.
+    if err := b.UnmarshalData(); err != nil {
+        logger.Panicf("FATAL: cannot unmarshal block: %s", err)
+    }
     srcTimestamps := b.timestamps[b.nextIdx:]
+    if len(srcTimestamps) < 2 {
+        // Nothing to dedup.
+        return
+    }
+    dedupInterval := GetDedupInterval()
+    if dedupInterval <= 0 {
+        // Deduplication is disabled.
+        return
+    }
     srcValues := b.values[b.nextIdx:]
     timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues, dedupInterval)
     dedups := len(srcTimestamps) - len(timestamps)
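The substantive change in this file is the `b.UnmarshalData()` call before deduplication: the old code returned early when `b.values` was empty, which also covered blocks whose samples were still in marshaled form, so their duplicates survived the merge. A rough sketch of that difference follows; the types and helpers below are illustrative stand-ins, not the real `Block` API.

```go
// block is a stand-in for the real Block type: samples may exist either in
// marshaled (compressed) form or as unmarshaled timestamps/values slices.
type block struct {
	marshaled  []byte
	timestamps []int64
	values     []float64
}

// unmarshal is a stub for Block.UnmarshalData: it would decompress
// b.marshaled into b.timestamps and b.values.
func (b *block) unmarshal() error {
	return nil
}

// dedupDuringMerge sketches the new control flow: unmarshal first, then dedup.
// dedup is any interval-based deduplication function.
func (b *block) dedupDuringMerge(dedupInterval int64, dedup func([]int64, []float64, int64) ([]int64, []float64)) error {
	if dedupInterval <= 0 {
		// Deduplication is disabled.
		return nil
	}
	// Old behavior: `if len(b.values) == 0 { return }` also skipped blocks that
	// were still marshaled, so their duplicate samples stayed on disk.
	if err := b.unmarshal(); err != nil {
		return err
	}
	if len(b.timestamps) < 2 {
		// Nothing to dedup.
		return nil
	}
	b.timestamps, b.values = dedup(b.timestamps, b.values, dedupInterval)
	return nil
}
```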

lib/storage/block_stream_writer.go

@@ -184,12 +184,9 @@ func (bsw *blockStreamWriter) MustClose() {
 }
 
 // WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
-func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64, needDedup bool) {
+func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
     atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
-    if needDedup {
-        dedupInterval := GetDedupInterval()
-        b.deduplicateSamplesDuringMerge(dedupInterval)
-    }
+    b.deduplicateSamplesDuringMerge()
     headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
     usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData)
     if usePrevTimestamps {

lib/storage/block_stream_writer_timing_test.go

@@ -49,7 +49,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR
         bsw.InitFromInmemoryPart(&mp)
         for i := range ebsCopy {
-            bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged, false)
+            bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged)
         }
         bsw.MustClose()
         mp.Reset()

lib/storage/dedup.go

@@ -20,6 +20,10 @@ func GetDedupInterval() int64 {
 
 var globalDedupInterval int64
 
+func isDedupEnabled() bool {
+    return globalDedupInterval > 0
+}
+
 // DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in millseconds.
 func DeduplicateSamples(srcTimestamps []int64, srcValues []float64, dedupInterval int64) ([]int64, []float64) {
     if !needsDedup(srcTimestamps, dedupInterval) {
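For context, interval-based deduplication keeps at most one raw sample per `dedupInterval`-sized window. Below is a minimal, self-contained sketch of that idea which keeps the last sample in each window; it is not the actual `DeduplicateSamples` implementation, which may differ in details such as window alignment and in-place reuse of the source slices.

```go
package main

import "fmt"

// dedupLastPerWindow keeps one sample per dedupInterval-sized window,
// preferring the sample with the biggest timestamp inside each window.
// Timestamps are assumed to be sorted in ascending order.
func dedupLastPerWindow(timestamps []int64, values []float64, dedupInterval int64) ([]int64, []float64) {
	if dedupInterval <= 0 || len(timestamps) < 2 {
		return timestamps, values
	}
	dstTimestamps := make([]int64, 0, len(timestamps))
	dstValues := make([]float64, 0, len(values))
	for i, ts := range timestamps {
		if i+1 < len(timestamps) && timestamps[i+1]/dedupInterval == ts/dedupInterval {
			// A later sample falls into the same window; drop this one.
			continue
		}
		dstTimestamps = append(dstTimestamps, ts)
		dstValues = append(dstValues, values[i])
	}
	return dstTimestamps, dstValues
}

func main() {
	timestamps := []int64{1000, 1500, 2000, 2100, 3900}
	values := []float64{1, 2, 3, 4, 5}
	// With dedupInterval=1000 the windows are [1000,2000), [2000,3000) and [3000,4000),
	// so only the last sample of each window survives.
	ts, vs := dedupLastPerWindow(timestamps, values, 1000)
	fmt.Println(ts, vs) // [1500 2100 3900] [2 4 5]
}
```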

lib/storage/merge.go

@@ -76,14 +76,14 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
             if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) {
                 logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID)
             }
-            bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
+            bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
             pendingBlock.CopyFrom(bsm.Block)
             continue
         }
         if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp {
             // Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block.
             // Write the pendingBlock and then deal with bsm.Block.
-            bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
+            bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
             pendingBlock.CopyFrom(bsm.Block)
             continue
         }
@@ -119,13 +119,13 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
         tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock]
         tmpBlock.values = tmpBlock.values[:maxRowsPerBlock]
         tmpBlock.fixupTimestamps()
-        bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged, true)
+        bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged)
     }
     if err := bsm.Error(); err != nil {
         return fmt.Errorf("cannot read block to be merged: %w", err)
     }
     if !pendingBlockIsEmpty {
-        bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged, true)
+        bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
     }
     return nil
 }

lib/storage/part_header.go

@@ -1,11 +1,17 @@
 package storage
 
 import (
+    "errors"
     "fmt"
+    "io/ioutil"
+    "os"
     "path/filepath"
     "strconv"
     "strings"
     "time"
+
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+    "github.com/VictoriaMetrics/metricsql"
 )
 
 // partHeader represents part header.
@@ -21,6 +27,9 @@ type partHeader struct {
 
     // MaxTimestamp is the maximum timestamp in the part.
     MaxTimestamp int64
+
+    // MinDedupInterval is minimal dedup interval in milliseconds across all the blocks in the part.
+    MinDedupInterval int64
 }
 
 // String returns string representation of ph.
@@ -104,6 +113,10 @@ func (ph *partHeader) ParseFromPath(path string) error {
         return fmt.Errorf("blocksCount cannot be bigger than rowsCount; got blocksCount=%d, rowsCount=%d", ph.BlocksCount, ph.RowsCount)
     }
 
+    if err := ph.readMinDedupInterval(path); err != nil {
+        return fmt.Errorf("cannot read min dedup interval: %w", err)
+    }
     return nil
 }
@@ -113,4 +126,34 @@
     ph.BlocksCount = 0
     ph.MinTimestamp = (1 << 63) - 1
     ph.MaxTimestamp = -1 << 63
+    ph.MinDedupInterval = 0
 }
+
+func (ph *partHeader) readMinDedupInterval(partPath string) error {
+    filePath := partPath + "/min_dedup_interval"
+    data, err := ioutil.ReadFile(filePath)
+    if err != nil {
+        if errors.Is(err, os.ErrNotExist) {
+            // The minimum dedup interval may not exist for old parts.
+            ph.MinDedupInterval = 0
+            return nil
+        }
+        return fmt.Errorf("cannot read %q: %w", filePath, err)
+    }
+    dedupInterval, err := metricsql.DurationValue(string(data), 0)
+    if err != nil {
+        return fmt.Errorf("cannot parse minimum dedup interval %q at %q: %w", data, filePath, err)
+    }
+    ph.MinDedupInterval = dedupInterval
+    return nil
+}
+
+func (ph *partHeader) writeMinDedupInterval(partPath string) error {
+    filePath := partPath + "/min_dedup_interval"
+    dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond
+    data := dedupInterval.String()
+    if err := fs.WriteFileAtomically(filePath, []byte(data)); err != nil {
+        return fmt.Errorf("cannot create %q: %w", filePath, err)
+    }
+    return nil
+}
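The new `min_dedup_interval` file persists the interval as a Go duration string and parses it back when the part is opened. A small sketch of that round trip is below; it uses `time.ParseDuration` for the read side as an assumption, whereas the code above parses with `metricsql.DurationValue`.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// writeMinDedupInterval stores ph.MinDedupInterval (milliseconds) as a
	// Go duration string, e.g. 61000ms becomes "1m1s".
	minDedupIntervalMs := int64(61000)
	data := (time.Duration(minDedupIntervalMs) * time.Millisecond).String()
	fmt.Println(data) // 1m1s

	// Reading it back. The real readMinDedupInterval parses the string with
	// metricsql.DurationValue; time.ParseDuration round-trips this sketch's
	// output just as well.
	d, err := time.ParseDuration(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(d.Milliseconds()) // 61000
}
```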

lib/storage/partition.go

@@ -835,7 +835,17 @@ func (pt *partition) ForceMergeAllParts() error {
         // Nothing to merge.
         return nil
     }
-    // If len(pws) == 1, then the merge must run anyway, so deleted time series could be removed from the part.
+
+    // Check whether there is enough disk space for merging pws.
+    newPartSize := getPartsSize(pws)
+    maxOutBytes := fs.MustGetFreeSpace(pt.bigPartsPath)
+    if newPartSize > maxOutBytes {
+        freeSpaceNeededBytes := newPartSize - maxOutBytes
+        logger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes)
+        return nil
+    }
+
+    // If len(pws) == 1, then the merge must run anyway. This allows removing the deleted series and performing de-duplication if needed.
     if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil {
         return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
     }
@@ -1056,6 +1066,31 @@ func atomicSetBool(p *uint64, b bool) {
     atomic.StoreUint64(p, v)
 }
 
+func (pt *partition) runFinalDedup() error {
+    if !isDedupNeeded(pt) {
+        return nil
+    }
+    t := time.Now()
+    logger.Infof("starting final dedup for partition %s", pt.name)
+    if err := pt.ForceMergeAllParts(); err != nil {
+        return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.name, err)
+    }
+    logger.Infof("final dedup for partition %s finished in %.3f seconds", pt.name, time.Since(t).Seconds())
+    return nil
+}
+
+func isDedupNeeded(pt *partition) bool {
+    pws := pt.GetParts(nil)
+    defer pt.PutParts(pws)
+    dedupInterval := GetDedupInterval()
+    if dedupInterval <= 0 {
+        // The deduplication isn't needed.
+        return false
+    }
+    minDedupInterval := getMinDedupInterval(pws)
+    return minDedupInterval < dedupInterval
+}
+
 // mergeParts merges pws.
 //
 // Merging is immediately stopped if stopCh is closed.
@@ -1146,6 +1181,11 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
     }
     bsrs = nil
 
+    ph.MinDedupInterval = getMinDedupInterval(pws)
+    if err := ph.writeMinDedupInterval(tmpPartPath); err != nil {
+        return fmt.Errorf("cannot store min dedup interval for part %q: %w", tmpPartPath, err)
+    }
+
     // Create a transaction for atomic deleting old parts and moving
     // new part to its destination place.
     var bb bytesutil.ByteBuffer
@@ -1225,6 +1265,20 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
     return nil
 }
 
+func getMinDedupInterval(pws []*partWrapper) int64 {
+    if len(pws) == 0 {
+        return 0
+    }
+    dMin := pws[0].p.ph.MinDedupInterval
+    for _, pw := range pws[1:] {
+        d := pw.p.ph.MinDedupInterval
+        if d < dMin {
+            dMin = d
+        }
+    }
+    return dMin
+}
+
 func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int {
     avgRowsPerBlock := rowsCount / blocksCount
     if avgRowsPerBlock <= 200 {
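Taken together, `getMinDedupInterval` and `isDedupNeeded` mean a partition is force-merged for final dedup only when at least one of its parts was written with a smaller dedup interval than the one currently configured (parts created before deduplication was enabled carry `MinDedupInterval = 0`). A simplified sketch of that decision, using plain int64 slices instead of the real `*partWrapper` values:

```go
// needsFinalDedup mirrors the decision above using plain values instead of
// *partWrapper: run the extra merge only when some part was written with a
// smaller dedup interval than the configured one.
func needsFinalDedup(partMinDedupIntervals []int64, configuredInterval int64) bool {
	if configuredInterval <= 0 || len(partMinDedupIntervals) == 0 {
		// Deduplication is disabled or there are no parts.
		return false
	}
	minInterval := partMinDedupIntervals[0]
	for _, d := range partMinDedupIntervals[1:] {
		if d < minInterval {
			minInterval = d
		}
	}
	return minInterval < configuredInterval
}
```

For example, `needsFinalDedup([]int64{0, 30000}, 30000)` returns true, while `needsFinalDedup([]int64{30000, 30000}, 30000)` returns false.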

lib/storage/raw_row.go

@@ -115,7 +115,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
             rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
             tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
-            rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false)
+            rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
 
             tsid = &r.TSID
             precisionBits = r.PrecisionBits
@@ -125,7 +125,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
     rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
     tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
-    rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged, false)
+    rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
     if rowsMerged != uint64(len(rows)) {
         logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", rowsMerged, len(rows))
     }

lib/storage/table.go

@@ -31,7 +31,8 @@ type table struct {
 
     stop chan struct{}
 
     retentionWatcherWG sync.WaitGroup
+    finalDedupWatcherWG sync.WaitGroup
 }
 
 // partitionWrapper provides refcounting mechanism for the partition.
@@ -135,6 +136,7 @@ func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retention
         tb.addPartitionNolock(pt)
     }
     tb.startRetentionWatcher()
+    tb.startFinalDedupWatcher()
     return tb, nil
 }
@@ -193,6 +195,7 @@ func (tb *table) addPartitionNolock(pt *partition) {
 func (tb *table) MustClose() {
     close(tb.stop)
     tb.retentionWatcherWG.Wait()
+    tb.finalDedupWatcherWG.Wait()
 
     tb.ptwsLock.Lock()
     ptws := tb.ptws
@@ -435,6 +438,47 @@ func (tb *table) retentionWatcher() {
     }
 }
 
+func (tb *table) startFinalDedupWatcher() {
+    tb.finalDedupWatcherWG.Add(1)
+    go func() {
+        tb.finalDedupWatcher()
+        tb.finalDedupWatcherWG.Done()
+    }()
+}
+
+func (tb *table) finalDedupWatcher() {
+    if !isDedupEnabled() {
+        // Deduplication is disabled.
+        return
+    }
+    f := func() {
+        ptws := tb.GetPartitions(nil)
+        defer tb.PutPartitions(ptws)
+        timestamp := timestampFromTime(time.Now())
+        currentPartitionName := timestampToPartitionName(timestamp)
+        for _, ptw := range ptws {
+            if ptw.pt.name == currentPartitionName {
+                // Do not run final dedup for the current month.
+                continue
+            }
+            if err := ptw.pt.runFinalDedup(); err != nil {
+                logger.Errorf("cannot run final dedup for partition %s: %s", ptw.pt.name, err)
+                continue
+            }
+        }
+    }
+    t := time.NewTicker(time.Hour)
+    defer t.Stop()
+    for {
+        select {
+        case <-tb.stop:
+            return
+        case <-t.C:
+            f()
+        }
+    }
+}
+
 // GetPartitions appends tb's partitions snapshot to dst and returns the result.
 //
 // The returned partitions must be passed to PutPartitions
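The watcher above is a standard ticker-plus-stop-channel worker: wake up once per hour, run the final dedup pass for every partition except the current month, and exit promptly when the table is closed. A generic, runnable sketch of that scheduling skeleton is below; only the pattern is taken from the code above, and `runPeriodically`/`work` are illustrative names.

```go
package main

import (
	"fmt"
	"time"
)

// runPeriodically shows the scheduling pattern used by finalDedupWatcher:
// invoke work on every tick and return as soon as stop is closed.
func runPeriodically(stop <-chan struct{}, period time.Duration, work func()) {
	t := time.NewTicker(period)
	defer t.Stop()
	for {
		select {
		case <-stop:
			return
		case <-t.C:
			work()
		}
	}
}

func main() {
	stop := make(chan struct{})
	go runPeriodically(stop, 100*time.Millisecond, func() {
		fmt.Println("final dedup pass")
	})
	time.Sleep(350 * time.Millisecond)
	close(stop) // analogous to close(tb.stop) in table.MustClose
	time.Sleep(50 * time.Millisecond)
}
```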