app/vmstorage: add -bigMergeConcurrency and -smallMergeConcurrency flags for tuning the maximum number of CPU cores used during merges

This commit is contained in:
Aliaksandr Valialkin 2019-10-31 16:16:53 +02:00
parent e0b292c6de
commit d18ea0c95b
2 changed files with 38 additions and 2 deletions

View File

@ -24,6 +24,9 @@ var (
// DataPath is a path to storage data.
DataPath = flag.String("storageDataPath", "victoria-metrics-data", "Path to storage data")
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
)
// Init initializes vmstorage.
@ -39,6 +42,10 @@ func InitWithoutMetrics() {
if err := encoding.CheckPrecisionBits(uint8(*precisionBits)); err != nil {
logger.Fatalf("invalid `-precisionBits`: %s", err)
}
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
logger.Infof("opening storage at %q with retention period %d months", *DataPath, *retentionPeriod)
startTime := time.Now()
WG = syncwg.WaitGroup{}

View File

@ -742,8 +742,37 @@ var mergeWorkersCount = func() int {
return n
}()
var bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
var smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
var (
bigMergeWorkersCount = uint64(mergeWorkersCount)
smallMergeWorkersCount = uint64(mergeWorkersCount)
)
var (
bigMergeConcurrencyLimitCh = make(chan struct{}, bigMergeWorkersCount)
smallMergeConcurrencyLimitCh = make(chan struct{}, smallMergeWorkersCount)
)
// SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.
//
// The function must be called before opening or creating any storage.
func SetBigMergeWorkersCount(n int) {
if n <= 0 {
// Do nothing
return
}
atomic.StoreUint64(&bigMergeWorkersCount, uint64(n))
}
// SetSmallMergeWorkersCount sets the maximum number of concurrent mergers for small blocks.
//
// The function must be called before opening or creating any storage.
func SetSmallMergeWorkersCount(n int) {
if n <= 0 {
// Do nothing
return
}
atomic.StoreUint64(&smallMergeWorkersCount, uint64(n))
}
func (pt *partition) startMergeWorkers() {
for i := 0; i < mergeWorkersCount; i++ {