From 10a17bfa16cd937f3f72bf7a3bf7d3ba6a235688 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 3 Dec 2022 22:17:46 -0800 Subject: [PATCH] lib/{storage,mergeset}: consistency rename: `flushRaw{Rows,Items} -> flushPending{Rows,Items} --- lib/mergeset/table.go | 10 +++++----- lib/storage/partition.go | 8 ++++---- lib/storage/partition_search_test.go | 2 +- lib/storage/storage.go | 2 +- lib/storage/table.go | 6 +++--- lib/storage/table_search_test.go | 2 +- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 87e4f56437..4f1e224ae6 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -346,7 +346,7 @@ func (tb *Table) MustClose() { startTime = time.Now() // Flush raw items the last time before exit. - tb.flushRawItems(true) + tb.flushPendingItems(true) // Flush inmemory parts to disk. var pws []*partWrapper @@ -515,7 +515,7 @@ func (tb *Table) rawItemsFlusher() { case <-tb.stopCh: return case <-ticker.C: - tb.flushRawItems(false) + tb.flushPendingItems(false) } } } @@ -543,13 +543,13 @@ func (tb *Table) mergePartsOptimal(pws []*partWrapper) error { // // This function is only for debugging and testing. func (tb *Table) DebugFlush() { - tb.flushRawItems(true) + tb.flushPendingItems(true) // Wait for background flushers to finish. tb.rawItemsPendingFlushesWG.Wait() } -func (tb *Table) flushRawItems(isFinal bool) { +func (tb *Table) flushPendingItems(isFinal bool) { tb.rawItems.flush(tb, isFinal) } @@ -1099,7 +1099,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error { } // Flush inmemory items to disk. - tb.flushRawItems(true) + tb.flushPendingItems(true) // The snapshot must be created under the lock in order to prevent from // concurrent modifications via runTransaction. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 6f91cba1c2..296b8af68a 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -649,7 +649,7 @@ func (pt *partition) MustClose() { startTime = time.Now() // Flush raw rows the last time before exit. - pt.flushRawRows(true) + pt.flushPendingRows(true) // Flush inmemory parts to disk. var pws []*partWrapper @@ -710,12 +710,12 @@ func (pt *partition) rawRowsFlusher() { case <-pt.stopCh: return case <-ticker.C: - pt.flushRawRows(false) + pt.flushPendingRows(false) } } } -func (pt *partition) flushRawRows(isFinal bool) { +func (pt *partition) flushPendingRows(isFinal bool) { pt.rawRows.flush(pt, isFinal) } @@ -1639,7 +1639,7 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error { startTime := time.Now() // Flush inmemory data to disk. - pt.flushRawRows(true) + pt.flushPendingRows(true) if _, err := pt.flushInmemoryParts(nil, true); err != nil { return fmt.Errorf("cannot flush inmemory parts: %w", err) } diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 9649770dfd..b0e39ef7d0 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -185,7 +185,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.AddRows(rows) // Flush just added rows to a separate partition. - pt.flushRawRows(true) + pt.flushPendingRows(true) } testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1) pt.MustClose() diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 205e92a910..00f88c1954 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -321,7 +321,7 @@ func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) { // DebugFlush flushes recently added storage data, so it becomes visible to search. func (s *Storage) DebugFlush() { - s.tb.flushRawRows() + s.tb.flushPendingRows() s.idb().tb.DebugFlush() } diff --git a/lib/storage/table.go b/lib/storage/table.go index 5b7ffbce71..e4b6030449 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -215,15 +215,15 @@ func (tb *table) MustClose() { } } -// flushRawRows flushes all the pending rows, so they become visible to search. +// flushPendingRows flushes all the pending rows, so they become visible to search. // // This function is for debug purposes only. -func (tb *table) flushRawRows() { +func (tb *table) flushPendingRows() { ptws := tb.GetPartitions(nil) defer tb.PutPartitions(ptws) for _, ptw := range ptws { - ptw.pt.flushRawRows(true) + ptw.pt.flushPendingRows(true) } } diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index c9b1119dcf..17e1f5b86c 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -197,7 +197,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount } // Flush rows to parts. - tb.flushRawRows() + tb.flushPendingRows() } testTableSearch(t, tb, tsids, trSearch, rbsExpected, -1) tb.MustClose()