diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 9c4a6adda1..7c74869355 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -68,7 +68,7 @@ var ( "at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+ - "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+ + "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+ "Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag") ) @@ -109,6 +109,9 @@ func main() { if err := promscrape.CheckConfig(); err != nil { logger.Fatalf("error when checking -promscrape.config: %s", err) } + if err := remotewrite.CheckStreamAggConfigs(); err != nil { + logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err) + } logger.Infof("all the configs are ok; exiting with 0 status code") return } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 1ffa377b07..84a662d45c 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -65,15 +65,6 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+ "Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") - - streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html . "+ - "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") - streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+ - "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") - streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ - "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) var ( @@ -96,6 +87,9 @@ func MultitenancyEnabled() bool { // Contains the current relabelConfigs. var allRelabelConfigs atomic.Value +// Contains the loader for stream aggregation configs. +var saCfgLoader *saConfigsLoader + // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. var maxQueues = cgroup.AvailableCPUs() * 16 @@ -159,8 +153,13 @@ func Init() { } allRelabelConfigs.Store(rcs) - configSuccess.Set(1) - configTimestamp.Set(fasttime.UnixTimestamp()) + relabelConfigSuccess.Set(1) + relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) + + saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig) + if err != nil { + logger.Fatalf("cannot load stream aggregation config: %s", err) + } if len(*remoteWriteURLs) > 0 { rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs) @@ -176,29 +175,48 @@ func Init() { case <-stopCh: return } - configReloads.Inc() + relabelConfigReloads.Inc() logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") rcs, err := loadRelabelConfigs() if err != nil { - configReloadErrors.Inc() - configSuccess.Set(0) + relabelConfigReloadErrors.Inc() + relabelConfigSuccess.Set(0) logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err) continue } - allRelabelConfigs.Store(rcs) - configSuccess.Set(1) - configTimestamp.Set(fasttime.UnixTimestamp()) + relabelConfigSuccess.Set(1) + relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) logger.Infof("Successfully reloaded relabel configs") + + logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config") + err = saCfgLoader.reloadConfigs() + if err != nil { + logger.Errorf("Cannot reload stream aggregation configs: %s", err) + } + if len(*remoteWriteMultitenantURLs) > 0 { + rwctxsMapLock.Lock() + for _, rwctxs := range rwctxsMap { + for _, rwctx := range rwctxs { + rwctx.reinitStreamAggr() + } + } + rwctxsMapLock.Unlock() + } else { + for _, rwctx := range rwctxsDefault { + rwctx.reinitStreamAggr() + } + } + logger.Infof("Successfully reloaded stream aggregation configs") } }() } var ( - configReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`) - configReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`) - configSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`) - configTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`) + relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`) + relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`) + relabelConfigSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`) + relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`) ) func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { @@ -489,6 +507,7 @@ type remoteWriteCtx struct { c *client sas *streamaggr.Aggregators + saHash uint64 streamAggrKeepInput bool pss []*pendingSeries @@ -548,14 +567,16 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI } // Initialize sas - sasFile := streamAggrConfig.GetOptionalArg(argIdx) - if sasFile != "" { + saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx) + if len(saCfg) > 0 { + sasFile := streamAggrConfig.GetOptionalArg(argIdx) dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0) - sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) + sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval) if err != nil { logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err) } rwctx.sas = sas + rwctx.saHash = saHash rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx) } @@ -623,6 +644,20 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { pss[idx].Push(tss) } +func (rwctx *remoteWriteCtx) reinitStreamAggr() { + if rwctx.sas == nil { + return + } + saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx) + if rwctx.saHash == saHash { + return + } + if err := rwctx.sas.ReInitConfigs(saCfg); err != nil { + logger.Errorf("Cannot apply stream aggregation configs %d: %s", rwctx.idx, err) + } + rwctx.saHash = saHash +} + var tssRelabelPool = &sync.Pool{ New: func() interface{} { a := []prompbmarshal.TimeSeries{} diff --git a/app/vmagent/remotewrite/streamagg.go b/app/vmagent/remotewrite/streamagg.go new file mode 100644 index 0000000000..b56091f531 --- /dev/null +++ b/app/vmagent/remotewrite/streamagg.go @@ -0,0 +1,118 @@ +package remotewrite + +import ( + "fmt" + "sync/atomic" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" + "github.com/VictoriaMetrics/metrics" +) + +var ( + streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ + "See https://docs.victoriametrics.com/stream-aggregation.html . "+ + "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") + streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+ + "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ + "See https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ + "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") +) + +var ( + saCfgReloads = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`) + saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`) + saCfgSuccess = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`) + saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`) +) + +// saConfigRules - type alias for unmarshalled stream aggregation config +type saConfigRules = []*streamaggr.Config + +// saConfigsLoader loads stream aggregation configs from the given files. +type saConfigsLoader struct { + files []string + configs atomic.Pointer[[]saConfig] +} + +// newSaConfigsLoader creates new saConfigsLoader for the given config files. +func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) { + result := &saConfigsLoader{ + files: configFiles, + } + // Initial load of configs. + if err := result.reloadConfigs(); err != nil { + return nil, err + } + return result, nil +} + +// reloadConfigs reloads stream aggregation configs from the files given in constructor. +func (r *saConfigsLoader) reloadConfigs() error { + // Increment reloads counter if it is not the initial load. + if r.configs.Load() != nil { + saCfgReloads.Inc() + } + + // Load all configs from files. + var configs = make([]saConfig, len(r.files)) + for i, path := range r.files { + if len(path) == 0 { + // Skip empty stream aggregation config. + continue + } + rules, hash, err := streamaggr.LoadConfigsFromFile(path) + if err != nil { + saCfgSuccess.Set(0) + saCfgReloadErr.Inc() + return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err) + } + configs[i] = saConfig{ + path: path, + hash: hash, + rules: rules, + } + } + + // Update configs. + r.configs.Store(&configs) + + saCfgSuccess.Set(1) + saCfgTimestamp.Set(fasttime.UnixTimestamp()) + return nil +} + +// getCurrentConfig returns the current stream aggregation config with the given idx. +func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) { + all := r.configs.Load() + if all == nil { + return nil, 0 + } + cfgs := *all + if len(cfgs) == 0 { + return nil, 0 + } + if idx >= len(cfgs) { + if len(cfgs) == 1 { + cfg := cfgs[0] + return cfg.rules, cfg.hash + } + return nil, 0 + } + cfg := cfgs[idx] + return cfg.rules, cfg.hash +} + +type saConfig struct { + path string + hash uint64 + rules saConfigRules +} + +// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config. +func CheckStreamAggConfigs() error { + _, err := newSaConfigsLoader(*streamAggrConfig) + return err +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9f60f91003..301a6dc521 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -26,6 +26,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). @@ -135,19 +136,6 @@ Released at 2023-02-24 * BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816). -## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4) - -Released at 2023-03-25 - -**v1.87.x is a line of LTS releases (e.g. long-time support). It contains important up-to-date bugfixes. -The v1.87.x line will be supported for at least 12 months since [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release** - -* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). -* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error. -* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055). -* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999). -* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966). - ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3) Released at 2023-03-12 diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 4eb6c86a49..df983c6ba9 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -545,3 +545,20 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- The file can contain multiple aggregation configs. The aggregation is performed independently per each specified config entry. + +### Configuration update + +[vmagent](https://docs.victoriametrics.com/vmagent.html) and +[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two +approaches for reloading stream aggregation configs from updated config files such as +`-remoteWrite.streamAggr.config` and `-streamAggr.config` without restart. + +* Sending `SIGHUP` signal to `vmagent` process: + + ```console + kill -SIGHUP `pidof vmagent` + ``` + +* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`). + +It will reset the aggregation state only for changed rules in the configuration files. diff --git a/docs/vmagent.md b/docs/vmagent.md index 5f425b3fa2..36ca3b697c 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -108,7 +108,7 @@ additionally to pull-based Prometheus-compatible targets' scraping: `vmagent` should be restarted in order to update config options set via command-line args. `vmagent` supports multiple approaches for reloading configs from updated config files such as -`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`: +`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`: * Sending `SIGHUP` signal to `vmagent` process: diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 3e6199350b..c177544f30 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -1,12 +1,14 @@ package streamaggr import ( + "encoding/json" "fmt" "math" "sort" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -17,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/cespare/xxhash/v2" "gopkg.in/yaml.v2" ) @@ -36,22 +39,40 @@ var supportedOutputs = []string{ "quantiles(phi1, ..., phiN)", } -// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. +// ParseConfig loads array of stream aggregation configs from the given path. +func ParseConfig(data []byte) ([]*Config, uint64, error) { + var cfgs []*Config + if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { + return nil, 0, fmt.Errorf("cannot parse stream aggregation config %w", err) + } + return cfgs, xxhash.Sum64(data), nil +} + +// LoadConfigsFromFile loads array of stream aggregation configs from the given path. +func LoadConfigsFromFile(path string) ([]*Config, uint64, error) { + data, err := fs.ReadFileOrHTTP(path) + if err != nil { + return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err) + } + return ParseConfig(data) +} + +// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. // // If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, // e.g. only the last sample per each time series per each dedupInterval is aggregated. // // The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { - data, err := fs.ReadFileOrHTTP(path) +func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) { + cfgs, configHash, err := LoadConfigsFromFile(path) if err != nil { - return nil, fmt.Errorf("cannot load aggregators: %w", err) + return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err) } - as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval) + as, err := NewAggregators(cfgs, pushFunc, dedupInterval) if err != nil { - return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) + return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) } - return as, nil + return as, configHash, nil } // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data. @@ -127,9 +148,22 @@ type Config struct { OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"` } +func (cfg *Config) hash() (uint64, error) { + if cfg == nil { + return 0, nil + } + data, err := json.Marshal(cfg) + if err != nil { + return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err) + } + return xxhash.Sum64(data), nil +} + // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data. type Aggregators struct { - as []*aggregator + as atomic.Pointer[[]*aggregator] + pushFunc PushFunc + dedupInterval time.Duration } // NewAggregators creates Aggregators from the given cfgs. @@ -152,9 +186,13 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati } as[i] = a } - return &Aggregators{ - as: as, - }, nil + result := &Aggregators{ + pushFunc: pushFunc, + dedupInterval: dedupInterval, + } + result.as.Store(&as) + + return result, nil } // MustStop stops a. @@ -162,7 +200,7 @@ func (a *Aggregators) MustStop() { if a == nil { return } - for _, aggr := range a.as { + for _, aggr := range *a.as.Load() { aggr.MustStop() } } @@ -172,11 +210,74 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) { if a == nil { return } - for _, aggr := range a.as { + for _, aggr := range *a.as.Load() { aggr.Push(tss) } } +// ReInitConfigs reinits state of Aggregators a with the given new stream aggregation config +func (a *Aggregators) ReInitConfigs(cfgs []*Config) error { + if a == nil { + return nil + } + + keys := make(map[uint64]struct{}) // set of all keys (configs and aggregators) + cfgsMap := make(map[uint64]*Config) // map of config keys to their indices in cfgs + aggrsMap := make(map[uint64]*aggregator) // map of aggregator keys to their indices in a.as + + for _, cfg := range cfgs { + key, err := cfg.hash() + if err != nil { + return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err) + } + keys[key] = struct{}{} + cfgsMap[key] = cfg + } + for _, aggr := range *a.as.Load() { + keys[aggr.cfgHash] = struct{}{} + aggrsMap[aggr.cfgHash] = aggr + } + + asNew := make([]*aggregator, 0, len(aggrsMap)) + asDel := make([]*aggregator, 0, len(aggrsMap)) + for key := range keys { + cfg, hasCfg := cfgsMap[key] + agg, hasAggr := aggrsMap[key] + + // if config for aggregator was changed or removed + // then we need to stop aggregator and remove it + if !hasCfg && hasAggr { + asDel = append(asDel, agg) + continue + } + + // if there is no aggregator for config (new config), + // then we need to create it + if hasCfg && !hasAggr { + newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval) + if err != nil { + return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err) + } + asNew = append(asNew, newAgg) + continue + } + + // if aggregator config was not changed, then we can just keep it + if hasCfg && hasAggr { + asNew = append(asNew, agg) + } + } + + // Atomically replace aggregators array. + a.as.Store(&asNew) + // and stop old aggregators + for _, aggr := range asDel { + aggr.MustStop() + } + + return nil +} + // aggregator aggregates input series according to the config passed to NewAggregator type aggregator struct { match *promrelabel.IfExpression @@ -194,6 +295,7 @@ type aggregator struct { // aggrStates contains aggregate states for the given outputs aggrStates []aggrState + hasState atomic.Bool pushFunc PushFunc @@ -202,7 +304,8 @@ type aggregator struct { // It contains the interval, labels in (by, without), plus output name. // For example, foo_bar metric name is transformed to foo_bar:1m_by_job // for `interval: 1m`, `by: [job]` - suffix string + suffix string + cfgHash uint64 wg sync.WaitGroup stopCh chan struct{} @@ -330,6 +433,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) dedupAggr = newLastAggrState() } + cfgHash, err := cfg.hash() + if err != nil { + return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err) + } + // initialize the aggregator a := &aggregator{ match: cfg.Match, @@ -345,7 +453,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) aggrStates: aggrStates, pushFunc: pushFunc, - suffix: suffix, + suffix: suffix, + cfgHash: cfgHash, stopCh: make(chan struct{}), } @@ -411,8 +520,9 @@ func (a *aggregator) dedupFlush() { skipAggrSuffix: true, } a.dedupAggr.appendSeriesForFlush(ctx) - logger.Errorf("series after dedup: %v", ctx.tss) a.push(ctx.tss) + + a.hasState.Store(false) } func (a *aggregator) flush() { @@ -442,6 +552,8 @@ func (a *aggregator) flush() { // Push the output metrics. a.pushFunc(tss) } + + a.hasState.Store(false) } // MustStop stops the aggregator. @@ -449,11 +561,26 @@ func (a *aggregator) flush() { // The aggregator stops pushing the aggregated metrics after this call. func (a *aggregator) MustStop() { close(a.stopCh) + + if a.hasState.Load() { + if a.dedupAggr != nil { + flushConcurrencyCh <- struct{}{} + a.dedupFlush() + <-flushConcurrencyCh + } + + flushConcurrencyCh <- struct{}{} + a.flush() + <-flushConcurrencyCh + } + a.wg.Wait() } // Push pushes tss to a. func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { + a.hasState.Store(true) + if a.dedupAggr == nil { a.push(tss) return diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index a3c002c8d0..0968306962 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -146,7 +146,7 @@ func TestAggregatorsSuccess(t *testing.T) { tssInput := mustParsePromMetrics(inputMetrics) a.Push(tssInput) if a != nil { - for _, aggr := range a.as { + for _, aggr := range *a.as.Load() { aggr.flush() } } @@ -671,7 +671,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { tssInput := mustParsePromMetrics(inputMetrics) a.Push(tssInput) if a != nil { - for _, aggr := range a.as { + for _, aggr := range *a.as.Load() { aggr.dedupFlush() aggr.flush() } @@ -719,6 +719,106 @@ foo:1m_sum_samples{baz="qwe"} 10 `) } +func TestAggregatorsReinit(t *testing.T) { + f := func(config, newConfig, inputMetrics, outputMetricsExpected string) { + t.Helper() + + // Initialize Aggregators + var tssOutput []prompbmarshal.TimeSeries + var tssOutputLock sync.Mutex + pushFunc := func(tss []prompbmarshal.TimeSeries) { + tssOutputLock.Lock() + for _, ts := range tss { + labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...) + samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...) + tssOutput = append(tssOutput, prompbmarshal.TimeSeries{ + Labels: labelsCopy, + Samples: samplesCopy, + }) + } + tssOutputLock.Unlock() + } + + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) + if err != nil { + t.Fatalf("cannot initialize aggregators: %s", err) + } + + // Push the inputMetrics to Aggregators + tssInput := mustParsePromMetrics(inputMetrics) + a.Push(tssInput) + + // Reinitialize Aggregators + nc, _, err := ParseConfig([]byte(newConfig)) + if err != nil { + t.Fatalf("cannot parse new config: %s", err) + } + err = a.ReInitConfigs(nc) + if err != nil { + t.Fatalf("cannot reinit aggregators: %s", err) + } + + // Push the inputMetrics to Aggregators + a.Push(tssInput) + if a != nil { + for _, aggr := range *a.as.Load() { + aggr.flush() + } + } + + a.MustStop() + + // Verify the tssOutput contains the expected metrics + tsStrings := make([]string, len(tssOutput)) + for i, ts := range tssOutput { + tsStrings[i] = timeSeriesToString(ts) + } + sort.Strings(tsStrings) + outputMetrics := strings.Join(tsStrings, "") + if outputMetrics != outputMetricsExpected { + t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) + } + } + + f(` +- interval: 1m + outputs: [count_samples] +`, ` +- interval: 1m + outputs: [sum_samples] +`, ` +foo 123 +bar 567 +foo 234 +`, `bar:1m_count_samples 1 +bar:1m_sum_samples 567 +foo:1m_count_samples 2 +foo:1m_sum_samples 357 +`) + + f(` +- interval: 1m + outputs: [total] +- interval: 2m + outputs: [count_samples] +`, ` +- interval: 1m + outputs: [sum_samples] +- interval: 2m + outputs: [count_samples] +`, ` +foo 123 +bar 567 +foo 234 +`, `bar:1m_sum_samples 567 +bar:1m_total 0 +bar:2m_count_samples 2 +foo:1m_sum_samples 357 +foo:1m_total 111 +foo:2m_count_samples 4 +`) +} + func timeSeriesToString(ts prompbmarshal.TimeSeries) string { labelsString := promrelabel.LabelsToString(ts.Labels) if len(ts.Samples) != 1 {