diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 722504c243..93fc92ac85 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1067,28 +1067,41 @@ func atomicSetBool(p *uint64, b bool) { } func (pt *partition) runFinalDedup() error { - if !isDedupNeeded(pt) { + requiredDedupInterval, actualDedupInterval := pt.getRequiredDedupInterval() + if requiredDedupInterval <= actualDedupInterval { + // Deduplication isn't needed. return nil } t := time.Now() - logger.Infof("starting final dedup for partition %s", pt.name) + logger.Infof("starting final dedup for partition %s using requiredDedupInterval=%d ms, since the partition has smaller actualDedupInterval=%d ms", + pt.bigPartsPath, requiredDedupInterval, actualDedupInterval) if err := pt.ForceMergeAllParts(); err != nil { - return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.name, err) + return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.bigPartsPath, err) } - logger.Infof("final dedup for partition %s finished in %.3f seconds", pt.name, time.Since(t).Seconds()) + logger.Infof("final dedup for partition %s has been finished in %.3f seconds", pt.bigPartsPath, time.Since(t).Seconds()) return nil } -func isDedupNeeded(pt *partition) bool { +func (pt *partition) getRequiredDedupInterval() (int64, int64) { pws := pt.GetParts(nil) defer pt.PutParts(pws) dedupInterval := GetDedupInterval() - if dedupInterval <= 0 { - // The deduplication isn't needed. - return false - } minDedupInterval := getMinDedupInterval(pws) - return minDedupInterval < dedupInterval + return dedupInterval, minDedupInterval +} + +func getMinDedupInterval(pws []*partWrapper) int64 { + if len(pws) == 0 { + return 0 + } + dMin := pws[0].p.ph.MinDedupInterval + for _, pw := range pws[1:] { + d := pw.p.ph.MinDedupInterval + if d < dMin { + dMin = d + } + } + return dMin } // mergeParts merges pws. @@ -1181,7 +1194,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro } bsrs = nil - ph.MinDedupInterval = getMinDedupInterval(pws) + ph.MinDedupInterval = GetDedupInterval() if err := ph.writeMinDedupInterval(tmpPartPath); err != nil { return fmt.Errorf("cannot store min dedup interval for part %q: %w", tmpPartPath, err) } @@ -1265,20 +1278,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro return nil } -func getMinDedupInterval(pws []*partWrapper) int64 { - if len(pws) == 0 { - return 0 - } - dMin := pws[0].p.ph.MinDedupInterval - for _, pw := range pws[1:] { - d := pw.p.ph.MinDedupInterval - if d < dMin { - dMin = d - } - } - return dMin -} - func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int { avgRowsPerBlock := rowsCount / blocksCount if avgRowsPerBlock <= 200 {