diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 2b36f335a..8c1b4dca3 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -30,6 +30,7 @@ The sandbox cluster installation is running under the constant load generated by
 
 * SECURITY: upgrade Go builder from Go1.21.5 to Go1.21.6. See [the list of issues addressed in Go1.21.6](https://github.com/golang/go/issues?q=milestone%3AGo1.21.6+label%3ACherryPickApproved).
 
+* FEATURE: improve new [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series) registration speed on systems with high number of CPU cores. Thanks to @misutoth for the initial idea and [implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for discovering [Hetzner Cloud](https://www.hetzner.com/cloud) and [Hetzner Robot](https://docs.hetzner.com/robot) scrape targets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3154) and [these docs](https://docs.victoriametrics.com/sd_configs.html#hetzner_sd_configs).
 * FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for negative index in `groupByNode` and `aliasByNode` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5581).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index ab82d123c..b0680ed9f 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -24,17 +24,20 @@ import (
 
 // maxInmemoryParts is the maximum number of inmemory parts in the table.
 //
+// This limit allows reducing CPU usage under high ingestion rate.
+// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
+//
 // This number may be reached when the insertion pace outreaches merger pace.
 // If this number is reached, then assisted merges are performed
 // during data ingestion.
-const maxInmemoryParts = 30
+const maxInmemoryParts = 15
 
 // maxFileParts is the maximum number of file parts in the table.
 //
 // This number may be reached when the insertion pace outreaches merger pace.
 // If this number is reached, then assisted merges are performed
 // during data ingestion.
-const maxFileParts = 64
+const maxFileParts = 30
 
 // Default number of parts to merge at once.
 //
@@ -135,6 +138,10 @@ type Table struct {
 	// inmemoryParts contains inmemory parts.
 	inmemoryParts []*partWrapper
 
+	// inmemoryPartsLimitCh limits the number of inmemory parts
+	// in order to prevent from data ingestion slowdown as described at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
+	inmemoryPartsLimitCh chan struct{}
+
 	// fileParts contains file-backed parts.
 	fileParts []*partWrapper
 
@@ -255,14 +262,6 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 
 	tb.flushBlocksToParts(ibsToFlush, false)
 
-	if len(ibsToFlush) > 0 {
-		// Run assisted merges if needed.
-		flushConcurrencyCh <- struct{}{}
-		tb.assistedMergeForInmemoryParts()
-		tb.assistedMergeForFileParts()
-		<-flushConcurrencyCh
-	}
-
 	return tailItems
 }
 
@@ -334,14 +333,15 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC
 	pws := mustOpenParts(path)
 
 	tb := &Table{
-		path:          path,
-		flushCallback: flushCallback,
-		prepareBlock:  prepareBlock,
-		isReadOnly:    isReadOnly,
-		fileParts:     pws,
-		mergeIdx:      uint64(time.Now().UnixNano()),
-		needMergeCh:   make(chan struct{}, 1),
-		stopCh:        make(chan struct{}),
+		path:                 path,
+		flushCallback:        flushCallback,
+		prepareBlock:         prepareBlock,
+		isReadOnly:           isReadOnly,
+		inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
+		fileParts:            pws,
+		mergeIdx:             uint64(time.Now().UnixNano()),
+		needMergeCh:          make(chan struct{}, 1),
+		stopCh:               make(chan struct{}),
 	}
 	tb.rawItems.init()
 	tb.startBackgroundWorkers()
@@ -728,13 +728,25 @@ func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
 	wg.Wait()
 	putWaitGroup(wg)
 
-	tb.partsLock.Lock()
-	tb.inmemoryParts = append(tb.inmemoryParts, pws...)
-	for range pws {
-		if !tb.notifyBackgroundMergers() {
-			break
-		}
+	flushConcurrencyCh <- struct{}{}
+	pw := tb.mustMergeInmemoryParts(pws)
+	<-flushConcurrencyCh
+
+	select {
+	case tb.inmemoryPartsLimitCh <- struct{}{}:
+	default:
+		// Too many in-memory parts. Try assist merging them before adding pw to tb.inmemoryParts.
+		flushConcurrencyCh <- struct{}{}
+		tb.assistedMergeForInmemoryParts()
+		tb.assistedMergeForFileParts()
+		<-flushConcurrencyCh
+
+		tb.inmemoryPartsLimitCh <- struct{}{}
 	}
+
+	tb.partsLock.Lock()
+	tb.inmemoryParts = append(tb.inmemoryParts, pw)
+	tb.notifyBackgroundMergers()
 	tb.partsLock.Unlock()
 
 	if tb.flushCallback != nil {
@@ -772,23 +784,23 @@ var flushConcurrencyLimit = func() int {
 
 var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit)
 
-func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
-	if len(pws) < maxParts {
-		return false
-	}
-	return getNotInMergePartsCount(pws) >= defaultPartsToMerge
-}
-
 func (tb *Table) assistedMergeForInmemoryParts() {
 	tb.partsLock.Lock()
-	needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts)
+	needMerge := getNotInMergePartsCount(tb.inmemoryParts) >= defaultPartsToMerge
 	tb.partsLock.Unlock()
 	if !needMerge {
 		return
 	}
 
 	atomic.AddUint64(&tb.inmemoryAssistedMerges, 1)
-	err := tb.mergeInmemoryParts()
+
+	maxOutBytes := tb.getMaxFilePartSize()
+
+	tb.partsLock.Lock()
+	pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, true)
+	tb.partsLock.Unlock()
+
+	err := tb.mergeParts(pws, tb.stopCh, true)
 	if err == nil {
 		return
 	}
@@ -800,7 +812,7 @@ func (tb *Table) assistedMergeForFileParts() {
 	tb.partsLock.Lock()
-	needMerge := needAssistedMerge(tb.fileParts, maxFileParts)
+	needMerge := getNotInMergePartsCount(tb.fileParts) >= defaultPartsToMerge
 	tb.partsLock.Unlock()
 	if !needMerge {
 		return
 	}
 
@@ -841,12 +853,27 @@ func putWaitGroup(wg *sync.WaitGroup) {
 
 var wgPool sync.Pool
 
-func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
-	outItemsCount := uint64(0)
-	for _, ib := range ibs {
-		outItemsCount += uint64(ib.Len())
+func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) *partWrapper {
+	if len(pws) == 1 {
+		// Nothing to merge
+		return pws[0]
 	}
 
+	bsrs := make([]*blockStreamReader, 0, len(pws))
+	for _, pw := range pws {
+		if pw.mp == nil {
+			logger.Panicf("BUG: unexpected file part")
+		}
+		bsr := getBlockStreamReader()
+		bsr.MustInitFromInmemoryPart(pw.mp)
+		bsrs = append(bsrs, bsr)
+	}
+
+	flushToDiskDeadline := getFlushToDiskDeadline(pws)
+	return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
+}
+
+func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
 	// Prepare blockStreamReaders for source blocks.
 	bsrs := make([]*blockStreamReader, 0, len(ibs))
 	for _, ib := range ibs {
@@ -861,6 +888,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
 	if len(bsrs) == 0 {
 		return nil
 	}
+
 	flushToDiskDeadline := time.Now().Add(dataFlushInterval)
 	if len(bsrs) == 1 {
 		// Nothing to merge. Just return a single inmemory part.
@@ -871,7 +899,15 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
 		return newPartWrapperFromInmemoryPart(mp, flushToDiskDeadline)
 	}
 
+	return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
+}
+
+func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDiskDeadline time.Time) *partWrapper {
 	// Prepare blockStreamWriter for destination part.
+	outItemsCount := uint64(0)
+	for _, bsr := range bsrs {
+		outItemsCount += bsr.ph.itemsCount
+	}
 	compressLevel := getCompressLevel(outItemsCount)
 	bsw := getBlockStreamWriter()
 	mpDst := &inmemoryPart{}
@@ -891,6 +927,7 @@ func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
 	for _, bsr := range bsrs {
 		putBlockStreamReader(bsr)
 	}
+
 	return newPartWrapperFromInmemoryPart(mpDst, flushToDiskDeadline)
 }
 
@@ -940,16 +977,6 @@ func (tb *Table) canBackgroundMerge() bool {
 
 var errReadOnlyMode = fmt.Errorf("storage is in readonly mode")
 
-func (tb *Table) mergeInmemoryParts() error {
-	maxOutBytes := tb.getMaxFilePartSize()
-
-	tb.partsLock.Lock()
-	pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, false)
-	tb.partsLock.Unlock()
-
-	return tb.mergeParts(pws, tb.stopCh, false)
-}
-
 func (tb *Table) mergeExistingParts(isFinal bool) error {
 	if !tb.canBackgroundMerge() {
 		// Do not perform background merge in read-only mode
@@ -1278,6 +1305,14 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
 
 	tb.partsLock.Unlock()
 
+	// Update inmemoryPartsLimitCh accordingly to the number of the remaining in-memory parts.
+	for i := 0; i < removedInmemoryParts; i++ {
+		<-tb.inmemoryPartsLimitCh
+	}
+	if dstPartType == partInmemory {
+		tb.inmemoryPartsLimitCh <- struct{}{}
+	}
+
 	removedParts := removedInmemoryParts + removedFileParts
 	if removedParts != len(m) {
 		logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(m))
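
The core mechanism of this change is a counting semaphore built from a buffered channel: `inmemoryPartsLimitCh` has capacity `maxInmemoryParts`, `flushBlocksToParts` must place a token into it before registering a newly flushed in-memory part (running assisted merges first if the channel is full), and `swapSrcWithDstParts` returns tokens as in-memory parts are merged away. The standalone Go sketch below illustrates that pattern only; the `table`, `addPart` and `dropParts` names are hypothetical stand-ins and not part of the VictoriaMetrics codebase.

```go
package main

import (
	"fmt"
	"sync"
)

// maxInmemoryParts mirrors the constant from lib/mergeset: the hard cap on
// in-memory parts that may be registered at any moment (assumed value).
const maxInmemoryParts = 15

// table is a toy stand-in for mergeset.Table, keeping only what is needed to
// show the semaphore pattern: a buffered channel whose capacity equals the
// part limit, plus the slice of registered parts.
type table struct {
	limitCh chan struct{} // plays the role of inmemoryPartsLimitCh
	mu      sync.Mutex
	parts   []string // stands in for []*partWrapper
}

func newTable() *table {
	return &table{limitCh: make(chan struct{}, maxInmemoryParts)}
}

// addPart registers a freshly flushed in-memory part. The non-blocking send
// mimics the select/default added to flushBlocksToParts: when the semaphore
// is full, assistedMerge runs first and then the send blocks until a slot
// becomes free.
func (t *table) addPart(name string, assistedMerge func(*table)) {
	select {
	case t.limitCh <- struct{}{}:
	default:
		// Too many in-memory parts: help the mergers before registering
		// the new part, then wait for a free slot.
		assistedMerge(t)
		t.limitCh <- struct{}{}
	}
	t.mu.Lock()
	t.parts = append(t.parts, name)
	t.mu.Unlock()
}

// dropParts removes n merged-away parts and returns their semaphore slots,
// mirroring the receive loop added to swapSrcWithDstParts.
func (t *table) dropParts(n int) {
	t.mu.Lock()
	t.parts = t.parts[:len(t.parts)-n]
	t.mu.Unlock()
	for i := 0; i < n; i++ {
		<-t.limitCh
	}
}

func main() {
	t := newTable()
	merge := func(t *table) {
		// Pretend an assisted merge collapsed one in-memory part away.
		t.dropParts(1)
	}
	for i := 0; i < 40; i++ {
		t.addPart(fmt.Sprintf("part-%d", i), merge)
	}
	// The number of registered parts never exceeds maxInmemoryParts.
	fmt.Println("registered parts:", len(t.parts))
}
```

Using the channel as a semaphore applies backpressure on the flush path instead of letting in-memory parts pile up past the limit, which is how the diff keeps the part count bounded without the per-call assisted-merge check that was removed from `rawItemsShard.addItems`.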