From bb7a419cc30afed0a42c431d48cb5f4d1ac29e7b Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 26 Jan 2024 21:39:49 +0100
Subject: [PATCH] lib/{mergeset,storage}: make background merge more responsive
 and scalable

- Maintain a separate worker pool per part type (in-memory, file, big and small).
  Previously a shared pool was used for merging all the part types.
  A single merge worker could merge parts with mixed types at once. For example,
  it could simultaneously merge an in-memory part plus a big file part.
  Such a merge could take hours for a big file part. For the duration of this merge
  the in-memory part was pinned in memory and couldn't be persisted to disk
  under the configured -inmemoryDataFlushInterval.

  Another common issue, which could happen when parts with mixed types are merged,
  was the uncontrolled growth of in-memory parts or small parts when all the merge workers
  were busy merging big files. Such growth could lead to significant performance degradation
  for queries, since every query needs to check an ever-growing list of parts.
  This could also slow down the registration of new time series, since VictoriaMetrics
  searches for the internal series_id in the indexdb for every new time series.

  The third issue is graceful shutdown duration, which could be very long when a background
  merge is running on in-memory parts plus big file parts. This merge couldn't be interrupted,
  since it merges in-memory parts.

  A separate pool of merge workers per part type elegantly resolves all of these issues:

  - In-memory parts are merged to file-based parts in a timely manner, since the maximum
    size of in-memory parts is limited.
  - Long-running merges for big parts do not block merges for in-memory parts and small parts.
  - Graceful shutdown duration is now limited by the time needed for flushing in-memory parts
    to files. Merging of file parts is now instantly canceled on graceful shutdown.

- Deprecate the -smallMergeConcurrency command-line flag, since the new background merge
  algorithm should automatically self-tune according to the number of available CPU cores.

- Deprecate the -finalMergeDelay command-line flag, since it wasn't working correctly.
  It is better to run a forced merge when needed - https://docs.victoriametrics.com/#forced-merge

- Tune the number of shards for pending rows and items before the data goes to in-memory parts
  and becomes visible for search. This improves the maximum data ingestion rate and the maximum
  rate for registration of new time series. This should reduce the duration of data ingestion
  slowdowns in a VictoriaMetrics cluster during, e.g., re-routing events, when some of the
  vmstorage nodes become temporarily unavailable.

- Prevent a possible "sync: WaitGroup misuse" panic on graceful shutdown.
  This is a follow-up for fa566c68a6ccf7385a05f649aee7e5f5a38afb15 .
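To make the new concurrency model concrete, here is a minimal, self-contained Go sketch of the per-part-type worker pools. The channel names mirror the inmemoryPartsConcurrencyCh and filePartsConcurrencyCh introduced in this patch, but the withSlot helper, the pool sizes and the demo wiring are illustrative assumptions rather than the actual VictoriaMetrics code:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// One bounded concurrency limiter per part type (sketch only, not the actual
// VictoriaMetrics wiring). A long big-file merge holds a slot in
// filePartsConcurrencyCh only, so it can never starve in-memory part merges,
// which are CPU-bound and get one slot per CPU core.
var (
	inmemoryPartsConcurrencyCh = make(chan struct{}, runtime.NumCPU())
	filePartsConcurrencyCh     = make(chan struct{}, 4) // at least 4, even on small systems
)

// withSlot is a hypothetical helper: acquire a slot from the given pool,
// run the merge, release the slot.
func withSlot(pool chan struct{}, merge func()) {
	pool <- struct{}{}
	defer func() { <-pool }()
	merge()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// In-memory merges proceed regardless of how many file merges are running.
			withSlot(inmemoryPartsConcurrencyCh, func() {
				fmt.Println("in-memory merge", i)
			})
		}(i)
	}
	wg.Wait()
}

Because a big-file merge can only occupy a slot in its own pool, in-memory merges keep making progress; this is what bounds both the in-memory flush latency and the graceful shutdown time described above.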
Thanks to @misutoth for the inspiration at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5190 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3425 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 --- app/vmstorage/main.go | 20 +- docs/CHANGELOG.md | 1 + lib/mergeset/table.go | 802 ++++++++++++++++++++------------ lib/storage/partition.go | 813 ++++++++++++++++++++++----------- lib/storage/storage.go | 13 +- lib/storage/table.go | 8 + 6 files changed, 925 insertions(+), 732 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index e9d228625..fbd4ffe23 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -38,13 +38,10 @@ var ( // DataPath is a path to storage data. DataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data") - finalMergeDelay = flag.Duration("finalMergeDelay", 0, "The delay before starting final merge for per-month partition after no new data is ingested into it. "+ - "Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. "+ - "Zero value disables final merge") - _ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing. Please use -smallMergeConcurrency "+ - "for controlling the concurrency of background merges. See https://docs.victoriametrics.com/#storage") - smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of workers for background merges. See https://docs.victoriametrics.com/#storage . "+ - "It isn't recommended tuning this flag in general case, since this may lead to uncontrolled increase in the number of parts and increased CPU usage during queries") + _ = flag.Duration("finalMergeDelay", 0, "Deprecated: this flag does nothing") + _ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing") + _ = flag.Int("smallMergeConcurrency", 0, "Deprecated: this flag does nothing") + retentionTimezoneOffset = flag.Duration("retentionTimezoneOffset", 0, "The offset for performing indexdb rotation. "+ "If set to 0, then the indexdb rotation is performed at 4am UTC time per each -retentionPeriod. 
"+ "If set to 2h, then the indexdb rotation is performed at 4am EET time (the timezone with +2h offset)") @@ -96,8 +93,6 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { resetResponseCacheIfNeeded = resetCacheIfNeeded storage.SetLogNewSeries(*logNewSeries) - storage.SetFinalMergeDelay(*finalMergeDelay) - storage.SetMergeWorkersCount(*smallMergeConcurrency) storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset) storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N) storage.SetTSIDCacheSize(cacheSizeStorageTSID.IntN()) @@ -496,11 +491,8 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) { metrics.WriteCounterUint64(w, `vm_composite_filter_success_conversions_total`, idbm.CompositeFilterSuccessConversions) metrics.WriteCounterUint64(w, `vm_composite_filter_missing_conversions_total`, idbm.CompositeFilterMissingConversions) - metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="storage/inmemory"}`, tm.InmemoryAssistedMergesCount) - metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="storage/small"}`, tm.SmallAssistedMergesCount) - - metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="indexdb/inmemory"}`, idbm.InmemoryAssistedMergesCount) - metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="indexdb/file"}`, idbm.FileAssistedMergesCount) + // vm_assisted_merges_total name is used for backwards compatibility. + metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="indexdb/inmemory"}`, idbm.InmemoryPartsLimitReachedCount) metrics.WriteCounterUint64(w, `vm_indexdb_items_added_total`, idbm.ItemsAdded) metrics.WriteCounterUint64(w, `vm_indexdb_items_added_size_bytes_total`, idbm.ItemsAddedSizeBytes) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 96d6cdccc..063354ebc 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -31,6 +31,7 @@ The sandbox cluster installation is running under the constant load generated by * SECURITY: upgrade Go builder from Go1.21.5 to Go1.21.6. See [the list of issues addressed in Go1.21.6](https://github.com/golang/go/issues?q=milestone%3AGo1.21.6+label%3ACherryPickApproved). * FEATURE: improve new [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series) registration speed on systems with high number of CPU cores. Thanks to @misutoth for the initial idea and [implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212). +* FEATURE: make [background merge](https://docs.victoriametrics.com/#storage) more responsive and scalable. This should help the following issues: [5190](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5190), [3425](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3425), [648](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for discovering [Hetzner Cloud](https://www.hetzner.com/cloud) and [Hetzner Robot](https://docs.hetzner.com/robot) scrape targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3154) and [these docs](https://docs.victoriametrics.com/sd_configs.html#hetzner_sd_configs). * FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for negative index in `groupByNode` and `aliasByNode` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5581). 
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 5c692a5ff..b086531b9 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "os" "path/filepath" "sort" @@ -28,16 +29,9 @@ import ( // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 // // This number may be reached when the insertion pace outreaches merger pace. -// If this number is reached, then assisted merges are performed -// during data ingestion. -const maxInmemoryParts = 15 - -// maxFileParts is the maximum number of file parts in the table. -// -// This number may be reached when the insertion pace outreaches merger pace. -// If this number is reached, then assisted merges are performed -// during data ingestion. -const maxFileParts = 30 +// If this number is reached, then the data ingestion is paused until background +// mergers reduce the number of parts below this number. +const maxInmemoryParts = 30 // Default number of parts to merge at once. // @@ -45,13 +39,6 @@ const maxFileParts = 30 // See appendPartsToMerge tests for details. const defaultPartsToMerge = 15 -// The final number of parts to merge at once. -// -// It must be smaller than defaultPartsToMerge. -// Lower value improves select performance at the cost of increased -// write amplification. -const finalPartsToMerge = 2 - // maxPartSize is the maximum part size in bytes. // // This number should be limited by the amount of time required to merge parts of this summary size. @@ -110,18 +97,16 @@ type Table struct { inmemoryItemsMerged uint64 fileItemsMerged uint64 - inmemoryAssistedMergesCount uint64 - fileAssistedMergesCount uint64 - itemsAdded uint64 itemsAddedSizeBytes uint64 + inmemoryPartsLimitReachedCount uint64 + mergeIdx uint64 path string flushCallback func() - flushCallbackWorkerWG sync.WaitGroup needFlushCallbackCall uint32 prepareBlock PrepareBlockCallback @@ -129,32 +114,38 @@ type Table struct { // rawItems contains recently added items that haven't been converted to parts yet. // - // rawItems aren't used in search for performance reasons + // rawItems are converted to inmemoryParts at least every pendingItemsFlushInterval or when rawItems becomes full. + // + // rawItems aren't visible for search due to performance reasons. rawItems rawItemsShards // partsLock protects inmemoryParts and fileParts. partsLock sync.Mutex - // inmemoryParts contains inmemory parts. + // inmemoryParts contains inmemory parts, which are visible for search. inmemoryParts []*partWrapper - // inmemoryPartsLimitCh limits the number of inmemory parts + // fileParts contains file-backed parts, which are visible for search. + fileParts []*partWrapper + + // inmemoryPartsLimitCh limits the number of inmemory parts to maxInmemoryParts // in order to prevent from data ingestion slowdown as described at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 inmemoryPartsLimitCh chan struct{} - // fileParts contains file-backed parts. - fileParts []*partWrapper - - // This channel is used for signaling the background mergers that there are parts, - // which may need to be merged. 
- needMergeCh chan struct{} - + // stopCh is used for notifying all the background workers to stop. + // + // It must be closed under partsLock in order to prevent from calling wg.Add() + // after stopCh is closed. stopCh chan struct{} + // wg is used for waiting for all the background workers to stop. + // + // wg.Add() must be called under partsLock after checking whether stopCh isn't closed. + // This should prevent from calling wg.Add() after stopCh is closed and wg.Wait() is called. wg sync.WaitGroup // Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines. - rawItemsPendingFlushesWG syncwg.WaitGroup + flushPendingItemsWG syncwg.WaitGroup } type rawItemsShards struct { @@ -173,7 +164,7 @@ var rawItemsShardsPerTable = func() int { if multiplier > 16 { multiplier = 16 } - return (cpus*multiplier + 1) / 2 + return cpus * multiplier }() const maxBlocksPerShard = 256 @@ -233,8 +224,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { ris.mu.Lock() ibs := ris.ibs if len(ibs) == 0 { - ib := &inmemoryBlock{} - ibs = append(ibs, ib) + ibs = append(ibs, &inmemoryBlock{}) ris.ibs = ibs } ib := ibs[len(ibs)-1] @@ -243,23 +233,23 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { continue } if len(ibs) >= maxBlocksPerShard { - ibsToFlush = ibs + ibsToFlush = append(ibsToFlush, ibs...) ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard) tailItems = items[i:] atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp()) break } - ib = &inmemoryBlock{} + ib = &inmemoryBlock{} if ib.Add(item) { ibs = append(ibs, ib) continue } - logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items", len(item)) + logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock; len(item)=%d; the caller should be responsible for avoiding too big items", len(item)) } ris.ibs = ibs ris.mu.Unlock() - tb.flushBlocksToParts(ibsToFlush, false) + tb.flushBlocksToInmemoryParts(ibsToFlush, false) return tailItems } @@ -332,74 +322,169 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC pws := mustOpenParts(path) tb := &Table{ + mergeIdx: uint64(time.Now().UnixNano()), path: path, flushCallback: flushCallback, prepareBlock: prepareBlock, isReadOnly: isReadOnly, - inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts), fileParts: pws, - mergeIdx: uint64(time.Now().UnixNano()), - needMergeCh: make(chan struct{}, 1), + inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts), stopCh: make(chan struct{}), } tb.rawItems.init() tb.startBackgroundWorkers() - // Wake up a single background merger, so it could start merging parts if needed. - tb.notifyBackgroundMergers() - - if flushCallback != nil { - tb.flushCallbackWorkerWG.Add(1) - go func() { - // call flushCallback once per 10 seconds in order to improve the effectiveness of caches, - // which are reset by the flushCallback. - d := timeutil.AddJitterToDuration(time.Second * 10) - tc := time.NewTicker(d) - for { - select { - case <-tb.stopCh: - tb.flushCallback() - tb.flushCallbackWorkerWG.Done() - return - case <-tc.C: - if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) { - tb.flushCallback() - } - } - } - }() - } - return tb } func (tb *Table) startBackgroundWorkers() { - tb.startMergeWorkers() - tb.startInmemoryPartsFlusher() + // Start file parts mergers, so they could start merging unmerged parts if needed. 
+ // There is no need to start in-memory parts mergers, since there are no in-memory parts yet. + tb.startFilePartsMergers() + tb.startPendingItemsFlusher() + tb.startInmemoryPartsFlusher() + tb.startFlushCallbackWorker() +} + +func (tb *Table) startInmemoryPartsMergers() { + tb.partsLock.Lock() + for i := 0; i < cap(inmemoryPartsConcurrencyCh); i++ { + tb.startInmemoryPartsMergerLocked() + } + tb.partsLock.Unlock() +} + +func (tb *Table) startInmemoryPartsMergerLocked() { + select { + case <-tb.stopCh: + return + default: + } + tb.wg.Add(1) + go func() { + tb.inmemoryPartsMerger() + tb.wg.Done() + }() +} + +func (tb *Table) startFilePartsMergers() { + tb.partsLock.Lock() + for i := 0; i < cap(filePartsConcurrencyCh); i++ { + tb.startFilePartsMergerLocked() + } + tb.partsLock.Unlock() +} + +func (tb *Table) startFilePartsMergerLocked() { + select { + case <-tb.stopCh: + return + default: + } + tb.wg.Add(1) + go func() { + tb.filePartsMerger() + tb.wg.Done() + }() +} + +func (tb *Table) startPendingItemsFlusher() { + tb.wg.Add(1) + go func() { + tb.pendingItemsFlusher() + tb.wg.Done() + }() +} + +func (tb *Table) startInmemoryPartsFlusher() { + tb.wg.Add(1) + go func() { + tb.inmemoryPartsFlusher() + tb.wg.Done() + }() +} + +func (tb *Table) startFlushCallbackWorker() { + if tb.flushCallback == nil { + return + } + + tb.wg.Add(1) + go func() { + // call flushCallback once per 10 seconds in order to improve the effectiveness of caches, + // which are reset by the flushCallback. + d := timeutil.AddJitterToDuration(time.Second * 10) + tc := time.NewTicker(d) + for { + select { + case <-tb.stopCh: + tb.flushCallback() + tb.wg.Done() + return + case <-tc.C: + if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) { + tb.flushCallback() + } + } + } + }() +} + +var ( + inmemoryPartsConcurrencyCh = make(chan struct{}, getInmemoryPartsConcurrency()) + filePartsConcurrencyCh = make(chan struct{}, getFilePartsConcurrency()) +) + +func getInmemoryPartsConcurrency() int { + // The concurrency for processing in-memory parts must be equal to the number of CPU cores, + // since these operations are CPU-bound. + return cgroup.AvailableCPUs() +} + +func getFilePartsConcurrency() int { + n := cgroup.AvailableCPUs() + if n < 4 { + // Allow at least 4 concurrent workers for file parts on systems + // with less than 4 CPU cores in order to be able to make small file merges + // when big file merges are in progress. + return 4 + } + return n } // MustClose closes the table. func (tb *Table) MustClose() { + // Notify background workers to stop. + // The tb.partsLock is acquired in order to guarantee that tb.wg.Add() isn't called + // after tb.stopCh is closed and tb.wg.Wait() is called below. + tb.partsLock.Lock() close(tb.stopCh) + tb.partsLock.Unlock() - // Waiting for background workers to stop + // Wait for background workers to stop. tb.wg.Wait() - tb.flushInmemoryItems() - tb.flushCallbackWorkerWG.Wait() + // Flush the remaining in-memory items to files. + tb.flushInmemoryItemsToFiles() // Remove references to parts from the tb, so they may be eventually closed after all the searches are done. 
tb.partsLock.Lock() - inmemoryParts := tb.inmemoryParts - fileParts := tb.fileParts + + if n := tb.rawItems.Len(); n > 0 { + logger.Panicf("BUG: raw items must be empty at this stage; got %d items", n) + } + + if n := len(tb.inmemoryParts); n > 0 { + logger.Panicf("BUG: in-memory parts must be empty at this stage; got %d parts", n) + } tb.inmemoryParts = nil + + fileParts := tb.fileParts tb.fileParts = nil + tb.partsLock.Unlock() - for _, pw := range inmemoryParts { - pw.decRef() - } for _, pw := range fileParts { pw.decRef() } @@ -421,12 +506,11 @@ type TableMetrics struct { InmemoryItemsMerged uint64 FileItemsMerged uint64 - InmemoryAssistedMergesCount uint64 - FileAssistedMergesCount uint64 - ItemsAdded uint64 ItemsAddedSizeBytes uint64 + InmemoryPartsLimitReachedCount uint64 + PendingItems uint64 InmemoryPartsCount uint64 @@ -472,12 +556,11 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged) m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged) - m.InmemoryAssistedMergesCount += atomic.LoadUint64(&tb.inmemoryAssistedMergesCount) - m.FileAssistedMergesCount += atomic.LoadUint64(&tb.fileAssistedMergesCount) - m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded) m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes) + m.InmemoryPartsLimitReachedCount += atomic.LoadUint64(&tb.inmemoryPartsLimitReachedCount) + m.PendingItems += uint64(tb.rawItems.Len()) tb.partsLock.Lock() @@ -553,21 +636,38 @@ func (tb *Table) putParts(pws []*partWrapper) { } } -func (tb *Table) mergePartsOptimal(pws []*partWrapper) error { - sortPartsForOptimalMerge(pws) +func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error { + pwsLen := len(pws) + + var errGlobal error + var errGlobalLock sync.Mutex + wg := getWaitGroup() for len(pws) > 0 { - n := defaultPartsToMerge - if n > len(pws) { - n = len(pws) - } - pwsChunk := pws[:n] - pws = pws[n:] - err := tb.mergeParts(pwsChunk, nil, true) - if err == nil { - continue - } - tb.releasePartsToMerge(pws) - return fmt.Errorf("cannot optimally merge %d parts: %w", n, err) + pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws) + wg.Add(1) + inmemoryPartsConcurrencyCh <- struct{}{} + go func(pwsChunk []*partWrapper) { + defer func() { + <-inmemoryPartsConcurrencyCh + wg.Done() + }() + + if err := tb.mergeParts(pwsChunk, nil, true); err != nil { + // There is no need for errors.Is(err, errForciblyStopped) check here, since stopCh=nil is passed to mergeParts. + errGlobalLock.Lock() + if errGlobal == nil { + errGlobal = err + } + errGlobalLock.Unlock() + } + }(pwsToMerge) + pws = pwsRemaining + } + wg.Wait() + putWaitGroup(wg) + + if errGlobal != nil { + return fmt.Errorf("cannot optimally merge %d parts: %w", pwsLen, errGlobal) } return nil } @@ -584,23 +684,22 @@ func (tb *Table) DebugFlush() { tb.flushPendingItems(nil, true) // Wait for background flushers to finish. 
- tb.rawItemsPendingFlushesWG.Wait() + tb.flushPendingItemsWG.Wait() } -func (tb *Table) startInmemoryPartsFlusher() { - tb.wg.Add(1) - go func() { - tb.inmemoryPartsFlusher() - tb.wg.Done() - }() -} - -func (tb *Table) startPendingItemsFlusher() { - tb.wg.Add(1) - go func() { - tb.pendingItemsFlusher() - tb.wg.Done() - }() +func (tb *Table) pendingItemsFlusher() { + d := timeutil.AddJitterToDuration(pendingItemsFlushInterval) + ticker := time.NewTicker(d) + defer ticker.Stop() + var ibs []*inmemoryBlock + for { + select { + case <-tb.stopCh: + return + case <-ticker.C: + ibs = tb.flushPendingItems(ibs[:0], false) + } + } } func (tb *Table) inmemoryPartsFlusher() { @@ -612,38 +711,24 @@ func (tb *Table) inmemoryPartsFlusher() { case <-tb.stopCh: return case <-ticker.C: - tb.flushInmemoryParts(false) - } - } -} - -func (tb *Table) pendingItemsFlusher() { - ticker := time.NewTicker(pendingItemsFlushInterval) - defer ticker.Stop() - var ibs []*inmemoryBlock - for { - select { - case <-tb.stopCh: - return - case <-ticker.C: - ibs = tb.flushPendingItems(ibs[:0], false) - for i := range ibs { - ibs[i] = nil - } + tb.flushInmemoryPartsToFiles(false) } } } func (tb *Table) flushPendingItems(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { - return tb.rawItems.flush(tb, dst, isFinal) + tb.flushPendingItemsWG.Add(1) + dst = tb.rawItems.flush(tb, dst, isFinal) + tb.flushPendingItemsWG.Done() + return dst } -func (tb *Table) flushInmemoryItems() { - tb.rawItems.flush(tb, nil, true) - tb.flushInmemoryParts(true) +func (tb *Table) flushInmemoryItemsToFiles() { + tb.flushPendingItems(nil, true) + tb.flushInmemoryPartsToFiles(true) } -func (tb *Table) flushInmemoryParts(isFinal bool) { +func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) { currentTime := time.Now() var pws []*partWrapper @@ -656,18 +741,16 @@ func (tb *Table) flushInmemoryParts(isFinal bool) { } tb.partsLock.Unlock() - if err := tb.mergePartsOptimal(pws); err != nil { - logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) + if err := tb.mergeInmemoryPartsToFiles(pws); err != nil { + logger.Panicf("FATAL: cannot merge in-memory parts to files: %s", err) } } func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { - tb.rawItemsPendingFlushesWG.Add(1) for i := range riss.shards { dst = riss.shards[i].appendBlocksToFlush(dst, isFinal) } - tb.flushBlocksToParts(dst, isFinal) - tb.rawItemsPendingFlushesWG.Done() + tb.flushBlocksToInmemoryParts(dst, isFinal) return dst } @@ -695,10 +778,12 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool return dst } -func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) { +func (tb *Table) flushBlocksToInmemoryParts(ibs []*inmemoryBlock, isFinal bool) { if len(ibs) == 0 { return } + + // Merge ibs into in-memory parts. 
var pwsLock sync.Mutex pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge) wg := getWaitGroup() @@ -708,44 +793,65 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) { n = len(ibs) } wg.Add(1) - flushConcurrencyCh <- struct{}{} + inmemoryPartsConcurrencyCh <- struct{}{} go func(ibsChunk []*inmemoryBlock) { defer func() { - <-flushConcurrencyCh + <-inmemoryPartsConcurrencyCh wg.Done() }() - pw := tb.createInmemoryPart(ibsChunk) - if pw == nil { - return + + if pw := tb.createInmemoryPart(ibsChunk); pw != nil { + pwsLock.Lock() + pws = append(pws, pw) + pwsLock.Unlock() + } + // Clear references to ibsChunk items, so they may be reclaimed faster by Go GC. + for i := range ibsChunk { + ibsChunk[i] = nil } - pwsLock.Lock() - pws = append(pws, pw) - pwsLock.Unlock() }(ibs[:n]) ibs = ibs[n:] } wg.Wait() putWaitGroup(wg) - flushConcurrencyCh <- struct{}{} - pw := tb.mustMergeInmemoryParts(pws) - <-flushConcurrencyCh + // Merge pws into a single in-memory part. + maxPartSize := getMaxInmemoryPartSize() + for len(pws) > 1 { + pws = tb.mustMergeInmemoryParts(pws) + pwsRemaining := pws[:0] + for _, pw := range pws { + if pw.p.size >= maxPartSize { + tb.addToInmemoryParts(pw, isFinal) + } else { + pwsRemaining = append(pwsRemaining, pw) + } + } + pws = pwsRemaining + } + if len(pws) == 1 { + tb.addToInmemoryParts(pws[0], isFinal) + } +} + +func (tb *Table) addToInmemoryParts(pw *partWrapper, isFinal bool) { + // Wait until the number of in-memory parts goes below maxInmemoryParts. + // This prevents from excess CPU usage during search in tb under high ingestion rate to tb. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 select { case tb.inmemoryPartsLimitCh <- struct{}{}: default: - // Too many in-memory parts. Try assist merging them before adding pw to tb.inmemoryParts. - flushConcurrencyCh <- struct{}{} - tb.assistedMergeForInmemoryParts() - tb.assistedMergeForFileParts() - <-flushConcurrencyCh - - tb.inmemoryPartsLimitCh <- struct{}{} + atomic.AddUint64(&tb.inmemoryPartsLimitReachedCount, 1) + select { + case tb.inmemoryPartsLimitCh <- struct{}{}: + case <-tb.stopCh: + } } tb.partsLock.Lock() tb.inmemoryParts = append(tb.inmemoryParts, pw) - tb.notifyBackgroundMergers() + tb.startInmemoryPartsMergerLocked() tb.partsLock.Unlock() if tb.flushCallback != nil { @@ -761,83 +867,6 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) { } } -func (tb *Table) notifyBackgroundMergers() bool { - select { - case tb.needMergeCh <- struct{}{}: - return true - default: - return false - } -} - -var flushConcurrencyLimit = func() int { - n := cgroup.AvailableCPUs() - if n < 2 { - // Allow at least 2 concurrent flushers on systems with a single CPU core - // in order to guarantee that in-memory data flushes and background merges can be continued - // when a single flusher is busy with the long merge. 
- n = 2 - } - return n -}() - -var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit) - -func (tb *Table) assistedMergeForInmemoryParts() { - tb.partsLock.Lock() - needMerge := getNotInMergePartsCount(tb.inmemoryParts) >= defaultPartsToMerge - tb.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&tb.inmemoryAssistedMergesCount, 1) - - maxOutBytes := tb.getMaxFilePartSize() - - tb.partsLock.Lock() - pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, true) - tb.partsLock.Unlock() - - err := tb.mergeParts(pws, tb.stopCh, true) - if err == nil { - return - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { - return - } - logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err) -} - -func (tb *Table) assistedMergeForFileParts() { - tb.partsLock.Lock() - needMerge := len(tb.fileParts) > maxFileParts && getNotInMergePartsCount(tb.fileParts) >= defaultPartsToMerge - tb.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&tb.fileAssistedMergesCount, 1) - err := tb.mergeExistingParts(false) - if err == nil { - return - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { - return - } - logger.Panicf("FATAL: cannot assist with merging file parts: %s", err) -} - -func getNotInMergePartsCount(pws []*partWrapper) int { - n := 0 - for _, pw := range pws { - if !pw.isInMerge { - n++ - } - } - return n -} - func getWaitGroup() *sync.WaitGroup { v := wgPool.Get() if v == nil { @@ -852,7 +881,38 @@ func putWaitGroup(wg *sync.WaitGroup) { var wgPool sync.Pool -func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) *partWrapper { +func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper { + var pwsResult []*partWrapper + var pwsResultLock sync.Mutex + wg := getWaitGroup() + for len(pws) > 0 { + pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws) + wg.Add(1) + inmemoryPartsConcurrencyCh <- struct{}{} + go func(pwsChunk []*partWrapper) { + defer func() { + <-inmemoryPartsConcurrencyCh + wg.Done() + }() + + pw := tb.mustMergeInmemoryPartsFinal(pwsChunk) + + pwsResultLock.Lock() + pwsResult = append(pwsResult, pw) + pwsResultLock.Unlock() + }(pwsToMerge) + pws = pwsRemaining + } + wg.Wait() + putWaitGroup(wg) + + return pwsResult +} + +func (tb *Table) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper { + if len(pws) == 0 { + logger.Panicf("BUG: pws must contain at least a single item") + } if len(pws) == 1 { // Nothing to merge return pws[0] @@ -912,19 +972,16 @@ func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDis bsw.MustInitFromInmemoryPart(mpDst, compressLevel) // Merge parts. - // The merge shouldn't be interrupted by stopCh, - // since it may be final after stopCh is closed. - atomic.AddUint64(&tb.activeInmemoryMerges, 1) - err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.inmemoryItemsMerged) - atomic.AddUint64(&tb.activeInmemoryMerges, ^uint64(0)) - atomic.AddUint64(&tb.inmemoryMergesCount, 1) - if err != nil { - logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) - } + // The merge shouldn't be interrupted by stopCh, so use nil stopCh. 
+ ph, err := tb.mergePartsInternal("", bsw, bsrs, partInmemory, nil) putBlockStreamWriter(bsw) for _, bsr := range bsrs { putBlockStreamReader(bsr) } + if err != nil { + logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) + } + mpDst.ph = *ph return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline) } @@ -939,17 +996,6 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T } } -func (tb *Table) startMergeWorkers() { - // The actual number of concurrent merges is limited inside mergeWorker() below. - for i := 0; i < cap(mergeWorkersLimitCh); i++ { - tb.wg.Add(1) - go func() { - tb.mergeWorker() - tb.wg.Done() - }() - } -} - func getMaxInmemoryPartSize() uint64 { // Allow up to 5% of memory for in-memory parts. n := uint64(0.05 * float64(memory.Allowed()) / maxInmemoryParts) @@ -961,95 +1007,86 @@ func getMaxInmemoryPartSize() uint64 { func (tb *Table) getMaxFilePartSize() uint64 { n := fs.MustGetFreeSpace(tb.path) - // Divide free space by the max number of concurrent merges. - maxOutBytes := n / uint64(cap(mergeWorkersLimitCh)) + // Divide free space by the max number of concurrent merges for file parts. + maxOutBytes := n / uint64(cap(filePartsConcurrencyCh)) if maxOutBytes > maxPartSize { maxOutBytes = maxPartSize } return maxOutBytes } -func (tb *Table) canBackgroundMerge() bool { - return atomic.LoadUint32(tb.isReadOnly) == 0 +// NotifyReadWriteMode notifies tb that it may be switched from read-only mode to read-write mode. +func (tb *Table) NotifyReadWriteMode() { + tb.startInmemoryPartsMergers() + tb.startFilePartsMergers() } -var errReadOnlyMode = fmt.Errorf("storage is in readonly mode") - -func (tb *Table) mergeExistingParts(isFinal bool) error { - if !tb.canBackgroundMerge() { - // Do not perform background merge in read-only mode - // in order to prevent from disk space shortage. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603 - return errReadOnlyMode - } - maxOutBytes := tb.getMaxFilePartSize() - - tb.partsLock.Lock() - dst := make([]*partWrapper, 0, len(tb.inmemoryParts)+len(tb.fileParts)) - dst = append(dst, tb.inmemoryParts...) - dst = append(dst, tb.fileParts...) - pws := getPartsToMerge(dst, maxOutBytes, isFinal) - tb.partsLock.Unlock() - - return tb.mergeParts(pws, tb.stopCh, isFinal) -} - -func (tb *Table) mergeWorker() { - var lastMergeTime uint64 - isFinal := false +func (tb *Table) inmemoryPartsMerger() { for { - // Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers - // across tables may exceed the the cap(mergeWorkersLimitCh). - mergeWorkersLimitCh <- struct{}{} - err := tb.mergeExistingParts(isFinal) - <-mergeWorkersLimitCh + if atomic.LoadUint32(tb.isReadOnly) != 0 { + return + } + maxOutBytes := tb.getMaxFilePartSize() + + tb.partsLock.Lock() + pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes) + tb.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge + return + } + + inmemoryPartsConcurrencyCh <- struct{}{} + err := tb.mergeParts(pws, tb.stopCh, false) + <-inmemoryPartsConcurrencyCh + + if err == nil { + // Try merging additional parts. + continue + } + if errors.Is(err, errForciblyStopped) { + // Nothing to do - finish the merger. + return + } + // Unexpected error. 
+ logger.Panicf("FATAL: unrecoverable error when merging inmemory parts in %q: %s", tb.path, err) + } +} + +func (tb *Table) filePartsMerger() { + for { + if atomic.LoadUint32(tb.isReadOnly) != 0 { + return + } + maxOutBytes := tb.getMaxFilePartSize() + + tb.partsLock.Lock() + pws := getPartsToMerge(tb.fileParts, maxOutBytes) + tb.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge + return + } + + filePartsConcurrencyCh <- struct{}{} + err := tb.mergeParts(pws, tb.stopCh, false) + <-filePartsConcurrencyCh + if err == nil { // Try merging additional parts. - lastMergeTime = fasttime.UnixTimestamp() - isFinal = false continue } if errors.Is(err, errForciblyStopped) { // The merger has been stopped. return } - if !errors.Is(err, errNothingToMerge) && !errors.Is(err, errReadOnlyMode) { - // Unexpected error. - logger.Panicf("FATAL: unrecoverable error when merging inmemory parts in %q: %s", tb.path, err) - } - if finalMergeDelaySeconds > 0 && fasttime.UnixTimestamp()-lastMergeTime > finalMergeDelaySeconds { - // We have free time for merging into bigger parts. - // This should improve select performance. - lastMergeTime = fasttime.UnixTimestamp() - isFinal = true - continue - } - - // Nothing to merge. Wait for the notification of new merge. - select { - case <-tb.stopCh: - return - case <-tb.needMergeCh: - } + // Unexpected error. + logger.Panicf("FATAL: unrecoverable error when merging file parts in %q: %s", tb.path, err) } } -// Disable final merge by default, since it may lead to high disk IO and CPU usage -// after some inactivity time. -var finalMergeDelaySeconds = uint64(0) - -// SetFinalMergeDelay sets the delay before doing final merge for Table without newly ingested data. -// -// This function may be called only before Table initialization. -func SetFinalMergeDelay(delay time.Duration) { - if delay <= 0 { - return - } - finalMergeDelaySeconds = uint64(delay.Seconds() + 1) -} - -var errNothingToMerge = fmt.Errorf("nothing to merge") - func assertIsInMerge(pws []*partWrapper) { for _, pw := range pws { if !pw.isInMerge { @@ -1071,16 +1108,19 @@ func (tb *Table) releasePartsToMerge(pws []*partWrapper) { // mergeParts merges pws to a single resulting part. // +// It is expected that pws contains at least a single part. +// // Merging is immediately stopped if stopCh is closed. // // If isFinal is set, then the resulting part will be stored to disk. +// If at least a single source part at pws is stored on disk, then the resulting part +// will be stored to disk. // // All the parts inside pws must have isInMerge field set to true. // The isInMerge field inside pws parts is set to false before returning from the function. func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error { if len(pws) == 0 { - // Nothing to merge. - return errNothingToMerge + logger.Panicf("BUG: empty pws cannot be passed to mergeParts()") } assertIsInMerge(pws) @@ -1270,13 +1310,8 @@ func areAllInmemoryParts(pws []*partWrapper) bool { func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) { // Atomically unregister old parts and add new part to tb. 
- m := make(map[*partWrapper]bool, len(pws)) - for _, pw := range pws { - m[pw] = true - } - if len(m) != len(pws) { - logger.Panicf("BUG: %d duplicate parts found when merging %d parts", len(pws)-len(m), len(pws)) - } + m := makeMapFromPartWrappers(pws) + removedInmemoryParts := 0 removedFileParts := 0 @@ -1287,12 +1322,13 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst switch dstPartType { case partInmemory: tb.inmemoryParts = append(tb.inmemoryParts, pwNew) + tb.startInmemoryPartsMergerLocked() case partFile: tb.fileParts = append(tb.fileParts, pwNew) + tb.startFilePartsMergerLocked() default: logger.Panicf("BUG: unknown partType=%d", dstPartType) } - tb.notifyBackgroundMergers() // Atomically store the updated list of file-based parts on disk. // This must be performed under partsLock in order to prevent from races @@ -1305,10 +1341,16 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst // Update inmemoryPartsLimitCh accordingly to the number of the remaining in-memory parts. for i := 0; i < removedInmemoryParts; i++ { - <-tb.inmemoryPartsLimitCh + select { + case <-tb.inmemoryPartsLimitCh: + case <-tb.stopCh: + } } if dstPartType == partInmemory { - tb.inmemoryPartsLimitCh <- struct{}{} + select { + case tb.inmemoryPartsLimitCh <- struct{}{}: + case <-tb.stopCh: + } } removedParts := removedInmemoryParts + removedFileParts @@ -1324,6 +1366,17 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst } } +func makeMapFromPartWrappers(pws []*partWrapper) map[*partWrapper]struct{} { + m := make(map[*partWrapper]struct{}, len(pws)) + for _, pw := range pws { + m[pw] = struct{}{} + } + if len(m) != len(pws) { + logger.Panicf("BUG: %d duplicate parts found in %d source parts", len(pws)-len(m), len(pws)) + } + return m +} + func getPartsSize(pws []*partWrapper) uint64 { n := uint64(0) for _, pw := range pws { @@ -1363,19 +1416,6 @@ func (tb *Table) nextMergeIdx() uint64 { return atomic.AddUint64(&tb.mergeIdx, 1) } -var mergeWorkersLimitCh = make(chan struct{}, getWorkersCount()) - -func getWorkersCount() int { - n := cgroup.AvailableCPUs() - if n < 4 { - // Allow at least 4 merge workers on systems with small CPUs count - // in order to guarantee that background merges can be continued - // when multiple workers are busy with big merges. - n = 4 - } - return n -} - func mustOpenParts(path string) []*partWrapper { // The path can be missing after restoring from backup, so create it if needed. fs.MustMkdirIfNotExist(path) @@ -1468,7 +1508,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error { } // Flush inmemory items to disk. - tb.flushInmemoryItems() + tb.flushInmemoryItemsToFiles() fs.MustMkdirFailIfExist(dstDir) @@ -1553,33 +1593,50 @@ func mustReadPartNames(srcDir string) []string { // getPartsToMerge returns optimal parts to merge from pws. // -// if isFinal is set, then merge harder. -// // The summary size of the returned parts must be smaller than the maxOutBytes. 
-func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*partWrapper { +func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64) []*partWrapper { pwsRemaining := make([]*partWrapper, 0, len(pws)) for _, pw := range pws { if !pw.isInMerge { pwsRemaining = append(pwsRemaining, pw) } } - maxPartsToMerge := defaultPartsToMerge - var dst []*partWrapper - if isFinal { - for len(dst) == 0 && maxPartsToMerge >= finalPartsToMerge { - dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes) - maxPartsToMerge-- - } - } else { - dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes) - } - for _, pw := range dst { + + pwsToMerge := appendPartsToMerge(nil, pwsRemaining, defaultPartsToMerge, maxOutBytes) + + for _, pw := range pwsToMerge { if pw.isInMerge { - logger.Panicf("BUG: partWrapper.isInMerge is already set") + logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to true") } pw.isInMerge = true } - return dst + + return pwsToMerge +} + +// getPartsForOptimalMerge returns parts from pws for optimal merge, plus the remaining parts. +// +// the pws items are replaced by nil after the call. This is needed for helping Go GC to reclaim the referenced items. +func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) { + pwsToMerge := appendPartsToMerge(nil, pws, defaultPartsToMerge, math.MaxUint64) + if len(pwsToMerge) == 0 { + return pws, nil + } + + m := makeMapFromPartWrappers(pwsToMerge) + pwsRemaining := make([]*partWrapper, 0, len(pws)-len(pwsToMerge)) + for _, pw := range pws { + if _, ok := m[pw]; !ok { + pwsRemaining = append(pwsRemaining, pw) + } + } + + // Clear references to pws items, so they could be reclaimed faster by Go GC. + for i := range pws { + pws[i] = nil + } + + return pwsToMerge, pwsRemaining } // minMergeMultiplier is the minimum multiplier for the size of the output part @@ -1590,8 +1647,7 @@ func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*pa // The 1.7 is good enough for production workloads. const minMergeMultiplier = 1.7 -// appendPartsToMerge finds optimal parts to merge from src, appends -// them to dst and returns the result. +// appendPartsToMerge finds optimal parts to merge from src, appends them to dst and returns the result. func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) []*partWrapper { if len(src) < 2 { // There is no need in merging zero or one part :) @@ -1671,10 +1727,10 @@ func sortPartsForOptimalMerge(pws []*partWrapper) { }) } -func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*partWrapper, int) { +func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) { dst := pws[:0] for _, pw := range pws { - if !partsToRemove[pw] { + if _, ok := partsToRemove[pw]; !ok { dst = append(dst, pw) } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 66866a309..108d89f63 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -29,15 +29,15 @@ import ( // This time shouldn't exceed a few days. const maxBigPartSize = 1e12 -// The maximum number of inmemory parts in the partition. +// The maximum number of inmemory parts per partition. // -// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion. -const maxInmemoryPartsPerPartition = 20 - -// The maximum number of small parts in the partition. 
+// This limit allows reducing querying CPU usage under high ingestion rate. +// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 // -// If the number of small parts reaches this value, then assisted merge runs during data ingestion. -const maxSmallPartsPerPartition = 30 +// This number may be reached when the insertion pace outreaches merger pace. +// If this number is reached, then the data ingestion is paused until background +// mergers reduce the number of parts below this number. +const maxInmemoryParts = 60 // Default number of parts to merge at once. // @@ -45,17 +39,10 @@ const maxSmallPartsPerPartition = 30 // See appendPartsToMerge tests for details. const defaultPartsToMerge = 15 -// The final number of parts to merge at once. -// -// It must be smaller than defaultPartsToMerge. -// Lower value improves select performance at the cost of increased -// write amplification. -const finalPartsToMerge = 3 - // The number of shards for rawRow entries per partition. // // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. -var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 1) / 2 +var rawRowsShardsPerPartition = cgroup.AvailableCPUs() // The interval for flushing buffered rows into parts, so they become visible to search. const pendingRowsFlushInterval = time.Second @@ -76,10 +69,10 @@ func SetDataFlushInterval(d time.Duration) { } } -// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet. +// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet per each rawRowsShard. func getMaxRawRowsPerShard() int { maxRawRowsPerPartitionOnce.Do(func() { - n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{})) + n := memory.Allowed() / rawRowsShardsPerPartition / (100 * int(unsafe.Sizeof(rawRow{}))) if n < 1e4 { n = 1e4 } @@ -117,13 +110,13 @@ type partition struct { smallRowsDeleted uint64 bigRowsDeleted uint64 - inmemoryAssistedMergesCount uint64 - smallAssistedMergesCount uint64 - mergeIdx uint64 + // the path to directory with smallParts. smallPartsPath string - bigPartsPath string + + // the path to directory with bigParts. + bigPartsPath string // The parent storage. s *Storage @@ -135,28 +128,34 @@ type partition struct { tr TimeRange // rawRows contains recently added rows that haven't been converted into parts yet. - // rawRows are periodically converted into inmemroyParts. - // rawRows aren't used in search for performance reasons. + // + // rawRows are converted into inmemoryParts on every pendingRowsFlushInterval or when rawRows becomes full. + // + // rawRows aren't visible for search due to performance reasons. rawRows rawRowsShards // partsLock protects inmemoryParts, smallParts and bigParts. partsLock sync.Mutex - // Contains inmemory parts with recently ingested data. + // Contains inmemory parts with recently ingested data, which are visible for search. inmemoryParts []*partWrapper - // Contains file-based parts with small number of items. + // Contains file-based parts with small number of items, which are visible for search. smallParts []*partWrapper - // Contains file-based parts with big number of items. + // Contains file-based parts with big number of items, which are visible for search. bigParts []*partWrapper - // This channel is used for signaling the background mergers that there are parts, - // which may need to be merged. 
- needMergeCh chan struct{} - + // stopCh is used for notifying all the background workers to stop. + // + // It must be closed under partsLock in order to prevent from calling wg.Add() + // after stopCh is closed. stopCh chan struct{} + // wg is used for waiting for all the background workers to stop. + // + // wg.Add() must be called under partsLock after checking whether stopCh isn't closed. + // This should prevent from calling wg.Add() after stopCh is closed and wg.Wait() is called. wg sync.WaitGroup } @@ -233,9 +232,13 @@ func mustCreatePartition(timestamp int64, smallPartitionsPath, bigPartitionsPath } func (pt *partition) startBackgroundWorkers() { - pt.startMergeWorkers() - pt.startInmemoryPartsFlusher() + // Start file parts mergers, so they could start merging unmerged parts if needed. + // There is no need to start in-memory parts mergers, since there are no in-memory parts yet. + pt.startSmallPartsMergers() + pt.startBigPartsMergers() + pt.startPendingRowsFlusher() + pt.startInmemoryPartsFlusher() pt.startStalePartsRemover() } @@ -281,24 +284,17 @@ func mustOpenPartition(smallPartsPath, bigPartsPath string, s *Storage) *partiti } pt.startBackgroundWorkers() - // Wake up a single background merger, so it could start merging parts if needed. - pt.notifyBackgroundMergers() - return pt } func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition { p := &partition{ name: name, + mergeIdx: uint64(time.Now().UnixNano()), smallPartsPath: smallPartsPath, bigPartsPath: bigPartsPath, - - s: s, - - mergeIdx: uint64(time.Now().UnixNano()), - needMergeCh: make(chan struct{}, cgroup.AvailableCPUs()), - - stopCh: make(chan struct{}), + s: s, + stopCh: make(chan struct{}), } p.rawRows.init() return p } @@ -349,9 +345,6 @@ type partitionMetrics struct { InmemoryPartsRefCount uint64 SmallPartsRefCount uint64 BigPartsRefCount uint64 - - InmemoryAssistedMergesCount uint64 - SmallAssistedMergesCount uint64 } // TotalRowsCount returns total number of rows in tm. @@ -414,9 +407,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.InmemoryRowsDeleted += atomic.LoadUint64(&pt.inmemoryRowsDeleted) m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted) m.BigRowsDeleted += atomic.LoadUint64(&pt.bigRowsDeleted) - - m.InmemoryAssistedMergesCount += atomic.LoadUint64(&pt.inmemoryAssistedMergesCount) - m.SmallAssistedMergesCount += atomic.LoadUint64(&pt.smallAssistedMergesCount) } // AddRows adds the given rows to the partition pt. @@ -519,16 +509,8 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow { rrs.mu.Unlock() if rrb != nil { - pt.flushRowsToParts(rrb.rows) + pt.flushRowsToInmemoryParts(rrb.rows) putRawRowsBlock(rrb) - - // Run assisted merges if needed. - flushConcurrencyCh <- struct{}{} - pt.assistedMergeForInmemoryParts() - pt.assistedMergeForSmallParts() - // There is no need in assisted merges for big parts, - // since the bottleneck is possible only at inmemory and small parts. - <-flushConcurrencyCh } return rows @@ -560,10 +542,12 @@ func putRawRowsBlock(rrb *rawRowsBlock) { var rawRowsBlockPool sync.Pool -func (pt *partition) flushRowsToParts(rows []rawRow) { +func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) { if len(rows) == 0 { return } + + // Merge rows into in-memory parts. 
maxRows := getMaxRawRowsPerShard() var pwsLock sync.Mutex pws := make([]*partWrapper, 0, (len(rows)+maxRows-1)/maxRows) @@ -574,111 +558,155 @@ func (pt *partition) flushRowsToParts(rows []rawRow) { n = len(rows) } wg.Add(1) - flushConcurrencyCh <- struct{}{} + inmemoryPartsConcurrencyCh <- struct{}{} go func(rowsChunk []rawRow) { defer func() { - <-flushConcurrencyCh + <-inmemoryPartsConcurrencyCh wg.Done() }() + pw := pt.createInmemoryPart(rowsChunk) - if pw == nil { - return + if pw != nil { + pwsLock.Lock() + pws = append(pws, pw) + pwsLock.Unlock() } - pwsLock.Lock() - pws = append(pws, pw) - pwsLock.Unlock() }(rows[:n]) rows = rows[n:] } wg.Wait() putWaitGroup(wg) - pt.partsLock.Lock() - pt.inmemoryParts = append(pt.inmemoryParts, pws...) - for range pws { - if !pt.notifyBackgroundMergers() { - break + // Merge pws into a single in-memory part. + maxPartSize := getMaxInmemoryPartSize() + for len(pws) > 1 { + pws = pt.mustMergeInmemoryParts(pws) + + pwsRemaining := pws[:0] + for _, pw := range pws { + if pw.p.size >= maxPartSize { + pt.addToInmemoryParts(pw) + } else { + pwsRemaining = append(pwsRemaining, pw) + } } + pws = pwsRemaining } - pt.partsLock.Unlock() -} - -func (pt *partition) notifyBackgroundMergers() bool { - select { - case pt.needMergeCh <- struct{}{}: - return true - default: - return false + if len(pws) == 1 { + pt.addToInmemoryParts(pws[0]) } } -var flushConcurrencyLimit = func() int { - n := cgroup.AvailableCPUs() - if n < 3 { - // Allow at least 3 concurrent flushers on systems with a single CPU core - // in order to guarantee that in-memory data flushes and background merges can be continued - // when a single flusher is busy with the long merge of big parts, - // while another flusher is busy with the long merge of small parts. 
- n = 3 - } - return n -}() - -var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit) - -func needAssistedMerge(pws []*partWrapper, maxParts int) bool { - if len(pws) < maxParts { - return false - } - return getNotInMergePartsCount(pws) >= defaultPartsToMerge -} - -func (pt *partition) assistedMergeForInmemoryParts() { +func (pt *partition) addToInmemoryParts(pw *partWrapper) { pt.partsLock.Lock() - needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition) + pt.inmemoryParts = append(pt.inmemoryParts, pw) + pt.startInmemoryPartsMergerLocked() pt.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&pt.inmemoryAssistedMergesCount, 1) - err := pt.mergeInmemoryParts() - if err == nil { - return - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { - return - } - logger.Panicf("FATAL: cannot merge inmemory parts: %s", err) } -func (pt *partition) assistedMergeForSmallParts() { - pt.partsLock.Lock() - needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition) - pt.partsLock.Unlock() - if !needMerge { - return - } - - atomic.AddUint64(&pt.smallAssistedMergesCount, 1) - err := pt.mergeExistingParts(false) - if err == nil { - return - } - if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { - return - } - logger.Panicf("FATAL: cannot merge small parts: %s", err) +func (pt *partition) NotifyReadWriteMode() { + pt.startInmemoryPartsMergers() + pt.startSmallPartsMergers() + pt.startBigPartsMergers() } -func getNotInMergePartsCount(pws []*partWrapper) int { - n := 0 - for _, pw := range pws { - if !pw.isInMerge { - n++ +func (pt *partition) inmemoryPartsMerger() { + for { + if atomic.LoadUint32(&pt.s.isReadOnly) != 0 { + return } + maxOutBytes := pt.getMaxBigPartSize() + + pt.partsLock.Lock() + pws := getPartsToMerge(pt.inmemoryParts, maxOutBytes) + pt.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge + return + } + + inmemoryPartsConcurrencyCh <- struct{}{} + err := pt.mergeParts(pws, pt.stopCh, false) + <-inmemoryPartsConcurrencyCh + + if err == nil { + // Try merging additional parts. + continue + } + if errors.Is(err, errForciblyStopped) { + // Nothing to do - finish the merger. + return + } + // Unexpected error. + logger.Panicf("FATAL: unrecoverable error when merging inmemory parts in partition %q: %s", pt.name, err) + } +} + +func (pt *partition) smallPartsMerger() { + for { + if atomic.LoadUint32(&pt.s.isReadOnly) != 0 { + return + } + maxOutBytes := pt.getMaxBigPartSize() + + pt.partsLock.Lock() + pws := getPartsToMerge(pt.smallParts, maxOutBytes) + pt.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge + return + } + + smallPartsConcurrencyCh <- struct{}{} + err := pt.mergeParts(pws, pt.stopCh, false) + <-smallPartsConcurrencyCh + + if err == nil { + // Try merging additional parts. + continue + } + if errors.Is(err, errForciblyStopped) { + // Nothing to do - finish the merger. + return + } + // Unexpected error. 
+ logger.Panicf("FATAL: unrecoverable error when merging small parts at %q: %s", pt.smallPartsPath, err) + } +} + +func (pt *partition) bigPartsMerger() { + for { + if atomic.LoadUint32(&pt.s.isReadOnly) != 0 { + return + } + maxOutBytes := pt.getMaxBigPartSize() + + pt.partsLock.Lock() + pws := getPartsToMerge(pt.bigParts, maxOutBytes) + pt.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge + return + } + + bigPartsConcurrencyCh <- struct{}{} + err := pt.mergeParts(pws, pt.stopCh, false) + <-bigPartsConcurrencyCh + + if err == nil { + // Try merging additional parts. + continue + } + if errors.Is(err, errForciblyStopped) { + // Nothing to do - finish the merger. + return + } + // Unexpected error. + logger.Panicf("FATAL: unrecoverable error when merging big parts at %q: %s", pt.bigPartsPath, err) } - return n } func getWaitGroup() *sync.WaitGroup { @@ -695,6 +723,85 @@ func putWaitGroup(wg *sync.WaitGroup) { var wgPool sync.Pool +func (pt *partition) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper { + var pwsResult []*partWrapper + var pwsResultLock sync.Mutex + wg := getWaitGroup() + for len(pws) > 0 { + pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws) + wg.Add(1) + inmemoryPartsConcurrencyCh <- struct{}{} + go func(pwsChunk []*partWrapper) { + defer func() { + <-inmemoryPartsConcurrencyCh + wg.Done() + }() + + pw := pt.mustMergeInmemoryPartsFinal(pwsChunk) + + pwsResultLock.Lock() + pwsResult = append(pwsResult, pw) + pwsResultLock.Unlock() + }(pwsToMerge) + pws = pwsRemaining + } + wg.Wait() + putWaitGroup(wg) + + return pwsResult +} + +func (pt *partition) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper { + if len(pws) == 0 { + logger.Panicf("BUG: pws must contain at least a single item") + } + if len(pws) == 1 { + // Nothing to merge + return pws[0] + } + + bsrs := make([]*blockStreamReader, 0, len(pws)) + for _, pw := range pws { + if pw.mp == nil { + logger.Panicf("BUG: unexpected file part") + } + bsr := getBlockStreamReader() + bsr.MustInitFromInmemoryPart(pw.mp) + bsrs = append(bsrs, bsr) + } + + // determine flushToDiskDeadline before performing the actual merge, + // in order to guarantee the correct deadline, since the merge may take significant amounts of time. + flushToDiskDeadline := getFlushToDiskDeadline(pws) + + // Prepare blockStreamWriter for destination part. + srcRowsCount := uint64(0) + srcBlocksCount := uint64(0) + for _, bsr := range bsrs { + srcRowsCount += bsr.ph.RowsCount + srcBlocksCount += bsr.ph.BlocksCount + } + rowsPerBlock := float64(srcRowsCount) / float64(srcBlocksCount) + compressLevel := getCompressLevel(rowsPerBlock) + bsw := getBlockStreamWriter() + mpDst := getInmemoryPart() + bsw.MustInitFromInmemoryPart(mpDst, compressLevel) + + // Merge parts. + // The merge shouldn't be interrupted by stopCh, so use nil stopCh. + ph, err := pt.mergePartsInternal("", bsw, bsrs, partInmemory, nil) + putBlockStreamWriter(bsw) + for _, bsr := range bsrs { + putBlockStreamReader(bsr) + } + if err != nil { + logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) + } + mpDst.ph = *ph + + return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline) +} + func (pt *partition) createInmemoryPart(rows []rawRow) *partWrapper { if len(rows) == 0 { return nil @@ -770,27 +877,40 @@ func incRefForParts(pws []*partWrapper) { // // The pt must be detached from table before calling pt.MustClose. func (pt *partition) MustClose() { + // Notify the background workers to stop. 
+ // The pt.partsLock is acquired in order to guarantee that pt.wg.Add() isn't called
+ // after pt.stopCh is closed and pt.wg.Wait() is called below.
+ pt.partsLock.Lock()
 close(pt.stopCh)
+ pt.partsLock.Unlock()
 
- // Waiting for service workers to stop
+ // Wait for background workers to stop.
 pt.wg.Wait()
 
- pt.flushInmemoryRows()
+ // Flush the remaining in-memory rows to files.
+ pt.flushInmemoryRowsToFiles()
 
 // Remove references from inmemoryParts, smallParts and bigParts, so they may be eventually closed
 // after all the searches are done.
 pt.partsLock.Lock()
- inmemoryParts := pt.inmemoryParts
- smallParts := pt.smallParts
- bigParts := pt.bigParts
+
+ if n := pt.rawRows.Len(); n > 0 {
+ logger.Panicf("BUG: raw rows must be empty at this stage; got %d rows", n)
+ }
+
+ if n := len(pt.inmemoryParts); n > 0 {
+ logger.Panicf("BUG: in-memory parts must be empty at this stage; got %d parts", n)
+ }
 pt.inmemoryParts = nil
+
+ smallParts := pt.smallParts
 pt.smallParts = nil
+
+ bigParts := pt.bigParts
 pt.bigParts = nil
+
 pt.partsLock.Unlock()
 
- for _, pw := range inmemoryParts {
- pw.decRef()
- }
 for _, pw := range smallParts {
 pw.decRef()
 }
@@ -799,10 +919,65 @@ func (pt *partition) MustClose() {
 }
 }
 
-func (pt *partition) startInmemoryPartsFlusher() {
+func (pt *partition) startInmemoryPartsMergers() {
+ pt.partsLock.Lock()
+ for i := 0; i < cap(inmemoryPartsConcurrencyCh); i++ {
+ pt.startInmemoryPartsMergerLocked()
+ }
+ pt.partsLock.Unlock()
+}
+
+func (pt *partition) startInmemoryPartsMergerLocked() {
+ select {
+ case <-pt.stopCh:
+ return
+ default:
+ }
 pt.wg.Add(1)
 go func() {
- pt.inmemoryPartsFlusher()
+ pt.inmemoryPartsMerger()
+ pt.wg.Done()
+ }()
+}
+
+func (pt *partition) startSmallPartsMergers() {
+ pt.partsLock.Lock()
+ for i := 0; i < cap(smallPartsConcurrencyCh); i++ {
+ pt.startSmallPartsMergerLocked()
+ }
+ pt.partsLock.Unlock()
+}
+
+func (pt *partition) startSmallPartsMergerLocked() {
+ select {
+ case <-pt.stopCh:
+ return
+ default:
+ }
+ pt.wg.Add(1)
+ go func() {
+ pt.smallPartsMerger()
+ pt.wg.Done()
+ }()
+}
+
+func (pt *partition) startBigPartsMergers() {
+ pt.partsLock.Lock()
+ for i := 0; i < cap(bigPartsConcurrencyCh); i++ {
+ pt.startBigPartsMergerLocked()
+ }
+ pt.partsLock.Unlock()
+}
+
+func (pt *partition) startBigPartsMergerLocked() {
+ select {
+ case <-pt.stopCh:
+ return
+ default:
+ }
+ pt.wg.Add(1)
+ go func() {
+ pt.bigPartsMerger()
 pt.wg.Done()
 }()
}
@@ -815,6 +990,56 @@ func (pt *partition) startPendingRowsFlusher() {
 }()
}
 
+func (pt *partition) startInmemoryPartsFlusher() {
+ pt.wg.Add(1)
+ go func() {
+ pt.inmemoryPartsFlusher()
+ pt.wg.Done()
+ }()
+}
+
+func (pt *partition) startStalePartsRemover() {
+ pt.wg.Add(1)
+ go func() {
+ pt.stalePartsRemover()
+ pt.wg.Done()
+ }()
+}
+
+var (
+ inmemoryPartsConcurrencyCh = make(chan struct{}, getInmemoryPartsConcurrency())
+ smallPartsConcurrencyCh = make(chan struct{}, getSmallPartsConcurrency())
+ bigPartsConcurrencyCh = make(chan struct{}, getBigPartsConcurrency())
+)
+
+func getInmemoryPartsConcurrency() int {
+ // The concurrency for processing in-memory parts must be equal to the number of CPU cores,
+ // since these operations are CPU-bound.
+ return cgroup.AvailableCPUs()
+}
+
+func getSmallPartsConcurrency() int {
+ n := cgroup.AvailableCPUs()
+ if n < 4 {
+ // Allow at least 4 concurrent workers for small parts on systems
+ // with less than 4 CPU cores in order to be able to make smaller part merges
+ // when bigger part merges are in progress.
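+ // For example, a host limited to 2 CPU cores still gets 4 small-part merge workers.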
+ return 4 + } + return n +} + +func getBigPartsConcurrency() int { + n := cgroup.AvailableCPUs() + if n < 4 { + // Allow at least 4 concurrent workers for big parts on systems + // with less than 4 CPU cores in order to be able to make smaller part merges + // when bigger part merges are in progress. + return 4 + } + return n +} + func (pt *partition) inmemoryPartsFlusher() { d := timeutil.AddJitterToDuration(dataFlushInterval) ticker := time.NewTicker(d) @@ -824,7 +1049,7 @@ func (pt *partition) inmemoryPartsFlusher() { case <-pt.stopCh: return case <-ticker.C: - pt.flushInmemoryParts(false) + pt.flushInmemoryPartsToFiles(false) } } } @@ -848,12 +1073,12 @@ func (pt *partition) flushPendingRows(dst []rawRow, isFinal bool) []rawRow { return pt.rawRows.flush(pt, dst, isFinal) } -func (pt *partition) flushInmemoryRows() { - pt.rawRows.flush(pt, nil, true) - pt.flushInmemoryParts(true) +func (pt *partition) flushInmemoryRowsToFiles() { + pt.flushPendingRows(nil, true) + pt.flushInmemoryPartsToFiles(true) } -func (pt *partition) flushInmemoryParts(isFinal bool) { +func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) { currentTime := time.Now() var pws []*partWrapper @@ -866,7 +1091,7 @@ func (pt *partition) flushInmemoryParts(isFinal bool) { } pt.partsLock.Unlock() - if err := pt.mergePartsOptimal(pws, nil); err != nil { + if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh); err != nil { logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) } } @@ -875,7 +1100,7 @@ func (rrss *rawRowsShards) flush(pt *partition, dst []rawRow, isFinal bool) []ra for i := range rrss.shards { dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal) } - pt.flushRowsToParts(dst) + pt.flushRowsToInmemoryParts(dst) return dst } @@ -899,24 +1124,37 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawR return dst } -func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error { - sortPartsForOptimalMerge(pws) +func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{}, concurrencyCh chan struct{}) error { + pwsLen := len(pws) + + var errGlobal error + var errGlobalLock sync.Mutex + wg := getWaitGroup() for len(pws) > 0 { - n := defaultPartsToMerge - if n > len(pws) { - n = len(pws) - } - pwsChunk := pws[:n] - pws = pws[n:] - err := pt.mergeParts(pwsChunk, stopCh, true) - if err == nil { - continue - } - pt.releasePartsToMerge(pws) - if errors.Is(err, errForciblyStopped) { - return nil - } - return fmt.Errorf("cannot merge parts optimally: %w", err) + pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws) + wg.Add(1) + concurrencyCh <- struct{}{} + go func(pwsChunk []*partWrapper) { + defer func() { + <-concurrencyCh + wg.Done() + }() + + if err := pt.mergeParts(pwsChunk, stopCh, true); err != nil && !errors.Is(err, errForciblyStopped) { + errGlobalLock.Lock() + if errGlobal == nil { + errGlobal = err + } + errGlobalLock.Unlock() + } + }(pwsToMerge) + pws = pwsRemaining + } + wg.Wait() + putWaitGroup(wg) + + if errGlobal != nil { + return fmt.Errorf("cannot merge %d parts optimally: %w", pwsLen, errGlobal) } return nil } @@ -942,7 +1180,7 @@ func (pt *partition) ForceMergeAllParts() error { // If len(pws) == 1, then the merge must run anyway. // This allows applying the configured retention, removing the deleted series // and performing de-duplication if needed. 
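+ // The big-parts concurrency limiter is used here, since a forced merge
+ // reads and writes file-based parts and may produce a big part.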
- if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil { + if err := pt.mergePartsToFiles(pws, pt.stopCh, bigPartsConcurrencyCh); err != nil { return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err) } @@ -983,107 +1221,9 @@ func hasActiveMerges(pws []*partWrapper) bool { return false } -var mergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(16)) - -func getDefaultMergeConcurrency(max int) int { - v := (cgroup.AvailableCPUs() + 1) / 2 - if v > max { - v = max - } - return adjustMergeWorkersLimit(v) -} - -// SetMergeWorkersCount sets the maximum number of concurrent mergers for parts. -// -// The function must be called before opening or creating any storage. -func SetMergeWorkersCount(n int) { - if n <= 0 { - // Do nothing - return - } - n = adjustMergeWorkersLimit(n) - mergeWorkersLimitCh = make(chan struct{}, n) -} - -func adjustMergeWorkersLimit(n int) int { - if n < 4 { - // Allow at least 4 merge workers on systems with small CPUs count - // in order to guarantee that background merges can be continued - // when multiple workers are busy with big merges. - n = 4 - } - return n -} - -func (pt *partition) startMergeWorkers() { - // The actual number of concurrent merges is limited inside mergeWorker() below. - for i := 0; i < cap(mergeWorkersLimitCh); i++ { - pt.wg.Add(1) - go func() { - pt.mergeWorker() - pt.wg.Done() - }() - } -} - -func (pt *partition) mergeWorker() { - var lastMergeTime uint64 - isFinal := false - for { - // Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers - // across partitions may exceed the the cap(mergeWorkersLimitCh). - mergeWorkersLimitCh <- struct{}{} - err := pt.mergeExistingParts(isFinal) - <-mergeWorkersLimitCh - if err == nil { - // Try merging additional parts. - lastMergeTime = fasttime.UnixTimestamp() - isFinal = false - continue - } - if errors.Is(err, errForciblyStopped) { - // The merger has been stopped. - return - } - if !errors.Is(err, errNothingToMerge) && !errors.Is(err, errReadOnlyMode) { - // Unexpected error. - logger.Panicf("FATAL: unrecoverable error when merging parts in the partition (%q, %q): %s", pt.smallPartsPath, pt.bigPartsPath, err) - } - if finalMergeDelaySeconds > 0 && fasttime.UnixTimestamp()-lastMergeTime > finalMergeDelaySeconds { - // We have free time for merging into bigger parts. - // This should improve select performance. - lastMergeTime = fasttime.UnixTimestamp() - isFinal = true - continue - } - - // Nothing to merge. Wait for the notification of new merge. - select { - case <-pt.stopCh: - return - case <-pt.needMergeCh: - } - } -} - -// Disable final merge by default, since it may lead to high disk IO and CPU usage -// at the beginning of every month when merging data for the previous month. -var finalMergeDelaySeconds = uint64(0) - -// SetFinalMergeDelay sets the delay before doing final merge for partitions without newly ingested data. -// -// This function may be called only before Storage initialization. -func SetFinalMergeDelay(delay time.Duration) { - if delay <= 0 { - return - } - finalMergeDelaySeconds = uint64(delay.Seconds() + 1) - mergeset.SetFinalMergeDelay(delay) -} - func getMaxInmemoryPartSize() uint64 { // Allocate 10% of allowed memory for in-memory parts. 
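+ // Dividing the 10% budget by maxInmemoryParts caps the size of a single in-memory part,
+ // so the maximum number of in-memory parts still fits within the allowance.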
- n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryPartsPerPartition)
+ n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryParts)
 if n < 1e6 {
 n = 1e6
 }
@@ -1103,7 +1243,7 @@ func (pt *partition) getMaxSmallPartSize() uint64 {
 n = 10e6
 }
 // Make sure the output part fits available disk space for small parts.
- sizeLimit := getMaxOutBytes(pt.smallPartsPath, cap(mergeWorkersLimitCh))
+ sizeLimit := getMaxOutBytes(pt.smallPartsPath, cap(smallPartsConcurrencyCh))
 if n > sizeLimit {
 n = sizeLimit
 }
@@ -1130,41 +1270,6 @@ func getMaxOutBytes(path string, workersCount int) uint64 {
 return maxOutBytes
}
 
-func (pt *partition) canBackgroundMerge() bool {
- return atomic.LoadUint32(&pt.s.isReadOnly) == 0
-}
-
-var errReadOnlyMode = fmt.Errorf("storage is in readonly mode")
-
-func (pt *partition) mergeInmemoryParts() error {
- maxOutBytes := pt.getMaxBigPartSize()
-
- pt.partsLock.Lock()
- pws := getPartsToMerge(pt.inmemoryParts, maxOutBytes, false)
- pt.partsLock.Unlock()
-
- return pt.mergeParts(pws, pt.stopCh, false)
-}
-
-func (pt *partition) mergeExistingParts(isFinal bool) error {
- if !pt.canBackgroundMerge() {
- // Do not perform merge in read-only mode, since this may result in disk space shortage.
- // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
- return errReadOnlyMode
- }
- maxOutBytes := pt.getMaxBigPartSize()
-
- pt.partsLock.Lock()
- dst := make([]*partWrapper, 0, len(pt.inmemoryParts)+len(pt.smallParts)+len(pt.bigParts))
- dst = append(dst, pt.inmemoryParts...)
- dst = append(dst, pt.smallParts...)
- dst = append(dst, pt.bigParts...)
- pws := getPartsToMerge(dst, maxOutBytes, isFinal)
- pt.partsLock.Unlock()
-
- return pt.mergeParts(pws, pt.stopCh, isFinal)
-}
-
 func assertIsInMerge(pws []*partWrapper) {
 for _, pw := range pws {
 if !pw.isInMerge {
@@ -1184,8 +1289,6 @@ func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
 pt.partsLock.Unlock()
}
 
-var errNothingToMerge = fmt.Errorf("nothing to merge")
-
 func (pt *partition) runFinalDedup() error {
 requiredDedupInterval, actualDedupInterval := pt.getRequiredDedupInterval()
 t := time.Now()
@@ -1227,16 +1330,19 @@ func getMinDedupInterval(pws []*partWrapper) int64 {
 
 // mergeParts merges pws to a single resulting part.
 //
+// It is expected that pws contains at least a single part.
+//
 // Merging is immediately stopped if stopCh is closed.
 //
 // if isFinal is set, then the resulting part will be saved to disk.
+// If at least a single source part in pws is stored on disk, then the resulting part
+// will be stored to disk.
 //
 // All the parts inside pws must have isInMerge field set to true.
 // The isInMerge field inside pws parts is set to false before returning from the function.
 func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
 if len(pws) == 0 {
- // Nothing to merge.
- return errNothingToMerge
+ logger.Panicf("BUG: empty pws cannot be passed to mergeParts()")
 }
 assertIsInMerge(pws)
@@ -1473,13 +1579,8 @@ func areAllInmemoryParts(pws []*partWrapper) bool {
 func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
 // Atomically unregister old parts and add new part to pt.
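+ // makeMapFromPartWrappers panics on duplicate parts in pws, so duplicate
+ // source parts are caught early instead of corrupting the part lists below.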
- m := make(map[*partWrapper]bool, len(pws))
- for _, pw := range pws {
- m[pw] = true
- }
- if len(m) != len(pws) {
- logger.Panicf("BUG: %d duplicate parts found when merging %d parts", len(pws)-len(m), len(pws))
- }
+ m := makeMapFromPartWrappers(pws)
+
 removedInmemoryParts := 0
 removedSmallParts := 0
 removedBigParts := 0
@@ -1493,14 +1594,16 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 switch dstPartType {
 case partInmemory:
 pt.inmemoryParts = append(pt.inmemoryParts, pwNew)
+ pt.startInmemoryPartsMergerLocked()
 case partSmall:
 pt.smallParts = append(pt.smallParts, pwNew)
+ pt.startSmallPartsMergerLocked()
 case partBig:
 pt.bigParts = append(pt.bigParts, pwNew)
+ pt.startBigPartsMergerLocked()
 default:
 logger.Panicf("BUG: unknown partType=%d", dstPartType)
 }
- pt.notifyBackgroundMergers()
 }
 
 // Atomically store the updated list of file-based parts on disk.
@@ -1549,10 +1652,10 @@ func (pt *partition) nextMergeIdx() uint64 {
 return atomic.AddUint64(&pt.mergeIdx, 1)
}
 
-func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*partWrapper, int) {
+func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
 dst := pws[:0]
 for _, pw := range pws {
- if !partsToRemove[pw] {
+ if _, ok := partsToRemove[pw]; !ok {
 dst = append(dst, pw)
 }
 }
@@ -1562,14 +1665,6 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa
 return dst, len(pws) - len(dst)
}
 
-func (pt *partition) startStalePartsRemover() {
- pt.wg.Add(1)
- go func() {
- pt.stalePartsRemover()
- pt.wg.Done()
- }()
-}
-
 func (pt *partition) stalePartsRemover() {
 d := timeutil.AddJitterToDuration(7 * time.Minute)
 ticker := time.NewTicker(d)
@@ -1619,30 +1714,49 @@ func (pt *partition) removeStaleParts() {
 
 // getPartsToMerge returns optimal parts to merge from pws.
 //
 // The summary size of the returned parts must be smaller than maxOutBytes.
-func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*partWrapper {
+func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64) []*partWrapper {
 pwsRemaining := make([]*partWrapper, 0, len(pws))
 for _, pw := range pws {
 if !pw.isInMerge {
 pwsRemaining = append(pwsRemaining, pw)
 }
 }
- maxPartsToMerge := defaultPartsToMerge
- var pms []*partWrapper
- if isFinal {
- for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
- pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
- maxPartsToMerge--
- }
- } else {
- pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
- }
- for _, pw := range pms {
+
+ pwsToMerge := appendPartsToMerge(nil, pwsRemaining, defaultPartsToMerge, maxOutBytes)
+
+ for _, pw := range pwsToMerge {
 if pw.isInMerge {
 logger.Panicf("BUG: partWrapper.isInMerge cannot be set")
 }
 pw.isInMerge = true
 }
- return pms
+
+ return pwsToMerge
+}
+
+// getPartsForOptimalMerge returns parts from pws for optimal merge, plus the remaining parts.
+//
+// The pws items are replaced by nil after the call. This helps the Go GC reclaim the referenced items sooner.
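+//
+// Unlike getPartsToMerge, the output size isn't limited here - appendPartsToMerge
+// is called with the maximum possible maxOutBytes (1<<64 - 1).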
+func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) { + pwsToMerge := appendPartsToMerge(nil, pws, defaultPartsToMerge, 1<<64-1) + if len(pwsToMerge) == 0 { + return pws, nil + } + + m := makeMapFromPartWrappers(pwsToMerge) + pwsRemaining := make([]*partWrapper, 0, len(pws)-len(pwsToMerge)) + for _, pw := range pws { + if _, ok := m[pw]; !ok { + pwsRemaining = append(pwsRemaining, pw) + } + } + + // Clear references to pws items, so they could be reclaimed faster by Go GC. + for i := range pws { + pws[i] = nil + } + + return pwsToMerge, pwsRemaining } // minMergeMultiplier is the minimum multiplier for the size of the output part @@ -1736,6 +1850,17 @@ func sortPartsForOptimalMerge(pws []*partWrapper) { }) } +func makeMapFromPartWrappers(pws []*partWrapper) map[*partWrapper]struct{} { + m := make(map[*partWrapper]struct{}, len(pws)) + for _, pw := range pws { + m[pw] = struct{}{} + } + if len(m) != len(pws) { + logger.Panicf("BUG: %d duplicate parts found in %d source parts", len(pws)-len(m), len(pws)) + } + return m +} + func getPartsSize(pws []*partWrapper) uint64 { n := uint64(0) for _, pw := range pws { @@ -1809,7 +1934,7 @@ func (pt *partition) MustCreateSnapshotAt(smallPath, bigPath string) { startTime := time.Now() // Flush inmemory data to disk. - pt.flushInmemoryRows() + pt.flushInmemoryRowsToFiles() pt.partsLock.Lock() incRefForParts(pt.smallParts) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index d4b44b043..c6e59e165 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -663,7 +663,8 @@ func (s *Storage) startFreeDiskSpaceWatcher() { // Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization // when the storage isn't in read-only mode. if atomic.LoadUint32(&s.isReadOnly) == 1 && atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) { - logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left", + s.notifyReadWriteMode() + logger.Warnf("switching the storage at %s to read-write mode, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left", s.path, freeDiskSpaceLimitBytes, freeSpaceBytes) } } @@ -685,6 +686,16 @@ func (s *Storage) startFreeDiskSpaceWatcher() { }() } +func (s *Storage) notifyReadWriteMode() { + s.tb.NotifyReadWriteMode() + + idb := s.idb() + idb.tb.NotifyReadWriteMode() + idb.doExtDB(func(extDB *indexDB) { + extDB.tb.NotifyReadWriteMode() + }) +} + func (s *Storage) startRetentionWatcher() { s.retentionWatcherWG.Add(1) go func() { diff --git a/lib/storage/table.go b/lib/storage/table.go index d3b2efbf3..1dafbbc87 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -205,6 +205,14 @@ func (tb *table) flushPendingRows() { } } +func (tb *table) NotifyReadWriteMode() { + tb.ptwsLock.Lock() + for _, ptw := range tb.ptws { + ptw.pt.NotifyReadWriteMode() + } + tb.ptwsLock.Unlock() +} + // TableMetrics contains essential metrics for the table. type TableMetrics struct { partitionMetrics