package storage import ( "fmt" "io" "path/filepath" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // blockStreamReader represents block stream reader. type blockStreamReader struct { // Currently active block. Block Block // Contains TSID for the previous block. // This field is needed for checking that TSIDs // increase over time when reading blocks. tsidPrev TSID // Filesystem path to the stream reader. // // Is empty for inmemory stream readers. path string ph partHeader // Use io.Reader type for timestampsReader and valuesReader // in order to remove I2I conversion in readBlock // when passing them to fs.ReadFullData timestampsReader io.Reader valuesReader io.Reader indexReader filestream.ReadCloser mrs []metaindexRow // Points the current mr from mrs. mr *metaindexRow // The total number of rows read so far. rowsCount uint64 // The total number of blocks read so far. blocksCount uint64 // The number of block headers in the current index block. indexBlockHeadersCount uint32 timestampsBlockOffset uint64 valuesBlockOffset uint64 indexBlockOffset uint64 prevTimestampsBlockOffset uint64 prevTimestampsData []byte indexData []byte compressedIndexData []byte // Cursor to indexData. indexCursor []byte err error } func (bsr *blockStreamReader) assertWriteClosers() { _ = bsr.timestampsReader.(filestream.ReadCloser) _ = bsr.valuesReader.(filestream.ReadCloser) } func (bsr *blockStreamReader) reset() { bsr.Block.Reset() bsr.path = "" bsr.ph.Reset() bsr.timestampsReader = nil bsr.valuesReader = nil bsr.indexReader = nil bsr.mrs = bsr.mrs[:0] bsr.mr = nil bsr.rowsCount = 0 bsr.blocksCount = 0 bsr.indexBlockHeadersCount = 0 bsr.timestampsBlockOffset = 0 bsr.valuesBlockOffset = 0 bsr.indexBlockOffset = 0 bsr.prevTimestampsBlockOffset = 0 bsr.prevTimestampsData = bsr.prevTimestampsData[:0] bsr.indexData = bsr.indexData[:0] bsr.compressedIndexData = bsr.compressedIndexData[:0] bsr.indexCursor = nil bsr.err = nil } // String returns human-readable representation of bsr. func (bsr *blockStreamReader) String() string { if len(bsr.path) > 0 { return bsr.path } return bsr.ph.String() } // InitFromInmemoryPart initializes bsr from the given mp. func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) { bsr.reset() bsr.ph = mp.ph bsr.timestampsReader = mp.timestampsData.NewReader() bsr.valuesReader = mp.valuesData.NewReader() bsr.indexReader = mp.indexData.NewReader() var err error bsr.mrs, err = unmarshalMetaindexRows(bsr.mrs[:0], mp.metaindexData.NewReader()) if err != nil { logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemoryPart: %s", err) } bsr.assertWriteClosers() } // InitFromFilePart initializes bsr from a file-based part on the given path. // // Files in the part are always read without OS cache pollution, // since they are usually deleted after the merge. func (bsr *blockStreamReader) InitFromFilePart(path string) error { bsr.reset() path = filepath.Clean(path) if err := bsr.ph.ReadMetadata(path); err != nil { return fmt.Errorf("cannot parse path to part: %w", err) } timestampsPath := filepath.Join(path, timestampsFilename) timestampsFile, err := filestream.Open(timestampsPath, true) if err != nil { return fmt.Errorf("cannot open timestamps file in stream mode: %w", err) } valuesPath := filepath.Join(path, valuesFilename) valuesFile, err := filestream.Open(valuesPath, true) if err != nil { timestampsFile.MustClose() return fmt.Errorf("cannot open values file in stream mode: %w", err) } indexPath := filepath.Join(path, indexFilename) indexFile, err := filestream.Open(indexPath, true) if err != nil { timestampsFile.MustClose() valuesFile.MustClose() return fmt.Errorf("cannot open index file in stream mode: %w", err) } metaindexPath := filepath.Join(path, metaindexFilename) metaindexFile, err := filestream.Open(metaindexPath, true) if err != nil { timestampsFile.MustClose() valuesFile.MustClose() indexFile.MustClose() return fmt.Errorf("cannot open metaindex file in stream mode: %w", err) } mrs, err := unmarshalMetaindexRows(bsr.mrs[:0], metaindexFile) metaindexFile.MustClose() if err != nil { timestampsFile.MustClose() valuesFile.MustClose() indexFile.MustClose() return fmt.Errorf("cannot unmarshal metaindex rows from file part %q: %w", metaindexPath, err) } bsr.path = path bsr.timestampsReader = timestampsFile bsr.valuesReader = valuesFile bsr.indexReader = indexFile bsr.mrs = mrs bsr.assertWriteClosers() return nil } // MustClose closes the bsr. // // It closes *Reader files passed to Init. func (bsr *blockStreamReader) MustClose() { bsr.timestampsReader.(filestream.ReadCloser).MustClose() bsr.valuesReader.(filestream.ReadCloser).MustClose() bsr.indexReader.MustClose() bsr.reset() } // Error returns the last error. func (bsr *blockStreamReader) Error() error { if bsr.err == nil || bsr.err == io.EOF { return nil } return fmt.Errorf("error when reading part %q: %w", bsr, bsr.err) } // NextBlock advances bsr to the next block. func (bsr *blockStreamReader) NextBlock() bool { if bsr.err != nil { return false } bsr.tsidPrev = bsr.Block.bh.TSID bsr.Block.Reset() err := bsr.readBlock() if err == nil { if bsr.Block.bh.TSID.Less(&bsr.tsidPrev) { bsr.err = fmt.Errorf("possible data corruption: the next TSID=%v is smaller than the previous TSID=%v", &bsr.Block.bh.TSID, &bsr.tsidPrev) return false } if bsr.Block.bh.RowsCount == 0 { bsr.err = fmt.Errorf("invalid block read with zero rows; block=%+v", &bsr.Block) return false } return true } if err == io.EOF { bsr.err = io.EOF return false } bsr.err = fmt.Errorf("cannot read next block: %w", err) return false } func (bsr *blockStreamReader) readBlock() error { if len(bsr.indexCursor) == 0 { if bsr.mr != nil && bsr.indexBlockHeadersCount != bsr.mr.BlockHeadersCount { return fmt.Errorf("invalid number of block headers in the previous index block at offset %d; got %d; want %d", bsr.prevIndexBlockOffset(), bsr.indexBlockHeadersCount, bsr.mr.BlockHeadersCount) } bsr.indexBlockHeadersCount = 0 if err := bsr.readIndexBlock(); err != nil { if err == io.EOF { return io.EOF } return fmt.Errorf("cannot read index block: %w", err) } } // Read block header. if len(bsr.indexCursor) < marshaledBlockHeaderSize { return fmt.Errorf("too short index data for reading block header at offset %d; got %d bytes; want %d bytes", bsr.prevIndexBlockOffset(), len(bsr.indexCursor), marshaledBlockHeaderSize) } bsr.Block.headerData = append(bsr.Block.headerData[:0], bsr.indexCursor[:marshaledBlockHeaderSize]...) bsr.indexCursor = bsr.indexCursor[marshaledBlockHeaderSize:] tail, err := bsr.Block.bh.Unmarshal(bsr.Block.headerData) if err != nil { return fmt.Errorf("cannot parse block header read from index data at offset %d: %w", bsr.prevIndexBlockOffset(), err) } if len(tail) > 0 { return fmt.Errorf("non-empty tail left after parsing block header at offset %d: %x", bsr.prevIndexBlockOffset(), tail) } bsr.blocksCount++ if bsr.blocksCount > bsr.ph.BlocksCount { return fmt.Errorf("too many blocks found in the block stream; got %d; cannot be bigger than %d", bsr.blocksCount, bsr.ph.BlocksCount) } // Validate block header. bsr.rowsCount += uint64(bsr.Block.bh.RowsCount) if bsr.rowsCount > bsr.ph.RowsCount { return fmt.Errorf("too many rows found in the block stream; got %d; cannot be bigger than %d", bsr.rowsCount, bsr.ph.RowsCount) } if bsr.Block.bh.MinTimestamp < bsr.ph.MinTimestamp { return fmt.Errorf("invalid MinTimestamp at block header at offset %d; got %d; cannot be smaller than %d", bsr.prevIndexBlockOffset(), bsr.Block.bh.MinTimestamp, bsr.ph.MinTimestamp) } if bsr.Block.bh.MaxTimestamp > bsr.ph.MaxTimestamp { return fmt.Errorf("invalid MaxTimestamp at block header at offset %d; got %d; cannot be bigger than %d", bsr.prevIndexBlockOffset(), bsr.Block.bh.MaxTimestamp, bsr.ph.MaxTimestamp) } usePrevTimestamps := len(bsr.prevTimestampsData) > 0 && bsr.Block.bh.TimestampsBlockOffset == bsr.prevTimestampsBlockOffset if usePrevTimestamps { if int(bsr.Block.bh.TimestampsBlockSize) != len(bsr.prevTimestampsData) { return fmt.Errorf("invalid TimestampsBlockSize at block header at offset %d; got %d; want %d", bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockSize, len(bsr.prevTimestampsData)) } } else if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset { return fmt.Errorf("invalid TimestampsBlockOffset at block header at offset %d; got %d; want %d", bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockOffset, bsr.timestampsBlockOffset) } if bsr.Block.bh.ValuesBlockOffset != bsr.valuesBlockOffset { return fmt.Errorf("invalid ValuesBlockOffset at block header at offset %d; got %d; want %d", bsr.prevIndexBlockOffset(), bsr.Block.bh.ValuesBlockOffset, bsr.valuesBlockOffset) } // Read timestamps data. if usePrevTimestamps { bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...) } else { bsr.Block.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize)) if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil { return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err) } bsr.prevTimestampsBlockOffset = bsr.timestampsBlockOffset bsr.prevTimestampsData = append(bsr.prevTimestampsData[:0], bsr.Block.timestampsData...) } // Read values data. bsr.Block.valuesData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize)) if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil { return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err) } // Update offsets. if !usePrevTimestamps { bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize) } bsr.valuesBlockOffset += uint64(bsr.Block.bh.ValuesBlockSize) bsr.indexBlockHeadersCount++ return nil } func (bsr *blockStreamReader) readIndexBlock() error { // Go to the next metaindex row. if len(bsr.mrs) == 0 { return io.EOF } bsr.mr = &bsr.mrs[0] bsr.mrs = bsr.mrs[1:] // Validate metaindex row. if bsr.indexBlockOffset != bsr.mr.IndexBlockOffset { return fmt.Errorf("invalid IndexBlockOffset in metaindex row; got %d; want %d", bsr.mr.IndexBlockOffset, bsr.indexBlockOffset) } if bsr.mr.MinTimestamp < bsr.ph.MinTimestamp { return fmt.Errorf("invalid MinTimesamp in metaindex row; got %d; cannot be smaller than %d", bsr.mr.MinTimestamp, bsr.ph.MinTimestamp) } if bsr.mr.MaxTimestamp > bsr.ph.MaxTimestamp { return fmt.Errorf("invalid MaxTimestamp in metaindex row; got %d; cannot be bigger than %d", bsr.mr.MaxTimestamp, bsr.ph.MaxTimestamp) } // Read index block. bsr.compressedIndexData = bytesutil.ResizeNoCopyMayOverallocate(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize)) if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil { return fmt.Errorf("cannot read index block at offset %d: %w", bsr.indexBlockOffset, err) } tmpData, err := encoding.DecompressZSTD(bsr.indexData[:0], bsr.compressedIndexData) if err != nil { return fmt.Errorf("cannot decompress index block at offset %d: %w", bsr.indexBlockOffset, err) } bsr.indexData = tmpData bsr.indexCursor = bsr.indexData // Update offsets. bsr.indexBlockOffset += uint64(bsr.mr.IndexBlockSize) return nil } func (bsr *blockStreamReader) prevIndexBlockOffset() uint64 { return bsr.indexBlockOffset - uint64(bsr.mr.IndexBlockSize) } func getBlockStreamReader() *blockStreamReader { v := bsrPool.Get() if v == nil { return &blockStreamReader{} } return v.(*blockStreamReader) } func putBlockStreamReader(bsr *blockStreamReader) { bsr.MustClose() bsrPool.Put(bsr) } var bsrPool sync.Pool