VictoriaMetrics/lib/mergeset/part.go

480 lines
11 KiB
Go

package mergeset
import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// getMaxCachedIndexBlocksPerPart returns the number of entries an
// indexBlockCache may hold before Put starts evicting entries.
//
// The value is derived from memory.Allowed() on the first call
// and is reused by all the subsequent calls.
func getMaxCachedIndexBlocksPerPart() int {
	maxCachedIndexBlocksPerPartOnce.Do(func() {
		limit := memory.Allowed() / (4 * 1024 * 1024)
		if limit == 0 {
			// Fall back to a small non-zero limit when the quotient is zero.
			limit = 10
		}
		maxCachedIndexBlocksPerPart = limit
	})
	return maxCachedIndexBlocksPerPart
}

// maxCachedIndexBlocksPerPart is the value lazily calculated
// by getMaxCachedIndexBlocksPerPart.
var maxCachedIndexBlocksPerPart int

// maxCachedIndexBlocksPerPartOnce makes sure maxCachedIndexBlocksPerPart
// is calculated only once.
var maxCachedIndexBlocksPerPartOnce sync.Once
// getMaxCachedInmemoryBlocksPerPart returns the number of entries an
// inmemoryBlockCache may hold before Put starts evicting entries.
//
// The value is derived from memory.Allowed() on the first call
// and is reused by all the subsequent calls.
func getMaxCachedInmemoryBlocksPerPart() int {
	maxCachedInmemoryBlocksPerPartOnce.Do(func() {
		limit := memory.Allowed() / (4 * 1024 * 1024)
		if limit == 0 {
			// Fall back to a small non-zero limit when the quotient is zero.
			limit = 10
		}
		maxCachedInmemoryBlocksPerPart = limit
	})
	return maxCachedInmemoryBlocksPerPart
}

// maxCachedInmemoryBlocksPerPart is the value lazily calculated
// by getMaxCachedInmemoryBlocksPerPart.
var maxCachedInmemoryBlocksPerPart int

// maxCachedInmemoryBlocksPerPartOnce makes sure maxCachedInmemoryBlocksPerPart
// is calculated only once.
var maxCachedInmemoryBlocksPerPartOnce sync.Once
// part is an opened part together with its per-part caches.
//
// It is created by newPart and must be released with MustClose.
type part struct {
// ph is a copy of the part header passed to newPart.
ph partHeader
// path is the part path passed to newPart.
path string
// size is the part size passed to newPart; openFilePart passes the summary
// size of metaindex.bin, index.bin, items.bin and lens.bin.
size uint64
// mrs contains metaindex rows unmarshaled from the metaindex reader.
mrs []metaindexRow
// indexFile, itemsFile and lensFile are readers closed by MustClose.
indexFile fs.MustReadAtCloser
itemsFile fs.MustReadAtCloser
lensFile fs.MustReadAtCloser
// idxbCache caches index blocks for this part.
idxbCache *indexBlockCache
// ibCache caches inmemory blocks for this part.
ibCache *inmemoryBlockCache
}
// openFilePart opens the part stored in the directory at the given path.
//
// The part header is parsed from the path itself. Then metaindex.bin,
// index.bin, items.bin and lens.bin are opened inside the directory and
// passed to newPart together with their summary size.
//
// All the files opened so far are closed if a subsequent file cannot be opened.
// An error is returned if the path cannot be parsed, if any of the files
// cannot be opened or if newPart fails.
func openFilePart(path string) (*part, error) {
	path = filepath.Clean(path)

	var ph partHeader
	if err := ph.ParseFromPath(path); err != nil {
		return nil, fmt.Errorf("cannot parse path to part: %s", err)
	}

	// Build file paths with filepath.Join, so they use the OS-specific separator.
	metaindexPath := filepath.Join(path, "metaindex.bin")
	metaindexFile, err := filestream.Open(metaindexPath, true)
	if err != nil {
		return nil, fmt.Errorf("cannot open %q: %s", metaindexPath, err)
	}
	metaindexSize := fs.MustFileSize(metaindexPath)

	indexPath := filepath.Join(path, "index.bin")
	indexFile, err := fs.OpenReaderAt(indexPath)
	if err != nil {
		metaindexFile.MustClose()
		return nil, fmt.Errorf("cannot open %q: %s", indexPath, err)
	}
	indexSize := fs.MustFileSize(indexPath)

	itemsPath := filepath.Join(path, "items.bin")
	itemsFile, err := fs.OpenReaderAt(itemsPath)
	if err != nil {
		metaindexFile.MustClose()
		indexFile.MustClose()
		return nil, fmt.Errorf("cannot open %q: %s", itemsPath, err)
	}
	itemsSize := fs.MustFileSize(itemsPath)

	lensPath := filepath.Join(path, "lens.bin")
	lensFile, err := fs.OpenReaderAt(lensPath)
	if err != nil {
		metaindexFile.MustClose()
		indexFile.MustClose()
		itemsFile.MustClose()
		return nil, fmt.Errorf("cannot open %q: %s", lensPath, err)
	}
	lensSize := fs.MustFileSize(lensPath)

	// newPart takes ownership of all the opened files.
	size := metaindexSize + indexSize + itemsSize + lensSize
	return newPart(&ph, path, size, metaindexFile, indexFile, itemsFile, lensFile)
}
// newPart creates a part from the given header, path, size and readers.
//
// Metaindex rows are unmarshaled from metaindexReader, which is always closed
// before returning. The returned part owns indexFile, itemsFile and lensFile.
//
// If the metaindex rows cannot be unmarshaled, then the already constructed
// part is closed (together with indexFile, itemsFile and lensFile)
// and an error is returned.
func newPart(ph *partHeader, path string, size uint64, metaindexReader filestream.ReadCloser, indexFile, itemsFile, lensFile fs.MustReadAtCloser) (*part, error) {
	var openErr error
	rows, err := unmarshalMetaindexRows(nil, metaindexReader)
	if err != nil {
		openErr = fmt.Errorf("cannot unmarshal metaindexRows: %s", err)
	}
	metaindexReader.MustClose()

	p := &part{
		path:      path,
		size:      size,
		mrs:       rows,
		indexFile: indexFile,
		itemsFile: itemsFile,
		lensFile:  lensFile,
	}
	p.ph.CopyFrom(ph)
	p.idxbCache = newIndexBlockCache()
	p.ibCache = newInmemoryBlockCache()

	if openErr == nil {
		return p, nil
	}

	// Build the error before closing the part, then release all the resources.
	err = fmt.Errorf("error opening part %s: %s", p.path, openErr)
	p.MustClose()
	return nil, err
}
// MustClose closes the index, items and lens files
// and then closes both per-part caches.
func (p *part) MustClose() {
	files := []fs.MustReadAtCloser{p.indexFile, p.itemsFile, p.lensFile}
	for _, f := range files {
		f.MustClose()
	}

	p.idxbCache.MustClose()
	p.ibCache.MustClose()
}
// indexBlock holds block headers.
//
// Instances are pooled; see getIndexBlock and putIndexBlock.
type indexBlock struct {
// bhs is truncated to zero length by putIndexBlock before the indexBlock is pooled.
bhs []blockHeader
}
// getIndexBlock returns an indexBlock from the pool
// or a newly allocated one if the pool is empty.
//
// Return the indexBlock to the pool with putIndexBlock when it is no longer needed.
func getIndexBlock() *indexBlock {
	if v := indexBlockPool.Get(); v != nil {
		return v.(*indexBlock)
	}
	return &indexBlock{}
}

// putIndexBlock returns idxb to the pool.
//
// idxb.bhs is truncated to zero length, while its capacity is retained.
// idxb must not be used after the call.
func putIndexBlock(idxb *indexBlock) {
	idxb.bhs = idxb.bhs[:0]
	indexBlockPool.Put(idxb)
}

// indexBlockPool is the pool of indexBlock objects
// shared by getIndexBlock and putIndexBlock.
var indexBlockPool sync.Pool
// indexBlockCache is a cache of indexBlock entries.
//
// A background cleaner goroutine is started by newIndexBlockCache
// and is stopped by MustClose.
type indexBlockCache struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
//
// requests is incremented on every Get call.
// misses is incremented on every Get call that finds no entry.
requests uint64
misses uint64
// m contains the cached entries. It is protected by mu.
m map[uint64]*indexBlockCacheEntry
mu sync.RWMutex
// cleanerStopCh is closed by MustClose in order to stop the cleaner goroutine.
cleanerStopCh chan struct{}
// cleanerWG is used for waiting until the cleaner goroutine is stopped.
cleanerWG sync.WaitGroup
}
// indexBlockCacheEntry is a single entry stored in indexBlockCache.
type indexBlockCacheEntry struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
//
// lastAccessTime is the value of currentTimestamp at the time of the last
// Put or Get for the entry. It is read and written atomically.
lastAccessTime uint64
// idxb is the cached index block.
idxb *indexBlock
}
// newIndexBlockCache returns an empty indexBlockCache
// with the background cleaner goroutine already started.
//
// MustClose must be called on the returned cache when it is no longer needed.
func newIndexBlockCache() *indexBlockCache {
	idxbc := &indexBlockCache{
		m:             make(map[uint64]*indexBlockCacheEntry),
		cleanerStopCh: make(chan struct{}),
	}

	idxbc.cleanerWG.Add(1)
	go func() {
		defer idxbc.cleanerWG.Done()
		idxbc.cleaner()
	}()

	return idxbc
}
// MustClose stops the cleaner goroutine, adds the per-cache counters
// to the package-level counters and returns all the cached index blocks to the pool.
//
// It must be called when idxbc entries are no longer accessed
// by concurrent goroutines.
func (idxbc *indexBlockCache) MustClose() {
	// Stop the cleaner and wait until it exits.
	close(idxbc.cleanerStopCh)
	idxbc.cleanerWG.Wait()

	atomic.AddUint64(&indexBlockCacheRequests, idxbc.requests)
	atomic.AddUint64(&indexBlockCacheMisses, idxbc.misses)

	// Detach the map and return its blocks to the pool. This relies on the caller
	// guaranteeing that nobody uses the cached blocks anymore.
	entries := idxbc.m
	idxbc.m = nil
	for _, entry := range entries {
		putIndexBlock(entry.idxb)
	}
}
// cleaner periodically removes entries that haven't been accessed for a long time.
//
// It calls cleanByTimeout every 5 seconds until cleanerStopCh is closed.
func (idxbc *indexBlockCache) cleaner() {
	// A ticker is required here: time.Timer fires only once, which would result
	// in a single cleanup during the whole lifetime of the cache.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			idxbc.cleanByTimeout()
		case <-idxbc.cleanerStopCh:
			return
		}
	}
}
// cleanByTimeout removes entries that were last accessed more than 10 minutes ago.
//
// The removed index blocks aren't returned to the pool.
func (idxbc *indexBlockCache) cleanByTimeout() {
	currentTime := atomic.LoadUint64(&currentTimestamp)
	idxbc.mu.Lock()
	for k, idxbe := range idxbc.m {
		lastAccessTime := atomic.LoadUint64(&idxbe.lastAccessTime)
		// Get may store a timestamp newer than the currentTime snapshot, since it
		// updates lastAccessTime without holding the lock. Skip such entries,
		// because the unsigned subtraction below would wrap around and
		// the just accessed entry would be deleted.
		if lastAccessTime >= currentTime {
			continue
		}
		// Delete items accessed more than 10 minutes ago.
		if currentTime-lastAccessTime > 10*60 {
			delete(idxbc.m, k)
		}
	}
	idxbc.mu.Unlock()
}
// Package-level totals for index block caches.
//
// indexBlockCache.MustClose atomically adds the per-cache requests and misses
// counters to these variables.
var (
indexBlockCacheRequests uint64
indexBlockCacheMisses uint64
)
// Get returns the index block stored under the key k.
//
// It returns nil if there is no entry for k in idxbc.
// Every call increments the requests counter; every miss increments the misses counter.
func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
	atomic.AddUint64(&idxbc.requests, 1)

	idxbc.mu.RLock()
	entry := idxbc.m[k]
	idxbc.mu.RUnlock()

	if entry == nil {
		atomic.AddUint64(&idxbc.misses, 1)
		return nil
	}

	// Refresh the access time only if it differs from the current timestamp.
	now := atomic.LoadUint64(&currentTimestamp)
	if atomic.LoadUint64(&entry.lastAccessTime) != now {
		atomic.StoreUint64(&entry.lastAccessTime, now)
	}
	return entry.idxb
}
// Put puts idxb under the key k into idxbc.
//
// If idxbc holds more than getMaxCachedIndexBlocksPerPart() entries,
// then around 10% of entries (but at least one entry) are evicted
// before idxb is stored.
//
// Returns true if the idxb has been put into idxbc.
func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) bool {
	idxbc.mu.Lock()
	defer idxbc.mu.Unlock()

	// Remove superfluous entries.
	if len(idxbc.m) > getMaxCachedIndexBlocksPerPart() {
		// Remove 10% of items from the cache.
		evictCount := int(float64(len(idxbc.m)) * 0.1)
		if evictCount < 1 {
			// 10% of a cache with less than 10 entries is truncated to zero.
			// Evict at least one entry in this case. Otherwise the counter below
			// would go negative and the whole cache would be dropped.
			evictCount = 1
		}
		for key := range idxbc.m {
			// Do not return idxb to pool, since these entries may be used
			// by concurrent goroutines.
			delete(idxbc.m, key)
			evictCount--
			if evictCount <= 0 {
				break
			}
		}
	}

	// Store idxb in the cache.
	idxbc.m[k] = &indexBlockCacheEntry{
		lastAccessTime: atomic.LoadUint64(&currentTimestamp),
		idxb:           idxb,
	}
	return true
}
// Len returns the number of entries stored in idxbc.
func (idxbc *indexBlockCache) Len() uint64 {
	idxbc.mu.RLock()
	defer idxbc.mu.RUnlock()
	return uint64(len(idxbc.m))
}

// Requests returns the number of Get calls made to idxbc.
func (idxbc *indexBlockCache) Requests() uint64 {
	requests := atomic.LoadUint64(&idxbc.requests)
	return requests
}

// Misses returns the number of Get calls that found no entry in idxbc.
func (idxbc *indexBlockCache) Misses() uint64 {
	misses := atomic.LoadUint64(&idxbc.misses)
	return misses
}
// inmemoryBlockCache is a cache of inmemoryBlock entries.
//
// A background cleaner goroutine is started by newInmemoryBlockCache
// and is stopped by MustClose.
type inmemoryBlockCache struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
//
// requests is incremented on every Get call.
// misses is incremented on every Get call that finds no entry.
requests uint64
misses uint64
// m contains the cached entries. It is protected by mu.
m map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry
mu sync.RWMutex
// cleanerStopCh is closed by MustClose in order to stop the cleaner goroutine.
cleanerStopCh chan struct{}
// cleanerWG is used for waiting until the cleaner goroutine is stopped.
cleanerWG sync.WaitGroup
}
// inmemoryBlockCacheKey is the key used for entries in inmemoryBlockCache.
//
// It is filled from a blockHeader by Init.
type inmemoryBlockCacheKey struct {
// firstItem is set by Init only for block headers with zero itemsBlockSize.
// It is empty otherwise.
firstItem string
// itemsBlockOffset is copied by Init from the block header.
itemsBlockOffset uint64
}
// Init fills ibck from the given block header bh.
//
// itemsBlockOffset is always copied from bh, while firstItem is copied
// only when bh.itemsBlockSize is zero and is left empty otherwise.
// NOTE(review): presumably blocks with zero itemsBlockSize cannot be told apart
// by the offset alone - confirm against the code writing block headers.
func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) {
	firstItem := ""
	if bh.itemsBlockSize == 0 {
		firstItem = string(bh.firstItem)
	}
	ibck.firstItem = firstItem
	ibck.itemsBlockOffset = bh.itemsBlockOffset
}
// inmemoryBlockCacheEntry is a single entry stored in inmemoryBlockCache.
type inmemoryBlockCacheEntry struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
//
// lastAccessTime is the value of currentTimestamp at the time of the last
// Put or Get for the entry. It is read and written atomically.
lastAccessTime uint64
// ib is the cached inmemory block.
ib *inmemoryBlock
}
// newInmemoryBlockCache returns an empty inmemoryBlockCache
// with the background cleaner goroutine already started.
//
// MustClose must be called on the returned cache when it is no longer needed.
func newInmemoryBlockCache() *inmemoryBlockCache {
	ibc := &inmemoryBlockCache{
		m:             make(map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry),
		cleanerStopCh: make(chan struct{}),
	}

	ibc.cleanerWG.Add(1)
	go func() {
		defer ibc.cleanerWG.Done()
		ibc.cleaner()
	}()

	return ibc
}
// MustClose stops the cleaner goroutine, adds the per-cache counters
// to the package-level counters and returns all the cached blocks to the pool.
//
// It must be called only if no other goroutines access ibc entries.
func (ibc *inmemoryBlockCache) MustClose() {
	// Stop the cleaner and wait until it exits.
	close(ibc.cleanerStopCh)
	ibc.cleanerWG.Wait()

	atomic.AddUint64(&inmemoryBlockCacheRequests, ibc.requests)
	atomic.AddUint64(&inmemoryBlockCacheMisses, ibc.misses)

	// Detach the map and return its blocks to the pool. This relies on the caller
	// guaranteeing that nobody uses the cached blocks anymore.
	entries := ibc.m
	ibc.m = nil
	for _, entry := range entries {
		putInmemoryBlock(entry.ib)
	}
}
// cleaner periodically removes entries that haven't been accessed for a long time.
//
// It calls cleanByTimeout every 5 seconds until cleanerStopCh is closed.
func (ibc *inmemoryBlockCache) cleaner() {
	// A ticker is required here: time.Timer fires only once, which would result
	// in a single cleanup during the whole lifetime of the cache.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			ibc.cleanByTimeout()
		case <-ibc.cleanerStopCh:
			return
		}
	}
}
// cleanByTimeout removes entries that were last accessed more than 10 minutes ago.
//
// The removed blocks aren't returned to the pool.
func (ibc *inmemoryBlockCache) cleanByTimeout() {
	currentTime := atomic.LoadUint64(&currentTimestamp)
	ibc.mu.Lock()
	for k, ibe := range ibc.m {
		lastAccessTime := atomic.LoadUint64(&ibe.lastAccessTime)
		// Get may store a timestamp newer than the currentTime snapshot, since it
		// updates lastAccessTime without holding the lock. Skip such entries,
		// because the unsigned subtraction below would wrap around and
		// the just accessed entry would be deleted.
		if lastAccessTime >= currentTime {
			continue
		}
		// Delete items accessed more than 10 minutes ago.
		if currentTime-lastAccessTime > 10*60 {
			delete(ibc.m, k)
		}
	}
	ibc.mu.Unlock()
}
// Package-level totals for inmemory block caches.
//
// inmemoryBlockCache.MustClose atomically adds the per-cache requests and misses
// counters to these variables.
var (
inmemoryBlockCacheRequests uint64
inmemoryBlockCacheMisses uint64
)
// Get returns the inmemory block stored under the key k.
//
// It returns nil if there is no entry for k in ibc.
// Every call increments the requests counter; every miss increments the misses counter.
func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
	atomic.AddUint64(&ibc.requests, 1)

	ibc.mu.RLock()
	entry := ibc.m[k]
	ibc.mu.RUnlock()

	if entry == nil {
		atomic.AddUint64(&ibc.misses, 1)
		return nil
	}

	// Refresh the access time only if it differs from the current timestamp.
	now := atomic.LoadUint64(&currentTimestamp)
	if atomic.LoadUint64(&entry.lastAccessTime) != now {
		atomic.StoreUint64(&entry.lastAccessTime, now)
	}
	return entry.ib
}
// Put puts ib under key k into ibc.
//
// If ibc holds more than getMaxCachedInmemoryBlocksPerPart() entries,
// then around 10% of entries (but at least one entry) are evicted
// before ib is stored.
//
// Returns true if ib was put into ibc.
func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) bool {
	ibc.mu.Lock()
	defer ibc.mu.Unlock()

	// Clean superfluous entries in cache.
	if len(ibc.m) > getMaxCachedInmemoryBlocksPerPart() {
		// Remove 10% of items from the cache.
		evictCount := int(float64(len(ibc.m)) * 0.1)
		if evictCount < 1 {
			// 10% of a cache with less than 10 entries is truncated to zero.
			// Evict at least one entry in this case. Otherwise the counter below
			// would go negative and the whole cache would be dropped.
			evictCount = 1
		}
		for key := range ibc.m {
			// Do not call putInmemoryBlock on the evicted block, since it
			// may be used by concurrent goroutines.
			delete(ibc.m, key)
			evictCount--
			if evictCount <= 0 {
				break
			}
		}
	}

	// Store ib in the cache.
	ibc.m[k] = &inmemoryBlockCacheEntry{
		lastAccessTime: atomic.LoadUint64(&currentTimestamp),
		ib:             ib,
	}
	return true
}
// Len returns the number of entries stored in ibc.
func (ibc *inmemoryBlockCache) Len() uint64 {
	ibc.mu.RLock()
	defer ibc.mu.RUnlock()
	return uint64(len(ibc.m))
}

// Requests returns the number of Get calls made to ibc.
func (ibc *inmemoryBlockCache) Requests() uint64 {
	requests := atomic.LoadUint64(&ibc.requests)
	return requests
}

// Misses returns the number of Get calls that found no entry in ibc.
func (ibc *inmemoryBlockCache) Misses() uint64 {
	misses := atomic.LoadUint64(&ibc.misses)
	return misses
}
// init starts a background goroutine, which refreshes currentTimestamp every second.
//
// The goroutine runs until the process exits.
func init() {
	go func() {
		// A ticker is required here: time.Timer fires only once, which would
		// freeze currentTimestamp after the first update.
		ticker := time.NewTicker(time.Second)
		for tm := range ticker.C {
			atomic.StoreUint64(&currentTimestamp, uint64(tm.Unix()))
		}
	}()
}

// currentTimestamp is the current unix timestamp in seconds.
//
// It is initialized at package load, so it holds a valid time before the first
// tick, and is then refreshed every second by the goroutine started in init.
// It must be accessed with atomic operations after the package is initialized.
var currentTimestamp = uint64(time.Now().Unix())