lib/streamaggr: follow up for 70773f53d7

- Round staleness_interval durations to the upper number of seconds.
  This should prevent from under-calculations for fractional staleness intervals.
- Rename stalenessInterval field at *AggrState structs into stalenessSecs, since it holds seconds.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667
This commit is contained in:
Aliaksandr Valialkin 2023-07-20 21:44:21 -07:00
parent bd95341190
commit 9763e2295b
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
3 changed files with 23 additions and 11 deletions

View File

@ -1,6 +1,7 @@
package streamaggr
import (
"math"
"sync"
"time"
@ -12,7 +13,7 @@ import (
type histogramBucketAggrState struct {
m sync.Map
stalenessInterval uint64
stalenessSecs uint64
}
type histogramBucketStateValue struct {
@ -23,14 +24,15 @@ type histogramBucketStateValue struct {
}
func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &histogramBucketAggrState{
stalenessInterval: uint64(stalenessInterval.Seconds()),
stalenessSecs: stalenessSecs,
}
}
func (as *histogramBucketAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessInterval
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
@ -98,3 +100,11 @@ func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) {
return true
})
}
func roundDurationToSecs(d time.Duration) uint64 {
if d < 0 {
return 0
}
secs := d.Seconds()
return uint64(math.Ceil(secs))
}

View File

@ -12,7 +12,7 @@ type increaseAggrState struct {
m sync.Map
ignoreInputDeadline uint64
stalenessInterval uint64
stalenessSecs uint64
}
type increaseStateValue struct {
@ -25,16 +25,17 @@ type increaseStateValue struct {
func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *increaseAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := uint64(interval.Seconds() + 1)
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &increaseAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
stalenessInterval: uint64(stalenessInterval.Seconds()),
stalenessSecs: stalenessSecs,
}
}
func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessInterval
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)

View File

@ -13,7 +13,7 @@ type totalAggrState struct {
m sync.Map
ignoreInputDeadline uint64
stalenessInterval uint64
stalenessSecs uint64
}
type totalStateValue struct {
@ -31,16 +31,17 @@ type lastValueState struct {
func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *totalAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := uint64(interval.Seconds() + 1)
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &totalAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
stalenessInterval: uint64(stalenessInterval.Seconds()),
stalenessSecs: stalenessSecs,
}
}
func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessInterval
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)