lib/streamaggr: discard samples with timestamps outside of aggregation interval (#4199)

* lib/streamaggr: discard samples with timestamps not matching aggregation interval

Samples with timestamps lower than `now - aggregation_interval` are likely to have been written via backfilling and should not be used when calculating aggregations.
See #4068
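
For illustration, a minimal standalone sketch of the cutoff rule described above (hypothetical helper names; the actual change below computes the cutoff inline in aggregator.push via fasttime.UnixTimestamp()):

package main

import (
	"fmt"
	"time"
)

// minAllowedTimestampMs returns the oldest sample timestamp (in milliseconds)
// still accepted by an aggregator with the given aggregation interval.
// Anything older than now-interval is assumed to be backfilled data.
func minAllowedTimestampMs(now time.Time, interval time.Duration) int64 {
	return now.Add(-interval).UnixMilli()
}

func main() {
	now := time.Now()
	cutoff := minAllowedTimestampMs(now, time.Minute)

	// A sample two minutes in the past falls outside a 1m aggregation
	// interval and would be discarded rather than aggregated.
	sampleTs := now.Add(-2 * time.Minute).UnixMilli()
	fmt.Println(sampleTs < cutoff) // true
}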

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/streamaggr: make log message more descriptive, fix imports

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Zakhar Bessarab, 2023-04-27 13:59:49 +04:00 (committed by GitHub)
parent 03150c8973
commit 9e99f2f5b3
2 changed files with 108 additions and 0 deletions

lib/streamaggr/streamaggr.go

@@ -13,11 +13,13 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/VictoriaMetrics/metrics"
 	"gopkg.in/yaml.v2"
 )
@@ -210,6 +212,9 @@ type aggregator struct {
 	without             []string
 	aggregateOnlyByTime bool
 
+	// interval is the interval for aggregating input samples
+	interval time.Duration
+
 	// dedupAggr is set to non-nil if input samples must be de-duplicated according
 	// to the dedupInterval passed to newAggregator().
 	dedupAggr *lastAggrState
@@ -228,6 +233,10 @@ type aggregator struct {
 	wg     sync.WaitGroup
 	stopCh chan struct{}
+
+	// tooOldSamplesDroppedTotal is the total number of samples dropped for being too old.
+	// It is stored on the aggregator in order to avoid creating the metric if aggregation is not used.
+	tooOldSamplesDroppedTotal *metrics.Counter
 }
 
 type aggrState interface {
@@ -362,6 +371,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		by:                  by,
 		without:             without,
 		aggregateOnlyByTime: aggregateOnlyByTime,
+		interval:            interval,
 
 		dedupAggr:  dedupAggr,
 		aggrStates: aggrStates,
@@ -370,6 +380,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		suffix: suffix,
 
 		stopCh: make(chan struct{}),
+
+		tooOldSamplesDroppedTotal: metrics.GetOrCreateCounter(`vmagent_streamaggr_samples_dropped_total{reason="too_old"}`),
 	}
 
 	if dedupAggr != nil {
@@ -507,6 +519,7 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 	labels := promutils.GetLabels()
 	tmpLabels := promutils.GetLabels()
 	bb := bbPool.Get()
+	minAllowedTimestamp := int64(float64(fasttime.UnixTimestamp())-a.interval.Seconds()) * 1000
 	for _, ts := range tss {
 		if !a.match.Match(ts.Labels) {
 			continue
@@ -535,6 +548,11 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 		}
 		for _, sample := range ts.Samples {
+			if sample.Timestamp < minAllowedTimestamp {
+				// Skip too old samples.
+				trackDroppedSample(&ts, sample.Timestamp, minAllowedTimestamp, a.tooOldSamplesDroppedTotal)
+				continue
+			}
 			a.pushSample(inputKey, outputKey, sample.Value)
 		}
 	}
@@ -543,6 +561,21 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 	promutils.PutLabels(labels)
 }
 
+func trackDroppedSample(ts *prompbmarshal.TimeSeries, actualTs, minTs int64, m *metrics.Counter) {
+	select {
+	case <-droppedSamplesLogTicker.C:
+		// Do not use logger.WithThrottler() here, since it would result in increased CPU usage:
+		// LabelsToString() would then be called on every trackDroppedSample() call.
+		lbs := promrelabel.LabelsToString(ts.Labels)
+		logger.Warnf("skipping a sample for metric %s at streaming aggregation: timestamp %d is too old; the minimum accepted timestamp is %d", lbs, actualTs, minTs)
+	default:
+	}
+	m.Inc()
+}
+
+var droppedSamplesLogTicker = time.NewTicker(5 * time.Second)
+
 var bbPool bytesutil.ByteBufferPool
 
 func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {

lib/streamaggr/streamaggr_test.go

@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
@@ -676,6 +677,72 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
 `)
 }
 
+func TestDiscardsSamplesWithOldTimestamps(t *testing.T) {
+	f := func(interval string, inputTs int64, mustMatch bool) {
+		t.Helper()
+
+		config := fmt.Sprintf(`
+- interval: %s
+  outputs: ["avg"]
+`, interval)
+		input := `
+cpu_usage{cpu="1"} 1
+`
+		expected := fmt.Sprintf(`cpu_usage:%s_avg{cpu="1"} 1
+`, interval)
+		if !mustMatch {
+			expected = ""
+		}
+
+		// Initialize the aggregators.
+		var tssOutput []prompbmarshal.TimeSeries
+		var tssOutputLock sync.Mutex
+		pushFunc := func(tss []prompbmarshal.TimeSeries) {
+			tssOutputLock.Lock()
+			for _, ts := range tss {
+				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
+				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
+				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
+					Labels:  labelsCopy,
+					Samples: samplesCopy,
+				})
+			}
+			tssOutputLock.Unlock()
+		}
+		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+
+		// Push the input metrics to the aggregators.
+		tssInput := mustParsePromMetricsSetTS(input, inputTs)
+		a.Push(tssInput)
+		a.MustStop()
+
+		// Verify that tssOutput contains the expected metrics.
+		tsStrings := make([]string, len(tssOutput))
+		for i, ts := range tssOutput {
+			tsStrings[i] = timeSeriesToString(ts)
+		}
+		sort.Strings(tsStrings)
+		outputMetrics := strings.Join(tsStrings, "")
+		if outputMetrics != expected {
+			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, expected)
+		}
+	}
+
+	currentTs := func() int64 {
+		return int64(fasttime.UnixTimestamp() * 1000)
+	}
+
+	// Samples within the aggregation interval must be aggregated.
+	f("1m", currentTs(), true)
+	f("1h", currentTs()-120*1000, true)
+	f("24h", currentTs()-60*60*1000, true)
+
+	// Samples older than the aggregation interval must be discarded.
+	f("1m", currentTs()-120*1000, false)
+	f("1h", currentTs()-2*60*60*1000, false)
+	f("24h", currentTs()-25*60*60*1000, false)
+}
+
 func TestAggregatorsWithDedupInterval(t *testing.T) {
 	f := func(config, inputMetrics, outputMetricsExpected string) {
 		t.Helper()
@@ -762,6 +829,10 @@ func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
 }
 
 func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
+	return mustParsePromMetricsSetTS(s, int64(fasttime.UnixTimestamp()*1000))
+}
+
+func mustParsePromMetricsSetTS(s string, timestamp int64) []prompbmarshal.TimeSeries {
 	var rows prometheus.Rows
 	errLogger := func(s string) {
 		panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
@@ -770,6 +841,10 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
 	var tss []prompbmarshal.TimeSeries
 	samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
 	for _, row := range rows.Rows {
+		if row.Timestamp == 0 && timestamp != 0 {
+			row.Timestamp = timestamp
+		}
+
 		labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
 		labels = append(labels, prompbmarshal.Label{
 			Name:  "__name__",