lib/logstorage/datadb: remove parts merge cond (#4828)

It was added in order to limit number of goroutines performing assisted merges during ingestion.
It turned out that blocking ingestion goroutines lower ingestion performance and limits overall ingestion around 40k items per seconds because of lock contention.
Removing parts merge sync.Cond allows to remove lock contention at write path and significantly improves write performance.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4775

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2023-09-29 13:50:14 +04:00 committed by Aliaksandr Valialkin
parent 10371eac60
commit 53268ebc66
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1

View File

@ -69,9 +69,6 @@ type datadb struct {
// stopCh is used for notifying background workers to stop
stopCh chan struct{}
// mergeDoneCond is used for pace-limiting the data ingestion rate
mergeDoneCond *sync.Cond
// inmemoryPartsFlushersCount is the number of currently running in-memory parts flushers
//
// This variable must be accessed under partsLock.
@ -173,7 +170,6 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
fileParts: pws,
stopCh: make(chan struct{}),
}
ddb.mergeDoneCond = sync.NewCond(&ddb.partsLock)
// Start merge workers in the hope they'll merge the remaining parts
ddb.partsLock.Lock()
@ -415,7 +411,6 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
}
if needStop(stopCh) {
ddb.releasePartsToMerge(pws)
ddb.mergeDoneCond.Broadcast()
// Remove incomplete destination part
if dstPartType == partFile {
fs.MustRemoveAll(dstPartPath)
@ -526,10 +521,6 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
if len(ddb.inmemoryParts) > defaultPartsToMerge {
ddb.startMergeWorkerLocked()
}
for len(ddb.inmemoryParts) > maxInmemoryPartsPerPartition {
// limit the pace for data ingestion if too many inmemory parts are created
ddb.mergeDoneCond.Wait()
}
ddb.partsLock.Unlock()
}
@ -696,8 +687,6 @@ func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, d
atomic.StoreUint32(&pw.mustBeDeleted, 1)
pw.decRef()
}
ddb.mergeDoneCond.Broadcast()
}
func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {