lib/mergeset: really limit the number of in-memory parts to 15

It has been appeared that the registration of new time series slows down linearly
with the number of indexdb parts, since VictoriaMetrics needs to check every indexdb part
when it searches for TSID by newly ingested metric name.

The number of in-memory parts grows when new time series are registered
at high rate. The number of in-memory parts grows faster on systems with big number
of CPU cores, because the mergeset maintains per-CPU buffers with newly added entries
for the indexdb, and every such entry is transformed eventually into a separate in-memory part.

The solution has been suggested in https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
by @misutoth - to limit the number of in-memory parts with buffered channel.
This solution is implemented in this commit. Additionally, this commit merges per-CPU parts
into a single part before adding it to the list of in-memory parts. This reduces CPU load
when searching for TSID by newly ingested metric name.

The https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 recommends setting the limit on the number
of in-memory parts to 100, but my internal testing shows that much lower limit 15 works with the same efficiency
on a system with 16 CPU cores while reducing memory usage for `indexdb/dataBlocks` cache by up to 50%.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5190
This commit is contained in:
Aliaksandr Valialkin 2024-01-24 03:27:49 +02:00
parent 5205f1c6a6
commit 12698b9136
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
2 changed files with 84 additions and 48 deletions

View File

@ -30,6 +30,7 @@ The sandbox cluster installation is running under the constant load generated by
* SECURITY: upgrade Go builder from Go1.21.5 to Go1.21.6. See [the list of issues addressed in Go1.21.6](https://github.com/golang/go/issues?q=milestone%3AGo1.21.6+label%3ACherryPickApproved). * SECURITY: upgrade Go builder from Go1.21.5 to Go1.21.6. See [the list of issues addressed in Go1.21.6](https://github.com/golang/go/issues?q=milestone%3AGo1.21.6+label%3ACherryPickApproved).
* FEATURE: improve new [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series) registration speed on systems with high number of CPU cores. Thanks to @misutoth for the initial idea and [implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for discovering [Hetzner Cloud](https://www.hetzner.com/cloud) and [Hetzner Robot](https://docs.hetzner.com/robot) scrape targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3154) and [these docs](https://docs.victoriametrics.com/sd_configs.html#hetzner_sd_configs). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for discovering [Hetzner Cloud](https://www.hetzner.com/cloud) and [Hetzner Robot](https://docs.hetzner.com/robot) scrape targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3154) and [these docs](https://docs.victoriametrics.com/sd_configs.html#hetzner_sd_configs).
* FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for negative index in `groupByNode` and `aliasByNode` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5581). * FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for negative index in `groupByNode` and `aliasByNode` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5581).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).

View File

@ -24,17 +24,20 @@ import (
// maxInmemoryParts is the maximum number of inmemory parts in the table. // maxInmemoryParts is the maximum number of inmemory parts in the table.
// //
// This limit allows reducing CPU usage under high ingestion rate.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
//
// This number may be reached when the insertion pace outreaches merger pace. // This number may be reached when the insertion pace outreaches merger pace.
// If this number is reached, then assisted merges are performed // If this number is reached, then assisted merges are performed
// during data ingestion. // during data ingestion.
const maxInmemoryParts = 30 const maxInmemoryParts = 15
// maxFileParts is the maximum number of file parts in the table. // maxFileParts is the maximum number of file parts in the table.
// //
// This number may be reached when the insertion pace outreaches merger pace. // This number may be reached when the insertion pace outreaches merger pace.
// If this number is reached, then assisted merges are performed // If this number is reached, then assisted merges are performed
// during data ingestion. // during data ingestion.
const maxFileParts = 64 const maxFileParts = 30
// Default number of parts to merge at once. // Default number of parts to merge at once.
// //
@ -135,6 +138,10 @@ type Table struct {
// inmemoryParts contains inmemory parts. // inmemoryParts contains inmemory parts.
inmemoryParts []*partWrapper inmemoryParts []*partWrapper
// inmemoryPartsLimitCh limits the number of inmemory parts
// in order to prevent from data ingestion slowdown as described at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
inmemoryPartsLimitCh chan struct{}
// fileParts contains file-backed parts. // fileParts contains file-backed parts.
fileParts []*partWrapper fileParts []*partWrapper
@ -255,14 +262,6 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
tb.flushBlocksToParts(ibsToFlush, false) tb.flushBlocksToParts(ibsToFlush, false)
if len(ibsToFlush) > 0 {
// Run assisted merges if needed.
flushConcurrencyCh <- struct{}{}
tb.assistedMergeForInmemoryParts()
tb.assistedMergeForFileParts()
<-flushConcurrencyCh
}
return tailItems return tailItems
} }
@ -338,6 +337,7 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC
flushCallback: flushCallback, flushCallback: flushCallback,
prepareBlock: prepareBlock, prepareBlock: prepareBlock,
isReadOnly: isReadOnly, isReadOnly: isReadOnly,
inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
fileParts: pws, fileParts: pws,
mergeIdx: uint64(time.Now().UnixNano()), mergeIdx: uint64(time.Now().UnixNano()),
needMergeCh: make(chan struct{}, 1), needMergeCh: make(chan struct{}, 1),
@ -728,13 +728,25 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
wg.Wait() wg.Wait()
putWaitGroup(wg) putWaitGroup(wg)
flushConcurrencyCh <- struct{}{}
pw := tb.mustMergeInmemoryParts(pws)
<-flushConcurrencyCh
select {
case tb.inmemoryPartsLimitCh <- struct{}{}:
default:
// Too many in-memory parts. Try assist merging them before adding pw to tb.inmemoryParts.
flushConcurrencyCh <- struct{}{}
tb.assistedMergeForInmemoryParts()
tb.assistedMergeForFileParts()
<-flushConcurrencyCh
tb.inmemoryPartsLimitCh <- struct{}{}
}
tb.partsLock.Lock() tb.partsLock.Lock()
tb.inmemoryParts = append(tb.inmemoryParts, pws...) tb.inmemoryParts = append(tb.inmemoryParts, pw)
for range pws { tb.notifyBackgroundMergers()
if !tb.notifyBackgroundMergers() {
break
}
}
tb.partsLock.Unlock() tb.partsLock.Unlock()
if tb.flushCallback != nil { if tb.flushCallback != nil {
@ -772,23 +784,23 @@ var flushConcurrencyLimit = func() int {
var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit) var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit)
func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
if len(pws) < maxParts {
return false
}
return getNotInMergePartsCount(pws) >= defaultPartsToMerge
}
func (tb *Table) assistedMergeForInmemoryParts() { func (tb *Table) assistedMergeForInmemoryParts() {
tb.partsLock.Lock() tb.partsLock.Lock()
needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts) needMerge := getNotInMergePartsCount(tb.inmemoryParts) >= defaultPartsToMerge
tb.partsLock.Unlock() tb.partsLock.Unlock()
if !needMerge { if !needMerge {
return return
} }
atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) atomic.AddUint64(&tb.inmemoryAssistedMerges, 1)
err := tb.mergeInmemoryParts()
maxOutBytes := tb.getMaxFilePartSize()
tb.partsLock.Lock()
pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, true)
tb.partsLock.Unlock()
err := tb.mergeParts(pws, tb.stopCh, true)
if err == nil { if err == nil {
return return
} }
@ -800,7 +812,7 @@ func (tb *Table) assistedMergeForInmemoryParts() {
func (tb *Table) assistedMergeForFileParts() { func (tb *Table) assistedMergeForFileParts() {
tb.partsLock.Lock() tb.partsLock.Lock()
needMerge := needAssistedMerge(tb.fileParts, maxFileParts) needMerge := getNotInMergePartsCount(tb.fileParts) >= defaultPartsToMerge
tb.partsLock.Unlock() tb.partsLock.Unlock()
if !needMerge { if !needMerge {
return return
@ -841,12 +853,27 @@ func putWaitGroup(wg *sync.WaitGroup) {
var wgPool sync.Pool var wgPool sync.Pool
func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper { func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) *partWrapper {
outItemsCount := uint64(0) if len(pws) == 1 {
for _, ib := range ibs { // Nothing to merge
outItemsCount += uint64(ib.Len()) return pws[0]
} }
bsrs := make([]*blockStreamReader, 0, len(pws))
for _, pw := range pws {
if pw.mp == nil {
logger.Panicf("BUG: unexpected file part")
}
bsr := getBlockStreamReader()
bsr.MustInitFromInmemoryPart(pw.mp)
bsrs = append(bsrs, bsr)
}
flushToDiskDeadline := getFlushToDiskDeadline(pws)
return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
}
func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
// Prepare blockStreamReaders for source blocks. // Prepare blockStreamReaders for source blocks.
bsrs := make([]*blockStreamReader, 0, len(ibs)) bsrs := make([]*blockStreamReader, 0, len(ibs))
for _, ib := range ibs { for _, ib := range ibs {
@ -861,6 +888,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
if len(bsrs) == 0 { if len(bsrs) == 0 {
return nil return nil
} }
flushToDiskDeadline := time.Now().Add(dataFlushInterval) flushToDiskDeadline := time.Now().Add(dataFlushInterval)
if len(bsrs) == 1 { if len(bsrs) == 1 {
// Nothing to merge. Just return a single inmemory part. // Nothing to merge. Just return a single inmemory part.
@ -871,7 +899,15 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
return newPartWrapperFromInmemoryPart(mp, flushToDiskDeadline) return newPartWrapperFromInmemoryPart(mp, flushToDiskDeadline)
} }
return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
}
func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDiskDeadline time.Time) *partWrapper {
// Prepare blockStreamWriter for destination part. // Prepare blockStreamWriter for destination part.
outItemsCount := uint64(0)
for _, bsr := range bsrs {
outItemsCount += bsr.ph.itemsCount
}
compressLevel := getCompressLevel(outItemsCount) compressLevel := getCompressLevel(outItemsCount)
bsw := getBlockStreamWriter() bsw := getBlockStreamWriter()
mpDst := &inmemoryPart{} mpDst := &inmemoryPart{}
@ -891,6 +927,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
for _, bsr := range bsrs { for _, bsr := range bsrs {
putBlockStreamReader(bsr) putBlockStreamReader(bsr)
} }
return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline) return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline)
} }
@ -940,16 +977,6 @@ func (tb *Table) canBackgroundMerge() bool {
var errReadOnlyMode = fmt.Errorf("storage is in readonly mode") var errReadOnlyMode = fmt.Errorf("storage is in readonly mode")
func (tb *Table) mergeInmemoryParts() error {
maxOutBytes := tb.getMaxFilePartSize()
tb.partsLock.Lock()
pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, false)
tb.partsLock.Unlock()
return tb.mergeParts(pws, tb.stopCh, false)
}
func (tb *Table) mergeExistingParts(isFinal bool) error { func (tb *Table) mergeExistingParts(isFinal bool) error {
if !tb.canBackgroundMerge() { if !tb.canBackgroundMerge() {
// Do not perform background merge in read-only mode // Do not perform background merge in read-only mode
@ -1278,6 +1305,14 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
tb.partsLock.Unlock() tb.partsLock.Unlock()
// Update inmemoryPartsLimitCh accordingly to the number of the remaining in-memory parts.
for i := 0; i < removedInmemoryParts; i++ {
<-tb.inmemoryPartsLimitCh
}
if dstPartType == partInmemory {
tb.inmemoryPartsLimitCh <- struct{}{}
}
removedParts := removedInmemoryParts + removedFileParts removedParts := removedInmemoryParts + removedFileParts
if removedParts != len(m) { if removedParts != len(m) {
logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m)) logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m))