lib/storage: drop more samples outside the given retention during background merge

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/17
This commit is contained in:
Aliaksandr Valialkin 2020-10-31 20:42:13 +02:00
parent abdf22e0bb
commit 901514be88
2 changed files with 33 additions and 34 deletions

View File

@ -23,7 +23,7 @@ const (
type Block struct {
bh blockHeader
// nextIdx is the next row index for timestamps and values.
// nextIdx is the next index for reading timestamps and values.
nextIdx int
timestamps []int64

View File

@ -40,8 +40,11 @@ var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
// Search for the first block to merge
var pendingBlock *Block
pendingBlockIsEmpty := true
pendingBlock := getBlock()
defer putBlock(pendingBlock)
tmpBlock := getBlock()
defer putBlock(tmpBlock)
for bsm.NextBlock() {
select {
case <-stopCh:
@ -58,31 +61,10 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
pendingBlock = getBlock()
pendingBlock.CopyFrom(bsm.Block)
break
}
if pendingBlock != nil {
defer putBlock(pendingBlock)
}
// Merge blocks.
tmpBlock := getBlock()
defer putBlock(tmpBlock)
for bsm.NextBlock() {
select {
case <-stopCh:
return errForciblyStopped
default:
}
if dmis.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
if bsm.Block.bh.MaxTimestamp < retentionDeadline {
// skip blocks out of the given retention.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
if pendingBlockIsEmpty {
// Load the next block if pendingBlock is empty.
pendingBlock.CopyFrom(bsm.Block)
pendingBlockIsEmpty = false
continue
}
@ -114,16 +96,20 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
tmpBlock.bh.TSID = bsm.Block.bh.TSID
tmpBlock.bh.Scale = bsm.Block.bh.Scale
tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits)
mergeBlocks(tmpBlock, pendingBlock, bsm.Block)
mergeBlocks(tmpBlock, pendingBlock, bsm.Block, retentionDeadline, rowsDeleted)
if len(tmpBlock.timestamps) <= maxRowsPerBlock {
// More entries may be added to tmpBlock. Swap it with pendingBlock,
// so more entries may be added to pendingBlock on the next iteration.
tmpBlock.fixupTimestamps()
if len(tmpBlock.timestamps) > 0 {
tmpBlock.fixupTimestamps()
} else {
pendingBlockIsEmpty = true
}
pendingBlock, tmpBlock = tmpBlock, pendingBlock
continue
}
// Write the first len(maxRowsPerBlock) of tmpBlock.timestamps to bsw,
// Write the first maxRowsPerBlock of tmpBlock.timestamps to bsw,
// leave the rest in pendingBlock.
tmpBlock.nextIdx = maxRowsPerBlock
pendingBlock.CopyFrom(tmpBlock)
@ -137,18 +123,21 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
if err := bsm.Error(); err != nil {
return fmt.Errorf("cannot read block to be merged: %w", err)
}
if pendingBlock != nil {
if !pendingBlockIsEmpty {
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
}
return nil
}
// mergeBlocks merges ib1 and ib2 to ob.
func mergeBlocks(ob, ib1, ib2 *Block) {
func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint64) {
ib1.assertMergeable(ib2)
ib1.assertUnmarshaled()
ib2.assertUnmarshaled()
skipSamplesOutsideRetention(ib1, retentionDeadline, rowsDeleted)
skipSamplesOutsideRetention(ib2, retentionDeadline, rowsDeleted)
if ib1.bh.MaxTimestamp < ib2.bh.MinTimestamp {
// Fast path - ib1 values have smaller timestamps than ib2 values.
appendRows(ob, ib1)
@ -186,6 +175,16 @@ func mergeBlocks(ob, ib1, ib2 *Block) {
}
}
func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) {
timestamps := b.timestamps
nextIdx := b.nextIdx
for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline {
nextIdx++
}
*rowsDeleted += uint64(nextIdx - b.nextIdx)
b.nextIdx = nextIdx
}
func appendRows(ob, ib *Block) {
ob.timestamps = append(ob.timestamps, ib.timestamps[ib.nextIdx:]...)
ob.values = append(ob.values, ib.values[ib.nextIdx:]...)
@ -199,7 +198,7 @@ func unmarshalAndCalibrateScale(b1, b2 *Block) error {
return err
}
scale := decimal.CalibrateScale(b1.values, b1.bh.Scale, b2.values, b2.bh.Scale)
scale := decimal.CalibrateScale(b1.values[b1.nextIdx:], b1.bh.Scale, b2.values[b2.nextIdx:], b2.bh.Scale)
b1.bh.Scale = scale
b2.bh.Scale = scale
return nil