mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-25 20:00:06 +01:00
d577657fb7
- Make sure that the last successfully loaded config is used on hot-reload failure - Properly cleanup resources occupied by already initialized aggregators when the current aggregator fails to be initialized - Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config This should simplify monitoring and debugging failed reloads - Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop() while sa could be in use at realoadSaConfig() - Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact scalability on system with big number of CPU cores at hasState.Store(true) call inside aggregator.Push(). - Remove fine-grained aggregator reload - reload all the aggregators on config change instead. This simplifies the code a bit. The fine-grained aggregator reload may be returned back if there will be demand from real users for it. - Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs with -dryRun flag - Return back accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639
796 lines
17 KiB
Go
796 lines
17 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
|
)
|
|
|
|
func TestAggregatorsFailure(t *testing.T) {
|
|
f := func(config string) {
|
|
t.Helper()
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
panic(fmt.Errorf("pushFunc shouldn't be called"))
|
|
}
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
|
if err == nil {
|
|
t.Fatalf("expecting non-nil error")
|
|
}
|
|
if a != nil {
|
|
t.Fatalf("expecting nil a")
|
|
}
|
|
}
|
|
|
|
// Invalid config
|
|
f(`foobar`)
|
|
|
|
// Unknown option
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
foobar: baz
|
|
`)
|
|
|
|
// missing interval
|
|
f(`
|
|
- outputs: [total]
|
|
`)
|
|
|
|
// missing outputs
|
|
f(`
|
|
- interval: 1m
|
|
`)
|
|
|
|
// Invalid output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [foobar]
|
|
`)
|
|
|
|
// Negative interval
|
|
f(`- interval: -5m`)
|
|
// Too small interval
|
|
f(`- interval: 10ms`)
|
|
|
|
// Invalid input_relabel_configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
input_relabel_configs:
|
|
- foo: bar
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
input_relabel_configs:
|
|
- action: replace
|
|
`)
|
|
|
|
// Invalid output_relabel_configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
output_relabel_configs:
|
|
- foo: bar
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
output_relabel_configs:
|
|
- action: replace
|
|
`)
|
|
|
|
// Both by and without are non-empty
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
by: [foo]
|
|
without: [bar]
|
|
`)
|
|
|
|
// Invalid quantiles()
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles("]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles()"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(foo)"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(-0.5)"]
|
|
`)
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(1.5)"]
|
|
`)
|
|
}
|
|
|
|
func TestAggregatorsEqual(t *testing.T) {
|
|
f := func(a, b string, expectedResult bool) {
|
|
t.Helper()
|
|
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
|
|
aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
result := aa.Equal(ab)
|
|
if result != expectedResult {
|
|
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
|
|
}
|
|
}
|
|
f("", "", true)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, ``, false)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, `
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, true)
|
|
f(`
|
|
- outputs: [total]
|
|
interval: 3m
|
|
`, `
|
|
- outputs: [total]
|
|
interval: 5m
|
|
`, false)
|
|
}
|
|
|
|
func TestAggregatorsSuccess(t *testing.T) {
|
|
f := func(config, inputMetrics, outputMetricsExpected string) {
|
|
t.Helper()
|
|
|
|
// Initialize Aggregators
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
var tssOutputLock sync.Mutex
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
tssOutputLock.Lock()
|
|
for _, ts := range tss {
|
|
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
|
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
|
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
|
Labels: labelsCopy,
|
|
Samples: samplesCopy,
|
|
})
|
|
}
|
|
tssOutputLock.Unlock()
|
|
}
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
|
|
// Push the inputMetrics to Aggregators
|
|
tssInput := mustParsePromMetrics(inputMetrics)
|
|
a.Push(tssInput)
|
|
a.MustStop()
|
|
|
|
// Verify the tssOutput contains the expected metrics
|
|
tsStrings := make([]string, len(tssOutput))
|
|
for i, ts := range tssOutput {
|
|
tsStrings[i] = timeSeriesToString(ts)
|
|
}
|
|
sort.Strings(tsStrings)
|
|
outputMetrics := strings.Join(tsStrings, "")
|
|
if outputMetrics != outputMetricsExpected {
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
}
|
|
}
|
|
|
|
// Empty config
|
|
f(``, ``, ``)
|
|
f(``, `foo{bar="baz"} 1`, ``)
|
|
f(``, "foo 1\nbaz 2", ``)
|
|
|
|
// Empty by list - aggregate only by time
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [count_samples, sum_samples, count_series, last]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_count_samples 1
|
|
bar:1m_count_series 1
|
|
bar:1m_last 5
|
|
bar:1m_sum_samples 5
|
|
foo:1m_count_samples{abc="123"} 2
|
|
foo:1m_count_samples{abc="456",de="fg"} 1
|
|
foo:1m_count_series{abc="123"} 1
|
|
foo:1m_count_series{abc="456",de="fg"} 1
|
|
foo:1m_last{abc="123"} 8.5
|
|
foo:1m_last{abc="456",de="fg"} 8
|
|
foo:1m_sum_samples{abc="123"} 12.5
|
|
foo:1m_sum_samples{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// Special case: __name__ in `by` list - this is the same as empty `by` list
|
|
f(`
|
|
- interval: 1m
|
|
by: [__name__]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_count_samples 1
|
|
bar:1m_count_series 1
|
|
bar:1m_sum_samples 5
|
|
foo:1m_count_samples 3
|
|
foo:1m_count_series 2
|
|
foo:1m_sum_samples 20.5
|
|
`)
|
|
|
|
// Non-empty `by` list with non-existing labels
|
|
f(`
|
|
- interval: 1m
|
|
by: [foo, bar]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_bar_foo_count_samples 1
|
|
bar:1m_by_bar_foo_count_series 1
|
|
bar:1m_by_bar_foo_sum_samples 5
|
|
foo:1m_by_bar_foo_count_samples 3
|
|
foo:1m_by_bar_foo_count_series 2
|
|
foo:1m_by_bar_foo_sum_samples 20.5
|
|
`)
|
|
|
|
// Non-empty `by` list with existing label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
bar:1m_by_abc_count_series 1
|
|
bar:1m_by_abc_sum_samples 5
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`)
|
|
|
|
// Non-empty `by` list with duplicate existing label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc, abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
bar:1m_by_abc_count_series 1
|
|
bar:1m_by_abc_sum_samples 5
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`)
|
|
|
|
// Non-empty `without` list with non-existing labels
|
|
f(`
|
|
- interval: 1m
|
|
without: [foo]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_foo_count_samples 1
|
|
bar:1m_without_foo_count_series 1
|
|
bar:1m_without_foo_sum_samples 5
|
|
foo:1m_without_foo_count_samples{abc="123"} 2
|
|
foo:1m_without_foo_count_samples{abc="456",de="fg"} 1
|
|
foo:1m_without_foo_count_series{abc="123"} 1
|
|
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
|
|
foo:1m_without_foo_sum_samples{abc="123"} 12.5
|
|
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// Non-empty `without` list with existing labels
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
bar:1m_without_abc_count_series 1
|
|
bar:1m_without_abc_sum_samples 5
|
|
foo:1m_without_abc_count_samples 2
|
|
foo:1m_without_abc_count_samples{de="fg"} 1
|
|
foo:1m_without_abc_count_series 1
|
|
foo:1m_without_abc_count_series{de="fg"} 1
|
|
foo:1m_without_abc_sum_samples 12.5
|
|
foo:1m_without_abc_sum_samples{de="fg"} 8
|
|
`)
|
|
|
|
// Special case: __name__ in `without` list
|
|
f(`
|
|
- interval: 1m
|
|
without: [__name__]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `:1m_count_samples 1
|
|
:1m_count_samples{abc="123"} 2
|
|
:1m_count_samples{abc="456",de="fg"} 1
|
|
:1m_count_series 1
|
|
:1m_count_series{abc="123"} 1
|
|
:1m_count_series{abc="456",de="fg"} 1
|
|
:1m_sum_samples 5
|
|
:1m_sum_samples{abc="123"} 12.5
|
|
:1m_sum_samples{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// drop some input metrics
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
input_relabel_configs:
|
|
- if: 'foo'
|
|
action: drop
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
bar:1m_without_abc_count_series 1
|
|
bar:1m_without_abc_sum_samples 5
|
|
`)
|
|
|
|
// rename output metrics
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
output_relabel_configs:
|
|
- action: replace_all
|
|
source_labels: [__name__]
|
|
regex: ":|_"
|
|
replacement: "-"
|
|
target_label: __name__
|
|
- action: drop
|
|
source_labels: [de]
|
|
regex: fg
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar-1m-without-abc-count-samples 1
|
|
bar-1m-without-abc-count-series 1
|
|
bar-1m-without-abc-sum-samples 5
|
|
foo-1m-without-abc-count-samples 2
|
|
foo-1m-without-abc-count-series 1
|
|
foo-1m-without-abc-sum-samples 12.5
|
|
`)
|
|
|
|
// match doesn't match anything
|
|
f(`
|
|
- interval: 1m
|
|
without: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
match: '{non_existing_label!=""}'
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, ``)
|
|
|
|
// match matches foo series with non-empty abc label
|
|
f(`
|
|
- interval: 1m
|
|
by: [abc]
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
match: 'foo{abc=~".+"}'
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `foo:1m_by_abc_count_samples{abc="123"} 2
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
|
`)
|
|
|
|
// total output for non-repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 4.34
|
|
`, `bar:1m_total{baz="qwe"} 0
|
|
foo:1m_total 0
|
|
`)
|
|
|
|
// total output for repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_total{baz="qwe"} 5.02
|
|
bar:1m_total{baz="qwer"} 1
|
|
foo:1m_total 0
|
|
foo:1m_total{baz="qwe"} 15
|
|
`)
|
|
|
|
// total output for repeated series with group by __name__
|
|
f(`
|
|
- interval: 1m
|
|
by: [__name__]
|
|
outputs: [total]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_total 6.02
|
|
foo:1m_total 15
|
|
`)
|
|
|
|
// increase output for non-repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [increase]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 4.34
|
|
`, `bar:1m_increase{baz="qwe"} 0
|
|
foo:1m_increase 0
|
|
`)
|
|
|
|
// increase output for repeated series
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [increase]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_increase{baz="qwe"} 5.02
|
|
bar:1m_increase{baz="qwer"} 1
|
|
foo:1m_increase 0
|
|
foo:1m_increase{baz="qwe"} 15
|
|
`)
|
|
|
|
// multiple aggregate configs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [count_series, sum_samples]
|
|
- interval: 5m
|
|
by: [bar]
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 1
|
|
foo{bar="baz"} 2
|
|
foo 3.3
|
|
`, `foo:1m_count_series 1
|
|
foo:1m_count_series{bar="baz"} 1
|
|
foo:1m_sum_samples 4.3
|
|
foo:1m_sum_samples{bar="baz"} 2
|
|
foo:5m_by_bar_sum_samples 4.3
|
|
foo:5m_by_bar_sum_samples{bar="baz"} 2
|
|
`)
|
|
|
|
// min and max outputs
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [min, max]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_max 5
|
|
bar:1m_min 5
|
|
foo:1m_max{abc="123"} 8.5
|
|
foo:1m_max{abc="456",de="fg"} 8
|
|
foo:1m_min{abc="123"} 4
|
|
foo:1m_min{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// avg output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [avg]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_avg 5
|
|
foo:1m_avg{abc="123"} 6.25
|
|
foo:1m_avg{abc="456",de="fg"} 8
|
|
`)
|
|
|
|
// stddev output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [stddev]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_stddev 0
|
|
foo:1m_stddev{abc="123"} 2.25
|
|
foo:1m_stddev{abc="456",de="fg"} 0
|
|
`)
|
|
|
|
// stdvar output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [stdvar]
|
|
`, `
|
|
foo{abc="123"} 4
|
|
bar 5
|
|
foo{abc="123"} 8.5
|
|
foo{abc="456",de="fg"} 8
|
|
`, `bar:1m_stdvar 0
|
|
foo:1m_stdvar{abc="123"} 5.0625
|
|
foo:1m_stdvar{abc="456",de="fg"} 0
|
|
`)
|
|
|
|
// histogram_bucket output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [histogram_bucket]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
|
|
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
|
|
`)
|
|
|
|
// histogram_bucket output without cpu
|
|
f(`
|
|
- interval: 1m
|
|
without: [cpu]
|
|
outputs: [histogram_bucket]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
|
|
`)
|
|
|
|
// quantiles output
|
|
f(`
|
|
- interval: 1m
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
|
|
`)
|
|
|
|
// quantiles output without cpu
|
|
f(`
|
|
- interval: 1m
|
|
without: [cpu]
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
`, `
|
|
cpu_usage{cpu="1"} 12.5
|
|
cpu_usage{cpu="1"} 13.3
|
|
cpu_usage{cpu="1"} 13
|
|
cpu_usage{cpu="1"} 12
|
|
cpu_usage{cpu="1"} 14
|
|
cpu_usage{cpu="1"} 25
|
|
cpu_usage{cpu="2"} 90
|
|
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
|
|
`)
|
|
}
|
|
|
|
func TestAggregatorsWithDedupInterval(t *testing.T) {
|
|
f := func(config, inputMetrics, outputMetricsExpected string) {
|
|
t.Helper()
|
|
|
|
// Initialize Aggregators
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
var tssOutputLock sync.Mutex
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
tssOutputLock.Lock()
|
|
for _, ts := range tss {
|
|
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
|
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
|
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
|
Labels: labelsCopy,
|
|
Samples: samplesCopy,
|
|
})
|
|
}
|
|
tssOutputLock.Unlock()
|
|
}
|
|
const dedupInterval = time.Hour
|
|
a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
|
|
if err != nil {
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
}
|
|
|
|
// Push the inputMetrics to Aggregators
|
|
tssInput := mustParsePromMetrics(inputMetrics)
|
|
a.Push(tssInput)
|
|
if a != nil {
|
|
for _, aggr := range a.as {
|
|
aggr.dedupFlush()
|
|
aggr.flush()
|
|
}
|
|
}
|
|
a.MustStop()
|
|
|
|
// Verify the tssOutput contains the expected metrics
|
|
tsStrings := make([]string, len(tssOutput))
|
|
for i, ts := range tssOutput {
|
|
tsStrings[i] = timeSeriesToString(ts)
|
|
}
|
|
sort.Strings(tsStrings)
|
|
outputMetrics := strings.Join(tsStrings, "")
|
|
if outputMetrics != outputMetricsExpected {
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
}
|
|
}
|
|
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 123
|
|
bar 567
|
|
`, `bar:1m_sum_samples 567
|
|
foo:1m_sum_samples 123
|
|
`)
|
|
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [sum_samples]
|
|
`, `
|
|
foo 123
|
|
bar{baz="qwe"} 1.32
|
|
bar{baz="qwe"} 4.34
|
|
bar{baz="qwe"} 2
|
|
foo{baz="qwe"} -5
|
|
bar{baz="qwer"} 343
|
|
bar{baz="qwer"} 344
|
|
foo{baz="qwe"} 10
|
|
`, `bar:1m_sum_samples{baz="qwe"} 2
|
|
bar:1m_sum_samples{baz="qwer"} 344
|
|
foo:1m_sum_samples 123
|
|
foo:1m_sum_samples{baz="qwe"} 10
|
|
`)
|
|
}
|
|
|
|
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
|
|
labelsString := promrelabel.LabelsToString(ts.Labels)
|
|
if len(ts.Samples) != 1 {
|
|
panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples)))
|
|
}
|
|
return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
|
|
}
|
|
|
|
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
|
|
var rows prometheus.Rows
|
|
errLogger := func(s string) {
|
|
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
|
|
}
|
|
rows.UnmarshalWithErrLogger(s, errLogger)
|
|
var tss []prompbmarshal.TimeSeries
|
|
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
|
|
for _, row := range rows.Rows {
|
|
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: "__name__",
|
|
Value: row.Metric,
|
|
})
|
|
for _, tag := range row.Tags {
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: tag.Key,
|
|
Value: tag.Value,
|
|
})
|
|
}
|
|
samples = append(samples, prompbmarshal.Sample{
|
|
Value: row.Value,
|
|
Timestamp: row.Timestamp,
|
|
})
|
|
ts := prompbmarshal.TimeSeries{
|
|
Labels: labels,
|
|
Samples: samples[len(samples)-1:],
|
|
}
|
|
tss = append(tss, ts)
|
|
}
|
|
return tss
|
|
}
|