Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-12-15 16:30:55 +01:00
lib/storage: properly update per-part min_dedup_interval file contents after merge

Previously, 0s was always written even if -dedup.minScrapeInterval was set to a non-zero value.

This is a follow-up for 4ff647137a.
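Why the old behavior misbehaved: the merged part inherited the minimum MinDedupInterval across its source parts, and any part created before -dedup.minScrapeInterval was enabled carried 0s, so the merged part was stamped with 0s again and final dedup kept being re-triggered. A minimal sketch of that failure mode, using a hypothetical partHeader stub and example values rather than the actual lib/storage types:

package main

import "fmt"

// partHeader stands in for the real part header; MinDedupInterval is in milliseconds.
type partHeader struct {
	MinDedupInterval int64
}

// mergedIntervalOld mimics the pre-fix behavior: the merged part inherits
// the minimum interval across its source parts.
func mergedIntervalOld(srcs []partHeader) int64 {
	if len(srcs) == 0 {
		return 0
	}
	dMin := srcs[0].MinDedupInterval
	for _, s := range srcs[1:] {
		if s.MinDedupInterval < dMin {
			dMin = s.MinDedupInterval
		}
	}
	return dMin
}

func main() {
	// One source part predates -dedup.minScrapeInterval=30s and carries 0.
	srcs := []partHeader{{MinDedupInterval: 0}, {MinDedupInterval: 30000}}
	fmt.Println(mergedIntervalOld(srcs)) // 0: the merged part looks non-deduplicated
	// The fix stamps the configured interval instead (GetDedupInterval() in the diff),
	// so the merged part records 30000 and final dedup isn't re-triggered.
}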
parent c2b0a6109b
commit f22aab411b
@@ -1067,28 +1067,41 @@ func atomicSetBool(p *uint64, b bool) {
 }
 
 func (pt *partition) runFinalDedup() error {
-	if !isDedupNeeded(pt) {
+	requiredDedupInterval, actualDedupInterval := pt.getRequiredDedupInterval()
+	if requiredDedupInterval <= actualDedupInterval {
+		// Deduplication isn't needed.
 		return nil
 	}
 	t := time.Now()
-	logger.Infof("starting final dedup for partition %s", pt.name)
+	logger.Infof("starting final dedup for partition %s using requiredDedupInterval=%d ms, since the partition has smaller actualDedupInterval=%d ms",
+		pt.bigPartsPath, requiredDedupInterval, actualDedupInterval)
 	if err := pt.ForceMergeAllParts(); err != nil {
-		return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.name, err)
+		return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.bigPartsPath, err)
 	}
-	logger.Infof("final dedup for partition %s finished in %.3f seconds", pt.name, time.Since(t).Seconds())
+	logger.Infof("final dedup for partition %s has been finished in %.3f seconds", pt.bigPartsPath, time.Since(t).Seconds())
 	return nil
 }
 
-func isDedupNeeded(pt *partition) bool {
+func (pt *partition) getRequiredDedupInterval() (int64, int64) {
 	pws := pt.GetParts(nil)
 	defer pt.PutParts(pws)
 	dedupInterval := GetDedupInterval()
-	if dedupInterval <= 0 {
-		// The deduplication isn't needed.
-		return false
-	}
 	minDedupInterval := getMinDedupInterval(pws)
-	return minDedupInterval < dedupInterval
+	return dedupInterval, minDedupInterval
+}
+
+func getMinDedupInterval(pws []*partWrapper) int64 {
+	if len(pws) == 0 {
+		return 0
+	}
+	dMin := pws[0].p.ph.MinDedupInterval
+	for _, pw := range pws[1:] {
+		d := pw.p.ph.MinDedupInterval
+		if d < dMin {
+			dMin = d
+		}
+	}
+	return dMin
 }
 
 // mergeParts merges pws.
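The hunk above replaces the boolean isDedupNeeded with getRequiredDedupInterval, which returns both the configured interval and the smallest interval already applied to any part; final dedup now runs only when the configured value exceeds the per-partition minimum. A self-contained check of getMinDedupInterval as it appears in the diff, with simplified stand-ins for the real partWrapper/part types:

package main

import "fmt"

// Simplified stand-ins for the real lib/storage types.
type partHeader struct{ MinDedupInterval int64 }
type part struct{ ph partHeader }
type partWrapper struct{ p *part }

// getMinDedupInterval is copied from the diff above: it returns the smallest
// MinDedupInterval across parts, or 0 when there are no parts.
func getMinDedupInterval(pws []*partWrapper) int64 {
	if len(pws) == 0 {
		return 0
	}
	dMin := pws[0].p.ph.MinDedupInterval
	for _, pw := range pws[1:] {
		d := pw.p.ph.MinDedupInterval
		if d < dMin {
			dMin = d
		}
	}
	return dMin
}

func main() {
	pws := []*partWrapper{
		{p: &part{ph: partHeader{MinDedupInterval: 60000}}},
		{p: &part{ph: partHeader{MinDedupInterval: 30000}}},
	}
	fmt.Println(getMinDedupInterval(pws)) // 30000: the partition's actual dedup interval
	fmt.Println(getMinDedupInterval(nil)) // 0: no parts yet
}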
@@ -1181,7 +1194,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) error {
 	}
 	bsrs = nil
 
-	ph.MinDedupInterval = getMinDedupInterval(pws)
+	ph.MinDedupInterval = GetDedupInterval()
 	if err := ph.writeMinDedupInterval(tmpPartPath); err != nil {
 		return fmt.Errorf("cannot store min dedup interval for part %q: %w", tmpPartPath, err)
 	}
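The one-line change above is the core of the fix: the merged output has just been deduplicated with the current global interval, so the part metadata records GetDedupInterval() instead of the minimum across source parts, which could still be 0s. A hedged sketch of how persisting that value into the part's min_dedup_interval file might look; the real writeMinDedupInterval in lib/storage may differ in file format and atomic-write details:

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// writeMinDedupInterval is a hypothetical re-implementation for illustration:
// it stores the interval as a human-readable duration in a min_dedup_interval
// file inside the part directory.
func writeMinDedupInterval(partPath string, intervalMs int64) error {
	d := time.Duration(intervalMs) * time.Millisecond
	p := filepath.Join(partPath, "min_dedup_interval")
	if err := os.WriteFile(p, []byte(d.String()), 0o644); err != nil {
		return fmt.Errorf("cannot store min dedup interval for part %q: %w", partPath, err)
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "part")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	if err := writeMinDedupInterval(dir, 30000); err != nil {
		panic(err)
	}
	data, _ := os.ReadFile(filepath.Join(dir, "min_dedup_interval"))
	fmt.Println(string(data)) // "30s"
}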
@@ -1265,20 +1278,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) error {
 	return nil
 }
 
-func getMinDedupInterval(pws []*partWrapper) int64 {
-	if len(pws) == 0 {
-		return 0
-	}
-	dMin := pws[0].p.ph.MinDedupInterval
-	for _, pw := range pws[1:] {
-		d := pw.p.ph.MinDedupInterval
-		if d < dMin {
-			dMin = d
-		}
-	}
-	return dMin
-}
-
 func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int {
 	avgRowsPerBlock := rowsCount / blocksCount
 	if avgRowsPerBlock <= 200 {