diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 61369c025f..78f03a6127 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -389,6 +389,13 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(m().SlowMetricNameLoads) }) + metrics.NewGauge(`vm_timestamps_blocks_merged_total`, func() float64 { + return float64(m().TimestampsBlocksMerged) + }) + metrics.NewGauge(`vm_timestamps_bytes_saved_total`, func() float64 { + return float64(m().TimestampsBytesSaved) + }) + metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 { return float64(tm().BigRowsCount) }) diff --git a/lib/storage/block_stream_reader.go b/lib/storage/block_stream_reader.go index 9b5e206190..f4aa6fab23 100644 --- a/lib/storage/block_stream_reader.go +++ b/lib/storage/block_stream_reader.go @@ -51,6 +51,9 @@ type blockStreamReader struct { valuesBlockOffset uint64 indexBlockOffset uint64 + prevTimestampsBlockOffset uint64 + prevTimestampsData []byte + indexData []byte compressedIndexData []byte @@ -87,6 +90,9 @@ func (bsr *blockStreamReader) reset() { bsr.valuesBlockOffset = 0 bsr.indexBlockOffset = 0 + bsr.prevTimestampsBlockOffset = 0 + bsr.prevTimestampsData = bsr.prevTimestampsData[:0] + bsr.indexData = bsr.indexData[:0] bsr.compressedIndexData = bsr.compressedIndexData[:0] @@ -275,7 +281,13 @@ func (bsr *blockStreamReader) readBlock() error { return fmt.Errorf("invalid MaxTimestamp at block header at offset %d; got %d; cannot be bigger than %d", bsr.prevIndexBlockOffset(), bsr.Block.bh.MaxTimestamp, bsr.ph.MaxTimestamp) } - if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset { + usePrevTimestamps := len(bsr.prevTimestampsData) > 0 && bsr.Block.bh.TimestampsBlockOffset == bsr.prevTimestampsBlockOffset + if usePrevTimestamps { + if int(bsr.Block.bh.TimestampsBlockSize) != len(bsr.prevTimestampsData) { + return fmt.Errorf("invalid TimestampsBlockSize at block header at offset %d; got %d; want %d", + bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockSize, len(bsr.prevTimestampsData)) + } + } else if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset { return fmt.Errorf("invalid TimestampsBlockOffset at block header at offset %d; got %d; want %d", bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockOffset, bsr.timestampsBlockOffset) } @@ -285,9 +297,15 @@ func (bsr *blockStreamReader) readBlock() error { } // Read timestamps data. - bsr.Block.timestampsData = bytesutil.Resize(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize)) - if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil { - return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err) + if usePrevTimestamps { + bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...) + } else { + bsr.Block.timestampsData = bytesutil.Resize(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize)) + if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil { + return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err) + } + bsr.prevTimestampsBlockOffset = bsr.timestampsBlockOffset + bsr.prevTimestampsData = append(bsr.prevTimestampsData[:0], bsr.Block.timestampsData...) } // Read values data. @@ -297,7 +315,9 @@ func (bsr *blockStreamReader) readBlock() error { } // Update offsets. - bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize) + if !usePrevTimestamps { + bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize) + } bsr.valuesBlockOffset += uint64(bsr.Block.bh.ValuesBlockSize) bsr.indexBlockHeadersCount++ diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index ce35c0b0b9..899facbf75 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -1,6 +1,7 @@ package storage import ( + "bytes" "fmt" "io" "path/filepath" @@ -38,6 +39,13 @@ type blockStreamWriter struct { metaindexData []byte compressedMetaindexData []byte + + // prevTimestamps* is used as an optimization for reducing disk space usage + // when serially written blocks have identical timestamps. + // This is usually the case when adjancent blocks contain metrics scraped from the same target, + // since such metrics have identical timestamps. + prevTimestampsData []byte + prevTimestampsBlockOffset uint64 } func (bsw *blockStreamWriter) assertWriteClosers() { @@ -66,6 +74,9 @@ func (bsw *blockStreamWriter) reset() { bsw.metaindexData = bsw.metaindexData[:0] bsw.compressedMetaindexData = bsw.compressedMetaindexData[:0] + + bsw.prevTimestampsData = bsw.prevTimestampsData[:0] + bsw.prevTimestampsBlockOffset = 0 } // InitFromInmemoryPart initialzes bsw from inmemory part. @@ -177,22 +188,35 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) b.deduplicateSamplesDuringMerge() headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) - + usePrevTimestamps := len(bsw.prevTimestampsData) > 0 && bytes.Equal(timestampsData, bsw.prevTimestampsData) + if usePrevTimestamps { + // The current timestamps block equals to the previous timestamps block. + // Update headerData so it points to the previous timestamps block. This saves disk space. + headerData, timestampsData, valuesData = b.MarshalData(bsw.prevTimestampsBlockOffset, bsw.valuesBlockOffset) + atomic.AddUint64(×tampsBlocksMerged, 1) + atomic.AddUint64(×tampsBytesSaved, uint64(len(timestampsData))) + } bsw.indexData = append(bsw.indexData, headerData...) bsw.mr.RegisterBlockHeader(&b.bh) if len(bsw.indexData) >= maxBlockSize { bsw.flushIndexData() } - - fs.MustWriteData(bsw.timestampsWriter, timestampsData) - bsw.timestampsBlockOffset += uint64(len(timestampsData)) - + if !usePrevTimestamps { + bsw.prevTimestampsData = append(bsw.prevTimestampsData[:0], timestampsData...) + bsw.prevTimestampsBlockOffset = bsw.timestampsBlockOffset + fs.MustWriteData(bsw.timestampsWriter, timestampsData) + bsw.timestampsBlockOffset += uint64(len(timestampsData)) + } fs.MustWriteData(bsw.valuesWriter, valuesData) bsw.valuesBlockOffset += uint64(len(valuesData)) - updatePartHeader(b, ph) } +var ( + timestampsBlocksMerged uint64 + timestampsBytesSaved uint64 +) + func updatePartHeader(b *Block, ph *partHeader) { ph.BlocksCount++ ph.RowsCount += uint64(b.bh.RowsCount) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index d4c45fdca3..cc57f1acb1 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -355,6 +355,9 @@ type Metrics struct { SlowPerDayIndexInserts uint64 SlowMetricNameLoads uint64 + TimestampsBlocksMerged uint64 + TimestampsBytesSaved uint64 + TSIDCacheSize uint64 TSIDCacheSizeBytes uint64 TSIDCacheRequests uint64 @@ -420,6 +423,9 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) + m.TimestampsBlocksMerged = atomic.LoadUint64(×tampsBlocksMerged) + m.TimestampsBytesSaved = atomic.LoadUint64(×tampsBytesSaved) + var cs fastcache.Stats s.tsidCache.UpdateStats(&cs) m.TSIDCacheSize += cs.EntriesCount