mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
7acc54025e
This reverts commit 9e99f2f5b3
.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4068
Reason for revert: this breaks valid use cases:
- If timestamps aren't specified in the incoming samples on purpose. For example, if stream aggregation is used
as StatsD replacement. StatsD protocol has no timestamp concept for incoming samples.
See https://github.com/b/statsd_spec
- If all the samples must be aggregated, even if they contain stale timestamps.
for example, if the stream aggregation produces some counter of some events,
it may be better to count all the events even if they were delayed before
being ingested into VictoriaMetrics.
Is is also unclear how to determine whether the sample becomes stale.
For example, if the aggregation interval equals to 1h, and the previous
aggregation cycle just finished 10 minutes ago, what to do with the newly
incoming sample with the timestamp 30 minutes older than the current time?
The answer highly depends on the context, so it is unsafe to uncoditionally
use a single logic for dropping the old samples here.
796 lines
17 KiB
Go
796 lines
17 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
|
)
|
|
|
|
func TestAggregatorsFailure(t *testing.T) {
|
|
f := func(config string) {
|
|
t.Helper()
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
panic(fmt.Errorf("pushFunc shouldn't be called"))
|
|
}
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
|
if err == nil {
|
|
t.Fatalf("expecting non-nil error")
|
|
}
|
|
if a != nil {
|
|
t.Fatalf("expecting nil a")
|
|
}
|
|
}
|
|
|
|
// Invalid config
|
|
f(`foobar`)
|
|
|
|
// Unknown option
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
foobar: baz
|
|
`)
|
|
|
|
// missing interval
|
|
f(`
|
|
- outputs: [total]
|
|
`)
|
|
|
|
// missing outputs
|
|
f(`
|
|
- interval: 1m
|
|
`)
|
|
|
|
// Invalid output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [foobar]
|
|
`)
|
|
|
|
// Negative interval
|
|
f(`- interval: -5m`)
|
|
// Too small interval
|
|
f(`- interval: 10ms`)
|
|
|
|
// Invalid input_relabel_configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
input_relabel_configs:
|
|
- foo: bar
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
input_relabel_configs:
|
|
- action: replace
|
|
`)
|
|
|
|
// Invalid output_relabel_configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
output_relabel_configs:
|
|
- foo: bar
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
output_relabel_configs:
|
|
- action: replace
|
|
`)
|
|
|
|
// Both by and without are non-empty
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
by: [foo]
|
|
without: [bar]
|
|
`)
|
|
|
|
// Invalid quantiles()
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles("]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles()"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(foo)"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(-0.5)"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(1.5)"]
|
|
`)
|
|
}
|
|
|
|
func TestAggregatorsEqual(t *testing.T) {
|
|
f := func(a, b string, expectedResult bool) {
|
|
t.Helper()
|
|
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
|
|
aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
result := aa.Equal(ab)
|
|
if result != expectedResult {
|
|
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
|
|
}
|
|
}
|
|
f("", "", true)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, ``, false)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, `
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, true)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 3m
|
|
`, `
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, false)
|
|
}
|
|
|
|
func TestAggregatorsSuccess(t *testing.T) {
|
|
f := func(config, inputMetrics, outputMetricsExpected string) {
|
|
t.Helper()
|
|
|
|
// Initialize Aggregators
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
var tssOutputLock sync.Mutex
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
tssOutputLock.Lock()
|
|
for _, ts := range tss {
|
|
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
|
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
|
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
|
Labels: labelsCopy,
|
|
Samples: samplesCopy,
|
|
})
|
|
}
|
|
tssOutputLock.Unlock()
|
|
}
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
|
|
// Push the inputMetrics to Aggregators
|
|
tssInput := mustParsePromMetrics(inputMetrics)
|
|
a.Push(tssInput)
|
|
a.MustStop()
|
|
|
|
// Verify the tssOutput contains the expected metrics
|
|
tsStrings := make([]string, len(tssOutput))
|
|
for i, ts := range tssOutput {
|
|
tsStrings[i] = timeSeriesToString(ts)
|
|
}
|
|
sort.Strings(tsStrings)
|
|
outputMetrics := strings.Join(tsStrings, "")
|
|
if outputMetrics != outputMetricsExpected {
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
}
|
|
}
|
|
|
|
// Empty config
|
|
f(``, ``, ``)
|
|
f(``, `foo{bar="baz"} 1`, ``)
|
|
f(``, "foo 1\nbaz 2", ``)
|
|
|
|
// Empty by list - aggregate only by time
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [count_samples, sum_samples, count_series, last]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_count_samples 1
|
|
bar:1m_count_series 1
|
|
bar:1m_last 5
|
|
bar:1m_sum_samples 5
|
|
foo:1m_count_samples{abc="123"} 2
|
|
foo:1m_count_samples{abc="456",de="fg"} 1
|
|
foo:1m_count_series{abc="123"} 1
|
|
foo:1m_count_series{abc="456",de="fg"} 1
|
|
foo:1m_last{abc="123"} 8.5
|
|
foo:1m_last{abc="456",de="fg"} 8
|
|
foo:1m_sum_samples{abc="123"} 12.5
|
|
foo:1m_sum_samples{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// Special case: __name__ in `by` list - this is the same as empty `by` list
|
|
f(`
|
|
- interval: 1m
|
|
by: [__name__]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_count_samples 1
|
|
bar:1m_count_series 1
|
|
bar:1m_sum_samples 5
|
|
foo:1m_count_samples 3
|
|
foo:1m_count_series 2
|
|
foo:1m_sum_samples 20.5
|
|
`)
|
|
|
|
// Non-empty `by` list with non-existing labels
|
|
f(`
|
|
- interval: 1m
|
|
by: [foo, bar]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_bar_foo_count_samples 1
|
|
bar:1m_by_bar_foo_count_series 1
|
|
bar:1m_by_bar_foo_sum_samples 5
|
|
foo:1m_by_bar_foo_count_samples 3
|
|
foo:1m_by_bar_foo_count_series 2
|
|
foo:1m_by_bar_foo_sum_samples 20.5
|
|
`)
|
|
|
|
// Non-empty `by` list with existing label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
bar:1m_by_abc_count_series 1
|
|
bar:1m_by_abc_sum_samples 5
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`)
|
|
|
|
// Non-empty `by` list with duplicate existing label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc, abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
bar:1m_by_abc_count_series 1
|
|
bar:1m_by_abc_sum_samples 5
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`)
|
|
|
|
// Non-empty `without` list with non-existing labels
|
|
f(`
|
|
- interval: 1m
|
|
without: [foo]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_foo_count_samples 1
|
|
bar:1m_without_foo_count_series 1
|
|
bar:1m_without_foo_sum_samples 5
|
|
foo:1m_without_foo_count_samples{abc="123"} 2
|
|
foo:1m_without_foo_count_samples{abc="456",de="fg"} 1
|
|
foo:1m_without_foo_count_series{abc="123"} 1
|
|
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
|
|
foo:1m_without_foo_sum_samples{abc="123"} 12.5
|
|
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// Non-empty `without` list with existing labels
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
bar:1m_without_abc_count_series 1
|
|
bar:1m_without_abc_sum_samples 5
|
|
foo:1m_without_abc_count_samples 2
|
|
foo:1m_without_abc_count_samples{de="fg"} 1
|
|
foo:1m_without_abc_count_series 1
|
|
foo:1m_without_abc_count_series{de="fg"} 1
|
|
foo:1m_without_abc_sum_samples 12.5
|
|
foo:1m_without_abc_sum_samples{de="fg"} 8
|
|
`)
|
|
|
|
// Special case: __name__ in `without` list
|
|
f(`
|
|
- interval: 1m
|
|
without: [__name__]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `:1m_count_samples 1
|
|
:1m_count_samples{abc="123"} 2
|
|
:1m_count_samples{abc="456",de="fg"} 1
|
|
:1m_count_series 1
|
|
:1m_count_series{abc="123"} 1
|
|
:1m_count_series{abc="456",de="fg"} 1
|
|
:1m_sum_samples 5
|
|
:1m_sum_samples{abc="123"} 12.5
|
|
:1m_sum_samples{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// drop some input metrics
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
input_relabel_configs:
|
|
- if: 'foo'
|
|
action: drop
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
bar:1m_without_abc_count_series 1
|
|
bar:1m_without_abc_sum_samples 5
|
|
`)
|
|
|
|
// rename output metrics
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
output_relabel_configs:
|
|
- action: replace_all
|
|
source_labels: [__name__]
|
|
regex: ":|_"
|
|
replacement: "-"
|
|
target_label: __name__
|
|
- action: drop
|
|
source_labels: [de]
|
|
regex: fg
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar-1m-without-abc-count-samples 1
|
|
bar-1m-without-abc-count-series 1
|
|
bar-1m-without-abc-sum-samples 5
|
|
foo-1m-without-abc-count-samples 2
|
|
foo-1m-without-abc-count-series 1
|
|
foo-1m-without-abc-sum-samples 12.5
|
|
`)
|
|
|
|
// match doesn't match anything
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
match: '{non_existing_label!=""}'
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, ``)
|
|
|
|
// match matches foo series with non-empty abc label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
match: 'foo{abc=~".+"}'
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`)
|
|
|
|
// total output for non-repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 4.34
|
|
`, `bar:1m_total{baz="qwe"} 0
|
|
foo:1m_total 0
|
|
`)
|
|
|
|
// total output for repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_total{baz="qwe"} 5.02
|
|
bar:1m_total{baz="qwer"} 1
|
|
foo:1m_total 0
|
|
foo:1m_total{baz="qwe"} 15
|
|
`)
|
|
|
|
// total output for repeated series with group by __name__
|
|
f(`
|
|
- interval: 1m
|
|
by: [__name__]
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_total 6.02
|
|
foo:1m_total 15
|
|
`)
|
|
|
|
// increase output for non-repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [increase]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 4.34
|
|
`, `bar:1m_increase{baz="qwe"} 0
|
|
foo:1m_increase 0
|
|
`)
|
|
|
|
// increase output for repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [increase]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_increase{baz="qwe"} 5.02
|
|
bar:1m_increase{baz="qwer"} 1
|
|
foo:1m_increase 0
|
|
foo:1m_increase{baz="qwe"} 15
|
|
`)
|
|
|
|
// multiple aggregate configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [count_series, sum_samples]
|
|
- interval: 5m
|
|
by: [bar]
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 1
|
|
foo{bar="baz"} 2
|
|
foo 3.3
|
|
`, `foo:1m_count_series 1
|
|
foo:1m_count_series{bar="baz"} 1
|
|
foo:1m_sum_samples 4.3
|
|
foo:1m_sum_samples{bar="baz"} 2
|
|
foo:5m_by_bar_sum_samples 4.3
|
|
foo:5m_by_bar_sum_samples{bar="baz"} 2
|
|
`)
|
|
|
|
// min and max outputs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [min, max]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_max 5
|
|
bar:1m_min 5
|
|
foo:1m_max{abc="123"} 8.5
|
|
foo:1m_max{abc="456",de="fg"} 8
|
|
foo:1m_min{abc="123"} 4
|
|
foo:1m_min{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// avg output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [avg]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_avg 5
|
|
foo:1m_avg{abc="123"} 6.25
|
|
foo:1m_avg{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// stddev output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [stddev]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_stddev 0
|
|
foo:1m_stddev{abc="123"} 2.25
|
|
foo:1m_stddev{abc="456",de="fg"} 0
|
|
`)
|
|
|
|
// stdvar output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [stdvar]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_stdvar 0
|
|
foo:1m_stdvar{abc="123"} 5.0625
|
|
foo:1m_stdvar{abc="456",de="fg"} 0
|
|
`)
|
|
|
|
// histogram_bucket output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [histogram_bucket]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
|
|
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
|
|
`)
|
|
|
|
// histogram_bucket output without cpu
|
|
f(`
|
|
- interval: 1m
|
|
without: [cpu]
|
|
outputs: [histogram_bucket]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
|
|
`)
|
|
|
|
// quantiles output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
|
|
`)
|
|
|
|
// quantiles output without cpu
|
|
f(`
|
|
- interval: 1m
|
|
without: [cpu]
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
|
|
`)
|
|
}
|
|
|
|
func TestAggregatorsWithDedupInterval(t *testing.T) {
|
|
f := func(config, inputMetrics, outputMetricsExpected string) {
|
|
t.Helper()
|
|
|
|
// Initialize Aggregators
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
var tssOutputLock sync.Mutex
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
tssOutputLock.Lock()
|
|
for _, ts := range tss {
|
|
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
|
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
|
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
|
Labels: labelsCopy,
|
|
Samples: samplesCopy,
|
|
})
|
|
}
|
|
tssOutputLock.Unlock()
|
|
}
|
|
const dedupInterval = time.Hour
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
|
|
// Push the inputMetrics to Aggregators
|
|
tssInput := mustParsePromMetrics(inputMetrics)
|
|
a.Push(tssInput)
|
|
if a != nil {
|
|
for _, aggr := range a.as {
|
|
aggr.dedupFlush()
|
|
aggr.flush()
|
|
}
|
|
}
|
|
a.MustStop()
|
|
|
|
// Verify the tssOutput contains the expected metrics
|
|
tsStrings := make([]string, len(tssOutput))
|
|
for i, ts := range tssOutput {
|
|
tsStrings[i] = timeSeriesToString(ts)
|
|
}
|
|
sort.Strings(tsStrings)
|
|
outputMetrics := strings.Join(tsStrings, "")
|
|
if outputMetrics != outputMetricsExpected {
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
}
|
|
}
|
|
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 123
|
|
bar 567
|
|
`, `bar:1m_sum_samples 567
|
|
foo:1m_sum_samples 123
|
|
`)
|
|
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_sum_samples{baz="qwe"} 2
|
|
bar:1m_sum_samples{baz="qwer"} 344
|
|
foo:1m_sum_samples 123
|
|
foo:1m_sum_samples{baz="qwe"} 10
|
|
`)
|
|
}
|
|
|
|
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
|
|
labelsString := promrelabel.LabelsToString(ts.Labels)
|
|
if len(ts.Samples) != 1 {
|
|
panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples)))
|
|
}
|
|
return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
|
|
}
|
|
|
|
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
|
var rows prometheus.Rows
|
|
errLogger := func(s string) {
|
|
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
|
|
}
|
|
rows.UnmarshalWithErrLogger(s, errLogger)
|
|
var tss []prompbmarshal.TimeSeries
|
|
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
|
|
for _, row := range rows.Rows {
|
|
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: "__name__",
|
|
Value: row.Metric,
|
|
})
|
|
for _, tag := range row.Tags {
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: tag.Key,
|
|
Value: tag.Value,
|
|
})
|
|
}
|
|
samples = append(samples, prompbmarshal.Sample{
|
|
Value: row.Value,
|
|
Timestamp: row.Timestamp,
|
|
})
|
|
ts := prompbmarshal.TimeSeries{
|
|
Labels: labels,
|
|
Samples: samples[len(samples)-1:],
|
|
}
|
|
tss = append(tss, ts)
|
|
}
|
|
return tss
|
|
}
|