lib/storage: properly determine max rows for output part when merging small parts

This commit is contained in:
Aliaksandr Valialkin 2020-12-18 23:14:35 +02:00
parent edbe35509e
commit 6859737329

View File

@ -1025,24 +1025,47 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
} }
func (pt *partition) mergeSmallParts(isFinal bool) error { func (pt *partition) mergeSmallParts(isFinal bool) error {
maxRows := maxRowsByPath(pt.smallPartsPath) // Try merging small parts to a big part at first.
if maxRows > maxRowsPerSmallPart() {
// The output part may go to big part,
// so make sure it has enough space.
maxBigPartRows := maxRowsByPath(pt.bigPartsPath) maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
if maxRows > maxBigPartRows {
maxRows = maxBigPartRows
}
}
pt.partsLock.Lock() pt.partsLock.Lock()
pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxRows, isFinal) pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxBigPartRows, isFinal)
pt.partsLock.Unlock() pt.partsLock.Unlock()
atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
atomicSetBool(&pt.smallMergeNeedFreeDiskSpace, needFreeSpace) rowsCount := getRowsCount(pws)
if rowsCount > maxRowsPerSmallPart() {
// Merge small parts to a big part.
return pt.mergeParts(pws, pt.stopCh) return pt.mergeParts(pws, pt.stopCh)
} }
// Make sure that the output small part fits small parts storage.
maxSmallPartRows := maxRowsByPath(pt.smallPartsPath)
if rowsCount <= maxSmallPartRows {
// Merge small parts to a small part.
return pt.mergeParts(pws, pt.stopCh)
}
// The output small part doesn't fit small parts storage. Try merging small parts according to maxSmallPartRows limit.
pt.releasePartsToMerge(pws)
pt.partsLock.Lock()
pws, needFreeSpace = getPartsToMerge(pt.smallParts, maxSmallPartRows, isFinal)
pt.partsLock.Unlock()
atomicSetBool(&pt.smallMergeNeedFreeDiskSpace, needFreeSpace)
return pt.mergeParts(pws, pt.stopCh)
}
func (pt *partition) releasePartsToMerge(pws []*partWrapper) {
pt.partsLock.Lock()
for _, pw := range pws {
if !pw.isInMerge {
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
}
pw.isInMerge = false
}
pt.partsLock.Unlock()
}
var errNothingToMerge = fmt.Errorf("nothing to merge") var errNothingToMerge = fmt.Errorf("nothing to merge")
func atomicSetBool(p *uint64, b bool) { func atomicSetBool(p *uint64, b bool) {
@ -1063,18 +1086,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
// Nothing to merge. // Nothing to merge.
return errNothingToMerge return errNothingToMerge
} }
defer pt.releasePartsToMerge(pws)
defer func() {
// Remove isInMerge flag from pws.
pt.partsLock.Lock()
for _, pw := range pws {
if !pw.isInMerge {
logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
}
pw.isInMerge = false
}
pt.partsLock.Unlock()
}()
startTime := time.Now() startTime := time.Now()
@ -1365,22 +1377,19 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
// since this results in unbalanced merges. // since this results in unbalanced merges.
continue continue
} }
rowsSum := uint64(0) rowsCount := getRowsCount(a)
for _, pw := range a { if rowsCount < 1e6 && len(a) < maxPartsToMerge {
rowsSum += pw.p.ph.RowsCount
}
if rowsSum < 1e6 && len(a) < maxPartsToMerge {
// Do not merge parts with too small number of rows if the number of source parts // Do not merge parts with too small number of rows if the number of source parts
// isn't equal to maxPartsToMerge. This should reduce CPU usage and disk IO usage // isn't equal to maxPartsToMerge. This should reduce CPU usage and disk IO usage
// for small parts merge. // for small parts merge.
continue continue
} }
if rowsSum > maxRows { if rowsCount > maxRows {
// There is no need in verifying remaining parts with higher number of rows // There is no need in verifying remaining parts with higher number of rows
needFreeSpace = true needFreeSpace = true
break break
} }
m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount) m := float64(rowsCount) / float64(a[len(a)-1].p.ph.RowsCount)
if m < maxM { if m < maxM {
continue continue
} }
@ -1400,6 +1409,14 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
return append(dst, pws...), needFreeSpace return append(dst, pws...), needFreeSpace
} }
func getRowsCount(pws []*partWrapper) uint64 {
n := uint64(0)
for _, pw := range pws {
n += pw.p.ph.RowsCount
}
return n
}
func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
// The path can be missing after restoring from backup, so create it if needed. // The path can be missing after restoring from backup, so create it if needed.
if err := fs.MkdirAllIfNotExist(path); err != nil { if err := fs.MkdirAllIfNotExist(path); err != nil {