lib/streamaggr: follow-up for ff72ca14b9

- Make sure that the last successfully loaded config is used on hot-reload failure
- Properly cleanup resources occupied by already initialized aggregators
  when the current aggregator fails to be initialized
- Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config
  This should simplify monitoring and debugging failed reloads
- Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop() while sa
  could be in use at realoadSaConfig()
- Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact scalability
  on system with big number of CPU cores at hasState.Store(true) call inside aggregator.Push().
- Remove fine-grained aggregator reload - reload all the aggregators on config change instead.
  This simplifies the code a bit. The fine-grained aggregator reload may be returned back
  if there will be demand from real users for it.
- Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs with -dryRun flag
- Return back accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639
This commit is contained in:
Aliaksandr Valialkin 2023-03-31 21:27:45 -07:00
parent 59c350d0d2
commit d577657fb7
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
17 changed files with 291 additions and 509 deletions

View File

@ -2193,7 +2193,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable

View File

@ -8,6 +8,8 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
vminsertcommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
vminsertrelabel "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
@ -30,8 +32,9 @@ var (
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag")
dryRun = flag.Bool("dryRun", false, "Whether to check config files without running VictoriaMetrics. The following config files are checked: "+
"-promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. "+
"This can be changed with -promscrape.config.strictParse=false command-line flag")
inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+
"The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. "+
"Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+
@ -54,6 +57,12 @@ func main() {
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
if err := vminsertrelabel.CheckRelabelConfig(); err != nil {
logger.Fatalf("error when checking -relabelConfig: %s", err)
}
if err := vminsertcommon.CheckStreamAggrConfig(); err != nil {
logger.Fatalf("error when checking -streamAggr.config: %s", err)
}
logger.Infof("-promscrape.config is ok; exiting with 0 status code")
return
}

View File

@ -104,7 +104,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
`vmagent` should be restarted in order to update config options set via command-line args.
`vmagent` supports multiple approaches for reloading configs from updated config files such as
`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`:
`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
* Sending `SIGHUP` signal to `vmagent` process:
@ -1186,7 +1186,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-denyQueryTracing
Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
-dryRun
Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable

View File

@ -67,7 +67,7 @@ var (
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
dryRun = flag.Bool("dryRun", false, "Whether to check config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
)
@ -103,13 +103,13 @@ func main() {
return
}
if *dryRun {
if err := remotewrite.CheckRelabelConfigs(); err != nil {
logger.Fatalf("error when checking relabel configs: %s", err)
}
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
if err := remotewrite.CheckStreamAggConfigs(); err != nil {
if err := remotewrite.CheckRelabelConfigs(); err != nil {
logger.Fatalf("error when checking relabel configs: %s", err)
}
if err := remotewrite.CheckStreamAggrConfigs(); err != nil {
logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
}
logger.Infof("all the configs are ok; exiting with 0 status code")

View File

@ -65,6 +65,15 @@ var (
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
)
var (
@ -87,9 +96,6 @@ func MultitenancyEnabled() bool {
// Contains the current relabelConfigs.
var allRelabelConfigs atomic.Value
// Contains the loader for stream aggregation configs.
var saCfgLoader *saConfigsLoader
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
var maxQueues = cgroup.AvailableCPUs() * 16
@ -152,15 +158,9 @@ func Init() {
logger.Fatalf("cannot load relabel configs: %s", err)
}
allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig)
if err != nil {
logger.Fatalf("cannot load stream aggregation config: %s", err)
}
if len(*remoteWriteURLs) > 0 {
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
}
@ -172,46 +172,31 @@ func Init() {
for {
select {
case <-sighupCh:
case <-stopCh:
case <-configReloaderStopCh:
return
}
relabelConfigReloads.Inc()
logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
rcs, err := loadRelabelConfigs()
if err != nil {
relabelConfigReloadErrors.Inc()
relabelConfigSuccess.Set(0)
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
continue
}
allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("Successfully reloaded relabel configs")
logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config")
err = saCfgLoader.reloadConfigs()
if err != nil {
logger.Errorf("Cannot reload stream aggregation configs: %s", err)
}
if len(*remoteWriteMultitenantURLs) > 0 {
rwctxsMapLock.Lock()
for _, rwctxs := range rwctxsMap {
for _, rwctx := range rwctxs {
rwctx.reinitStreamAggr()
}
}
rwctxsMapLock.Unlock()
} else {
for _, rwctx := range rwctxsDefault {
rwctx.reinitStreamAggr()
}
}
logger.Infof("Successfully reloaded stream aggregation configs")
reloadRelabelConfigs()
reloadStreamAggrConfigs()
}
}()
}
func reloadRelabelConfigs() {
relabelConfigReloads.Inc()
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
rcs, err := loadRelabelConfigs()
if err != nil {
relabelConfigReloadErrors.Inc()
relabelConfigSuccess.Set(0)
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
return
}
allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded relabel configs")
}
var (
relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
@ -219,6 +204,24 @@ var (
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
)
func reloadStreamAggrConfigs() {
if len(*remoteWriteMultitenantURLs) > 0 {
rwctxsMapLock.Lock()
for _, rwctxs := range rwctxsMap {
reinitStreamAggr(rwctxs)
}
rwctxsMapLock.Unlock()
} else {
reinitStreamAggr(rwctxsDefault)
}
}
func reinitStreamAggr(rwctxs []*remoteWriteCtx) {
for _, rwctx := range rwctxs {
rwctx.reinitStreamAggr()
}
}
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
if len(urls) == 0 {
logger.Panicf("BUG: urls must be non-empty")
@ -284,14 +287,14 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
return rwctxs
}
var stopCh = make(chan struct{})
var configReloaderStopCh = make(chan struct{})
var configReloaderWG sync.WaitGroup
// Stop stops remotewrite.
//
// It is expected that nobody calls Push during and after the call to this func.
func Stop() {
close(stopCh)
close(configReloaderStopCh)
configReloaderWG.Wait()
for _, rwctx := range rwctxsDefault {
@ -506,8 +509,7 @@ type remoteWriteCtx struct {
fq *persistentqueue.FastQueue
c *client
sas *streamaggr.Aggregators
saHash uint64
sas atomic.Pointer[streamaggr.Aggregators]
streamAggrKeepInput bool
pss []*pendingSeries
@ -567,17 +569,17 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
}
// Initialize sas
saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx)
if len(saCfg) > 0 {
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
if sasFile != "" {
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval)
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
}
rwctx.sas = sas
rwctx.saHash = saHash
rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
return rwctx
@ -592,8 +594,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.fq.UnblockAllReaders()
rwctx.c.MustStop()
rwctx.c = nil
rwctx.sas.MustStop()
rwctx.sas = nil
sas := rwctx.sas.Swap(nil)
sas.MustStop()
rwctx.fq.MustClose()
rwctx.fq = nil
@ -624,8 +628,9 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
// Apply stream aggregation if any
rwctx.sas.Push(tss)
if rwctx.sas == nil || rwctx.streamAggrKeepInput {
sas := rwctx.sas.Load()
sas.Push(tss)
if sas == nil || rwctx.streamAggrKeepInput {
// Push samples to the remote storage
rwctx.pushInternal(tss)
}
@ -645,17 +650,33 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
}
func (rwctx *remoteWriteCtx) reinitStreamAggr() {
if rwctx.sas == nil {
sas := rwctx.sas.Load()
if sas == nil {
// There is no stream aggregation for rwctx
return
}
saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx)
if rwctx.saHash == saHash {
sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx)
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(rwctx.idx, 0)
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
return
}
if err := rwctx.sas.ReInitConfigs(saCfg); err != nil {
logger.Errorf("Cannot apply stream aggregation configs %d: %s", rwctx.idx, err)
if !sasNew.Equal(sas) {
sasOld := rwctx.sas.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
} else {
sasNew.MustStop()
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
}
rwctx.saHash = saHash
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
var tssRelabelPool = &sync.Pool{
@ -672,3 +693,20 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int {
}
return rowsCount
}
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
func CheckStreamAggrConfigs() error {
pushNoop := func(tss []prompbmarshal.TimeSeries) {}
for idx, sasFile := range *streamAggrConfig {
if sasFile == "" {
continue
}
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(idx, 0)
sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval)
if err != nil {
return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
}
sas.MustStop()
}
return nil
}

View File

@ -1,118 +0,0 @@
package remotewrite
import (
"fmt"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
var (
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
)
var (
saCfgReloads = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`)
saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`)
saCfgSuccess = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`)
saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`)
)
// saConfigRules - type alias for unmarshalled stream aggregation config
type saConfigRules = []*streamaggr.Config
// saConfigsLoader loads stream aggregation configs from the given files.
type saConfigsLoader struct {
files []string
configs atomic.Pointer[[]saConfig]
}
// newSaConfigsLoader creates new saConfigsLoader for the given config files.
func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) {
result := &saConfigsLoader{
files: configFiles,
}
// Initial load of configs.
if err := result.reloadConfigs(); err != nil {
return nil, err
}
return result, nil
}
// reloadConfigs reloads stream aggregation configs from the files given in constructor.
func (r *saConfigsLoader) reloadConfigs() error {
// Increment reloads counter if it is not the initial load.
if r.configs.Load() != nil {
saCfgReloads.Inc()
}
// Load all configs from files.
var configs = make([]saConfig, len(r.files))
for i, path := range r.files {
if len(path) == 0 {
// Skip empty stream aggregation config.
continue
}
rules, hash, err := streamaggr.LoadConfigsFromFile(path)
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
}
configs[i] = saConfig{
path: path,
hash: hash,
rules: rules,
}
}
// Update configs.
r.configs.Store(&configs)
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
return nil
}
// getCurrentConfig returns the current stream aggregation config with the given idx.
func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) {
all := r.configs.Load()
if all == nil {
return nil, 0
}
cfgs := *all
if len(cfgs) == 0 {
return nil, 0
}
if idx >= len(cfgs) {
if len(cfgs) == 1 {
cfg := cfgs[0]
return cfg.rules, cfg.hash
}
return nil, 0
}
cfg := cfgs[idx]
return cfg.rules, cfg.hash
}
type saConfig struct {
path string
hash uint64
rules saConfigRules
}
// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config.
func CheckStreamAggConfigs() error {
_, err := newSaConfigsLoader(*streamAggrConfig)
return err
}

View File

@ -137,7 +137,8 @@ func (ctx *InsertCtx) ApplyRelabeling() {
// FlushBufs flushes buffered rows to the underlying storage.
func (ctx *InsertCtx) FlushBufs() error {
if sa != nil && !ctx.skipStreamAggr {
sas := sasGlobal.Load()
if sas != nil && !ctx.skipStreamAggr {
ctx.streamAggrCtx.push(ctx.mrs)
if !*streamAggrKeepInput {
ctx.Reset(0)

View File

@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -29,18 +30,31 @@ var (
)
var (
stopCh = make(chan struct{})
configReloaderWG sync.WaitGroup
saCfgReloaderStopCh = make(chan struct{})
saCfgReloaderWG sync.WaitGroup
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
saCfgSuccess = metrics.NewCounter(`vminsert_streamagg_config_last_reload_successful`)
saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
sa *streamaggr.Aggregators
saHash uint64
sasGlobal atomic.Pointer[streamaggr.Aggregators]
)
// CheckStreamAggrConfig checks config pointed by -stramaggr.config
func CheckStreamAggrConfig() error {
if *streamAggrConfig == "" {
return nil
}
pushNoop := func(tss []prompbmarshal.TimeSeries) {}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, *streamAggrDedupInterval)
if err != nil {
return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
}
sas.MustStop()
return nil
}
// InitStreamAggr must be called after flag.Parse and before using the common package.
//
// MustStopStreamAggr must be called when stream aggr is no longer needed.
@ -51,45 +65,60 @@ func InitStreamAggr() {
sighupCh := procutil.NewSighupChan()
configs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig)
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
if err != nil {
logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
}
a, err := streamaggr.NewAggregators(configs, pushAggregateSeries, *streamAggrDedupInterval)
if err != nil {
logger.Fatalf("cannot init -streamAggr.config=%q: %s", *streamAggrConfig, err)
}
sa = a
saHash = hash
sasGlobal.Store(sas)
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
// Start config reloader.
configReloaderWG.Add(1)
saCfgReloaderWG.Add(1)
go func() {
defer configReloaderWG.Done()
defer saCfgReloaderWG.Done()
for {
select {
case <-sighupCh:
case <-stopCh:
case <-saCfgReloaderStopCh:
return
}
if err := reloadSaConfig(); err != nil {
logger.Errorf("cannot reload -streamAggr.config=%q: %s", *streamAggrConfig, err)
continue
}
reloadStreamAggrConfig()
}
}()
}
func reloadStreamAggrConfig() {
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
saCfgReloads.Inc()
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
return
}
sas := sasGlobal.Load()
if !sasNew.Equal(sas) {
sasOld := sasGlobal.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
} else {
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
sasNew.MustStop()
}
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
}
// MustStopStreamAggr stops stream aggregators.
func MustStopStreamAggr() {
close(stopCh)
close(saCfgReloaderStopCh)
saCfgReloaderWG.Wait()
sa.MustStop()
sa = nil
configReloaderWG.Wait()
sas := sasGlobal.Swap(nil)
sas.MustStop()
}
type streamAggrCtx struct {
@ -109,6 +138,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
ts := &tss[0]
labels := ts.Labels
samples := ts.Samples
sas := sasGlobal.Load()
for _, mr := range mrs {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err)
@ -133,7 +163,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
ts.Labels = labels
ts.Samples = samples
sa.Push(tss)
sas.Push(tss)
}
}
@ -164,33 +194,3 @@ func pushAggregateSeries(tss []prompbmarshal.TimeSeries) {
logger.Errorf("cannot flush aggregate series: %s", err)
}
}
func reloadSaConfig() error {
saCfgReloads.Inc()
cfgs, hash, err := streamaggr.LoadConfigsFromFile(*streamAggrConfig)
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
return fmt.Errorf("cannot reload -streamAggr.config=%q: %w", *streamAggrConfig, err)
}
if saHash == hash {
return nil
}
if err = sa.ReInitConfigs(cfgs); err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
return fmt.Errorf("cannot apply new -streamAggr.config=%q: %w", *streamAggrConfig, err)
}
saHash = hash
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("Successfully reloaded stream aggregation config")
return nil
}

View File

@ -71,6 +71,12 @@ var (
var pcsGlobal atomic.Value
// CheckRelabelConfig checks config pointed by -relabelConfig
func CheckRelabelConfig() error {
_, err := loadRelabelConfig()
return err
}
func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {
if len(*relabelConfig) == 0 {
return nil, nil

View File

@ -26,7 +26,8 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add the ability for hot reloading of [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) configs. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
* FEATURE: check the contents of `-relabelConfig` and `-streamAggr.config` files additionally to `-promscrape.config` when single-node VictoriaMetrics runs with `-dryRun` command-line flag. This aligns the behaviour of single-node VictoriaMetrics with [vmagent](https://docs.victoriametrics.com/vmagent.html) behaviour for `-dryRun` command-line flag.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
@ -138,6 +139,19 @@ Released at 2023-02-24
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4)
Released at 2023-03-25
**v1.87.x is a line of LTS releases (e.g. long-time support). It contains important up-to-date bugfixes.
The v1.87.x line will be supported for at least 12 months since [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release**
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
Released at 2023-03-12

View File

@ -2194,7 +2194,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable

View File

@ -2197,7 +2197,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags.
-dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable

View File

@ -509,7 +509,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# match is an optional filter for incoming samples to aggregate.
# It can contain arbitrary Prometheus series selector
# according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
# If match is missing, then all the incoming samples are aggregated.
# If match isn't set, then all the incoming samples are aggregated.
- match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
# interval is the interval for the aggregation.
@ -548,17 +548,13 @@ per each specified config entry.
### Configuration update
[vmagent](https://docs.victoriametrics.com/vmagent.html) and
[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two
approaches for reloading stream aggregation configs from updated config files such as
`-remoteWrite.streamAggr.config` and `-streamAggr.config` without restart.
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
support the following approaches for hot reloading stream aggregation configs from `-remoteWrite.streamAggr.config` and `-streamAggr.config`:
* Sending `SIGHUP` signal to `vmagent` process:
* By sending `SIGHUP` signal to `vmagent` or `victoria-metrics` process:
```console
kill -SIGHUP `pidof vmagent`
```
* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`).
It will reset the aggregation state only for changed rules in the configuration files.
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload).

View File

@ -1190,7 +1190,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-denyQueryTracing
Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
-dryRun
Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable

View File

@ -50,7 +50,7 @@ var (
// CheckConfig checks -promscrape.config for errors and unsupported options.
func CheckConfig() error {
if *promscrapeConfigFile == "" {
return fmt.Errorf("missing -promscrape.config option")
return nil
}
_, _, err := loadConfig(*promscrapeConfigFile)
return err

View File

@ -8,7 +8,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -19,7 +18,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/cespare/xxhash/v2"
"gopkg.in/yaml.v2"
)
@ -39,40 +37,22 @@ var supportedOutputs = []string{
"quantiles(phi1, ..., phiN)",
}
// ParseConfig loads array of stream aggregation configs from the given path.
func ParseConfig(data []byte) ([]*Config, uint64, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, 0, fmt.Errorf("cannot parse stream aggregation config %w", err)
}
return cfgs, xxhash.Sum64(data), nil
}
// LoadConfigsFromFile loads array of stream aggregation configs from the given path.
func LoadConfigsFromFile(path string) ([]*Config, uint64, error) {
data, err := fs.ReadFileOrHTTP(path)
if err != nil {
return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
}
return ParseConfig(data)
}
// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) {
cfgs, configHash, err := LoadConfigsFromFile(path)
func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
data, err := fs.ReadFileOrHTTP(path)
if err != nil {
return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err)
return nil, fmt.Errorf("cannot load aggregators: %w", err)
}
as, err := NewAggregators(cfgs, pushFunc, dedupInterval)
as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
if err != nil {
return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
}
return as, configHash, nil
return as, nil
}
// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
@ -84,7 +64,7 @@ func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.
func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, err
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
}
return NewAggregators(cfgs, pushFunc, dedupInterval)
}
@ -148,22 +128,13 @@ type Config struct {
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}
func (cfg *Config) hash() (uint64, error) {
if cfg == nil {
return 0, nil
}
data, err := json.Marshal(cfg)
if err != nil {
return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err)
}
return xxhash.Sum64(data), nil
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
type Aggregators struct {
as atomic.Pointer[[]*aggregator]
pushFunc PushFunc
dedupInterval time.Duration
as []*aggregator
// configData contains marshaled configs passed to NewAggregators().
// It is used in Equal() for comparing Aggregators.
configData []byte
}
// NewAggregators creates Aggregators from the given cfgs.
@ -182,17 +153,22 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
for i, cfg := range cfgs {
a, err := newAggregator(cfg, pushFunc, dedupInterval)
if err != nil {
// Stop already initialized aggregators before returning the error.
for _, a := range as[:i] {
a.MustStop()
}
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
}
as[i] = a
}
result := &Aggregators{
pushFunc: pushFunc,
dedupInterval: dedupInterval,
configData, err := json.Marshal(cfgs)
if err != nil {
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
}
result.as.Store(&as)
return result, nil
return &Aggregators{
as: as,
configData: configData,
}, nil
}
// MustStop stops a.
@ -200,84 +176,29 @@ func (a *Aggregators) MustStop() {
if a == nil {
return
}
for _, aggr := range *a.as.Load() {
for _, aggr := range a.as {
aggr.MustStop()
}
}
// Equal returns true if a and b are initialized from identical configs.
func (a *Aggregators) Equal(b *Aggregators) bool {
if a == nil || b == nil {
return a == nil && b == nil
}
return string(a.configData) == string(b.configData)
}
// Push pushes tss to a.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
if a == nil {
return
}
for _, aggr := range *a.as.Load() {
for _, aggr := range a.as {
aggr.Push(tss)
}
}
// ReInitConfigs reinits state of Aggregators a with the given new stream aggregation config
func (a *Aggregators) ReInitConfigs(cfgs []*Config) error {
if a == nil {
return nil
}
keys := make(map[uint64]struct{}) // set of all keys (configs and aggregators)
cfgsMap := make(map[uint64]*Config) // map of config keys to their indices in cfgs
aggrsMap := make(map[uint64]*aggregator) // map of aggregator keys to their indices in a.as
for _, cfg := range cfgs {
key, err := cfg.hash()
if err != nil {
return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err)
}
keys[key] = struct{}{}
cfgsMap[key] = cfg
}
for _, aggr := range *a.as.Load() {
keys[aggr.cfgHash] = struct{}{}
aggrsMap[aggr.cfgHash] = aggr
}
asNew := make([]*aggregator, 0, len(aggrsMap))
asDel := make([]*aggregator, 0, len(aggrsMap))
for key := range keys {
cfg, hasCfg := cfgsMap[key]
agg, hasAggr := aggrsMap[key]
// if config for aggregator was changed or removed
// then we need to stop aggregator and remove it
if !hasCfg && hasAggr {
asDel = append(asDel, agg)
continue
}
// if there is no aggregator for config (new config),
// then we need to create it
if hasCfg && !hasAggr {
newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval)
if err != nil {
return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err)
}
asNew = append(asNew, newAgg)
continue
}
// if aggregator config was not changed, then we can just keep it
if hasCfg && hasAggr {
asNew = append(asNew, agg)
}
}
// Atomically replace aggregators array.
a.as.Store(&asNew)
// and stop old aggregators
for _, aggr := range asDel {
aggr.MustStop()
}
return nil
}
// aggregator aggregates input series according to the config passed to NewAggregator
type aggregator struct {
match *promrelabel.IfExpression
@ -295,7 +216,6 @@ type aggregator struct {
// aggrStates contains aggregate states for the given outputs
aggrStates []aggrState
hasState atomic.Bool
pushFunc PushFunc
@ -304,8 +224,7 @@ type aggregator struct {
// It contains the interval, labels in (by, without), plus output name.
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
// for `interval: 1m`, `by: [job]`
suffix string
cfgHash uint64
suffix string
wg sync.WaitGroup
stopCh chan struct{}
@ -433,11 +352,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
dedupAggr = newLastAggrState()
}
cfgHash, err := cfg.hash()
if err != nil {
return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err)
}
// initialize the aggregator
a := &aggregator{
match: cfg.Match,
@ -453,8 +367,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
aggrStates: aggrStates,
pushFunc: pushFunc,
suffix: suffix,
cfgHash: cfgHash,
suffix: suffix,
stopCh: make(chan struct{}),
}
@ -521,8 +434,6 @@ func (a *aggregator) dedupFlush() {
}
a.dedupAggr.appendSeriesForFlush(ctx)
a.push(ctx.tss)
a.hasState.Store(false)
}
func (a *aggregator) flush() {
@ -552,8 +463,6 @@ func (a *aggregator) flush() {
// Push the output metrics.
a.pushFunc(tss)
}
a.hasState.Store(false)
}
// MustStop stops the aggregator.
@ -561,26 +470,19 @@ func (a *aggregator) flush() {
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
close(a.stopCh)
if a.hasState.Load() {
if a.dedupAggr != nil {
flushConcurrencyCh <- struct{}{}
a.dedupFlush()
<-flushConcurrencyCh
}
flushConcurrencyCh <- struct{}{}
a.flush()
<-flushConcurrencyCh
}
a.wg.Wait()
// Flush the remaining data from the last interval if needed.
flushConcurrencyCh <- struct{}{}
if a.dedupAggr != nil {
a.dedupFlush()
}
a.flush()
<-flushConcurrencyCh
}
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
a.hasState.Store(true)
if a.dedupAggr == nil {
a.push(tss)
return

View File

@ -118,6 +118,45 @@ func TestAggregatorsFailure(t *testing.T) {
`)
}
func TestAggregatorsEqual(t *testing.T) {
f := func(a, b string, expectedResult bool) {
t.Helper()
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
result := aa.Equal(ab)
if result != expectedResult {
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
}
}
f("", "", true)
f(`
- outputs: [total]
interval: 5m
`, ``, false)
f(`
- outputs: [total]
interval: 5m
`, `
- outputs: [total]
interval: 5m
`, true)
f(`
- outputs: [total]
interval: 3m
`, `
- outputs: [total]
interval: 5m
`, false)
}
func TestAggregatorsSuccess(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected string) {
t.Helper()
@ -145,11 +184,6 @@ func TestAggregatorsSuccess(t *testing.T) {
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput)
if a != nil {
for _, aggr := range *a.as.Load() {
aggr.flush()
}
}
a.MustStop()
// Verify the tssOutput contains the expected metrics
@ -671,7 +705,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput)
if a != nil {
for _, aggr := range *a.as.Load() {
for _, aggr := range a.as {
aggr.dedupFlush()
aggr.flush()
}
@ -719,106 +753,6 @@ foo:1m_sum_samples{baz="qwe"} 10
`)
}
func TestAggregatorsReinit(t *testing.T) {
f := func(config, newConfig, inputMetrics, outputMetricsExpected string) {
t.Helper()
// Initialize Aggregators
var tssOutput []prompbmarshal.TimeSeries
var tssOutputLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssOutputLock.Lock()
for _, ts := range tss {
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
Labels: labelsCopy,
Samples: samplesCopy,
})
}
tssOutputLock.Unlock()
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput)
// Reinitialize Aggregators
nc, _, err := ParseConfig([]byte(newConfig))
if err != nil {
t.Fatalf("cannot parse new config: %s", err)
}
err = a.ReInitConfigs(nc)
if err != nil {
t.Fatalf("cannot reinit aggregators: %s", err)
}
// Push the inputMetrics to Aggregators
a.Push(tssInput)
if a != nil {
for _, aggr := range *a.as.Load() {
aggr.flush()
}
}
a.MustStop()
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
tsStrings[i] = timeSeriesToString(ts)
}
sort.Strings(tsStrings)
outputMetrics := strings.Join(tsStrings, "")
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
}
f(`
- interval: 1m
outputs: [count_samples]
`, `
- interval: 1m
outputs: [sum_samples]
`, `
foo 123
bar 567
foo 234
`, `bar:1m_count_samples 1
bar:1m_sum_samples 567
foo:1m_count_samples 2
foo:1m_sum_samples 357
`)
f(`
- interval: 1m
outputs: [total]
- interval: 2m
outputs: [count_samples]
`, `
- interval: 1m
outputs: [sum_samples]
- interval: 2m
outputs: [count_samples]
`, `
foo 123
bar 567
foo 234
`, `bar:1m_sum_samples 567
bar:1m_total 0
bar:2m_count_samples 2
foo:1m_sum_samples 357
foo:1m_total 111
foo:2m_count_samples 4
`)
}
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
if len(ts.Samples) != 1 {