2019-05-22 23:16:55 +02:00
package mergeset
import (
2020-09-17 02:02:35 +02:00
"errors"
2019-05-22 23:16:55 +02:00
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
2022-10-20 15:17:09 +02:00
"unsafe"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-05-14 21:01:51 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-09-09 10:41:30 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2020-07-22 23:58:48 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
)
2022-12-06 00:27:57 +01:00
// maxInmemoryParts is the maximum number of inmemory parts in the table.
2019-05-22 23:16:55 +02:00
//
// This number may be reached when the insertion pace outreaches merger pace.
2022-12-13 01:49:21 +01:00
// If this number is reached, then assisted merges are performed
// during data ingestion.
2022-12-28 23:32:18 +01:00
const maxInmemoryParts = 30
2019-05-22 23:16:55 +02:00
2022-12-13 01:49:21 +01:00
// maxFileParts is the maximum number of file parts in the table.
//
// This number may be reached when the insertion pace outreaches merger pace.
// If this number is reached, then assisted merges are performed
// during data ingestion.
2022-12-13 18:10:44 +01:00
const maxFileParts = 64
2022-12-13 01:49:21 +01:00
2019-05-22 23:16:55 +02:00
// Default number of parts to merge at once.
//
// This number has been obtained empirically - it gives the lowest possible overhead.
// See appendPartsToMerge tests for details.
const defaultPartsToMerge = 15
// The final number of parts to merge at once.
//
// It must be smaller than defaultPartsToMerge.
// Lower value improves select performance at the cost of increased
// write amplification.
const finalPartsToMerge = 2
2021-08-25 08:35:03 +02:00
// maxPartSize is the maximum part size in bytes.
2019-05-22 23:16:55 +02:00
//
2021-08-25 08:35:03 +02:00
// This number should be limited by the amount of time required to merge parts of this summary size.
// The required time shouldn't exceed a day.
const maxPartSize = 400e9
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
// The interval for flushing buffered data to parts, so it becomes visible to search.
const pendingItemsFlushInterval = time . Second
// The interval for guaranteed flush of recently ingested data from memory to on-disk parts,
// so they survive process crash.
var dataFlushInterval = 5 * time . Second
// SetDataFlushInterval sets the interval for guaranteed flush of recently ingested data from memory to disk.
//
// The data can be flushed from memory to disk more frequently if it doesn't fit the memory limit.
//
// This function must be called before initializing the indexdb.
func SetDataFlushInterval ( d time . Duration ) {
if d > pendingItemsFlushInterval {
dataFlushInterval = d
}
}
2019-05-22 23:16:55 +02:00
// maxItemsPerCachedPart is the maximum items per created part by the merge,
// which must be cached in the OS page cache.
//
// Such parts are usually frequently accessed, so it is good to cache their
// contents in OS page cache.
2019-09-09 10:41:30 +02:00
func maxItemsPerCachedPart ( ) uint64 {
mem := memory . Remaining ( )
// Production data shows that each item occupies ~4 bytes in the compressed part.
// It is expected no more than defaultPartsToMerge/2 parts exist
// in the OS page cache before they are merged into bigger part.
// Halft of the remaining RAM must be left for lib/storage parts,
// so the maxItems is calculated using the below code:
maxItems := uint64 ( mem ) / ( 4 * defaultPartsToMerge )
if maxItems < 1e6 {
maxItems = 1e6
}
return maxItems
}
2019-05-22 23:16:55 +02:00
// Table represents mergeset table.
type Table struct {
2019-10-17 17:22:56 +02:00
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
2022-12-06 00:27:57 +01:00
activeInmemoryMerges uint64
activeFileMerges uint64
inmemoryMergesCount uint64
fileMergesCount uint64
inmemoryItemsMerged uint64
fileItemsMerged uint64
2022-12-13 01:49:21 +01:00
inmemoryAssistedMerges uint64
fileAssistedMerges uint64
2022-12-06 00:27:57 +01:00
2022-04-21 12:18:05 +02:00
itemsAdded uint64
itemsAddedSizeBytes uint64
2019-10-17 17:22:56 +02:00
mergeIdx uint64
2019-05-22 23:16:55 +02:00
path string
2021-07-06 11:17:15 +02:00
flushCallback func ( )
flushCallbackWorkerWG sync . WaitGroup
needFlushCallbackCall uint32
2019-08-29 13:39:05 +02:00
2019-09-20 18:46:47 +02:00
prepareBlock PrepareBlockCallback
2022-06-01 13:21:12 +02:00
isReadOnly * uint32
2019-09-20 18:46:47 +02:00
2021-04-27 14:36:31 +02:00
// rawItems contains recently added items that haven't been converted to parts yet.
//
// rawItems aren't used in search for performance reasons
rawItems rawItemsShards
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
// partsLock protects inmemoryParts and fileParts.
partsLock sync . Mutex
// inmemoryParts contains inmemory parts.
inmemoryParts [ ] * partWrapper
// fileParts contains file-backed parts.
fileParts [ ] * partWrapper
2019-05-22 23:16:55 +02:00
snapshotLock sync . RWMutex
flockF * os . File
stopCh chan struct { }
2022-12-04 08:03:05 +01:00
wg sync . WaitGroup
2019-05-22 23:16:55 +02:00
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
rawItemsPendingFlushesWG syncwg . WaitGroup
}
2021-04-27 14:36:31 +02:00
type rawItemsShards struct {
2021-04-27 15:41:22 +02:00
shardIdx uint32
2021-04-27 14:36:31 +02:00
// shards reduce lock contention when adding rows on multi-CPU systems.
shards [ ] rawItemsShard
}
// The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
2022-04-06 18:35:50 +02:00
var rawItemsShardsPerTable = func ( ) int {
cpus := cgroup . AvailableCPUs ( )
multiplier := cpus
if multiplier > 16 {
multiplier = 16
}
2022-04-07 14:21:17 +02:00
return ( cpus * multiplier + 1 ) / 2
2022-04-06 18:35:50 +02:00
} ( )
2021-04-27 14:36:31 +02:00
2022-04-06 18:35:50 +02:00
const maxBlocksPerShard = 256
2021-04-27 14:36:31 +02:00
func ( riss * rawItemsShards ) init ( ) {
riss . shards = make ( [ ] rawItemsShard , rawItemsShardsPerTable )
}
2022-12-04 08:30:31 +01:00
func ( riss * rawItemsShards ) addItems ( tb * Table , items [ ] [ ] byte ) {
2021-04-27 14:36:31 +02:00
shards := riss . shards
2022-12-06 00:27:57 +01:00
shardsLen := uint32 ( len ( shards ) )
for len ( items ) > 0 {
n := atomic . AddUint32 ( & riss . shardIdx , 1 )
idx := n % shardsLen
items = shards [ idx ] . addItems ( tb , items )
}
2021-04-27 14:36:31 +02:00
}
func ( riss * rawItemsShards ) Len ( ) int {
n := 0
for i := range riss . shards {
n += riss . shards [ i ] . Len ( )
}
return n
}
2022-10-20 15:17:09 +02:00
type rawItemsShardNopad struct {
// Put lastFlushTime to the top in order to avoid unaligned memory access on 32-bit architectures
2021-04-27 14:36:31 +02:00
lastFlushTime uint64
2022-10-20 15:17:09 +02:00
mu sync . Mutex
ibs [ ] * inmemoryBlock
}
type rawItemsShard struct {
rawItemsShardNopad
// The padding prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
_ [ 128 - unsafe . Sizeof ( rawItemsShardNopad { } ) % 128 ] byte
2021-04-27 14:36:31 +02:00
}
func ( ris * rawItemsShard ) Len ( ) int {
ris . mu . Lock ( )
n := 0
for _ , ib := range ris . ibs {
n += len ( ib . items )
}
ris . mu . Unlock ( )
return n
}
2022-12-06 00:27:57 +01:00
func ( ris * rawItemsShard ) addItems ( tb * Table , items [ ] [ ] byte ) [ ] [ ] byte {
var ibsToFlush [ ] * inmemoryBlock
var tailItems [ ] [ ] byte
2021-04-27 14:36:31 +02:00
ris . mu . Lock ( )
ibs := ris . ibs
if len ( ibs ) == 0 {
2021-05-28 00:09:32 +02:00
ib := getInmemoryBlock ( )
2021-04-27 14:36:31 +02:00
ibs = append ( ibs , ib )
ris . ibs = ibs
}
ib := ibs [ len ( ibs ) - 1 ]
2022-12-06 00:27:57 +01:00
for i , item := range items {
2022-12-04 08:30:31 +01:00
if ib . Add ( item ) {
continue
}
2022-12-06 00:27:57 +01:00
if len ( ibs ) >= maxBlocksPerShard {
ibsToFlush = ibs
ibs = make ( [ ] * inmemoryBlock , 0 , maxBlocksPerShard )
tailItems = items [ i : ]
atomic . StoreUint64 ( & ris . lastFlushTime , fasttime . UnixTimestamp ( ) )
break
}
2022-12-04 08:30:31 +01:00
ib = getInmemoryBlock ( )
if ib . Add ( item ) {
2021-04-27 14:36:31 +02:00
ibs = append ( ibs , ib )
2022-12-04 08:30:31 +01:00
continue
2021-04-27 14:36:31 +02:00
}
2022-12-04 08:30:31 +01:00
putInmemoryBlock ( ib )
logger . Panicf ( "BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items" , len ( item ) )
2021-04-27 14:36:31 +02:00
}
2022-12-04 08:30:31 +01:00
ris . ibs = ibs
2021-04-27 14:36:31 +02:00
ris . mu . Unlock ( )
2022-12-06 00:27:57 +01:00
tb . flushBlocksToParts ( ibsToFlush , false )
return tailItems
2021-04-27 14:36:31 +02:00
}
2019-05-22 23:16:55 +02:00
type partWrapper struct {
p * part
mp * inmemoryPart
refCount uint64
isInMerge bool
2022-12-06 00:27:57 +01:00
// The deadline when the in-memory part must be flushed to disk.
flushToDiskDeadline time . Time
2019-05-22 23:16:55 +02:00
}
func ( pw * partWrapper ) incRef ( ) {
atomic . AddUint64 ( & pw . refCount , 1 )
}
func ( pw * partWrapper ) decRef ( ) {
n := atomic . AddUint64 ( & pw . refCount , ^ uint64 ( 0 ) )
if int64 ( n ) < 0 {
logger . Panicf ( "BUG: pw.refCount must be bigger than 0; got %d" , int64 ( n ) )
}
if n > 0 {
return
}
if pw . mp != nil {
2022-03-03 15:46:35 +01:00
// Do not return pw.mp to pool via putInmemoryPart(),
// since pw.mp size may be too big compared to other entries stored in the pool.
// This may result in increased memory usage because of high fragmentation.
2019-05-22 23:16:55 +02:00
pw . mp = nil
}
pw . p . MustClose ( )
pw . p = nil
}
// OpenTable opens a table on the given path.
//
2019-08-29 13:39:05 +02:00
// Optional flushCallback is called every time new data batch is flushed
// to the underlying storage and becomes visible to search.
//
2019-09-20 18:46:47 +02:00
// Optional prepareBlock is called during merge before flushing the prepared block
// to persistent storage.
//
2019-05-22 23:16:55 +02:00
// The table is created if it doesn't exist yet.
2022-06-01 13:21:12 +02:00
func OpenTable ( path string , flushCallback func ( ) , prepareBlock PrepareBlockCallback , isReadOnly * uint32 ) ( * Table , error ) {
2019-05-22 23:16:55 +02:00
path = filepath . Clean ( path )
logger . Infof ( "opening table %q..." , path )
startTime := time . Now ( )
// Create a directory for the table if it doesn't exist yet.
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create directory %q: %w" , path , err )
2019-05-22 23:16:55 +02:00
}
// Protect from concurrent opens.
2019-08-13 00:45:22 +02:00
flockF , err := fs . CreateFlockFile ( path )
2019-05-22 23:16:55 +02:00
if err != nil {
2019-08-13 00:45:22 +02:00
return nil , err
2019-05-22 23:16:55 +02:00
}
// Open table parts.
pws , err := openParts ( path )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open table parts at %q: %w" , path , err )
2019-05-22 23:16:55 +02:00
}
tb := & Table {
2019-08-29 13:39:05 +02:00
path : path ,
flushCallback : flushCallback ,
2019-09-20 18:46:47 +02:00
prepareBlock : prepareBlock ,
2022-06-01 13:21:12 +02:00
isReadOnly : isReadOnly ,
2022-12-06 00:27:57 +01:00
fileParts : pws ,
2019-08-29 13:39:05 +02:00
mergeIdx : uint64 ( time . Now ( ) . UnixNano ( ) ) ,
flockF : flockF ,
stopCh : make ( chan struct { } ) ,
2019-05-22 23:16:55 +02:00
}
2021-04-27 14:36:31 +02:00
tb . rawItems . init ( )
2022-12-04 09:01:04 +01:00
tb . startBackgroundWorkers ( )
2019-05-22 23:16:55 +02:00
var m TableMetrics
tb . UpdateMetrics ( & m )
2020-01-22 17:27:44 +01:00
logger . Infof ( "table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d" ,
2022-12-06 00:27:57 +01:00
path , time . Since ( startTime ) . Seconds ( ) , m . FilePartsCount , m . FileBlocksCount , m . FileItemsCount , m . FileSizeBytes )
2019-05-22 23:16:55 +02:00
2021-07-06 11:17:15 +02:00
if flushCallback != nil {
tb . flushCallbackWorkerWG . Add ( 1 )
go func ( ) {
// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
// which are reset by the flushCallback.
tc := time . NewTicker ( 10 * time . Second )
for {
select {
case <- tb . stopCh :
tb . flushCallback ( )
tb . flushCallbackWorkerWG . Done ( )
return
case <- tc . C :
if atomic . CompareAndSwapUint32 ( & tb . needFlushCallbackCall , 1 , 0 ) {
tb . flushCallback ( )
}
}
}
} ( )
}
2019-05-22 23:16:55 +02:00
return tb , nil
}
2022-12-04 09:01:04 +01:00
func ( tb * Table ) startBackgroundWorkers ( ) {
2022-12-06 00:27:57 +01:00
tb . startMergeWorkers ( )
tb . startInmemoryPartsFlusher ( )
tb . startPendingItemsFlusher ( )
2022-12-04 09:01:04 +01:00
}
2019-05-22 23:16:55 +02:00
// MustClose closes the table.
func ( tb * Table ) MustClose ( ) {
close ( tb . stopCh )
2022-12-04 08:03:05 +01:00
logger . Infof ( "waiting for background workers to stop on %q..." , tb . path )
2019-05-22 23:16:55 +02:00
startTime := time . Now ( )
2022-12-04 08:03:05 +01:00
tb . wg . Wait ( )
logger . Infof ( "background workers stopped in %.3f seconds on %q" , time . Since ( startTime ) . Seconds ( ) , tb . path )
2019-05-22 23:16:55 +02:00
logger . Infof ( "flushing inmemory parts to files on %q..." , tb . path )
startTime = time . Now ( )
2022-12-06 00:27:57 +01:00
tb . flushInmemoryItems ( )
2022-12-06 06:30:48 +01:00
logger . Infof ( "inmemory parts have been successfully flushed to files in %.3f seconds at %q" , time . Since ( startTime ) . Seconds ( ) , tb . path )
2019-05-22 23:16:55 +02:00
2021-07-06 11:17:15 +02:00
logger . Infof ( "waiting for flush callback worker to stop on %q..." , tb . path )
startTime = time . Now ( )
tb . flushCallbackWorkerWG . Wait ( )
logger . Infof ( "flush callback worker stopped in %.3f seconds on %q" , time . Since ( startTime ) . Seconds ( ) , tb . path )
2022-12-06 00:27:57 +01:00
// Remove references to parts from the tb, so they may be eventually closed after all the searches are done.
2019-05-22 23:16:55 +02:00
tb . partsLock . Lock ( )
2022-12-06 00:27:57 +01:00
inmemoryParts := tb . inmemoryParts
fileParts := tb . fileParts
tb . inmemoryParts = nil
tb . fileParts = nil
2019-05-22 23:16:55 +02:00
tb . partsLock . Unlock ( )
2022-12-06 00:27:57 +01:00
for _ , pw := range inmemoryParts {
pw . decRef ( )
}
for _ , pw := range fileParts {
2019-05-22 23:16:55 +02:00
pw . decRef ( )
}
// Release flockF
if err := tb . flockF . Close ( ) ; err != nil {
logger . Panicf ( "FATAL:cannot close %q: %s" , tb . flockF . Name ( ) , err )
}
}
// Path returns the path to tb on the filesystem.
func ( tb * Table ) Path ( ) string {
return tb . path
}
// TableMetrics contains essential metrics for the Table.
type TableMetrics struct {
2022-12-06 00:27:57 +01:00
ActiveInmemoryMerges uint64
ActiveFileMerges uint64
InmemoryMergesCount uint64
FileMergesCount uint64
InmemoryItemsMerged uint64
FileItemsMerged uint64
2022-12-13 01:49:21 +01:00
InmemoryAssistedMerges uint64
FileAssistedMerges uint64
2022-12-06 00:27:57 +01:00
2022-04-21 12:18:05 +02:00
ItemsAdded uint64
ItemsAddedSizeBytes uint64
2019-05-22 23:16:55 +02:00
PendingItems uint64
2022-12-06 00:27:57 +01:00
InmemoryPartsCount uint64
FilePartsCount uint64
InmemoryBlocksCount uint64
FileBlocksCount uint64
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
InmemoryItemsCount uint64
FileItemsCount uint64
InmemorySizeBytes uint64
FileSizeBytes uint64
2019-05-22 23:16:55 +02:00
2021-12-02 09:28:45 +01:00
DataBlocksCacheSize uint64
DataBlocksCacheSizeBytes uint64
DataBlocksCacheSizeMaxBytes uint64
DataBlocksCacheRequests uint64
DataBlocksCacheMisses uint64
IndexBlocksCacheSize uint64
IndexBlocksCacheSizeBytes uint64
IndexBlocksCacheSizeMaxBytes uint64
IndexBlocksCacheRequests uint64
IndexBlocksCacheMisses uint64
2019-05-22 23:16:55 +02:00
PartsRefCount uint64
}
2022-12-06 00:27:57 +01:00
// TotalItemsCount returns the total number of items in the table.
func ( tm * TableMetrics ) TotalItemsCount ( ) uint64 {
return tm . InmemoryItemsCount + tm . FileItemsCount
}
2019-05-22 23:16:55 +02:00
// UpdateMetrics updates m with metrics from tb.
func ( tb * Table ) UpdateMetrics ( m * TableMetrics ) {
2022-12-06 00:27:57 +01:00
m . ActiveInmemoryMerges += atomic . LoadUint64 ( & tb . activeInmemoryMerges )
m . ActiveFileMerges += atomic . LoadUint64 ( & tb . activeFileMerges )
m . InmemoryMergesCount += atomic . LoadUint64 ( & tb . inmemoryMergesCount )
m . FileMergesCount += atomic . LoadUint64 ( & tb . fileMergesCount )
m . InmemoryItemsMerged += atomic . LoadUint64 ( & tb . inmemoryItemsMerged )
m . FileItemsMerged += atomic . LoadUint64 ( & tb . fileItemsMerged )
2022-12-13 01:49:21 +01:00
m . InmemoryAssistedMerges += atomic . LoadUint64 ( & tb . inmemoryAssistedMerges )
m . FileAssistedMerges += atomic . LoadUint64 ( & tb . fileAssistedMerges )
2022-12-06 00:27:57 +01:00
2022-04-21 12:18:05 +02:00
m . ItemsAdded += atomic . LoadUint64 ( & tb . itemsAdded )
m . ItemsAddedSizeBytes += atomic . LoadUint64 ( & tb . itemsAddedSizeBytes )
2019-05-22 23:16:55 +02:00
2021-04-27 14:36:31 +02:00
m . PendingItems += uint64 ( tb . rawItems . Len ( ) )
2019-05-22 23:16:55 +02:00
tb . partsLock . Lock ( )
2022-12-06 00:27:57 +01:00
m . InmemoryPartsCount += uint64 ( len ( tb . inmemoryParts ) )
for _ , pw := range tb . inmemoryParts {
p := pw . p
m . InmemoryBlocksCount += p . ph . blocksCount
m . InmemoryItemsCount += p . ph . itemsCount
m . InmemorySizeBytes += p . size
m . PartsRefCount += atomic . LoadUint64 ( & pw . refCount )
}
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
m . FilePartsCount += uint64 ( len ( tb . fileParts ) )
for _ , pw := range tb . fileParts {
p := pw . p
m . FileBlocksCount += p . ph . blocksCount
m . FileItemsCount += p . ph . itemsCount
m . FileSizeBytes += p . size
2019-05-22 23:16:55 +02:00
m . PartsRefCount += atomic . LoadUint64 ( & pw . refCount )
}
tb . partsLock . Unlock ( )
2022-01-20 17:34:59 +01:00
m . DataBlocksCacheSize = uint64 ( ibCache . Len ( ) )
m . DataBlocksCacheSizeBytes = uint64 ( ibCache . SizeBytes ( ) )
m . DataBlocksCacheSizeMaxBytes = uint64 ( ibCache . SizeMaxBytes ( ) )
m . DataBlocksCacheRequests = ibCache . Requests ( )
m . DataBlocksCacheMisses = ibCache . Misses ( )
m . IndexBlocksCacheSize = uint64 ( idxbCache . Len ( ) )
m . IndexBlocksCacheSizeBytes = uint64 ( idxbCache . SizeBytes ( ) )
m . IndexBlocksCacheSizeMaxBytes = uint64 ( idxbCache . SizeMaxBytes ( ) )
m . IndexBlocksCacheRequests = idxbCache . Requests ( )
m . IndexBlocksCacheMisses = idxbCache . Misses ( )
2019-05-22 23:16:55 +02:00
}
// AddItems adds the given items to the tb.
2022-12-04 08:30:31 +01:00
//
// The function panics when items contains an item with length exceeding maxInmemoryBlockSize.
// It is caller's responsibility to make sure there are no too long items.
func ( tb * Table ) AddItems ( items [ ] [ ] byte ) {
tb . rawItems . addItems ( tb , items )
2022-04-21 12:18:05 +02:00
atomic . AddUint64 ( & tb . itemsAdded , uint64 ( len ( items ) ) )
n := 0
for _ , item := range items {
n += len ( item )
}
atomic . AddUint64 ( & tb . itemsAddedSizeBytes , uint64 ( n ) )
2019-05-22 23:16:55 +02:00
}
// getParts appends parts snapshot to dst and returns it.
//
// The appended parts must be released with putParts.
func ( tb * Table ) getParts ( dst [ ] * partWrapper ) [ ] * partWrapper {
tb . partsLock . Lock ( )
2022-12-06 00:27:57 +01:00
for _ , pw := range tb . inmemoryParts {
pw . incRef ( )
}
for _ , pw := range tb . fileParts {
2019-05-22 23:16:55 +02:00
pw . incRef ( )
}
2022-12-06 00:27:57 +01:00
dst = append ( dst , tb . inmemoryParts ... )
dst = append ( dst , tb . fileParts ... )
2019-05-22 23:16:55 +02:00
tb . partsLock . Unlock ( )
return dst
}
// putParts releases the given pws obtained via getParts.
func ( tb * Table ) putParts ( pws [ ] * partWrapper ) {
for _ , pw := range pws {
pw . decRef ( )
}
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) mergePartsOptimal ( pws [ ] * partWrapper ) error {
sortPartsForOptimalMerge ( pws )
for len ( pws ) > 0 {
n := defaultPartsToMerge
if n > len ( pws ) {
n = len ( pws )
}
pwsChunk := pws [ : n ]
pws = pws [ n : ]
err := tb . mergeParts ( pwsChunk , nil , true )
if err == nil {
continue
}
tb . releasePartsToMerge ( pws )
return fmt . Errorf ( "cannot optimally merge %d parts: %w" , n , err )
}
return nil
}
// DebugFlush flushes all the added items to the storage, so they become visible to search.
//
// This function is only for debugging and testing.
func ( tb * Table ) DebugFlush ( ) {
tb . flushPendingItems ( nil , true )
// Wait for background flushers to finish.
tb . rawItemsPendingFlushesWG . Wait ( )
}
func ( tb * Table ) startInmemoryPartsFlusher ( ) {
tb . wg . Add ( 1 )
go func ( ) {
tb . inmemoryPartsFlusher ( )
tb . wg . Done ( )
} ( )
}
func ( tb * Table ) startPendingItemsFlusher ( ) {
2022-12-04 08:03:05 +01:00
tb . wg . Add ( 1 )
2019-05-22 23:16:55 +02:00
go func ( ) {
2022-12-06 00:27:57 +01:00
tb . pendingItemsFlusher ( )
2022-12-04 08:03:05 +01:00
tb . wg . Done ( )
2019-05-22 23:16:55 +02:00
} ( )
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) inmemoryPartsFlusher ( ) {
ticker := time . NewTicker ( dataFlushInterval )
2020-02-13 11:55:58 +01:00
defer ticker . Stop ( )
2019-05-22 23:16:55 +02:00
for {
select {
case <- tb . stopCh :
return
2020-02-13 11:55:58 +01:00
case <- ticker . C :
2022-12-06 00:27:57 +01:00
tb . flushInmemoryParts ( false )
2019-05-22 23:16:55 +02:00
}
}
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) pendingItemsFlusher ( ) {
ticker := time . NewTicker ( pendingItemsFlushInterval )
defer ticker . Stop ( )
var ibs [ ] * inmemoryBlock
for {
select {
case <- tb . stopCh :
return
case <- ticker . C :
ibs = tb . flushPendingItems ( ibs [ : 0 ] , false )
for i := range ibs {
ibs [ i ] = nil
}
2019-05-22 23:16:55 +02:00
}
}
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) flushPendingItems ( dst [ ] * inmemoryBlock , isFinal bool ) [ ] * inmemoryBlock {
return tb . rawItems . flush ( tb , dst , isFinal )
}
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
func ( tb * Table ) flushInmemoryItems ( ) {
tb . rawItems . flush ( tb , nil , true )
tb . flushInmemoryParts ( true )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) flushInmemoryParts ( isFinal bool ) {
for {
currentTime := time . Now ( )
var pws [ ] * partWrapper
tb . partsLock . Lock ( )
for _ , pw := range tb . inmemoryParts {
if ! pw . isInMerge && ( isFinal || pw . flushToDiskDeadline . Before ( currentTime ) ) {
pw . isInMerge = true
pws = append ( pws , pw )
}
}
tb . partsLock . Unlock ( )
if err := tb . mergePartsOptimal ( pws ) ; err != nil {
logger . Panicf ( "FATAL: cannot merge in-memory parts: %s" , err )
}
if ! isFinal {
return
}
tb . partsLock . Lock ( )
n := len ( tb . inmemoryParts )
tb . partsLock . Unlock ( )
if n == 0 {
// All the in-memory parts were flushed to disk.
return
}
// Some parts weren't flushed to disk because they were being merged.
// Sleep for a while and try flushing them again.
time . Sleep ( 10 * time . Millisecond )
}
2021-04-27 14:36:31 +02:00
}
2022-12-06 00:27:57 +01:00
func ( riss * rawItemsShards ) flush ( tb * Table , dst [ ] * inmemoryBlock , isFinal bool ) [ ] * inmemoryBlock {
2019-05-22 23:16:55 +02:00
tb . rawItemsPendingFlushesWG . Add ( 1 )
defer tb . rawItemsPendingFlushesWG . Done ( )
2021-04-27 14:36:31 +02:00
for i := range riss . shards {
2022-12-06 00:27:57 +01:00
dst = riss . shards [ i ] . appendBlocksToFlush ( dst , tb , isFinal )
2021-04-27 14:36:31 +02:00
}
2022-12-06 00:27:57 +01:00
tb . flushBlocksToParts ( dst , isFinal )
return dst
2021-04-27 14:36:31 +02:00
}
2021-06-17 12:42:32 +02:00
func ( ris * rawItemsShard ) appendBlocksToFlush ( dst [ ] * inmemoryBlock , tb * Table , isFinal bool ) [ ] * inmemoryBlock {
2020-05-14 21:01:51 +02:00
currentTime := fasttime . UnixTimestamp ( )
2022-12-06 00:27:57 +01:00
flushSeconds := int64 ( pendingItemsFlushInterval . Seconds ( ) )
2020-05-14 21:01:51 +02:00
if flushSeconds <= 0 {
flushSeconds = 1
}
2022-10-17 17:01:26 +02:00
lastFlushTime := atomic . LoadUint64 ( & ris . lastFlushTime )
2022-12-06 00:27:57 +01:00
if ! isFinal && currentTime < lastFlushTime + uint64 ( flushSeconds ) {
2022-10-21 13:33:03 +02:00
// Fast path - nothing to flush
return dst
2019-05-22 23:16:55 +02:00
}
2022-10-21 13:33:03 +02:00
// Slow path - move ris.ibs to dst
ris . mu . Lock ( )
ibs := ris . ibs
dst = append ( dst , ibs ... )
for i := range ibs {
ibs [ i ] = nil
}
ris . ibs = ibs [ : 0 ]
atomic . StoreUint64 ( & ris . lastFlushTime , currentTime )
ris . mu . Unlock ( )
2021-06-17 12:42:32 +02:00
return dst
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) flushBlocksToParts ( ibs [ ] * inmemoryBlock , isFinal bool ) {
2021-06-17 12:42:32 +02:00
if len ( ibs ) == 0 {
return
}
var pwsLock sync . Mutex
2022-12-06 00:27:57 +01:00
pws := make ( [ ] * partWrapper , 0 , ( len ( ibs ) + defaultPartsToMerge - 1 ) / defaultPartsToMerge )
wg := getWaitGroup ( )
2021-06-17 12:42:32 +02:00
for len ( ibs ) > 0 {
2019-05-22 23:16:55 +02:00
n := defaultPartsToMerge
2021-06-17 12:42:32 +02:00
if n > len ( ibs ) {
n = len ( ibs )
2019-05-22 23:16:55 +02:00
}
2021-06-17 12:42:32 +02:00
wg . Add ( 1 )
2022-12-06 00:27:57 +01:00
flushConcurrencyCh <- struct { } { }
go func ( ibsChunk [ ] * inmemoryBlock ) {
defer func ( ) {
<- flushConcurrencyCh
wg . Done ( )
} ( )
pw := tb . createInmemoryPart ( ibsChunk )
2021-06-17 12:42:32 +02:00
if pw == nil {
return
}
pwsLock . Lock ( )
pws = append ( pws , pw )
pwsLock . Unlock ( )
} ( ibs [ : n ] )
ibs = ibs [ n : ]
2019-05-22 23:16:55 +02:00
}
2021-06-17 12:42:32 +02:00
wg . Wait ( )
2022-12-06 00:27:57 +01:00
putWaitGroup ( wg )
tb . partsLock . Lock ( )
tb . inmemoryParts = append ( tb . inmemoryParts , pws ... )
tb . partsLock . Unlock ( )
flushConcurrencyCh <- struct { } { }
tb . assistedMergeForInmemoryParts ( )
2022-12-13 01:49:21 +01:00
tb . assistedMergeForFileParts ( )
2022-12-06 00:27:57 +01:00
<- flushConcurrencyCh
if tb . flushCallback != nil {
if isFinal {
tb . flushCallback ( )
} else {
atomic . CompareAndSwapUint32 ( & tb . needFlushCallbackCall , 0 , 1 )
2019-08-29 13:39:05 +02:00
}
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
}
var flushConcurrencyCh = make ( chan struct { } , cgroup . AvailableCPUs ( ) )
2019-05-22 23:16:55 +02:00
2022-12-28 23:32:18 +01:00
func needAssistedMerge ( pws [ ] * partWrapper , maxParts int ) bool {
if len ( pws ) < maxParts {
return false
}
return getNotInMergePartsCount ( pws ) >= defaultPartsToMerge
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) assistedMergeForInmemoryParts ( ) {
2019-05-22 23:16:55 +02:00
for {
tb . partsLock . Lock ( )
2022-12-28 23:32:18 +01:00
needMerge := needAssistedMerge ( tb . inmemoryParts , maxInmemoryParts )
2019-05-22 23:16:55 +02:00
tb . partsLock . Unlock ( )
2022-12-28 23:32:18 +01:00
if ! needMerge {
2019-05-22 23:16:55 +02:00
return
}
2020-07-22 23:58:48 +02:00
// Prioritize assisted merges over searches.
storagepacelimiter . Search . Inc ( )
2022-12-13 01:49:21 +01:00
atomic . AddUint64 ( & tb . inmemoryAssistedMerges , 1 )
2022-12-06 00:27:57 +01:00
err := tb . mergeInmemoryParts ( )
2020-07-22 23:58:48 +02:00
storagepacelimiter . Search . Dec ( )
2019-05-22 23:16:55 +02:00
if err == nil {
continue
}
2022-12-06 00:27:57 +01:00
if errors . Is ( err , errNothingToMerge ) || errors . Is ( err , errForciblyStopped ) {
2019-05-22 23:16:55 +02:00
return
}
2022-12-06 00:27:57 +01:00
logger . Panicf ( "FATAL: cannot assist with merging inmemory parts: %s" , err )
}
}
2022-12-13 01:49:21 +01:00
func ( tb * Table ) assistedMergeForFileParts ( ) {
for {
tb . partsLock . Lock ( )
2022-12-28 23:32:18 +01:00
needMerge := needAssistedMerge ( tb . fileParts , maxFileParts )
2022-12-13 01:49:21 +01:00
tb . partsLock . Unlock ( )
2022-12-28 23:32:18 +01:00
if ! needMerge {
2022-12-13 01:49:21 +01:00
return
}
// Prioritize assisted merges over searches.
storagepacelimiter . Search . Inc ( )
atomic . AddUint64 ( & tb . fileAssistedMerges , 1 )
err := tb . mergeExistingParts ( false )
storagepacelimiter . Search . Dec ( )
if err == nil {
continue
}
if errors . Is ( err , errNothingToMerge ) || errors . Is ( err , errForciblyStopped ) || errors . Is ( err , errReadOnlyMode ) {
return
}
logger . Panicf ( "FATAL: cannot assist with merging file parts: %s" , err )
}
}
2022-12-06 00:27:57 +01:00
func getNotInMergePartsCount ( pws [ ] * partWrapper ) int {
n := 0
for _ , pw := range pws {
if ! pw . isInMerge {
n ++
}
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
return n
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
func getWaitGroup ( ) * sync . WaitGroup {
v := wgPool . Get ( )
if v == nil {
return & sync . WaitGroup { }
}
return v . ( * sync . WaitGroup )
}
func putWaitGroup ( wg * sync . WaitGroup ) {
wgPool . Put ( wg )
}
2022-07-27 22:47:18 +02:00
2022-12-06 00:27:57 +01:00
var wgPool sync . Pool
func ( tb * Table ) createInmemoryPart ( ibs [ ] * inmemoryBlock ) * partWrapper {
2022-12-04 07:45:53 +01:00
outItemsCount := uint64 ( 0 )
for _ , ib := range ibs {
outItemsCount += uint64 ( ib . Len ( ) )
}
2022-07-27 22:47:18 +02:00
// Prepare blockStreamReaders for source blocks.
bsrs := make ( [ ] * blockStreamReader , 0 , len ( ibs ) )
2021-06-17 12:42:32 +02:00
for _ , ib := range ibs {
2019-05-22 23:16:55 +02:00
if len ( ib . items ) == 0 {
continue
}
2022-07-27 22:47:18 +02:00
bsr := getBlockStreamReader ( )
bsr . InitFromInmemoryBlock ( ib )
2021-05-28 00:09:32 +02:00
putInmemoryBlock ( ib )
2022-07-27 22:47:18 +02:00
bsrs = append ( bsrs , bsr )
2020-02-13 13:06:51 +01:00
}
2022-07-27 22:47:18 +02:00
if len ( bsrs ) == 0 {
2020-02-13 13:06:51 +01:00
return nil
}
2022-12-06 00:27:57 +01:00
flushToDiskDeadline := time . Now ( ) . Add ( dataFlushInterval )
2022-07-27 22:47:18 +02:00
if len ( bsrs ) == 1 {
2020-02-13 13:06:51 +01:00
// Nothing to merge. Just return a single inmemory part.
2022-12-06 00:27:57 +01:00
bsr := bsrs [ 0 ]
2022-08-04 17:22:41 +02:00
mp := & inmemoryPart { }
2022-12-06 00:27:57 +01:00
mp . Init ( & bsr . Block )
putBlockStreamReader ( bsr )
return newPartWrapperFromInmemoryPart ( mp , flushToDiskDeadline )
2019-05-22 23:16:55 +02:00
}
// Prepare blockStreamWriter for destination part.
2022-12-04 07:45:53 +01:00
compressLevel := getCompressLevel ( outItemsCount )
2019-05-22 23:16:55 +02:00
bsw := getBlockStreamWriter ( )
2022-03-03 15:46:35 +01:00
mpDst := & inmemoryPart { }
2022-12-04 07:45:53 +01:00
bsw . InitFromInmemoryPart ( mpDst , compressLevel )
2019-05-22 23:16:55 +02:00
// Merge parts.
// The merge shouldn't be interrupted by stopCh,
// since it may be final after stopCh is closed.
2022-12-06 00:27:57 +01:00
atomic . AddUint64 ( & tb . activeInmemoryMerges , 1 )
err := mergeBlockStreams ( & mpDst . ph , bsw , bsrs , tb . prepareBlock , nil , & tb . inmemoryItemsMerged )
atomic . AddUint64 ( & tb . activeInmemoryMerges , ^ uint64 ( 0 ) )
atomic . AddUint64 ( & tb . inmemoryMergesCount , 1 )
2020-07-22 23:58:48 +02:00
if err != nil {
2019-05-22 23:16:55 +02:00
logger . Panicf ( "FATAL: cannot merge inmemoryBlocks: %s" , err )
}
putBlockStreamWriter ( bsw )
for _ , bsr := range bsrs {
putBlockStreamReader ( bsr )
}
2022-12-06 00:27:57 +01:00
return newPartWrapperFromInmemoryPart ( mpDst , flushToDiskDeadline )
}
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
func newPartWrapperFromInmemoryPart ( mp * inmemoryPart , flushToDiskDeadline time . Time ) * partWrapper {
p := mp . NewPart ( )
2019-05-22 23:16:55 +02:00
return & partWrapper {
2022-12-06 00:27:57 +01:00
p : p ,
mp : mp ,
refCount : 1 ,
flushToDiskDeadline : flushToDiskDeadline ,
2019-05-22 23:16:55 +02:00
}
}
2022-12-06 00:27:57 +01:00
func ( tb * Table ) startMergeWorkers ( ) {
2022-12-13 01:49:21 +01:00
// Start a merge worker per available CPU core.
// The actual number of concurrent merges is limited inside mergeWorker() below.
workersCount := cgroup . AvailableCPUs ( )
for i := 0 ; i < workersCount ; i ++ {
2022-12-04 08:03:05 +01:00
tb . wg . Add ( 1 )
2019-05-22 23:16:55 +02:00
go func ( ) {
2022-12-06 00:27:57 +01:00
tb . mergeWorker ( )
2022-12-04 08:03:05 +01:00
tb . wg . Done ( )
2019-05-22 23:16:55 +02:00
} ( )
}
}
2022-12-06 00:27:57 +01:00
func getMaxInmemoryPartSize ( ) uint64 {
// Allow up to 5% of memory for in-memory parts.
n := uint64 ( 0.05 * float64 ( memory . Allowed ( ) ) / maxInmemoryParts )
if n < 1e6 {
n = 1e6
}
return n
}
func ( tb * Table ) getMaxFilePartSize ( ) uint64 {
n := fs . MustGetFreeSpace ( tb . path )
// Divide free space by the max number of concurrent merges.
maxOutBytes := n / uint64 ( cap ( mergeWorkersLimitCh ) )
if maxOutBytes > maxPartSize {
maxOutBytes = maxPartSize
}
return maxOutBytes
}
2022-06-01 13:21:12 +02:00
func ( tb * Table ) canBackgroundMerge ( ) bool {
return atomic . LoadUint32 ( tb . isReadOnly ) == 0
}
2022-09-26 15:39:56 +02:00
var errReadOnlyMode = fmt . Errorf ( "storage is in readonly mode" )
2022-12-06 00:27:57 +01:00
func ( tb * Table ) mergeInmemoryParts ( ) error {
maxOutBytes := tb . getMaxFilePartSize ( )
tb . partsLock . Lock ( )
pws := getPartsToMerge ( tb . inmemoryParts , maxOutBytes , false )
tb . partsLock . Unlock ( )
return tb . mergeParts ( pws , tb . stopCh , false )
}
2019-09-19 20:48:14 +02:00
func ( tb * Table ) mergeExistingParts ( isFinal bool ) error {
2022-06-01 13:21:12 +02:00
if ! tb . canBackgroundMerge ( ) {
// Do not perform background merge in read-only mode
// in order to prevent from disk space shortage.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
2022-09-26 15:39:56 +02:00
return errReadOnlyMode
2022-06-01 13:21:12 +02:00
}
2022-12-06 00:27:57 +01:00
maxOutBytes := tb . getMaxFilePartSize ( )
2019-05-22 23:16:55 +02:00
tb . partsLock . Lock ( )
2022-12-06 00:27:57 +01:00
dst := make ( [ ] * partWrapper , 0 , len ( tb . inmemoryParts ) + len ( tb . fileParts ) )
dst = append ( dst , tb . inmemoryParts ... )
dst = append ( dst , tb . fileParts ... )
pws := getPartsToMerge ( dst , maxOutBytes , isFinal )
2019-05-22 23:16:55 +02:00
tb . partsLock . Unlock ( )
2022-12-06 00:27:57 +01:00
return tb . mergeParts ( pws , tb . stopCh , isFinal )
2019-05-22 23:16:55 +02:00
}
const (
2022-12-06 00:27:57 +01:00
minMergeSleepTime = 10 * time . Millisecond
maxMergeSleepTime = 10 * time . Second
2019-05-22 23:16:55 +02:00
)
2022-12-06 00:27:57 +01:00
func ( tb * Table ) mergeWorker ( ) {
2019-05-22 23:16:55 +02:00
sleepTime := minMergeSleepTime
2020-05-14 21:01:51 +02:00
var lastMergeTime uint64
2019-05-22 23:16:55 +02:00
isFinal := false
t := time . NewTimer ( sleepTime )
for {
2022-12-13 01:49:21 +01:00
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
// across tables may exceed the the cap(mergeWorkersLimitCh).
2022-12-06 00:27:57 +01:00
mergeWorkersLimitCh <- struct { } { }
2019-09-19 20:48:14 +02:00
err := tb . mergeExistingParts ( isFinal )
2022-12-06 00:27:57 +01:00
<- mergeWorkersLimitCh
2019-05-22 23:16:55 +02:00
if err == nil {
// Try merging additional parts.
sleepTime = minMergeSleepTime
2020-05-14 21:01:51 +02:00
lastMergeTime = fasttime . UnixTimestamp ( )
2019-05-22 23:16:55 +02:00
isFinal = false
continue
}
2020-09-17 02:02:35 +02:00
if errors . Is ( err , errForciblyStopped ) {
2019-05-22 23:16:55 +02:00
// The merger has been stopped.
2022-12-06 00:27:57 +01:00
return
2019-05-22 23:16:55 +02:00
}
2022-09-26 15:39:56 +02:00
if ! errors . Is ( err , errNothingToMerge ) && ! errors . Is ( err , errReadOnlyMode ) {
2022-12-06 00:27:57 +01:00
// Unexpected error.
logger . Panicf ( "FATAL: unrecoverable error when merging inmemory parts in %q: %s" , tb . path , err )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
if finalMergeDelaySeconds > 0 && fasttime . UnixTimestamp ( ) - lastMergeTime > finalMergeDelaySeconds {
2019-05-22 23:16:55 +02:00
// We have free time for merging into bigger parts.
// This should improve select performance.
2020-05-14 21:01:51 +02:00
lastMergeTime = fasttime . UnixTimestamp ( )
2019-05-22 23:16:55 +02:00
isFinal = true
continue
}
// Nothing to merge. Sleep for a while and try again.
sleepTime *= 2
if sleepTime > maxMergeSleepTime {
sleepTime = maxMergeSleepTime
}
select {
case <- tb . stopCh :
2022-12-06 00:27:57 +01:00
return
2019-05-22 23:16:55 +02:00
case <- t . C :
t . Reset ( sleepTime )
}
}
}
2022-12-06 00:27:57 +01:00
// Disable final merge by default, since it may lead to high disk IO and CPU usage
// after some inactivity time.
var finalMergeDelaySeconds = uint64 ( 0 )
// SetFinalMergeDelay sets the delay before doing final merge for Table without newly ingested data.
//
// This function may be called only before Table initialization.
func SetFinalMergeDelay ( delay time . Duration ) {
if delay <= 0 {
return
}
finalMergeDelaySeconds = uint64 ( delay . Seconds ( ) + 1 )
}
2019-05-22 23:16:55 +02:00
var errNothingToMerge = fmt . Errorf ( "nothing to merge" )
2022-12-01 04:53:02 +01:00
func ( tb * Table ) releasePartsToMerge ( pws [ ] * partWrapper ) {
tb . partsLock . Lock ( )
for _ , pw := range pws {
if ! pw . isInMerge {
logger . Panicf ( "BUG: missing isInMerge flag on the part %q" , pw . p . path )
}
pw . isInMerge = false
}
tb . partsLock . Unlock ( )
}
2022-12-06 00:27:57 +01:00
// mergeParts merges pws to a single resulting part.
2020-09-17 01:05:54 +02:00
//
// Merging is immediately stopped if stopCh is closed.
//
2022-12-06 00:27:57 +01:00
// If isFinal is set, then the resulting part will be stored to disk.
//
2020-09-17 01:05:54 +02:00
// All the parts inside pws must have isInMerge field set to true.
2022-12-06 00:27:57 +01:00
func ( tb * Table ) mergeParts ( pws [ ] * partWrapper , stopCh <- chan struct { } , isFinal bool ) error {
2019-05-22 23:16:55 +02:00
if len ( pws ) == 0 {
// Nothing to merge.
return errNothingToMerge
}
2022-12-01 04:53:02 +01:00
defer tb . releasePartsToMerge ( pws )
2019-05-22 23:16:55 +02:00
startTime := time . Now ( )
2022-12-06 00:27:57 +01:00
// Initialize destination paths.
dstPartType := getDstPartType ( pws , isFinal )
tmpPartPath , mergeIdx := tb . getDstPartPaths ( dstPartType )
if isFinal && len ( pws ) == 1 && pws [ 0 ] . mp != nil {
// Fast path: flush a single in-memory part to disk.
mp := pws [ 0 ] . mp
if tmpPartPath == "" {
logger . Panicf ( "BUG: tmpPartPath must be non-empty" )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
if err := mp . StoreToDisk ( tmpPartPath ) ; err != nil {
return fmt . Errorf ( "cannot store in-memory part to %q: %w" , tmpPartPath , err )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
pwNew , err := tb . openCreatedPart ( & mp . ph , pws , nil , tmpPartPath , mergeIdx )
if err != nil {
return fmt . Errorf ( "cannot atomically register the created part: %w" , err )
}
tb . swapSrcWithDstParts ( pws , pwNew , dstPartType )
return nil
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
// Prepare BlockStreamReaders for source parts.
bsrs , err := openBlockStreamReaders ( pws )
if err != nil {
return err
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
closeBlockStreamReaders := func ( ) {
for _ , bsr := range bsrs {
putBlockStreamReader ( bsr )
}
bsrs = nil
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
// Prepare BlockStreamWriter for destination part.
srcSize := uint64 ( 0 )
srcItemsCount := uint64 ( 0 )
srcBlocksCount := uint64 ( 0 )
for _ , pw := range pws {
srcSize += pw . p . size
srcItemsCount += pw . p . ph . itemsCount
srcBlocksCount += pw . p . ph . blocksCount
}
compressLevel := getCompressLevel ( srcItemsCount )
2019-05-22 23:16:55 +02:00
bsw := getBlockStreamWriter ( )
2022-12-06 00:27:57 +01:00
var mpNew * inmemoryPart
if dstPartType == partInmemory {
mpNew = & inmemoryPart { }
bsw . InitFromInmemoryPart ( mpNew , compressLevel )
} else {
if tmpPartPath == "" {
logger . Panicf ( "BUG: tmpPartPath must be non-empty" )
}
nocache := srcItemsCount > maxItemsPerCachedPart ( )
if err := bsw . InitFromFilePart ( tmpPartPath , nocache , compressLevel ) ; err != nil {
closeBlockStreamReaders ( )
return fmt . Errorf ( "cannot create destination part at %q: %w" , tmpPartPath , err )
}
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
// Merge source parts to destination part.
ph , err := tb . mergePartsInternal ( tmpPartPath , bsw , bsrs , dstPartType , stopCh )
2019-05-22 23:16:55 +02:00
putBlockStreamWriter ( bsw )
2022-12-06 00:27:57 +01:00
closeBlockStreamReaders ( )
2019-05-22 23:16:55 +02:00
if err != nil {
2022-12-06 00:27:57 +01:00
return fmt . Errorf ( "cannot merge %d parts: %w" , len ( pws ) , err )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
if mpNew != nil {
// Update partHeader for destination inmemory part after the merge.
mpNew . ph = * ph
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
// Atomically move the created part from tmpPartPath to its destination
// and swap the source parts with the newly created part.
pwNew , err := tb . openCreatedPart ( ph , pws , mpNew , tmpPartPath , mergeIdx )
if err != nil {
return fmt . Errorf ( "cannot atomically register the created part: %w" , err )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
tb . swapSrcWithDstParts ( pws , pwNew , dstPartType )
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
d := time . Since ( startTime )
if d <= 30 * time . Second {
return nil
}
// Log stats for long merges.
dstItemsCount := uint64 ( 0 )
dstBlocksCount := uint64 ( 0 )
dstSize := uint64 ( 0 )
dstPartPath := ""
if pwNew != nil {
pDst := pwNew . p
dstItemsCount = pDst . ph . itemsCount
dstBlocksCount = pDst . ph . blocksCount
dstSize = pDst . size
dstPartPath = pDst . path
}
durationSecs := d . Seconds ( )
itemsPerSec := int ( float64 ( srcItemsCount ) / durationSecs )
logger . Infof ( "merged (%d parts, %d items, %d blocks, %d bytes) into (1 part, %d items, %d blocks, %d bytes) in %.3f seconds at %d items/sec to %q" ,
len ( pws ) , srcItemsCount , srcBlocksCount , srcSize , dstItemsCount , dstBlocksCount , dstSize , durationSecs , itemsPerSec , dstPartPath )
return nil
}
func getFlushToDiskDeadline ( pws [ ] * partWrapper ) time . Time {
d := pws [ 0 ] . flushToDiskDeadline
for _ , pw := range pws [ 1 : ] {
if pw . flushToDiskDeadline . Before ( d ) {
d = pw . flushToDiskDeadline
2019-05-22 23:16:55 +02:00
}
}
2022-12-06 00:27:57 +01:00
return d
}
type partType int
var (
partInmemory = partType ( 0 )
partFile = partType ( 1 )
)
func getDstPartType ( pws [ ] * partWrapper , isFinal bool ) partType {
dstPartSize := getPartsSize ( pws )
if isFinal || dstPartSize > getMaxInmemoryPartSize ( ) {
return partFile
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
if ! areAllInmemoryParts ( pws ) {
// If at least a single source part is located in file,
// then the destination part must be in file for durability reasons.
return partFile
}
return partInmemory
}
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
func ( tb * Table ) getDstPartPaths ( dstPartType partType ) ( string , uint64 ) {
tmpPartPath := ""
mergeIdx := tb . nextMergeIdx ( )
switch dstPartType {
case partInmemory :
case partFile :
tmpPartPath = fmt . Sprintf ( "%s/tmp/%016X" , tb . path , mergeIdx )
default :
logger . Panicf ( "BUG: unknown partType=%d" , dstPartType )
}
return tmpPartPath , mergeIdx
}
func openBlockStreamReaders ( pws [ ] * partWrapper ) ( [ ] * blockStreamReader , error ) {
bsrs := make ( [ ] * blockStreamReader , 0 , len ( pws ) )
for _ , pw := range pws {
bsr := getBlockStreamReader ( )
if pw . mp != nil {
bsr . InitFromInmemoryPart ( pw . mp )
} else {
if err := bsr . InitFromFilePart ( pw . p . path ) ; err != nil {
for _ , bsr := range bsrs {
putBlockStreamReader ( bsr )
}
return nil , fmt . Errorf ( "cannot open source part for merging: %w" , err )
}
}
bsrs = append ( bsrs , bsr )
}
return bsrs , nil
}
func ( tb * Table ) mergePartsInternal ( tmpPartPath string , bsw * blockStreamWriter , bsrs [ ] * blockStreamReader , dstPartType partType , stopCh <- chan struct { } ) ( * partHeader , error ) {
var ph partHeader
var itemsMerged * uint64
var mergesCount * uint64
var activeMerges * uint64
switch dstPartType {
case partInmemory :
itemsMerged = & tb . inmemoryItemsMerged
mergesCount = & tb . inmemoryMergesCount
activeMerges = & tb . activeInmemoryMerges
case partFile :
itemsMerged = & tb . fileItemsMerged
mergesCount = & tb . fileMergesCount
activeMerges = & tb . activeFileMerges
default :
logger . Panicf ( "BUG: unknown partType=%d" , dstPartType )
}
atomic . AddUint64 ( activeMerges , 1 )
err := mergeBlockStreams ( & ph , bsw , bsrs , tb . prepareBlock , stopCh , itemsMerged )
atomic . AddUint64 ( activeMerges , ^ uint64 ( 0 ) )
atomic . AddUint64 ( mergesCount , 1 )
if err != nil {
return nil , fmt . Errorf ( "cannot merge parts to %q: %w" , tmpPartPath , err )
}
if tmpPartPath != "" {
if err := ph . WriteMetadata ( tmpPartPath ) ; err != nil {
return nil , fmt . Errorf ( "cannot write metadata to destination part %q: %w" , tmpPartPath , err )
}
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
return & ph , nil
}
func ( tb * Table ) openCreatedPart ( ph * partHeader , pws [ ] * partWrapper , mpNew * inmemoryPart , tmpPartPath string , mergeIdx uint64 ) ( * partWrapper , error ) {
dstPartPath := ""
if mpNew == nil || ! areAllInmemoryParts ( pws ) {
// Either source or destination parts are located on disk.
// Create a transaction for atomic deleting of old parts and moving new part to its destination on disk.
var bb bytesutil . ByteBuffer
for _ , pw := range pws {
if pw . mp == nil {
fmt . Fprintf ( & bb , "%s\n" , pw . p . path )
}
}
dstPartPath = ph . Path ( tb . path , mergeIdx )
fmt . Fprintf ( & bb , "%s -> %s\n" , tmpPartPath , dstPartPath )
txnPath := fmt . Sprintf ( "%s/txn/%016X" , tb . path , mergeIdx )
if err := fs . WriteFileAtomically ( txnPath , bb . B , false ) ; err != nil {
return nil , fmt . Errorf ( "cannot create transaction file %q: %w" , txnPath , err )
}
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
// Run the created transaction.
if err := runTransaction ( & tb . snapshotLock , tb . path , txnPath ) ; err != nil {
return nil , fmt . Errorf ( "cannot execute transaction %q: %w" , txnPath , err )
}
}
// Open the created part.
if mpNew != nil {
// Open the created part from memory.
flushToDiskDeadline := getFlushToDiskDeadline ( pws )
pwNew := newPartWrapperFromInmemoryPart ( mpNew , flushToDiskDeadline )
return pwNew , nil
}
// Open the created part from disk.
pNew , err := openFilePart ( dstPartPath )
2019-05-22 23:16:55 +02:00
if err != nil {
2022-12-06 00:27:57 +01:00
return nil , fmt . Errorf ( "cannot open merged part %q: %w" , dstPartPath , err )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
pwNew := & partWrapper {
p : pNew ,
2019-05-22 23:16:55 +02:00
refCount : 1 ,
}
2022-12-06 00:27:57 +01:00
return pwNew , nil
}
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
func areAllInmemoryParts ( pws [ ] * partWrapper ) bool {
for _ , pw := range pws {
if pw . mp == nil {
return false
}
}
return true
}
func ( tb * Table ) swapSrcWithDstParts ( pws [ ] * partWrapper , pwNew * partWrapper , dstPartType partType ) {
// Atomically unregister old parts and add new part to tb.
2019-05-22 23:16:55 +02:00
m := make ( map [ * partWrapper ] bool , len ( pws ) )
for _ , pw := range pws {
m [ pw ] = true
}
if len ( m ) != len ( pws ) {
2022-12-06 00:27:57 +01:00
logger . Panicf ( "BUG: %d duplicate parts found when merging %d parts" , len ( pws ) - len ( m ) , len ( pws ) )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
removedInmemoryParts := 0
removedFileParts := 0
2019-05-22 23:16:55 +02:00
tb . partsLock . Lock ( )
2022-12-06 00:27:57 +01:00
tb . inmemoryParts , removedInmemoryParts = removeParts ( tb . inmemoryParts , m )
tb . fileParts , removedFileParts = removeParts ( tb . fileParts , m )
if pwNew != nil {
switch dstPartType {
case partInmemory :
tb . inmemoryParts = append ( tb . inmemoryParts , pwNew )
case partFile :
tb . fileParts = append ( tb . fileParts , pwNew )
default :
logger . Panicf ( "BUG: unknown partType=%d" , dstPartType )
}
}
2019-05-22 23:16:55 +02:00
tb . partsLock . Unlock ( )
2022-12-06 00:27:57 +01:00
removedParts := removedInmemoryParts + removedFileParts
2019-05-22 23:16:55 +02:00
if removedParts != len ( m ) {
2022-12-06 00:27:57 +01:00
logger . Panicf ( "BUG: unexpected number of parts removed; got %d, want %d" , removedParts , len ( m ) )
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
// Remove references from old parts.
2019-05-22 23:16:55 +02:00
for _ , pw := range pws {
pw . decRef ( )
}
2022-12-06 00:27:57 +01:00
}
2019-05-22 23:16:55 +02:00
2022-12-06 00:27:57 +01:00
func getPartsSize ( pws [ ] * partWrapper ) uint64 {
n := uint64 ( 0 )
for _ , pw := range pws {
n += pw . p . size
2019-05-22 23:16:55 +02:00
}
2022-12-06 00:27:57 +01:00
return n
2019-05-22 23:16:55 +02:00
}
2022-12-04 07:45:53 +01:00
func getCompressLevel ( itemsCount uint64 ) int {
2020-05-14 22:44:01 +02:00
if itemsCount <= 1 << 16 {
// -5 is the minimum supported compression for zstd.
// See https://github.com/facebook/zstd/releases/tag/v1.3.4
return - 5
}
if itemsCount <= 1 << 17 {
return - 4
}
if itemsCount <= 1 << 18 {
return - 3
}
if itemsCount <= 1 << 19 {
return - 2
}
if itemsCount <= 1 << 20 {
return - 1
}
if itemsCount <= 1 << 22 {
2019-05-22 23:16:55 +02:00
return 1
}
2020-05-14 22:44:01 +02:00
if itemsCount <= 1 << 25 {
2019-05-22 23:16:55 +02:00
return 2
}
2020-05-14 22:44:01 +02:00
if itemsCount <= 1 << 28 {
2019-05-22 23:16:55 +02:00
return 3
}
2020-05-14 22:44:01 +02:00
return 4
2019-05-22 23:16:55 +02:00
}
func ( tb * Table ) nextMergeIdx ( ) uint64 {
return atomic . AddUint64 ( & tb . mergeIdx , 1 )
}
2022-12-06 00:27:57 +01:00
var mergeWorkersLimitCh = make ( chan struct { } , cgroup . AvailableCPUs ( ) )
2019-05-22 23:16:55 +02:00
func openParts ( path string ) ( [ ] * partWrapper , error ) {
2019-11-02 01:26:02 +01:00
// The path can be missing after restoring from backup, so create it if needed.
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
return nil , err
}
2022-09-13 14:56:05 +02:00
fs . MustRemoveTemporaryDirs ( path )
2019-05-22 23:16:55 +02:00
d , err := os . Open ( path )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open difrectory: %w" , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
// Run remaining transactions and cleanup /txn and /tmp directories.
// Snapshots cannot be created yet, so use fakeSnapshotLock.
var fakeSnapshotLock sync . RWMutex
if err := runTransactions ( & fakeSnapshotLock , path ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot run transactions: %w" , err )
2019-05-22 23:16:55 +02:00
}
txnDir := path + "/txn"
2022-09-13 14:56:05 +02:00
fs . MustRemoveDirAtomic ( txnDir )
2019-05-22 23:16:55 +02:00
if err := fs . MkdirAllFailIfExist ( txnDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , txnDir , err )
2019-05-22 23:16:55 +02:00
}
tmpDir := path + "/tmp"
2022-09-13 14:56:05 +02:00
fs . MustRemoveDirAtomic ( tmpDir )
2019-05-22 23:16:55 +02:00
if err := fs . MkdirAllFailIfExist ( tmpDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , tmpDir , err )
2019-05-22 23:16:55 +02:00
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( path )
2019-05-22 23:16:55 +02:00
// Open parts.
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read directory: %w" , err )
2019-05-22 23:16:55 +02:00
}
var pws [ ] * partWrapper
for _ , fi := range fis {
if ! fs . IsDirOrSymlink ( fi ) {
// Skip non-directories.
continue
}
fn := fi . Name ( )
if isSpecialDir ( fn ) {
// Skip special dirs.
continue
}
partPath := path + "/" + fn
2021-04-22 11:58:53 +02:00
if fs . IsEmptyDir ( partPath ) {
// Remove empty directory, which can be left after unclean shutdown on NFS.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
2022-09-13 14:56:05 +02:00
fs . MustRemoveDirAtomic ( partPath )
2021-04-22 11:58:53 +02:00
continue
}
2019-05-22 23:16:55 +02:00
p , err := openFilePart ( partPath )
if err != nil {
mustCloseParts ( pws )
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open part %q: %w" , partPath , err )
2019-05-22 23:16:55 +02:00
}
pw := & partWrapper {
p : p ,
refCount : 1 ,
}
pws = append ( pws , pw )
}
return pws , nil
}
func mustCloseParts ( pws [ ] * partWrapper ) {
for _ , pw := range pws {
if pw . refCount != 1 {
logger . Panicf ( "BUG: unexpected refCount when closing part %q: %d; want 1" , pw . p . path , pw . refCount )
}
pw . p . MustClose ( )
}
}
// CreateSnapshotAt creates tb snapshot in the given dstDir.
//
// Snapshot is created using linux hard links, so it is usually created
// very quickly.
func ( tb * Table ) CreateSnapshotAt ( dstDir string ) error {
logger . Infof ( "creating Table snapshot of %q..." , tb . path )
startTime := time . Now ( )
var err error
srcDir := tb . path
srcDir , err = filepath . Abs ( srcDir )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain absolute dir for %q: %w" , srcDir , err )
2019-05-22 23:16:55 +02:00
}
dstDir , err = filepath . Abs ( dstDir )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain absolute dir for %q: %w" , dstDir , err )
2019-05-22 23:16:55 +02:00
}
if strings . HasPrefix ( dstDir , srcDir + "/" ) {
return fmt . Errorf ( "cannot create snapshot %q inside the data dir %q" , dstDir , srcDir )
}
// Flush inmemory items to disk.
2022-12-06 00:27:57 +01:00
tb . flushInmemoryItems ( )
2019-05-22 23:16:55 +02:00
// The snapshot must be created under the lock in order to prevent from
// concurrent modifications via runTransaction.
tb . snapshotLock . Lock ( )
defer tb . snapshotLock . Unlock ( )
if err := fs . MkdirAllFailIfExist ( dstDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot create snapshot dir %q: %w" , dstDir , err )
2019-05-22 23:16:55 +02:00
}
d , err := os . Open ( srcDir )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot open difrectory: %w" , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read directory: %w" , err )
2019-05-22 23:16:55 +02:00
}
for _ , fi := range fis {
2019-09-20 18:46:47 +02:00
fn := fi . Name ( )
2019-05-22 23:16:55 +02:00
if ! fs . IsDirOrSymlink ( fi ) {
2022-12-04 06:17:27 +01:00
// Skip non-directories.
2019-05-22 23:16:55 +02:00
continue
}
if isSpecialDir ( fn ) {
// Skip special dirs.
continue
}
srcPartPath := srcDir + "/" + fn
dstPartPath := dstDir + "/" + fn
if err := fs . HardLinkFiles ( srcPartPath , dstPartPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot create hard links from %q to %q: %w" , srcPartPath , dstPartPath , err )
2019-05-22 23:16:55 +02:00
}
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDir )
2019-05-22 23:16:55 +02:00
parentDir := filepath . Dir ( dstDir )
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( parentDir )
2019-05-22 23:16:55 +02:00
2020-01-22 17:27:44 +01:00
logger . Infof ( "created Table snapshot of %q at %q in %.3f seconds" , srcDir , dstDir , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:16:55 +02:00
return nil
}
func runTransactions ( txnLock * sync . RWMutex , path string ) error {
2019-12-04 20:35:51 +01:00
// Wait until all the previous pending transaction deletions are finished.
pendingTxnDeletionsWG . Wait ( )
// Make sure all the current transaction deletions are finished before exiting.
defer pendingTxnDeletionsWG . Wait ( )
2019-05-22 23:16:55 +02:00
txnDir := path + "/txn"
d , err := os . Open ( txnDir )
if err != nil {
if os . IsNotExist ( err ) {
return nil
}
2022-12-06 00:27:57 +01:00
return fmt . Errorf ( "cannot open transaction dir: %w" , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read directory %q: %w" , d . Name ( ) , err )
2019-05-22 23:16:55 +02:00
}
// Sort transaction files by id, since transactions must be ordered.
sort . Slice ( fis , func ( i , j int ) bool {
return fis [ i ] . Name ( ) < fis [ j ] . Name ( )
} )
for _ , fi := range fis {
2019-08-12 13:44:24 +02:00
fn := fi . Name ( )
if fs . IsTemporaryFileName ( fn ) {
// Skip temporary files, which could be left after unclean shutdown.
continue
}
txnPath := txnDir + "/" + fn
2019-05-22 23:16:55 +02:00
if err := runTransaction ( txnLock , path , txnPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot run transaction from %q: %w" , txnPath , err )
2019-05-22 23:16:55 +02:00
}
}
return nil
}
func runTransaction ( txnLock * sync . RWMutex , pathPrefix , txnPath string ) error {
2020-09-17 01:05:54 +02:00
// The transaction must run under read lock in order to provide
2019-05-22 23:16:55 +02:00
// consistent snapshots with Table.CreateSnapshot().
txnLock . RLock ( )
defer txnLock . RUnlock ( )
2022-08-21 22:51:13 +02:00
data , err := os . ReadFile ( txnPath )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read transaction file: %w" , err )
2019-05-22 23:16:55 +02:00
}
if len ( data ) > 0 && data [ len ( data ) - 1 ] == '\n' {
data = data [ : len ( data ) - 1 ]
}
paths := strings . Split ( string ( data ) , "\n" )
if len ( paths ) == 0 {
return fmt . Errorf ( "empty transaction" )
}
rmPaths := paths [ : len ( paths ) - 1 ]
mvPaths := strings . Split ( paths [ len ( paths ) - 1 ] , " -> " )
if len ( mvPaths ) != 2 {
return fmt . Errorf ( "invalid last line in the transaction file: got %q; must contain `srcPath -> dstPath`" , paths [ len ( paths ) - 1 ] )
}
// Remove old paths. It is OK if certain paths don't exist.
for _ , path := range rmPaths {
path , err := validatePath ( pathPrefix , path )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "invalid path to remove: %w" , err )
2019-05-22 23:16:55 +02:00
}
2022-09-13 14:56:05 +02:00
fs . MustRemoveDirAtomic ( path )
2019-05-22 23:16:55 +02:00
}
// Move the new part to new directory.
srcPath := mvPaths [ 0 ]
dstPath := mvPaths [ 1 ]
srcPath , err = validatePath ( pathPrefix , srcPath )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "invalid source path to rename: %w" , err )
2019-05-22 23:16:55 +02:00
}
dstPath , err = validatePath ( pathPrefix , dstPath )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "invalid destination path to rename: %w" , err )
2019-05-22 23:16:55 +02:00
}
if fs . IsPathExist ( srcPath ) {
if err := os . Rename ( srcPath , dstPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot rename %q to %q: %w" , srcPath , dstPath , err )
2019-05-22 23:16:55 +02:00
}
2019-12-09 14:42:57 +01:00
} else if ! fs . IsPathExist ( dstPath ) {
// Emit info message for the expected condition after unclean shutdown on NFS disk.
// The dstPath part may be missing because it could be already merged into bigger part
// while old source parts for the current txn weren't still deleted due to NFS locks.
logger . Infof ( "cannot find both source and destination paths: %q -> %q; this may be the case after unclean shutdown (OOM, `kill -9`, hard reset) on NFS disk" ,
srcPath , dstPath )
2019-05-22 23:16:55 +02:00
}
// Flush pathPrefix directory metadata to the underying storage.
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( pathPrefix )
2019-05-22 23:16:55 +02:00
2019-12-04 20:35:51 +01:00
pendingTxnDeletionsWG . Add ( 1 )
2019-12-02 20:34:35 +01:00
go func ( ) {
2019-12-04 20:35:51 +01:00
defer pendingTxnDeletionsWG . Done ( )
2019-12-02 20:34:35 +01:00
if err := os . Remove ( txnPath ) ; err != nil {
logger . Errorf ( "cannot remove transaction file %q: %s" , txnPath , err )
}
} ( )
2019-05-22 23:16:55 +02:00
return nil
}
2019-12-04 20:35:51 +01:00
var pendingTxnDeletionsWG syncwg . WaitGroup
2019-05-22 23:16:55 +02:00
func validatePath ( pathPrefix , path string ) ( string , error ) {
var err error
pathPrefix , err = filepath . Abs ( pathPrefix )
if err != nil {
2020-06-30 21:58:18 +02:00
return path , fmt . Errorf ( "cannot determine absolute path for pathPrefix=%q: %w" , pathPrefix , err )
2019-05-22 23:16:55 +02:00
}
path , err = filepath . Abs ( path )
if err != nil {
2020-06-30 21:58:18 +02:00
return path , fmt . Errorf ( "cannot determine absolute path for %q: %w" , path , err )
2019-05-22 23:16:55 +02:00
}
if ! strings . HasPrefix ( path , pathPrefix + "/" ) {
return path , fmt . Errorf ( "invalid path %q; must start with %q" , path , pathPrefix + "/" )
}
return path , nil
}
// getPartsToMerge returns optimal parts to merge from pws.
//
// if isFinal is set, then merge harder.
//
2021-08-25 08:35:03 +02:00
// The summary size of the returned parts must be smaller than the maxOutBytes.
func getPartsToMerge ( pws [ ] * partWrapper , maxOutBytes uint64 , isFinal bool ) [ ] * partWrapper {
2019-05-22 23:16:55 +02:00
pwsRemaining := make ( [ ] * partWrapper , 0 , len ( pws ) )
for _ , pw := range pws {
if ! pw . isInMerge {
pwsRemaining = append ( pwsRemaining , pw )
}
}
maxPartsToMerge := defaultPartsToMerge
var dst [ ] * partWrapper
if isFinal {
for len ( dst ) == 0 && maxPartsToMerge >= finalPartsToMerge {
2021-08-25 08:35:03 +02:00
dst = appendPartsToMerge ( dst [ : 0 ] , pwsRemaining , maxPartsToMerge , maxOutBytes )
2019-05-22 23:16:55 +02:00
maxPartsToMerge --
}
} else {
2021-08-25 08:35:03 +02:00
dst = appendPartsToMerge ( dst [ : 0 ] , pwsRemaining , maxPartsToMerge , maxOutBytes )
2019-05-22 23:16:55 +02:00
}
for _ , pw := range dst {
if pw . isInMerge {
logger . Panicf ( "BUG: partWrapper.isInMerge is already set" )
}
pw . isInMerge = true
}
return dst
}
2021-08-25 08:35:03 +02:00
// minMergeMultiplier is the minimum multiplier for the size of the output part
// compared to the size of the maximum input part for the merge.
//
// Higher value reduces write amplification (disk write IO induced by the merge),
// while increases the number of unmerged parts.
// The 1.7 is good enough for production workloads.
const minMergeMultiplier = 1.7
2019-05-22 23:16:55 +02:00
// appendPartsToMerge finds optimal parts to merge from src, appends
// them to dst and returns the result.
2021-08-25 08:35:03 +02:00
func appendPartsToMerge ( dst , src [ ] * partWrapper , maxPartsToMerge int , maxOutBytes uint64 ) [ ] * partWrapper {
2019-05-22 23:16:55 +02:00
if len ( src ) < 2 {
// There is no need in merging zero or one part :)
return dst
}
if maxPartsToMerge < 2 {
logger . Panicf ( "BUG: maxPartsToMerge cannot be smaller than 2; got %d" , maxPartsToMerge )
}
// Filter out too big parts.
// This should reduce N for O(n^2) algorithm below.
2021-08-25 08:35:03 +02:00
maxInPartBytes := uint64 ( float64 ( maxOutBytes ) / minMergeMultiplier )
2019-05-22 23:16:55 +02:00
tmp := make ( [ ] * partWrapper , 0 , len ( src ) )
for _ , pw := range src {
2021-08-25 08:35:03 +02:00
if pw . p . size > maxInPartBytes {
2019-05-22 23:16:55 +02:00
continue
}
tmp = append ( tmp , pw )
}
src = tmp
2022-12-06 00:27:57 +01:00
sortPartsForOptimalMerge ( src )
2019-05-22 23:16:55 +02:00
2020-12-18 19:00:06 +01:00
maxSrcParts := maxPartsToMerge
2021-07-02 16:24:14 +02:00
if maxSrcParts > len ( src ) {
2020-12-18 19:00:06 +01:00
maxSrcParts = len ( src )
2019-05-22 23:16:55 +02:00
}
2021-07-02 16:24:14 +02:00
minSrcParts := ( maxSrcParts + 1 ) / 2
if minSrcParts < 2 {
minSrcParts = 2
}
2019-05-22 23:16:55 +02:00
2020-12-18 19:00:06 +01:00
// Exhaustive search for parts giving the lowest write amplification when merged.
2019-05-22 23:16:55 +02:00
var pws [ ] * partWrapper
maxM := float64 ( 0 )
2020-12-18 19:00:06 +01:00
for i := minSrcParts ; i <= maxSrcParts ; i ++ {
2019-05-22 23:16:55 +02:00
for j := 0 ; j <= len ( src ) - i ; j ++ {
2019-10-29 11:45:19 +01:00
a := src [ j : j + i ]
2021-08-25 08:35:03 +02:00
if a [ 0 ] . p . size * uint64 ( len ( a ) ) < a [ len ( a ) - 1 ] . p . size {
// Do not merge parts with too big difference in size,
2020-12-18 19:00:06 +01:00
// since this results in unbalanced merges.
continue
}
2021-08-25 08:35:03 +02:00
outBytes := uint64 ( 0 )
2019-10-29 11:45:19 +01:00
for _ , pw := range a {
2021-08-25 08:35:03 +02:00
outBytes += pw . p . size
2019-05-22 23:16:55 +02:00
}
2021-08-25 08:35:03 +02:00
if outBytes > maxOutBytes {
2019-10-29 11:45:19 +01:00
// There is no sense in checking the remaining bigger parts.
break
2019-05-22 23:16:55 +02:00
}
2021-08-25 08:35:03 +02:00
m := float64 ( outBytes ) / float64 ( a [ len ( a ) - 1 ] . p . size )
2019-05-22 23:16:55 +02:00
if m < maxM {
continue
}
maxM = m
2019-10-29 11:45:19 +01:00
pws = a
2019-05-22 23:16:55 +02:00
}
}
2019-10-29 11:45:19 +01:00
minM := float64 ( maxPartsToMerge ) / 2
2021-08-25 08:35:03 +02:00
if minM < minMergeMultiplier {
minM = minMergeMultiplier
2019-05-22 23:16:55 +02:00
}
if maxM < minM {
2021-08-25 08:35:03 +02:00
// There is no sense in merging parts with too small m,
// since this leads to high disk write IO.
2019-05-22 23:16:55 +02:00
return dst
}
return append ( dst , pws ... )
}
2022-12-06 00:27:57 +01:00
func sortPartsForOptimalMerge ( pws [ ] * partWrapper ) {
// Sort src parts by size.
sort . Slice ( pws , func ( i , j int ) bool {
return pws [ i ] . p . size < pws [ j ] . p . size
} )
}
2019-05-22 23:16:55 +02:00
func removeParts ( pws [ ] * partWrapper , partsToRemove map [ * partWrapper ] bool ) ( [ ] * partWrapper , int ) {
dst := pws [ : 0 ]
for _ , pw := range pws {
2020-09-17 01:05:54 +02:00
if ! partsToRemove [ pw ] {
dst = append ( dst , pw )
2019-05-22 23:16:55 +02:00
}
}
2022-12-06 00:27:57 +01:00
for i := len ( dst ) ; i < len ( pws ) ; i ++ {
pws [ i ] = nil
}
return dst , len ( pws ) - len ( dst )
2019-05-22 23:16:55 +02:00
}
func isSpecialDir ( name string ) bool {
// Snapshots and cache dirs aren't used anymore.
// Keep them here for backwards compatibility.
return name == "tmp" || name == "txn" || name == "snapshots" || name == "cache"
}