lib/{storage,mergeset}: consistency rename: `flushRaw{Rows,Items} -> flushPending{Rows,Items}

This commit is contained in:
Aliaksandr Valialkin 2022-12-03 22:17:46 -08:00
parent 233301a549
commit 10a17bfa16
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
6 changed files with 15 additions and 15 deletions

View File

@ -346,7 +346,7 @@ func (tb *Table) MustClose() {
startTime = time.Now()
// Flush raw items the last time before exit.
tb.flushRawItems(true)
tb.flushPendingItems(true)
// Flush inmemory parts to disk.
var pws []*partWrapper
@ -515,7 +515,7 @@ func (tb *Table) rawItemsFlusher() {
case <-tb.stopCh:
return
case <-ticker.C:
tb.flushRawItems(false)
tb.flushPendingItems(false)
}
}
}
@ -543,13 +543,13 @@ func (tb *Table) mergePartsOptimal(pws []*partWrapper) error {
//
// This function is only for debugging and testing.
func (tb *Table) DebugFlush() {
tb.flushRawItems(true)
tb.flushPendingItems(true)
// Wait for background flushers to finish.
tb.rawItemsPendingFlushesWG.Wait()
}
func (tb *Table) flushRawItems(isFinal bool) {
func (tb *Table) flushPendingItems(isFinal bool) {
tb.rawItems.flush(tb, isFinal)
}
@ -1099,7 +1099,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error {
}
// Flush inmemory items to disk.
tb.flushRawItems(true)
tb.flushPendingItems(true)
// The snapshot must be created under the lock in order to prevent from
// concurrent modifications via runTransaction.

View File

@ -649,7 +649,7 @@ func (pt *partition) MustClose() {
startTime = time.Now()
// Flush raw rows the last time before exit.
pt.flushRawRows(true)
pt.flushPendingRows(true)
// Flush inmemory parts to disk.
var pws []*partWrapper
@ -710,12 +710,12 @@ func (pt *partition) rawRowsFlusher() {
case <-pt.stopCh:
return
case <-ticker.C:
pt.flushRawRows(false)
pt.flushPendingRows(false)
}
}
}
func (pt *partition) flushRawRows(isFinal bool) {
func (pt *partition) flushPendingRows(isFinal bool) {
pt.rawRows.flush(pt, isFinal)
}
@ -1639,7 +1639,7 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
startTime := time.Now()
// Flush inmemory data to disk.
pt.flushRawRows(true)
pt.flushPendingRows(true)
if _, err := pt.flushInmemoryParts(nil, true); err != nil {
return fmt.Errorf("cannot flush inmemory parts: %w", err)
}

View File

@ -185,7 +185,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt.AddRows(rows)
// Flush just added rows to a separate partition.
pt.flushRawRows(true)
pt.flushPendingRows(true)
}
testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1)
pt.MustClose()

View File

@ -321,7 +321,7 @@ func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
// DebugFlush flushes recently added storage data, so it becomes visible to search.
func (s *Storage) DebugFlush() {
s.tb.flushRawRows()
s.tb.flushPendingRows()
s.idb().tb.DebugFlush()
}

View File

@ -215,15 +215,15 @@ func (tb *table) MustClose() {
}
}
// flushRawRows flushes all the pending rows, so they become visible to search.
// flushPendingRows flushes all the pending rows, so they become visible to search.
//
// This function is for debug purposes only.
func (tb *table) flushRawRows() {
func (tb *table) flushPendingRows() {
ptws := tb.GetPartitions(nil)
defer tb.PutPartitions(ptws)
for _, ptw := range ptws {
ptw.pt.flushRawRows(true)
ptw.pt.flushPendingRows(true)
}
}

View File

@ -197,7 +197,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
}
// Flush rows to parts.
tb.flushRawRows()
tb.flushPendingRows()
}
testTableSearch(t, tb, tsids, trSearch, rbsExpected, -1)
tb.MustClose()