mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
--------- Signed-off-by: Alexander Marshalov <_@marshalov.org> Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
This commit is contained in:
parent
a24541bdb7
commit
70773f53d7
@ -50,6 +50,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): expose `vmauth_user_request_duration_seconds` and `vmauth_unauthorized_user_request_duration_seconds` summary metrics for measuring requests latency per user.
|
||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): show backup progress percentage in log during backup uploading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
|
||||
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): show restoring progress percentage in log during backup downloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details.
|
||||
* FEATURE: add ability to fine-tune Graphite API limits via the following command-line flags:
|
||||
`-search.maxGraphiteTagKeys` for limiting the number of tag keys returned from [Graphite API for tags](https://docs.victoriametrics.com/#graphite-tags-api-usage)
|
||||
`-search.maxGraphiteTagValues` for limiting the number of tag values returned from [Graphite API for tag values](https://docs.victoriametrics.com/#graphite-tags-api-usage)
|
||||
|
@ -541,6 +541,16 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
|
||||
# The aggregated stats is sent to remote storage once per interval.
|
||||
interval: 1m
|
||||
|
||||
# staleness_interval defines an interval after which the series state will be reset if no samples have been sent during it.
|
||||
# It means that:
|
||||
# - no data point will be written for a resulting time series if it didn't receive any updates during configured interval,
|
||||
# - if the series receives updates after the configured interval again, then the time series will be calculated from the initial state
|
||||
# (it's like this series didn't exist until now).
|
||||
# Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value.
|
||||
# By default, is equal to x2 of the `interval` field.
|
||||
# The parameter is only relevant for outputs: total, increase and histogram_bucket.
|
||||
# staleness_interval: 2m
|
||||
|
||||
# without is an optional list of labels, which must be removed from the output aggregation.
|
||||
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
|
||||
without: [instance]
|
||||
|
@ -12,7 +12,7 @@ import (
|
||||
type histogramBucketAggrState struct {
|
||||
m sync.Map
|
||||
|
||||
intervalSecs uint64
|
||||
stalenessInterval uint64
|
||||
}
|
||||
|
||||
type histogramBucketStateValue struct {
|
||||
@ -22,16 +22,15 @@ type histogramBucketStateValue struct {
|
||||
deleted bool
|
||||
}
|
||||
|
||||
func newHistogramBucketAggrState(interval time.Duration) *histogramBucketAggrState {
|
||||
intervalSecs := uint64(interval.Seconds() + 1)
|
||||
func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
|
||||
return &histogramBucketAggrState{
|
||||
intervalSecs: intervalSecs,
|
||||
stalenessInterval: uint64(stalenessInterval.Seconds()),
|
||||
}
|
||||
}
|
||||
|
||||
func (as *histogramBucketAggrState) pushSample(inputKey, outputKey string, value float64) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
deleteDeadline := currentTime + 2*as.intervalSecs
|
||||
deleteDeadline := currentTime + as.stalenessInterval
|
||||
|
||||
again:
|
||||
v, ok := as.m.Load(outputKey)
|
||||
|
@ -12,7 +12,7 @@ type increaseAggrState struct {
|
||||
m sync.Map
|
||||
|
||||
ignoreInputDeadline uint64
|
||||
intervalSecs uint64
|
||||
stalenessInterval uint64
|
||||
}
|
||||
|
||||
type increaseStateValue struct {
|
||||
@ -23,18 +23,18 @@ type increaseStateValue struct {
|
||||
deleted bool
|
||||
}
|
||||
|
||||
func newIncreaseAggrState(interval time.Duration) *increaseAggrState {
|
||||
func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *increaseAggrState {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
intervalSecs := uint64(interval.Seconds() + 1)
|
||||
return &increaseAggrState{
|
||||
ignoreInputDeadline: currentTime + intervalSecs,
|
||||
intervalSecs: intervalSecs,
|
||||
stalenessInterval: uint64(stalenessInterval.Seconds()),
|
||||
}
|
||||
}
|
||||
|
||||
func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
deleteDeadline := currentTime + 2*as.intervalSecs
|
||||
deleteDeadline := currentTime + as.stalenessInterval
|
||||
|
||||
again:
|
||||
v, ok := as.m.Load(outputKey)
|
||||
|
@ -79,6 +79,10 @@ type Config struct {
|
||||
// Interval is the interval between aggregations.
|
||||
Interval string `yaml:"interval"`
|
||||
|
||||
// Staleness interval is interval after which the series state will be reset if no samples have been sent during it.
|
||||
// The parameter is only relevant for outputs: total, increase and histogram_bucket.
|
||||
StalenessInterval string `yaml:"staleness_interval,omitempty"`
|
||||
|
||||
// Outputs is a list of output aggregate functions to produce.
|
||||
//
|
||||
// The following names are allowed:
|
||||
@ -254,6 +258,18 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
||||
return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
|
||||
}
|
||||
|
||||
// check cfg.StalenessInterval
|
||||
stalenessInterval := interval * 2
|
||||
if cfg.StalenessInterval != "" {
|
||||
stalenessInterval, err = time.ParseDuration(cfg.StalenessInterval)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse `staleness_interval: %q`: %w", cfg.StalenessInterval, err)
|
||||
}
|
||||
if stalenessInterval < interval {
|
||||
return nil, fmt.Errorf("staleness_interval cannot be less than interval (%s); got %s", cfg.Interval, cfg.StalenessInterval)
|
||||
}
|
||||
}
|
||||
|
||||
// initialize input_relabel_configs and output_relabel_configs
|
||||
inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs)
|
||||
if err != nil {
|
||||
@ -308,9 +324,9 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
||||
}
|
||||
switch output {
|
||||
case "total":
|
||||
aggrStates[i] = newTotalAggrState(interval)
|
||||
aggrStates[i] = newTotalAggrState(interval, stalenessInterval)
|
||||
case "increase":
|
||||
aggrStates[i] = newIncreaseAggrState(interval)
|
||||
aggrStates[i] = newIncreaseAggrState(interval, stalenessInterval)
|
||||
case "count_series":
|
||||
aggrStates[i] = newCountSeriesAggrState()
|
||||
case "count_samples":
|
||||
@ -330,7 +346,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
||||
case "stdvar":
|
||||
aggrStates[i] = newStdvarAggrState()
|
||||
case "histogram_bucket":
|
||||
aggrStates[i] = newHistogramBucketAggrState(interval)
|
||||
aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
|
||||
"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs)
|
||||
|
@ -13,7 +13,7 @@ type totalAggrState struct {
|
||||
m sync.Map
|
||||
|
||||
ignoreInputDeadline uint64
|
||||
intervalSecs uint64
|
||||
stalenessInterval uint64
|
||||
}
|
||||
|
||||
type totalStateValue struct {
|
||||
@ -29,18 +29,18 @@ type lastValueState struct {
|
||||
deleteDeadline uint64
|
||||
}
|
||||
|
||||
func newTotalAggrState(interval time.Duration) *totalAggrState {
|
||||
func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *totalAggrState {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
intervalSecs := uint64(interval.Seconds() + 1)
|
||||
return &totalAggrState{
|
||||
ignoreInputDeadline: currentTime + intervalSecs,
|
||||
intervalSecs: intervalSecs,
|
||||
stalenessInterval: uint64(stalenessInterval.Seconds()),
|
||||
}
|
||||
}
|
||||
|
||||
func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) {
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1)
|
||||
deleteDeadline := currentTime + as.stalenessInterval
|
||||
|
||||
again:
|
||||
v, ok := as.m.Load(outputKey)
|
||||
|
Loading…
Reference in New Issue
Block a user