diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go
index 41f9c31a61..d3afc82f12 100644
--- a/lib/logstorage/datadb.go
+++ b/lib/logstorage/datadb.go
@@ -69,9 +69,6 @@ type datadb struct {
 	// stopCh is used for notifying background workers to stop
 	stopCh chan struct{}
 
-	// mergeDoneCond is used for pace-limiting the data ingestion rate
-	mergeDoneCond *sync.Cond
-
 	// inmemoryPartsFlushersCount is the number of currently running in-memory parts flushers
 	//
 	// This variable must be accessed under partsLock.
@@ -173,7 +170,6 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
 		fileParts: pws,
 		stopCh:    make(chan struct{}),
 	}
-	ddb.mergeDoneCond = sync.NewCond(&ddb.partsLock)
 
 	// Start merge workers in the hope they'll merge the remaining parts
 	ddb.partsLock.Lock()
@@ -415,7 +411,6 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 	}
 	if needStop(stopCh) {
 		ddb.releasePartsToMerge(pws)
-		ddb.mergeDoneCond.Broadcast()
 		// Remove incomplete destination part
 		if dstPartType == partFile {
 			fs.MustRemoveAll(dstPartPath)
@@ -526,10 +521,6 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
 	if len(ddb.inmemoryParts) > defaultPartsToMerge {
 		ddb.startMergeWorkerLocked()
 	}
-	for len(ddb.inmemoryParts) > maxInmemoryPartsPerPartition {
-		// limit the pace for data ingestion if too many inmemory parts are created
-		ddb.mergeDoneCond.Wait()
-	}
 	ddb.partsLock.Unlock()
 }
 
@@ -696,8 +687,6 @@ func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, d
 		atomic.StoreUint32(&pw.mustBeDeleted, 1)
 		pw.decRef()
 	}
-
-	ddb.mergeDoneCond.Broadcast()
 }
 
 func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
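
For context, below is a minimal, self-contained sketch of the sync.Cond-based backpressure pattern that this diff removes. The `pacer` type, its fields, and `maxParts` are hypothetical names for illustration, not VictoriaLogs code; only the pattern mirrors the deleted mechanism: a condition variable sharing the same mutex that guards the parts slice (`mergeDoneCond` shared `ddb.partsLock`), writers waiting in a loop while too many in-memory parts are pending, and mergers calling Broadcast after each merge so writers can re-check.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pacer illustrates cond-based ingestion pacing. All names are
// hypothetical; only the wait/broadcast pattern mirrors datadb.
type pacer struct {
	mu            sync.Mutex
	mergeDone     *sync.Cond // signaled whenever a merge completes
	inmemoryParts int        // guarded by mu
	maxParts      int        // pacing threshold
}

func newPacer(maxParts int) *pacer {
	p := &pacer{maxParts: maxParts}
	// The cond shares the lock that guards inmemoryParts, just as
	// mergeDoneCond was built on &ddb.partsLock.
	p.mergeDone = sync.NewCond(&p.mu)
	return p
}

// addPart mimics mustAddRows: it blocks while the in-memory part
// count exceeds the limit, waiting for a merge to make room.
func (p *pacer) addPart() {
	p.mu.Lock()
	p.inmemoryParts++
	for p.inmemoryParts > p.maxParts {
		// Wait atomically releases p.mu and re-acquires it on wakeup;
		// the for-loop re-checks the condition, guarding against
		// spurious wakeups (this is why the removed code looped too).
		p.mergeDone.Wait()
	}
	p.mu.Unlock()
}

// merge mimics swapSrcWithDstParts: it collapses pending parts into
// one and wakes every waiting writer via Broadcast.
func (p *pacer) merge() {
	p.mu.Lock()
	if p.inmemoryParts > 1 {
		p.inmemoryParts = 1 // all pending parts become one merged part
	}
	p.mergeDone.Broadcast()
	p.mu.Unlock()
}

func main() {
	p := newPacer(4)
	go func() {
		// Background merger, standing in for the merge workers.
		for {
			time.Sleep(10 * time.Millisecond)
			p.merge()
		}
	}()
	for i := 0; i < 20; i++ {
		p.addPart() // blocks whenever more than 4 parts are pending
	}
	fmt.Println("ingestion finished; pending parts:", p.inmemoryParts)
}
```

With this pattern removed by the diff, mustAddRows no longer blocks when len(ddb.inmemoryParts) exceeds maxInmemoryPartsPerPartition, and the merge paths no longer need the matching Broadcast calls.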