diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 45e52345b7..f6ee62a628 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -814,11 +814,9 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error { return nil } -var mergeWorkersCount = runtime.GOMAXPROCS(-1) - var ( - bigMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount) - smallMergeConcurrencyLimitCh = make(chan struct{}, mergeWorkersCount) + bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 + smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 ) // SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks. @@ -829,7 +827,7 @@ func SetBigMergeWorkersCount(n int) { // Do nothing return } - bigMergeConcurrencyLimitCh = make(chan struct{}, n) + bigMergeWorkersCount = n } // SetSmallMergeWorkersCount sets the maximum number of concurrent mergers for small blocks. @@ -840,18 +838,18 @@ func SetSmallMergeWorkersCount(n int) { // Do nothing return } - smallMergeConcurrencyLimitCh = make(chan struct{}, n) + smallMergeWorkersCount = n } func (pt *partition) startMergeWorkers() { - for i := 0; i < mergeWorkersCount; i++ { + for i := 0; i < smallMergeWorkersCount; i++ { pt.smallPartsMergerWG.Add(1) go func() { pt.smallPartsMerger() pt.smallPartsMergerWG.Done() }() } - for i := 0; i < mergeWorkersCount; i++ { + for i := 0; i < bigMergeWorkersCount; i++ { pt.bigPartsMergerWG.Add(1) go func() { pt.bigPartsMerger() @@ -924,11 +922,11 @@ func maxRowsByPath(path string) uint64 { freeSpace := fs.MustGetFreeSpace(path) // Calculate the maximum number of rows in the output merge part - // by dividing the freeSpace by the number of concurrent - // mergeWorkersCount for big parts. - // This assumes each row is compressed into 0.5 bytes + // by dividing the freeSpace by the maximum number of concurrent + // workers for big parts. + // This assumes each row is compressed into 1 byte // according to production data. - maxRows := 2 * (freeSpace / uint64(mergeWorkersCount)) + maxRows := freeSpace / uint64(bigMergeWorkersCount) if maxRows > maxRowsPerBigPart { maxRows = maxRowsPerBigPart } @@ -936,11 +934,6 @@ func maxRowsByPath(path string) uint64 { } func (pt *partition) mergeBigParts(isFinal bool) error { - bigMergeConcurrencyLimitCh <- struct{}{} - defer func() { - <-bigMergeConcurrencyLimitCh - }() - maxRows := maxRowsByPath(pt.bigPartsPath) pt.partsLock.Lock() @@ -954,11 +947,6 @@ func (pt *partition) mergeBigParts(isFinal bool) error { } func (pt *partition) mergeSmallParts(isFinal bool) error { - smallMergeConcurrencyLimitCh <- struct{}{} - defer func() { - <-smallMergeConcurrencyLimitCh - }() - maxRows := maxRowsByPath(pt.smallPartsPath) if maxRows > maxRowsPerSmallPart() { // The output part may go to big part, @@ -1244,7 +1232,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui } // Filter out too big parts. - // This should reduce N for O(n^2) algorithm below. + // This should reduce N for O(N^2) algorithm below. maxInPartRows := maxRows / 2 tmp := make([]*partWrapper, 0, len(src)) for _, pw := range src {