mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-22 16:36:27 +01:00
cbbc6e7141
Substitute '+=' with '=', since tooLongItemsTotal is global counter, which doesn't belong to the Table struct.
This is a follow-up for 69d244e6fb
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6297
1797 lines
47 KiB
Go
1797 lines
47 KiB
Go
package mergeset
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
|
)
|
|
|
|
// maxInmemoryParts is the maximum number of inmemory parts in the table.
|
|
//
|
|
// This limit allows reducing CPU usage under high ingestion rate.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
|
|
//
|
|
// This number may be reached when the insertion pace outreaches merger pace.
|
|
// If this number is reached, then the data ingestion is paused until background
|
|
// mergers reduce the number of parts below this number.
|
|
const maxInmemoryParts = 30
|
|
|
|
// Default number of parts to merge at once.
|
|
//
|
|
// This number has been obtained empirically - it gives the lowest possible overhead.
|
|
// See appendPartsToMerge tests for details.
|
|
const defaultPartsToMerge = 15
|
|
|
|
// maxPartSize is the maximum part size in bytes.
|
|
//
|
|
// This number should be limited by the amount of time required to merge parts of this summary size.
|
|
// The required time shouldn't exceed a day.
|
|
const maxPartSize = 400e9
|
|
|
|
// The interval for flushing buffered data to parts, so it becomes visible to search.
|
|
const pendingItemsFlushInterval = time.Second
|
|
|
|
// The interval for guaranteed flush of recently ingested data from memory to on-disk parts so they survive process crash.
|
|
var dataFlushInterval = 5 * time.Second
|
|
|
|
// SetDataFlushInterval sets the interval for guaranteed flush of recently ingested data from memory to disk.
|
|
//
|
|
// The data can be flushed from memory to disk more frequently if it doesn't fit the memory limit.
|
|
//
|
|
// This function must be called before initializing the indexdb.
|
|
func SetDataFlushInterval(d time.Duration) {
|
|
if d < pendingItemsFlushInterval {
|
|
// There is no sense in setting dataFlushInterval to values smaller than pendingItemsFlushInterval,
|
|
// since pending rows unconditionally remain in memory for up to pendingItemsFlushInterval.
|
|
d = pendingItemsFlushInterval
|
|
}
|
|
|
|
dataFlushInterval = d
|
|
}
|
|
|
|
// maxItemsPerCachedPart is the maximum items per created part by the merge,
|
|
// which must be cached in the OS page cache.
|
|
//
|
|
// Such parts are usually frequently accessed, so it is good to cache their
|
|
// contents in OS page cache.
|
|
func maxItemsPerCachedPart() uint64 {
|
|
mem := memory.Remaining()
|
|
// Production data shows that each item occupies ~4 bytes in the compressed part.
|
|
// It is expected no more than defaultPartsToMerge/2 parts exist
|
|
// in the OS page cache before they are merged into bigger part.
|
|
// Halft of the remaining RAM must be left for lib/storage parts,
|
|
// so the maxItems is calculated using the below code:
|
|
maxItems := uint64(mem) / (4 * defaultPartsToMerge)
|
|
if maxItems < 1e6 {
|
|
maxItems = 1e6
|
|
}
|
|
return maxItems
|
|
}
|
|
|
|
// Table represents mergeset table.
|
|
type Table struct {
|
|
activeInmemoryMerges atomic.Int64
|
|
activeFileMerges atomic.Int64
|
|
|
|
inmemoryMergesCount atomic.Uint64
|
|
fileMergesCount atomic.Uint64
|
|
|
|
inmemoryItemsMerged atomic.Uint64
|
|
fileItemsMerged atomic.Uint64
|
|
|
|
itemsAdded atomic.Uint64
|
|
itemsAddedSizeBytes atomic.Uint64
|
|
|
|
inmemoryPartsLimitReachedCount atomic.Uint64
|
|
|
|
mergeIdx atomic.Uint64
|
|
|
|
path string
|
|
|
|
flushCallback func()
|
|
needFlushCallbackCall atomic.Bool
|
|
|
|
prepareBlock PrepareBlockCallback
|
|
isReadOnly *atomic.Bool
|
|
|
|
// rawItems contains recently added items that haven't been converted to parts yet.
|
|
//
|
|
// rawItems are converted to inmemoryParts at least every pendingItemsFlushInterval or when rawItems becomes full.
|
|
//
|
|
// rawItems aren't visible for search due to performance reasons.
|
|
rawItems rawItemsShards
|
|
|
|
// partsLock protects inmemoryParts and fileParts.
|
|
partsLock sync.Mutex
|
|
|
|
// inmemoryParts contains inmemory parts, which are visible for search.
|
|
inmemoryParts []*partWrapper
|
|
|
|
// fileParts contains file-backed parts, which are visible for search.
|
|
fileParts []*partWrapper
|
|
|
|
// inmemoryPartsLimitCh limits the number of inmemory parts to maxInmemoryParts
|
|
// in order to prevent from data ingestion slowdown as described at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
|
|
inmemoryPartsLimitCh chan struct{}
|
|
|
|
// stopCh is used for notifying all the background workers to stop.
|
|
//
|
|
// It must be closed under partsLock in order to prevent from calling wg.Add()
|
|
// after stopCh is closed.
|
|
stopCh chan struct{}
|
|
|
|
// wg is used for waiting for all the background workers to stop.
|
|
//
|
|
// wg.Add() must be called under partsLock after checking whether stopCh isn't closed.
|
|
// This should prevent from calling wg.Add() after stopCh is closed and wg.Wait() is called.
|
|
wg sync.WaitGroup
|
|
|
|
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
|
|
flushPendingItemsWG syncwg.WaitGroup
|
|
}
|
|
|
|
type rawItemsShards struct {
|
|
flushDeadlineMs atomic.Int64
|
|
|
|
shardIdx atomic.Uint32
|
|
|
|
// shards reduce lock contention when adding rows on multi-CPU systems.
|
|
shards []rawItemsShard
|
|
|
|
ibsToFlushLock sync.Mutex
|
|
ibsToFlush []*inmemoryBlock
|
|
}
|
|
|
|
// The number of shards for rawItems per table.
|
|
//
|
|
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
|
|
var rawItemsShardsPerTable = func() int {
|
|
cpus := cgroup.AvailableCPUs()
|
|
multiplier := cpus
|
|
if multiplier > 16 {
|
|
multiplier = 16
|
|
}
|
|
return cpus * multiplier
|
|
}()
|
|
|
|
var maxBlocksPerShard = 256
|
|
|
|
func (riss *rawItemsShards) init() {
|
|
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
|
|
}
|
|
|
|
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
|
|
shards := riss.shards
|
|
shardsLen := uint32(len(shards))
|
|
for len(items) > 0 {
|
|
n := riss.shardIdx.Add(1)
|
|
idx := n % shardsLen
|
|
tailItems, ibsToFlush := shards[idx].addItems(items)
|
|
riss.addIbsToFlush(tb, ibsToFlush)
|
|
items = tailItems
|
|
}
|
|
}
|
|
|
|
func (riss *rawItemsShards) addIbsToFlush(tb *Table, ibsToFlush []*inmemoryBlock) {
|
|
if len(ibsToFlush) == 0 {
|
|
return
|
|
}
|
|
|
|
var ibsToMerge []*inmemoryBlock
|
|
|
|
riss.ibsToFlushLock.Lock()
|
|
if len(riss.ibsToFlush) == 0 {
|
|
riss.updateFlushDeadline()
|
|
}
|
|
riss.ibsToFlush = append(riss.ibsToFlush, ibsToFlush...)
|
|
if len(riss.ibsToFlush) >= maxBlocksPerShard*cgroup.AvailableCPUs() {
|
|
ibsToMerge = riss.ibsToFlush
|
|
riss.ibsToFlush = nil
|
|
}
|
|
riss.ibsToFlushLock.Unlock()
|
|
|
|
tb.flushBlocksToInmemoryParts(ibsToMerge, false)
|
|
}
|
|
|
|
func (riss *rawItemsShards) Len() int {
|
|
n := 0
|
|
for i := range riss.shards {
|
|
n += riss.shards[i].Len()
|
|
}
|
|
return n
|
|
}
|
|
|
|
func (riss *rawItemsShards) updateFlushDeadline() {
|
|
riss.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
|
|
}
|
|
|
|
type rawItemsShardNopad struct {
|
|
flushDeadlineMs atomic.Int64
|
|
|
|
mu sync.Mutex
|
|
ibs []*inmemoryBlock
|
|
}
|
|
|
|
type rawItemsShard struct {
|
|
rawItemsShardNopad
|
|
|
|
// The padding prevents false sharing on widespread platforms with
|
|
// 128 mod (cache line size) = 0 .
|
|
_ [128 - unsafe.Sizeof(rawItemsShardNopad{})%128]byte
|
|
}
|
|
|
|
func (ris *rawItemsShard) Len() int {
|
|
ris.mu.Lock()
|
|
n := 0
|
|
for _, ib := range ris.ibs {
|
|
n += len(ib.items)
|
|
}
|
|
ris.mu.Unlock()
|
|
return n
|
|
}
|
|
|
|
func (ris *rawItemsShard) addItems(items [][]byte) ([][]byte, []*inmemoryBlock) {
|
|
var ibsToFlush []*inmemoryBlock
|
|
var tailItems [][]byte
|
|
|
|
ris.mu.Lock()
|
|
ibs := ris.ibs
|
|
if len(ibs) == 0 {
|
|
ibs = append(ibs, &inmemoryBlock{})
|
|
ris.updateFlushDeadline()
|
|
ris.ibs = ibs
|
|
}
|
|
ib := ibs[len(ibs)-1]
|
|
for i, item := range items {
|
|
if ib.Add(item) {
|
|
continue
|
|
}
|
|
if len(ibs) >= maxBlocksPerShard {
|
|
ibsToFlush = append(ibsToFlush, ibs...)
|
|
ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
|
|
tailItems = items[i:]
|
|
break
|
|
}
|
|
ib = &inmemoryBlock{}
|
|
if ib.Add(item) {
|
|
ibs = append(ibs, ib)
|
|
continue
|
|
}
|
|
|
|
// Skip too long item
|
|
itemPrefix := item
|
|
if len(itemPrefix) > 128 {
|
|
itemPrefix = itemPrefix[:128]
|
|
}
|
|
tooLongItemsTotal.Add(1)
|
|
tooLongItemLogger.Errorf("skipping adding too long item to indexdb: len(item)=%d; it shouldn't exceed %d bytes; item prefix=%q", len(item), maxInmemoryBlockSize, itemPrefix)
|
|
}
|
|
ris.ibs = ibs
|
|
ris.mu.Unlock()
|
|
|
|
return tailItems, ibsToFlush
|
|
}
|
|
|
|
func (ris *rawItemsShard) updateFlushDeadline() {
|
|
ris.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
|
|
}
|
|
|
|
var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
|
|
|
|
var tooLongItemsTotal atomic.Uint64
|
|
|
|
type partWrapper struct {
|
|
// refCount is the number of references to partWrapper
|
|
refCount atomic.Int32
|
|
|
|
// mustDrop marks partWrapper for deletion.
|
|
// This field should be updated only after partWrapper
|
|
// was removed from the list of active parts.
|
|
mustDrop atomic.Bool
|
|
|
|
p *part
|
|
|
|
mp *inmemoryPart
|
|
|
|
isInMerge bool
|
|
|
|
// The deadline when the in-memory part must be flushed to disk.
|
|
flushToDiskDeadline time.Time
|
|
}
|
|
|
|
func (pw *partWrapper) incRef() {
|
|
pw.refCount.Add(1)
|
|
}
|
|
|
|
func (pw *partWrapper) decRef() {
|
|
n := pw.refCount.Add(-1)
|
|
if n < 0 {
|
|
logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", n)
|
|
}
|
|
if n > 0 {
|
|
return
|
|
}
|
|
|
|
deletePath := ""
|
|
if pw.mp == nil && pw.mustDrop.Load() {
|
|
deletePath = pw.p.path
|
|
}
|
|
if pw.mp != nil {
|
|
// Do not return pw.mp to pool via putInmemoryPart(),
|
|
// since pw.mp size may be too big compared to other entries stored in the pool.
|
|
// This may result in increased memory usage because of high fragmentation.
|
|
pw.mp = nil
|
|
}
|
|
pw.p.MustClose()
|
|
pw.p = nil
|
|
|
|
if deletePath != "" {
|
|
fs.MustRemoveAll(deletePath)
|
|
}
|
|
}
|
|
|
|
// MustOpenTable opens a table on the given path.
|
|
//
|
|
// Optional flushCallback is called every time new data batch is flushed
|
|
// to the underlying storage and becomes visible to search.
|
|
//
|
|
// Optional prepareBlock is called during merge before flushing the prepared block
|
|
// to persistent storage.
|
|
//
|
|
// The table is created if it doesn't exist yet.
|
|
func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *atomic.Bool) *Table {
|
|
path = filepath.Clean(path)
|
|
|
|
// Create a directory for the table if it doesn't exist yet.
|
|
fs.MustMkdirIfNotExist(path)
|
|
|
|
// Open table parts.
|
|
pws := mustOpenParts(path)
|
|
|
|
tb := &Table{
|
|
path: path,
|
|
flushCallback: flushCallback,
|
|
prepareBlock: prepareBlock,
|
|
isReadOnly: isReadOnly,
|
|
fileParts: pws,
|
|
inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
tb.mergeIdx.Store(uint64(time.Now().UnixNano()))
|
|
tb.rawItems.init()
|
|
tb.startBackgroundWorkers()
|
|
|
|
return tb
|
|
}
|
|
|
|
func (tb *Table) startBackgroundWorkers() {
|
|
// Start file parts mergers, so they could start merging unmerged parts if needed.
|
|
// There is no need in starting in-memory parts mergers, since there are no in-memory parts yet.
|
|
tb.startFilePartsMergers()
|
|
|
|
tb.startPendingItemsFlusher()
|
|
tb.startInmemoryPartsFlusher()
|
|
tb.startFlushCallbackWorker()
|
|
}
|
|
|
|
func (tb *Table) startInmemoryPartsMergers() {
|
|
tb.partsLock.Lock()
|
|
for i := 0; i < cap(inmemoryPartsConcurrencyCh); i++ {
|
|
tb.startInmemoryPartsMergerLocked()
|
|
}
|
|
tb.partsLock.Unlock()
|
|
}
|
|
|
|
func (tb *Table) startInmemoryPartsMergerLocked() {
|
|
select {
|
|
case <-tb.stopCh:
|
|
return
|
|
default:
|
|
}
|
|
tb.wg.Add(1)
|
|
go func() {
|
|
tb.inmemoryPartsMerger()
|
|
tb.wg.Done()
|
|
}()
|
|
}
|
|
|
|
func (tb *Table) startFilePartsMergers() {
|
|
tb.partsLock.Lock()
|
|
for i := 0; i < cap(filePartsConcurrencyCh); i++ {
|
|
tb.startFilePartsMergerLocked()
|
|
}
|
|
tb.partsLock.Unlock()
|
|
}
|
|
|
|
func (tb *Table) startFilePartsMergerLocked() {
|
|
select {
|
|
case <-tb.stopCh:
|
|
return
|
|
default:
|
|
}
|
|
tb.wg.Add(1)
|
|
go func() {
|
|
tb.filePartsMerger()
|
|
tb.wg.Done()
|
|
}()
|
|
}
|
|
|
|
func (tb *Table) startPendingItemsFlusher() {
|
|
tb.wg.Add(1)
|
|
go func() {
|
|
tb.pendingItemsFlusher()
|
|
tb.wg.Done()
|
|
}()
|
|
}
|
|
|
|
func (tb *Table) startInmemoryPartsFlusher() {
|
|
tb.wg.Add(1)
|
|
go func() {
|
|
tb.inmemoryPartsFlusher()
|
|
tb.wg.Done()
|
|
}()
|
|
}
|
|
|
|
func (tb *Table) startFlushCallbackWorker() {
|
|
if tb.flushCallback == nil {
|
|
return
|
|
}
|
|
|
|
tb.wg.Add(1)
|
|
go func() {
|
|
// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
|
|
// which are reset by the flushCallback.
|
|
d := timeutil.AddJitterToDuration(time.Second * 10)
|
|
tc := time.NewTicker(d)
|
|
for {
|
|
select {
|
|
case <-tb.stopCh:
|
|
tb.flushCallback()
|
|
tb.wg.Done()
|
|
return
|
|
case <-tc.C:
|
|
if tb.needFlushCallbackCall.CompareAndSwap(true, false) {
|
|
tb.flushCallback()
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
var (
|
|
inmemoryPartsConcurrencyCh = make(chan struct{}, getInmemoryPartsConcurrency())
|
|
filePartsConcurrencyCh = make(chan struct{}, getFilePartsConcurrency())
|
|
)
|
|
|
|
func getInmemoryPartsConcurrency() int {
|
|
// The concurrency for processing in-memory parts must equal to the number of CPU cores,
|
|
// since these operations are CPU-bound.
|
|
return cgroup.AvailableCPUs()
|
|
}
|
|
|
|
func getFilePartsConcurrency() int {
|
|
n := cgroup.AvailableCPUs()
|
|
if n < 4 {
|
|
// Allow at least 4 concurrent workers for file parts on systems
|
|
// with less than 4 CPU cores in order to be able to make small file merges
|
|
// when big file merges are in progress.
|
|
return 4
|
|
}
|
|
return n
|
|
}
|
|
|
|
// MustClose closes the table.
|
|
func (tb *Table) MustClose() {
|
|
// Notify background workers to stop.
|
|
// The tb.partsLock is aquired in order to guarantee that tb.wg.Add() isn't called
|
|
// after tb.stopCh is closed and tb.wg.Wait() is called below.
|
|
tb.partsLock.Lock()
|
|
close(tb.stopCh)
|
|
tb.partsLock.Unlock()
|
|
|
|
// Wait for background workers to stop.
|
|
tb.wg.Wait()
|
|
|
|
// Flush the remaining in-memory items to files.
|
|
tb.flushInmemoryItemsToFiles()
|
|
|
|
// Remove references to parts from the tb, so they may be eventually closed after all the searches are done.
|
|
tb.partsLock.Lock()
|
|
|
|
if n := tb.rawItems.Len(); n > 0 {
|
|
logger.Panicf("BUG: raw items must be empty at this stage; got %d items", n)
|
|
}
|
|
|
|
if n := len(tb.inmemoryParts); n > 0 {
|
|
logger.Panicf("BUG: in-memory parts must be empty at this stage; got %d parts", n)
|
|
}
|
|
tb.inmemoryParts = nil
|
|
|
|
fileParts := tb.fileParts
|
|
tb.fileParts = nil
|
|
|
|
tb.partsLock.Unlock()
|
|
|
|
for _, pw := range fileParts {
|
|
pw.decRef()
|
|
}
|
|
}
|
|
|
|
// Path returns the path to tb on the filesystem.
|
|
func (tb *Table) Path() string {
|
|
return tb.path
|
|
}
|
|
|
|
// TableMetrics contains essential metrics for the Table.
|
|
type TableMetrics struct {
|
|
ActiveInmemoryMerges uint64
|
|
ActiveFileMerges uint64
|
|
|
|
InmemoryMergesCount uint64
|
|
FileMergesCount uint64
|
|
|
|
InmemoryItemsMerged uint64
|
|
FileItemsMerged uint64
|
|
|
|
ItemsAdded uint64
|
|
ItemsAddedSizeBytes uint64
|
|
|
|
InmemoryPartsLimitReachedCount uint64
|
|
|
|
PendingItems uint64
|
|
|
|
InmemoryPartsCount uint64
|
|
FilePartsCount uint64
|
|
|
|
InmemoryBlocksCount uint64
|
|
FileBlocksCount uint64
|
|
|
|
InmemoryItemsCount uint64
|
|
FileItemsCount uint64
|
|
|
|
InmemorySizeBytes uint64
|
|
FileSizeBytes uint64
|
|
|
|
DataBlocksCacheSize uint64
|
|
DataBlocksCacheSizeBytes uint64
|
|
DataBlocksCacheSizeMaxBytes uint64
|
|
DataBlocksCacheRequests uint64
|
|
DataBlocksCacheMisses uint64
|
|
|
|
IndexBlocksCacheSize uint64
|
|
IndexBlocksCacheSizeBytes uint64
|
|
IndexBlocksCacheSizeMaxBytes uint64
|
|
IndexBlocksCacheRequests uint64
|
|
IndexBlocksCacheMisses uint64
|
|
|
|
PartsRefCount uint64
|
|
|
|
TooLongItemsDroppedTotal uint64
|
|
}
|
|
|
|
// TotalItemsCount returns the total number of items in the table.
|
|
func (tm *TableMetrics) TotalItemsCount() uint64 {
|
|
return tm.InmemoryItemsCount + tm.FileItemsCount
|
|
}
|
|
|
|
// UpdateMetrics updates m with metrics from tb.
|
|
func (tb *Table) UpdateMetrics(m *TableMetrics) {
|
|
m.ActiveInmemoryMerges += uint64(tb.activeInmemoryMerges.Load())
|
|
m.ActiveFileMerges += uint64(tb.activeFileMerges.Load())
|
|
|
|
m.InmemoryMergesCount += tb.inmemoryMergesCount.Load()
|
|
m.FileMergesCount += tb.fileMergesCount.Load()
|
|
|
|
m.InmemoryItemsMerged += tb.inmemoryItemsMerged.Load()
|
|
m.FileItemsMerged += tb.fileItemsMerged.Load()
|
|
|
|
m.ItemsAdded += tb.itemsAdded.Load()
|
|
m.ItemsAddedSizeBytes += tb.itemsAddedSizeBytes.Load()
|
|
|
|
m.InmemoryPartsLimitReachedCount += tb.inmemoryPartsLimitReachedCount.Load()
|
|
|
|
m.PendingItems += uint64(tb.rawItems.Len())
|
|
|
|
tb.partsLock.Lock()
|
|
|
|
m.InmemoryPartsCount += uint64(len(tb.inmemoryParts))
|
|
for _, pw := range tb.inmemoryParts {
|
|
p := pw.p
|
|
m.InmemoryBlocksCount += p.ph.blocksCount
|
|
m.InmemoryItemsCount += p.ph.itemsCount
|
|
m.InmemorySizeBytes += p.size
|
|
m.PartsRefCount += uint64(pw.refCount.Load())
|
|
}
|
|
|
|
m.FilePartsCount += uint64(len(tb.fileParts))
|
|
for _, pw := range tb.fileParts {
|
|
p := pw.p
|
|
m.FileBlocksCount += p.ph.blocksCount
|
|
m.FileItemsCount += p.ph.itemsCount
|
|
m.FileSizeBytes += p.size
|
|
m.PartsRefCount += uint64(pw.refCount.Load())
|
|
}
|
|
tb.partsLock.Unlock()
|
|
|
|
m.DataBlocksCacheSize = uint64(ibCache.Len())
|
|
m.DataBlocksCacheSizeBytes = uint64(ibCache.SizeBytes())
|
|
m.DataBlocksCacheSizeMaxBytes = uint64(ibCache.SizeMaxBytes())
|
|
m.DataBlocksCacheRequests = ibCache.Requests()
|
|
m.DataBlocksCacheMisses = ibCache.Misses()
|
|
|
|
m.IndexBlocksCacheSize = uint64(idxbCache.Len())
|
|
m.IndexBlocksCacheSizeBytes = uint64(idxbCache.SizeBytes())
|
|
m.IndexBlocksCacheSizeMaxBytes = uint64(idxbCache.SizeMaxBytes())
|
|
m.IndexBlocksCacheRequests = idxbCache.Requests()
|
|
m.IndexBlocksCacheMisses = idxbCache.Misses()
|
|
|
|
m.TooLongItemsDroppedTotal = tooLongItemsTotal.Load()
|
|
}
|
|
|
|
// AddItems adds the given items to the tb.
|
|
//
|
|
// The function ignores items with length exceeding maxInmemoryBlockSize.
|
|
// It logs the ignored items, so users could notice and fix the issue.
|
|
func (tb *Table) AddItems(items [][]byte) {
|
|
tb.rawItems.addItems(tb, items)
|
|
tb.itemsAdded.Add(uint64(len(items)))
|
|
n := 0
|
|
for _, item := range items {
|
|
n += len(item)
|
|
}
|
|
tb.itemsAddedSizeBytes.Add(uint64(n))
|
|
}
|
|
|
|
// getParts appends parts snapshot to dst and returns it.
|
|
//
|
|
// The appended parts must be released with putParts.
|
|
func (tb *Table) getParts(dst []*partWrapper) []*partWrapper {
|
|
tb.partsLock.Lock()
|
|
for _, pw := range tb.inmemoryParts {
|
|
pw.incRef()
|
|
}
|
|
for _, pw := range tb.fileParts {
|
|
pw.incRef()
|
|
}
|
|
dst = append(dst, tb.inmemoryParts...)
|
|
dst = append(dst, tb.fileParts...)
|
|
tb.partsLock.Unlock()
|
|
|
|
return dst
|
|
}
|
|
|
|
// putParts releases the given pws obtained via getParts.
|
|
func (tb *Table) putParts(pws []*partWrapper) {
|
|
for _, pw := range pws {
|
|
pw.decRef()
|
|
}
|
|
}
|
|
|
|
func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error {
|
|
pwsLen := len(pws)
|
|
|
|
var errGlobal error
|
|
var errGlobalLock sync.Mutex
|
|
wg := getWaitGroup()
|
|
for len(pws) > 0 {
|
|
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
|
|
wg.Add(1)
|
|
inmemoryPartsConcurrencyCh <- struct{}{}
|
|
go func(pwsChunk []*partWrapper) {
|
|
defer func() {
|
|
<-inmemoryPartsConcurrencyCh
|
|
wg.Done()
|
|
}()
|
|
|
|
if err := tb.mergeParts(pwsChunk, nil, true); err != nil {
|
|
// There is no need for errors.Is(err, errForciblyStopped) check here, since stopCh=nil is passed to mergeParts.
|
|
errGlobalLock.Lock()
|
|
if errGlobal == nil {
|
|
errGlobal = err
|
|
}
|
|
errGlobalLock.Unlock()
|
|
}
|
|
}(pwsToMerge)
|
|
pws = pwsRemaining
|
|
}
|
|
wg.Wait()
|
|
putWaitGroup(wg)
|
|
|
|
if errGlobal != nil {
|
|
return fmt.Errorf("cannot optimally merge %d parts: %w", pwsLen, errGlobal)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DebugFlush makes sure all the recently added data is visible to search.
|
|
//
|
|
// Note: this function doesn't store all the in-memory data to disk - it just converts
|
|
// recently added items to searchable parts, which can be stored either in memory
|
|
// (if they are quite small) or to persistent disk.
|
|
//
|
|
// This function is for debugging and testing purposes only,
|
|
// since it may slow down data ingestion when used frequently.
|
|
func (tb *Table) DebugFlush() {
|
|
tb.flushPendingItems(true)
|
|
|
|
// Wait for background flushers to finish.
|
|
tb.flushPendingItemsWG.Wait()
|
|
}
|
|
|
|
func (tb *Table) pendingItemsFlusher() {
|
|
// do not add jitter in order to guarantee flush interval
|
|
d := pendingItemsFlushInterval
|
|
ticker := time.NewTicker(d)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-tb.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
tb.flushPendingItems(false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tb *Table) inmemoryPartsFlusher() {
|
|
// do not add jitter in order to guarantee flush interval
|
|
d := dataFlushInterval
|
|
ticker := time.NewTicker(d)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-tb.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
tb.flushInmemoryPartsToFiles(false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tb *Table) flushPendingItems(isFinal bool) {
|
|
tb.flushPendingItemsWG.Add(1)
|
|
tb.rawItems.flush(tb, isFinal)
|
|
tb.flushPendingItemsWG.Done()
|
|
}
|
|
|
|
func (tb *Table) flushInmemoryItemsToFiles() {
|
|
tb.flushPendingItems(true)
|
|
tb.flushInmemoryPartsToFiles(true)
|
|
}
|
|
|
|
func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) {
|
|
currentTime := time.Now()
|
|
var pws []*partWrapper
|
|
|
|
tb.partsLock.Lock()
|
|
for _, pw := range tb.inmemoryParts {
|
|
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
|
|
pw.isInMerge = true
|
|
pws = append(pws, pw)
|
|
}
|
|
}
|
|
tb.partsLock.Unlock()
|
|
|
|
if err := tb.mergeInmemoryPartsToFiles(pws); err != nil {
|
|
logger.Panicf("FATAL: cannot merge in-memory parts to files: %s", err)
|
|
}
|
|
}
|
|
|
|
func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
|
|
var dst []*inmemoryBlock
|
|
|
|
currentTimeMs := time.Now().UnixMilli()
|
|
flushDeadlineMs := riss.flushDeadlineMs.Load()
|
|
if isFinal || currentTimeMs >= flushDeadlineMs {
|
|
riss.ibsToFlushLock.Lock()
|
|
dst = riss.ibsToFlush
|
|
riss.ibsToFlush = nil
|
|
riss.ibsToFlushLock.Unlock()
|
|
}
|
|
|
|
for i := range riss.shards {
|
|
dst = riss.shards[i].appendBlocksToFlush(dst, currentTimeMs, isFinal)
|
|
}
|
|
|
|
tb.flushBlocksToInmemoryParts(dst, isFinal)
|
|
}
|
|
|
|
func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, currentTimeMs int64, isFinal bool) []*inmemoryBlock {
|
|
flushDeadlineMs := ris.flushDeadlineMs.Load()
|
|
if !isFinal && currentTimeMs < flushDeadlineMs {
|
|
// Fast path - nothing to flush
|
|
return dst
|
|
}
|
|
|
|
// Slow path - move ris.ibs to dst
|
|
ris.mu.Lock()
|
|
ibs := ris.ibs
|
|
dst = append(dst, ibs...)
|
|
for i := range ibs {
|
|
ibs[i] = nil
|
|
}
|
|
ris.ibs = ibs[:0]
|
|
ris.mu.Unlock()
|
|
|
|
return dst
|
|
}
|
|
|
|
func (tb *Table) flushBlocksToInmemoryParts(ibs []*inmemoryBlock, isFinal bool) {
|
|
if len(ibs) == 0 {
|
|
return
|
|
}
|
|
|
|
// Merge ibs into in-memory parts.
|
|
var pwsLock sync.Mutex
|
|
pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
|
|
wg := getWaitGroup()
|
|
for len(ibs) > 0 {
|
|
n := defaultPartsToMerge
|
|
if n > len(ibs) {
|
|
n = len(ibs)
|
|
}
|
|
wg.Add(1)
|
|
inmemoryPartsConcurrencyCh <- struct{}{}
|
|
go func(ibsChunk []*inmemoryBlock) {
|
|
defer func() {
|
|
<-inmemoryPartsConcurrencyCh
|
|
wg.Done()
|
|
}()
|
|
|
|
if pw := tb.createInmemoryPart(ibsChunk); pw != nil {
|
|
pwsLock.Lock()
|
|
pws = append(pws, pw)
|
|
pwsLock.Unlock()
|
|
}
|
|
// Clear references to ibsChunk items, so they may be reclaimed faster by Go GC.
|
|
for i := range ibsChunk {
|
|
ibsChunk[i] = nil
|
|
}
|
|
}(ibs[:n])
|
|
ibs = ibs[n:]
|
|
}
|
|
wg.Wait()
|
|
putWaitGroup(wg)
|
|
|
|
// Merge pws into a single in-memory part.
|
|
maxPartSize := getMaxInmemoryPartSize()
|
|
for len(pws) > 1 {
|
|
pws = tb.mustMergeInmemoryParts(pws)
|
|
|
|
pwsRemaining := pws[:0]
|
|
for _, pw := range pws {
|
|
if pw.p.size >= maxPartSize {
|
|
tb.addToInmemoryParts(pw, isFinal)
|
|
} else {
|
|
pwsRemaining = append(pwsRemaining, pw)
|
|
}
|
|
}
|
|
pws = pwsRemaining
|
|
}
|
|
if len(pws) == 1 {
|
|
tb.addToInmemoryParts(pws[0], isFinal)
|
|
}
|
|
}
|
|
|
|
func (tb *Table) addToInmemoryParts(pw *partWrapper, isFinal bool) {
|
|
// Wait until the number of in-memory parts goes below maxInmemoryParts.
|
|
// This prevents from excess CPU usage during search in tb under high ingestion rate to tb.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
|
|
select {
|
|
case tb.inmemoryPartsLimitCh <- struct{}{}:
|
|
default:
|
|
tb.inmemoryPartsLimitReachedCount.Add(1)
|
|
select {
|
|
case tb.inmemoryPartsLimitCh <- struct{}{}:
|
|
case <-tb.stopCh:
|
|
}
|
|
}
|
|
|
|
tb.partsLock.Lock()
|
|
tb.inmemoryParts = append(tb.inmemoryParts, pw)
|
|
tb.startInmemoryPartsMergerLocked()
|
|
tb.partsLock.Unlock()
|
|
|
|
if tb.flushCallback != nil {
|
|
if isFinal {
|
|
tb.flushCallback()
|
|
} else {
|
|
// Use Load in front of CompareAndSwap in order to avoid slow inter-CPU synchronization
|
|
// at fast path when needFlushCallbackCall is already set to true.
|
|
if !tb.needFlushCallbackCall.Load() {
|
|
tb.needFlushCallbackCall.CompareAndSwap(false, true)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func getWaitGroup() *sync.WaitGroup {
|
|
v := wgPool.Get()
|
|
if v == nil {
|
|
return &sync.WaitGroup{}
|
|
}
|
|
return v.(*sync.WaitGroup)
|
|
}
|
|
|
|
func putWaitGroup(wg *sync.WaitGroup) {
|
|
wgPool.Put(wg)
|
|
}
|
|
|
|
var wgPool sync.Pool
|
|
|
|
func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
|
|
var pwsResult []*partWrapper
|
|
var pwsResultLock sync.Mutex
|
|
wg := getWaitGroup()
|
|
for len(pws) > 0 {
|
|
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
|
|
wg.Add(1)
|
|
inmemoryPartsConcurrencyCh <- struct{}{}
|
|
go func(pwsChunk []*partWrapper) {
|
|
defer func() {
|
|
<-inmemoryPartsConcurrencyCh
|
|
wg.Done()
|
|
}()
|
|
|
|
pw := tb.mustMergeInmemoryPartsFinal(pwsChunk)
|
|
|
|
pwsResultLock.Lock()
|
|
pwsResult = append(pwsResult, pw)
|
|
pwsResultLock.Unlock()
|
|
}(pwsToMerge)
|
|
pws = pwsRemaining
|
|
}
|
|
wg.Wait()
|
|
putWaitGroup(wg)
|
|
|
|
return pwsResult
|
|
}
|
|
|
|
func (tb *Table) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper {
|
|
if len(pws) == 0 {
|
|
logger.Panicf("BUG: pws must contain at least a single item")
|
|
}
|
|
if len(pws) == 1 {
|
|
// Nothing to merge
|
|
return pws[0]
|
|
}
|
|
|
|
bsrs := make([]*blockStreamReader, 0, len(pws))
|
|
for _, pw := range pws {
|
|
if pw.mp == nil {
|
|
logger.Panicf("BUG: unexpected file part")
|
|
}
|
|
bsr := getBlockStreamReader()
|
|
bsr.MustInitFromInmemoryPart(pw.mp)
|
|
bsrs = append(bsrs, bsr)
|
|
}
|
|
|
|
flushToDiskDeadline := getFlushToDiskDeadline(pws)
|
|
return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
|
|
}
|
|
|
|
func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
|
|
// Prepare blockStreamReaders for source blocks.
|
|
bsrs := make([]*blockStreamReader, 0, len(ibs))
|
|
for _, ib := range ibs {
|
|
if len(ib.items) == 0 {
|
|
continue
|
|
}
|
|
bsr := getBlockStreamReader()
|
|
bsr.MustInitFromInmemoryBlock(ib)
|
|
bsrs = append(bsrs, bsr)
|
|
}
|
|
if len(bsrs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
flushToDiskDeadline := time.Now().Add(dataFlushInterval)
|
|
if len(bsrs) == 1 {
|
|
// Nothing to merge. Just return a single inmemory part.
|
|
bsr := bsrs[0]
|
|
mp := &inmemoryPart{}
|
|
mp.Init(&bsr.Block)
|
|
putBlockStreamReader(bsr)
|
|
return newPartWrapperFromInmemoryPart(mp, flushToDiskDeadline)
|
|
}
|
|
|
|
return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
|
|
}
|
|
|
|
func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDiskDeadline time.Time) *partWrapper {
|
|
// Prepare blockStreamWriter for destination part.
|
|
outItemsCount := uint64(0)
|
|
for _, bsr := range bsrs {
|
|
outItemsCount += bsr.ph.itemsCount
|
|
}
|
|
compressLevel := getCompressLevel(outItemsCount)
|
|
bsw := getBlockStreamWriter()
|
|
mpDst := &inmemoryPart{}
|
|
bsw.MustInitFromInmemoryPart(mpDst, compressLevel)
|
|
|
|
// Merge parts.
|
|
// The merge shouldn't be interrupted by stopCh, so use nil stopCh.
|
|
ph, err := tb.mergePartsInternal("", bsw, bsrs, partInmemory, nil)
|
|
putBlockStreamWriter(bsw)
|
|
for _, bsr := range bsrs {
|
|
putBlockStreamReader(bsr)
|
|
}
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
|
|
}
|
|
mpDst.ph = *ph
|
|
|
|
return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline)
|
|
}
|
|
|
|
func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.Time) *partWrapper {
|
|
p := mp.NewPart()
|
|
pw := &partWrapper{
|
|
p: p,
|
|
mp: mp,
|
|
flushToDiskDeadline: flushToDiskDeadline,
|
|
}
|
|
pw.incRef()
|
|
return pw
|
|
}
|
|
|
|
func getMaxInmemoryPartSize() uint64 {
|
|
// Allow up to 5% of memory for in-memory parts.
|
|
n := uint64(0.05 * float64(memory.Allowed()) / maxInmemoryParts)
|
|
if n < 1e6 {
|
|
n = 1e6
|
|
}
|
|
return n
|
|
}
|
|
|
|
func (tb *Table) getMaxFilePartSize() uint64 {
|
|
n := fs.MustGetFreeSpace(tb.path)
|
|
// Divide free space by the max number of concurrent merges for file parts.
|
|
maxOutBytes := n / uint64(cap(filePartsConcurrencyCh))
|
|
if maxOutBytes > maxPartSize {
|
|
maxOutBytes = maxPartSize
|
|
}
|
|
return maxOutBytes
|
|
}
|
|
|
|
// NotifyReadWriteMode notifies tb that it may be switched from read-only mode to read-write mode.
|
|
func (tb *Table) NotifyReadWriteMode() {
|
|
tb.startInmemoryPartsMergers()
|
|
tb.startFilePartsMergers()
|
|
}
|
|
|
|
func (tb *Table) inmemoryPartsMerger() {
|
|
for {
|
|
if tb.isReadOnly.Load() {
|
|
return
|
|
}
|
|
maxOutBytes := tb.getMaxFilePartSize()
|
|
|
|
tb.partsLock.Lock()
|
|
pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes)
|
|
tb.partsLock.Unlock()
|
|
|
|
if len(pws) == 0 {
|
|
// Nothing to merge
|
|
return
|
|
}
|
|
|
|
inmemoryPartsConcurrencyCh <- struct{}{}
|
|
err := tb.mergeParts(pws, tb.stopCh, false)
|
|
<-inmemoryPartsConcurrencyCh
|
|
|
|
if err == nil {
|
|
// Try merging additional parts.
|
|
continue
|
|
}
|
|
if errors.Is(err, errForciblyStopped) {
|
|
// Nothing to do - finish the merger.
|
|
return
|
|
}
|
|
// Unexpected error.
|
|
logger.Panicf("FATAL: unrecoverable error when merging inmemory parts in %q: %s", tb.path, err)
|
|
}
|
|
}
|
|
|
|
func (tb *Table) filePartsMerger() {
|
|
for {
|
|
if tb.isReadOnly.Load() {
|
|
return
|
|
}
|
|
maxOutBytes := tb.getMaxFilePartSize()
|
|
|
|
tb.partsLock.Lock()
|
|
pws := getPartsToMerge(tb.fileParts, maxOutBytes)
|
|
tb.partsLock.Unlock()
|
|
|
|
if len(pws) == 0 {
|
|
// Nothing to merge
|
|
return
|
|
}
|
|
|
|
filePartsConcurrencyCh <- struct{}{}
|
|
err := tb.mergeParts(pws, tb.stopCh, false)
|
|
<-filePartsConcurrencyCh
|
|
|
|
if err == nil {
|
|
// Try merging additional parts.
|
|
continue
|
|
}
|
|
if errors.Is(err, errForciblyStopped) {
|
|
// The merger has been stopped.
|
|
return
|
|
}
|
|
// Unexpected error.
|
|
logger.Panicf("FATAL: unrecoverable error when merging file parts in %q: %s", tb.path, err)
|
|
}
|
|
}
|
|
|
|
func assertIsInMerge(pws []*partWrapper) {
|
|
for _, pw := range pws {
|
|
if !pw.isInMerge {
|
|
logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
|
|
tb.partsLock.Lock()
|
|
for _, pw := range pws {
|
|
if !pw.isInMerge {
|
|
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
|
|
}
|
|
pw.isInMerge = false
|
|
}
|
|
tb.partsLock.Unlock()
|
|
}
|
|
|
|
// mergeParts merges pws to a single resulting part.
|
|
//
|
|
// It is expected that pws contains at least a single part.
|
|
//
|
|
// Merging is immediately stopped if stopCh is closed.
|
|
//
|
|
// If isFinal is set, then the resulting part will be stored to disk.
|
|
// If at least a single source part at pws is stored on disk, then the resulting part
|
|
// will be stored to disk.
|
|
//
|
|
// All the parts inside pws must have isInMerge field set to true.
|
|
// The isInMerge field inside pws parts is set to false before returning from the function.
|
|
func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
|
|
if len(pws) == 0 {
|
|
logger.Panicf("BUG: empty pws cannot be passed to mergeParts()")
|
|
}
|
|
|
|
assertIsInMerge(pws)
|
|
defer tb.releasePartsToMerge(pws)
|
|
|
|
startTime := time.Now()
|
|
|
|
// Initialize destination paths.
|
|
dstPartType := getDstPartType(pws, isFinal)
|
|
mergeIdx := tb.nextMergeIdx()
|
|
dstPartPath := ""
|
|
if dstPartType == partFile {
|
|
dstPartPath = filepath.Join(tb.path, fmt.Sprintf("%016X", mergeIdx))
|
|
}
|
|
|
|
if isFinal && len(pws) == 1 && pws[0].mp != nil {
|
|
// Fast path: flush a single in-memory part to disk.
|
|
mp := pws[0].mp
|
|
mp.MustStoreToDisk(dstPartPath)
|
|
pwNew := tb.openCreatedPart(pws, nil, dstPartPath)
|
|
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
|
return nil
|
|
}
|
|
|
|
// Prepare BlockStreamReaders for source parts.
|
|
bsrs := mustOpenBlockStreamReaders(pws)
|
|
|
|
// Prepare BlockStreamWriter for destination part.
|
|
srcSize := uint64(0)
|
|
srcItemsCount := uint64(0)
|
|
srcBlocksCount := uint64(0)
|
|
for _, pw := range pws {
|
|
srcSize += pw.p.size
|
|
srcItemsCount += pw.p.ph.itemsCount
|
|
srcBlocksCount += pw.p.ph.blocksCount
|
|
}
|
|
compressLevel := getCompressLevel(srcItemsCount)
|
|
bsw := getBlockStreamWriter()
|
|
var mpNew *inmemoryPart
|
|
if dstPartType == partInmemory {
|
|
mpNew = &inmemoryPart{}
|
|
bsw.MustInitFromInmemoryPart(mpNew, compressLevel)
|
|
} else {
|
|
nocache := srcItemsCount > maxItemsPerCachedPart()
|
|
bsw.MustInitFromFilePart(dstPartPath, nocache, compressLevel)
|
|
}
|
|
|
|
// Merge source parts to destination part.
|
|
ph, err := tb.mergePartsInternal(dstPartPath, bsw, bsrs, dstPartType, stopCh)
|
|
putBlockStreamWriter(bsw)
|
|
for _, bsr := range bsrs {
|
|
putBlockStreamReader(bsr)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if mpNew != nil {
|
|
// Update partHeader for destination inmemory part after the merge.
|
|
mpNew.ph = *ph
|
|
} else {
|
|
// Make sure the created part directory listing is synced.
|
|
fs.MustSyncPath(dstPartPath)
|
|
}
|
|
|
|
// Atomically swap the source parts with the newly created part.
|
|
pwNew := tb.openCreatedPart(pws, mpNew, dstPartPath)
|
|
pDst := pwNew.p
|
|
dstItemsCount := pDst.ph.itemsCount
|
|
dstBlocksCount := pDst.ph.blocksCount
|
|
dstSize := pDst.size
|
|
|
|
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
|
|
|
d := time.Since(startTime)
|
|
if d <= 30*time.Second {
|
|
return nil
|
|
}
|
|
|
|
// Log stats for long merges.
|
|
durationSecs := d.Seconds()
|
|
itemsPerSec := int(float64(srcItemsCount) / durationSecs)
|
|
logger.Infof("merged (%d parts, %d items, %d blocks, %d bytes) into (1 part, %d items, %d blocks, %d bytes) in %.3f seconds at %d items/sec to %q",
|
|
len(pws), srcItemsCount, srcBlocksCount, srcSize, dstItemsCount, dstBlocksCount, dstSize, durationSecs, itemsPerSec, dstPartPath)
|
|
|
|
return nil
|
|
}
|
|
|
|
func getFlushToDiskDeadline(pws []*partWrapper) time.Time {
|
|
d := time.Now().Add(dataFlushInterval)
|
|
for _, pw := range pws {
|
|
if pw.mp != nil && pw.flushToDiskDeadline.Before(d) {
|
|
d = pw.flushToDiskDeadline
|
|
}
|
|
}
|
|
return d
|
|
}
|
|
|
|
type partType int
|
|
|
|
var (
|
|
partInmemory = partType(0)
|
|
partFile = partType(1)
|
|
)
|
|
|
|
func getDstPartType(pws []*partWrapper, isFinal bool) partType {
|
|
dstPartSize := getPartsSize(pws)
|
|
if isFinal || dstPartSize > getMaxInmemoryPartSize() {
|
|
return partFile
|
|
}
|
|
if !areAllInmemoryParts(pws) {
|
|
// If at least a single source part is located in file,
|
|
// then the destination part must be in file for durability reasons.
|
|
return partFile
|
|
}
|
|
return partInmemory
|
|
}
|
|
|
|
func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {
|
|
bsrs := make([]*blockStreamReader, 0, len(pws))
|
|
for _, pw := range pws {
|
|
bsr := getBlockStreamReader()
|
|
if pw.mp != nil {
|
|
bsr.MustInitFromInmemoryPart(pw.mp)
|
|
} else {
|
|
bsr.MustInitFromFilePart(pw.p.path)
|
|
}
|
|
bsrs = append(bsrs, bsr)
|
|
}
|
|
return bsrs
|
|
}
|
|
|
|
func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
|
|
var ph partHeader
|
|
var itemsMerged *atomic.Uint64
|
|
var mergesCount *atomic.Uint64
|
|
var activeMerges *atomic.Int64
|
|
switch dstPartType {
|
|
case partInmemory:
|
|
itemsMerged = &tb.inmemoryItemsMerged
|
|
mergesCount = &tb.inmemoryMergesCount
|
|
activeMerges = &tb.activeInmemoryMerges
|
|
case partFile:
|
|
itemsMerged = &tb.fileItemsMerged
|
|
mergesCount = &tb.fileMergesCount
|
|
activeMerges = &tb.activeFileMerges
|
|
default:
|
|
logger.Panicf("BUG: unknown partType=%d", dstPartType)
|
|
}
|
|
activeMerges.Add(1)
|
|
err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, itemsMerged)
|
|
activeMerges.Add(-1)
|
|
mergesCount.Add(1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err)
|
|
}
|
|
if dstPartPath != "" {
|
|
ph.MustWriteMetadata(dstPartPath)
|
|
}
|
|
return &ph, nil
|
|
}
|
|
|
|
func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPartPath string) *partWrapper {
|
|
// Open the created part.
|
|
if mpNew != nil {
|
|
// Open the created part from memory.
|
|
flushToDiskDeadline := getFlushToDiskDeadline(pws)
|
|
pwNew := newPartWrapperFromInmemoryPart(mpNew, flushToDiskDeadline)
|
|
return pwNew
|
|
}
|
|
// Open the created part from disk.
|
|
pNew := mustOpenFilePart(dstPartPath)
|
|
pwNew := &partWrapper{
|
|
p: pNew,
|
|
}
|
|
pwNew.incRef()
|
|
return pwNew
|
|
}
|
|
|
|
func areAllInmemoryParts(pws []*partWrapper) bool {
|
|
for _, pw := range pws {
|
|
if pw.mp == nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
|
|
// Atomically unregister old parts and add new part to tb.
|
|
m := makeMapFromPartWrappers(pws)
|
|
|
|
removedInmemoryParts := 0
|
|
removedFileParts := 0
|
|
|
|
tb.partsLock.Lock()
|
|
|
|
tb.inmemoryParts, removedInmemoryParts = removeParts(tb.inmemoryParts, m)
|
|
tb.fileParts, removedFileParts = removeParts(tb.fileParts, m)
|
|
switch dstPartType {
|
|
case partInmemory:
|
|
tb.inmemoryParts = append(tb.inmemoryParts, pwNew)
|
|
tb.startInmemoryPartsMergerLocked()
|
|
case partFile:
|
|
tb.fileParts = append(tb.fileParts, pwNew)
|
|
tb.startFilePartsMergerLocked()
|
|
default:
|
|
logger.Panicf("BUG: unknown partType=%d", dstPartType)
|
|
}
|
|
|
|
// Atomically store the updated list of file-based parts on disk.
|
|
// This must be performed under partsLock in order to prevent from races
|
|
// when multiple concurrently running goroutines update the list.
|
|
if removedFileParts > 0 || dstPartType == partFile {
|
|
mustWritePartNames(tb.fileParts, tb.path)
|
|
}
|
|
|
|
tb.partsLock.Unlock()
|
|
|
|
// Update inmemoryPartsLimitCh accordingly to the number of the remaining in-memory parts.
|
|
for i := 0; i < removedInmemoryParts; i++ {
|
|
select {
|
|
case <-tb.inmemoryPartsLimitCh:
|
|
case <-tb.stopCh:
|
|
}
|
|
}
|
|
if dstPartType == partInmemory {
|
|
select {
|
|
case tb.inmemoryPartsLimitCh <- struct{}{}:
|
|
case <-tb.stopCh:
|
|
}
|
|
}
|
|
|
|
removedParts := removedInmemoryParts + removedFileParts
|
|
if removedParts != len(m) {
|
|
logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m))
|
|
}
|
|
|
|
// Mark old parts as must be deleted and decrement reference count,
|
|
// so they are eventually closed and deleted.
|
|
for _, pw := range pws {
|
|
pw.mustDrop.Store(true)
|
|
pw.decRef()
|
|
}
|
|
}
|
|
|
|
func makeMapFromPartWrappers(pws []*partWrapper) map[*partWrapper]struct{} {
|
|
m := make(map[*partWrapper]struct{}, len(pws))
|
|
for _, pw := range pws {
|
|
m[pw] = struct{}{}
|
|
}
|
|
if len(m) != len(pws) {
|
|
logger.Panicf("BUG: %d duplicate parts found in %d source parts", len(pws)-len(m), len(pws))
|
|
}
|
|
return m
|
|
}
|
|
|
|
func getPartsSize(pws []*partWrapper) uint64 {
|
|
n := uint64(0)
|
|
for _, pw := range pws {
|
|
n += pw.p.size
|
|
}
|
|
return n
|
|
}
|
|
|
|
func getCompressLevel(itemsCount uint64) int {
|
|
if itemsCount <= 1<<16 {
|
|
// -5 is the minimum supported compression for zstd.
|
|
// See https://github.com/facebook/zstd/releases/tag/v1.3.4
|
|
return -5
|
|
}
|
|
if itemsCount <= 1<<17 {
|
|
return -4
|
|
}
|
|
if itemsCount <= 1<<18 {
|
|
return -3
|
|
}
|
|
if itemsCount <= 1<<19 {
|
|
return -2
|
|
}
|
|
if itemsCount <= 1<<20 {
|
|
return -1
|
|
}
|
|
if itemsCount <= 1<<22 {
|
|
return 1
|
|
}
|
|
if itemsCount <= 1<<25 {
|
|
return 2
|
|
}
|
|
return 3
|
|
}
|
|
|
|
func (tb *Table) nextMergeIdx() uint64 {
|
|
return tb.mergeIdx.Add(1)
|
|
}
|
|
|
|
func mustOpenParts(path string) []*partWrapper {
|
|
// The path can be missing after restoring from backup, so create it if needed.
|
|
fs.MustMkdirIfNotExist(path)
|
|
fs.MustRemoveTemporaryDirs(path)
|
|
|
|
// Remove txn and tmp directories, which may be left after the upgrade
|
|
// to v1.90.0 and newer versions.
|
|
fs.MustRemoveAll(filepath.Join(path, "txn"))
|
|
fs.MustRemoveAll(filepath.Join(path, "tmp"))
|
|
|
|
partsFile := filepath.Join(path, partsFilename)
|
|
partNames := mustReadPartNames(partsFile, path)
|
|
|
|
// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
|
|
// or after the update from versions prior to v1.90.0.
|
|
des := fs.MustReadDir(path)
|
|
m := make(map[string]struct{}, len(partNames))
|
|
for _, partName := range partNames {
|
|
// Make sure the partName exists on disk.
|
|
// If it is missing, then manual action from the user is needed,
|
|
// since this is unexpected state, which cannot occur under normal operation,
|
|
// including unclean shutdown.
|
|
partPath := filepath.Join(path, partName)
|
|
if !fs.IsPathExist(partPath) {
|
|
logger.Panicf("FATAL: part %q is listed in %q, but is missing on disk; "+
|
|
"ensure %q contents is not corrupted; remove %q to rebuild its' content from the list of existing parts",
|
|
partPath, partsFile, partsFile, partsFile)
|
|
}
|
|
|
|
m[partName] = struct{}{}
|
|
}
|
|
for _, de := range des {
|
|
if !fs.IsDirOrSymlink(de) {
|
|
// Skip non-directories.
|
|
continue
|
|
}
|
|
fn := de.Name()
|
|
if _, ok := m[fn]; !ok {
|
|
deletePath := filepath.Join(path, fn)
|
|
logger.Infof("deleting %q because it isn't listed in %q; this is the expected case after unclean shutdown", deletePath, partsFile)
|
|
fs.MustRemoveAll(deletePath)
|
|
}
|
|
}
|
|
fs.MustSyncPath(path)
|
|
|
|
// Open parts
|
|
var pws []*partWrapper
|
|
for _, partName := range partNames {
|
|
partPath := filepath.Join(path, partName)
|
|
p := mustOpenFilePart(partPath)
|
|
pw := &partWrapper{
|
|
p: p,
|
|
}
|
|
pw.incRef()
|
|
pws = append(pws, pw)
|
|
}
|
|
if !fs.IsPathExist(partsFile) {
|
|
// Create parts.json file if it doesn't exist yet.
|
|
// This should protect from possible carshloops just after the migration from versions below v1.90.0
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4336
|
|
mustWritePartNames(pws, path)
|
|
}
|
|
|
|
return pws
|
|
}
|
|
|
|
// CreateSnapshotAt creates tb snapshot in the given dstDir.
|
|
//
|
|
// Snapshot is created using linux hard links, so it is usually created very quickly.
|
|
//
|
|
// The caller is responsible for data removal at dstDir on unsuccessful snapshot creation.
|
|
func (tb *Table) CreateSnapshotAt(dstDir string) error {
|
|
logger.Infof("creating Table snapshot of %q...", tb.path)
|
|
startTime := time.Now()
|
|
|
|
var err error
|
|
srcDir := tb.path
|
|
srcDir, err = filepath.Abs(srcDir)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot obtain absolute dir for %q: %w", srcDir, err)
|
|
}
|
|
dstDir, err = filepath.Abs(dstDir)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot obtain absolute dir for %q: %w", dstDir, err)
|
|
}
|
|
if strings.HasPrefix(dstDir, srcDir+string(filepath.Separator)) {
|
|
return fmt.Errorf("cannot create snapshot %q inside the data dir %q", dstDir, srcDir)
|
|
}
|
|
|
|
// Flush inmemory items to disk.
|
|
tb.flushInmemoryItemsToFiles()
|
|
|
|
fs.MustMkdirFailIfExist(dstDir)
|
|
|
|
pws := tb.getParts(nil)
|
|
defer tb.putParts(pws)
|
|
|
|
// Create a file with part names at dstDir
|
|
mustWritePartNames(pws, dstDir)
|
|
|
|
// Make hardlinks for pws at dstDir
|
|
for _, pw := range pws {
|
|
if pw.mp != nil {
|
|
// Skip in-memory parts
|
|
continue
|
|
}
|
|
srcPartPath := pw.p.path
|
|
dstPartPath := filepath.Join(dstDir, filepath.Base(srcPartPath))
|
|
fs.MustHardLinkFiles(srcPartPath, dstPartPath)
|
|
}
|
|
|
|
fs.MustSyncPath(dstDir)
|
|
parentDir := filepath.Dir(dstDir)
|
|
fs.MustSyncPath(parentDir)
|
|
|
|
logger.Infof("created Table snapshot of %q at %q in %.3f seconds", srcDir, dstDir, time.Since(startTime).Seconds())
|
|
return nil
|
|
}
|
|
|
|
func mustWritePartNames(pws []*partWrapper, dstDir string) {
|
|
partNames := make([]string, 0, len(pws))
|
|
for _, pw := range pws {
|
|
if pw.mp != nil {
|
|
// Skip in-memory parts
|
|
continue
|
|
}
|
|
partName := filepath.Base(pw.p.path)
|
|
partNames = append(partNames, partName)
|
|
}
|
|
sort.Strings(partNames)
|
|
data, err := json.Marshal(partNames)
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
|
|
}
|
|
partsFile := filepath.Join(dstDir, partsFilename)
|
|
fs.MustWriteAtomic(partsFile, data, true)
|
|
}
|
|
|
|
func mustReadPartNames(partsFile, srcDir string) []string {
|
|
if fs.IsPathExist(partsFile) {
|
|
data, err := os.ReadFile(partsFile)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot read %q: %s", partsFile, err)
|
|
}
|
|
var partNames []string
|
|
if err := json.Unmarshal(data, &partNames); err != nil {
|
|
logger.Panicf("FATAL: cannot parse %q: %s", partsFile, err)
|
|
}
|
|
return partNames
|
|
}
|
|
// The partsFilename is missing. This is the upgrade from versions previous to v1.90.0.
|
|
// Read part names from directories under srcDir
|
|
des := fs.MustReadDir(srcDir)
|
|
var partNames []string
|
|
for _, de := range des {
|
|
if !fs.IsDirOrSymlink(de) {
|
|
// Skip non-directories.
|
|
continue
|
|
}
|
|
partName := de.Name()
|
|
if isSpecialDir(partName) {
|
|
// Skip special dirs.
|
|
continue
|
|
}
|
|
partNames = append(partNames, partName)
|
|
}
|
|
return partNames
|
|
}
|
|
|
|
// getPartsToMerge returns optimal parts to merge from pws.
|
|
//
|
|
// The summary size of the returned parts must be smaller than the maxOutBytes.
|
|
func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64) []*partWrapper {
|
|
pwsRemaining := make([]*partWrapper, 0, len(pws))
|
|
for _, pw := range pws {
|
|
if !pw.isInMerge {
|
|
pwsRemaining = append(pwsRemaining, pw)
|
|
}
|
|
}
|
|
|
|
pwsToMerge := appendPartsToMerge(nil, pwsRemaining, defaultPartsToMerge, maxOutBytes)
|
|
|
|
for _, pw := range pwsToMerge {
|
|
if pw.isInMerge {
|
|
logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to true")
|
|
}
|
|
pw.isInMerge = true
|
|
}
|
|
|
|
return pwsToMerge
|
|
}
|
|
|
|
// getPartsForOptimalMerge returns parts from pws for optimal merge, plus the remaining parts.
|
|
//
|
|
// the pws items are replaced by nil after the call. This is needed for helping Go GC to reclaim the referenced items.
|
|
func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) {
|
|
pwsToMerge := appendPartsToMerge(nil, pws, defaultPartsToMerge, math.MaxUint64)
|
|
if len(pwsToMerge) == 0 {
|
|
return pws, nil
|
|
}
|
|
|
|
m := makeMapFromPartWrappers(pwsToMerge)
|
|
pwsRemaining := make([]*partWrapper, 0, len(pws)-len(pwsToMerge))
|
|
for _, pw := range pws {
|
|
if _, ok := m[pw]; !ok {
|
|
pwsRemaining = append(pwsRemaining, pw)
|
|
}
|
|
}
|
|
|
|
// Clear references to pws items, so they could be reclaimed faster by Go GC.
|
|
for i := range pws {
|
|
pws[i] = nil
|
|
}
|
|
|
|
return pwsToMerge, pwsRemaining
|
|
}
|
|
|
|
// minMergeMultiplier is the minimum multiplier for the size of the output part
|
|
// compared to the size of the maximum input part for the merge.
|
|
//
|
|
// Higher value reduces write amplification (disk write IO induced by the merge),
|
|
// while increases the number of unmerged parts.
|
|
// The 1.7 is good enough for production workloads.
|
|
const minMergeMultiplier = 1.7
|
|
|
|
// appendPartsToMerge finds optimal parts to merge from src, appends them to dst and returns the result.
|
|
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) []*partWrapper {
|
|
if len(src) < 2 {
|
|
// There is no need in merging zero or one part :)
|
|
return dst
|
|
}
|
|
if maxPartsToMerge < 2 {
|
|
logger.Panicf("BUG: maxPartsToMerge cannot be smaller than 2; got %d", maxPartsToMerge)
|
|
}
|
|
|
|
// Filter out too big parts.
|
|
// This should reduce N for O(n^2) algorithm below.
|
|
maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
|
|
tmp := make([]*partWrapper, 0, len(src))
|
|
for _, pw := range src {
|
|
if pw.p.size > maxInPartBytes {
|
|
continue
|
|
}
|
|
tmp = append(tmp, pw)
|
|
}
|
|
src = tmp
|
|
|
|
sortPartsForOptimalMerge(src)
|
|
|
|
maxSrcParts := maxPartsToMerge
|
|
if maxSrcParts > len(src) {
|
|
maxSrcParts = len(src)
|
|
}
|
|
minSrcParts := (maxSrcParts + 1) / 2
|
|
if minSrcParts < 2 {
|
|
minSrcParts = 2
|
|
}
|
|
|
|
// Exhaustive search for parts giving the lowest write amplification when merged.
|
|
var pws []*partWrapper
|
|
maxM := float64(0)
|
|
for i := minSrcParts; i <= maxSrcParts; i++ {
|
|
for j := 0; j <= len(src)-i; j++ {
|
|
a := src[j : j+i]
|
|
if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
|
|
// Do not merge parts with too big difference in size,
|
|
// since this results in unbalanced merges.
|
|
continue
|
|
}
|
|
outBytes := uint64(0)
|
|
for _, pw := range a {
|
|
outBytes += pw.p.size
|
|
}
|
|
if outBytes > maxOutBytes {
|
|
// There is no sense in checking the remaining bigger parts.
|
|
break
|
|
}
|
|
m := float64(outBytes) / float64(a[len(a)-1].p.size)
|
|
if m < maxM {
|
|
continue
|
|
}
|
|
maxM = m
|
|
pws = a
|
|
}
|
|
}
|
|
|
|
minM := float64(maxPartsToMerge) / 2
|
|
if minM < minMergeMultiplier {
|
|
minM = minMergeMultiplier
|
|
}
|
|
if maxM < minM {
|
|
// There is no sense in merging parts with too small m,
|
|
// since this leads to high disk write IO.
|
|
return dst
|
|
}
|
|
return append(dst, pws...)
|
|
}
|
|
|
|
func sortPartsForOptimalMerge(pws []*partWrapper) {
|
|
// Sort src parts by size.
|
|
sort.Slice(pws, func(i, j int) bool {
|
|
return pws[i].p.size < pws[j].p.size
|
|
})
|
|
}
|
|
|
|
func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
|
|
dst := pws[:0]
|
|
for _, pw := range pws {
|
|
if _, ok := partsToRemove[pw]; !ok {
|
|
dst = append(dst, pw)
|
|
}
|
|
}
|
|
for i := len(dst); i < len(pws); i++ {
|
|
pws[i] = nil
|
|
}
|
|
return dst, len(pws) - len(dst)
|
|
}
|
|
|
|
func isSpecialDir(name string) bool {
|
|
// Snapshots and cache dirs aren't used anymore.
|
|
// Keep them here for backwards compatibility.
|
|
return name == "tmp" || name == "txn" || name == "snapshots" || name == "cache" || fs.IsScheduledForRemoval(name)
|
|
}
|