lib/storage: reuse timestamp blocks for adjacent metric blocks with identical timestamps

This should reduce disk space usage when scraping targets containing metrics with identical names
such as `node_cpu_seconds_total`, histograms, quantiles, etc.

Expose `vm_timestamps_blocks_merged_total` and `vm_timestamps_bytes_saved_total` metrics for monitoring
the effectiveness of timestamp blocks merging.
This commit is contained in:
Aliaksandr Valialkin 2020-09-09 23:18:32 +03:00
parent 475698d2ad
commit f5cb213ef9
4 changed files with 68 additions and 11 deletions

View File

@ -389,6 +389,13 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().SlowMetricNameLoads) return float64(m().SlowMetricNameLoads)
}) })
metrics.NewGauge(`vm_timestamps_blocks_merged_total`, func() float64 {
return float64(m().TimestampsBlocksMerged)
})
metrics.NewGauge(`vm_timestamps_bytes_saved_total`, func() float64 {
return float64(m().TimestampsBytesSaved)
})
metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 { metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsCount) return float64(tm().BigRowsCount)
}) })

View File

@ -51,6 +51,9 @@ type blockStreamReader struct {
valuesBlockOffset uint64 valuesBlockOffset uint64
indexBlockOffset uint64 indexBlockOffset uint64
prevTimestampsBlockOffset uint64
prevTimestampsData []byte
indexData []byte indexData []byte
compressedIndexData []byte compressedIndexData []byte
@ -87,6 +90,9 @@ func (bsr *blockStreamReader) reset() {
bsr.valuesBlockOffset = 0 bsr.valuesBlockOffset = 0
bsr.indexBlockOffset = 0 bsr.indexBlockOffset = 0
bsr.prevTimestampsBlockOffset = 0
bsr.prevTimestampsData = bsr.prevTimestampsData[:0]
bsr.indexData = bsr.indexData[:0] bsr.indexData = bsr.indexData[:0]
bsr.compressedIndexData = bsr.compressedIndexData[:0] bsr.compressedIndexData = bsr.compressedIndexData[:0]
@ -275,7 +281,13 @@ func (bsr *blockStreamReader) readBlock() error {
return fmt.Errorf("invalid MaxTimestamp at block header at offset %d; got %d; cannot be bigger than %d", return fmt.Errorf("invalid MaxTimestamp at block header at offset %d; got %d; cannot be bigger than %d",
bsr.prevIndexBlockOffset(), bsr.Block.bh.MaxTimestamp, bsr.ph.MaxTimestamp) bsr.prevIndexBlockOffset(), bsr.Block.bh.MaxTimestamp, bsr.ph.MaxTimestamp)
} }
if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset { usePrevTimestamps := len(bsr.prevTimestampsData) > 0 && bsr.Block.bh.TimestampsBlockOffset == bsr.prevTimestampsBlockOffset
if usePrevTimestamps {
if int(bsr.Block.bh.TimestampsBlockSize) != len(bsr.prevTimestampsData) {
return fmt.Errorf("invalid TimestampsBlockSize at block header at offset %d; got %d; want %d",
bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockSize, len(bsr.prevTimestampsData))
}
} else if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset {
return fmt.Errorf("invalid TimestampsBlockOffset at block header at offset %d; got %d; want %d", return fmt.Errorf("invalid TimestampsBlockOffset at block header at offset %d; got %d; want %d",
bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockOffset, bsr.timestampsBlockOffset) bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockOffset, bsr.timestampsBlockOffset)
} }
@ -285,10 +297,16 @@ func (bsr *blockStreamReader) readBlock() error {
} }
// Read timestamps data. // Read timestamps data.
if usePrevTimestamps {
bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...)
} else {
bsr.Block.timestampsData = bytesutil.Resize(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize)) bsr.Block.timestampsData = bytesutil.Resize(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil { if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil {
return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err) return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err)
} }
bsr.prevTimestampsBlockOffset = bsr.timestampsBlockOffset
bsr.prevTimestampsData = append(bsr.prevTimestampsData[:0], bsr.Block.timestampsData...)
}
// Read values data. // Read values data.
bsr.Block.valuesData = bytesutil.Resize(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize)) bsr.Block.valuesData = bytesutil.Resize(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize))
@ -297,7 +315,9 @@ func (bsr *blockStreamReader) readBlock() error {
} }
// Update offsets. // Update offsets.
if !usePrevTimestamps {
bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize) bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize)
}
bsr.valuesBlockOffset += uint64(bsr.Block.bh.ValuesBlockSize) bsr.valuesBlockOffset += uint64(bsr.Block.bh.ValuesBlockSize)
bsr.indexBlockHeadersCount++ bsr.indexBlockHeadersCount++

View File

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
@ -38,6 +39,13 @@ type blockStreamWriter struct {
metaindexData []byte metaindexData []byte
compressedMetaindexData []byte compressedMetaindexData []byte
// prevTimestamps* is used as an optimization for reducing disk space usage
// when serially written blocks have identical timestamps.
// This is usually the case when adjacent blocks contain metrics scraped from the same target,
// since such metrics have identical timestamps.
prevTimestampsData []byte
prevTimestampsBlockOffset uint64
} }
func (bsw *blockStreamWriter) assertWriteClosers() { func (bsw *blockStreamWriter) assertWriteClosers() {
@ -66,6 +74,9 @@ func (bsw *blockStreamWriter) reset() {
bsw.metaindexData = bsw.metaindexData[:0] bsw.metaindexData = bsw.metaindexData[:0]
bsw.compressedMetaindexData = bsw.compressedMetaindexData[:0] bsw.compressedMetaindexData = bsw.compressedMetaindexData[:0]
bsw.prevTimestampsData = bsw.prevTimestampsData[:0]
bsw.prevTimestampsBlockOffset = 0
} }
// InitFromInmemoryPart initializes bsw from inmemory part. // InitFromInmemoryPart initializes bsw from inmemory part.
@ -177,22 +188,35 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM
atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
b.deduplicateSamplesDuringMerge() b.deduplicateSamplesDuringMerge()
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData)
if usePrevTimestamps {
// The current timestamps block equals to the previous timestamps block.
// Update headerData so it points to the previous timestamps block. This saves disk space.
headerData, timestampsData, valuesData = b.MarshalData(bsw.prevTimestampsBlockOffset, bsw.valuesBlockOffset)
atomic.AddUint64(&timestampsBlocksMerged, 1)
atomic.AddUint64(&timestampsBytesSaved, uint64(len(timestampsData)))
}
bsw.indexData = append(bsw.indexData, headerData...) bsw.indexData = append(bsw.indexData, headerData...)
bsw.mr.RegisterBlockHeader(&b.bh) bsw.mr.RegisterBlockHeader(&b.bh)
if len(bsw.indexData) >= maxBlockSize { if len(bsw.indexData) >= maxBlockSize {
bsw.flushIndexData() bsw.flushIndexData()
} }
if !usePrevTimestamps {
bsw.prevTimestampsData = append(bsw.prevTimestampsData[:0], timestampsData...)
bsw.prevTimestampsBlockOffset = bsw.timestampsBlockOffset
fs.MustWriteData(bsw.timestampsWriter, timestampsData) fs.MustWriteData(bsw.timestampsWriter, timestampsData)
bsw.timestampsBlockOffset += uint64(len(timestampsData)) bsw.timestampsBlockOffset += uint64(len(timestampsData))
}
fs.MustWriteData(bsw.valuesWriter, valuesData) fs.MustWriteData(bsw.valuesWriter, valuesData)
bsw.valuesBlockOffset += uint64(len(valuesData)) bsw.valuesBlockOffset += uint64(len(valuesData))
updatePartHeader(b, ph) updatePartHeader(b, ph)
} }
// Package-level counters describing how often timestamps blocks are reused.
// They are updated with atomic.AddUint64 in blockStreamWriter.WriteExternalBlock
// and read with atomic.LoadUint64 in Storage.UpdateMetrics.
var (
// timestampsBlocksMerged is incremented by one each time the marshaled timestamps
// of a block are byte-identical to the previously written timestamps block,
// so the block header is pointed at the previous block instead of writing a new one.
timestampsBlocksMerged uint64
// timestampsBytesSaved accumulates len(timestampsData) for every timestamps block
// whose write was skipped because the previous block was reused.
timestampsBytesSaved uint64
)
func updatePartHeader(b *Block, ph *partHeader) { func updatePartHeader(b *Block, ph *partHeader) {
ph.BlocksCount++ ph.BlocksCount++
ph.RowsCount += uint64(b.bh.RowsCount) ph.RowsCount += uint64(b.bh.RowsCount)

View File

@ -355,6 +355,9 @@ type Metrics struct {
SlowPerDayIndexInserts uint64 SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64 SlowMetricNameLoads uint64
TimestampsBlocksMerged uint64
TimestampsBytesSaved uint64
TSIDCacheSize uint64 TSIDCacheSize uint64
TSIDCacheSizeBytes uint64 TSIDCacheSizeBytes uint64
TSIDCacheRequests uint64 TSIDCacheRequests uint64
@ -420,6 +423,9 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
m.TimestampsBlocksMerged = atomic.LoadUint64(&timestampsBlocksMerged)
m.TimestampsBytesSaved = atomic.LoadUint64(&timestampsBytesSaved)
var cs fastcache.Stats var cs fastcache.Stats
s.tsidCache.UpdateStats(&cs) s.tsidCache.UpdateStats(&cs)
m.TSIDCacheSize += cs.EntriesCount m.TSIDCacheSize += cs.EntriesCount