diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 5dbbb4ad90..d88cba7cb1 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -291,8 +291,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb stopCh: make(chan struct{}), } tb.rawItems.init() - tb.startPartMergers() - tb.startRawItemsFlusher() + tb.startBackgroundWorkers() var m TableMetrics tb.UpdateMetrics(&m) @@ -323,6 +322,11 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb return tb, nil } +func (tb *Table) startBackgroundWorkers() { + tb.startPartMergers() + tb.startRawItemsFlusher() +} + // MustClose closes the table. func (tb *Table) MustClose() { close(tb.stopCh) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 5c28f9a97d..17e8aa8baa 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -204,16 +204,20 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str pt := newPartition(name, smallPartsPath, bigPartsPath, s) pt.tr.fromPartitionTimestamp(timestamp) - pt.startMergeWorkers() - pt.startRawRowsFlusher() - pt.startInmemoryPartsFlusher() - pt.startStalePartsRemover() + pt.startBackgroundWorkers() logger.Infof("partition %q has been created", name) return pt, nil } +func (pt *partition) startBackgroundWorkers() { + pt.startMergeWorkers() + pt.startRawRowsFlusher() + pt.startInmemoryPartsFlusher() + pt.startStalePartsRemover() +} + // Drop drops all the data on the storage for the given pt. // // The pt must be detached from table before calling pt.Drop. @@ -258,10 +262,7 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, if err := pt.tr.fromPartitionName(name); err != nil { return nil, fmt.Errorf("cannot obtain partition time range from smallPartsPath %q: %w", smallPartsPath, err) } - pt.startMergeWorkers() - pt.startRawRowsFlusher() - pt.startInmemoryPartsFlusher() - pt.startStalePartsRemover() + pt.startBackgroundWorkers() return pt, nil }