diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 03a0e0210..0a693f1ae 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -13,11 +13,13 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/VictoriaMetrics/metrics"
 	"gopkg.in/yaml.v2"
 )
 
@@ -210,6 +212,9 @@ type aggregator struct {
 	without             []string
 	aggregateOnlyByTime bool
 
+	// interval is the interval for aggregating input samples
+	interval time.Duration
+
 	// dedupAggr is set to non-nil if input samples must be de-duplicated according
 	// to the dedupInterval passed to newAggregator().
 	dedupAggr *lastAggrState
@@ -228,6 +233,10 @@ type aggregator struct {
 
 	wg     sync.WaitGroup
 	stopCh chan struct{}
+
+	// tooOldSamplesDroppedTotal is the total number of samples dropped because they were too old.
+	// It is stored in the aggregator in order to avoid creating the metric if aggregation is not used.
+	tooOldSamplesDroppedTotal *metrics.Counter
 }
 
 type aggrState interface {
@@ -362,6 +371,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		by:                  by,
 		without:             without,
 		aggregateOnlyByTime: aggregateOnlyByTime,
+		interval:            interval,
 
 		dedupAggr:  dedupAggr,
 		aggrStates: aggrStates,
@@ -370,6 +380,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		suffix: suffix,
 
 		stopCh: make(chan struct{}),
+
+		tooOldSamplesDroppedTotal: metrics.GetOrCreateCounter(`vmagent_streamaggr_samples_dropped_total{reason="too_old"}`),
 	}
 
 	if dedupAggr != nil {
@@ -507,6 +519,8 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 	labels := promutils.GetLabels()
 	tmpLabels := promutils.GetLabels()
 	bb := bbPool.Get()
+	// Drop samples older than one aggregation interval; integer math keeps sub-second intervals exact.
+	minAllowedTimestamp := int64(fasttime.UnixTimestamp())*1000 - a.interval.Milliseconds()
 	for _, ts := range tss {
 		if !a.match.Match(ts.Labels) {
 			continue
@@ -535,6 +549,11 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 		}
 
 		for _, sample := range ts.Samples {
+			if sample.Timestamp < minAllowedTimestamp {
+				// Skip too old samples.
+				trackDroppedSample(&ts, sample.Timestamp, minAllowedTimestamp, a.tooOldSamplesDroppedTotal)
+				continue
+			}
 			a.pushSample(inputKey, outputKey, sample.Value)
 		}
 	}
@@ -543,6 +562,24 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 	promutils.PutLabels(labels)
 }
 
+// trackDroppedSample increments m for a dropped sample and logs a warning about it
+// only when droppedSamplesLogTicker has a pending tick, so that logging stays cheap.
+func trackDroppedSample(ts *prompbmarshal.TimeSeries, actualTs, minTs int64, m *metrics.Counter) {
+	select {
+	case <-droppedSamplesLogTicker.C:
+		// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
+		// because LabelsToString() will be called with each trackDroppedSample call.
+		lbs := promrelabel.LabelsToString(ts.Labels)
+		logger.Warnf("skipping a sample for metric %s at streaming aggregation: timestamp too old: %d; minimal accepted timestamp: %d", lbs, actualTs, minTs)
+	default:
+	}
+
+	m.Inc()
+}
+
+// droppedSamplesLogTicker ticks every 5 seconds and gates the warnings logged by trackDroppedSample.
+var droppedSamplesLogTicker = time.NewTicker(5 * time.Second)
+
 var bbPool bytesutil.ByteBufferPool
 
 func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index 57c74aa8e..95dce6ff4 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
@@ -676,6 +677,74 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
 `)
 }
 
+// TestDiscardsSamplesWithOldTimestamps verifies that samples older than the aggregation
+// interval are dropped, while samples within the interval are aggregated.
+func TestDiscardsSamplesWithOldTimestamps(t *testing.T) {
+	f := func(interval string, inputTs int64, mustMatch bool) {
+		t.Helper()
+
+		config := fmt.Sprintf(`
+- interval: %s
+  outputs: ["avg"]
+`, interval)
+		input := `
+cpu_usage{cpu="1"} 1
+`
+		expected := fmt.Sprintf(`cpu_usage:%s_avg{cpu="1"} 1
+`, interval)
+		if !mustMatch {
+			expected = ""
+		}
+
+		// Initialize Aggregators
+		var tssOutput []prompbmarshal.TimeSeries
+		var tssOutputLock sync.Mutex
+		pushFunc := func(tss []prompbmarshal.TimeSeries) {
+			tssOutputLock.Lock()
+			for _, ts := range tss {
+				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
+				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
+				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
+					Labels:  labelsCopy,
+					Samples: samplesCopy,
+				})
+			}
+			tssOutputLock.Unlock()
+		}
+		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+
+		// Push the inputMetrics to Aggregators
+		tssInput := mustParsePromMetricsSetTS(input, inputTs)
+		a.Push(tssInput)
+		a.MustStop()
+
+		// Verify the tssOutput contains the expected metrics
+		tsStrings := make([]string, len(tssOutput))
+		for i, ts := range tssOutput {
+			tsStrings[i] = timeSeriesToString(ts)
+		}
+		sort.Strings(tsStrings)
+		outputMetrics := strings.Join(tsStrings, "")
+		if outputMetrics != expected {
+			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, expected)
+		}
+	}
+	currentTs := func() int64 {
+		return int64(fasttime.UnixTimestamp() * 1000)
+	}
+
+	f("1m", currentTs(), true)
+	f("1h", currentTs()-120*1000, true)
+	f("24h", currentTs()-60*60*1000, true)
+
+	f("1m", currentTs()-120*1000, false)
+	f("1h", currentTs()-2*60*60*1000, false)
+	f("24h", currentTs()-25*60*60*1000, false)
+}
+
 func TestAggregatorsWithDedupInterval(t *testing.T) {
 	f := func(config, inputMetrics, outputMetricsExpected string) {
 		t.Helper()
@@ -762,6 +831,13 @@ func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
 }
 
 func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
+	return mustParsePromMetricsSetTS(s, int64(fasttime.UnixTimestamp()*1000))
+}
+
+// mustParsePromMetricsSetTS parses Prometheus metrics from s and assigns the given
+// timestamp to every row whose own timestamp is zero, unless timestamp itself is zero.
+// It panics if s cannot be parsed.
+func mustParsePromMetricsSetTS(s string, timestamp int64) []prompbmarshal.TimeSeries {
 	var rows prometheus.Rows
 	errLogger := func(s string) {
 		panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
@@ -770,6 +846,10 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
 	var tss []prompbmarshal.TimeSeries
 	samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
 	for _, row := range rows.Rows {
+		if row.Timestamp == 0 && timestamp != 0 {
+			row.Timestamp = timestamp
+		}
+
 		labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
 		labels = append(labels, prompbmarshal.Label{
 			Name:  "__name__",