From 4553521f9a90c7a2eaaa5f5dded423d5c2fc392f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 18 Mar 2024 01:02:28 +0200 Subject: [PATCH] lib/streamaggr: ignore out of order samples for `last` output This is a follow-up for 6a465f6e299dff033754c054f3646b2388281feb Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931 --- docs/stream-aggregation.md | 6 +++--- lib/streamaggr/last.go | 15 ++++++++++----- lib/streamaggr/streamaggr_test.go | 9 +++++---- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 2bde2c64a..e0f05bece 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -19,10 +19,10 @@ The aggregation is applied to all the metrics received via any [supported data i and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter) after applying all the configured [relabeling stages](https://docs.victoriametrics.com/vmagent.html#relabeling). -Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). -It expects that the ingested samples have timestamps close to the current time. +By default stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). +It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples). -Stream aggregation is configured via the following command-line flags: +Stream aggregation can be configured via the following command-line flags: - `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html). This flag can be specified individually per each `-remoteWrite.url`. diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index 0fd39580c..c8c7881ed 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -12,9 +12,10 @@ type lastAggrState struct { } type lastStateValue struct { - mu sync.Mutex - last float64 - deleted bool + mu sync.Mutex + last float64 + timestamp int64 + deleted bool } func newLastAggrState() *lastAggrState { @@ -31,7 +32,8 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { if !ok { // The entry is missing in the map. Try creating it. v = &lastStateValue{ - last: s.value, + last: s.value, + timestamp: s.timestamp, } vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { @@ -45,7 +47,10 @@ func (as *lastAggrState) pushSamples(samples []pushSample) { sv.mu.Lock() deleted := sv.deleted if !deleted { - sv.last = s.value + if s.timestamp >= sv.timestamp { + sv.last = s.value + sv.timestamp = s.timestamp + } } sv.mu.Unlock() if deleted { diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 0808d0f42..754c06428 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -254,13 +254,14 @@ func TestAggregatorsSuccess(t *testing.T) { outputs: [count_samples, sum_samples, count_series, last] `, ` foo{abc="123"} 4 -bar 5 +bar 5 100 +bar 34 10 foo{abc="123"} 8.5 foo{abc="456",de="fg"} 8 -`, `bar:1m_count_samples 1 +`, `bar:1m_count_samples 2 bar:1m_count_series 1 bar:1m_last 5 -bar:1m_sum_samples 5 +bar:1m_sum_samples 39 foo:1m_count_samples{abc="123"} 2 foo:1m_count_samples{abc="456",de="fg"} 1 foo:1m_count_series{abc="123"} 1 @@ -269,7 +270,7 @@ foo:1m_last{abc="123"} 8.5 foo:1m_last{abc="456",de="fg"} 8 foo:1m_sum_samples{abc="123"} 12.5 foo:1m_sum_samples{abc="456",de="fg"} 8 -`, "1111") +`, "11111") // Special case: __name__ in `by` list - this is the same as empty `by` list f(`