VictoriaMetrics/lib/logstorage/datadb.go
Zakhar Bessarab 47d9e82b52
lib/storage/partition: add check to ensure parts exist on disk (#5017)
* lib/storage/partition: add check to ensure parts exist on disk

If part exists in parts.json but is missing on disk there will be a misleading error similar to "unexpected number of substrings in the part name".

This change forces verification of part existence and throws a correct error in case it is missing on disk.

Such issue can be result of https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5005 or disk corruption.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/storage/partition: use filepath.Join instead of string concatenation

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/storage/partition: add action points for error message

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* all: add a check for missing part in lib/mergeset and lib/logstorage

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-09-19 11:18:21 +02:00

1034 lines
28 KiB
Go

package logstorage
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// Default number of parts to merge at once.
//
// This number has been obtained empirically - it gives the lowest possible overhead.
// See appendPartsToMerge tests for details.
const defaultPartsToMerge = 15
// minMergeMultiplier is the minimum multiplier for the size of the output part
// compared to the size of the maximum input part for the merge.
//
// Higher value reduces write amplification (disk write IO induced by the merge),
// while increases the number of unmerged parts.
// The 1.7 is good enough for production workloads.
const minMergeMultiplier = 1.7
// The maximum number of inmemory parts in the partition.
//
// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion.
const maxInmemoryPartsPerPartition = 20
// datadb represents a database with log data
type datadb struct {
// mergeIdx is used for generating unique directory names for parts
mergeIdx uint64
inmemoryMergesTotal uint64
inmemoryActiveMerges uint64
fileMergesTotal uint64
fileActiveMerges uint64
// pt is the partition the datadb belongs to
pt *partition
// path is the path to the directory with log data
path string
// flushInterval is interval for flushing the inmemory parts to disk
flushInterval time.Duration
// inmemoryParts contains a list of inmemory parts
inmemoryParts []*partWrapper
// fileParts contains a list of file-based parts
fileParts []*partWrapper
// partsLock protects parts from concurrent access
partsLock sync.Mutex
// wg is used for determining when background workers stop
wg sync.WaitGroup
// stopCh is used for notifying background workers to stop
stopCh chan struct{}
// mergeDoneCond is used for pace-limiting the data ingestion rate
mergeDoneCond *sync.Cond
// inmemoryPartsFlushersCount is the number of currently running in-memory parts flushers
//
// This variable must be accessed under partsLock.
inmemoryPartsFlushersCount int
// mergeWorkersCount is the number of currently running merge workers
//
// This variable must be accessed under partsLock.
mergeWorkersCount int
}
// partWrapper is a wrapper for opened part.
type partWrapper struct {
// refCount is the number of references to p.
//
// When the number of references reaches zero, then p is closed.
refCount int32
// The flag, which is set when the part must be deleted after refCount reaches zero.
mustBeDeleted uint32
// p is an opened part
p *part
// mp references inmemory part used for initializing p.
mp *inmemoryPart
// isInMerge is set to true if the part takes part in merge.
isInMerge bool
// The deadline when in-memory part must be flushed to disk.
flushDeadline time.Time
}
func (pw *partWrapper) incRef() {
atomic.AddInt32(&pw.refCount, 1)
}
func (pw *partWrapper) decRef() {
n := atomic.AddInt32(&pw.refCount, -1)
if n > 0 {
return
}
deletePath := ""
if pw.mp == nil {
if atomic.LoadUint32(&pw.mustBeDeleted) != 0 {
deletePath = pw.p.path
}
} else {
putInmemoryPart(pw.mp)
pw.mp = nil
}
mustClosePart(pw.p)
pw.p = nil
if deletePath != "" {
fs.MustRemoveAll(deletePath)
}
}
func mustCreateDatadb(path string) {
fs.MustMkdirFailIfExist(path)
mustWritePartNames(path, []string{})
}
// mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data.
func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *datadb {
// Remove temporary directories, which may be left after unclean shutdown.
fs.MustRemoveTemporaryDirs(path)
partNames := mustReadPartNames(path)
mustRemoveUnusedDirs(path, partNames)
pws := make([]*partWrapper, len(partNames))
for i, partName := range partNames {
// Make sure the partName exists on disk.
// If it is missing, then manual action from the user is needed,
// since this is unexpected state, which cannot occur under normal operation,
// including unclean shutdown.
partPath := filepath.Join(path, partName)
if !fs.IsPathExist(partPath) {
partsFile := filepath.Join(path, partsFilename)
logger.Panicf("FATAL: part %q is listed in %q, but is missing on disk; "+
"ensure %q contents is not corrupted; remove %q to rebuild its' content from the list of existing parts",
partPath, partsFile, partsFile, partsFile)
}
p := mustOpenFilePart(pt, partPath)
pws[i] = newPartWrapper(p, nil, time.Time{})
}
ddb := &datadb{
pt: pt,
mergeIdx: uint64(time.Now().UnixNano()),
flushInterval: flushInterval,
path: path,
fileParts: pws,
stopCh: make(chan struct{}),
}
ddb.mergeDoneCond = sync.NewCond(&ddb.partsLock)
// Start merge workers in the hope they'll merge the remaining parts
ddb.partsLock.Lock()
n := getMergeWorkersCount()
for i := 0; i < n; i++ {
ddb.startMergeWorkerLocked()
}
ddb.partsLock.Unlock()
return ddb
}
// startInmemoryPartsFlusherLocked starts flusher for in-memory parts to disk.
//
// This function must be called under partsLock.
func (ddb *datadb) startInmemoryPartsFlusherLocked() {
if ddb.inmemoryPartsFlushersCount >= 1 {
return
}
ddb.inmemoryPartsFlushersCount++
ddb.wg.Add(1)
go func() {
ddb.flushInmemoryParts()
ddb.wg.Done()
}()
}
func (ddb *datadb) flushInmemoryParts() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
ddb.partsLock.Lock()
pws := make([]*partWrapper, 0, len(ddb.inmemoryParts))
pws = appendNotInMergePartsLocked(pws, ddb.inmemoryParts)
currentTime := time.Now()
partsToFlush := pws[:0]
for _, pw := range pws {
if pw.flushDeadline.Before(currentTime) {
partsToFlush = append(partsToFlush, pw)
}
}
setInMergeLocked(partsToFlush)
if len(pws) == 0 {
ddb.inmemoryPartsFlushersCount--
}
ddb.partsLock.Unlock()
if len(pws) == 0 {
// There are no in-memory parts, so stop the flusher.
return
}
ddb.mustMergePartsFinal(partsToFlush)
select {
case <-ddb.stopCh:
return
case <-ticker.C:
}
}
}
// startMergeWorkerLocked starts a merge worker.
//
// This function must be called under locked partsLock.
func (ddb *datadb) startMergeWorkerLocked() {
if ddb.mergeWorkersCount >= getMergeWorkersCount() {
return
}
ddb.mergeWorkersCount++
ddb.wg.Add(1)
go func() {
globalMergeLimitCh <- struct{}{}
ddb.mustMergeExistingParts()
<-globalMergeLimitCh
ddb.wg.Done()
}()
}
// globalMergeLimitCh limits the number of concurrent merges across all the partitions
var globalMergeLimitCh = make(chan struct{}, getMergeWorkersCount())
func getMergeWorkersCount() int {
n := cgroup.AvailableCPUs()
if n < 4 {
// Use bigger number of workers on systems with small number of CPU cores,
// since a single worker may become busy for long time when merging big parts.
// Then the remaining workers may continue performing merges
// for newly added small parts.
return 4
}
return n
}
func (ddb *datadb) mustMergeExistingParts() {
for !needStop(ddb.stopCh) {
maxOutBytes := ddb.availableDiskSpace()
ddb.partsLock.Lock()
parts := make([]*partWrapper, 0, len(ddb.inmemoryParts)+len(ddb.fileParts))
parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts)
parts = appendNotInMergePartsLocked(parts, ddb.fileParts)
pws := appendPartsToMerge(nil, parts, maxOutBytes)
setInMergeLocked(pws)
if len(pws) == 0 {
ddb.mergeWorkersCount--
}
ddb.partsLock.Unlock()
if len(pws) == 0 {
// Nothing to merge at the moment.
return
}
partsSize := getCompressedSize(pws)
if !ddb.reserveDiskSpace(partsSize) {
// There is no free disk space for the merge,
// because concurrent merge workers already reserved the disk space.
// Try again with smaller maxOutBytes.
ddb.releasePartsToMerge(pws)
continue
}
ddb.mustMergeParts(pws, false)
ddb.releaseDiskSpace(partsSize)
}
}
// appendNotInMergePartsLocked appends src parts with isInMerge=false to dst and returns the result.
//
// This function must be called under partsLock.
func appendNotInMergePartsLocked(dst, src []*partWrapper) []*partWrapper {
for _, pw := range src {
if !pw.isInMerge {
dst = append(dst, pw)
}
}
return dst
}
// setInMergeLocked sets isInMerge flag for pws.
//
// This function must be called under partsLock.
func setInMergeLocked(pws []*partWrapper) {
for _, pw := range pws {
if pw.isInMerge {
logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to true")
}
pw.isInMerge = true
}
}
func assertIsInMerge(pws []*partWrapper) {
for _, pw := range pws {
if !pw.isInMerge {
logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
}
}
}
// mustMergeParts merges pws to a single resulting part.
//
// if isFinal is set, then the resulting part will be saved to disk.
//
// All the parts inside pws must have isInMerge field set to true.
func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
if len(pws) == 0 {
// Nothing to merge.
return
}
assertIsInMerge(pws)
startTime := time.Now()
dstPartType := ddb.getDstPartType(pws, isFinal)
if dstPartType == partInmemory {
atomic.AddUint64(&ddb.inmemoryMergesTotal, 1)
atomic.AddUint64(&ddb.inmemoryActiveMerges, 1)
defer atomic.AddUint64(&ddb.inmemoryActiveMerges, ^uint64(0))
} else {
atomic.AddUint64(&ddb.fileMergesTotal, 1)
atomic.AddUint64(&ddb.fileActiveMerges, 1)
defer atomic.AddUint64(&ddb.fileActiveMerges, ^uint64(0))
}
// Initialize destination paths.
mergeIdx := ddb.nextMergeIdx()
dstPartPath := ddb.getDstPartPath(dstPartType, mergeIdx)
if isFinal && len(pws) == 1 && pws[0].mp != nil {
// Fast path: flush a single in-memory part to disk.
mp := pws[0].mp
mp.MustStoreToDisk(dstPartPath)
pwNew := ddb.openCreatedPart(&mp.ph, pws, nil, dstPartPath)
ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
return
}
// Prepare blockStreamReaders for source parts.
bsrs := mustOpenBlockStreamReaders(pws)
// Prepare BlockStreamWriter for destination part.
srcSize := uint64(0)
srcRowsCount := uint64(0)
srcBlocksCount := uint64(0)
for _, pw := range pws {
srcSize += pw.p.ph.CompressedSizeBytes
srcRowsCount += pw.p.ph.RowsCount
srcBlocksCount += pw.p.ph.BlocksCount
}
bsw := getBlockStreamWriter()
var mpNew *inmemoryPart
if dstPartType == partInmemory {
mpNew = getInmemoryPart()
bsw.MustInitForInmemoryPart(mpNew)
} else {
nocache := !shouldUsePageCacheForPartSize(srcSize)
bsw.MustInitForFilePart(dstPartPath, nocache)
}
// Merge source parts to destination part.
var ph partHeader
stopCh := ddb.stopCh
if isFinal {
// The final merge shouldn't be stopped even if ddb.stopCh is closed.
stopCh = nil
}
mustMergeBlockStreams(&ph, bsw, bsrs, stopCh)
putBlockStreamWriter(bsw)
for _, bsr := range bsrs {
putBlockStreamReader(bsr)
}
// Persist partHeader for destination part after the merge.
if mpNew != nil {
mpNew.ph = ph
} else {
ph.mustWriteMetadata(dstPartPath)
// Make sure the created part directory listing is synced.
fs.MustSyncPath(dstPartPath)
}
if needStop(stopCh) {
ddb.releasePartsToMerge(pws)
ddb.mergeDoneCond.Broadcast()
// Remove incomplete destination part
if dstPartType == partFile {
fs.MustRemoveAll(dstPartPath)
}
return
}
// Atomically swap the source parts with the newly created part.
pwNew := ddb.openCreatedPart(&ph, pws, mpNew, dstPartPath)
dstSize := uint64(0)
dstRowsCount := uint64(0)
dstBlocksCount := uint64(0)
if pwNew != nil {
pDst := pwNew.p
dstSize = pDst.ph.CompressedSizeBytes
dstRowsCount = pDst.ph.RowsCount
dstBlocksCount = pDst.ph.BlocksCount
}
ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
d := time.Since(startTime)
if d <= 30*time.Second {
return
}
// Log stats for long merges.
durationSecs := d.Seconds()
rowsPerSec := int(float64(srcRowsCount) / durationSecs)
logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",
len(pws), srcRowsCount, srcBlocksCount, srcSize, dstRowsCount, dstBlocksCount, dstSize, durationSecs, rowsPerSec, dstPartPath)
}
func (ddb *datadb) nextMergeIdx() uint64 {
return atomic.AddUint64(&ddb.mergeIdx, 1)
}
type partType int
var (
partInmemory = partType(0)
partFile = partType(1)
)
func (ddb *datadb) getDstPartType(pws []*partWrapper, isFinal bool) partType {
if isFinal {
return partFile
}
dstPartSize := getCompressedSize(pws)
if dstPartSize > getMaxInmemoryPartSize() {
return partFile
}
if !areAllInmemoryParts(pws) {
// If at least a single source part is located in file,
// then the destination part must be in file for durability reasons.
return partFile
}
return partInmemory
}
func (ddb *datadb) getDstPartPath(dstPartType partType, mergeIdx uint64) string {
ptPath := ddb.path
dstPartPath := ""
if dstPartType != partInmemory {
dstPartPath = filepath.Join(ptPath, fmt.Sprintf("%016X", mergeIdx))
}
return dstPartPath
}
func (ddb *datadb) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *inmemoryPart, dstPartPath string) *partWrapper {
// Open the created part.
if ph.RowsCount == 0 {
// The created part is empty. Remove it
if mpNew == nil {
fs.MustRemoveAll(dstPartPath)
}
return nil
}
var p *part
var flushDeadline time.Time
if mpNew != nil {
// Open the created part from memory.
p = mustOpenInmemoryPart(ddb.pt, mpNew)
flushDeadline = ddb.getFlushToDiskDeadline(pws)
} else {
// Open the created part from disk.
p = mustOpenFilePart(ddb.pt, dstPartPath)
}
return newPartWrapper(p, mpNew, flushDeadline)
}
func (ddb *datadb) mustAddRows(lr *LogRows) {
if len(lr.streamIDs) == 0 {
return
}
mp := getInmemoryPart()
mp.mustInitFromRows(lr)
p := mustOpenInmemoryPart(ddb.pt, mp)
flushDeadline := time.Now().Add(ddb.flushInterval)
pw := newPartWrapper(p, mp, flushDeadline)
ddb.partsLock.Lock()
ddb.inmemoryParts = append(ddb.inmemoryParts, pw)
ddb.startInmemoryPartsFlusherLocked()
if len(ddb.inmemoryParts) > defaultPartsToMerge {
ddb.startMergeWorkerLocked()
}
for len(ddb.inmemoryParts) > maxInmemoryPartsPerPartition {
// limit the pace for data ingestion if too many inmemory parts are created
ddb.mergeDoneCond.Wait()
}
ddb.partsLock.Unlock()
}
// DatadbStats contains various stats for datadb.
type DatadbStats struct {
// InmemoryMergesTotal is the number of inmemory merges performed in the given datadb.
InmemoryMergesTotal uint64
// InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb.
InmemoryActiveMerges uint64
// FileMergesTotal is the number of file merges performed in the given datadb.
FileMergesTotal uint64
// FileActiveMerges is the number of currently active file merges performed by the given datadb.
FileActiveMerges uint64
// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
InmemoryRowsCount uint64
// FileRowsCount is the number of rows stored on disk.
FileRowsCount uint64
// InmemoryParts is the number of in-memory parts, which weren't flushed to disk yet.
InmemoryParts uint64
// FileParts is the number of file-based parts stored on disk.
FileParts uint64
// InmemoryBlocks is the number of in-memory blocks, which weren't flushed to disk yet.
InmemoryBlocks uint64
// FileBlocks is the number of file-based blocks stored on disk.
FileBlocks uint64
// CompressedInmemorySize is the size of compressed data stored in memory.
CompressedInmemorySize uint64
// CompressedFileSize is the size of compressed data stored on disk.
CompressedFileSize uint64
// UncompressedInmemorySize is the size of uncompressed data stored in memory.
UncompressedInmemorySize uint64
// UncompressedFileSize is the size of uncompressed data stored on disk.
UncompressedFileSize uint64
}
func (s *DatadbStats) reset() {
*s = DatadbStats{}
}
// RowsCount returns the number of rows stored in datadb.
func (s *DatadbStats) RowsCount() uint64 {
return s.InmemoryRowsCount + s.FileRowsCount
}
// updateStats updates s with ddb stats
func (ddb *datadb) updateStats(s *DatadbStats) {
s.InmemoryMergesTotal += atomic.LoadUint64(&ddb.inmemoryMergesTotal)
s.InmemoryActiveMerges += atomic.LoadUint64(&ddb.inmemoryActiveMerges)
s.FileMergesTotal += atomic.LoadUint64(&ddb.fileMergesTotal)
s.FileActiveMerges += atomic.LoadUint64(&ddb.fileActiveMerges)
ddb.partsLock.Lock()
s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts)
s.FileRowsCount += getRowsCount(ddb.fileParts)
s.InmemoryParts += uint64(len(ddb.inmemoryParts))
s.FileParts += uint64(len(ddb.fileParts))
s.InmemoryBlocks += getBlocksCount(ddb.inmemoryParts)
s.FileBlocks += getBlocksCount(ddb.fileParts)
s.CompressedInmemorySize += getCompressedSize(ddb.inmemoryParts)
s.CompressedFileSize += getCompressedSize(ddb.fileParts)
s.UncompressedInmemorySize += getUncompressedSize(ddb.inmemoryParts)
s.UncompressedFileSize += getUncompressedSize(ddb.fileParts)
ddb.partsLock.Unlock()
}
// debugFlush() makes sure that the recently ingested data is availalbe for search.
func (ddb *datadb) debugFlush() {
// Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts.
}
func (ddb *datadb) mustMergePartsFinal(pws []*partWrapper) {
assertIsInMerge(pws)
var pwsChunk []*partWrapper
for len(pws) > 0 {
pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, (1<<64)-1)
if len(pwsChunk) == 0 {
pwsChunk = append(pwsChunk[:0], pws...)
}
ddb.mustMergeParts(pwsChunk, true)
partsToRemove := partsToMap(pwsChunk)
removedParts := 0
pws, removedParts = removeParts(pws, partsToRemove)
if removedParts != len(pwsChunk) {
logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
}
}
}
func partsToMap(pws []*partWrapper) map[*partWrapper]struct{} {
m := make(map[*partWrapper]struct{}, len(pws))
for _, pw := range pws {
m[pw] = struct{}{}
}
if len(m) != len(pws) {
logger.Panicf("BUG: %d duplicate parts found out of %d parts", len(pws)-len(m), len(pws))
}
return m
}
func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
// Atomically unregister old parts and add new part to pt.
partsToRemove := partsToMap(pws)
removedInmemoryParts := 0
removedFileParts := 0
ddb.partsLock.Lock()
ddb.inmemoryParts, removedInmemoryParts = removeParts(ddb.inmemoryParts, partsToRemove)
ddb.fileParts, removedFileParts = removeParts(ddb.fileParts, partsToRemove)
if pwNew != nil {
switch dstPartType {
case partInmemory:
ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
ddb.startInmemoryPartsFlusherLocked()
case partFile:
ddb.fileParts = append(ddb.fileParts, pwNew)
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
if len(ddb.inmemoryParts)+len(ddb.fileParts) > defaultPartsToMerge {
ddb.startMergeWorkerLocked()
}
}
// Atomically store the updated list of file-based parts on disk.
// This must be performed under partsLock in order to prevent from races
// when multiple concurrently running goroutines update the list.
if removedFileParts > 0 || pwNew != nil && dstPartType == partFile {
partNames := getPartNames(ddb.fileParts)
mustWritePartNames(ddb.path, partNames)
}
ddb.partsLock.Unlock()
removedParts := removedInmemoryParts + removedFileParts
if removedParts != len(partsToRemove) {
logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(partsToRemove))
}
// Mark old parts as must be deleted and decrement reference count,
// so they are eventually closed and deleted.
for _, pw := range pws {
atomic.StoreUint32(&pw.mustBeDeleted, 1)
pw.decRef()
}
ddb.mergeDoneCond.Broadcast()
}
func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
dst := pws[:0]
for _, pw := range pws {
if _, ok := partsToRemove[pw]; !ok {
dst = append(dst, pw)
}
}
for i := len(dst); i < len(pws); i++ {
pws[i] = nil
}
return dst, len(pws) - len(dst)
}
func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {
bsrs := make([]*blockStreamReader, 0, len(pws))
for _, pw := range pws {
bsr := getBlockStreamReader()
if pw.mp != nil {
bsr.MustInitFromInmemoryPart(pw.mp)
} else {
bsr.MustInitFromFilePart(pw.p.path)
}
bsrs = append(bsrs, bsr)
}
return bsrs
}
func newPartWrapper(p *part, mp *inmemoryPart, flushDeadline time.Time) *partWrapper {
pw := &partWrapper{
p: p,
mp: mp,
flushDeadline: flushDeadline,
}
// Increase reference counter for newly created part - it is decreased when the part
// is removed from the list of open parts.
pw.incRef()
return pw
}
func (ddb *datadb) getFlushToDiskDeadline(pws []*partWrapper) time.Time {
d := time.Now().Add(ddb.flushInterval)
for _, pw := range pws {
if pw.mp != nil && pw.flushDeadline.Before(d) {
d = pw.flushDeadline
}
}
return d
}
func getMaxInmemoryPartSize() uint64 {
// Allocate 10% of allowed memory for in-memory parts.
n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryPartsPerPartition)
if n < 1e6 {
n = 1e6
}
return n
}
func areAllInmemoryParts(pws []*partWrapper) bool {
for _, pw := range pws {
if pw.mp == nil {
return false
}
}
return true
}
func (ddb *datadb) releasePartsToMerge(pws []*partWrapper) {
ddb.partsLock.Lock()
for _, pw := range pws {
if !pw.isInMerge {
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
}
pw.isInMerge = false
}
ddb.partsLock.Unlock()
}
func (ddb *datadb) availableDiskSpace() uint64 {
available := fs.MustGetFreeSpace(ddb.path)
reserved := atomic.LoadUint64(&reservedDiskSpace)
if available < reserved {
return 0
}
return available - reserved
}
func (ddb *datadb) reserveDiskSpace(n uint64) bool {
available := fs.MustGetFreeSpace(ddb.path)
reserved := atomic.AddUint64(&reservedDiskSpace, n)
if available > reserved {
return true
}
ddb.releaseDiskSpace(n)
return false
}
func (ddb *datadb) releaseDiskSpace(n uint64) {
atomic.AddUint64(&reservedDiskSpace, -n)
}
// reservedDiskSpace tracks global reserved disk space for currently executed
// background merges across all the partitions.
//
// It should allow avoiding background merges when there is no free disk space.
var reservedDiskSpace uint64
func needStop(stopCh <-chan struct{}) bool {
select {
case <-stopCh:
return true
default:
return false
}
}
// mustCloseDatadb can be called only when nobody accesses ddb.
func mustCloseDatadb(ddb *datadb) {
// Stop background workers
close(ddb.stopCh)
ddb.wg.Wait()
// flush in-memory data to disk
pws := append([]*partWrapper{}, ddb.inmemoryParts...)
setInMergeLocked(pws)
ddb.mustMergePartsFinal(pws)
// There is no need in using ddb.partsLock here, since nobody should acces ddb now.
for _, pw := range ddb.inmemoryParts {
pw.decRef()
if pw.refCount != 0 {
logger.Panicf("BUG: there are %d references to inmemoryPart", pw.refCount)
}
}
ddb.inmemoryParts = nil
for _, pw := range ddb.fileParts {
pw.decRef()
if pw.refCount != 0 {
logger.Panicf("BUG: ther are %d references to filePart", pw.refCount)
}
}
ddb.fileParts = nil
ddb.path = ""
ddb.pt = nil
}
func getPartNames(pws []*partWrapper) []string {
partNames := make([]string, 0, len(pws))
for _, pw := range pws {
if pw.mp != nil {
// Skip in-memory parts
continue
}
partName := filepath.Base(pw.p.path)
partNames = append(partNames, partName)
}
sort.Strings(partNames)
return partNames
}
func mustWritePartNames(path string, partNames []string) {
data, err := json.Marshal(partNames)
if err != nil {
logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
}
partNamesPath := filepath.Join(path, partsFilename)
fs.MustWriteAtomic(partNamesPath, data, true)
}
func mustReadPartNames(path string) []string {
partNamesPath := filepath.Join(path, partsFilename)
data, err := os.ReadFile(partNamesPath)
if err != nil {
logger.Panicf("FATAL: cannot read %s: %s", partNamesPath, err)
}
var partNames []string
if err := json.Unmarshal(data, &partNames); err != nil {
logger.Panicf("FATAL: cannot parse %s: %s", partNamesPath, err)
}
return partNames
}
// mustRemoveUnusedDirs removes dirs at path, which are missing in partNames.
//
// These dirs may be left after unclean shutdown.
func mustRemoveUnusedDirs(path string, partNames []string) {
des := fs.MustReadDir(path)
m := make(map[string]struct{}, len(partNames))
for _, partName := range partNames {
m[partName] = struct{}{}
}
removedDirs := 0
for _, de := range des {
if !fs.IsDirOrSymlink(de) {
// Skip non-directories.
continue
}
fn := de.Name()
if _, ok := m[fn]; !ok {
deletePath := filepath.Join(path, fn)
fs.MustRemoveAll(deletePath)
removedDirs++
}
}
if removedDirs > 0 {
fs.MustSyncPath(path)
}
}
// appendPartsToMerge finds optimal parts to merge from src,
// appends them to dst and returns the result.
func appendPartsToMerge(dst, src []*partWrapper, maxOutBytes uint64) []*partWrapper {
if len(src) < 2 {
// There is no need in merging zero or one part :)
return dst
}
// Filter out too big parts.
// This should reduce N for O(N^2) algorithm below.
maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
tmp := make([]*partWrapper, 0, len(src))
for _, pw := range src {
if pw.p.ph.CompressedSizeBytes > maxInPartBytes {
continue
}
tmp = append(tmp, pw)
}
src = tmp
sortPartsForOptimalMerge(src)
maxSrcParts := defaultPartsToMerge
if maxSrcParts > len(src) {
maxSrcParts = len(src)
}
minSrcParts := (maxSrcParts + 1) / 2
if minSrcParts < 2 {
minSrcParts = 2
}
// Exhaustive search for parts giving the lowest write amplification when merged.
var pws []*partWrapper
maxM := float64(0)
for i := minSrcParts; i <= maxSrcParts; i++ {
for j := 0; j <= len(src)-i; j++ {
a := src[j : j+i]
if a[0].p.ph.CompressedSizeBytes*uint64(len(a)) < a[len(a)-1].p.ph.CompressedSizeBytes {
// Do not merge parts with too big difference in size,
// since this results in unbalanced merges.
continue
}
outSize := getCompressedSize(a)
if outSize > maxOutBytes {
// There is no need in verifying remaining parts with bigger sizes.
break
}
m := float64(outSize) / float64(a[len(a)-1].p.ph.CompressedSizeBytes)
if m < maxM {
continue
}
maxM = m
pws = a
}
}
minM := float64(defaultPartsToMerge) / 2
if minM < minMergeMultiplier {
minM = minMergeMultiplier
}
if maxM < minM {
// There is no sense in merging parts with too small m,
// since this leads to high disk write IO.
return dst
}
return append(dst, pws...)
}
func sortPartsForOptimalMerge(pws []*partWrapper) {
// Sort src parts by size and backwards timestamp.
// This should improve adjanced points' locality in the merged parts.
sort.Slice(pws, func(i, j int) bool {
a := &pws[i].p.ph
b := &pws[j].p.ph
if a.CompressedSizeBytes == b.CompressedSizeBytes {
return a.MinTimestamp > b.MinTimestamp
}
return a.CompressedSizeBytes < b.CompressedSizeBytes
})
}
func getCompressedSize(pws []*partWrapper) uint64 {
n := uint64(0)
for _, pw := range pws {
n += pw.p.ph.CompressedSizeBytes
}
return n
}
func getUncompressedSize(pws []*partWrapper) uint64 {
n := uint64(0)
for _, pw := range pws {
n += pw.p.ph.UncompressedSizeBytes
}
return n
}
func getRowsCount(pws []*partWrapper) uint64 {
n := uint64(0)
for _, pw := range pws {
n += pw.p.ph.RowsCount
}
return n
}
func getBlocksCount(pws []*partWrapper) uint64 {
n := uint64(0)
for _, pw := range pws {
n += pw.p.ph.BlocksCount
}
return n
}
func shouldUsePageCacheForPartSize(size uint64) bool {
mem := memory.Remaining() / defaultPartsToMerge
return size <= uint64(mem)
}