2019-05-22 23:16:55 +02:00
package mergeset
import (
2020-09-17 02:02:35 +02:00
"errors"
2019-05-22 23:16:55 +02:00
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-05-14 21:01:51 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-09-09 10:41:30 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2020-07-22 23:58:48 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
)
const (
	// maxParts is the maximum number of parts in the table.
	//
	// This number may be reached when the insertion pace outreaches merger pace.
	maxParts = 512

	// defaultPartsToMerge is the default number of parts to merge at once.
	//
	// This number has been obtained empirically - it gives the lowest possible overhead.
	// See appendPartsToMerge tests for details.
	defaultPartsToMerge = 15

	// finalPartsToMerge is the final number of parts to merge at once.
	//
	// It must be smaller than defaultPartsToMerge.
	// Lower value improves select performance at the cost of increased
	// write amplification.
	finalPartsToMerge = 2

	// maxPartSize is the maximum part size in bytes.
	//
	// This number should be limited by the amount of time required to merge parts of this summary size.
	// The required time shouldn't exceed a day.
	maxPartSize = 400e9
)
2019-05-22 23:16:55 +02:00
// maxItemsPerCachedPart is the maximum items per created part by the merge,
// which must be cached in the OS page cache.
//
// Such parts are usually frequently accessed, so it is good to cache their
// contents in OS page cache.
2019-09-09 10:41:30 +02:00
func maxItemsPerCachedPart ( ) uint64 {
mem := memory . Remaining ( )
// Production data shows that each item occupies ~4 bytes in the compressed part.
// It is expected no more than defaultPartsToMerge/2 parts exist
// in the OS page cache before they are merged into bigger part.
// Halft of the remaining RAM must be left for lib/storage parts,
// so the maxItems is calculated using the below code:
maxItems := uint64 ( mem ) / ( 4 * defaultPartsToMerge )
if maxItems < 1e6 {
maxItems = 1e6
}
return maxItems
}
2019-05-22 23:16:55 +02:00
// The interval for flushing (converting) recent raw items into parts,
// so they become visible to search.
const rawItemsFlushInterval = time.Second
// Table represents mergeset table.
type Table struct {
	// Atomically updated counters must go first in the struct, so they are properly
	// aligned to 8 bytes on 32-bit architectures.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

	// activeMerges is the number of currently running merges.
	activeMerges uint64

	// mergesCount is the total number of merges performed on the table.
	mergesCount uint64

	// itemsMerged is the total number of items processed by merges.
	itemsMerged uint64

	// assistedMerges is the number of merges performed synchronously by
	// item-adding goroutines when the parts count exceeds maxParts.
	assistedMerges uint64

	// itemsAdded is the total number of items added via AddItems.
	itemsAdded uint64

	// itemsAddedSizeBytes is the total size of items added via AddItems.
	itemsAddedSizeBytes uint64

	// mergeIdx is used for generating unique directory names for merged parts;
	// seeded with the current UnixNano at OpenTable.
	mergeIdx uint64

	// path is the table directory on the filesystem.
	path string

	// flushCallback, if non-nil, is called after flushed data becomes visible to search.
	flushCallback func()
	flushCallbackWorkerWG sync.WaitGroup
	// needFlushCallbackCall is set to 1 when the periodic flush-callback worker
	// must call flushCallback on its next tick.
	needFlushCallbackCall uint32

	// prepareBlock, if non-nil, is called during merge before flushing
	// the prepared block to persistent storage.
	prepareBlock PrepareBlockCallback

	// isReadOnly points to a flag; background merges are skipped while it is non-zero.
	// See canBackgroundMerge.
	isReadOnly *uint32

	// partsLock protects parts and the isInMerge flags of the contained partWrappers.
	partsLock sync.Mutex
	parts []*partWrapper

	// rawItems contains recently added items that haven't been converted to parts yet.
	//
	// rawItems aren't used in search for performance reasons
	rawItems rawItemsShards

	// snapshotLock is passed to runTransaction in mergeParts;
	// presumably it serializes part transactions with snapshot creation - verify against snapshot code.
	snapshotLock sync.RWMutex

	// flockF is the file lock protecting the table dir from concurrent opens.
	flockF *os.File

	// stopCh is closed by MustClose in order to stop background workers.
	stopCh chan struct{}

	// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
	partMergersWG syncwg.WaitGroup

	rawItemsFlusherWG sync.WaitGroup

	convertersWG sync.WaitGroup

	// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
	rawItemsPendingFlushesWG syncwg.WaitGroup
}
2021-04-27 14:36:31 +02:00
// rawItemsShards splits recently added items among multiple shards
// in order to reduce lock contention on multi-CPU systems.
type rawItemsShards struct {
	// shardIdx is an atomic round-robin counter used for picking the shard
	// for the next addItems call.
	shardIdx uint32

	// shards reduce lock contention when adding rows on multi-CPU systems.
	shards []rawItemsShard
}
// The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawItemsShardsPerTable = func() int {
	cpus := cgroup.AvailableCPUs()
	// Cap the multiplier at 16 - presumably to bound the shard count growth
	// on many-core systems; confirm against upstream benchmarks.
	multiplier := cpus
	if multiplier > 16 {
		multiplier = 16
	}
	return (cpus*multiplier + 1) / 2
}()

// maxBlocksPerShard limits the number of pending in-memory blocks per shard;
// when reached, rawItemsShard.addItems flushes the blocks to a part.
const maxBlocksPerShard = 256
2021-04-27 14:36:31 +02:00
// init initializes riss with rawItemsShardsPerTable empty shards.
func (riss *rawItemsShards) init() {
	riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
}
func ( riss * rawItemsShards ) addItems ( tb * Table , items [ ] [ ] byte ) error {
2021-04-27 15:41:22 +02:00
n := atomic . AddUint32 ( & riss . shardIdx , 1 )
2021-04-27 14:36:31 +02:00
shards := riss . shards
2021-04-27 15:41:22 +02:00
idx := n % uint32 ( len ( shards ) )
2021-04-27 14:36:31 +02:00
shard := & shards [ idx ]
return shard . addItems ( tb , items )
}
// Len returns the total number of pending raw items across all the shards.
func (riss *rawItemsShards) Len() int {
	total := 0
	for i := range riss.shards {
		total += riss.shards[i].Len()
	}
	return total
}
// rawItemsShard holds a mutex-protected list of in-memory blocks
// with recently added items.
type rawItemsShard struct {
	// mu protects ibs and lastFlushTime.
	mu sync.Mutex

	// ibs is the list of in-memory blocks with recently added items.
	ibs []*inmemoryBlock

	// lastFlushTime is the unix timestamp (in seconds) of the last flush of ibs.
	lastFlushTime uint64
}
// Len returns the number of pending raw items in the shard.
func (ris *rawItemsShard) Len() int {
	ris.mu.Lock()
	defer ris.mu.Unlock()
	total := 0
	for _, ib := range ris.ibs {
		total += len(ib.items)
	}
	return total
}
// addItems appends items to the shard's in-memory blocks,
// flushing the accumulated blocks to tb once their count reaches maxBlocksPerShard.
//
// A non-nil error indicates an item too large to fit even an empty inmemoryBlock;
// items preceding it are still added.
func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
	var err error
	var blocksToFlush []*inmemoryBlock
	ris.mu.Lock()
	ibs := ris.ibs
	if len(ibs) == 0 {
		// Lazily create the first block.
		ib := getInmemoryBlock()
		ibs = append(ibs, ib)
		ris.ibs = ibs
	}
	// Append items to the most recent block.
	ib := ibs[len(ibs)-1]
	for _, item := range items {
		if !ib.Add(item) {
			// The current block is full - start a new one.
			ib = getInmemoryBlock()
			if !ib.Add(item) {
				// The item doesn't fit even into an empty block - report it to the caller.
				putInmemoryBlock(ib)
				err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))
				break
			}
			ibs = append(ibs, ib)
			ris.ibs = ibs
		}
	}
	if len(ibs) >= maxBlocksPerShard {
		// Too many pending blocks - move them out for merging into a part.
		blocksToFlush = append(blocksToFlush, ibs...)
		// Zero the entries so the moved blocks aren't retained via the reused backing array.
		for i := range ibs {
			ibs[i] = nil
		}
		ris.ibs = ibs[:0]
		ris.lastFlushTime = fasttime.UnixTimestamp()
	}
	ris.mu.Unlock()
	// Merge outside the lock in order to reduce lock contention.
	tb.mergeRawItemsBlocks(blocksToFlush, false)
	return err
}
2019-05-22 23:16:55 +02:00
// partWrapper is a reference-counted wrapper around a part.
type partWrapper struct {
	// p is the wrapped part.
	p *part

	// mp is non-nil for in-memory parts which haven't been flushed to disk yet.
	mp *inmemoryPart

	// refCount is the number of references to the part;
	// the part is closed when the count drops to zero (see decRef).
	refCount uint64

	// isInMerge is set while the part takes part in a merge.
	// It is protected by Table.partsLock.
	isInMerge bool
}
// incRef atomically increments the reference count for pw.
func (pw *partWrapper) incRef() {
	atomic.AddUint64(&pw.refCount, 1)
}
// decRef atomically decrements the reference count for pw,
// closing the underlying part when the count reaches zero.
func (pw *partWrapper) decRef() {
	// Adding ^uint64(0) atomically decrements the counter by 1.
	n := atomic.AddUint64(&pw.refCount, ^uint64(0))
	if int64(n) < 0 {
		logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int64(n))
	}
	if n > 0 {
		return
	}
	if pw.mp != nil {
		// Do not return pw.mp to pool via putInmemoryPart(),
		// since pw.mp size may be too big compared to other entries stored in the pool.
		// This may result in increased memory usage because of high fragmentation.
		pw.mp = nil
	}
	pw.p.MustClose()
	pw.p = nil
}
// OpenTable opens a table on the given path.
//
// Optional flushCallback is called every time new data batch is flushed
// to the underlying storage and becomes visible to search.
//
// Optional prepareBlock is called during merge before flushing the prepared block
// to persistent storage.
//
// The table is created if it doesn't exist yet.
func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) (*Table, error) {
	path = filepath.Clean(path)
	logger.Infof("opening table %q...", path)
	startTime := time.Now()

	// Create a directory for the table if it doesn't exist yet.
	if err := fs.MkdirAllIfNotExist(path); err != nil {
		return nil, fmt.Errorf("cannot create directory %q: %w", path, err)
	}

	// Protect from concurrent opens.
	flockF, err := fs.CreateFlockFile(path)
	if err != nil {
		return nil, err
	}

	// Open table parts.
	pws, err := openParts(path)
	if err != nil {
		return nil, fmt.Errorf("cannot open table parts at %q: %w", path, err)
	}

	tb := &Table{
		path:          path,
		flushCallback: flushCallback,
		prepareBlock:  prepareBlock,
		isReadOnly:    isReadOnly,
		parts:         pws,
		// Seed mergeIdx with the current time - presumably to keep generated
		// part names unique across restarts.
		mergeIdx: uint64(time.Now().UnixNano()),
		flockF:   flockF,
		stopCh:   make(chan struct{}),
	}
	tb.rawItems.init()
	tb.startPartMergers()
	tb.startRawItemsFlusher()

	var m TableMetrics
	tb.UpdateMetrics(&m)
	logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
		path, time.Since(startTime).Seconds(), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes)

	// Run the one-time background conversion to the v1.28.0 format.
	tb.convertersWG.Add(1)
	go func() {
		tb.convertToV1280()
		tb.convertersWG.Done()
	}()

	if flushCallback != nil {
		tb.flushCallbackWorkerWG.Add(1)
		go func() {
			// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
			// which are reset by the flushCallback.
			tc := time.NewTicker(10 * time.Second)
			for {
				select {
				case <-tb.stopCh:
					// Issue the final flushCallback before exit.
					tb.flushCallback()
					tb.flushCallbackWorkerWG.Done()
					return
				case <-tc.C:
					// Call the callback only if new data was flushed since the previous tick.
					if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
						tb.flushCallback()
					}
				}
			}
		}()
	}

	return tb, nil
}
// MustClose closes the table.
func ( tb * Table ) MustClose ( ) {
close ( tb . stopCh )
logger . Infof ( "waiting for raw items flusher to stop on %q..." , tb . path )
startTime := time . Now ( )
tb . rawItemsFlusherWG . Wait ( )
2020-01-22 17:27:44 +01:00
logger . Infof ( "raw items flusher stopped in %.3f seconds on %q" , time . Since ( startTime ) . Seconds ( ) , tb . path )
2019-05-22 23:16:55 +02:00
2019-09-20 18:46:47 +02:00
logger . Infof ( "waiting for converters to stop on %q..." , tb . path )
startTime = time . Now ( )
tb . convertersWG . Wait ( )
2020-01-22 17:27:44 +01:00
logger . Infof ( "converters stopped in %.3f seconds on %q" , time . Since ( startTime ) . Seconds ( ) , tb . path )
2019-09-20 18:46:47 +02:00
2019-05-22 23:16:55 +02:00
logger . Infof ( "waiting for part mergers to stop on %q..." , tb . path )
startTime = time . Now ( )
tb . partMergersWG . Wait ( )
2020-01-22 17:27:44 +01:00
logger . Infof ( "part mergers stopped in %.3f seconds on %q" , time . Since ( startTime ) . Seconds ( ) , tb . path )
2019-05-22 23:16:55 +02:00
logger . Infof ( "flushing inmemory parts to files on %q..." , tb . path )
startTime = time . Now ( )
// Flush raw items the last time before exit.
tb . flushRawItems ( true )
// Flush inmemory parts to disk.
var pws [ ] * partWrapper
tb . partsLock . Lock ( )
for _ , pw := range tb . parts {
if pw . mp == nil {
continue
}
if pw . isInMerge {
logger . Panicf ( "BUG: the inmemory part %s mustn't be in merge after stopping parts merger in %q" , & pw . mp . ph , tb . path )
}
pw . isInMerge = true
pws = append ( pws , pw )
}
tb . partsLock . Unlock ( )
2019-09-20 18:46:47 +02:00
if err := tb . mergePartsOptimal ( pws , nil ) ; err != nil {
2019-05-22 23:16:55 +02:00
logger . Panicf ( "FATAL: cannot flush inmemory parts to files in %q: %s" , tb . path , err )
}
2020-01-22 17:27:44 +01:00
logger . Infof ( "%d inmemory parts have been flushed to files in %.3f seconds on %q" , len ( pws ) , time . Since ( startTime ) . Seconds ( ) , tb . path )
2019-05-22 23:16:55 +02:00
2021-07-06 11:17:15 +02:00
logger . Infof ( "waiting for flush callback worker to stop on %q..." , tb . path )
startTime = time . Now ( )
tb . flushCallbackWorkerWG . Wait ( )
logger . Infof ( "flush callback worker stopped in %.3f seconds on %q" , time . Since ( startTime ) . Seconds ( ) , tb . path )
2019-05-22 23:16:55 +02:00
// Remove references to parts from the tb, so they may be eventually closed
2019-05-25 20:51:11 +02:00
// after all the searches are done.
2019-05-22 23:16:55 +02:00
tb . partsLock . Lock ( )
parts := tb . parts
tb . parts = nil
tb . partsLock . Unlock ( )
for _ , pw := range parts {
pw . decRef ( )
}
// Release flockF
if err := tb . flockF . Close ( ) ; err != nil {
logger . Panicf ( "FATAL:cannot close %q: %s" , tb . flockF . Name ( ) , err )
}
}
// Path returns the path to tb on the filesystem.
func (tb *Table) Path() string {
	return tb.path
}
// TableMetrics contains essential metrics for the Table.
type TableMetrics struct {
	// Merge and ingestion counters; see the correspondingly named Table fields.
	ActiveMerges uint64
	MergesCount uint64
	ItemsMerged uint64
	AssistedMerges uint64
	ItemsAdded uint64
	ItemsAddedSizeBytes uint64

	// PendingItems is the number of recently added items
	// which haven't been converted to parts yet.
	PendingItems uint64

	// Totals across all the table parts.
	PartsCount uint64
	BlocksCount uint64
	ItemsCount uint64

	// SizeBytes is the summary size of all the table parts.
	SizeBytes uint64

	// Stats for the shared data-block (ibCache) and index-block (idxbCache) caches;
	// populated in UpdateMetrics.
	DataBlocksCacheSize uint64
	DataBlocksCacheSizeBytes uint64
	DataBlocksCacheSizeMaxBytes uint64
	DataBlocksCacheRequests uint64
	DataBlocksCacheMisses uint64

	IndexBlocksCacheSize uint64
	IndexBlocksCacheSizeBytes uint64
	IndexBlocksCacheSizeMaxBytes uint64
	IndexBlocksCacheRequests uint64
	IndexBlocksCacheMisses uint64

	// PartsRefCount is the summary reference count across all the table parts.
	PartsRefCount uint64
}
// UpdateMetrics updates m with metrics from tb.
//
// The counters are added to the existing m values, so a single m
// may accumulate metrics from multiple tables.
func (tb *Table) UpdateMetrics(m *TableMetrics) {
	m.ActiveMerges += atomic.LoadUint64(&tb.activeMerges)
	m.MergesCount += atomic.LoadUint64(&tb.mergesCount)
	m.ItemsMerged += atomic.LoadUint64(&tb.itemsMerged)
	m.AssistedMerges += atomic.LoadUint64(&tb.assistedMerges)
	m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded)
	m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes)

	m.PendingItems += uint64(tb.rawItems.Len())

	// Collect per-part stats under partsLock.
	tb.partsLock.Lock()
	m.PartsCount += uint64(len(tb.parts))
	for _, pw := range tb.parts {
		p := pw.p
		m.BlocksCount += p.ph.blocksCount
		m.ItemsCount += p.ph.itemsCount
		m.SizeBytes += p.size
		m.PartsRefCount += atomic.LoadUint64(&pw.refCount)
	}
	tb.partsLock.Unlock()

	// The block caches are shared across tables, so these values are
	// assigned rather than accumulated.
	m.DataBlocksCacheSize = uint64(ibCache.Len())
	m.DataBlocksCacheSizeBytes = uint64(ibCache.SizeBytes())
	m.DataBlocksCacheSizeMaxBytes = uint64(ibCache.SizeMaxBytes())
	m.DataBlocksCacheRequests = ibCache.Requests()
	m.DataBlocksCacheMisses = ibCache.Misses()
	m.IndexBlocksCacheSize = uint64(idxbCache.Len())
	m.IndexBlocksCacheSizeBytes = uint64(idxbCache.SizeBytes())
	m.IndexBlocksCacheSizeMaxBytes = uint64(idxbCache.SizeMaxBytes())
	m.IndexBlocksCacheRequests = idxbCache.Requests()
	m.IndexBlocksCacheMisses = idxbCache.Misses()
}
// AddItems adds the given items to the tb.
func ( tb * Table ) AddItems ( items [ ] [ ] byte ) error {
2021-04-27 14:36:31 +02:00
if err := tb . rawItems . addItems ( tb , items ) ; err != nil {
return fmt . Errorf ( "cannot insert data into %q: %w" , tb . path , err )
2019-05-22 23:16:55 +02:00
}
2022-04-21 12:18:05 +02:00
atomic . AddUint64 ( & tb . itemsAdded , uint64 ( len ( items ) ) )
n := 0
for _ , item := range items {
n += len ( item )
}
atomic . AddUint64 ( & tb . itemsAddedSizeBytes , uint64 ( n ) )
2021-04-27 14:36:31 +02:00
return nil
2019-05-22 23:16:55 +02:00
}
// getParts appends parts snapshot to dst and returns it.
//
// The appended parts must be released with putParts.
func (tb *Table) getParts(dst []*partWrapper) []*partWrapper {
	tb.partsLock.Lock()
	for _, pw := range tb.parts {
		// Hold a reference for every returned part, so it isn't closed
		// while the caller uses the snapshot.
		pw.incRef()
		dst = append(dst, pw)
	}
	tb.partsLock.Unlock()
	return dst
}
// putParts releases the given pws obtained via getParts.
func (tb *Table) putParts(pws []*partWrapper) {
	for _, pw := range pws {
		pw.decRef()
	}
}
// startRawItemsFlusher starts the background goroutine
// which periodically flushes raw items into searchable parts.
func (tb *Table) startRawItemsFlusher() {
	tb.rawItemsFlusherWG.Add(1)
	go func() {
		tb.rawItemsFlusher()
		tb.rawItemsFlusherWG.Done()
	}()
}
// rawItemsFlusher periodically flushes raw items into parts
// every rawItemsFlushInterval, until tb.stopCh is closed.
func (tb *Table) rawItemsFlusher() {
	ticker := time.NewTicker(rawItemsFlushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-tb.stopCh:
			return
		case <-ticker.C:
			tb.flushRawItems(false)
		}
	}
}
2019-09-20 18:46:47 +02:00
// convertToV1280FileName is the name of the flag file created in the table
// directory once the one-time conversion to the v1.28.0 format is complete.
const convertToV1280FileName = "converted-to-v1.28.0"
// convertToV1280 performs the one-time conversion of tag->metricID rows
// into tag->metricIDs rows when upgrading to v1.28.0+.
//
// The conversion merges all the existing parts in two rounds and then writes
// the convertToV1280FileName flag file, so the conversion isn't repeated.
func (tb *Table) convertToV1280() {
	// Convert tag->metricID rows into tag->metricIDs rows when upgrading to v1.28.0+.
	flagFilePath := tb.path + "/" + convertToV1280FileName
	if fs.IsPathExist(flagFilePath) {
		// The conversion has been already performed.
		return
	}

	// getAllPartsForMerge grabs all the parts not already being merged
	// and marks them with isInMerge.
	getAllPartsForMerge := func() []*partWrapper {
		var pws []*partWrapper
		tb.partsLock.Lock()
		for _, pw := range tb.parts {
			if pw.isInMerge {
				continue
			}
			pw.isInMerge = true
			pws = append(pws, pw)
		}
		tb.partsLock.Unlock()
		return pws
	}
	pws := getAllPartsForMerge()
	if len(pws) > 0 {
		logger.Infof("started round 1 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws))
		startTime := time.Now()
		if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil {
			logger.Errorf("failed round 1 of background conversion of %q to v1.28.0 format: %s", tb.path, err)
			return
		}
		logger.Infof("finished round 1 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())

		// The second round is needed in order to merge small blocks
		// with tag->metricIDs rows left after the first round.
		pws = getAllPartsForMerge()
		logger.Infof("started round 2 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws))
		startTime = time.Now()
		if len(pws) > 0 {
			if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil {
				logger.Errorf("failed round 2 of background conversion of %q to v1.28.0 format: %s", tb.path, err)
				return
			}
		}
		logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())
	}
	// Mark the conversion as complete.
	if err := fs.WriteFileAtomically(flagFilePath, []byte("ok")); err != nil {
		logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err)
	}
}
// mergePartsOptimal merges pws in chunks of up to defaultPartsToMerge parts.
//
// Merging is interrupted when stopCh is closed.
// All the parts in pws must have isInMerge set; the flag is cleared on return.
func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
	defer func() {
		// Remove isInMerge flag from pws.
		tb.partsLock.Lock()
		for _, pw := range pws {
			// Do not check for pws.isInMerge set to false,
			// since it may be set to false in mergeParts below.
			pw.isInMerge = false
		}
		tb.partsLock.Unlock()
	}()
	// Merge full chunks of defaultPartsToMerge parts.
	for len(pws) > defaultPartsToMerge {
		if err := tb.mergeParts(pws[:defaultPartsToMerge], stopCh, false); err != nil {
			return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
		}
		pws = pws[defaultPartsToMerge:]
	}
	if len(pws) == 0 {
		return nil
	}
	// Merge the remaining tail of parts.
	if err := tb.mergeParts(pws, stopCh, false); err != nil {
		return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
	}
	return nil
}
// DebugFlush flushes all the added items to the storage,
// so they become visible to search.
//
// This function is only for debugging and testing.
func (tb *Table) DebugFlush() {
	tb.flushRawItems(true)

	// Wait for background flushers to finish.
	tb.rawItemsPendingFlushesWG.Wait()
}
// flushRawItems converts pending raw items into searchable parts.
//
// isFinal=true forces flushing all the pending blocks regardless of their age.
func (tb *Table) flushRawItems(isFinal bool) {
	tb.rawItems.flush(tb, isFinal)
}
// flush collects flush-ready blocks from all the shards
// and converts them into searchable parts in tb.
func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
	// Track the pending flush, so DebugFlush can wait for its completion.
	tb.rawItemsPendingFlushesWG.Add(1)
	defer tb.rawItemsPendingFlushesWG.Done()

	var blocksToFlush []*inmemoryBlock
	for i := range riss.shards {
		blocksToFlush = riss.shards[i].appendBlocksToFlush(blocksToFlush, tb, isFinal)
	}
	tb.mergeRawItemsBlocks(blocksToFlush, isFinal)
}
2021-06-17 12:42:32 +02:00
// appendBlocksToFlush moves the shard's blocks to dst when the flush deadline
// has passed or isFinal is set, and returns the updated dst.
//
// NOTE(review): the tb parameter is unused in this function;
// presumably kept for signature symmetry with callers - confirm before removing.
func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, isFinal bool) []*inmemoryBlock {
	currentTime := fasttime.UnixTimestamp()
	// Guard against a sub-second rawItemsFlushInterval truncating to zero.
	flushSeconds := int64(rawItemsFlushInterval.Seconds())
	if flushSeconds <= 0 {
		flushSeconds = 1
	}

	ris.mu.Lock()
	if isFinal || currentTime-ris.lastFlushTime > uint64(flushSeconds) {
		ibs := ris.ibs
		dst = append(dst, ibs...)
		// Zero the entries so the moved blocks aren't retained via the reused backing array.
		for i := range ibs {
			ibs[i] = nil
		}
		ris.ibs = ibs[:0]
		ris.lastFlushTime = currentTime
	}
	ris.mu.Unlock()

	return dst
}
2021-07-06 11:17:15 +02:00
// mergeRawItemsBlocks converts ibs into parts registered in tb
// and assists with merging existing parts when their count exceeds maxParts.
//
// isFinal=true triggers an immediate flushCallback call instead of
// deferring it to the periodic flush-callback worker.
func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {
	if len(ibs) == 0 {
		return
	}
	tb.partMergersWG.Add(1)
	defer tb.partMergersWG.Done()

	// Convert ibs into in-memory parts, up to defaultPartsToMerge blocks per part,
	// concurrently in order to speed up the conversion.
	pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
	var pwsLock sync.Mutex
	var wg sync.WaitGroup
	for len(ibs) > 0 {
		n := defaultPartsToMerge
		if n > len(ibs) {
			n = len(ibs)
		}
		wg.Add(1)
		go func(ibsPart []*inmemoryBlock) {
			defer wg.Done()
			pw := tb.mergeInmemoryBlocks(ibsPart)
			if pw == nil {
				// All the blocks in ibsPart were empty.
				return
			}
			// Mark the new part as being merged, so tb.mergeParts below accepts it.
			pw.isInMerge = true
			pwsLock.Lock()
			pws = append(pws, pw)
			pwsLock.Unlock()
		}(ibs[:n])
		ibs = ibs[n:]
	}
	wg.Wait()
	if len(pws) > 0 {
		// Merge the resulting in-memory parts and register the result in tb.
		if err := tb.mergeParts(pws, nil, true); err != nil {
			logger.Panicf("FATAL: cannot merge raw parts: %s", err)
		}
		if tb.flushCallback != nil {
			if isFinal {
				tb.flushCallback()
			} else {
				// Defer the callback to the periodic worker started in OpenTable.
				atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
			}
		}
	}
	for {
		tb.partsLock.Lock()
		ok := len(tb.parts) <= maxParts
		tb.partsLock.Unlock()
		if ok {
			return
		}
		// The added part exceeds maxParts count. Assist with merging other parts.
		//
		// Prioritize assisted merges over searches.
		storagepacelimiter.Search.Inc()
		err := tb.mergeExistingParts(false)
		storagepacelimiter.Search.Dec()
		if err == nil {
			atomic.AddUint64(&tb.assistedMerges, 1)
			continue
		}
		if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
			return
		}
		logger.Panicf("FATAL: cannot merge small parts: %s", err)
	}
}
2021-06-17 12:42:32 +02:00
func ( tb * Table ) mergeInmemoryBlocks ( ibs [ ] * inmemoryBlock ) * partWrapper {
2022-07-27 22:47:18 +02:00
atomic . AddUint64 ( & tb . mergesCount , 1 )
atomic . AddUint64 ( & tb . activeMerges , 1 )
defer atomic . AddUint64 ( & tb . activeMerges , ^ uint64 ( 0 ) )
// Prepare blockStreamReaders for source blocks.
bsrs := make ( [ ] * blockStreamReader , 0 , len ( ibs ) )
2021-06-17 12:42:32 +02:00
for _ , ib := range ibs {
2019-05-22 23:16:55 +02:00
if len ( ib . items ) == 0 {
continue
}
2022-07-27 22:47:18 +02:00
bsr := getBlockStreamReader ( )
bsr . InitFromInmemoryBlock ( ib )
2021-05-28 00:09:32 +02:00
putInmemoryBlock ( ib )
2022-07-27 22:47:18 +02:00
bsrs = append ( bsrs , bsr )
2020-02-13 13:06:51 +01:00
}
2022-07-27 22:47:18 +02:00
if len ( bsrs ) == 0 {
2020-02-13 13:06:51 +01:00
return nil
}
2022-07-27 22:47:18 +02:00
if len ( bsrs ) == 1 {
2020-02-13 13:06:51 +01:00
// Nothing to merge. Just return a single inmemory part.
2022-08-04 17:22:41 +02:00
mp := & inmemoryPart { }
2022-07-27 22:47:18 +02:00
mp . Init ( & bsrs [ 0 ] . Block )
2019-05-22 23:16:55 +02:00
p := mp . NewPart ( )
2020-02-13 13:06:51 +01:00
return & partWrapper {
2019-05-22 23:16:55 +02:00
p : p ,
mp : mp ,
refCount : 1 ,
}
}
// Prepare blockStreamWriter for destination part.
bsw := getBlockStreamWriter ( )
2022-03-03 15:46:35 +01:00
mpDst := & inmemoryPart { }
2020-05-15 12:11:30 +02:00
bsw . InitFromInmemoryPart ( mpDst )
2019-05-22 23:16:55 +02:00
// Merge parts.
// The merge shouldn't be interrupted by stopCh,
// since it may be final after stopCh is closed.
2020-07-30 18:57:25 +02:00
err := mergeBlockStreams ( & mpDst . ph , bsw , bsrs , tb . prepareBlock , nil , & tb . itemsMerged )
2020-07-22 23:58:48 +02:00
if err != nil {
2019-05-22 23:16:55 +02:00
logger . Panicf ( "FATAL: cannot merge inmemoryBlocks: %s" , err )
}
putBlockStreamWriter ( bsw )
for _ , bsr := range bsrs {
putBlockStreamReader ( bsr )
}
p := mpDst . NewPart ( )
return & partWrapper {
p : p ,
mp : mpDst ,
refCount : 1 ,
}
}
// startPartMergers starts mergeWorkersCount background goroutines,
// each merging table parts until tb.stopCh is closed.
func (tb *Table) startPartMergers() {
	for i := 0; i < mergeWorkersCount; i++ {
		tb.partMergersWG.Add(1)
		go func() {
			if err := tb.partMerger(); err != nil {
				logger.Panicf("FATAL: unrecoverable error when merging parts in %q: %s", tb.path, err)
			}
			tb.partMergersWG.Done()
		}()
	}
}
2022-06-01 13:21:12 +02:00
// canBackgroundMerge returns true if background merges are allowed,
// i.e. the table isn't in read-only mode.
func (tb *Table) canBackgroundMerge() bool {
	return atomic.LoadUint32(tb.isReadOnly) == 0
}
2019-09-19 20:48:14 +02:00
// mergeExistingParts selects the best parts for merging and merges them.
//
// Returns nil without merging in read-only mode; otherwise the error
// from mergeParts (e.g. errNothingToMerge when no parts were selected).
func (tb *Table) mergeExistingParts(isFinal bool) error {
	if !tb.canBackgroundMerge() {
		// Do not perform background merge in read-only mode
		// in order to prevent from disk space shortage.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
		return nil
	}
	n := fs.MustGetFreeSpace(tb.path)
	// Divide free space by the max number of concurrent merges.
	maxOutBytes := n / uint64(mergeWorkersCount)
	if maxOutBytes > maxPartSize {
		maxOutBytes = maxPartSize
	}
	tb.partsLock.Lock()
	pws := getPartsToMerge(tb.parts, maxOutBytes, isFinal)
	tb.partsLock.Unlock()
	return tb.mergeParts(pws, tb.stopCh, false)
}
const (
	// minMergeSleepTime is the minimum pause between background merge attempts.
	minMergeSleepTime = time.Millisecond

	// maxMergeSleepTime is the maximum pause between background merge attempts.
	maxMergeSleepTime = time.Second
)
func ( tb * Table ) partMerger ( ) error {
sleepTime := minMergeSleepTime
2020-05-14 21:01:51 +02:00
var lastMergeTime uint64
2019-05-22 23:16:55 +02:00
isFinal := false
t := time . NewTimer ( sleepTime )
for {
2019-09-19 20:48:14 +02:00
err := tb . mergeExistingParts ( isFinal )
2019-05-22 23:16:55 +02:00
if err == nil {
// Try merging additional parts.
sleepTime = minMergeSleepTime
2020-05-14 21:01:51 +02:00
lastMergeTime = fasttime . UnixTimestamp ( )
2019-05-22 23:16:55 +02:00
isFinal = false
continue
}
2020-09-17 02:02:35 +02:00
if errors . Is ( err , errForciblyStopped ) {
2019-05-22 23:16:55 +02:00
// The merger has been stopped.
return nil
}
2020-09-17 02:02:35 +02:00
if ! errors . Is ( err , errNothingToMerge ) {
2019-05-22 23:16:55 +02:00
return err
}
2020-05-14 21:01:51 +02:00
if fasttime . UnixTimestamp ( ) - lastMergeTime > 30 {
2019-05-22 23:16:55 +02:00
// We have free time for merging into bigger parts.
// This should improve select performance.
2020-05-14 21:01:51 +02:00
lastMergeTime = fasttime . UnixTimestamp ( )
2019-05-22 23:16:55 +02:00
isFinal = true
continue
}
// Nothing to merge. Sleep for a while and try again.
sleepTime *= 2
if sleepTime > maxMergeSleepTime {
sleepTime = maxMergeSleepTime
}
select {
case <- tb . stopCh :
return nil
case <- t . C :
t . Reset ( sleepTime )
}
}
}
// errNothingToMerge is returned by mergeParts when no parts were given for merging.
// Use errors.New instead of fmt.Errorf since the message contains no format verbs.
var errNothingToMerge = errors.New("nothing to merge")
2020-09-17 01:05:54 +02:00
// mergeParts merges pws.
//
// Merging is immediately stopped if stopCh is closed.
//
// All the parts inside pws must have isInMerge field set to true.
func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterParts bool) error {
	if len(pws) == 0 {
		// Nothing to merge.
		return errNothingToMerge
	}

	atomic.AddUint64(&tb.mergesCount, 1)
	atomic.AddUint64(&tb.activeMerges, 1)
	// Adding ^uint64(0) atomically decrements activeMerges by 1.
	defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))

	startTime := time.Now()

	defer func() {
		// Remove isInMerge flag from pws, so the parts become eligible
		// for future merges even if this merge fails.
		tb.partsLock.Lock()
		for _, pw := range pws {
			if !pw.isInMerge {
				logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
			}
			pw.isInMerge = false
		}
		tb.partsLock.Unlock()
	}()

	// Prepare blockStreamReaders for source parts.
	bsrs := make([]*blockStreamReader, 0, len(pws))
	defer func() {
		// Return the readers to the pool on all the error paths below.
		// bsrs is set to nil on the success path once the readers are released.
		for _, bsr := range bsrs {
			putBlockStreamReader(bsr)
		}
	}()
	for _, pw := range pws {
		bsr := getBlockStreamReader()
		if pw.mp != nil {
			if !isOuterParts {
				logger.Panicf("BUG: inmemory part must be always outer")
			}
			bsr.InitFromInmemoryPart(pw.mp)
		} else {
			if err := bsr.InitFromFilePart(pw.p.path); err != nil {
				return fmt.Errorf("cannot open source part for merging: %w", err)
			}
		}
		bsrs = append(bsrs, bsr)
	}

	// Estimate output part stats from the source parts' headers.
	outItemsCount := uint64(0)
	outBlocksCount := uint64(0)
	for _, pw := range pws {
		outItemsCount += pw.p.ph.itemsCount
		outBlocksCount += pw.p.ph.blocksCount
	}
	nocache := true
	if outItemsCount < maxItemsPerCachedPart() {
		// Cache small (i.e. recent) output parts in OS file cache,
		// since there is high chance they will be read soon.
		nocache = false
	}

	// Prepare blockStreamWriter for destination part.
	mergeIdx := tb.nextMergeIdx()
	tmpPartPath := fmt.Sprintf("%s/tmp/%016X", tb.path, mergeIdx)
	bsw := getBlockStreamWriter()
	compressLevel := getCompressLevelForPartItems(outItemsCount, outBlocksCount)
	if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
		return fmt.Errorf("cannot create destination part %q: %w", tmpPartPath, err)
	}

	// Merge parts into a temporary location.
	var ph partHeader
	err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged)
	putBlockStreamWriter(bsw)
	if err != nil {
		return fmt.Errorf("error when merging parts to %q: %w", tmpPartPath, err)
	}
	if err := ph.WriteMetadata(tmpPartPath); err != nil {
		return fmt.Errorf("cannot write metadata to destination part %q: %w", tmpPartPath, err)
	}

	// Close bsrs (aka source parts).
	for _, bsr := range bsrs {
		putBlockStreamReader(bsr)
	}
	bsrs = nil

	// Create a transaction for atomic deleting old parts and moving
	// new part to its destination place.
	var bb bytesutil.ByteBuffer
	for _, pw := range pws {
		if pw.mp == nil {
			// Only file-based source parts are deleted;
			// inmemory parts have no on-disk representation.
			fmt.Fprintf(&bb, "%s\n", pw.p.path)
		}
	}
	dstPartPath := ph.Path(tb.path, mergeIdx)
	fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
	txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx)
	if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil {
		return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
	}

	// Run the created transaction.
	if err := runTransaction(&tb.snapshotLock, tb.path, txnPath); err != nil {
		return fmt.Errorf("cannot execute transaction %q: %w", txnPath, err)
	}

	// Open the merged part.
	newP, err := openFilePart(dstPartPath)
	if err != nil {
		return fmt.Errorf("cannot open merged part %q: %w", dstPartPath, err)
	}
	newPSize := newP.size
	newPW := &partWrapper{
		p:        newP,
		refCount: 1,
	}

	// Atomically remove old parts and add new part.
	m := make(map[*partWrapper]bool, len(pws))
	for _, pw := range pws {
		m[pw] = true
	}
	if len(m) != len(pws) {
		logger.Panicf("BUG: %d duplicate parts found in the merge of %d parts", len(pws)-len(m), len(pws))
	}
	removedParts := 0
	tb.partsLock.Lock()
	tb.parts, removedParts = removeParts(tb.parts, m)
	tb.parts = append(tb.parts, newPW)
	tb.partsLock.Unlock()
	if removedParts != len(m) {
		if !isOuterParts {
			logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(m))
		}
		if removedParts != 0 {
			logger.Panicf("BUG: removed non-zero outer parts: %d", removedParts)
		}
	}

	// Remove partition references from old parts.
	for _, pw := range pws {
		pw.decRef()
	}

	d := time.Since(startTime)
	if d > 30*time.Second {
		// Log only long-running merges in order to reduce log noise.
		logger.Infof("merged %d items across %d blocks in %.3f seconds at %d items/sec to %q; sizeBytes: %d",
			outItemsCount, outBlocksCount, d.Seconds(), int(float64(outItemsCount)/d.Seconds()), dstPartPath, newPSize)
	}
	return nil
}
// getCompressLevelForPartItems returns the zstd compression level to use
// for a part holding the given number of items.
//
// Bigger parts get higher (stronger) compression levels, since they are
// merged less frequently and the CPU cost amortizes better.
func getCompressLevelForPartItems(itemsCount, blocksCount uint64) int {
	// There is no need in using blocksCount here, since mergeset blocks are usually full.
	switch {
	case itemsCount <= 1<<16:
		// -5 is the minimum supported compression for zstd.
		// See https://github.com/facebook/zstd/releases/tag/v1.3.4
		return -5
	case itemsCount <= 1<<17:
		return -4
	case itemsCount <= 1<<18:
		return -3
	case itemsCount <= 1<<19:
		return -2
	case itemsCount <= 1<<20:
		return -1
	case itemsCount <= 1<<22:
		return 1
	case itemsCount <= 1<<25:
		return 2
	case itemsCount <= 1<<28:
		return 3
	default:
		return 4
	}
}
// nextMergeIdx returns the next unique merge index for the table.
//
// The index is used for naming temporary part and transaction files,
// so concurrent merges never collide on file names.
func (tb *Table) nextMergeIdx() uint64 {
	return atomic.AddUint64(&tb.mergeIdx, 1)
}
// mergeWorkersCount limits the number of concurrent merge workers
// to the number of CPUs available to the process (cgroup-aware).
var mergeWorkersCount = cgroup.AvailableCPUs()
2019-05-22 23:16:55 +02:00
func openParts ( path string ) ( [ ] * partWrapper , error ) {
2019-11-02 01:26:02 +01:00
// The path can be missing after restoring from backup, so create it if needed.
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
return nil , err
}
2019-05-22 23:16:55 +02:00
d , err := os . Open ( path )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open difrectory: %w" , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
// Run remaining transactions and cleanup /txn and /tmp directories.
// Snapshots cannot be created yet, so use fakeSnapshotLock.
var fakeSnapshotLock sync . RWMutex
if err := runTransactions ( & fakeSnapshotLock , path ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot run transactions: %w" , err )
2019-05-22 23:16:55 +02:00
}
txnDir := path + "/txn"
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( txnDir )
2019-05-22 23:16:55 +02:00
if err := fs . MkdirAllFailIfExist ( txnDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , txnDir , err )
2019-05-22 23:16:55 +02:00
}
tmpDir := path + "/tmp"
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( tmpDir )
2019-05-22 23:16:55 +02:00
if err := fs . MkdirAllFailIfExist ( tmpDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , tmpDir , err )
2019-05-22 23:16:55 +02:00
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( path )
2019-05-22 23:16:55 +02:00
// Open parts.
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read directory: %w" , err )
2019-05-22 23:16:55 +02:00
}
var pws [ ] * partWrapper
for _ , fi := range fis {
if ! fs . IsDirOrSymlink ( fi ) {
// Skip non-directories.
continue
}
fn := fi . Name ( )
if isSpecialDir ( fn ) {
// Skip special dirs.
continue
}
partPath := path + "/" + fn
2021-04-22 11:58:53 +02:00
if fs . IsEmptyDir ( partPath ) {
// Remove empty directory, which can be left after unclean shutdown on NFS.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1142
fs . MustRemoveAll ( partPath )
continue
}
2019-05-22 23:16:55 +02:00
p , err := openFilePart ( partPath )
if err != nil {
mustCloseParts ( pws )
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open part %q: %w" , partPath , err )
2019-05-22 23:16:55 +02:00
}
pw := & partWrapper {
p : p ,
refCount : 1 ,
}
pws = append ( pws , pw )
}
return pws , nil
}
// mustCloseParts closes all the given parts.
//
// Each part must have exactly one outstanding reference (the table's own);
// any other value indicates a reference-counting bug and triggers a panic.
func mustCloseParts(pws []*partWrapper) {
	for i := range pws {
		pw := pws[i]
		if n := pw.refCount; n != 1 {
			logger.Panicf("BUG: unexpected refCount when closing part %q: %d; want 1", pw.p.path, n)
		}
		pw.p.MustClose()
	}
}
// CreateSnapshotAt creates tb snapshot in the given dstDir.
//
// Snapshot is created using linux hard links, so it is usually created
// very quickly.
func ( tb * Table ) CreateSnapshotAt ( dstDir string ) error {
logger . Infof ( "creating Table snapshot of %q..." , tb . path )
startTime := time . Now ( )
var err error
srcDir := tb . path
srcDir , err = filepath . Abs ( srcDir )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain absolute dir for %q: %w" , srcDir , err )
2019-05-22 23:16:55 +02:00
}
dstDir , err = filepath . Abs ( dstDir )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain absolute dir for %q: %w" , dstDir , err )
2019-05-22 23:16:55 +02:00
}
if strings . HasPrefix ( dstDir , srcDir + "/" ) {
return fmt . Errorf ( "cannot create snapshot %q inside the data dir %q" , dstDir , srcDir )
}
// Flush inmemory items to disk.
tb . flushRawItems ( true )
// The snapshot must be created under the lock in order to prevent from
// concurrent modifications via runTransaction.
tb . snapshotLock . Lock ( )
defer tb . snapshotLock . Unlock ( )
if err := fs . MkdirAllFailIfExist ( dstDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot create snapshot dir %q: %w" , dstDir , err )
2019-05-22 23:16:55 +02:00
}
d , err := os . Open ( srcDir )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot open difrectory: %w" , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read directory: %w" , err )
2019-05-22 23:16:55 +02:00
}
for _ , fi := range fis {
2019-09-20 18:46:47 +02:00
fn := fi . Name ( )
2019-05-22 23:16:55 +02:00
if ! fs . IsDirOrSymlink ( fi ) {
2019-09-20 18:46:47 +02:00
switch fn {
case convertToV1280FileName :
srcPath := srcDir + "/" + fn
dstPath := dstDir + "/" + fn
if err := os . Link ( srcPath , dstPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot hard link from %q to %q: %w" , srcPath , dstPath , err )
2019-09-20 18:46:47 +02:00
}
default :
// Skip other non-directories.
}
2019-05-22 23:16:55 +02:00
continue
}
if isSpecialDir ( fn ) {
// Skip special dirs.
continue
}
srcPartPath := srcDir + "/" + fn
dstPartPath := dstDir + "/" + fn
if err := fs . HardLinkFiles ( srcPartPath , dstPartPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot create hard links from %q to %q: %w" , srcPartPath , dstPartPath , err )
2019-05-22 23:16:55 +02:00
}
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDir )
2019-05-22 23:16:55 +02:00
parentDir := filepath . Dir ( dstDir )
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( parentDir )
2019-05-22 23:16:55 +02:00
2020-01-22 17:27:44 +01:00
logger . Infof ( "created Table snapshot of %q at %q in %.3f seconds" , srcDir , dstDir , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:16:55 +02:00
return nil
}
func runTransactions ( txnLock * sync . RWMutex , path string ) error {
2019-12-04 20:35:51 +01:00
// Wait until all the previous pending transaction deletions are finished.
pendingTxnDeletionsWG . Wait ( )
// Make sure all the current transaction deletions are finished before exiting.
defer pendingTxnDeletionsWG . Wait ( )
2019-05-22 23:16:55 +02:00
txnDir := path + "/txn"
d , err := os . Open ( txnDir )
if err != nil {
if os . IsNotExist ( err ) {
return nil
}
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot open %q: %w" , txnDir , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read directory %q: %w" , d . Name ( ) , err )
2019-05-22 23:16:55 +02:00
}
// Sort transaction files by id, since transactions must be ordered.
sort . Slice ( fis , func ( i , j int ) bool {
return fis [ i ] . Name ( ) < fis [ j ] . Name ( )
} )
for _ , fi := range fis {
2019-08-12 13:44:24 +02:00
fn := fi . Name ( )
if fs . IsTemporaryFileName ( fn ) {
// Skip temporary files, which could be left after unclean shutdown.
continue
}
txnPath := txnDir + "/" + fn
2019-05-22 23:16:55 +02:00
if err := runTransaction ( txnLock , path , txnPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot run transaction from %q: %w" , txnPath , err )
2019-05-22 23:16:55 +02:00
}
}
return nil
}
// runTransaction executes a single transaction file at txnPath.
//
// The transaction file contains newline-delimited paths: every line except the
// last names a path to remove, while the last line has the form
// `srcPath -> dstPath` and describes a rename. All the paths must reside
// under pathPrefix (enforced via validatePath).
func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
	// The transaction must run under read lock in order to provide
	// consistent snapshots with Table.CreateSnapshot().
	txnLock.RLock()
	defer txnLock.RUnlock()

	data, err := os.ReadFile(txnPath)
	if err != nil {
		return fmt.Errorf("cannot read transaction file: %w", err)
	}
	if len(data) > 0 && data[len(data)-1] == '\n' {
		// Strip the trailing newline, so Split below doesn't produce an empty last entry.
		data = data[:len(data)-1]
	}
	paths := strings.Split(string(data), "\n")

	if len(paths) == 0 {
		return fmt.Errorf("empty transaction")
	}
	// All lines except the last are removals; the last line is the rename.
	rmPaths := paths[:len(paths)-1]
	mvPaths := strings.Split(paths[len(paths)-1], " -> ")
	if len(mvPaths) != 2 {
		return fmt.Errorf("invalid last line in the transaction file: got %q; must contain `srcPath -> dstPath`", paths[len(paths)-1])
	}

	// Remove old paths. It is OK if certain paths don't exist.
	var removeWG sync.WaitGroup
	for _, path := range rmPaths {
		path, err := validatePath(pathPrefix, path)
		if err != nil {
			return fmt.Errorf("invalid path to remove: %w", err)
		}
		removeWG.Add(1)
		// Removal may be deferred (e.g. on NFS); removeWG.Done fires when it completes.
		fs.MustRemoveAllWithDoneCallback(path, removeWG.Done)
	}

	// Move the new part to new directory.
	srcPath := mvPaths[0]
	dstPath := mvPaths[1]
	srcPath, err = validatePath(pathPrefix, srcPath)
	if err != nil {
		return fmt.Errorf("invalid source path to rename: %w", err)
	}
	dstPath, err = validatePath(pathPrefix, dstPath)
	if err != nil {
		return fmt.Errorf("invalid destination path to rename: %w", err)
	}
	if fs.IsPathExist(srcPath) {
		if err := os.Rename(srcPath, dstPath); err != nil {
			return fmt.Errorf("cannot rename %q to %q: %w", srcPath, dstPath, err)
		}
	} else if !fs.IsPathExist(dstPath) {
		// Emit info message for the expected condition after unclean shutdown on NFS disk.
		// The dstPath part may be missing because it could be already merged into bigger part
		// while old source parts for the current txn weren't still deleted due to NFS locks.
		logger.Infof("cannot find both source and destination paths: %q -> %q; this may be the case after unclean shutdown (OOM, `kill -9`, hard reset) on NFS disk",
			srcPath, dstPath)
	}

	// Flush pathPrefix directory metadata to the underlying storage.
	fs.MustSyncPath(pathPrefix)

	pendingTxnDeletionsWG.Add(1)
	go func() {
		defer pendingTxnDeletionsWG.Done()
		// Remove the transaction file only after all the source paths are deleted.
		// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
		removeWG.Wait()
		if err := os.Remove(txnPath); err != nil {
			logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
		}
	}()

	return nil
}
// pendingTxnDeletionsWG tracks background goroutines started by runTransaction,
// which delete transaction files once their source-part removals complete.
var pendingTxnDeletionsWG syncwg.WaitGroup
2019-05-22 23:16:55 +02:00
// validatePath converts pathPrefix and path to their absolute forms and
// verifies that path resides strictly inside pathPrefix.
//
// It returns the absolute path on success and an error otherwise.
func validatePath(pathPrefix, path string) (string, error) {
	absPrefix, err := filepath.Abs(pathPrefix)
	if err != nil {
		return path, fmt.Errorf("cannot determine absolute path for pathPrefix=%q: %w", absPrefix, err)
	}
	absPath, err := filepath.Abs(path)
	if err != nil {
		return absPath, fmt.Errorf("cannot determine absolute path for %q: %w", absPath, err)
	}
	// Require the "/" separator so e.g. "/data2" isn't accepted for prefix "/data".
	wantPrefix := absPrefix + "/"
	if !strings.HasPrefix(absPath, wantPrefix) {
		return absPath, fmt.Errorf("invalid path %q; must start with %q", absPath, wantPrefix)
	}
	return absPath, nil
}
// getPartsToMerge returns optimal parts to merge from pws.
//
// if isFinal is set, then merge harder.
//
2021-08-25 08:35:03 +02:00
// The summary size of the returned parts must be smaller than the maxOutBytes.
func getPartsToMerge ( pws [ ] * partWrapper , maxOutBytes uint64 , isFinal bool ) [ ] * partWrapper {
2019-05-22 23:16:55 +02:00
pwsRemaining := make ( [ ] * partWrapper , 0 , len ( pws ) )
for _ , pw := range pws {
if ! pw . isInMerge {
pwsRemaining = append ( pwsRemaining , pw )
}
}
maxPartsToMerge := defaultPartsToMerge
var dst [ ] * partWrapper
if isFinal {
for len ( dst ) == 0 && maxPartsToMerge >= finalPartsToMerge {
2021-08-25 08:35:03 +02:00
dst = appendPartsToMerge ( dst [ : 0 ] , pwsRemaining , maxPartsToMerge , maxOutBytes )
2019-05-22 23:16:55 +02:00
maxPartsToMerge --
}
} else {
2021-08-25 08:35:03 +02:00
dst = appendPartsToMerge ( dst [ : 0 ] , pwsRemaining , maxPartsToMerge , maxOutBytes )
2019-05-22 23:16:55 +02:00
}
for _ , pw := range dst {
if pw . isInMerge {
logger . Panicf ( "BUG: partWrapper.isInMerge is already set" )
}
pw . isInMerge = true
}
return dst
}
2021-08-25 08:35:03 +02:00
// minMergeMultiplier is the minimum multiplier for the size of the output part
// compared to the size of the maximum input part for the merge.
//
// Higher value reduces write amplification (disk write IO induced by the merge),
// while increases the number of unmerged parts.
// The 1.7 is good enough for production workloads.
const minMergeMultiplier = 1.7
2019-05-22 23:16:55 +02:00
// appendPartsToMerge finds optimal parts to merge from src, appends
// them to dst and returns the result.
//
// The summary size of the appended parts doesn't exceed maxOutBytes, and
// at most maxPartsToMerge parts are appended. dst is returned unchanged if
// no suitable merge candidates exist.
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) []*partWrapper {
	if len(src) < 2 {
		// There is no need in merging zero or one part :)
		return dst
	}
	if maxPartsToMerge < 2 {
		logger.Panicf("BUG: maxPartsToMerge cannot be smaller than 2; got %d", maxPartsToMerge)
	}

	// Filter out too big parts.
	// This should reduce N for O(n^2) algorithm below.
	maxInPartBytes := uint64(float64(maxOutBytes) / minMergeMultiplier)
	tmp := make([]*partWrapper, 0, len(src))
	for _, pw := range src {
		if pw.p.size > maxInPartBytes {
			continue
		}
		tmp = append(tmp, pw)
	}
	src = tmp

	// Sort src parts by size, so contiguous windows below hold similarly-sized parts.
	sort.Slice(src, func(i, j int) bool { return src[i].p.size < src[j].p.size })

	maxSrcParts := maxPartsToMerge
	if maxSrcParts > len(src) {
		maxSrcParts = len(src)
	}
	minSrcParts := (maxSrcParts + 1) / 2
	if minSrcParts < 2 {
		minSrcParts = 2
	}

	// Exhaustive search for parts giving the lowest write amplification when merged.
	var pws []*partWrapper
	maxM := float64(0)
	for i := minSrcParts; i <= maxSrcParts; i++ {
		// Examine every contiguous window of i parts in the size-sorted src.
		for j := 0; j <= len(src)-i; j++ {
			a := src[j : j+i]
			if a[0].p.size*uint64(len(a)) < a[len(a)-1].p.size {
				// Do not merge parts with too big difference in size,
				// since this results in unbalanced merges.
				continue
			}
			outBytes := uint64(0)
			for _, pw := range a {
				outBytes += pw.p.size
			}
			if outBytes > maxOutBytes {
				// There is no sense in checking the remaining bigger parts.
				break
			}
			// m is the merge multiplier: output size relative to the biggest input part.
			// Higher m means lower write amplification per merged byte.
			m := float64(outBytes) / float64(a[len(a)-1].p.size)
			if m < maxM {
				continue
			}
			maxM = m
			pws = a
		}
	}

	minM := float64(maxPartsToMerge) / 2
	if minM < minMergeMultiplier {
		minM = minMergeMultiplier
	}
	if maxM < minM {
		// There is no sense in merging parts with too small m,
		// since this leads to high disk write IO.
		return dst
	}
	return append(dst, pws...)
}
func removeParts ( pws [ ] * partWrapper , partsToRemove map [ * partWrapper ] bool ) ( [ ] * partWrapper , int ) {
removedParts := 0
dst := pws [ : 0 ]
for _ , pw := range pws {
2020-09-17 01:05:54 +02:00
if ! partsToRemove [ pw ] {
dst = append ( dst , pw )
2019-05-22 23:16:55 +02:00
continue
}
2020-09-17 01:05:54 +02:00
removedParts ++
2019-05-22 23:16:55 +02:00
}
return dst , removedParts
}
// isSpecialDir reports whether name denotes a service directory
// rather than a part directory.
func isSpecialDir(name string) bool {
	// Snapshots and cache dirs aren't used anymore.
	// Keep them here for backwards compatibility.
	switch name {
	case "tmp", "txn", "snapshots", "cache":
		return true
	}
	return false
}