lib/storage: do not keep rawRows buffer across flush() calls

The buffer can be quite big under high ingestion rate (e.g. more than 100MB).
This leads to increased memory usage between buffer flushes.
So it is better to re-create the buffer on every flush in order to reduce memory usage
between buffer flushes.
This commit is contained in:
Aliaksandr Valialkin 2024-02-22 17:22:23 +02:00
parent fa19daf3bd
commit b7dfe9894c
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
4 changed files with 18 additions and 21 deletions

View File

@ -255,7 +255,9 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
ris.ibs = ibs ris.ibs = ibs
ris.mu.Unlock() ris.mu.Unlock()
if len(ibsToFlush) > 0 {
tb.flushBlocksToInmemoryParts(ibsToFlush, false) tb.flushBlocksToInmemoryParts(ibsToFlush, false)
}
return tailItems return tailItems
} }
@ -689,7 +691,7 @@ func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error {
// This function is for debugging and testing purposes only, // This function is for debugging and testing purposes only,
// since it may slow down data ingestion when used frequently. // since it may slow down data ingestion when used frequently.
func (tb *Table) DebugFlush() { func (tb *Table) DebugFlush() {
tb.flushPendingItems(nil, true) tb.flushPendingItems(true)
// Wait for background flushers to finish. // Wait for background flushers to finish.
tb.flushPendingItemsWG.Wait() tb.flushPendingItemsWG.Wait()
@ -699,13 +701,12 @@ func (tb *Table) pendingItemsFlusher() {
d := timeutil.AddJitterToDuration(pendingItemsFlushInterval) d := timeutil.AddJitterToDuration(pendingItemsFlushInterval)
ticker := time.NewTicker(d) ticker := time.NewTicker(d)
defer ticker.Stop() defer ticker.Stop()
var ibs []*inmemoryBlock
for { for {
select { select {
case <-tb.stopCh: case <-tb.stopCh:
return return
case <-ticker.C: case <-ticker.C:
ibs = tb.flushPendingItems(ibs[:0], false) tb.flushPendingItems(false)
} }
} }
} }
@ -724,15 +725,14 @@ func (tb *Table) inmemoryPartsFlusher() {
} }
} }
func (tb *Table) flushPendingItems(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { func (tb *Table) flushPendingItems(isFinal bool) {
tb.flushPendingItemsWG.Add(1) tb.flushPendingItemsWG.Add(1)
dst = tb.rawItems.flush(tb, dst, isFinal) tb.rawItems.flush(tb, isFinal)
tb.flushPendingItemsWG.Done() tb.flushPendingItemsWG.Done()
return dst
} }
func (tb *Table) flushInmemoryItemsToFiles() { func (tb *Table) flushInmemoryItemsToFiles() {
tb.flushPendingItems(nil, true) tb.flushPendingItems(true)
tb.flushInmemoryPartsToFiles(true) tb.flushInmemoryPartsToFiles(true)
} }
@ -754,12 +754,12 @@ func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) {
} }
} }
func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
var dst []*inmemoryBlock
for i := range riss.shards { for i := range riss.shards {
dst = riss.shards[i].appendBlocksToFlush(dst, isFinal) dst = riss.shards[i].appendBlocksToFlush(dst, isFinal)
} }
tb.flushBlocksToInmemoryParts(dst, isFinal) tb.flushBlocksToInmemoryParts(dst, isFinal)
return dst
} }
func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {

View File

@ -1058,23 +1058,22 @@ func (pt *partition) pendingRowsFlusher() {
d := timeutil.AddJitterToDuration(pendingRowsFlushInterval) d := timeutil.AddJitterToDuration(pendingRowsFlushInterval)
ticker := time.NewTicker(d) ticker := time.NewTicker(d)
defer ticker.Stop() defer ticker.Stop()
var rows []rawRow
for { for {
select { select {
case <-pt.stopCh: case <-pt.stopCh:
return return
case <-ticker.C: case <-ticker.C:
rows = pt.flushPendingRows(rows[:0], false) pt.flushPendingRows(false)
} }
} }
} }
func (pt *partition) flushPendingRows(dst []rawRow, isFinal bool) []rawRow { func (pt *partition) flushPendingRows(isFinal bool) {
return pt.rawRows.flush(pt, dst, isFinal) pt.rawRows.flush(pt, isFinal)
} }
func (pt *partition) flushInmemoryRowsToFiles() { func (pt *partition) flushInmemoryRowsToFiles() {
pt.flushPendingRows(nil, true) pt.flushPendingRows(true)
pt.flushInmemoryPartsToFiles(true) pt.flushInmemoryPartsToFiles(true)
} }
@ -1096,12 +1095,12 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) {
} }
} }
func (rrss *rawRowsShards) flush(pt *partition, dst []rawRow, isFinal bool) []rawRow { func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
var dst []rawRow
for i := range rrss.shards { for i := range rrss.shards {
dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal) dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal)
} }
pt.flushRowsToInmemoryParts(dst) pt.flushRowsToInmemoryParts(dst)
return dst
} }
func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow { func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow {

View File

@ -172,12 +172,11 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt := mustCreatePartition(ptt, "small-table", "big-table", strg) pt := mustCreatePartition(ptt, "small-table", "big-table", strg)
smallPartsPath := pt.smallPartsPath smallPartsPath := pt.smallPartsPath
bigPartsPath := pt.bigPartsPath bigPartsPath := pt.bigPartsPath
var tmpRows []rawRow
for _, rows := range rowss { for _, rows := range rowss {
pt.AddRows(rows) pt.AddRows(rows)
// Flush just added rows to a separate partitions. // Flush just added rows to a separate partitions.
tmpRows = pt.flushPendingRows(tmpRows[:0], true) pt.flushPendingRows(true)
} }
testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1) testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1)
pt.MustClose() pt.MustClose()

View File

@ -199,9 +199,8 @@ func (tb *table) flushPendingRows() {
ptws := tb.GetPartitions(nil) ptws := tb.GetPartitions(nil)
defer tb.PutPartitions(ptws) defer tb.PutPartitions(ptws)
var rows []rawRow
for _, ptw := range ptws { for _, ptw := range ptws {
rows = ptw.pt.flushPendingRows(rows[:0], true) ptw.pt.flushPendingRows(true)
} }
} }