lib/{mergeset,storage}: perform at most one assisted merge per each call to addRows/addItems

This should reduce tail latency during data ingestion.

This shouldn't slow down data ingestion in the worst case, since assisted merges are spread among
distinct addRows/addItems calls after this change.
This commit is contained in:
Aliaksandr Valialkin 2023-10-01 22:17:38 +02:00
parent 7373d04d54
commit 3ca6fea858
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 60 additions and 68 deletions

View File

@ -773,7 +773,6 @@ func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
} }
func (tb *Table) assistedMergeForInmemoryParts() { func (tb *Table) assistedMergeForInmemoryParts() {
for {
tb.partsLock.Lock() tb.partsLock.Lock()
needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts) needMerge := needAssistedMerge(tb.inmemoryParts, maxInmemoryParts)
tb.partsLock.Unlock() tb.partsLock.Unlock()
@ -784,17 +783,15 @@ func (tb *Table) assistedMergeForInmemoryParts() {
atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) atomic.AddUint64(&tb.inmemoryAssistedMerges, 1)
err := tb.mergeInmemoryParts() err := tb.mergeInmemoryParts()
if err == nil { if err == nil {
continue return
} }
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
return return
} }
logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err) logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err)
} }
}
func (tb *Table) assistedMergeForFileParts() { func (tb *Table) assistedMergeForFileParts() {
for {
tb.partsLock.Lock() tb.partsLock.Lock()
needMerge := needAssistedMerge(tb.fileParts, maxFileParts) needMerge := needAssistedMerge(tb.fileParts, maxFileParts)
tb.partsLock.Unlock() tb.partsLock.Unlock()
@ -805,14 +802,13 @@ func (tb *Table) assistedMergeForFileParts() {
atomic.AddUint64(&tb.fileAssistedMerges, 1) atomic.AddUint64(&tb.fileAssistedMerges, 1)
err := tb.mergeExistingParts(false) err := tb.mergeExistingParts(false)
if err == nil { if err == nil {
continue return
} }
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
return return
} }
logger.Panicf("FATAL: cannot assist with merging file parts: %s", err) logger.Panicf("FATAL: cannot assist with merging file parts: %s", err)
} }
}
func getNotInMergePartsCount(pws []*partWrapper) int { func getNotInMergePartsCount(pws []*partWrapper) int {
n := 0 n := 0

View File

@ -634,7 +634,6 @@ func needAssistedMerge(pws []*partWrapper, maxParts int) bool {
} }
func (pt *partition) assistedMergeForInmemoryParts() { func (pt *partition) assistedMergeForInmemoryParts() {
for {
pt.partsLock.Lock() pt.partsLock.Lock()
needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition) needMerge := needAssistedMerge(pt.inmemoryParts, maxInmemoryPartsPerPartition)
pt.partsLock.Unlock() pt.partsLock.Unlock()
@ -645,17 +644,15 @@ func (pt *partition) assistedMergeForInmemoryParts() {
atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) atomic.AddUint64(&pt.inmemoryAssistedMerges, 1)
err := pt.mergeInmemoryParts() err := pt.mergeInmemoryParts()
if err == nil { if err == nil {
continue return
} }
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) { if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
return return
} }
logger.Panicf("FATAL: cannot merge inmemory parts: %s", err) logger.Panicf("FATAL: cannot merge inmemory parts: %s", err)
} }
}
func (pt *partition) assistedMergeForSmallParts() { func (pt *partition) assistedMergeForSmallParts() {
for {
pt.partsLock.Lock() pt.partsLock.Lock()
needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition) needMerge := needAssistedMerge(pt.smallParts, maxSmallPartsPerPartition)
pt.partsLock.Unlock() pt.partsLock.Unlock()
@ -666,14 +663,13 @@ func (pt *partition) assistedMergeForSmallParts() {
atomic.AddUint64(&pt.smallAssistedMerges, 1) atomic.AddUint64(&pt.smallAssistedMerges, 1)
err := pt.mergeExistingParts(false) err := pt.mergeExistingParts(false)
if err == nil { if err == nil {
continue return
} }
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) { if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
return return
} }
logger.Panicf("FATAL: cannot merge small parts: %s", err) logger.Panicf("FATAL: cannot merge small parts: %s", err)
} }
}
func getNotInMergePartsCount(pws []*partWrapper) int { func getNotInMergePartsCount(pws []*partWrapper) int {
n := 0 n := 0