mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
added hot reload support for stream aggregation configs (#3969) Signed-off-by: Alexander Marshalov <_@marshalov.org>
This commit is contained in:
parent
7c70fb0fb9
commit
8c14d17694
@ -68,7 +68,7 @@ var (
|
||||
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
|
||||
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
|
||||
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
|
||||
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+
|
||||
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
|
||||
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
|
||||
)
|
||||
|
||||
@ -109,6 +109,9 @@ func main() {
|
||||
if err := promscrape.CheckConfig(); err != nil {
|
||||
logger.Fatalf("error when checking -promscrape.config: %s", err)
|
||||
}
|
||||
if err := remotewrite.CheckStreamAggConfigs(); err != nil {
|
||||
logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
|
||||
}
|
||||
logger.Infof("all the configs are ok; exiting with 0 status code")
|
||||
return
|
||||
}
|
||||
|
@ -65,15 +65,6 @@ var (
|
||||
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
|
||||
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
|
||||
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
|
||||
|
||||
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
|
||||
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
|
||||
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
|
||||
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html")
|
||||
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
|
||||
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
|
||||
)
|
||||
|
||||
var (
|
||||
@ -96,6 +87,9 @@ func MultitenancyEnabled() bool {
|
||||
// Contains the current relabelConfigs.
|
||||
var allRelabelConfigs atomic.Value
|
||||
|
||||
// Contains the loader for stream aggregation configs.
|
||||
var saCfgLoader *saConfigsLoader
|
||||
|
||||
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
|
||||
// since it may lead to high memory usage due to big number of buffers.
|
||||
var maxQueues = cgroup.AvailableCPUs() * 16
|
||||
@ -159,8 +153,13 @@ func Init() {
|
||||
}
|
||||
allRelabelConfigs.Store(rcs)
|
||||
|
||||
configSuccess.Set(1)
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
relabelConfigSuccess.Set(1)
|
||||
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
||||
|
||||
saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot load stream aggregation config: %s", err)
|
||||
}
|
||||
|
||||
if len(*remoteWriteURLs) > 0 {
|
||||
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
||||
@ -176,29 +175,48 @@ func Init() {
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
configReloads.Inc()
|
||||
relabelConfigReloads.Inc()
|
||||
logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||
rcs, err := loadRelabelConfigs()
|
||||
if err != nil {
|
||||
configReloadErrors.Inc()
|
||||
configSuccess.Set(0)
|
||||
relabelConfigReloadErrors.Inc()
|
||||
relabelConfigSuccess.Set(0)
|
||||
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
allRelabelConfigs.Store(rcs)
|
||||
configSuccess.Set(1)
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
relabelConfigSuccess.Set(1)
|
||||
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
||||
logger.Infof("Successfully reloaded relabel configs")
|
||||
|
||||
logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config")
|
||||
err = saCfgLoader.reloadConfigs()
|
||||
if err != nil {
|
||||
logger.Errorf("Cannot reload stream aggregation configs: %s", err)
|
||||
}
|
||||
if len(*remoteWriteMultitenantURLs) > 0 {
|
||||
rwctxsMapLock.Lock()
|
||||
for _, rwctxs := range rwctxsMap {
|
||||
for _, rwctx := range rwctxs {
|
||||
rwctx.reinitStreamAggr()
|
||||
}
|
||||
}
|
||||
rwctxsMapLock.Unlock()
|
||||
} else {
|
||||
for _, rwctx := range rwctxsDefault {
|
||||
rwctx.reinitStreamAggr()
|
||||
}
|
||||
}
|
||||
logger.Infof("Successfully reloaded stream aggregation configs")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var (
|
||||
configReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
|
||||
configReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
|
||||
configSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`)
|
||||
configTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
|
||||
relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
|
||||
relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
|
||||
relabelConfigSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`)
|
||||
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
|
||||
)
|
||||
|
||||
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
||||
@ -489,6 +507,7 @@ type remoteWriteCtx struct {
|
||||
c *client
|
||||
|
||||
sas *streamaggr.Aggregators
|
||||
saHash uint64
|
||||
streamAggrKeepInput bool
|
||||
|
||||
pss []*pendingSeries
|
||||
@ -548,14 +567,16 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
|
||||
}
|
||||
|
||||
// Initialize sas
|
||||
saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx)
|
||||
if len(saCfg) > 0 {
|
||||
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
|
||||
if sasFile != "" {
|
||||
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
|
||||
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
|
||||
sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
|
||||
}
|
||||
rwctx.sas = sas
|
||||
rwctx.saHash = saHash
|
||||
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
|
||||
}
|
||||
|
||||
@ -623,6 +644,20 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
|
||||
pss[idx].Push(tss)
|
||||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) reinitStreamAggr() {
|
||||
if rwctx.sas == nil {
|
||||
return
|
||||
}
|
||||
saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx)
|
||||
if rwctx.saHash == saHash {
|
||||
return
|
||||
}
|
||||
if err := rwctx.sas.ReInitConfigs(saCfg); err != nil {
|
||||
logger.Errorf("Cannot apply stream aggregation configs %d: %s", rwctx.idx, err)
|
||||
}
|
||||
rwctx.saHash = saHash
|
||||
}
|
||||
|
||||
var tssRelabelPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
a := []prompbmarshal.TimeSeries{}
|
||||
|
118
app/vmagent/remotewrite/streamagg.go
Normal file
118
app/vmagent/remotewrite/streamagg.go
Normal file
@ -0,0 +1,118 @@
|
||||
package remotewrite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
|
||||
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
|
||||
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
|
||||
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html")
|
||||
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
|
||||
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
|
||||
)
|
||||
|
||||
var (
|
||||
saCfgReloads = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`)
|
||||
saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`)
|
||||
saCfgSuccess = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`)
|
||||
saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`)
|
||||
)
|
||||
|
||||
// saConfigRules - type alias for unmarshalled stream aggregation config
|
||||
type saConfigRules = []*streamaggr.Config
|
||||
|
||||
// saConfigsLoader loads stream aggregation configs from the given files.
|
||||
type saConfigsLoader struct {
|
||||
files []string
|
||||
configs atomic.Pointer[[]saConfig]
|
||||
}
|
||||
|
||||
// newSaConfigsLoader creates new saConfigsLoader for the given config files.
|
||||
func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) {
|
||||
result := &saConfigsLoader{
|
||||
files: configFiles,
|
||||
}
|
||||
// Initial load of configs.
|
||||
if err := result.reloadConfigs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// reloadConfigs reloads stream aggregation configs from the files given in constructor.
|
||||
func (r *saConfigsLoader) reloadConfigs() error {
|
||||
// Increment reloads counter if it is not the initial load.
|
||||
if r.configs.Load() != nil {
|
||||
saCfgReloads.Inc()
|
||||
}
|
||||
|
||||
// Load all configs from files.
|
||||
var configs = make([]saConfig, len(r.files))
|
||||
for i, path := range r.files {
|
||||
if len(path) == 0 {
|
||||
// Skip empty stream aggregation config.
|
||||
continue
|
||||
}
|
||||
rules, hash, err := streamaggr.LoadConfigsFromFile(path)
|
||||
if err != nil {
|
||||
saCfgSuccess.Set(0)
|
||||
saCfgReloadErr.Inc()
|
||||
return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
|
||||
}
|
||||
configs[i] = saConfig{
|
||||
path: path,
|
||||
hash: hash,
|
||||
rules: rules,
|
||||
}
|
||||
}
|
||||
|
||||
// Update configs.
|
||||
r.configs.Store(&configs)
|
||||
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
return nil
|
||||
}
|
||||
|
||||
// getCurrentConfig returns the current stream aggregation config with the given idx.
|
||||
func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) {
|
||||
all := r.configs.Load()
|
||||
if all == nil {
|
||||
return nil, 0
|
||||
}
|
||||
cfgs := *all
|
||||
if len(cfgs) == 0 {
|
||||
return nil, 0
|
||||
}
|
||||
if idx >= len(cfgs) {
|
||||
if len(cfgs) == 1 {
|
||||
cfg := cfgs[0]
|
||||
return cfg.rules, cfg.hash
|
||||
}
|
||||
return nil, 0
|
||||
}
|
||||
cfg := cfgs[idx]
|
||||
return cfg.rules, cfg.hash
|
||||
}
|
||||
|
||||
type saConfig struct {
|
||||
path string
|
||||
hash uint64
|
||||
rules saConfigRules
|
||||
}
|
||||
|
||||
// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config.
|
||||
func CheckStreamAggConfigs() error {
|
||||
_, err := newSaConfigsLoader(*streamAggrConfig)
|
||||
return err
|
||||
}
|
@ -26,6 +26,7 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
|
||||
@ -135,19 +136,6 @@ Released at 2023-02-24
|
||||
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
|
||||
|
||||
## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4)
|
||||
|
||||
Released at 2023-03-25
|
||||
|
||||
**v1.87.x is a line of LTS releases (e.g. long-time support). It contains important up-to-date bugfixes.
|
||||
The v1.87.x line will be supported for at least 12 months since [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release**
|
||||
|
||||
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
|
||||
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
|
||||
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
|
||||
|
||||
## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
|
||||
|
||||
Released at 2023-03-12
|
||||
|
@ -545,3 +545,20 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
|
||||
|
||||
The file can contain multiple aggregation configs. The aggregation is performed independently
|
||||
per each specified config entry.
|
||||
|
||||
### Configuration update
|
||||
|
||||
[vmagent](https://docs.victoriametrics.com/vmagent.html) and
|
||||
[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two
|
||||
approaches for reloading stream aggregation configs from updated config files such as
|
||||
`-remoteWrite.streamAggr.config` and `-streamAggr.config` without restart.
|
||||
|
||||
* Sending `SIGHUP` signal to `vmagent` process:
|
||||
|
||||
```console
|
||||
kill -SIGHUP `pidof vmagent`
|
||||
```
|
||||
|
||||
* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`).
|
||||
|
||||
It will reset the aggregation state only for changed rules in the configuration files.
|
||||
|
@ -108,7 +108,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
|
||||
|
||||
`vmagent` should be restarted in order to update config options set via command-line args.
|
||||
`vmagent` supports multiple approaches for reloading configs from updated config files such as
|
||||
`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`:
|
||||
`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
|
||||
|
||||
* Sending `SIGHUP` signal to `vmagent` process:
|
||||
|
||||
|
@ -1,12 +1,14 @@
|
||||
package streamaggr
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@ -17,6 +19,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
@ -36,22 +39,40 @@ var supportedOutputs = []string{
|
||||
"quantiles(phi1, ..., phiN)",
|
||||
}
|
||||
|
||||
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
|
||||
// ParseConfig loads array of stream aggregation configs from the given path.
|
||||
func ParseConfig(data []byte) ([]*Config, uint64, error) {
|
||||
var cfgs []*Config
|
||||
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot parse stream aggregation config %w", err)
|
||||
}
|
||||
return cfgs, xxhash.Sum64(data), nil
|
||||
}
|
||||
|
||||
// LoadConfigsFromFile loads array of stream aggregation configs from the given path.
|
||||
func LoadConfigsFromFile(path string) ([]*Config, uint64, error) {
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
|
||||
}
|
||||
return ParseConfig(data)
|
||||
}
|
||||
|
||||
// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
|
||||
//
|
||||
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
|
||||
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
|
||||
//
|
||||
// The returned Aggregators must be stopped with MustStop() when no longer needed.
|
||||
func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) {
|
||||
cfgs, configHash, err := LoadConfigsFromFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load aggregators: %w", err)
|
||||
return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err)
|
||||
}
|
||||
as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
|
||||
as, err := NewAggregators(cfgs, pushFunc, dedupInterval)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
|
||||
return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
|
||||
}
|
||||
return as, nil
|
||||
return as, configHash, nil
|
||||
}
|
||||
|
||||
// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
|
||||
@ -127,9 +148,22 @@ type Config struct {
|
||||
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
|
||||
}
|
||||
|
||||
func (cfg *Config) hash() (uint64, error) {
|
||||
if cfg == nil {
|
||||
return 0, nil
|
||||
}
|
||||
data, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err)
|
||||
}
|
||||
return xxhash.Sum64(data), nil
|
||||
}
|
||||
|
||||
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
|
||||
type Aggregators struct {
|
||||
as []*aggregator
|
||||
as atomic.Pointer[[]*aggregator]
|
||||
pushFunc PushFunc
|
||||
dedupInterval time.Duration
|
||||
}
|
||||
|
||||
// NewAggregators creates Aggregators from the given cfgs.
|
||||
@ -152,9 +186,13 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
|
||||
}
|
||||
as[i] = a
|
||||
}
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
}, nil
|
||||
result := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
dedupInterval: dedupInterval,
|
||||
}
|
||||
result.as.Store(&as)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// MustStop stops a.
|
||||
@ -162,7 +200,7 @@ func (a *Aggregators) MustStop() {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
for _, aggr := range a.as {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
aggr.MustStop()
|
||||
}
|
||||
}
|
||||
@ -172,11 +210,74 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
for _, aggr := range a.as {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
aggr.Push(tss)
|
||||
}
|
||||
}
|
||||
|
||||
// ReInitConfigs reinits state of Aggregators a with the given new stream aggregation config
|
||||
func (a *Aggregators) ReInitConfigs(cfgs []*Config) error {
|
||||
if a == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
keys := make(map[uint64]struct{}) // set of all keys (configs and aggregators)
|
||||
cfgsMap := make(map[uint64]*Config) // map of config keys to their indices in cfgs
|
||||
aggrsMap := make(map[uint64]*aggregator) // map of aggregator keys to their indices in a.as
|
||||
|
||||
for _, cfg := range cfgs {
|
||||
key, err := cfg.hash()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err)
|
||||
}
|
||||
keys[key] = struct{}{}
|
||||
cfgsMap[key] = cfg
|
||||
}
|
||||
for _, aggr := range *a.as.Load() {
|
||||
keys[aggr.cfgHash] = struct{}{}
|
||||
aggrsMap[aggr.cfgHash] = aggr
|
||||
}
|
||||
|
||||
asNew := make([]*aggregator, 0, len(aggrsMap))
|
||||
asDel := make([]*aggregator, 0, len(aggrsMap))
|
||||
for key := range keys {
|
||||
cfg, hasCfg := cfgsMap[key]
|
||||
agg, hasAggr := aggrsMap[key]
|
||||
|
||||
// if config for aggregator was changed or removed
|
||||
// then we need to stop aggregator and remove it
|
||||
if !hasCfg && hasAggr {
|
||||
asDel = append(asDel, agg)
|
||||
continue
|
||||
}
|
||||
|
||||
// if there is no aggregator for config (new config),
|
||||
// then we need to create it
|
||||
if hasCfg && !hasAggr {
|
||||
newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err)
|
||||
}
|
||||
asNew = append(asNew, newAgg)
|
||||
continue
|
||||
}
|
||||
|
||||
// if aggregator config was not changed, then we can just keep it
|
||||
if hasCfg && hasAggr {
|
||||
asNew = append(asNew, agg)
|
||||
}
|
||||
}
|
||||
|
||||
// Atomically replace aggregators array.
|
||||
a.as.Store(&asNew)
|
||||
// and stop old aggregators
|
||||
for _, aggr := range asDel {
|
||||
aggr.MustStop()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// aggregator aggregates input series according to the config passed to NewAggregator
|
||||
type aggregator struct {
|
||||
match *promrelabel.IfExpression
|
||||
@ -194,6 +295,7 @@ type aggregator struct {
|
||||
|
||||
// aggrStates contains aggregate states for the given outputs
|
||||
aggrStates []aggrState
|
||||
hasState atomic.Bool
|
||||
|
||||
pushFunc PushFunc
|
||||
|
||||
@ -203,6 +305,7 @@ type aggregator struct {
|
||||
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
|
||||
// for `interval: 1m`, `by: [job]`
|
||||
suffix string
|
||||
cfgHash uint64
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
@ -330,6 +433,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
||||
dedupAggr = newLastAggrState()
|
||||
}
|
||||
|
||||
cfgHash, err := cfg.hash()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err)
|
||||
}
|
||||
|
||||
// initialize the aggregator
|
||||
a := &aggregator{
|
||||
match: cfg.Match,
|
||||
@ -346,6 +454,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
||||
pushFunc: pushFunc,
|
||||
|
||||
suffix: suffix,
|
||||
cfgHash: cfgHash,
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
@ -411,8 +520,9 @@ func (a *aggregator) dedupFlush() {
|
||||
skipAggrSuffix: true,
|
||||
}
|
||||
a.dedupAggr.appendSeriesForFlush(ctx)
|
||||
logger.Errorf("series after dedup: %v", ctx.tss)
|
||||
a.push(ctx.tss)
|
||||
|
||||
a.hasState.Store(false)
|
||||
}
|
||||
|
||||
func (a *aggregator) flush() {
|
||||
@ -442,6 +552,8 @@ func (a *aggregator) flush() {
|
||||
// Push the output metrics.
|
||||
a.pushFunc(tss)
|
||||
}
|
||||
|
||||
a.hasState.Store(false)
|
||||
}
|
||||
|
||||
// MustStop stops the aggregator.
|
||||
@ -449,11 +561,26 @@ func (a *aggregator) flush() {
|
||||
// The aggregator stops pushing the aggregated metrics after this call.
|
||||
func (a *aggregator) MustStop() {
|
||||
close(a.stopCh)
|
||||
|
||||
if a.hasState.Load() {
|
||||
if a.dedupAggr != nil {
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
a.dedupFlush()
|
||||
<-flushConcurrencyCh
|
||||
}
|
||||
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
a.flush()
|
||||
<-flushConcurrencyCh
|
||||
}
|
||||
|
||||
a.wg.Wait()
|
||||
}
|
||||
|
||||
// Push pushes tss to a.
|
||||
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
a.hasState.Store(true)
|
||||
|
||||
if a.dedupAggr == nil {
|
||||
a.push(tss)
|
||||
return
|
||||
|
@ -146,7 +146,7 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
if a != nil {
|
||||
for _, aggr := range a.as {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
aggr.flush()
|
||||
}
|
||||
}
|
||||
@ -671,7 +671,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
|
||||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
if a != nil {
|
||||
for _, aggr := range a.as {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
aggr.dedupFlush()
|
||||
aggr.flush()
|
||||
}
|
||||
@ -719,6 +719,106 @@ foo:1m_sum_samples{baz="qwe"} 10
|
||||
`)
|
||||
}
|
||||
|
||||
func TestAggregatorsReinit(t *testing.T) {
|
||||
f := func(config, newConfig, inputMetrics, outputMetricsExpected string) {
|
||||
t.Helper()
|
||||
|
||||
// Initialize Aggregators
|
||||
var tssOutput []prompbmarshal.TimeSeries
|
||||
var tssOutputLock sync.Mutex
|
||||
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
||||
tssOutputLock.Lock()
|
||||
for _, ts := range tss {
|
||||
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
||||
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
||||
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
||||
Labels: labelsCopy,
|
||||
Samples: samplesCopy,
|
||||
})
|
||||
}
|
||||
tssOutputLock.Unlock()
|
||||
}
|
||||
|
||||
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
|
||||
// Push the inputMetrics to Aggregators
|
||||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
|
||||
// Reinitialize Aggregators
|
||||
nc, _, err := ParseConfig([]byte(newConfig))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse new config: %s", err)
|
||||
}
|
||||
err = a.ReInitConfigs(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot reinit aggregators: %s", err)
|
||||
}
|
||||
|
||||
// Push the inputMetrics to Aggregators
|
||||
a.Push(tssInput)
|
||||
if a != nil {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
aggr.flush()
|
||||
}
|
||||
}
|
||||
|
||||
a.MustStop()
|
||||
|
||||
// Verify the tssOutput contains the expected metrics
|
||||
tsStrings := make([]string, len(tssOutput))
|
||||
for i, ts := range tssOutput {
|
||||
tsStrings[i] = timeSeriesToString(ts)
|
||||
}
|
||||
sort.Strings(tsStrings)
|
||||
outputMetrics := strings.Join(tsStrings, "")
|
||||
if outputMetrics != outputMetricsExpected {
|
||||
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [count_samples]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [sum_samples]
|
||||
`, `
|
||||
foo 123
|
||||
bar 567
|
||||
foo 234
|
||||
`, `bar:1m_count_samples 1
|
||||
bar:1m_sum_samples 567
|
||||
foo:1m_count_samples 2
|
||||
foo:1m_sum_samples 357
|
||||
`)
|
||||
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [total]
|
||||
- interval: 2m
|
||||
outputs: [count_samples]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [sum_samples]
|
||||
- interval: 2m
|
||||
outputs: [count_samples]
|
||||
`, `
|
||||
foo 123
|
||||
bar 567
|
||||
foo 234
|
||||
`, `bar:1m_sum_samples 567
|
||||
bar:1m_total 0
|
||||
bar:2m_count_samples 2
|
||||
foo:1m_sum_samples 357
|
||||
foo:1m_total 111
|
||||
foo:2m_count_samples 4
|
||||
`)
|
||||
}
|
||||
|
||||
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
|
||||
labelsString := promrelabel.LabelsToString(ts.Labels)
|
||||
if len(ts.Samples) != 1 {
|
||||
|
Loading…
Reference in New Issue
Block a user