VictoriaMetrics/lib/streamaggr/increase.go
Alexander Marshalov 9ba03b4838
allow configuring staleness interval in stream aggregation (#4667) (#4670)
---------

Signed-off-by: Alexander Marshalov <_@marshalov.org>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
2023-07-20 21:47:29 -07:00

129 lines
2.9 KiB
Go

package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// increaseAggrState calculates output=increase, i.e. the increase over input counters.
type increaseAggrState struct {
	// m maps an output key (string) to its *increaseStateValue.
	m sync.Map

	// ignoreInputDeadline is a unix timestamp in seconds. Until it has passed,
	// the first sample of a not-yet-seen input series is remembered,
	// but is not added to the total.
	ignoreInputDeadline uint64

	// stalenessInterval is the number of seconds added to the current time
	// in order to obtain the delete deadline of an updated entry.
	stalenessInterval uint64
}
// increaseStateValue holds the aggregation state for a single output key.
type increaseStateValue struct {
	// mu is held while the fields below are read or modified.
	mu sync.Mutex

	// lastValues holds the most recent sample per input key.
	lastValues map[string]*lastValueState

	// total is the increase accumulated since it was last reset to zero during a flush.
	total float64

	// deleteDeadline is the unix timestamp in seconds after which
	// the whole entry is treated as stale.
	deleteDeadline uint64

	// deleted is set once the entry has been marked for removal from the parent map.
	// Writers seeing this flag must obtain a fresh entry instead of updating this one.
	deleted bool
}
// newIncreaseAggrState returns an empty increase aggregation state.
//
// interval is used for calculating the deadline (current time + interval + 1 second),
// until which the first sample of every new input series isn't counted.
// stalenessInterval, truncated to whole seconds, defines how far into the future
// the delete deadline of an entry is moved on every update.
func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *increaseAggrState {
	as := &increaseAggrState{
		stalenessInterval: uint64(stalenessInterval.Seconds()),
	}
	as.ignoreInputDeadline = fasttime.UnixTimestamp() + uint64(interval.Seconds()+1)
	return as
}
// pushSample registers value for the input series inputKey and adds the resulting
// delta to the total stored under outputKey.
//
// The delta is the difference to the previous value of the same input series.
// If the previous value is bigger than the new one, then the whole new value is used as the delta.
// The first sample of an input series is added to the total only after ignoreInputDeadline has passed.
func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) {
	now := fasttime.UnixTimestamp()
	deadline := now + as.stalenessInterval

	for {
		entry, found := as.m.Load(outputKey)
		if !found {
			// There is no state for outputKey yet. Try registering a fresh one.
			// LoadOrStore returns the already stored entry if a concurrent goroutine won the race.
			fresh := &increaseStateValue{
				lastValues: make(map[string]*lastValueState),
			}
			entry, _ = as.m.LoadOrStore(outputKey, fresh)
		}

		sv := entry.(*increaseStateValue)
		sv.mu.Lock()
		if sv.deleted {
			// The entry has been marked as deleted by a concurrent removeOldEntries call.
			// Obtain a live entry and retry the update.
			sv.mu.Unlock()
			continue
		}

		prev, seen := sv.lastValues[inputKey]
		if !seen {
			prev = &lastValueState{}
			sv.lastValues[inputKey] = prev
		}

		delta := value
		if seen && prev.value <= value {
			delta = value - prev.value
		}
		if seen || now > as.ignoreInputDeadline {
			sv.total += delta
		}

		prev.value = value
		prev.deleteDeadline = deadline
		sv.deleteDeadline = deadline
		sv.mu.Unlock()
		return
	}
}
// removeOldEntries drops state that wasn't updated before currentTime.
//
// Output entries with expired delete deadline are marked as deleted and removed from as.m.
// For the remaining output entries only the expired per-input last values are dropped.
func (as *increaseAggrState) removeOldEntries(currentTime uint64) {
	as.m.Range(func(key, val interface{}) bool {
		sv := val.(*increaseStateValue)

		sv.mu.Lock()
		if currentTime > sv.deleteDeadline {
			// The whole entry is stale. Flag it under the lock, so concurrent writers
			// holding a reference to it notice the removal.
			sv.deleted = true
			sv.mu.Unlock()
			as.m.Delete(key)
			return true
		}

		// The entry is still alive. Drop only its outdated input series.
		for inputKey, last := range sv.lastValues {
			if currentTime > last.deleteDeadline {
				delete(sv.lastValues, inputKey)
			}
		}
		sv.mu.Unlock()
		return true
	})
}
// appendSeriesForFlush removes stale entries and then appends an "increase" series
// to ctx for every remaining output key.
//
// The accumulated total of every visited entry is reset to zero.
func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
	now := fasttime.UnixTimestamp()
	as.removeOldEntries(now)

	// Samples are appended with a millisecond timestamp.
	timestampMsec := int64(now) * 1000
	as.m.Range(func(key, val interface{}) bool {
		sv := val.(*increaseStateValue)

		sv.mu.Lock()
		total, stale := sv.total, sv.deleted
		sv.total = 0
		sv.mu.Unlock()

		if stale {
			// Nothing is emitted for entries marked as deleted.
			return true
		}
		ctx.appendSeries(key.(string), "increase", timestampMsec, total)
		return true
	})
}