mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/storage: properly update per-part min_dedup_interval
file contents after merge
Previously 0s was always written even if -dedup.minScrapeInterval was set to non-zero value
This is a follow-up for 4ff647137a
This commit is contained in:
parent
6814cc6809
commit
8a7f08ded3
@ -1067,28 +1067,41 @@ func atomicSetBool(p *uint64, b bool) {
|
||||
}
|
||||
|
||||
func (pt *partition) runFinalDedup() error {
|
||||
if !isDedupNeeded(pt) {
|
||||
requiredDedupInterval, actualDedupInterval := pt.getRequiredDedupInterval()
|
||||
if requiredDedupInterval <= actualDedupInterval {
|
||||
// Deduplication isn't needed.
|
||||
return nil
|
||||
}
|
||||
t := time.Now()
|
||||
logger.Infof("starting final dedup for partition %s", pt.name)
|
||||
logger.Infof("starting final dedup for partition %s using requiredDedupInterval=%d ms, since the partition has smaller actualDedupInterval=%d ms",
|
||||
pt.bigPartsPath, requiredDedupInterval, actualDedupInterval)
|
||||
if err := pt.ForceMergeAllParts(); err != nil {
|
||||
return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.name, err)
|
||||
return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.bigPartsPath, err)
|
||||
}
|
||||
logger.Infof("final dedup for partition %s finished in %.3f seconds", pt.name, time.Since(t).Seconds())
|
||||
logger.Infof("final dedup for partition %s has been finished in %.3f seconds", pt.bigPartsPath, time.Since(t).Seconds())
|
||||
return nil
|
||||
}
|
||||
|
||||
func isDedupNeeded(pt *partition) bool {
|
||||
func (pt *partition) getRequiredDedupInterval() (int64, int64) {
|
||||
pws := pt.GetParts(nil)
|
||||
defer pt.PutParts(pws)
|
||||
dedupInterval := GetDedupInterval()
|
||||
if dedupInterval <= 0 {
|
||||
// The deduplication isn't needed.
|
||||
return false
|
||||
}
|
||||
minDedupInterval := getMinDedupInterval(pws)
|
||||
return minDedupInterval < dedupInterval
|
||||
return dedupInterval, minDedupInterval
|
||||
}
|
||||
|
||||
func getMinDedupInterval(pws []*partWrapper) int64 {
|
||||
if len(pws) == 0 {
|
||||
return 0
|
||||
}
|
||||
dMin := pws[0].p.ph.MinDedupInterval
|
||||
for _, pw := range pws[1:] {
|
||||
d := pw.p.ph.MinDedupInterval
|
||||
if d < dMin {
|
||||
dMin = d
|
||||
}
|
||||
}
|
||||
return dMin
|
||||
}
|
||||
|
||||
// mergeParts merges pws.
|
||||
@ -1181,7 +1194,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
||||
}
|
||||
bsrs = nil
|
||||
|
||||
ph.MinDedupInterval = getMinDedupInterval(pws)
|
||||
ph.MinDedupInterval = GetDedupInterval()
|
||||
if err := ph.writeMinDedupInterval(tmpPartPath); err != nil {
|
||||
return fmt.Errorf("cannot store min dedup interval for part %q: %w", tmpPartPath, err)
|
||||
}
|
||||
@ -1265,20 +1278,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMinDedupInterval(pws []*partWrapper) int64 {
|
||||
if len(pws) == 0 {
|
||||
return 0
|
||||
}
|
||||
dMin := pws[0].p.ph.MinDedupInterval
|
||||
for _, pw := range pws[1:] {
|
||||
d := pw.p.ph.MinDedupInterval
|
||||
if d < dMin {
|
||||
dMin = d
|
||||
}
|
||||
}
|
||||
return dMin
|
||||
}
|
||||
|
||||
func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int {
|
||||
avgRowsPerBlock := rowsCount / blocksCount
|
||||
if avgRowsPerBlock <= 200 {
|
||||
|
Loading…
Reference in New Issue
Block a user