Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-12-15 00:13:30 +01:00)
lib/storage: reduce the maximum number of concurrent merge workers to GOMAXPROCS/2
Previously the limit was raised to GOMAXPROCS, but it turned out that this increases query latencies, since more CPUs are busy with merges. While at it, substitute the `*MergeConcurrencyLimitCh` channels with simple integer limits.
This commit is contained in:
parent 106e302d7a
commit 3149af624d
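The `*MergeConcurrencyLimitCh` channels being removed below are buffered channels used as counting semaphores: a send acquires a slot, a receive releases it. The replacement bounds concurrency by construction instead, by starting a fixed number of worker goroutines. A minimal standalone sketch of the two patterns; the `doMerge` job and the counts are illustrative, not taken from the VictoriaMetrics code:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// doMerge is an illustrative stand-in for a merge job.
func doMerge(id int) { fmt.Println("merging", id) }

func main() {
    var wg sync.WaitGroup

    // Old pattern: any number of goroutines may start, and a buffered
    // channel acts as a counting semaphore capping how many merge at once.
    limitCh := make(chan struct{}, runtime.GOMAXPROCS(-1))
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            limitCh <- struct{}{}        // acquire a slot
            defer func() { <-limitCh }() // release it
            doMerge(id)
        }(i)
    }
    wg.Wait()

    // New pattern: start a fixed number of workers up front; concurrency
    // is bounded by construction, so no semaphore is needed.
    workersCount := (runtime.GOMAXPROCS(-1) + 1) / 2
    jobs := make(chan int)
    for i := 0; i < workersCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for id := range jobs {
                doMerge(id)
            }
        }()
    }
    for i := 0; i < 8; i++ {
        jobs <- i
    }
    close(jobs)
    wg.Wait()
}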
@@ -814,11 +814,9 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
     return nil
 }
 
-var mergeWorkersCount = runtime.GOMAXPROCS(-1)
-
 var (
-    bigMergeConcurrencyLimitCh   = make(chan struct{}, mergeWorkersCount)
-    smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount)
+    bigMergeWorkersCount   = (runtime.GOMAXPROCS(-1) + 1) / 2
+    smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
 )
 
 // SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.
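Note that `(runtime.GOMAXPROCS(-1) + 1) / 2` is ceiling division by two in integer arithmetic, so at least one merge worker is started even when GOMAXPROCS is 1. A quick illustration:

package main

import "fmt"

func main() {
    for _, gomaxprocs := range []int{1, 2, 3, 8} {
        workers := (gomaxprocs + 1) / 2
        fmt.Printf("GOMAXPROCS=%d -> %d merge workers\n", gomaxprocs, workers)
    }
    // Output:
    // GOMAXPROCS=1 -> 1 merge workers
    // GOMAXPROCS=2 -> 1 merge workers
    // GOMAXPROCS=3 -> 2 merge workers
    // GOMAXPROCS=8 -> 4 merge workers
}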
@@ -829,7 +827,7 @@ func SetBigMergeWorkersCount(n int) {
         // Do nothing
         return
     }
-    bigMergeConcurrencyLimitCh = make(chan struct{}, n)
+    bigMergeWorkersCount = n
 }
 
 // SetSmallMergeWorkersCount sets the maximum number of concurrent mergers for small blocks.
@@ -840,18 +838,18 @@ func SetSmallMergeWorkersCount(n int) {
         // Do nothing
         return
     }
-    smallMergeConcurrencyLimitCh = make(chan struct{}, n)
+    smallMergeWorkersCount = n
 }
 
 func (pt *partition) startMergeWorkers() {
-    for i := 0; i < mergeWorkersCount; i++ {
+    for i := 0; i < smallMergeWorkersCount; i++ {
         pt.smallPartsMergerWG.Add(1)
         go func() {
             pt.smallPartsMerger()
             pt.smallPartsMergerWG.Done()
         }()
     }
-    for i := 0; i < mergeWorkersCount; i++ {
+    for i := 0; i < bigMergeWorkersCount; i++ {
         pt.bigPartsMergerWG.Add(1)
         go func() {
             pt.bigPartsMerger()
@@ -924,11 +922,11 @@ func maxRowsByPath(path string) uint64 {
     freeSpace := fs.MustGetFreeSpace(path)
 
     // Calculate the maximum number of rows in the output merge part
-    // by dividing the freeSpace by the number of concurrent
-    // mergeWorkersCount for big parts.
-    // This assumes each row is compressed into 0.5 bytes
+    // by dividing the freeSpace by the maximum number of concurrent
+    // workers for big parts.
+    // This assumes each row is compressed into 1 byte
     // according to production data.
-    maxRows := 2 * (freeSpace / uint64(mergeWorkersCount))
+    maxRows := freeSpace / uint64(bigMergeWorkersCount)
     if maxRows > maxRowsPerBigPart {
         maxRows = maxRowsPerBigPart
     }
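With the new formula each big-part merge worker gets an equal share of the free disk space, and at the assumed ~1 byte per compressed row that share is also the row budget for one output part. A hypothetical standalone version of the calculation; the free-space figure and the cap value are invented for illustration, and `maxRowsPerBigPart` in the real code may differ:

package main

import "fmt"

func main() {
    // Illustrative cap, not the real constant's value.
    const maxRowsPerBigPart uint64 = 1e12

    freeSpace := uint64(1) << 40 // pretend 1 TiB is free on the partition
    bigMergeWorkersCount := uint64(4)

    // Each worker gets an equal share of the free space; at ~1 byte
    // per compressed row, that share is also the row budget.
    maxRows := freeSpace / bigMergeWorkersCount
    if maxRows > maxRowsPerBigPart {
        maxRows = maxRowsPerBigPart
    }
    fmt.Println("max rows per output big part:", maxRows)
}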
@@ -936,11 +934,6 @@ func maxRowsByPath(path string) uint64 {
 }
 
 func (pt *partition) mergeBigParts(isFinal bool) error {
-    bigMergeConcurrencyLimitCh <- struct{}{}
-    defer func() {
-        <-bigMergeConcurrencyLimitCh
-    }()
-
     maxRows := maxRowsByPath(pt.bigPartsPath)
 
     pt.partsLock.Lock()
@@ -954,11 +947,6 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
 }
 
 func (pt *partition) mergeSmallParts(isFinal bool) error {
-    smallMergeConcurrencyLimitCh <- struct{}{}
-    defer func() {
-        <-smallMergeConcurrencyLimitCh
-    }()
-
     maxRows := maxRowsByPath(pt.smallPartsPath)
     if maxRows > maxRowsPerSmallPart() {
         // The output part may go to big part,
@@ -1244,7 +1232,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
     }
 
     // Filter out too big parts.
-    // This should reduce N for O(n^2) algorithm below.
+    // This should reduce N for O(N^2) algorithm below.
     maxInPartRows := maxRows / 2
     tmp := make([]*partWrapper, 0, len(src))
     for _, pw := range src {
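The pre-filter drops any part larger than `maxRows / 2`, shrinking N before the O(N^2) search over merge candidates that follows. A minimal sketch under an assumed `partWrapper` shape; the real struct carries much more than a row count, and the exact comparison may differ:

package main

import "fmt"

// partWrapper is a stand-in for the real struct; only the row count matters here.
type partWrapper struct {
    rowsCount uint64
}

// filterBigParts keeps only parts small enough to fit a merge budget of
// maxRows, reducing N before an O(N^2) scan over merge candidates.
func filterBigParts(src []*partWrapper, maxRows uint64) []*partWrapper {
    maxInPartRows := maxRows / 2
    tmp := make([]*partWrapper, 0, len(src))
    for _, pw := range src {
        if pw.rowsCount <= maxInPartRows {
            tmp = append(tmp, pw)
        }
    }
    return tmp
}

func main() {
    parts := []*partWrapper{{100}, {600}, {250}, {900}}
    for _, pw := range filterBigParts(parts, 1000) {
        fmt.Println(pw.rowsCount)
    }
    // Prints 100 and 250: the 600- and 900-row parts exceed maxRows/2 = 500.
}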