diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index e9d228625..fbd4ffe23 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -38,13 +38,10 @@ var (
 	// DataPath is a path to storage data.
 	DataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
 
-	finalMergeDelay = flag.Duration("finalMergeDelay", 0, "The delay before starting final merge for per-month partition after no new data is ingested into it. "+
-		"Final merge may require additional disk IO and CPU resources. Final merge may increase query speed and reduce disk space usage in some cases. "+
-		"Zero value disables final merge")
-	_ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing. Please use -smallMergeConcurrency "+
-		"for controlling the concurrency of background merges. See https://docs.victoriametrics.com/#storage")
-	smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of workers for background merges. See https://docs.victoriametrics.com/#storage . "+
-		"It isn't recommended tuning this flag in general case, since this may lead to uncontrolled increase in the number of parts and increased CPU usage during queries")
+	_ = flag.Duration("finalMergeDelay", 0, "Deprecated: this flag does nothing")
+	_ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing")
+	_ = flag.Int("smallMergeConcurrency", 0, "Deprecated: this flag does nothing")
+
 	retentionTimezoneOffset = flag.Duration("retentionTimezoneOffset", 0, "The offset for performing indexdb rotation. "+
 		"If set to 0, then the indexdb rotation is performed at 4am UTC time per each -retentionPeriod. "+
 		"If set to 2h, then the indexdb rotation is performed at 4am EET time (the timezone with +2h offset)")
@@ -96,8 +93,6 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
 	resetResponseCacheIfNeeded = resetCacheIfNeeded
 	storage.SetLogNewSeries(*logNewSeries)
-	storage.SetFinalMergeDelay(*finalMergeDelay)
-	storage.SetMergeWorkersCount(*smallMergeConcurrency)
 	storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset)
 	storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
 	storage.SetTSIDCacheSize(cacheSizeStorageTSID.IntN())
@@ -496,11 +491,8 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
 	metrics.WriteCounterUint64(w, `vm_composite_filter_success_conversions_total`, idbm.CompositeFilterSuccessConversions)
 	metrics.WriteCounterUint64(w, `vm_composite_filter_missing_conversions_total`, idbm.CompositeFilterMissingConversions)
 
-	metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="storage/inmemory"}`, tm.InmemoryAssistedMergesCount)
-	metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="storage/small"}`, tm.SmallAssistedMergesCount)
-
-	metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="indexdb/inmemory"}`, idbm.InmemoryAssistedMergesCount)
-	metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="indexdb/file"}`, idbm.FileAssistedMergesCount)
+	// vm_assisted_merges_total name is used for backwards compatibility.
+	metrics.WriteCounterUint64(w, `vm_assisted_merges_total{type="indexdb/inmemory"}`, idbm.InmemoryPartsLimitReachedCount)
 
 	metrics.WriteCounterUint64(w, `vm_indexdb_items_added_total`, idbm.ItemsAdded)
 	metrics.WriteCounterUint64(w, `vm_indexdb_items_added_size_bytes_total`, idbm.ItemsAddedSizeBytes)
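The vmstorage hunk above uses a common Go deprecation pattern: the flag stays registered so that existing command lines keep parsing, but its value is discarded. A minimal standalone sketch of that pattern (flag names mirror the ones deprecated above; the program itself is illustrative):

package main

import (
	"flag"
	"fmt"
)

var (
	// Deprecated flags are still registered so that deployments passing them
	// on the command line don't fail to start, but their values are ignored.
	_ = flag.Duration("finalMergeDelay", 0, "Deprecated: this flag does nothing")
	_ = flag.Int("bigMergeConcurrency", 0, "Deprecated: this flag does nothing")
	_ = flag.Int("smallMergeConcurrency", 0, "Deprecated: this flag does nothing")
)

func main() {
	flag.Parse()
	fmt.Println("started; deprecated flags were accepted and ignored")
}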
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 96d6cdccc..063354ebc 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -31,6 +31,7 @@ The sandbox cluster installation is running under the constant load generated by
 * SECURITY: upgrade Go builder from Go1.21.5 to Go1.21.6. See [the list of issues addressed in Go1.21.6](https://github.com/golang/go/issues?q=milestone%3AGo1.21.6+label%3ACherryPickApproved).
 
 * FEATURE: improve new [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series) registration speed on systems with high number of CPU cores. Thanks to @misutoth for the initial idea and [implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212).
+* FEATURE: make [background merge](https://docs.victoriametrics.com/#storage) more responsive and scalable. This should help with the following issues: [5190](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5190), [3425](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3425), [648](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for discovering [Hetzner Cloud](https://www.hetzner.com/cloud) and [Hetzner Robot](https://docs.hetzner.com/robot) scrape targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3154) and [these docs](https://docs.victoriametrics.com/sd_configs.html#hetzner_sd_configs).
 * FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for negative index in `groupByNode` and `aliasByNode` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5581).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 5c692a5ff..b086531b9 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math"
 	"os"
 	"path/filepath"
 	"sort"
@@ -28,16 +29,9 @@ import (
 // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
 //
 // This number may be reached when the insertion pace outreaches merger pace.
-// If this number is reached, then assisted merges are performed
-// during data ingestion.
-const maxInmemoryParts = 15
-
-// maxFileParts is the maximum number of file parts in the table.
-//
-// This number may be reached when the insertion pace outreaches merger pace.
-// If this number is reached, then assisted merges are performed
-// during data ingestion.
-const maxFileParts = 30
+// If this number is reached, then the data ingestion is paused until background
+// mergers reduce the number of parts below this number.
+const maxInmemoryParts = 30
 
 // Default number of parts to merge at once.
 //
@@ -45,13 +39,6 @@ const maxFileParts = 30
 // See appendPartsToMerge tests for details.
 const defaultPartsToMerge = 15
 
-// The final number of parts to merge at once.
-//
-// It must be smaller than defaultPartsToMerge.
-// Lower value improves select performance at the cost of increased
-// write amplification.
-const finalPartsToMerge = 2
-
 // maxPartSize is the maximum part size in bytes.
 //
 // This number should be limited by the amount of time required to merge parts of this summary size.
@@ -110,18 +97,16 @@ type Table struct {
 	inmemoryItemsMerged uint64
 	fileItemsMerged     uint64
 
-	inmemoryAssistedMergesCount uint64
-	fileAssistedMergesCount     uint64
-
 	itemsAdded          uint64
 	itemsAddedSizeBytes uint64
 
+	inmemoryPartsLimitReachedCount uint64
+
 	mergeIdx uint64
 
 	path string
 
 	flushCallback         func()
-	flushCallbackWorkerWG sync.WaitGroup
 	needFlushCallbackCall uint32
 
 	prepareBlock PrepareBlockCallback
@@ -129,32 +114,38 @@ type Table struct {
 	// rawItems contains recently added items that haven't been converted to parts yet.
 	//
-	// rawItems aren't used in search for performance reasons
+	// rawItems are converted to inmemoryParts at least every pendingItemsFlushInterval or when rawItems becomes full.
+	//
+	// rawItems aren't visible for search due to performance reasons.
 	rawItems rawItemsShards
 
 	// partsLock protects inmemoryParts and fileParts.
 	partsLock sync.Mutex
 
-	// inmemoryParts contains inmemory parts.
+	// inmemoryParts contains inmemory parts, which are visible for search.
 	inmemoryParts []*partWrapper
 
-	// inmemoryPartsLimitCh limits the number of inmemory parts
+	// fileParts contains file-backed parts, which are visible for search.
+	fileParts []*partWrapper
+
+	// inmemoryPartsLimitCh limits the number of inmemory parts to maxInmemoryParts
 	// in order to prevent from data ingestion slowdown as described at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
 	inmemoryPartsLimitCh chan struct{}
 
-	// fileParts contains file-backed parts.
-	fileParts []*partWrapper
-
-	// This channel is used for signaling the background mergers that there are parts,
-	// which may need to be merged.
-	needMergeCh chan struct{}
-
+	// stopCh is used for notifying all the background workers to stop.
+	//
+	// It must be closed under partsLock in order to prevent from calling wg.Add()
+	// after stopCh is closed.
 	stopCh chan struct{}
 
+	// wg is used for waiting for all the background workers to stop.
+	//
+	// wg.Add() must be called under partsLock after checking whether stopCh isn't closed.
+	// This should prevent from calling wg.Add() after stopCh is closed and wg.Wait() is called.
 	wg sync.WaitGroup
 
 	// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
-	rawItemsPendingFlushesWG syncwg.WaitGroup
+	flushPendingItemsWG syncwg.WaitGroup
 }
 
 type rawItemsShards struct {
@@ -173,7 +164,7 @@ var rawItemsShardsPerTable = func() int {
 	if multiplier > 16 {
 		multiplier = 16
 	}
-	return (cpus*multiplier + 1) / 2
+	return cpus * multiplier
 }()
 
 const maxBlocksPerShard = 256
@@ -233,8 +224,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 	ris.mu.Lock()
 	ibs := ris.ibs
 	if len(ibs) == 0 {
-		ib := &inmemoryBlock{}
-		ibs = append(ibs, ib)
+		ibs = append(ibs, &inmemoryBlock{})
 		ris.ibs = ibs
 	}
 	ib := ibs[len(ibs)-1]
@@ -243,23 +233,23 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 			continue
 		}
 		if len(ibs) >= maxBlocksPerShard {
-			ibsToFlush = ibs
+			ibsToFlush = append(ibsToFlush, ibs...)
 			ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
 			tailItems = items[i:]
 			atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp())
 			break
 		}
 		ib = &inmemoryBlock{}
 		if ib.Add(item) {
 			ibs = append(ibs, ib)
 			continue
 		}
-		logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items", len(item))
+		logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock; len(item)=%d; the caller should be responsible for avoiding too big items", len(item))
 	}
 	ris.ibs = ibs
 	ris.mu.Unlock()
 
-	tb.flushBlocksToParts(ibsToFlush, false)
+	tb.flushBlocksToInmemoryParts(ibsToFlush, false)
 
 	return tailItems
 }
@@ -332,74 +322,169 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC
 	pws := mustOpenParts(path)
 
 	tb := &Table{
+		mergeIdx:             uint64(time.Now().UnixNano()),
 		path:                 path,
 		flushCallback:        flushCallback,
 		prepareBlock:         prepareBlock,
 		isReadOnly:           isReadOnly,
-		inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
 		fileParts:            pws,
-		mergeIdx:             uint64(time.Now().UnixNano()),
-		needMergeCh:          make(chan struct{}, 1),
+		inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
 		stopCh:               make(chan struct{}),
 	}
 	tb.rawItems.init()
 	tb.startBackgroundWorkers()
 
-	// Wake up a single background merger, so it could start merging parts if needed.
-	tb.notifyBackgroundMergers()
-
-	if flushCallback != nil {
-		tb.flushCallbackWorkerWG.Add(1)
-		go func() {
-			// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
-			// which are reset by the flushCallback.
-			d := timeutil.AddJitterToDuration(time.Second * 10)
-			tc := time.NewTicker(d)
-			for {
-				select {
-				case <-tb.stopCh:
-					tb.flushCallback()
-					tb.flushCallbackWorkerWG.Done()
-					return
-				case <-tc.C:
-					if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
-						tb.flushCallback()
-					}
-				}
-			}
-		}()
-	}
-
 	return tb
 }
 
 func (tb *Table) startBackgroundWorkers() {
-	tb.startMergeWorkers()
-	tb.startInmemoryPartsFlusher()
+	// Start file parts mergers, so they could start merging unmerged parts if needed.
+	// There is no need in starting in-memory parts mergers, since there are no in-memory parts yet.
+	tb.startFilePartsMergers()
+
+	tb.startPendingItemsFlusher()
+	tb.startInmemoryPartsFlusher()
+	tb.startFlushCallbackWorker()
+}
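The struct comments above describe a subtle shutdown invariant: wg.Add() may only be called under partsLock after verifying stopCh isn't closed, and stopCh is closed under the same lock. A minimal standalone sketch of that lifecycle pattern (type and method names here are illustrative, not from the codebase):

package main

import "sync"

type table struct {
	mu     sync.Mutex
	stopCh chan struct{}
	wg     sync.WaitGroup
}

// startWorkerLocked must be called with mu held. It refuses to start new
// workers once stopCh is closed, so wg.Add() can never race with wg.Wait().
func (t *table) startWorkerLocked(work func()) {
	select {
	case <-t.stopCh:
		return // shutting down - don't start new workers
	default:
	}
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		work()
	}()
}

func (t *table) close() {
	t.mu.Lock()
	close(t.stopCh) // closed under mu, so startWorkerLocked can observe it
	t.mu.Unlock()
	t.wg.Wait() // no new wg.Add() can happen after this point
}

func main() {
	t := &table{stopCh: make(chan struct{})}
	t.mu.Lock()
	t.startWorkerLocked(func() {})
	t.mu.Unlock()
	t.close()
}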
+func (tb *Table) startInmemoryPartsMergers() {
+	tb.partsLock.Lock()
+	for i := 0; i < cap(inmemoryPartsConcurrencyCh); i++ {
+		tb.startInmemoryPartsMergerLocked()
+	}
+	tb.partsLock.Unlock()
+}
+
+func (tb *Table) startInmemoryPartsMergerLocked() {
+	select {
+	case <-tb.stopCh:
+		return
+	default:
+	}
+	tb.wg.Add(1)
+	go func() {
+		tb.inmemoryPartsMerger()
+		tb.wg.Done()
+	}()
+}
+
+func (tb *Table) startFilePartsMergers() {
+	tb.partsLock.Lock()
+	for i := 0; i < cap(filePartsConcurrencyCh); i++ {
+		tb.startFilePartsMergerLocked()
+	}
+	tb.partsLock.Unlock()
+}
+
+func (tb *Table) startFilePartsMergerLocked() {
+	select {
+	case <-tb.stopCh:
+		return
+	default:
+	}
+	tb.wg.Add(1)
+	go func() {
+		tb.filePartsMerger()
+		tb.wg.Done()
+	}()
+}
+
+func (tb *Table) startPendingItemsFlusher() {
+	tb.wg.Add(1)
+	go func() {
+		tb.pendingItemsFlusher()
+		tb.wg.Done()
+	}()
+}
+
+func (tb *Table) startInmemoryPartsFlusher() {
+	tb.wg.Add(1)
+	go func() {
+		tb.inmemoryPartsFlusher()
+		tb.wg.Done()
+	}()
+}
+
+func (tb *Table) startFlushCallbackWorker() {
+	if tb.flushCallback == nil {
+		return
+	}
+
+	tb.wg.Add(1)
+	go func() {
+		// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
+		// which are reset by the flushCallback.
+		d := timeutil.AddJitterToDuration(time.Second * 10)
+		tc := time.NewTicker(d)
+		for {
+			select {
+			case <-tb.stopCh:
+				tb.flushCallback()
+				tb.wg.Done()
+				return
+			case <-tc.C:
+				if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
+					tb.flushCallback()
+				}
+			}
+		}
+	}()
+}
+
+var (
+	inmemoryPartsConcurrencyCh = make(chan struct{}, getInmemoryPartsConcurrency())
+	filePartsConcurrencyCh     = make(chan struct{}, getFilePartsConcurrency())
+)
+
+func getInmemoryPartsConcurrency() int {
+	// The concurrency for processing in-memory parts must be equal to the number of CPU cores,
+	// since these operations are CPU-bound.
+	return cgroup.AvailableCPUs()
+}
+
+func getFilePartsConcurrency() int {
+	n := cgroup.AvailableCPUs()
+	if n < 4 {
+		// Allow at least 4 concurrent workers for file parts on systems
+		// with less than 4 CPU cores in order to be able to make small file merges
+		// when big file merges are in progress.
+		return 4
+	}
+	return n
+}
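inmemoryPartsConcurrencyCh and filePartsConcurrencyCh above are plain buffered channels used as counting semaphores: a worker sends to reserve a slot and receives to release it. A minimal sketch of the idiom in isolation (names are illustrative):

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// A buffered channel works as a counting semaphore:
// its capacity is the maximum number of concurrent workers.
var concurrencyCh = make(chan struct{}, runtime.NumCPU())

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		concurrencyCh <- struct{}{} // acquire a slot (blocks when all slots are busy)
		go func(n int) {
			defer func() {
				<-concurrencyCh // release the slot
				wg.Done()
			}()
			_ = n * n // CPU-bound work would go here
		}(i)
	}
	wg.Wait()
	fmt.Println("done")
}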
 // MustClose closes the table.
 func (tb *Table) MustClose() {
+	// Notify background workers to stop.
+	// The tb.partsLock is acquired in order to guarantee that tb.wg.Add() isn't called
+	// after tb.stopCh is closed and tb.wg.Wait() is called below.
+	tb.partsLock.Lock()
 	close(tb.stopCh)
+	tb.partsLock.Unlock()
 
-	// Waiting for background workers to stop
+	// Wait for background workers to stop.
 	tb.wg.Wait()
 
-	tb.flushInmemoryItems()
-	tb.flushCallbackWorkerWG.Wait()
+	// Flush the remaining in-memory items to files.
+	tb.flushInmemoryItemsToFiles()
 
 	// Remove references to parts from the tb, so they may be eventually closed after all the searches are done.
 	tb.partsLock.Lock()
-	inmemoryParts := tb.inmemoryParts
-	fileParts := tb.fileParts
+
+	if n := tb.rawItems.Len(); n > 0 {
+		logger.Panicf("BUG: raw items must be empty at this stage; got %d items", n)
+	}
+
+	if n := len(tb.inmemoryParts); n > 0 {
+		logger.Panicf("BUG: in-memory parts must be empty at this stage; got %d parts", n)
+	}
 	tb.inmemoryParts = nil
+
+	fileParts := tb.fileParts
 	tb.fileParts = nil
+
 	tb.partsLock.Unlock()
 
-	for _, pw := range inmemoryParts {
-		pw.decRef()
-	}
 	for _, pw := range fileParts {
 		pw.decRef()
 	}
@@ -421,12 +506,11 @@ type TableMetrics struct {
 	InmemoryItemsMerged uint64
 	FileItemsMerged     uint64
 
-	InmemoryAssistedMergesCount uint64
-	FileAssistedMergesCount     uint64
-
 	ItemsAdded          uint64
 	ItemsAddedSizeBytes uint64
 
+	InmemoryPartsLimitReachedCount uint64
+
 	PendingItems uint64
 
 	InmemoryPartsCount uint64
@@ -472,12 +556,11 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
 	m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged)
 	m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged)
 
-	m.InmemoryAssistedMergesCount += atomic.LoadUint64(&tb.inmemoryAssistedMergesCount)
-	m.FileAssistedMergesCount += atomic.LoadUint64(&tb.fileAssistedMergesCount)
-
 	m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded)
 	m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes)
 
+	m.InmemoryPartsLimitReachedCount += atomic.LoadUint64(&tb.inmemoryPartsLimitReachedCount)
+
 	m.PendingItems += uint64(tb.rawItems.Len())
 
 	tb.partsLock.Lock()
@@ -553,21 +636,38 @@ func (tb *Table) putParts(pws []*partWrapper) {
 	}
 }
 
-func (tb *Table) mergePartsOptimal(pws []*partWrapper) error {
-	sortPartsForOptimalMerge(pws)
+func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error {
+	pwsLen := len(pws)
+
+	var errGlobal error
+	var errGlobalLock sync.Mutex
+	wg := getWaitGroup()
 	for len(pws) > 0 {
-		n := defaultPartsToMerge
-		if n > len(pws) {
-			n = len(pws)
-		}
-		pwsChunk := pws[:n]
-		pws = pws[n:]
-		err := tb.mergeParts(pwsChunk, nil, true)
-		if err == nil {
-			continue
-		}
-		tb.releasePartsToMerge(pws)
-		return fmt.Errorf("cannot optimally merge %d parts: %w", n, err)
+		pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
+		wg.Add(1)
+		inmemoryPartsConcurrencyCh <- struct{}{}
+		go func(pwsChunk []*partWrapper) {
+			defer func() {
+				<-inmemoryPartsConcurrencyCh
+				wg.Done()
+			}()
+
+			if err := tb.mergeParts(pwsChunk, nil, true); err != nil {
+				// There is no need for errors.Is(err, errForciblyStopped) check here, since stopCh=nil is passed to mergeParts.
+				errGlobalLock.Lock()
+				if errGlobal == nil {
+					errGlobal = err
+				}
+				errGlobalLock.Unlock()
+			}
+		}(pwsToMerge)
+		pws = pwsRemaining
+	}
+	wg.Wait()
+	putWaitGroup(wg)
+
+	if errGlobal != nil {
+		return fmt.Errorf("cannot optimally merge %d parts: %w", pwsLen, errGlobal)
 	}
 	return nil
 }
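mergeInmemoryPartsToFiles above fans merge chunks out to goroutines, bounds them with the concurrency channel, and keeps only the first error. A condensed, self-contained sketch of that fan-out/first-error pattern (the chunk type and worker body are stand-ins, not the real merge code):

package main

import (
	"fmt"
	"sync"
)

func mergeChunks(chunks [][]int, sem chan struct{}) error {
	var errGlobal error
	var errGlobalLock sync.Mutex
	var wg sync.WaitGroup
	for _, chunk := range chunks {
		wg.Add(1)
		sem <- struct{}{} // bound the number of concurrent merges
		go func(c []int) {
			defer func() {
				<-sem
				wg.Done()
			}()
			if err := mergeOne(c); err != nil {
				// Record only the first error; later failures are dropped.
				errGlobalLock.Lock()
				if errGlobal == nil {
					errGlobal = err
				}
				errGlobalLock.Unlock()
			}
		}(chunk)
	}
	wg.Wait()
	return errGlobal
}

func mergeOne(c []int) error {
	if len(c) == 0 {
		return fmt.Errorf("nothing to merge")
	}
	return nil
}

func main() {
	sem := make(chan struct{}, 4)
	fmt.Println(mergeChunks([][]int{{1, 2}, {3}}, sem))
}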
@@ -584,23 +684,22 @@ func (tb *Table) DebugFlush() {
 	tb.flushPendingItems(nil, true)
 
 	// Wait for background flushers to finish.
-	tb.rawItemsPendingFlushesWG.Wait()
+	tb.flushPendingItemsWG.Wait()
 }
 
-func (tb *Table) startInmemoryPartsFlusher() {
-	tb.wg.Add(1)
-	go func() {
-		tb.inmemoryPartsFlusher()
-		tb.wg.Done()
-	}()
-}
-
-func (tb *Table) startPendingItemsFlusher() {
-	tb.wg.Add(1)
-	go func() {
-		tb.pendingItemsFlusher()
-		tb.wg.Done()
-	}()
+func (tb *Table) pendingItemsFlusher() {
+	d := timeutil.AddJitterToDuration(pendingItemsFlushInterval)
+	ticker := time.NewTicker(d)
+	defer ticker.Stop()
+	var ibs []*inmemoryBlock
+	for {
+		select {
+		case <-tb.stopCh:
+			return
+		case <-ticker.C:
+			ibs = tb.flushPendingItems(ibs[:0], false)
+		}
+	}
 }
 
 func (tb *Table) inmemoryPartsFlusher() {
@@ -612,38 +711,24 @@ func (tb *Table) inmemoryPartsFlusher() {
 		case <-tb.stopCh:
 			return
 		case <-ticker.C:
-			tb.flushInmemoryParts(false)
-		}
-	}
-}
-
-func (tb *Table) pendingItemsFlusher() {
-	ticker := time.NewTicker(pendingItemsFlushInterval)
-	defer ticker.Stop()
-	var ibs []*inmemoryBlock
-	for {
-		select {
-		case <-tb.stopCh:
-			return
-		case <-ticker.C:
-			ibs = tb.flushPendingItems(ibs[:0], false)
-			for i := range ibs {
-				ibs[i] = nil
-			}
+			tb.flushInmemoryPartsToFiles(false)
 		}
 	}
 }
 
 func (tb *Table) flushPendingItems(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
-	return tb.rawItems.flush(tb, dst, isFinal)
+	tb.flushPendingItemsWG.Add(1)
+	dst = tb.rawItems.flush(tb, dst, isFinal)
+	tb.flushPendingItemsWG.Done()
+	return dst
 }
 
-func (tb *Table) flushInmemoryItems() {
-	tb.rawItems.flush(tb, nil, true)
-	tb.flushInmemoryParts(true)
+func (tb *Table) flushInmemoryItemsToFiles() {
+	tb.flushPendingItems(nil, true)
+	tb.flushInmemoryPartsToFiles(true)
 }
 
-func (tb *Table) flushInmemoryParts(isFinal bool) {
+func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) {
 	currentTime := time.Now()
 	var pws []*partWrapper
 
@@ -656,18 +741,16 @@ func (tb *Table) flushInmemoryParts(isFinal bool) {
 	}
 	tb.partsLock.Unlock()
 
-	if err := tb.mergePartsOptimal(pws); err != nil {
-		logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
+	if err := tb.mergeInmemoryPartsToFiles(pws); err != nil {
+		logger.Panicf("FATAL: cannot merge in-memory parts to files: %s", err)
 	}
 }
 
 func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
-	tb.rawItemsPendingFlushesWG.Add(1)
 	for i := range riss.shards {
 		dst = riss.shards[i].appendBlocksToFlush(dst, isFinal)
 	}
-	tb.flushBlocksToParts(dst, isFinal)
-	tb.rawItemsPendingFlushesWG.Done()
+	tb.flushBlocksToInmemoryParts(dst, isFinal)
 	return dst
 }
@@ -695,10 +778,12 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool
 	return dst
 }
 
-func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
+func (tb *Table) flushBlocksToInmemoryParts(ibs []*inmemoryBlock, isFinal bool) {
 	if len(ibs) == 0 {
 		return
 	}
+
+	// Merge ibs into in-memory parts.
 	var pwsLock sync.Mutex
 	pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
 	wg := getWaitGroup()
@@ -708,44 +793,65 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 			n = len(ibs)
 		}
 		wg.Add(1)
-		flushConcurrencyCh <- struct{}{}
+		inmemoryPartsConcurrencyCh <- struct{}{}
 		go func(ibsChunk []*inmemoryBlock) {
 			defer func() {
-				<-flushConcurrencyCh
+				<-inmemoryPartsConcurrencyCh
 				wg.Done()
 			}()
-			pw := tb.createInmemoryPart(ibsChunk)
-			if pw == nil {
-				return
+
+			if pw := tb.createInmemoryPart(ibsChunk); pw != nil {
+				pwsLock.Lock()
+				pws = append(pws, pw)
+				pwsLock.Unlock()
+			}
+			// Clear references to ibsChunk items, so they may be reclaimed faster by Go GC.
+			for i := range ibsChunk {
+				ibsChunk[i] = nil
 			}
-			pwsLock.Lock()
-			pws = append(pws, pw)
-			pwsLock.Unlock()
 		}(ibs[:n])
 		ibs = ibs[n:]
 	}
 	wg.Wait()
 	putWaitGroup(wg)
 
-	flushConcurrencyCh <- struct{}{}
-	pw := tb.mustMergeInmemoryParts(pws)
-	<-flushConcurrencyCh
+	// Merge pws into a single in-memory part.
+	maxPartSize := getMaxInmemoryPartSize()
+	for len(pws) > 1 {
+		pws = tb.mustMergeInmemoryParts(pws)
+
+		pwsRemaining := pws[:0]
+		for _, pw := range pws {
+			if pw.p.size >= maxPartSize {
+				tb.addToInmemoryParts(pw, isFinal)
+			} else {
+				pwsRemaining = append(pwsRemaining, pw)
+			}
+		}
+		pws = pwsRemaining
+	}
+	if len(pws) == 1 {
+		tb.addToInmemoryParts(pws[0], isFinal)
+	}
+}
+
+func (tb *Table) addToInmemoryParts(pw *partWrapper, isFinal bool) {
+	// Wait until the number of in-memory parts goes below maxInmemoryParts.
+	// This prevents from excess CPU usage during search in tb under high ingestion rate to tb.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
 	select {
 	case tb.inmemoryPartsLimitCh <- struct{}{}:
 	default:
-		// Too many in-memory parts. Try assist merging them before adding pw to tb.inmemoryParts.
-		flushConcurrencyCh <- struct{}{}
-		tb.assistedMergeForInmemoryParts()
-		tb.assistedMergeForFileParts()
-		<-flushConcurrencyCh
-
-		tb.inmemoryPartsLimitCh <- struct{}{}
+		atomic.AddUint64(&tb.inmemoryPartsLimitReachedCount, 1)
+		select {
+		case tb.inmemoryPartsLimitCh <- struct{}{}:
+		case <-tb.stopCh:
+		}
 	}
 
 	tb.partsLock.Lock()
 	tb.inmemoryParts = append(tb.inmemoryParts, pw)
-	tb.notifyBackgroundMergers()
+	tb.startInmemoryPartsMergerLocked()
 	tb.partsLock.Unlock()
 
 	if tb.flushCallback != nil {
@@ -761,83 +867,6 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 	}
 }
 
-func (tb *Table) notifyBackgroundMergers() bool {
-	select {
-	case tb.needMergeCh <- struct{}{}:
-		return true
-	default:
-		return false
-	}
-}
-
-var flushConcurrencyLimit = func() int {
-	n := cgroup.AvailableCPUs()
-	if n < 2 {
-		// Allow at least 2 concurrent flushers on systems with a single CPU core
-		// in order to guarantee that in-memory data flushes and background merges can be continued
-		// when a single flusher is busy with the long merge.
-		n = 2
-	}
-	return n
-}()
-
-var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit)
-
-func (tb *Table) assistedMergeForInmemoryParts() {
-	tb.partsLock.Lock()
-	needMerge := getNotInMergePartsCount(tb.inmemoryParts) >= defaultPartsToMerge
-	tb.partsLock.Unlock()
-	if !needMerge {
-		return
-	}
-
-	atomic.AddUint64(&tb.inmemoryAssistedMergesCount, 1)
-
-	maxOutBytes := tb.getMaxFilePartSize()
-
-	tb.partsLock.Lock()
-	pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, true)
-	tb.partsLock.Unlock()
-
-	err := tb.mergeParts(pws, tb.stopCh, true)
-	if err == nil {
-		return
-	}
-	if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
-		return
-	}
-	logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err)
-}
-
-func (tb *Table) assistedMergeForFileParts() {
-	tb.partsLock.Lock()
-	needMerge := len(tb.fileParts) > maxFileParts && getNotInMergePartsCount(tb.fileParts) >= defaultPartsToMerge
-	tb.partsLock.Unlock()
-	if !needMerge {
-		return
-	}
-
-	atomic.AddUint64(&tb.fileAssistedMergesCount, 1)
-	err := tb.mergeExistingParts(false)
-	if err == nil {
-		return
-	}
-	if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
-		return
-	}
-	logger.Panicf("FATAL: cannot assist with merging file parts: %s", err)
-}
-
-func getNotInMergePartsCount(pws []*partWrapper) int {
-	n := 0
-	for _, pw := range pws {
-		if !pw.isInMerge {
-			n++
-		}
-	}
-	return n
-}
-
 func getWaitGroup() *sync.WaitGroup {
 	v := wgPool.Get()
 	if v == nil {
@@ -852,7 +881,38 @@ func putWaitGroup(wg *sync.WaitGroup) {
 
 var wgPool sync.Pool
 
-func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) *partWrapper {
+func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) []*partWrapper {
+	var pwsResult []*partWrapper
+	var pwsResultLock sync.Mutex
+	wg := getWaitGroup()
+	for len(pws) > 0 {
+		pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
+		wg.Add(1)
+		inmemoryPartsConcurrencyCh <- struct{}{}
+		go func(pwsChunk []*partWrapper) {
+			defer func() {
+				<-inmemoryPartsConcurrencyCh
+				wg.Done()
+			}()
+
+			pw := tb.mustMergeInmemoryPartsFinal(pwsChunk)
+
+			pwsResultLock.Lock()
+			pwsResult = append(pwsResult, pw)
+			pwsResultLock.Unlock()
+		}(pwsToMerge)
+		pws = pwsRemaining
+	}
+	wg.Wait()
+	putWaitGroup(wg)
+
+	return pwsResult
+}
+
+func (tb *Table) mustMergeInmemoryPartsFinal(pws []*partWrapper) *partWrapper {
+	if len(pws) == 0 {
+		logger.Panicf("BUG: pws must contain at least a single item")
+	}
 	if len(pws) == 1 {
 		// Nothing to merge
 		return pws[0]
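addToInmemoryParts above replaces the removed assisted merges with plain backpressure: when all maxInmemoryParts tokens are taken, ingestion blocks until a background merger frees one, and a counter records the stall. A toy version of that two-level select pattern (a limit of 2 and the sleep-based "merger" are for brevity only):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var (
	limitCh           = make(chan struct{}, 2) // stand-in for maxInmemoryParts
	limitReachedCount uint64
	stopCh            = make(chan struct{})
)

func addPart() {
	select {
	case limitCh <- struct{}{}: // fast path: a slot is free
	default:
		// Slow path: the limit is reached - record it, then block
		// until a background merger releases a slot or shutdown starts.
		atomic.AddUint64(&limitReachedCount, 1)
		select {
		case limitCh <- struct{}{}:
		case <-stopCh:
		}
	}
}

func main() {
	go func() { // stand-in for a background merger releasing slots
		for {
			time.Sleep(10 * time.Millisecond)
			<-limitCh
		}
	}()
	for i := 0; i < 5; i++ {
		addPart()
	}
	fmt.Println("stalls:", atomic.LoadUint64(&limitReachedCount))
}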
@@ -912,19 +972,16 @@ func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDis
 	bsw.MustInitFromInmemoryPart(mpDst, compressLevel)
 
 	// Merge parts.
-	// The merge shouldn't be interrupted by stopCh,
-	// since it may be final after stopCh is closed.
-	atomic.AddUint64(&tb.activeInmemoryMerges, 1)
-	err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.inmemoryItemsMerged)
-	atomic.AddUint64(&tb.activeInmemoryMerges, ^uint64(0))
-	atomic.AddUint64(&tb.inmemoryMergesCount, 1)
-	if err != nil {
-		logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
-	}
+	// The merge shouldn't be interrupted by stopCh, so use nil stopCh.
+	ph, err := tb.mergePartsInternal("", bsw, bsrs, partInmemory, nil)
 	putBlockStreamWriter(bsw)
 	for _, bsr := range bsrs {
 		putBlockStreamReader(bsr)
 	}
+	if err != nil {
+		logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
+	}
+	mpDst.ph = *ph
 
 	return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline)
 }
@@ -939,17 +996,6 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
 	}
 }
 
-func (tb *Table) startMergeWorkers() {
-	// The actual number of concurrent merges is limited inside mergeWorker() below.
-	for i := 0; i < cap(mergeWorkersLimitCh); i++ {
-		tb.wg.Add(1)
-		go func() {
-			tb.mergeWorker()
-			tb.wg.Done()
-		}()
-	}
-}
-
 func getMaxInmemoryPartSize() uint64 {
 	// Allow up to 5% of memory for in-memory parts.
 	n := uint64(0.05 * float64(memory.Allowed()) / maxInmemoryParts)
@@ -961,95 +1007,86 @@ func getMaxInmemoryPartSize() uint64 {
 func (tb *Table) getMaxFilePartSize() uint64 {
 	n := fs.MustGetFreeSpace(tb.path)
-	// Divide free space by the max number of concurrent merges.
-	maxOutBytes := n / uint64(cap(mergeWorkersLimitCh))
+	// Divide free space by the max number of concurrent merges for file parts.
+	maxOutBytes := n / uint64(cap(filePartsConcurrencyCh))
 	if maxOutBytes > maxPartSize {
 		maxOutBytes = maxPartSize
 	}
 	return maxOutBytes
 }
 
-func (tb *Table) canBackgroundMerge() bool {
-	return atomic.LoadUint32(tb.isReadOnly) == 0
+// NotifyReadWriteMode notifies tb that it may be switched from read-only mode to read-write mode.
+func (tb *Table) NotifyReadWriteMode() {
+	tb.startInmemoryPartsMergers()
+	tb.startFilePartsMergers()
 }
 
-var errReadOnlyMode = fmt.Errorf("storage is in readonly mode")
-
-func (tb *Table) mergeExistingParts(isFinal bool) error {
-	if !tb.canBackgroundMerge() {
-		// Do not perform background merge in read-only mode
-		// in order to prevent from disk space shortage.
-		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
-		return errReadOnlyMode
-	}
-	maxOutBytes := tb.getMaxFilePartSize()
-
-	tb.partsLock.Lock()
-	dst := make([]*partWrapper, 0, len(tb.inmemoryParts)+len(tb.fileParts))
-	dst = append(dst, tb.inmemoryParts...)
-	dst = append(dst, tb.fileParts...)
-	pws := getPartsToMerge(dst, maxOutBytes, isFinal)
-	tb.partsLock.Unlock()
-
-	return tb.mergeParts(pws, tb.stopCh, isFinal)
-}
-
-func (tb *Table) mergeWorker() {
-	var lastMergeTime uint64
-	isFinal := false
+func (tb *Table) inmemoryPartsMerger() {
 	for {
-		// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
-		// across tables may exceed the the cap(mergeWorkersLimitCh).
-		mergeWorkersLimitCh <- struct{}{}
-		err := tb.mergeExistingParts(isFinal)
-		<-mergeWorkersLimitCh
+		if atomic.LoadUint32(tb.isReadOnly) != 0 {
+			return
+		}
+		maxOutBytes := tb.getMaxFilePartSize()
+
+		tb.partsLock.Lock()
+		pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes)
+		tb.partsLock.Unlock()
+
+		if len(pws) == 0 {
+			// Nothing to merge
+			return
+		}
+
+		inmemoryPartsConcurrencyCh <- struct{}{}
+		err := tb.mergeParts(pws, tb.stopCh, false)
+		<-inmemoryPartsConcurrencyCh
+
+		if err == nil {
+			// Try merging additional parts.
+			continue
+		}
+		if errors.Is(err, errForciblyStopped) {
+			// Nothing to do - finish the merger.
+			return
+		}
+		// Unexpected error.
+		logger.Panicf("FATAL: unrecoverable error when merging inmemory parts in %q: %s", tb.path, err)
+	}
+}
+
+func (tb *Table) filePartsMerger() {
+	for {
+		if atomic.LoadUint32(tb.isReadOnly) != 0 {
+			return
+		}
+		maxOutBytes := tb.getMaxFilePartSize()
+
+		tb.partsLock.Lock()
+		pws := getPartsToMerge(tb.fileParts, maxOutBytes)
+		tb.partsLock.Unlock()
+
+		if len(pws) == 0 {
+			// Nothing to merge
+			return
+		}
+
+		filePartsConcurrencyCh <- struct{}{}
+		err := tb.mergeParts(pws, tb.stopCh, false)
+		<-filePartsConcurrencyCh
+
 		if err == nil {
 			// Try merging additional parts.
-			lastMergeTime = fasttime.UnixTimestamp()
-			isFinal = false
 			continue
 		}
 		if errors.Is(err, errForciblyStopped) {
 			// The merger has been stopped.
 			return
 		}
-		if !errors.Is(err, errNothingToMerge) && !errors.Is(err, errReadOnlyMode) {
-			// Unexpected error.
-			logger.Panicf("FATAL: unrecoverable error when merging inmemory parts in %q: %s", tb.path, err)
-		}
-		if finalMergeDelaySeconds > 0 && fasttime.UnixTimestamp()-lastMergeTime > finalMergeDelaySeconds {
-			// We have free time for merging into bigger parts.
-			// This should improve select performance.
-			lastMergeTime = fasttime.UnixTimestamp()
-			isFinal = true
-			continue
-		}
-
-		// Nothing to merge. Wait for the notification of new merge.
-		select {
-		case <-tb.stopCh:
-			return
-		case <-tb.needMergeCh:
-		}
+		// Unexpected error.
+		logger.Panicf("FATAL: unrecoverable error when merging file parts in %q: %s", tb.path, err)
 	}
 }
 
-// Disable final merge by default, since it may lead to high disk IO and CPU usage
-// after some inactivity time.
-var finalMergeDelaySeconds = uint64(0)
-
-// SetFinalMergeDelay sets the delay before doing final merge for Table without newly ingested data.
-//
-// This function may be called only before Table initialization.
-func SetFinalMergeDelay(delay time.Duration) {
-	if delay <= 0 {
-		return
-	}
-	finalMergeDelaySeconds = uint64(delay.Seconds() + 1)
-}
-
-var errNothingToMerge = fmt.Errorf("nothing to merge")
-
 func assertIsInMerge(pws []*partWrapper) {
 	for _, pw := range pws {
 		if !pw.isInMerge {
@@ -1071,16 +1108,19 @@ func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
 
 // mergeParts merges pws to a single resulting part.
 //
+// It is expected that pws contains at least a single part.
+//
 // Merging is immediately stopped if stopCh is closed.
 //
 // If isFinal is set, then the resulting part will be stored to disk.
+// If at least a single source part at pws is stored on disk, then the resulting part
+// will be stored to disk.
 //
 // All the parts inside pws must have isInMerge field set to true.
 // The isInMerge field inside pws parts is set to false before returning from the function.
 func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
 	if len(pws) == 0 {
-		// Nothing to merge.
-		return errNothingToMerge
+		logger.Panicf("BUG: empty pws cannot be passed to mergeParts()")
 	}
 	assertIsInMerge(pws)
 
@@ -1270,13 +1310,8 @@ func areAllInmemoryParts(pws []*partWrapper) bool {
 func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
 	// Atomically unregister old parts and add new part to tb.
-	m := make(map[*partWrapper]bool, len(pws))
-	for _, pw := range pws {
-		m[pw] = true
-	}
-	if len(m) != len(pws) {
-		logger.Panicf("BUG: %d duplicate parts found when merging %d parts", len(pws)-len(m), len(pws))
-	}
+	m := makeMapFromPartWrappers(pws)
+
 	removedInmemoryParts := 0
 	removedFileParts := 0
 
@@ -1287,12 +1322,13 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
 	switch dstPartType {
 	case partInmemory:
 		tb.inmemoryParts = append(tb.inmemoryParts, pwNew)
+		tb.startInmemoryPartsMergerLocked()
 	case partFile:
 		tb.fileParts = append(tb.fileParts, pwNew)
+		tb.startFilePartsMergerLocked()
 	default:
 		logger.Panicf("BUG: unknown partType=%d", dstPartType)
 	}
-	tb.notifyBackgroundMergers()
 
 	// Atomically store the updated list of file-based parts on disk.
 	// This must be performed under partsLock in order to prevent from races
@@ -1305,10 +1341,16 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
 
 	// Update inmemoryPartsLimitCh accordingly to the number of the remaining in-memory parts.
 	for i := 0; i < removedInmemoryParts; i++ {
-		<-tb.inmemoryPartsLimitCh
+		select {
+		case <-tb.inmemoryPartsLimitCh:
+		case <-tb.stopCh:
+		}
 	}
 	if dstPartType == partInmemory {
-		tb.inmemoryPartsLimitCh <- struct{}{}
+		select {
+		case tb.inmemoryPartsLimitCh <- struct{}{}:
+		case <-tb.stopCh:
+		}
 	}
 
 	removedParts := removedInmemoryParts + removedFileParts
@@ -1324,6 +1366,17 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
 	}
 }
 
+func makeMapFromPartWrappers(pws []*partWrapper) map[*partWrapper]struct{} {
+	m := make(map[*partWrapper]struct{}, len(pws))
+	for _, pw := range pws {
+		m[pw] = struct{}{}
+	}
+	if len(m) != len(pws) {
+		logger.Panicf("BUG: %d duplicate parts found in %d source parts", len(pws)-len(m), len(pws))
+	}
+	return m
+}
+
 func getPartsSize(pws []*partWrapper) uint64 {
 	n := uint64(0)
 	for _, pw := range pws {
@@ -1363,19 +1416,6 @@ func (tb *Table) nextMergeIdx() uint64 {
 	return atomic.AddUint64(&tb.mergeIdx, 1)
 }
 
-var mergeWorkersLimitCh = make(chan struct{}, getWorkersCount())
-
-func getWorkersCount() int {
-	n := cgroup.AvailableCPUs()
-	if n < 4 {
-		// Allow at least 4 merge workers on systems with small CPUs count
-		// in order to guarantee that background merges can be continued
-		// when multiple workers are busy with big merges.
-		n = 4
-	}
-	return n
-}
-
 func mustOpenParts(path string) []*partWrapper {
 	// The path can be missing after restoring from backup, so create it if needed.
 	fs.MustMkdirIfNotExist(path)
@@ -1468,7 +1508,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string, deadline uint64) error {
 	}
 
 	// Flush inmemory items to disk.
-	tb.flushInmemoryItems()
+	tb.flushInmemoryItemsToFiles()
 
 	fs.MustMkdirFailIfExist(dstDir)
 
@@ -1553,33 +1593,50 @@ func mustReadPartNames(srcDir string) []string {
 
 // getPartsToMerge returns optimal parts to merge from pws.
 //
-// if isFinal is set, then merge harder.
-//
 // The summary size of the returned parts must be smaller than the maxOutBytes.
-func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*partWrapper {
+func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64) []*partWrapper {
 	pwsRemaining := make([]*partWrapper, 0, len(pws))
 	for _, pw := range pws {
 		if !pw.isInMerge {
 			pwsRemaining = append(pwsRemaining, pw)
 		}
 	}
-	maxPartsToMerge := defaultPartsToMerge
-	var dst []*partWrapper
-	if isFinal {
-		for len(dst) == 0 && maxPartsToMerge >= finalPartsToMerge {
-			dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
-			maxPartsToMerge--
-		}
-	} else {
-		dst = appendPartsToMerge(dst[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
-	}
-	for _, pw := range dst {
+
+	pwsToMerge := appendPartsToMerge(nil, pwsRemaining, defaultPartsToMerge, maxOutBytes)
+
+	for _, pw := range pwsToMerge {
 		if pw.isInMerge {
-			logger.Panicf("BUG: partWrapper.isInMerge is already set")
+			logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to true")
 		}
 		pw.isInMerge = true
 	}
-	return dst
+
+	return pwsToMerge
+}
+
+// getPartsForOptimalMerge returns parts from pws for optimal merge, plus the remaining parts.
+//
+// The pws items are replaced by nil after the call. This is needed for helping Go GC to reclaim the referenced items.
+func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) {
+	pwsToMerge := appendPartsToMerge(nil, pws, defaultPartsToMerge, math.MaxUint64)
+	if len(pwsToMerge) == 0 {
+		return pws, nil
+	}
+
+	m := makeMapFromPartWrappers(pwsToMerge)
+	pwsRemaining := make([]*partWrapper, 0, len(pws)-len(pwsToMerge))
+	for _, pw := range pws {
+		if _, ok := m[pw]; !ok {
+			pwsRemaining = append(pwsRemaining, pw)
+		}
+	}
+
+	// Clear references to pws items, so they could be reclaimed faster by Go GC.
+	for i := range pws {
+		pws[i] = nil
+	}
+
+	return pwsToMerge, pwsRemaining
+}
 
 // minMergeMultiplier is the minimum multiplier for the size of the output part
@@ -1590,8 +1647,7 @@ func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*pa
 // The 1.7 is good enough for production workloads.
 const minMergeMultiplier = 1.7
 
-// appendPartsToMerge finds optimal parts to merge from src, appends
-// them to dst and returns the result.
+// appendPartsToMerge finds optimal parts to merge from src, appends them to dst and returns the result.
 func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxOutBytes uint64) []*partWrapper {
 	if len(src) < 2 {
 		// There is no need in merging zero or one part :)
@@ -1671,10 +1727,10 @@ func sortPartsForOptimalMerge(pws []*partWrapper) {
 	})
 }
 
-func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*partWrapper, int) {
+func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
 	dst := pws[:0]
 	for _, pw := range pws {
-		if !partsToRemove[pw] {
+		if _, ok := partsToRemove[pw]; !ok {
 			dst = append(dst, pw)
 		}
 	}
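getPartsToMerge/appendPartsToMerge above pick a run of similar-sized parts whose merge causes the least write amplification; minMergeMultiplier = 1.7 rejects merges whose output wouldn't be meaningfully bigger than the largest input. A simplified sketch of that selection idea over plain sizes (illustrative only, not the actual scoring used by appendPartsToMerge):

package main

import (
	"fmt"
	"sort"
)

const minMergeMultiplier = 1.7

// pickPartsToMerge returns a contiguous run (after sorting by size) of up to
// maxParts part sizes whose sum stays under maxOutBytes and exceeds the
// biggest chosen part by at least minMergeMultiplier.
func pickPartsToMerge(sizes []uint64, maxParts int, maxOutBytes uint64) []uint64 {
	sort.Slice(sizes, func(i, j int) bool { return sizes[i] < sizes[j] })
	var best []uint64
	for start := 0; start < len(sizes); start++ {
		sum := uint64(0)
		for end := start; end < len(sizes) && end-start < maxParts; end++ {
			sum += sizes[end]
			if sum > maxOutBytes {
				break
			}
			n := end - start + 1
			if n >= 2 && float64(sum) > minMergeMultiplier*float64(sizes[end]) && n > len(best) {
				best = sizes[start : end+1]
			}
		}
	}
	return best
}

func main() {
	// The 500-byte part is skipped: merging it with the small parts
	// would mostly rewrite its own bytes for little gain.
	fmt.Println(pickPartsToMerge([]uint64{10, 12, 11, 500, 9}, 15, 1000)) // [9 10 11 12]
}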
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 66866a309..108d89f63 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -29,15 +29,15 @@ import (
 // This time shouldn't exceed a few days.
 const maxBigPartSize = 1e12
 
-// The maximum number of inmemory parts in the partition.
+// The maximum number of inmemory parts per partition.
 //
-// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion.
-const maxInmemoryPartsPerPartition = 20
-
-// The maximum number of small parts in the partition.
+// This limit allows reducing querying CPU usage under high ingestion rate.
+// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
 //
-// If the number of small parts reaches this value, then assisted merge runs during data ingestion.
-const maxSmallPartsPerPartition = 30
+// This number may be reached when the insertion pace outreaches merger pace.
+// If this number is reached, then the data ingestion is paused until background
+// mergers reduce the number of parts below this number.
+const maxInmemoryParts = 60
 
 // Default number of parts to merge at once.
 //
@@ -45,17 +45,10 @@ const maxSmallPartsPerPartition = 30
 // See appendPartsToMerge tests for details.
 const defaultPartsToMerge = 15
 
-// The final number of parts to merge at once.
-//
-// It must be smaller than defaultPartsToMerge.
-// Lower value improves select performance at the cost of increased
-// write amplification.
-const finalPartsToMerge = 3
-
 // The number of shards for rawRow entries per partition.
 //
 // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
-var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 1) / 2
+var rawRowsShardsPerPartition = cgroup.AvailableCPUs()
 
 // The interval for flushing buffered rows into parts, so they become visible to search.
 const pendingRowsFlushInterval = time.Second
@@ -76,10 +69,10 @@ func SetDataFlushInterval(d time.Duration) {
 	}
 }
 
-// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
+// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet per each rawRowsShard.
 func getMaxRawRowsPerShard() int {
 	maxRawRowsPerPartitionOnce.Do(func() {
-		n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{}))
+		n := memory.Allowed() / rawRowsShardsPerPartition / (100 * int(unsafe.Sizeof(rawRow{})))
 		if n < 1e4 {
 			n = 1e4
 		}
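The new getMaxRawRowsPerShard formula reserves roughly 1% of allowed memory per shard for raw rows (the divisor 100), instead of the previous 1/256. A back-of-the-envelope check, assuming for illustration memory.Allowed() of 8 GiB, 16 shards and unsafe.Sizeof(rawRow{}) of 64 bytes — none of these concrete numbers come from the diff:

package main

import "fmt"

func main() {
	// Illustrative numbers only - the real values come from
	// memory.Allowed(), rawRowsShardsPerPartition and unsafe.Sizeof(rawRow{}).
	memoryAllowed := 8 << 30 // 8 GiB
	shards := 16
	rawRowSize := 64 // bytes, assumed

	n := memoryAllowed / shards / (100 * rawRowSize)
	if n < 1e4 {
		n = 1e4
	}
	fmt.Println(n) // ~83886 rows per shard, i.e. ~5.1 MiB of raw rows per shard
}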
@@ -117,13 +110,13 @@ type partition struct {
 	smallRowsDeleted uint64
 	bigRowsDeleted   uint64
 
-	inmemoryAssistedMergesCount uint64
-	smallAssistedMergesCount    uint64
-
 	mergeIdx uint64
 
+	// the path to directory with smallParts.
 	smallPartsPath string
-	bigPartsPath   string
+
+	// the path to directory with bigParts.
+	bigPartsPath string
 
 	// The parent storage.
 	s *Storage
@@ -135,28 +128,34 @@ type partition struct {
 	tr TimeRange
 
 	// rawRows contains recently added rows that haven't been converted into parts yet.
-	// rawRows are periodically converted into inmemroyParts.
-	// rawRows aren't used in search for performance reasons.
+	//
+	// rawRows are converted into inmemoryParts on every pendingRowsFlushInterval or when rawRows becomes full.
+	//
+	// rawRows aren't visible for search due to performance reasons.
 	rawRows rawRowsShards
 
 	// partsLock protects inmemoryParts, smallParts and bigParts.
 	partsLock sync.Mutex
 
-	// Contains inmemory parts with recently ingested data.
+	// Contains inmemory parts with recently ingested data, which are visible for search.
 	inmemoryParts []*partWrapper
 
-	// Contains file-based parts with small number of items.
+	// Contains file-based parts with small number of items, which are visible for search.
 	smallParts []*partWrapper
 
-	// Contains file-based parts with big number of items.
+	// Contains file-based parts with big number of items, which are visible for search.
 	bigParts []*partWrapper
 
-	// This channel is used for signaling the background mergers that there are parts,
-	// which may need to be merged.
-	needMergeCh chan struct{}
-
+	// stopCh is used for notifying all the background workers to stop.
+	//
+	// It must be closed under partsLock in order to prevent from calling wg.Add()
+	// after stopCh is closed.
 	stopCh chan struct{}
 
+	// wg is used for waiting for all the background workers to stop.
+	//
+	// wg.Add() must be called under partsLock after checking whether stopCh isn't closed.
+	// This should prevent from calling wg.Add() after stopCh is closed and wg.Wait() is called.
 	wg sync.WaitGroup
 }
 
@@ -233,9 +232,13 @@ func mustCreatePartition(timestamp int64, smallPartitionsPath, bigPartitionsPath
 }
 
 func (pt *partition) startBackgroundWorkers() {
-	pt.startMergeWorkers()
-	pt.startInmemoryPartsFlusher()
+	// Start file parts mergers, so they could start merging unmerged parts if needed.
+	// There is no need in starting in-memory parts mergers, since there are no in-memory parts yet.
+	pt.startSmallPartsMergers()
+	pt.startBigPartsMergers()
+
 	pt.startPendingRowsFlusher()
+	pt.startInmemoryPartsFlusher()
 	pt.startStalePartsRemover()
 }
 
@@ -281,24 +284,17 @@ func mustOpenPartition(smallPartsPath, bigPartsPath string, s *Storage) *partiti
 	}
 	pt.startBackgroundWorkers()
 
-	// Wake up a single background merger, so it could start merging parts if needed.
-	pt.notifyBackgroundMergers()
-
 	return pt
 }
 
 func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition {
 	p := &partition{
 		name:           name,
+		mergeIdx:       uint64(time.Now().UnixNano()),
 		smallPartsPath: smallPartsPath,
 		bigPartsPath:   bigPartsPath,
-
-		s: s,
-
-		mergeIdx: uint64(time.Now().UnixNano()),
-
-		needMergeCh: make(chan struct{}, cgroup.AvailableCPUs()),
-
-		stopCh: make(chan struct{}),
+		s:              s,
+		stopCh:         make(chan struct{}),
 	}
 	p.rawRows.init()
 	return p
@@ -349,9 +345,6 @@ type partitionMetrics struct {
 	InmemoryPartsRefCount uint64
 	SmallPartsRefCount    uint64
 	BigPartsRefCount      uint64
-
-	InmemoryAssistedMergesCount uint64
-	SmallAssistedMergesCount    uint64
 }
 
 // TotalRowsCount returns total number of rows in tm.
@@ -414,9 +407,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 	m.InmemoryRowsDeleted += atomic.LoadUint64(&pt.inmemoryRowsDeleted)
 	m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted)
 	m.BigRowsDeleted += atomic.LoadUint64(&pt.bigRowsDeleted)
-
-	m.InmemoryAssistedMergesCount += atomic.LoadUint64(&pt.inmemoryAssistedMergesCount)
-	m.SmallAssistedMergesCount += atomic.LoadUint64(&pt.smallAssistedMergesCount)
 }
 
 // AddRows adds the given rows to the partition pt.
@@ -519,16 +509,8 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 	rrs.mu.Unlock()
 
 	if rrb != nil {
-		pt.flushRowsToParts(rrb.rows)
+		pt.flushRowsToInmemoryParts(rrb.rows)
 		putRawRowsBlock(rrb)
-
-		// Run assisted merges if needed.
-		flushConcurrencyCh <- struct{}{}
-		pt.assistedMergeForInmemoryParts()
-		pt.assistedMergeForSmallParts()
-		// There is no need in assisted merges for big parts,
-		// since the bottleneck is possible only at inmemory and small parts.
-		<-flushConcurrencyCh
 	}
 
 	return rows
@@ -560,10 +542,12 @@ func putRawRowsBlock(rrb *rawRowsBlock) {
 
 var rawRowsBlockPool sync.Pool
 
-func (pt *partition) flushRowsToParts(rows []rawRow) {
+func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) {
 	if len(rows) == 0 {
 		return
 	}
+
+	// Merge rows into in-memory parts.
 	maxRows := getMaxRawRowsPerShard()
 	var pwsLock sync.Mutex
 	pws := make([]*partWrapper, 0, (len(rows)+maxRows-1)/maxRows)
@@ -574,111 +558,155 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 			n = len(rows)
 		}
 		wg.Add(1)
-		flushConcurrencyCh <- struct{}{}
+		inmemoryPartsConcurrencyCh <- struct{}{}
 		go func(rowsChunk []rawRow) {
 			defer func() {
-				<-flushConcurrencyCh
+				<-inmemoryPartsConcurrencyCh
 				wg.Done()
 			}()
+
 			pw := pt.createInmemoryPart(rowsChunk)
-			if pw == nil {
-				return
+			if pw != nil {
+				pwsLock.Lock()
+				pws = append(pws, pw)
+				pwsLock.Unlock()
 			}
-			pwsLock.Lock()
-			pws = append(pws, pw)
-			pwsLock.Unlock()
 		}(rows[:n])
 		rows = rows[n:]
 	}
 	wg.Wait()
 	putWaitGroup(wg)
 
-	pt.partsLock.Lock()
-	pt.inmemoryParts = append(pt.inmemoryParts, pws...)
-	for range pws {
-		if !pt.notifyBackgroundMergers() {
-			break
+	// Merge pws into a single in-memory part.
+	maxPartSize := getMaxInmemoryPartSize()
+	for len(pws) > 1 {
+		pws = pt.mustMergeInmemoryParts(pws)
+
+		pwsRemaining := pws[:0]
+		for _, pw := range pws {
+			if pw.p.size >= maxPartSize {
+				pt.addToInmemoryParts(pw)
+			} else {
+				pwsRemaining = append(pwsRemaining, pw)
+			}
 		}
+		pws = pwsRemaining
 	}
-	pt.partsLock.Unlock()
-}
-
-func (pt *partition) notifyBackgroundMergers() bool {
-	select {
-	case pt.needMergeCh <- struct{}{}:
-		return true
-	default:
-		return false
+	if len(pws) == 1 {
+		pt.addToInmemoryParts(pws[0])
 	}
 }
 
-var flushConcurrencyLimit = func() int {
-	n := cgroup.AvailableCPUs()
-	if n < 3 {
-		// Allow at least 3 concurrent flushers on systems with a single CPU core
-		// in order to guarantee that in-memory data flushes and background merges can be continued
-		// when a single flusher is busy with the long merge of big parts,
-		// while another flusher is busy with the long merge of small parts.
-		n = 3
-	}
-	return n
-}()
-
-var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit)
-
-func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
-	if len(pws) < maxParts {
-		return false
-	}
-	return getNotInMergePartsCount(pws) >= defaultPartsToMerge
-}
-
-func (pt *partition) assistedMergeForInmemoryParts() {
+func (pt *partition) addToInmemoryParts(pw *partWrapper) {
 	pt.partsLock.Lock()
-	needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition)
+	pt.inmemoryParts = append(pt.inmemoryParts, pw)
+	pt.startInmemoryPartsMergerLocked()
 	pt.partsLock.Unlock()
-	if !needMerge {
-		return
-	}
-
-	atomic.AddUint64(&pt.inmemoryAssistedMergesCount, 1)
-	err := pt.mergeInmemoryParts()
-	if err == nil {
-		return
-	}
-	if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
-		return
-	}
-	logger.Panicf("FATAL: cannot merge inmemory parts: %s", err)
 }
 
-func (pt *partition) assistedMergeForSmallParts() {
-	pt.partsLock.Lock()
-	needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition)
-	pt.partsLock.Unlock()
-	if !needMerge {
-		return
-	}
-
-	atomic.AddUint64(&pt.smallAssistedMergesCount, 1)
-	err := pt.mergeExistingParts(false)
-	if err == nil {
-		return
-	}
-	if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
-		return
-	}
-	logger.Panicf("FATAL: cannot merge small parts: %s", err)
+func (pt *partition) NotifyReadWriteMode() {
+	pt.startInmemoryPartsMergers()
+	pt.startSmallPartsMergers()
+	pt.startBigPartsMergers()
 }
 
-func getNotInMergePartsCount(pws []*partWrapper) int {
-	n := 0
-	for _, pw := range pws {
-		if !pw.isInMerge {
-			n++
+func (pt *partition) inmemoryPartsMerger() {
+	for {
+		if atomic.LoadUint32(&pt.s.isReadOnly) != 0 {
+			return
+		}
+		maxOutBytes := pt.getMaxBigPartSize()
+
+		pt.partsLock.Lock()
+		pws := getPartsToMerge(pt.inmemoryParts, maxOutBytes)
+		pt.partsLock.Unlock()
+
+		if len(pws) == 0 {
+			// Nothing to merge
+			return
+		}
+
+		inmemoryPartsConcurrencyCh <- struct{}{}
+		err := pt.mergeParts(pws, pt.stopCh, false)
+		<-inmemoryPartsConcurrencyCh
+
+		if err == nil {
+			// Try merging additional parts.
+			continue
+		}
+		if errors.Is(err, errForciblyStopped) {
+			// Nothing to do - finish the merger.
+			return
+		}
+		// Unexpected error.
+		logger.Panicf("FATAL: unrecoverable error when merging inmemory parts in partition %q: %s", pt.name, err)
+	}
+}
+
+func (pt *partition) smallPartsMerger() {
+	for {
+		if atomic.LoadUint32(&pt.s.isReadOnly) != 0 {
+			return
+		}
+		maxOutBytes := pt.getMaxBigPartSize()
+
+		pt.partsLock.Lock()
+		pws := getPartsToMerge(pt.smallParts, maxOutBytes)
+		pt.partsLock.Unlock()
+
+		if len(pws) == 0 {
+			// Nothing to merge
+			return
+		}
+
+		smallPartsConcurrencyCh <- struct{}{}
+		err := pt.mergeParts(pws, pt.stopCh, false)
+		<-smallPartsConcurrencyCh
+
+		if err == nil {
+			// Try merging additional parts.
+			continue
+		}
+		if errors.Is(err, errForciblyStopped) {
+			// Nothing to do - finish the merger.
+			return
+		}
+		// Unexpected error.
+		logger.Panicf("FATAL: unrecoverable error when merging small parts at %q: %s", pt.smallPartsPath, err)
+	}
+}
+
+func (pt *partition) bigPartsMerger() {
+	for {
+		if atomic.LoadUint32(&pt.s.isReadOnly) != 0 {
+			return
+		}
+		maxOutBytes := pt.getMaxBigPartSize()
+
+		pt.partsLock.Lock()
+		pws := getPartsToMerge(pt.bigParts, maxOutBytes)
+		pt.partsLock.Unlock()
+
+		if len(pws) == 0 {
+			// Nothing to merge
+			return
+		}
+
+		bigPartsConcurrencyCh <- struct{}{}
+		err := pt.mergeParts(pws, pt.stopCh, false)
+		<-bigPartsConcurrencyCh
+
+		if err == nil {
+			// Try merging additional parts.
+			continue
+		}
+		if errors.Is(err, errForciblyStopped) {
+			// Nothing to do - finish the merger.
+			return
		}
+		// Unexpected error.
+		logger.Panicf("FATAL: unrecoverable error when merging big parts at %q: %s", pt.bigPartsPath, err)
 	}
-	return n
 }
 
 func getWaitGroup() *sync.WaitGroup {
 	v := wgPool.Get()
 	if v == nil {
+ // The pt.partsLock is aquired in order to guarantee that pt.wg.Add() isn't called + // after pt.stopCh is closed and pt.wg.Wait() is called below. + pt.partsLock.Lock() close(pt.stopCh) + pt.partsLock.Unlock() - // Waiting for service workers to stop + // Wait for background workers to stop. pt.wg.Wait() - pt.flushInmemoryRows() + // Flush the remaining in-memory rows to files. + pt.flushInmemoryRowsToFiles() // Remove references from inmemoryParts, smallParts and bigParts, so they may be eventually closed // after all the searches are done. pt.partsLock.Lock() - inmemoryParts := pt.inmemoryParts - smallParts := pt.smallParts - bigParts := pt.bigParts + + if n := pt.rawRows.Len(); n > 0 { + logger.Panicf("BUG: raw rows must be empty at this stage; got %d rows", n) + } + + if n := len(pt.inmemoryParts); n > 0 { + logger.Panicf("BUG: in-memory parts must be empty at this stage; got %d parts", n) + } pt.inmemoryParts = nil + + smallParts := pt.smallParts pt.smallParts = nil + + bigParts := pt.bigParts pt.bigParts = nil + pt.partsLock.Unlock() - for _, pw := range inmemoryParts { - pw.decRef() - } for _, pw := range smallParts { pw.decRef() } @@ -799,10 +919,65 @@ func (pt *partition) MustClose() { } } -func (pt *partition) startInmemoryPartsFlusher() { +func (pt *partition) startInmemoryPartsMergers() { + pt.partsLock.Lock() + for i := 0; i < cap(inmemoryPartsConcurrencyCh); i++ { + pt.startInmemoryPartsMergerLocked() + } + pt.partsLock.Unlock() +} + +func (pt *partition) startInmemoryPartsMergerLocked() { + select { + case <-pt.stopCh: + return + default: + } pt.wg.Add(1) go func() { - pt.inmemoryPartsFlusher() + pt.inmemoryPartsMerger() + pt.wg.Done() + }() +} + +func (pt *partition) startSmallPartsMergers() { + pt.partsLock.Lock() + for i := 0; i < cap(smallPartsConcurrencyCh); i++ { + pt.startSmallPartsMergerLocked() + } + pt.partsLock.Unlock() +} + +func (pt *partition) startSmallPartsMergerLocked() { + select { + case <-pt.stopCh: + return + default: + } + pt.wg.Add(1) + go func() { + pt.smallPartsMerger() + pt.wg.Done() + }() +} + +func (pt *partition) startBigPartsMergers() { + pt.partsLock.Lock() + for i := 0; i < cap(bigPartsConcurrencyCh); i++ { + pt.startBigPartsMergerLocked() + } + pt.partsLock.Unlock() +} + +func (pt *partition) startBigPartsMergerLocked() { + select { + case <-pt.stopCh: + return + default: + } + pt.wg.Add(1) + go func() { + pt.bigPartsMerger() pt.wg.Done() }() } @@ -815,6 +990,56 @@ func (pt *partition) startPendingRowsFlusher() { }() } +func (pt *partition) startInmemoryPartsFlusher() { + pt.wg.Add(1) + go func() { + pt.inmemoryPartsFlusher() + pt.wg.Done() + }() +} + +func (pt *partition) startStalePartsRemover() { + pt.wg.Add(1) + go func() { + pt.stalePartsRemover() + pt.wg.Done() + }() +} + +var ( + inmemoryPartsConcurrencyCh = make(chan struct{}, getInmemoryPartsConcurrency()) + smallPartsConcurrencyCh = make(chan struct{}, getSmallPartsConcurrency()) + bigPartsConcurrencyCh = make(chan struct{}, getBigPartsConcurrency()) +) + +func getInmemoryPartsConcurrency() int { + // The concurrency for processing in-memory parts must equal to the number of CPU cores, + // since these operations are CPU-bound. + return cgroup.AvailableCPUs() +} + +func getSmallPartsConcurrency() int { + n := cgroup.AvailableCPUs() + if n < 4 { + // Allow at least 4 concurrent workers for small parts on systems + // with less than 4 CPU cores in order to be able to make smaller part merges + // when bigger part merges are in progress. 
+ return 4 + } + return n +} + +func getBigPartsConcurrency() int { + n := cgroup.AvailableCPUs() + if n < 4 { + // Allow at least 4 concurrent workers for big parts on systems + // with less than 4 CPU cores in order to be able to make smaller part merges + // when bigger part merges are in progress. + return 4 + } + return n +} + func (pt *partition) inmemoryPartsFlusher() { d := timeutil.AddJitterToDuration(dataFlushInterval) ticker := time.NewTicker(d) @@ -824,7 +1049,7 @@ func (pt *partition) inmemoryPartsFlusher() { case <-pt.stopCh: return case <-ticker.C: - pt.flushInmemoryParts(false) + pt.flushInmemoryPartsToFiles(false) } } } @@ -848,12 +1073,12 @@ func (pt *partition) flushPendingRows(dst []rawRow, isFinal bool) []rawRow { return pt.rawRows.flush(pt, dst, isFinal) } -func (pt *partition) flushInmemoryRows() { - pt.rawRows.flush(pt, nil, true) - pt.flushInmemoryParts(true) +func (pt *partition) flushInmemoryRowsToFiles() { + pt.flushPendingRows(nil, true) + pt.flushInmemoryPartsToFiles(true) } -func (pt *partition) flushInmemoryParts(isFinal bool) { +func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) { currentTime := time.Now() var pws []*partWrapper @@ -866,7 +1091,7 @@ func (pt *partition) flushInmemoryParts(isFinal bool) { } pt.partsLock.Unlock() - if err := pt.mergePartsOptimal(pws, nil); err != nil { + if err := pt.mergePartsToFiles(pws, nil, inmemoryPartsConcurrencyCh); err != nil { logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) } } @@ -875,7 +1100,7 @@ func (rrss *rawRowsShards) flush(pt *partition, dst []rawRow, isFinal bool) []ra for i := range rrss.shards { dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal) } - pt.flushRowsToParts(dst) + pt.flushRowsToInmemoryParts(dst) return dst } @@ -899,24 +1124,37 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawR return dst } -func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error { - sortPartsForOptimalMerge(pws) +func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{}, concurrencyCh chan struct{}) error { + pwsLen := len(pws) + + var errGlobal error + var errGlobalLock sync.Mutex + wg := getWaitGroup() for len(pws) > 0 { - n := defaultPartsToMerge - if n > len(pws) { - n = len(pws) - } - pwsChunk := pws[:n] - pws = pws[n:] - err := pt.mergeParts(pwsChunk, stopCh, true) - if err == nil { - continue - } - pt.releasePartsToMerge(pws) - if errors.Is(err, errForciblyStopped) { - return nil - } - return fmt.Errorf("cannot merge parts optimally: %w", err) + pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws) + wg.Add(1) + concurrencyCh <- struct{}{} + go func(pwsChunk []*partWrapper) { + defer func() { + <-concurrencyCh + wg.Done() + }() + + if err := pt.mergeParts(pwsChunk, stopCh, true); err != nil && !errors.Is(err, errForciblyStopped) { + errGlobalLock.Lock() + if errGlobal == nil { + errGlobal = err + } + errGlobalLock.Unlock() + } + }(pwsToMerge) + pws = pwsRemaining + } + wg.Wait() + putWaitGroup(wg) + + if errGlobal != nil { + return fmt.Errorf("cannot merge %d parts optimally: %w", pwsLen, errGlobal) } return nil } @@ -942,7 +1180,7 @@ func (pt *partition) ForceMergeAllParts() error { // If len(pws) == 1, then the merge must run anyway. // This allows applying the configured retention, removing the deleted series // and performing de-duplication if needed. 
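+	// The merge concurrency is limited via bigPartsConcurrencyCh, since the force merge may produce big parts.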
- if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil { + if err := pt.mergePartsToFiles(pws, pt.stopCh, bigPartsConcurrencyCh); err != nil { return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err) } @@ -983,107 +1221,9 @@ func hasActiveMerges(pws []*partWrapper) bool { return false } -var mergeWorkersLimitCh = make(chan struct{}, getDefaultMergeConcurrency(16)) - -func getDefaultMergeConcurrency(max int) int { - v := (cgroup.AvailableCPUs() + 1) / 2 - if v > max { - v = max - } - return adjustMergeWorkersLimit(v) -} - -// SetMergeWorkersCount sets the maximum number of concurrent mergers for parts. -// -// The function must be called before opening or creating any storage. -func SetMergeWorkersCount(n int) { - if n <= 0 { - // Do nothing - return - } - n = adjustMergeWorkersLimit(n) - mergeWorkersLimitCh = make(chan struct{}, n) -} - -func adjustMergeWorkersLimit(n int) int { - if n < 4 { - // Allow at least 4 merge workers on systems with small CPUs count - // in order to guarantee that background merges can be continued - // when multiple workers are busy with big merges. - n = 4 - } - return n -} - -func (pt *partition) startMergeWorkers() { - // The actual number of concurrent merges is limited inside mergeWorker() below. - for i := 0; i < cap(mergeWorkersLimitCh); i++ { - pt.wg.Add(1) - go func() { - pt.mergeWorker() - pt.wg.Done() - }() - } -} - -func (pt *partition) mergeWorker() { - var lastMergeTime uint64 - isFinal := false - for { - // Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers - // across partitions may exceed the the cap(mergeWorkersLimitCh). - mergeWorkersLimitCh <- struct{}{} - err := pt.mergeExistingParts(isFinal) - <-mergeWorkersLimitCh - if err == nil { - // Try merging additional parts. - lastMergeTime = fasttime.UnixTimestamp() - isFinal = false - continue - } - if errors.Is(err, errForciblyStopped) { - // The merger has been stopped. - return - } - if !errors.Is(err, errNothingToMerge) && !errors.Is(err, errReadOnlyMode) { - // Unexpected error. - logger.Panicf("FATAL: unrecoverable error when merging parts in the partition (%q, %q): %s", pt.smallPartsPath, pt.bigPartsPath, err) - } - if finalMergeDelaySeconds > 0 && fasttime.UnixTimestamp()-lastMergeTime > finalMergeDelaySeconds { - // We have free time for merging into bigger parts. - // This should improve select performance. - lastMergeTime = fasttime.UnixTimestamp() - isFinal = true - continue - } - - // Nothing to merge. Wait for the notification of new merge. - select { - case <-pt.stopCh: - return - case <-pt.needMergeCh: - } - } -} - -// Disable final merge by default, since it may lead to high disk IO and CPU usage -// at the beginning of every month when merging data for the previous month. -var finalMergeDelaySeconds = uint64(0) - -// SetFinalMergeDelay sets the delay before doing final merge for partitions without newly ingested data. -// -// This function may be called only before Storage initialization. -func SetFinalMergeDelay(delay time.Duration) { - if delay <= 0 { - return - } - finalMergeDelaySeconds = uint64(delay.Seconds() + 1) - mergeset.SetFinalMergeDelay(delay) -} - func getMaxInmemoryPartSize() uint64 { // Allocate 10% of allowed memory for in-memory parts. 
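+	// Dividing this amount by maxInmemoryParts gives the maximum size of a single in-memory part,
+	// so in-memory parts cannot occupy more than 10% of the allowed memory in total.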
-	n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryPartsPerPartition)
+	n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryParts)
 	if n < 1e6 {
 		n = 1e6
 	}
@@ -1103,7 +1243,7 @@ func (pt *partition) getMaxSmallPartSize() uint64 {
 		n = 10e6
 	}
 	// Make sure the output part fits available disk space for small parts.
-	sizeLimit := getMaxOutBytes(pt.smallPartsPath, cap(mergeWorkersLimitCh))
+	sizeLimit := getMaxOutBytes(pt.smallPartsPath, cap(smallPartsConcurrencyCh))
 	if n > sizeLimit {
 		n = sizeLimit
 	}
@@ -1130,41 +1270,6 @@ func getMaxOutBytes(path string, workersCount int) uint64 {
 	return maxOutBytes
 }
 
-func (pt *partition) canBackgroundMerge() bool {
-	return atomic.LoadUint32(&pt.s.isReadOnly) == 0
-}
-
-var errReadOnlyMode = fmt.Errorf("storage is in readonly mode")
-
-func (pt *partition) mergeInmemoryParts() error {
-	maxOutBytes := pt.getMaxBigPartSize()
-
-	pt.partsLock.Lock()
-	pws := getPartsToMerge(pt.inmemoryParts, maxOutBytes, false)
-	pt.partsLock.Unlock()
-
-	return pt.mergeParts(pws, pt.stopCh, false)
-}
-
-func (pt *partition) mergeExistingParts(isFinal bool) error {
-	if !pt.canBackgroundMerge() {
-		// Do not perform merge in read-only mode, since this may result in disk space shortage.
-		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
-		return errReadOnlyMode
-	}
-	maxOutBytes := pt.getMaxBigPartSize()
-
-	pt.partsLock.Lock()
-	dst := make([]*partWrapper, 0, len(pt.inmemoryParts)+len(pt.smallParts)+len(pt.bigParts))
-	dst = append(dst, pt.inmemoryParts...)
-	dst = append(dst, pt.smallParts...)
-	dst = append(dst, pt.bigParts...)
-	pws := getPartsToMerge(dst, maxOutBytes, isFinal)
-	pt.partsLock.Unlock()
-
-	return pt.mergeParts(pws, pt.stopCh, isFinal)
-}
-
 func assertIsInMerge(pws []*partWrapper) {
 	for _, pw := range pws {
 		if !pw.isInMerge {
@@ -1184,8 +1289,6 @@ func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
 	pt.partsLock.Unlock()
 }
 
-var errNothingToMerge = fmt.Errorf("nothing to merge")
-
 func (pt *partition) runFinalDedup() error {
 	requiredDedupInterval, actualDedupInterval := pt.getRequiredDedupInterval()
 	t := time.Now()
@@ -1227,16 +1330,19 @@ func getMinDedupInterval(pws []*partWrapper) int64 {
 
 // mergeParts merges pws to a single resulting part.
 //
+// It is expected that pws contains at least a single part.
+//
 // Merging is immediately stopped if stopCh is closed.
 //
 // if isFinal is set, then the resulting part will be saved to disk.
+// If at least a single source part in pws is stored on disk, then the resulting part
+// will also be stored on disk.
 //
 // All the parts inside pws must have isInMerge field set to true.
 // The isInMerge field inside pws parts is set to false before returning from the function.
 func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal bool) error {
 	if len(pws) == 0 {
-		// Nothing to merge.
-		return errNothingToMerge
+		logger.Panicf("BUG: empty pws cannot be passed to mergeParts()")
 	}
 	assertIsInMerge(pws)
@@ -1473,13 +1579,8 @@ func areAllInmemoryParts(pws []*partWrapper) bool {
 
 func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
 	// Atomically unregister old parts and add new part to pt.
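+	// Additionally, start a merger for the given dstPartType, so the newly added part
+	// could be merged with the remaining parts when needed.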
-	m := make(map[*partWrapper]bool, len(pws))
-	for _, pw := range pws {
-		m[pw] = true
-	}
-	if len(m) != len(pws) {
-		logger.Panicf("BUG: %d duplicate parts found when merging %d parts", len(pws)-len(m), len(pws))
-	}
+	m := makeMapFromPartWrappers(pws)
+
 	removedInmemoryParts := 0
 	removedSmallParts := 0
 	removedBigParts := 0
@@ -1493,14 +1594,16 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 		switch dstPartType {
 		case partInmemory:
 			pt.inmemoryParts = append(pt.inmemoryParts, pwNew)
+			pt.startInmemoryPartsMergerLocked()
 		case partSmall:
 			pt.smallParts = append(pt.smallParts, pwNew)
+			pt.startSmallPartsMergerLocked()
 		case partBig:
 			pt.bigParts = append(pt.bigParts, pwNew)
+			pt.startBigPartsMergerLocked()
 		default:
 			logger.Panicf("BUG: unknown partType=%d", dstPartType)
 		}
-		pt.notifyBackgroundMergers()
 	}
 
 	// Atomically store the updated list of file-based parts on disk.
@@ -1549,10 +1652,10 @@ func (pt *partition) nextMergeIdx() uint64 {
 	return atomic.AddUint64(&pt.mergeIdx, 1)
 }
 
-func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*partWrapper, int) {
+func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
 	dst := pws[:0]
 	for _, pw := range pws {
-		if !partsToRemove[pw] {
+		if _, ok := partsToRemove[pw]; !ok {
 			dst = append(dst, pw)
 		}
 	}
@@ -1562,14 +1665,6 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa
 	return dst, len(pws) - len(dst)
 }
 
-func (pt *partition) startStalePartsRemover() {
-	pt.wg.Add(1)
-	go func() {
-		pt.stalePartsRemover()
-		pt.wg.Done()
-	}()
-}
-
 func (pt *partition) stalePartsRemover() {
 	d := timeutil.AddJitterToDuration(7 * time.Minute)
 	ticker := time.NewTicker(d)
@@ -1619,30 +1714,49 @@ func (pt *partition) removeStaleParts() {
 
 // getPartsToMerge returns optimal parts to merge from pws.
 //
 // The summary size of the returned parts must be smaller than maxOutBytes.
-func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64, isFinal bool) []*partWrapper {
+func getPartsToMerge(pws []*partWrapper, maxOutBytes uint64) []*partWrapper {
 	pwsRemaining := make([]*partWrapper, 0, len(pws))
 	for _, pw := range pws {
 		if !pw.isInMerge {
 			pwsRemaining = append(pwsRemaining, pw)
 		}
 	}
-	maxPartsToMerge := defaultPartsToMerge
-	var pms []*partWrapper
-	if isFinal {
-		for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
-			pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
-			maxPartsToMerge--
-		}
-	} else {
-		pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxOutBytes)
-	}
-	for _, pw := range pms {
+
+	pwsToMerge := appendPartsToMerge(nil, pwsRemaining, defaultPartsToMerge, maxOutBytes)
+
+	for _, pw := range pwsToMerge {
 		if pw.isInMerge {
 			logger.Panicf("BUG: partWrapper.isInMerge cannot be set")
 		}
 		pw.isInMerge = true
 	}
-	return pms
+
+	return pwsToMerge
 }
 
+// getPartsForOptimalMerge returns parts from pws for optimal merge, plus the remaining parts.
+//
+// The pws items are replaced by nil after the call. This helps Go GC reclaim the referenced items sooner.
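+// If the optimal set of parts to merge cannot be selected from pws, then all the pws are returned as the parts to merge.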
+func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) { + pwsToMerge := appendPartsToMerge(nil, pws, defaultPartsToMerge, 1<<64-1) + if len(pwsToMerge) == 0 { + return pws, nil + } + + m := makeMapFromPartWrappers(pwsToMerge) + pwsRemaining := make([]*partWrapper, 0, len(pws)-len(pwsToMerge)) + for _, pw := range pws { + if _, ok := m[pw]; !ok { + pwsRemaining = append(pwsRemaining, pw) + } + } + + // Clear references to pws items, so they could be reclaimed faster by Go GC. + for i := range pws { + pws[i] = nil + } + + return pwsToMerge, pwsRemaining } // minMergeMultiplier is the minimum multiplier for the size of the output part @@ -1736,6 +1850,17 @@ func sortPartsForOptimalMerge(pws []*partWrapper) { }) } +func makeMapFromPartWrappers(pws []*partWrapper) map[*partWrapper]struct{} { + m := make(map[*partWrapper]struct{}, len(pws)) + for _, pw := range pws { + m[pw] = struct{}{} + } + if len(m) != len(pws) { + logger.Panicf("BUG: %d duplicate parts found in %d source parts", len(pws)-len(m), len(pws)) + } + return m +} + func getPartsSize(pws []*partWrapper) uint64 { n := uint64(0) for _, pw := range pws { @@ -1809,7 +1934,7 @@ func (pt *partition) MustCreateSnapshotAt(smallPath, bigPath string) { startTime := time.Now() // Flush inmemory data to disk. - pt.flushInmemoryRows() + pt.flushInmemoryRowsToFiles() pt.partsLock.Lock() incRefForParts(pt.smallParts) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index d4b44b043..c6e59e165 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -663,7 +663,8 @@ func (s *Storage) startFreeDiskSpaceWatcher() { // Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization // when the storage isn't in read-only mode. if atomic.LoadUint32(&s.isReadOnly) == 1 && atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) { - logger.Warnf("enabling writing to the storage at %s, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left", + s.notifyReadWriteMode() + logger.Warnf("switching the storage at %s to read-write mode, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left", s.path, freeDiskSpaceLimitBytes, freeSpaceBytes) } } @@ -685,6 +686,16 @@ func (s *Storage) startFreeDiskSpaceWatcher() { }() } +func (s *Storage) notifyReadWriteMode() { + s.tb.NotifyReadWriteMode() + + idb := s.idb() + idb.tb.NotifyReadWriteMode() + idb.doExtDB(func(extDB *indexDB) { + extDB.tb.NotifyReadWriteMode() + }) +} + func (s *Storage) startRetentionWatcher() { s.retentionWatcherWG.Add(1) go func() { diff --git a/lib/storage/table.go b/lib/storage/table.go index d3b2efbf3..1dafbbc87 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -205,6 +205,14 @@ func (tb *table) flushPendingRows() { } } +func (tb *table) NotifyReadWriteMode() { + tb.ptwsLock.Lock() + for _, ptw := range tb.ptws { + ptw.pt.NotifyReadWriteMode() + } + tb.ptwsLock.Unlock() +} + // TableMetrics contains essential metrics for the table. type TableMetrics struct { partitionMetrics