lib/storage: remove data race when updating rowsDeleted

This commit is contained in:
Aliaksandr Valialkin 2020-11-05 01:12:21 +02:00
parent c5e6c5f5a6
commit f2bff64933

View File

@ -3,6 +3,7 @@ package storage
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -53,12 +54,12 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
} }
if dmis.Has(bsm.Block.bh.TSID.MetricID) { if dmis.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics. // Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount) atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount))
continue continue
} }
if bsm.Block.bh.MaxTimestamp < retentionDeadline { if bsm.Block.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention. // Skip blocks out of the given retention.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount) atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount))
continue continue
} }
if pendingBlockIsEmpty { if pendingBlockIsEmpty {
@ -181,7 +182,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted
for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline {
nextIdx++ nextIdx++
} }
*rowsDeleted += uint64(nextIdx - b.nextIdx) atomic.AddUint64(rowsDeleted, uint64(nextIdx - b.nextIdx))
b.nextIdx = nextIdx b.nextIdx = nextIdx
} }