lib/storage: allow set values higher than 1 for vm_merge_need_free_disk_space if there are multiple partitions with deferred merges due to disk space shortage

This commit is contained in:
Aliaksandr Valialkin 2020-09-29 22:51:43 +03:00
parent 536aa8779a
commit 7c2e4e267a

View File

@ -123,6 +123,9 @@ type partition struct {
smallAssistedMerges uint64 smallAssistedMerges uint64
smallMergeNeedFreeDiskSpace uint64
bigMergeNeedFreeDiskSpace uint64
mergeIdx uint64 mergeIdx uint64
smallPartsPath string smallPartsPath string
@ -392,8 +395,8 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges) m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
m.SmallMergeNeedFreeDiskSpace = atomic.LoadUint64(&smallMergeNeedFreeDiskSpace) m.SmallMergeNeedFreeDiskSpace += atomic.LoadUint64(&pt.smallMergeNeedFreeDiskSpace)
m.BigMergeNeedFreeDiskSpace = atomic.LoadUint64(&bigMergeNeedFreeDiskSpace) m.BigMergeNeedFreeDiskSpace += atomic.LoadUint64(&pt.bigMergeNeedFreeDiskSpace)
} }
// AddRows adds the given rows to the partition pt. // AddRows adds the given rows to the partition pt.
@ -999,7 +1002,7 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal) pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal)
pt.partsLock.Unlock() pt.partsLock.Unlock()
atomicSetBool(&bigMergeNeedFreeDiskSpace, needFreeSpace) atomicSetBool(&pt.bigMergeNeedFreeDiskSpace, needFreeSpace)
return pt.mergeParts(pws, pt.stopCh) return pt.mergeParts(pws, pt.stopCh)
} }
@ -1018,7 +1021,7 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxRows, isFinal) pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxRows, isFinal)
pt.partsLock.Unlock() pt.partsLock.Unlock()
atomicSetBool(&smallMergeNeedFreeDiskSpace, needFreeSpace) atomicSetBool(&pt.smallMergeNeedFreeDiskSpace, needFreeSpace)
return pt.mergeParts(pws, pt.stopCh) return pt.mergeParts(pws, pt.stopCh)
} }
@ -1032,11 +1035,6 @@ func atomicSetBool(p *uint64, b bool) {
atomic.StoreUint64(p, v) atomic.StoreUint64(p, v)
} }
var (
smallMergeNeedFreeDiskSpace uint64
bigMergeNeedFreeDiskSpace uint64
)
// mergeParts merges pws. // mergeParts merges pws.
// //
// Merging is immediately stopped if stopCh is closed. // Merging is immediately stopped if stopCh is closed.