lib/streamaggr: update the minimum allowed timestamp for incoming samples before flushing the samples to the storage

This should prevent from dropping samples with old timestamps during long flushes.

This is a follow-up for 1cedaf61cb
This commit is contained in:
Aliaksandr Valialkin 2024-04-04 02:24:56 +03:00
parent d61f6c89a1
commit 222e8b5a7b
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB

View File

@ -713,6 +713,11 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) {
startTime := time.Now()
// Update minTimestamp before flushing samples to the storage,
// since the flush durtion can be quite long.
// This should prevent from dropping samples with old timestamps when the flush takes long time.
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
var wg sync.WaitGroup
for _, as := range a.aggrStates {
flushConcurrencyCh <- struct{}{}
@ -732,8 +737,6 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
}
wg.Wait()
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
if d > interval {