lib/{mergeset,storage}: add start background workers via startBackgroundWorkers() function

This commit is contained in:
Aliaksandr Valialkin 2022-12-04 00:01:04 -08:00
parent 3a25a4b1de
commit 9ac1174493
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 15 additions and 10 deletions

View File

@ -291,8 +291,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
tb.rawItems.init() tb.rawItems.init()
tb.startPartMergers() tb.startBackgroundWorkers()
tb.startRawItemsFlusher()
var m TableMetrics var m TableMetrics
tb.UpdateMetrics(&m) tb.UpdateMetrics(&m)
@ -323,6 +322,11 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
return tb, nil return tb, nil
} }
func (tb *Table) startBackgroundWorkers() {
tb.startPartMergers()
tb.startRawItemsFlusher()
}
// MustClose closes the table. // MustClose closes the table.
func (tb *Table) MustClose() { func (tb *Table) MustClose() {
close(tb.stopCh) close(tb.stopCh)

View File

@ -204,16 +204,20 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
pt := newPartition(name, smallPartsPath, bigPartsPath, s) pt := newPartition(name, smallPartsPath, bigPartsPath, s)
pt.tr.fromPartitionTimestamp(timestamp) pt.tr.fromPartitionTimestamp(timestamp)
pt.startMergeWorkers() pt.startBackgroundWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
logger.Infof("partition %q has been created", name) logger.Infof("partition %q has been created", name)
return pt, nil return pt, nil
} }
func (pt *partition) startBackgroundWorkers() {
pt.startMergeWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
}
// Drop drops all the data on the storage for the given pt. // Drop drops all the data on the storage for the given pt.
// //
// The pt must be detached from table before calling pt.Drop. // The pt must be detached from table before calling pt.Drop.
@ -258,10 +262,7 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition,
if err := pt.tr.fromPartitionName(name); err != nil { if err := pt.tr.fromPartitionName(name); err != nil {
return nil, fmt.Errorf("cannot obtain partition time range from smallPartsPath %q: %w", smallPartsPath, err) return nil, fmt.Errorf("cannot obtain partition time range from smallPartsPath %q: %w", smallPartsPath, err)
} }
pt.startMergeWorkers() pt.startBackgroundWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
return pt, nil return pt, nil
} }