VictoriaMetrics/lib/mergeset/part.go
Aliaksandr Valialkin 0bbb281c3d
lib/filestream: transform Open() -> MustOpen()
Callers of this function log the returned error and exit.
Let's log the error with the path to the filename and call stack
inside the function. This simplifies the code at callers' side
without reducing the level of debuggability.
2023-04-14 15:04:54 -07:00

147 lines
3.6 KiB
Go

package mergeset
import (
"fmt"
"path/filepath"
"sync"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
var idxbCache = blockcache.NewCache(getMaxIndexBlocksCacheSize)
var ibCache = blockcache.NewCache(getMaxInmemoryBlocksCacheSize)
// SetIndexBlocksCacheSize overrides the default size of indexdb/indexBlock cache
func SetIndexBlocksCacheSize(size int) {
maxIndexBlockCacheSize = size
}
func getMaxIndexBlocksCacheSize() int {
maxIndexBlockCacheSizeOnce.Do(func() {
if maxIndexBlockCacheSize <= 0 {
maxIndexBlockCacheSize = int(0.10 * float64(memory.Allowed()))
}
})
return maxIndexBlockCacheSize
}
var (
maxIndexBlockCacheSize int
maxIndexBlockCacheSizeOnce sync.Once
)
// SetDataBlocksCacheSize overrides the default size of indexdb/dataBlocks cache
func SetDataBlocksCacheSize(size int) {
maxInmemoryBlockCacheSize = size
}
func getMaxInmemoryBlocksCacheSize() int {
maxInmemoryBlockCacheSizeOnce.Do(func() {
if maxInmemoryBlockCacheSize <= 0 {
maxInmemoryBlockCacheSize = int(0.25 * float64(memory.Allowed()))
}
})
return maxInmemoryBlockCacheSize
}
var (
maxInmemoryBlockCacheSize int
maxInmemoryBlockCacheSizeOnce sync.Once
)
type part struct {
ph partHeader
path string
size uint64
mrs []metaindexRow
indexFile fs.MustReadAtCloser
itemsFile fs.MustReadAtCloser
lensFile fs.MustReadAtCloser
}
func openFilePart(path string) (*part, error) {
var ph partHeader
if err := ph.ReadMetadata(path); err != nil {
return nil, fmt.Errorf("cannot read part metadata: %w", err)
}
metaindexPath := filepath.Join(path, metaindexFilename)
metaindexFile := filestream.MustOpen(metaindexPath, true)
metaindexSize := fs.MustFileSize(metaindexPath)
indexPath := filepath.Join(path, indexFilename)
indexFile := fs.MustOpenReaderAt(indexPath)
indexSize := fs.MustFileSize(indexPath)
itemsPath := filepath.Join(path, itemsFilename)
itemsFile := fs.MustOpenReaderAt(itemsPath)
itemsSize := fs.MustFileSize(itemsPath)
lensPath := filepath.Join(path, lensFilename)
lensFile := fs.MustOpenReaderAt(lensPath)
lensSize := fs.MustFileSize(lensPath)
size := metaindexSize + indexSize + itemsSize + lensSize
return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
}
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) {
var errors []error
mrs, err := unmarshalMetaindexRows(nil, metaindexReader)
if err != nil {
errors = append(errors, fmt.Errorf("cannot unmarshal metaindexRows: %w", err))
}
metaindexReader.MustClose()
var p part
p.path = path
p.size = size
p.mrs = mrs
p.indexFile = indexFile
p.itemsFile = itemsFile
p.lensFile = lensFile
p.ph.CopyFrom(ph)
if len(errors) > 0 {
// Return only the first error, since it has no sense in returning all errors.
err := fmt.Errorf("error opening part %s: %w", p.path, errors[0])
p.MustClose()
return nil, err
}
return &p, nil
}
func (p *part) MustClose() {
p.indexFile.MustClose()
p.itemsFile.MustClose()
p.lensFile.MustClose()
idxbCache.RemoveBlocksForPart(p)
ibCache.RemoveBlocksForPart(p)
}
type indexBlock struct {
bhs []blockHeader
// The buffer for holding the data referrred by bhs
buf []byte
}
func (idxb *indexBlock) SizeBytes() int {
bhs := idxb.bhs[:cap(idxb.bhs)]
n := int(unsafe.Sizeof(*idxb))
for i := range bhs {
n += bhs[i].SizeBytes()
}
return n
}