lib/streamaggr: expand %{ENV} placeholders in stream aggregation configs

This commit is contained in:
Aliaksandr Valialkin 2024-01-24 12:31:27 +02:00
parent 12698b9136
commit e6e5b97e1e
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
5 changed files with 17 additions and 14 deletions

View File

@ -36,6 +36,7 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [DataDog v2 data ingestion protocol](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4451).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters per each `-remoteWrite.url` via the command-line flag `-remoteWrite.oauth2.endpointParams`. See [these docs](https://docs.victoriametrics.com/vmagent.html#advanced-usage). Thanks to @mhill-holoplot for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5427).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to set `attach_metadata.node=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/vmagent.html#quick-start) via `-promscrape.kubernetes.attachNodeMetadataAll` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4640). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5593). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to set `attach_metadata.node=true` option for all the [`kubernetes_sd_configs`](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs) defined at [`-promscrape.config`](https://docs.victoriametrics.com/vmagent.html#quick-start) via `-promscrape.kubernetes.attachNodeMetadataAll` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4640). Thanks to @wasim-nihal for [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5593).
* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation.html): expand `%{ENV_VAR}` placeholders in config files with the corresponding environment variable values.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags: * FEATURE: [vmalert](https://docs.victoriametrics.com/vmagent.html): expose ability to set OAuth2 endpoint parameters via the following command-line flags:
- `-datasource.oauth2.endpointParams` for `-datasource.url` - `-datasource.oauth2.endpointParams` for `-datasource.url`
- `-notifier.oauth2.endpointParams` for `-notifier.url` - `-notifier.oauth2.endpointParams` for `-notifier.url`

View File

@ -29,6 +29,7 @@ Stream aggregation is configured via the following command-line flags:
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
These flags must point to a file containing [stream aggregation config](#stream-aggregation-config). These flags must point to a file containing [stream aggregation config](#stream-aggregation-config).
The file may contain `%{ENV_VAR}` placeholders which are substituted by the corresponding `ENV_VAR` environment variable values.
By default, the following data is written to the storage when stream aggregation is enabled: By default, the following data is written to the storage when stream aggregation is enabled:

View File

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -48,20 +49,20 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot load aggregators: %w", err) return nil, fmt.Errorf("cannot load aggregators: %w", err)
} }
as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval) data, err = envtemplate.ReplaceBytes(data)
if err != nil {
return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
}
as, err := newAggregatorsFromData(data, pushFunc, dedupInterval)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
} }
return as, nil return as, nil
} }
// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data. func newAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
var cfgs []*Config var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)

View File

@ -20,7 +20,7 @@ func TestAggregatorsFailure(t *testing.T) {
pushFunc := func(tss []prompbmarshal.TimeSeries) { pushFunc := func(tss []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("pushFunc shouldn't be called")) panic(fmt.Errorf("pushFunc shouldn't be called"))
} }
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
if err == nil { if err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }
@ -124,11 +124,11 @@ func TestAggregatorsEqual(t *testing.T) {
t.Helper() t.Helper()
pushFunc := func(tss []prompbmarshal.TimeSeries) {} pushFunc := func(tss []prompbmarshal.TimeSeries) {}
aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0) aa, err := newAggregatorsFromData([]byte(a), pushFunc, 0)
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0) ab, err := newAggregatorsFromData([]byte(b), pushFunc, 0)
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
@ -177,7 +177,7 @@ func TestAggregatorsSuccess(t *testing.T) {
} }
tssOutputLock.Unlock() tssOutputLock.Unlock()
} }
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }
@ -737,7 +737,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
tssOutputLock.Unlock() tssOutputLock.Unlock()
} }
const dedupInterval = time.Hour const dedupInterval = time.Hour
a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval) a, err := newAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
if err != nil { if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err) t.Fatalf("cannot initialize aggregators: %s", err)
} }

View File

@ -44,7 +44,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
panic(fmt.Errorf("pushFunc is expected to be called exactly once at MustStop")) panic(fmt.Errorf("pushFunc is expected to be called exactly once at MustStop"))
} }
} }
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) a, err := newAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil { if err != nil {
b.Fatalf("unexpected error when initializing aggregators: %s", err) b.Fatalf("unexpected error when initializing aggregators: %s", err)
} }