mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 14:22:15 +01:00
cbc637d1dd
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go. This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify the `name` option in every stream aggregation config. This option is used as the `name` label in metrics exposed by stream aggregation at the /metrics page. This simplifies investigation of the exposed metrics.
- Add the `path` label in addition to the `name`, `url` and `position` labels to metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics.
- Remove the `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to the less misleading `vm_streamaggr_output_samples_total`. This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f262. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f262. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the Alias and aggrID fields from the streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to a regular argument passed to the LoadFromFile() function, since this argument is mandatory.
- Pass the Options arg to the LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to the `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have a set of labels consistent with the rest of the streaming aggregation metrics.
- Convert the aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only the `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f81. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Add missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figure out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for an unknown reason in the commit 2eb1bc4f81.
- Fix incorrect update for the `vm_streamaggr_output_samples_total` metric in the flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
190 lines
4.8 KiB
Go
190 lines
4.8 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
)
|
|
|
|
// totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus.
//
// Which of the four outputs is produced depends on the resetTotalOnFlush
// and keepFirstSample flags; see getSuffix.
type totalAggrState struct {
	// m maps an output key (string) to its *totalStateValue.
	m sync.Map

	// resetTotalOnFlush tells flushState to zero the accumulated total
	// after it has been read, when flushState is asked to reset the state.
	resetTotalOnFlush bool

	// keepFirstSample tells pushSamples to add the value of the first sample
	// seen for a new input series to the total.
	keepFirstSample bool

	// stalenessSecs is the number of seconds without new samples
	// after which the tracked state is dropped.
	//
	// Also, the first sample of every new series is ignored for stalenessSecs
	// after the state has been created, even if keepFirstSample is set.
	// See ignoreFirstSampleDeadline for details.
	stalenessSecs uint64

	// ignoreFirstSampleDeadline is a unix timestamp in seconds. Until it has passed,
	// the first sample of every new series is ignored even if keepFirstSample is set.
	//
	// This avoids an initial spike of the output values at startup, when new time series
	// cannot be distinguished from already existing series.
	ignoreFirstSampleDeadline uint64
}
|
|
|
|
type totalStateValue struct {
|
|
mu sync.Mutex
|
|
lastValues map[string]totalLastValueState
|
|
total float64
|
|
deleteDeadline uint64
|
|
deleted bool
|
|
}
|
|
|
|
// totalLastValueState is the last accepted sample of a single input series.
type totalLastValueState struct {
	// value is the value of the last accepted sample.
	value float64

	// timestamp is the timestamp of the last accepted sample.
	// Samples with smaller timestamps are treated as out of order.
	timestamp int64

	// deleteDeadline is compared against the current unix timestamp in seconds
	// by removeOldEntries; the item is removed once the current time exceeds it.
	deleteDeadline uint64
}
|
|
|
|
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
|
|
stalenessSecs := roundDurationToSecs(stalenessInterval)
|
|
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
|
|
|
|
return &totalAggrState{
|
|
resetTotalOnFlush: resetTotalOnFlush,
|
|
keepFirstSample: keepFirstSample,
|
|
stalenessSecs: stalenessSecs,
|
|
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
|
|
}
|
|
}
|
|
|
|
func (as *totalAggrState) pushSamples(samples []pushSample) {
|
|
currentTime := fasttime.UnixTimestamp()
|
|
deleteDeadline := currentTime + as.stalenessSecs
|
|
keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
|
|
for i := range samples {
|
|
s := &samples[i]
|
|
inputKey, outputKey := getInputOutputKey(s.key)
|
|
|
|
again:
|
|
v, ok := as.m.Load(outputKey)
|
|
if !ok {
|
|
// The entry is missing in the map. Try creating it.
|
|
v = &totalStateValue{
|
|
lastValues: make(map[string]totalLastValueState),
|
|
}
|
|
outputKey = bytesutil.InternString(outputKey)
|
|
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
|
if loaded {
|
|
// Use the entry created by a concurrent goroutine.
|
|
v = vNew
|
|
}
|
|
}
|
|
sv := v.(*totalStateValue)
|
|
sv.mu.Lock()
|
|
deleted := sv.deleted
|
|
if !deleted {
|
|
lv, ok := sv.lastValues[inputKey]
|
|
if ok || keepFirstSample {
|
|
if s.timestamp < lv.timestamp {
|
|
// Skip out of order sample
|
|
sv.mu.Unlock()
|
|
continue
|
|
}
|
|
|
|
if s.value >= lv.value {
|
|
sv.total += s.value - lv.value
|
|
} else {
|
|
// counter reset
|
|
sv.total += s.value
|
|
}
|
|
}
|
|
lv.value = s.value
|
|
lv.timestamp = s.timestamp
|
|
lv.deleteDeadline = deleteDeadline
|
|
|
|
inputKey = bytesutil.InternString(inputKey)
|
|
sv.lastValues[inputKey] = lv
|
|
sv.deleteDeadline = deleteDeadline
|
|
}
|
|
sv.mu.Unlock()
|
|
if deleted {
|
|
// The entry has been deleted by the concurrent call to flushState
|
|
// Try obtaining and updating the entry again.
|
|
goto again
|
|
}
|
|
}
|
|
}
|
|
|
|
func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
|
|
currentTime := fasttime.UnixTimestamp()
|
|
currentTimeMsec := int64(currentTime) * 1000
|
|
|
|
suffix := as.getSuffix()
|
|
|
|
as.removeOldEntries(currentTime)
|
|
|
|
m := &as.m
|
|
m.Range(func(k, v any) bool {
|
|
sv := v.(*totalStateValue)
|
|
|
|
sv.mu.Lock()
|
|
total := sv.total
|
|
if resetState {
|
|
if as.resetTotalOnFlush {
|
|
sv.total = 0
|
|
} else if math.Abs(sv.total) >= (1 << 53) {
|
|
// It is time to reset the entry, since it starts losing float64 precision
|
|
sv.total = 0
|
|
}
|
|
}
|
|
deleted := sv.deleted
|
|
sv.mu.Unlock()
|
|
|
|
if !deleted {
|
|
key := k.(string)
|
|
ctx.appendSeries(key, suffix, currentTimeMsec, total)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (as *totalAggrState) getSuffix() string {
|
|
// Note: this function is at hot path, so it shouldn't allocate.
|
|
if as.resetTotalOnFlush {
|
|
if as.keepFirstSample {
|
|
return "increase"
|
|
}
|
|
return "increase_prometheus"
|
|
}
|
|
if as.keepFirstSample {
|
|
return "total"
|
|
}
|
|
return "total_prometheus"
|
|
}
|
|
|
|
func (as *totalAggrState) removeOldEntries(currentTime uint64) {
|
|
m := &as.m
|
|
m.Range(func(k, v any) bool {
|
|
sv := v.(*totalStateValue)
|
|
|
|
sv.mu.Lock()
|
|
if currentTime > sv.deleteDeadline {
|
|
// Mark the current entry as deleted
|
|
sv.deleted = true
|
|
sv.mu.Unlock()
|
|
m.Delete(k)
|
|
return true
|
|
}
|
|
|
|
// Delete outdated entries in sv.lastValues
|
|
lvs := sv.lastValues
|
|
for k1, lv := range lvs {
|
|
if currentTime > lv.deleteDeadline {
|
|
delete(lvs, k1)
|
|
}
|
|
}
|
|
sv.mu.Unlock()
|
|
return true
|
|
})
|
|
}
|