From e6e5b97e1ea0b2a73f7a2e65862cf3774a2e4871 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 24 Jan 2024 12:31:27 +0200 Subject: [PATCH] lib/streamaggr: expand `%{ENV}` placeholders in stream aggregation configs --- docs/CHANGELOG.md | 1 + docs/stream-aggregation.md | 1 + lib/streamaggr/streamaggr.go | 17 +++++++++-------- lib/streamaggr/streamaggr_test.go | 10 +++++----- lib/streamaggr/streamaggr_timing_test.go | 2 +- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8c1b4dca39..ff1dc6066a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -36,6 +36,7 @@ The sandbox cluster installation is running under the constant load generated by * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to set `attach_metadata.node=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/vmagent.html#quick-start) via `-promscrape.kubernetes.attachNodeMetadataAll` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4640). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5593). +* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation.html): expand `%{ENV_VAR}` placeholders in config files with the corresponding environment variable values. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags: - `-datasource.oauth2.endpointParams` for `-datasource.url` - `-notifier.oauth2.endpointParams` for `-notifier.url` diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index bcafd97eb3..cf705538d2 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -29,6 +29,7 @@ Stream aggregation is configured via the following command-line flags: - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). These flags must point to a file containing [stream aggregation config](#stream-aggregation-config). +The file may contain `%{ENV_VAR}` placeholders which are substituted by the corresponding `ENV_VAR` environment variable values. By default, the following data is written to the storage when stream aggregation is enabled: diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 3812670213..836d21bd15 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -48,20 +49,20 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) ( if err != nil { return nil, fmt.Errorf("cannot load aggregators: %w", err) } - as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval) + data, err = envtemplate.ReplaceBytes(data) + if err != nil { + return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err) + } + + as, err := newAggregatorsFromData(data, pushFunc, dedupInterval) if err != nil { return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) } + return as, nil } -// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data. -// -// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, -// e.g. only the last sample per each time series per each dedupInterval is aggregated. -// -// The returned Aggregators must be stopped with MustStop() when no longer needed. -func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { +func newAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { var cfgs []*Config if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 33924c8e21..20f2d43f00 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -20,7 +20,7 @@ func TestAggregatorsFailure(t *testing.T) { pushFunc := func(tss []prompbmarshal.TimeSeries) { panic(fmt.Errorf("pushFunc shouldn't be called")) } - a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) + a, err := newAggregatorsFromData([]byte(config), pushFunc, 0) if err == nil { t.Fatalf("expecting non-nil error") } @@ -124,11 +124,11 @@ func TestAggregatorsEqual(t *testing.T) { t.Helper() pushFunc := func(tss []prompbmarshal.TimeSeries) {} - aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0) + aa, err := newAggregatorsFromData([]byte(a), pushFunc, 0) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0) + ab, err := newAggregatorsFromData([]byte(b), pushFunc, 0) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -177,7 +177,7 @@ func TestAggregatorsSuccess(t *testing.T) { } tssOutputLock.Unlock() } - a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) + a, err := newAggregatorsFromData([]byte(config), pushFunc, 0) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -737,7 +737,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { tssOutputLock.Unlock() } const dedupInterval = time.Hour - a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval) + a, err := newAggregatorsFromData([]byte(config), pushFunc, dedupInterval) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 51f95455b0..f69d2aa17a 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -44,7 +44,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { panic(fmt.Errorf("pushFunc is expected to be called exactly once at MustStop")) } } - a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) + a, err := newAggregatorsFromData([]byte(config), pushFunc, 0) if err != nil { b.Fatalf("unexpected error when initializing aggregators: %s", err) }