diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f3d339a83f..bb4677cc34 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,8 @@ # tip +* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts could remain active for long periods of time. This should help reduce disk usage for `-retentionPeriod` smaller than one month. + # [v1.50.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.50.2) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 4369ec1169..c049c0812c 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -167,6 +167,7 @@ type partition struct { bigPartsMergerWG sync.WaitGroup rawRowsFlusherWG sync.WaitGroup inmemoryPartsFlusherWG sync.WaitGroup + stalePartsRemoverWG sync.WaitGroup } // partWrapper is a wrapper for the part. @@ -278,6 +279,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func pt.startMergeWorkers() pt.startRawRowsFlusher() pt.startInmemoryPartsFlusher() + pt.startStalePartsRemover() return pt, nil } @@ -641,8 +643,13 @@ func (pt *partition) PutParts(pws []*partWrapper) { func (pt *partition) MustClose() { close(pt.stopCh) - logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath) + logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath) startTime := time.Now() + pt.stalePartsRemoverWG.Wait() + logger.Infof("stale parts remover stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) + + logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath) + startTime = time.Now() pt.inmemoryPartsFlusherWG.Wait() logger.Infof("inmemory parts flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) @@ -1289,6 +1296,64 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig return dst, removedParts } +func (pt 
*partition) startStalePartsRemover() {
+	// Run stalePartsRemover in a background goroutine tracked by stalePartsRemoverWG,
+	// so that MustClose can wait for it to exit after closing pt.stopCh.
+	pt.stalePartsRemoverWG.Add(1)
+	go func() {
+		pt.stalePartsRemover()
+		pt.stalePartsRemoverWG.Done()
+	}()
+}
+
+// stalePartsRemover calls removeStaleParts on every tick of a 7-minute ticker
+// and returns as soon as pt.stopCh is closed.
+func (pt *partition) stalePartsRemover() {
+	ticker := time.NewTicker(7 * time.Minute)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-pt.stopCh:
+			return
+		case <-ticker.C:
+			pt.removeStaleParts()
+		}
+	}
+}
+
+// removeStaleParts detaches from pt all the big and small parts whose MaxTimestamp
+// is older than the retention deadline, i.e. the current time minus pt.retentionMsecs.
+//
+// The row counts of the detached parts are added to the bigRowsDeleted and
+// smallRowsDeleted counters, and the partition's reference to every detached part
+// is released via decRef.
+//
+// NOTE(review): parts are selected by MaxTimestamp only; confirm whether parts
+// participating in an in-flight merge must be skipped here.
+func (pt *partition) removeStaleParts() {
+	m := make(map[*partWrapper]bool)
+	startTime := time.Now()
+	retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
+
+	// Collect the stale parts and drop them from both lists under a single lock acquisition.
+	pt.partsLock.Lock()
+	for _, pw := range pt.bigParts {
+		if pw.p.ph.MaxTimestamp < retentionDeadline {
+			atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount)
+			m[pw] = true
+		}
+	}
+	for _, pw := range pt.smallParts {
+		if pw.p.ph.MaxTimestamp < retentionDeadline {
+			atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount)
+			m[pw] = true
+		}
+	}
+	removedSmallParts := 0
+	removedBigParts := 0
+	if len(m) > 0 {
+		pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m, false)
+		pt.bigParts, removedBigParts = removeParts(pt.bigParts, m, true)
+	}
+	pt.partsLock.Unlock()
+
+	// Every part collected in m came from one of the two lists, so the removal counts must add up to len(m).
+	if removedSmallParts+removedBigParts != len(m) {
+		logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedSmallParts+removedBigParts, len(m))
+	}
+
+	// Remove partition references from removed parts, so they are eventually deleted when nobody reads from them.
+	for pw := range m {
+		// The message reports the configured retention, so log pt.retentionMsecs converted to seconds
+		// rather than the absolute deadline timestamp.
+		logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000)
+		pw.decRef()
+	}
+}
+
 // getPartsToMerge returns optimal parts to merge from pws.
 //
 // The returned parts will contain less than maxRows rows.