app/{vmagent,vminsert}: add support for streaming aggregation

See https://docs.victoriametrics.com/stream-aggregation.html

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460
This commit is contained in:
Aliaksandr Valialkin 2023-01-03 22:19:18 -08:00
parent add2c4bf07
commit fa13bbc48a
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
29 changed files with 3142 additions and 22 deletions

View File

@ -82,6 +82,7 @@ VictoriaMetrics has the following prominent features:
* [Arbitrary CSV data](#how-to-import-csv-data).
* [Native binary format](#how-to-import-data-in-native-format).
* [DataDog agent or DogStatsD](#how-to-send-data-from-datadog-agent).
* It supports powerful [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html), which can be used as a [statsd](https://github.com/statsd/statsd) alternative.
* It supports metrics [relabeling](#relabeling).
* It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and
[high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues via [series limiter](#cardinality-limiter).

View File

@ -24,8 +24,8 @@ additionally to [discovering Prometheus-compatible targets and scraping metrics
see [these docs](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter).
* Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details.
* Can accept data via all the ingestion protocols supported by VictoriaMetrics - see [these docs](#how-to-push-data-to-vmagent).
* Can replicate collected metrics simultaneously to multiple remote storage systems -
see [these docs](#replication-and-high-availability).
* Can aggregate incoming samples by time and by labels before sending them to remote storage - see [these docs](https://docs.victoriametrics.com/stream-aggregation.html).
* Can replicate collected metrics simultaneously to multiple remote storage systems - see [these docs](#replication-and-high-availability).
* Works smoothly in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as the connection
to the remote storage is repaired. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`.
@ -126,6 +126,12 @@ If you use Prometheus only for scraping metrics from various targets and forward
then `vmagent` can replace Prometheus. Typically, `vmagent` requires lower amounts of RAM, CPU and network bandwidth compared with Prometheus.
See [these docs](#how-to-collect-metrics-in-prometheus-format) for details.
### Statsd alternative
`vmagent` can be used as an alternative to [statsd](https://github.com/statsd/statsd)
when [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) is enabled.
See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#statsd-alternative) for details.
### Flexible metrics relay
`vmagent` can accept metrics in [various popular data ingestion protocols](#how-to-push-data-to-vmagent), apply [relabeling](#relabeling)

View File

@ -21,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
@ -58,6 +59,13 @@ var (
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html ."+
"See also -remoteWrite.streamAggr.keepInput")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config ."+
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
)
var (
@ -140,6 +148,7 @@ func Init() {
logger.Fatalf("cannot load relabel configs: %s", err)
}
allRelabelConfigs.Store(rcs)
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
@ -435,9 +444,13 @@ var (
)
type remoteWriteCtx struct {
idx int
fq *persistentqueue.FastQueue
c *client
idx int
fq *persistentqueue.FastQueue
c *client
sas *streamaggr.Aggregators
streamAggrKeepInput bool
pss []*pendingSeries
pssNextIdx uint64
@ -469,6 +482,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
}
c.init(argIdx, *queues, sanitizedURL)
// Initialize pss
sf := significantFigures.GetOptionalArgOrDefault(argIdx, 0)
rd := roundDigits.GetOptionalArgOrDefault(argIdx, 100)
pssLen := *queues
@ -481,7 +495,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
for i := range pss {
pss[i] = newPendingSeries(fq.MustWriteBlock, sf, rd)
}
return &remoteWriteCtx{
rwctx := &remoteWriteCtx{
idx: argIdx,
fq: fq,
c: c,
@ -490,6 +505,19 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
}
// Initialize sas
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
if sasFile != "" {
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
}
rwctx.sas = sas
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
}
return rwctx
}
func (rwctx *remoteWriteCtx) MustStop() {
@ -501,6 +529,8 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.fq.UnblockAllReaders()
rwctx.c.MustStop()
rwctx.c = nil
rwctx.sas.MustStop()
rwctx.sas = nil
rwctx.fq.MustClose()
rwctx.fq = nil
@ -509,6 +539,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
}
func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
// Apply relabeling
var rctx *relabelCtx
var v *[]prompbmarshal.TimeSeries
rcs := allRelabelConfigs.Load().(*relabelConfigs)
@ -526,11 +557,17 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
rowsCountAfterRelabel := getRowsCount(tss)
rwctx.rowsDroppedByRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
}
pss := rwctx.pss
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
rowsCount := getRowsCount(tss)
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
pss[idx].Push(tss)
// Apply stream aggregation if any
rwctx.sas.Push(tss)
if rwctx.sas == nil || rwctx.streamAggrKeepInput {
// Push samples to the remote storage
rwctx.pushInternal(tss)
}
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssRelabelPool.Put(v)
@ -538,6 +575,12 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
}
}
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
pss := rwctx.pss
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
pss[idx].Push(tss)
}
var tssRelabelPool = &sync.Pool{
New: func() interface{} {
a := []prompbmarshal.TimeSeries{}

View File

@ -69,16 +69,17 @@ Then configure `vmalert` accordingly:
-external.label=replica=a # Multiple external labels may be set
```
Note there's a separate `remoteWrite.url` to allow writing results of
Note there's a separate `-remoteWrite.url` command-line flag to allow writing results of
alerting/recording rules into a different storage than the initial data that's
queried. This allows using `vmalert` to aggregate data from a short-term,
high-frequency, high-cardinality storage into a long-term storage with
decreased cardinality and a bigger interval between samples.
See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html).
See the full list of configuration flags in [configuration](#configuration) section.
If you run multiple `vmalert` services for the same datastore or AlertManager - do not forget
to specify different `external.label` flags in order to define which `vmalert` generated rules or alerts.
to specify different `-external.label` command-line flags in order to define which `vmalert` generated rules or alerts.
Configuration for [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
and [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) rules is very
@ -514,8 +515,8 @@ groups:
expr: avg_over_time(http_requests[5m])
```
Ability of `vmalert` to be configured with different `datasource.url` and `remoteWrite.url` allows
reading data from one data source and backfilling results to another. This helps to build a system
Ability of `vmalert` to be configured with different `-datasource.url` and `-remoteWrite.url` command-line flags
allows reading data from one data source and backfilling results to another. This helps to build a system
for aggregating and downsampling the data.
The following example shows how to build a topology where `vmalert` will process data from one cluster
@ -539,7 +540,7 @@ Please note, [replay](#rules-backfilling) feature may be used for transforming h
Flags `-remoteRead.url` and `-notifier.url` are omitted since we assume only recording rules are used.
See also [downsampling docs](https://docs.victoriametrics.com/#downsampling).
See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) and [downsampling](https://docs.victoriametrics.com/#downsampling).
#### Multiple remote writes

View File

@ -19,7 +19,10 @@ type InsertCtx struct {
mrs []storage.MetricRow
metricNamesBuf []byte
relabelCtx relabel.Ctx
relabelCtx relabel.Ctx
streamAggrCtx streamAggrCtx
skipStreamAggr bool
}
// Reset resets ctx for future fill with rowsLen rows.
@ -42,6 +45,8 @@ func (ctx *InsertCtx) Reset(rowsLen int) {
ctx.mrs = ctx.mrs[:0]
ctx.metricNamesBuf = ctx.metricNamesBuf[:0]
ctx.relabelCtx.Reset()
ctx.streamAggrCtx.Reset()
ctx.skipStreamAggr = false
}
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte {
@ -132,6 +137,13 @@ func (ctx *InsertCtx) ApplyRelabeling() {
// FlushBufs flushes buffered rows to the underlying storage.
func (ctx *InsertCtx) FlushBufs() error {
if sa != nil && !ctx.skipStreamAggr {
ctx.streamAggrCtx.push(ctx.mrs)
if !*streamAggrKeepInput {
ctx.Reset(0)
return nil
}
}
err := vmstorage.AddRows(ctx.mrs)
ctx.Reset(0)
if err == nil {

View File

@ -0,0 +1,117 @@
package common
import (
"flag"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)
var (
streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html ."+
"See also -remoteWrite.streamAggr.keepInput")
streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep input samples after the aggregation with -streamAggr.config ."+
"By default the input is dropped after the aggregation, so only the aggregate data is stored. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
)
// InitStreamAggr must be called after flag.Parse and before using the common package.
//
// MustStopStreamAggr must be called when stream aggr is no longer needed.
func InitStreamAggr() {
if *streamAggrConfig == "" {
// Nothing to initialize
return
}
a, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries)
if err != nil {
logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
}
sa = a
}
// MustStopStreamAggr stops stream aggregators.
func MustStopStreamAggr() {
sa.MustStop()
sa = nil
}
var sa *streamaggr.Aggregators
type streamAggrCtx struct {
mn storage.MetricName
tss [1]prompbmarshal.TimeSeries
}
func (ctx *streamAggrCtx) Reset() {
ctx.mn.Reset()
ts := &ctx.tss[0]
promrelabel.CleanLabels(ts.Labels)
}
func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
mn := &ctx.mn
tss := ctx.tss[:]
ts := &tss[0]
labels := ts.Labels
samples := ts.Samples
for _, mr := range mrs {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err)
}
labels = append(labels[:0], prompbmarshal.Label{
Name: "__name__",
Value: bytesutil.ToUnsafeString(mn.MetricGroup),
})
for _, tag := range mn.Tags {
labels = append(labels, prompbmarshal.Label{
Name: bytesutil.ToUnsafeString(tag.Key),
Value: bytesutil.ToUnsafeString(tag.Value),
})
}
samples = append(samples[:0], prompbmarshal.Sample{
Timestamp: mr.Timestamp,
Value: mr.Value,
})
ts.Labels = labels
ts.Samples = samples
sa.Push(tss)
}
}
func pushAggregateSeries(tss []prompbmarshal.TimeSeries) {
currentTimestamp := int64(fasttime.UnixTimestamp()) * 1000
var ctx InsertCtx
ctx.Reset(len(tss))
ctx.skipStreamAggr = true
for _, ts := range tss {
labels := ts.Labels
for _, label := range labels {
name := label.Name
if name == "__name__" {
name = ""
}
ctx.AddLabel(name, label.Value)
}
value := ts.Samples[0].Value
if err := ctx.WriteDataPoint(nil, ctx.Labels, currentTimestamp, value); err != nil {
logger.Errorf("cannot store aggregate series: %s", err)
// Do not continue pushing the remaining samples, since it is likely they will return the same error.
return
}
}
if err := vmstorage.AddRows(ctx.mrs); err != nil {
logger.Errorf("cannot flush aggregate series: %s", err)
}
}

View File

@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"
vminsertCommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
@ -66,6 +67,7 @@ var staticServer = http.FileServer(http.FS(staticFiles))
// Init initializes vminsert.
func Init() {
relabel.Init()
vminsertCommon.InitStreamAggr()
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
storage.SetMaxLabelValueLen(*maxLabelValueLen)
common.StartUnmarshalWorkers()
@ -103,6 +105,7 @@ func Stop() {
opentsdbhttpServer.MustStop()
}
common.StopUnmarshalWorkers()
vminsertCommon.MustStopStreamAggr()
}
// RequestHandler is a handler for Prometheus remote storage write API

View File

@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to explore metrics exported by a particular `job` / `instance`. See [these docs](https://docs.victoriametrics.com/#metrics-explorer) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3386).
* FEATURE: allow passing partial `RFC3339` date/time to `time`, `start` and `end` query args at [querying APIs](https://docs.victoriametrics.com/#prometheus-querying-api-usage) and [export APIs](https://docs.victoriametrics.com/#how-to-export-time-series). For example, `2022` is equivalent to `2022-01-01T00:00:00Z`, while `2022-01-30T14` is equivalent to `2022-01-30T14:00:00Z`. See [these docs](https://docs.victoriametrics.com/#timestamp-formats).
* FEATURE: [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling): add support for `keepequal` and `dropequal` relabeling actions, which are supported by Prometheus starting from [v2.41.0](https://github.com/prometheus/prometheus/releases/tag/v2.41.0). These relabeling actions are almost identical to `keep_if_equal` and `drop_if_equal` relabeling actions supported by VictoriaMetrics since `v1.38.0` - see [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) - so it is recommended sticking to `keep_if_equal` and `drop_if_equal` actions instead of switching to `keepequal` and `dropequal`.

View File

@ -83,6 +83,7 @@ VictoriaMetrics has the following prominent features:
* [Arbitrary CSV data](#how-to-import-csv-data).
* [Native binary format](#how-to-import-data-in-native-format).
* [DataDog agent or DogStatsD](#how-to-send-data-from-datadog-agent).
* It supports powerful [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html), which can be used as a [statsd](https://github.com/statsd/statsd) alternative.
* It supports metrics [relabeling](#relabeling).
* It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and
[high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues via [series limiter](#cardinality-limiter).

View File

@ -86,6 +86,7 @@ VictoriaMetrics has the following prominent features:
* [Arbitrary CSV data](#how-to-import-csv-data).
* [Native binary format](#how-to-import-data-in-native-format).
* [DataDog agent or DogStatsD](#how-to-send-data-from-datadog-agent).
* It supports powerful [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html), which can be used as a [statsd](https://github.com/statsd/statsd) alternative.
* It supports metrics [relabeling](#relabeling).
* It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and
[high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues via [series limiter](#cardinality-limiter).

438
docs/stream-aggregation.md Normal file
View File

@ -0,0 +1,438 @@
---
sort: 98
---
# streaming aggregation
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels.
The aggregation is applied to all the metrics received via any [supported data ingestion protocol](https://docs.victoriametrics.com/#how-to-import-time-series-data)
and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter).
The stream aggregation is configured via the following command-line flags:
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each specified `-remoteWrite.url`.
This allows writing different aggregates to different remote storage destinations.
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
These flags must point to a file containing [stream aggregation config](#stream-aggregation-config).
By default only the aggregated data is written to the storage. If the original incoming samples must be written to the storage too,
then the following command-line flags must be specified:
- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each specified `-remoteWrite.url`.
This allows writing both raw and aggregate data to different remote storage destinations.
- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
It expects that the ingested samples have timestamps close to the current time.
## Use cases
Stream aggregation can be used in the following cases:
* [Statsd alternative](#statsd-alternative)
* [Recording rules alternative](#recording-rules-alternative)
* [Reducing the number of stored samples](#reducing-the-number-of-stored-samples)
* [Reducing the number of stored series](#reducing-the-number-of-stored-series)
### Statsd alternative
Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) altnernative in the following cases:
* [Counting input samples](#counting-input-samples)
* [Summing input metrics](#summing-input-metrics)
* [Quantiles over input metrics](#quantiles-over-input-metrics)
* [Histograms over input metrics](#histograms-over-input-metrics)
### Recording rules alternative
Sometimes [alerting queries](https://docs.victoriametrics.com/vmalert.html#alerting-rules) may require non-trivial amounts of CPU, RAM,
disk IO and network bandwith at metrics storage side. For example, if `http_request_duration_seconds` histogram is generated by thousands
of app instances, then the alerting query `histogram_quantile(0.99, sum(increase(http_request_duration_seconds_bucket[5m])) without (instance)) > 0.5`
can become slow, since it needs to scan too big number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series)
with `http_request_duration_seconds_bucket` name. This alerting query can be sped up by pre-calculating
the `sum(increase(http_request_duration_seconds_bucket[5m])) without (instance)` via [recording rule](https://docs.victoriametrics.com/vmalert.html#recording-rules).
But this recording rule may take too much time to execute too. In this case the slow recording rule can be substituted
with the following [stream aggregation config](#stream-aggregation-config):
```yaml
- match: 'http_request_duration_seconds_bucket'
interval: 5m
without: [instance]
outputs: [total]
```
This stream aggregation generates `http_request_duration_seconds_bucket:5m_without_instance_total` output series according to [output metric naming](#output-metric-names).
Then these series can be used in [alerting rules](https://docs.victoriametrics.com/vmalert.html#alerting-rules):
```metricsql
histogram_quantile(0.99, last_over_time(http_request_duration_seconds_bucket:5m_without_instance_total[5m])) > 0.5
```
This query is executed much faster than the original query, because it needs to scan much lower number of time series.
See [the list of aggregate output](#aggregation-outputs), which can be specified at `output` field.
See also [aggregating by labels](#aggregating-by-labels).
### Reducing the number of stored samples
If per-[series](https://docs.victoriametrics.com/keyConcepts.html#time-series) samples are ingested at high frequency,
then this may result in high disk space usage, since too much data must be stored to disk. This also may result
in slow queries, since too much data must be processed during queries.
This can be fixed with the stream aggregation by increasing the interval between per-series samples stored in the database.
For example, the following [stream aggregation config](#stream-aggregation-config) reduces the frequency of input samples
to one sample per 5 minutes per each input time series (this operation is also known as downsampling):
```yaml
# Aggregate metrics ending with _total with `total` output.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
- match: '{__name__=~".+_total"}'
interval: 5m
outputs: [total]
# Downsample other metrics with `count_samples`, `sum_samples`, `min` and `max` outputs
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
- match: '{__name__!~".+_total"}'
interval: 5m
outputs: [count_samples, sum_samples, min, max]
```
The aggregated output metrics have the following names according to [output metric naming](#output-metric-names):
```
# For input metrics ending with _total
some_metric_total:5m_total
# For input metrics not ending with _total
some_metric:5m_count_samples
some_metric:5m_sum_samples
some_metric:5m_min
some_metric:5m_max
```
See [the list of aggregate output](#aggregation-outputs), which can be specified at `output` field.
See also [aggregating by labels](#aggregating-by-labels).
### Reducing the number of stored series
Sometimes apps may generate too many [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
For example, the `http_requests_total` metric may have `path` or `user` label with too big number of unique values.
In this case the following stream aggregation can be used for reducing the number metrics stored in VictoriaMetrics:
```yaml
- match: 'http_requests_total'
interval: 30s
without: [path, user]
outputs: [total]
```
This config specifies labels, which must be removed from the aggregate outpit, in the `without` list.
See [these docs](#aggregating-by-labels) for more details.
The aggregated output metric has the following name according to [output metric naming](#output-metric-names):
```
http_requests_total:30s_without_path_user_total
```
See [the list of aggregate output](#aggregation-outputs), which can be specified at `output` field.
### Counting input samples
If the monitored app generates event-based metrics, then it may be useful to count the number of such metrics
at stream aggregation level.
For example, if an advertising server generates `hits{some="labels"} 1` and `clicks{some="labels"} 1` metrics
per each incoming hit and click, then the following [stream aggregation config](#stream-aggregation-config)
can be used for counting these metrics per every 30 second interval:
```yml
- match: '{__name__=~"hits|clicks"}'
interval: 30s
outputs: [count_samples]
```
This config generates the following output metrics for `hits` and `clicks` input metrics
according to [output metric naming](#output-metric-names):
```
hits:30s_count_samples count1
clicks:30s_count_samples count2
```
See [the list of aggregate output](#aggregation-outputs), which can be specified at `output` field.
See also [aggregating by labels](#aggregating-by-labels).
### Summing input metrics
If the monitored app calulates some events and then sends the calculated number of events to VictoriaMetrics
at irregular intervals or at too high frequency, then stream aggregation can be used for summing such events
and writing the aggregate sums to the storage at regular intervals.
For example, if an advertising server generates `hits{some="labels} N` and `clicks{some="labels"} M` metrics
at irregular intervals, then the following [stream aggregation config](#stream-aggregation-config)
can be used for summing these metrics per every minute:
```yml
- match: '{__name__=~"hits|clicks"}'
interval: 1m
outputs: [sum_samples]
```
This config generates the following output metrics according to [output metric naming](#output-metric-names):
```
hits:1m_sum_samples sum1
clicks:1m_sum_samples sum2
```
See [the list of aggregate output](#aggregation-outputs), which can be specified at `output` field.
See also [aggregating by labels](#aggregating-by-labels).
### Quantiles over input metrics
If the monitored app generates measurement metrics per each request, then it may be useful to calculate
the pre-defined set of [percentiles](https://en.wikipedia.org/wiki/Percentile) over these measurements.
For example, if the monitored app generates `request_duration_seconds N` and `response_size_bytes M` metrics
per each incoming request, then the following [stream aggregation config](#stream-aggregation-config)
can be used for calculating 50th and 99th percentiles for these metrics every 30 seconds:
```yaml
- match: '{__name__=~"request_duration_seconds|response_size_bytes"}'
interval: 30s
outputs: ["quantiles(0.50, 0.99)"]
```
This config generates the following output metrics according to [output metric naming](#output-metric-names):
```
request_duration_seconds:30s_quantiles{quantile="0.50"} value1
request_duration_seconds:30s_quantiles{quantile="0.99"} value2
response_size_bytes:30s_quantiles{quantile="0.50"} value1
response_size_bytes:30s_quantiles{quantile="0.99"} value2
```
See [the list of aggregate output](#aggregation-outputs), which can be specified at `output` field.
See also [histograms over input metrics](#histograms-over-input-metrics) and [aggregating by labels](#aggregating-by-labels).
### Histograms over input metrics
If the monitored app generates measurement metrics per each request, then it may be useful to calculate
a [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram) over these metrics.
For example, if the monitored app generates `request_duration_seconds N` and `response_size_bytes M` metrics
per each incoming request, then the following [stream aggregation config](#stream-aggregation-config)
can be used for calculating [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350)
for these metrics every 60 seconds:
```yaml
- match: '{__name__=~"request_duration_seconds|response_size_bytes"}'
interval: 60s
outputs: [histogram_bucket]
```
This config generates the following output metrics according to [output metric naming](#output-metric-names).
```
request_duration_seconds:60s_histogram_bucket{vmrange="start1...end1"} count1
request_duration_seconds:60s_histogram_bucket{vmrange="start2...end2"} count2
...
request_duration_seconds:60s_histogram_bucket{vmrange="startN...endN"} countN
response_size_bytes:60s_histogram_bucket{vmrange="start1...end1"} count1
response_size_bytes:60s_histogram_bucket{vmrange="start2...end2"} count2
...
response_size_bytes:60s_histogram_bucket{vmrange="startN...endN"} countN
```
The resulting histogram buckets can be queried with [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) in the following ways:
1. An estimated 50th and 99th [percentiles](https://en.wikipedia.org/wiki/Percentile) of the request duration over the last hour:
```metricsql
histogram_quantiles("quantile", 0.50, 0.99, sum(increase(request_duration_seconds:60s_histogram_bucket[1h])) by (vmrange))
```
This query uses [histogram_quantiles](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantiles) function.
2. An estimated [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) of the request duration over the last hour:
```metricsql
histogram_stddev(sum(increase(request_duration_seconds:60s_histogram_bucket[1h])) by (vmrange))
```
This query uses [histogram_stddev](https://docs.victoriametrics.com/MetricsQL.html#histogram_stddev) function.
3. An estimated share of requests with the duration smaller than `0.5s` over the last hour:
```metricsql
histogram_share(0.5, sum(increase(request_duration_seconds:60s_histogram_bucket[1h])) by (vmrange))
```
This query uses [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share) function.
See [the list of aggregate output](#aggregation-outputs), which can be specified at `output` field.
See also [quantiles over input metrics](#quantiles-over-input-metrics) and [aggregating by labels](#aggregating-by-labels).
## Output metric names
Output metric names for stream aggregation are constructed according to the following pattern:
```
<metric_name>:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
```
- `<metric_name>` is the original metric name.
- `<interval>` is the interval specified in the [stream aggregation config](#stream-aggregation-config).
- `<by_labels>` is `_`-delimited list of `by` labels specified in the [stream aggregation config](#stream-aggregation-config).
If the `by` list is missing in the config, then the `_by_<by_labels>` part isn't included in the output metric name.
- `<without_labels>` is an optional `_`-delimited list of `without` labels specified in the [stream aggregation config](#stream-aggregation-config).
If the `without` list is missing in the config, then the `_without_<without_labels>` part isn't included in the output metric name.
- `<output>` is the aggregate used for constucting the output metric. The aggregate name is taken from the `outputs` list
at the corresponding [stream aggregation config](#stream-aggregation-config).
Both input and ouput metric names can be modified if needed via relabeling according to [these docs](#relabeling).
## Relabeling
It is possible to apply [arbitrary relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) to input and output metrics
during stream aggregation via `input_relabel_configs` and `output_relabel_config` options in [stream aggregation config](#stream-aggregation-config).
For example, the following config removes the `:1m_sum_samples` suffix added [to the output metric name](#output-metric-names):
```yml
- interval: 1m
outputs: [sum_samples]
output_relabel_configs:
- source_labels: [__name__]
target_label: __name__
regex: "(.+):.+"
```
## Aggregation outputs
The following aggregation outputs are supported in the `outputs` list of the [stream aggregation config](#stream-aggregation-config):
* `total` generates output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters.
The `total` handler properly handles input counter resets.
The `total` handler returns garbage when something other than [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) is passed to the input.
* `increase` returns the increase of input [counters](https://docs.victoriametrics.com/keyConcepts.html#counter).
The `increase` handler properly handles the input counter resets.
The `increase` handler returns garbage when something other than [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) is passed to the input.
* `count_series` counts the number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
* `count_samples` counts the number of input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `sum_samples` sums input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `last` returns the last input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `min` returns the minimum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `max` returns the maximum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `avg` returns the average input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `stddev` returns [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `stdvar` returns [standard variance](https://en.wikipedia.org/wiki/Variance) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350)
for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
* `quantiles(phi1, ..., phiN)` returns [percentiles](https://en.wikipedia.org/wiki/Percentile) for the given `phi*`
over the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
The `phi` must be in the range `[0..1]`, where `0` means `0th` percentile, while `1` means `100th` percentile.
The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config)
and then sent to the storage.
If `by` and `without` lists are specified in the [config](#stream-aggregation-config),
then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`.
## Aggregating by labels
All the labels for the input metrics are preserved by default in the output metrics. For example,
the input metric `foo{app="bar",instance="host1"}` results to the output metric `foo:1m_sum_samples{app="bar",instance="host1"}`
when the following [stream aggregation config](#stream-aggregation-config) is used:
```yaml
- interval: 1m
outputs: [sum_samples]
```
The input labels can be removed via `without` list specified in the config. For example, the following config
removes the `instance` label from output metrics by summing input samples across all the instances:
```yaml
- interval: 1m
without: [instance]
outputs: [sum_samples]
```
In this case the `foo{app="bar",instance="..."}` input metrics are transformed into `foo:1m_without_instance_sum_samples{app="bar"}`
output metric.
It is possible specifying the exact list of labels in the output metrics via `by` list.
For example, the following config sums input samples by the `app` label:
```yaml
- interval: 1m
by: [app]
outputs: [sum_samples]
```
In this case the `foo{app="bar",instance="..."}` input metrics are transformed into `foo:1m_by_app_sum_samples{app="bar"}`
output metric.
## Stream aggregation config
Below is the format for stream aggregation config file, which may be referred via `-remoteWrite.streamAggr.config` command-line flag
at [vmagent](https://docs.victoriametrics.com/vmagent.html) or via `-streamAggr.config` command-line flag
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html):
```yaml
# match is an optional filter for incoming samples to aggregate.
# It can contain arbitrary Prometheus series selector
# according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
# If match is missing, then all the incoming samples are aggregated.
- match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
# interval is the interval for the aggregation.
# The aggregated stats is sent to remote storage once per interval.
interval: 1m
# without is an optional list of labels, which must be removed from the output aggregation.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
without: [instance]
# by is an optioanl list of labels, which must be preserved in the output aggregation.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
# by: [job, vmrange]
# outputs is the list of aggregations to perform on the input data.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
outputs: [total]
# input_relabel_configs is an optional relabeling rules,
# which are applied to the incoming samples after they pass the match filter
# and before being aggregated.
# See https://docs.victoriametrics.com/stream-aggregation.html#relabeling
input_relabel_configs:
- target_label: vmaggr
replacement: before
# output_relabel_configs is an optional relabeling rules,
# which are applied to the aggregated output metrics.
output_relabel_configs:
- target_label: vmaggr
replacement: after
```
The file can contain multiple aggregation configs. The aggregation is performed independently
per each specified config entry.

View File

@ -28,8 +28,8 @@ additionally to [discovering Prometheus-compatible targets and scraping metrics
see [these docs](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter).
* Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details.
* Can accept data via all the ingestion protocols supported by VictoriaMetrics - see [these docs](#how-to-push-data-to-vmagent).
* Can replicate collected metrics simultaneously to multiple remote storage systems -
see [these docs](#replication-and-high-availability).
* Can aggregate incoming samples by time and by labels before sending them to remote storage - see [these docs](https://docs.victoriametrics.com/stream-aggregation.html).
* Can replicate collected metrics simultaneously to multiple remote storage systems - see [these docs](#replication-and-high-availability).
* Works smoothly in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as the connection
to the remote storage is repaired. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`.
@ -130,6 +130,12 @@ If you use Prometheus only for scraping metrics from various targets and forward
then `vmagent` can replace Prometheus. Typically, `vmagent` requires lower amounts of RAM, CPU and network bandwidth compared with Prometheus.
See [these docs](#how-to-collect-metrics-in-prometheus-format) for details.
### Statsd alternative
`vmagent` can be used as an alternative to [statsd](https://github.com/statsd/statsd)
when [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) is enabled.
See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#statsd-alternative) for details.
### Flexible metrics relay
`vmagent` can accept metrics in [various popular data ingestion protocols](#how-to-push-data-to-vmagent), apply [relabeling](#relabeling)

View File

@ -73,16 +73,17 @@ Then configure `vmalert` accordingly:
-external.label=replica=a # Multiple external labels may be set
```
Note there's a separate `remoteWrite.url` to allow writing results of
Note there's a separate `-remoteWrite.url` command-line flag to allow writing results of
alerting/recording rules into a different storage than the initial data that's
queried. This allows using `vmalert` to aggregate data from a short-term,
high-frequency, high-cardinality storage into a long-term storage with
decreased cardinality and a bigger interval between samples.
See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html).
See the full list of configuration flags in [configuration](#configuration) section.
If you run multiple `vmalert` services for the same datastore or AlertManager - do not forget
to specify different `external.label` flags in order to define which `vmalert` generated rules or alerts.
to specify different `-external.label` command-line flags in order to define which `vmalert` generated rules or alerts.
Configuration for [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
and [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) rules is very
@ -518,8 +519,8 @@ groups:
expr: avg_over_time(http_requests[5m])
```
Ability of `vmalert` to be configured with different `datasource.url` and `remoteWrite.url` allows
reading data from one data source and backfilling results to another. This helps to build a system
Ability of `vmalert` to be configured with different `-datasource.url` and `-remoteWrite.url` command-line flags
allows reading data from one data source and backfilling results to another. This helps to build a system
for aggregating and downsampling the data.
The following example shows how to build a topology where `vmalert` will process data from one cluster
@ -543,7 +544,7 @@ Please note, [replay](#rules-backfilling) feature may be used for transforming h
Flags `-remoteRead.url` and `-notifier.url` are omitted since we assume only recording rules are used.
See also [downsampling docs](https://docs.victoriametrics.com/#downsampling).
See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) and [downsampling](https://docs.victoriametrics.com/#downsampling).
#### Multiple remote writes

74
lib/streamaggr/avg.go Normal file
View File

@ -0,0 +1,74 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// avgAggrState calculates output=avg, e.g. the average value over input samples.
type avgAggrState struct {
m sync.Map
}
type avgStateValue struct {
mu sync.Mutex
sum float64
count int64
deleted bool
}
func newAvgAggrState() *avgAggrState {
return &avgAggrState{}
}
func (as *avgAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &avgStateValue{
sum: value,
count: 1,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been successfully stored
return
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*avgStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += value
sv.count++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*avgStateValue)
sv.mu.Lock()
avg := sv.sum / float64(sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "avg", currentTimeMsec, avg)
return true
})
}

View File

@ -0,0 +1,71 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesAggrState calculates output=countSamples, e.g. the count of input samples.
type countSamplesAggrState struct {
m sync.Map
}
type countSamplesStateValue struct {
mu sync.Mutex
n uint64
deleted bool
}
func newCountSamplesAggrState() *countSamplesAggrState {
return &countSamplesAggrState{}
}
func (as *countSamplesAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSamplesStateValue{
n: 1,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.n++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
n := sv.n
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
return true
})
}

View File

@ -0,0 +1,78 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSeriesAggrState calculates output=count_series, e.g. the number of unique series.
type countSeriesAggrState struct {
m sync.Map
}
type countSeriesStateValue struct {
mu sync.Mutex
countedSeries map[string]struct{}
n uint64
deleted bool
}
func newCountSeriesAggrState() *countSeriesAggrState {
return &countSeriesAggrState{}
}
func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSeriesStateValue{
countedSeries: map[string]struct{}{
inputKey: {},
},
n: 1,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been added to the map.
return
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.countedSeries[inputKey]; !ok {
sv.countedSeries[inputKey] = struct{}{}
sv.n++
}
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := sv.n
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
return true
})
}

View File

@ -0,0 +1,102 @@
package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metrics"
)
// histogramBucketAggrState calculates output=histogramBucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrState struct {
m sync.Map
ignoreInputDeadline uint64
intervalSecs uint64
}
type histogramBucketStateValue struct {
mu sync.Mutex
h metrics.Histogram
deleteDeadline uint64
deleted bool
}
func newHistogramBucketAggrState(interval time.Duration) *histogramBucketAggrState {
intervalSecs := uint64(interval.Seconds() + 1)
return &histogramBucketAggrState{
intervalSecs: intervalSecs,
}
}
func (as *histogramBucketAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + 2*as.intervalSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &histogramBucketStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
if !sv.deleted {
key := k.(string)
sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange)
})
}
sv.mu.Unlock()
return true
})
}

129
lib/streamaggr/increase.go Normal file
View File

@ -0,0 +1,129 @@
package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// increaseAggrState calculates output=increase, e.g. the increase over input counters.
type increaseAggrState struct {
m sync.Map
ignoreInputDeadline uint64
intervalSecs uint64
}
type increaseStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
increase float64
deleteDeadline uint64
deleted bool
}
func newIncreaseAggrState(interval time.Duration) *increaseAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := uint64(interval.Seconds() + 1)
return &increaseAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
intervalSecs: intervalSecs,
}
}
func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + 2*as.intervalSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &increaseStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*increaseStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
sv.lastValues[inputKey] = lv
}
d := value
if ok && lv.value <= value {
d = value - lv.value
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total += d
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *increaseAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*increaseStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*increaseStateValue)
sv.mu.Lock()
increase := sv.total
sv.total = 0
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, "increase", currentTimeMsec, increase)
}
return true
})
}

71
lib/streamaggr/last.go Normal file
View File

@ -0,0 +1,71 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// lastAggrState calculates output=last, e.g. the last value over input samples.
type lastAggrState struct {
m sync.Map
}
type lastStateValue struct {
mu sync.Mutex
last float64
deleted bool
}
func newLastAggrState() *lastAggrState {
return &lastAggrState{}
}
func (as *lastAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &lastStateValue{
last: value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.last = value
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*lastStateValue)
sv.mu.Lock()
last := sv.last
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "last", currentTimeMsec, last)
return true
})
}

73
lib/streamaggr/max.go Normal file
View File

@ -0,0 +1,73 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
type maxAggrState struct {
m sync.Map
}
type maxStateValue struct {
mu sync.Mutex
max float64
deleted bool
}
func newMaxAggrState() *maxAggrState {
return &maxAggrState{}
}
func (as *maxAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &maxStateValue{
max: value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*maxStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if value > sv.max {
sv.max = value
}
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*maxStateValue)
sv.mu.Lock()
max := sv.max
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "max", currentTimeMsec, max)
return true
})
}

73
lib/streamaggr/min.go Normal file
View File

@ -0,0 +1,73 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// minAggrState calculates output=min, e.g. the minimum value over input samples.
type minAggrState struct {
m sync.Map
}
type minStateValue struct {
mu sync.Mutex
min float64
deleted bool
}
func newMinAggrState() *minAggrState {
return &minAggrState{}
}
func (as *minAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &minStateValue{
min: value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*minStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if value < sv.min {
sv.min = value
}
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*minStateValue)
sv.mu.Lock()
min := sv.min
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "min", currentTimeMsec, min)
return true
})
}

View File

@ -0,0 +1,87 @@
package streamaggr
import (
"strconv"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/valyala/histogram"
)
// quantilesAggrState calculates output=quantiles, e.g. the the given quantiles over the input samples.
type quantilesAggrState struct {
m sync.Map
phis []float64
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
deleted bool
}
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
return &quantilesAggrState{
phis: phis,
}
}
func (as *quantilesAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
h := histogram.GetFast()
v = &quantilesStateValue{
h: h,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
histogram.PutFast(h)
v = vNew
}
}
sv := v.(*quantilesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
phis := as.phis
var quantiles []float64
var b []byte
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*quantilesStateValue)
sv.mu.Lock()
quantiles = sv.h.Quantiles(quantiles[:0], phis)
histogram.PutFast(sv.h)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
for i, quantile := range quantiles {
b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
phiStr := bytesutil.InternBytes(b)
ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr)
}
return true
})
}

74
lib/streamaggr/stddev.go Normal file
View File

@ -0,0 +1,74 @@
package streamaggr
import (
"math"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stddevAggrState calculates output=stddev, e.g. the average value over input samples.
type stddevAggrState struct {
m sync.Map
}
type stddevStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
}
func newStddevAggrState() *stddevAggrState {
return &stddevAggrState{}
}
func (as *stddevAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &stddevStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*stddevStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stddevStateValue)
sv.mu.Lock()
stddev := math.Sqrt(sv.q / sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
return true
})
}

73
lib/streamaggr/stdvar.go Normal file
View File

@ -0,0 +1,73 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stdvarAggrState calculates output=stdvar, e.g. the average value over input samples.
type stdvarAggrState struct {
m sync.Map
}
type stdvarStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
}
func newStdvarAggrState() *stdvarAggrState {
return &stdvarAggrState{}
}
func (as *stdvarAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &stdvarStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*stdvarStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stdvarStateValue)
sv.mu.Lock()
stdvar := sv.q / sv.count
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
return true
})
}

View File

@ -0,0 +1,641 @@
package streamaggr
import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"gopkg.in/yaml.v2"
)
var supportedOutputs = []string{
"total",
"increase",
"count_series",
"count_samples",
"sum_samples",
"last",
"min",
"max",
"avg",
"stddev",
"stdvar",
"histogram_bucket",
"quantiles(phi1, ..., phiN)",
}
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
data, err := fs.ReadFileOrHTTP(path)
if err != nil {
return nil, fmt.Errorf("cannot load aggregators: %w", err)
}
as, err := NewAggregatorsFromData(data, pushFunc)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
}
return as, nil
}
// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, err
}
return NewAggregators(cfgs, pushFunc)
}
// Config is a configuration for a single stream aggregation.
type Config struct {
// Match is a label selector for filtering time series for the given selector.
//
// If the match isn't set, then all the input time series are processed.
Match *promrelabel.IfExpression `yaml:"match,omitempty"`
// Interval is the interval between aggregations.
Interval string `yaml:"interval"`
// Outputs is a list of output aggregate functions to produce.
//
// The following names are allowed:
//
// - total - aggregates input counters
// - increase - counts the increase over input counters
// - count_series - counts the input series
// - count_samples - counts the input samples
// - sum_samples - sums the input samples
// - last - the last biggest sample value
// - min - the minimum sample value
// - max - the maximum sample value
// - avg - the average value across all the samples
// - stddev - standard deviation across all the samples
// - stdvar - standard variance across all the samples
// - histogram_bucket - creates VictoriaMetrics histogram for input samples
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
//
// The output time series will have the following names:
//
// input_name:aggr_<interval>_<output>
//
Outputs []string `yaml:"outputs"`
// By is an optional list of labels for grouping input series.
//
// See also Without.
//
// If neither By nor Without are set, then the Outputs are calculated
// individually per each input time series.
By []string `yaml:"by,omitempty"`
// Without is an optional list of labels, which must be excluded when grouping input series.
//
// See also By.
//
// If neither By nor Without are set, then the Outputs are calculated
// individually per each input time series.
Without []string `yaml:"without,omitempty"`
// InputRelabelConfigs is an optional relabeling rules, which are applied on the input
// before aggregation.
InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`
// OutputRelabelConfigs is an optional relabeling rules, which are applied
// on the aggregated output before being sent to remote storage.
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
type Aggregators struct {
as []*aggregator
}
// NewAggregators creates Aggregators from the given cfgs.
//
// pushFunc is called when the aggregated data must be flushed.
//
// MustStop must be called on the returned Aggregators when they are no longer needed.
func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) {
if len(cfgs) == 0 {
return nil, nil
}
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, pushFunc)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
}
as[i] = a
}
return &Aggregators{
as: as,
}, nil
}
// MustStop stops a.
func (a *Aggregators) MustStop() {
if a == nil {
return
}
for _, aggr := range a.as {
aggr.MustStop()
}
}
// Push pushes tss to a.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
if a == nil {
return
}
for _, aggr := range a.as {
aggr.Push(tss)
}
}
// aggregator aggregates input series according to the config passed to NewAggregator
type aggregator struct {
match *promrelabel.IfExpression
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
by []string
without []string
aggregateOnlyByTime bool
// aggrStates contains aggregate states for the given outputs
aggrStates []aggrState
pushFunc PushFunc
// suffix contains a suffix, which should be added to aggregate metric names
//
// It contains the interval, lables in (by, without), plus output name.
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
// for `interval: 1m`, `by: [job]`
suffix string
wg sync.WaitGroup
stopCh chan struct{}
}
type aggrState interface {
pushSample(inputKey, outputKey string, value float64)
appendSeriesForFlush(ctx *flushCtx)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
type PushFunc func(tss []prompbmarshal.TimeSeries)
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
// check cfg.Interval
interval, err := time.ParseDuration(cfg.Interval)
if err != nil {
return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err)
}
if interval <= time.Second {
return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
}
// initialize input_relabel_configs and output_relabel_configs
inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs)
if err != nil {
return nil, fmt.Errorf("cannot parse input_relabel_configs: %w", err)
}
outputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.OutputRelabelConfigs)
if err != nil {
return nil, fmt.Errorf("cannot parse output_relabel_configs: %w", err)
}
// check by and without lists
by := cfg.By
without := cfg.Without
if len(by) > 0 && len(without) > 0 {
return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously", by, without)
}
aggregateOnlyByTime := (len(by) == 0 && len(without) == 0)
if !aggregateOnlyByTime && len(without) == 0 {
by = addMissingUnderscoreName(by)
}
// initialize outputs list
if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", supportedOutputs)
}
aggrStates := make([]aggrState, len(cfg.Outputs))
for i, output := range cfg.Outputs {
if strings.HasPrefix(output, "quantiles(") {
if !strings.HasSuffix(output, ")") {
return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
}
argsStr := output[len("quantiles(") : len(output)-1]
if len(argsStr) == 0 {
return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
}
args := strings.Split(argsStr, ",")
phis := make([]float64, len(args))
for j, arg := range args {
arg = strings.TrimSpace(arg)
phi, err := strconv.ParseFloat(arg, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
}
if phi < 0 || phi > 1 {
return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
}
phis[j] = phi
}
aggrStates[i] = newQuantilesAggrState(phis)
continue
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(interval)
case "increase":
aggrStates[i] = newIncreaseAggrState(interval)
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
case "count_samples":
aggrStates[i] = newCountSamplesAggrState()
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState()
case "last":
aggrStates[i] = newLastAggrState()
case "min":
aggrStates[i] = newMinAggrState()
case "max":
aggrStates[i] = newMaxAggrState()
case "avg":
aggrStates[i] = newAvgAggrState()
case "stddev":
aggrStates[i] = newStddevAggrState()
case "stdvar":
aggrStates[i] = newStdvarAggrState()
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(interval)
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs)
}
}
// initialize suffix to add to metric names after aggregation
suffix := ":" + cfg.Interval
if labels := removeUnderscoreName(by); len(labels) > 0 {
suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_"))
}
if labels := removeUnderscoreName(without); len(labels) > 0 {
suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_"))
}
suffix += "_"
// initialize the aggregator
a := &aggregator{
match: cfg.Match,
inputRelabeling: inputRelabeling,
outputRelabeling: outputRelabeling,
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
aggrStates: aggrStates,
pushFunc: pushFunc,
suffix: suffix,
stopCh: make(chan struct{}),
}
a.wg.Add(1)
go func() {
a.runFlusher(interval)
defer a.wg.Done()
}()
return a, nil
}
func (a *aggregator) runFlusher(interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-a.stopCh:
return
case <-t.C:
}
a.flush()
}
}
func (a *aggregator) flush() {
ctx := &flushCtx{
suffix: a.suffix,
}
for _, as := range a.aggrStates {
ctx.reset()
as.appendSeriesForFlush(ctx)
tss := ctx.tss
// Apply output relabeling
if a.outputRelabeling != nil {
dst := tss[:0]
for _, ts := range tss {
ts.Labels = a.outputRelabeling.Apply(ts.Labels, 0)
if len(ts.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
}
dst = append(dst, ts)
}
tss = dst
}
// Push the output metrics
a.pushFunc(tss)
}
}
// MustStop stops the aggregator.
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
close(a.stopCh)
a.wg.Wait()
}
// Push pushes series to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
labels := promutils.GetLabels()
tmpLabels := promutils.GetLabels()
bb := bbPool.Get()
for _, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
}
labels.Labels = append(labels.Labels[:0], ts.Labels...)
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
if len(labels.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
}
labels.Sort()
if a.aggregateOnlyByTime {
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
} else {
tmpLabels.Labels = removeUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
}
outputKey := bytesutil.InternBytes(bb.B)
inputKey := ""
if !a.aggregateOnlyByTime {
tmpLabels.Labels = extractUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
inputKey = bytesutil.InternBytes(bb.B)
}
for _, sample := range ts.Samples {
a.pushSample(inputKey, outputKey, sample.Value)
}
}
bbPool.Put(bb)
promutils.PutLabels(tmpLabels)
promutils.PutLabels(labels)
}
var bbPool bytesutil.ByteBufferPool
func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
if math.IsNaN(value) {
// Skip nan samples
return
}
for _, as := range a.aggrStates {
as.pushSample(inputKey, outputKey, value)
}
}
func extractUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
if len(without) > 0 {
for _, label := range labels {
if hasInArray(label.Name, without) {
dst = append(dst, label)
}
}
} else {
for _, label := range labels {
if !hasInArray(label.Name, by) {
dst = append(dst, label)
}
}
}
return dst
}
func removeUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
if len(without) > 0 {
for _, label := range labels {
if !hasInArray(label.Name, without) {
dst = append(dst, label)
}
}
} else {
for _, label := range labels {
if hasInArray(label.Name, by) {
dst = append(dst, label)
}
}
}
return dst
}
func hasInArray(name string, a []string) bool {
for _, s := range a {
if name == s {
return true
}
}
return false
}
func marshalLabelsFast(dst []byte, labels []prompbmarshal.Label) []byte {
dst = encoding.MarshalUint32(dst, uint32(len(labels)))
for _, label := range labels {
dst = encoding.MarshalUint32(dst, uint32(len(label.Name)))
dst = append(dst, label.Name...)
dst = encoding.MarshalUint32(dst, uint32(len(label.Value)))
dst = append(dst, label.Value...)
}
return dst
}
func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal.Label, error) {
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal labels count from %d bytes; needs at least 4 bytes", len(src))
}
n := encoding.UnmarshalUint32(src)
src = src[4:]
for i := uint32(0); i < n; i++ {
// Unmarshal label name
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal label name length from %d bytes; needs at least 4 bytes", len(src))
}
labelNameLen := encoding.UnmarshalUint32(src)
src = src[4:]
if uint32(len(src)) < labelNameLen {
return dst, fmt.Errorf("cannot unmarshal label name from %d bytes; needs at least %d bytes", len(src), labelNameLen)
}
labelName := bytesutil.InternBytes(src[:labelNameLen])
src = src[labelNameLen:]
// Unmarshal label value
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal label value length from %d bytes; needs at least 4 bytes", len(src))
}
labelValueLen := encoding.UnmarshalUint32(src)
src = src[4:]
if uint32(len(src)) < labelValueLen {
return dst, fmt.Errorf("cannot unmarshal label value from %d bytes; needs at least %d bytes", len(src), labelValueLen)
}
labelValue := bytesutil.InternBytes(src[:labelValueLen])
src = src[labelValueLen:]
dst = append(dst, prompbmarshal.Label{
Name: labelName,
Value: labelValue,
})
}
if len(src) > 0 {
return dst, fmt.Errorf("unexpected non-empty tail after unmarshaling labels; tail length is %d bytes", len(src))
}
return dst, nil
}
type flushCtx struct {
suffix string
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
}
func (ctx *flushCtx) reset() {
ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss)
promrelabel.CleanLabels(ctx.labels)
ctx.labels = ctx.labels[:0]
ctx.samples = ctx.samples[:0]
}
func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int64, value float64) {
var err error
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
}
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
Value: value,
})
ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{
Labels: ctx.labels[labelsLen:],
Samples: ctx.samples[samplesLen:],
})
}
func (ctx *flushCtx) appendSeriesWithExtraLabel(labelsMarshaled, suffix string, timestamp int64, value float64, extraName, extraValue string) {
var err error
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
}
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
ctx.labels = append(ctx.labels, prompbmarshal.Label{
Name: extraName,
Value: extraValue,
})
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
Value: value,
})
ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{
Labels: ctx.labels[labelsLen:],
Samples: ctx.samples[samplesLen:],
})
}
func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label {
src := labels[offset:]
for i := range src {
label := &src[i]
if label.Name != "__name__" {
continue
}
bb := bbPool.Get()
bb.B = append(bb.B, label.Value...)
bb.B = append(bb.B, firstSuffix...)
bb.B = append(bb.B, lastSuffix...)
label.Value = bytesutil.InternBytes(bb.B)
bbPool.Put(bb)
return labels
}
// The __name__ isn't found. Add it
bb := bbPool.Get()
bb.B = append(bb.B, firstSuffix...)
bb.B = append(bb.B, lastSuffix...)
labelValue := bytesutil.InternBytes(bb.B)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: labelValue,
})
return labels
}
func addMissingUnderscoreName(labels []string) []string {
result := []string{"__name__"}
for _, s := range labels {
if s == "__name__" {
continue
}
result = append(result, s)
}
return result
}
func removeUnderscoreName(labels []string) []string {
var result []string
for _, s := range labels {
if s == "__name__" {
continue
}
result = append(result, s)
}
return result
}

View File

@ -0,0 +1,662 @@
package streamaggr
import (
"fmt"
"sort"
"strings"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)
func TestAggregatorsFailure(t *testing.T) {
f := func(config string) {
t.Helper()
pushFunc := func(tss []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("pushFunc shouldn't be called"))
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc)
if err == nil {
t.Fatalf("expecting non-nil error")
}
if a != nil {
t.Fatalf("expecting nil a")
}
}
// Invalid config
f(`foobar`)
// Unknown option
f(`
- interval: 1m
outputs: [total]
foobar: baz
`)
// missing interval
f(`
- outputs: [total]
`)
// missing outputs
f(`
- interval: 1m
`)
// Invalid output
f(`
- interval: 1m
outputs: [foobar]
`)
// Negative interval
f(`- interval: -5m`)
// Too small interval
f(`- interval: 10ms`)
// Invalid input_relabel_configs
f(`
- interval: 1m
outputs: [total]
input_relabel_configs:
- foo: bar
`)
f(`
- interval: 1m
outputs: [total]
input_relabel_configs:
- action: replace
`)
// Invalid output_relabel_configs
f(`
- interval: 1m
outputs: [total]
output_relabel_configs:
- foo: bar
`)
f(`
- interval: 1m
outputs: [total]
output_relabel_configs:
- action: replace
`)
// Both by and without are non-empty
f(`
- interval: 1m
outputs: [total]
by: [foo]
without: [bar]
`)
// Invalid quantiles()
f(`
- interval: 1m
outputs: ["quantiles("]
`)
f(`
- interval: 1m
outputs: ["quantiles()"]
`)
f(`
- interval: 1m
outputs: ["quantiles(foo)"]
`)
f(`
- interval: 1m
outputs: ["quantiles(-0.5)"]
`)
f(`
- interval: 1m
outputs: ["quantiles(1.5)"]
`)
}
func TestAggregatorsSuccess(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected string) {
t.Helper()
// Initialize Aggregators
var tssOutput []prompbmarshal.TimeSeries
var tssOutputLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssOutputLock.Lock()
for _, ts := range tss {
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
Labels: labelsCopy,
Samples: samplesCopy,
})
}
tssOutputLock.Unlock()
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput)
if a != nil {
for _, aggr := range a.as {
aggr.flush()
}
}
a.MustStop()
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
tsStrings[i] = timeSeriesToString(ts)
}
sort.Strings(tsStrings)
outputMetrics := strings.Join(tsStrings, "")
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
}
// Empty config
f(``, ``, ``)
f(``, `foo{bar="baz"} 1`, ``)
f(``, "foo 1\nbaz 2", ``)
// Empty by list - aggregate only by time
f(`
- interval: 1m
outputs: [count_samples, sum_samples, count_series, last]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_count_samples 1
bar:1m_count_series 1
bar:1m_last 5
bar:1m_sum_samples 5
foo:1m_count_samples{abc="123"} 2
foo:1m_count_samples{abc="456",de="fg"} 1
foo:1m_count_series{abc="123"} 1
foo:1m_count_series{abc="456",de="fg"} 1
foo:1m_last{abc="123"} 8.5
foo:1m_last{abc="456",de="fg"} 8
foo:1m_sum_samples{abc="123"} 12.5
foo:1m_sum_samples{abc="456",de="fg"} 8
`)
// Special case: __name__ in by list
f(`
- interval: 1m
by: [__name__]
outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_count_samples 1
bar:1m_count_series 1
bar:1m_sum_samples 5
foo:1m_count_samples 3
foo:1m_count_series 2
foo:1m_sum_samples 20.5
`)
// Non-empty by list with non-existing labels
f(`
- interval: 1m
by: [foo, bar]
outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_foo_bar_count_samples 1
bar:1m_by_foo_bar_count_series 1
bar:1m_by_foo_bar_sum_samples 5
foo:1m_by_foo_bar_count_samples 3
foo:1m_by_foo_bar_count_series 2
foo:1m_by_foo_bar_sum_samples 20.5
`)
// Non-empty by list with existing label
f(`
- interval: 1m
by: [abc]
outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_by_abc_count_samples 1
bar:1m_by_abc_count_series 1
bar:1m_by_abc_sum_samples 5
foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)
// Non-empty without list with non-existing labels
f(`
- interval: 1m
without: [foo]
outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_foo_count_samples 1
bar:1m_without_foo_count_series 1
bar:1m_without_foo_sum_samples 5
foo:1m_without_foo_count_samples{abc="123"} 2
foo:1m_without_foo_count_samples{abc="456",de="fg"} 1
foo:1m_without_foo_count_series{abc="123"} 1
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
foo:1m_without_foo_sum_samples{abc="123"} 12.5
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
`)
// Non-empty without list with existing labels
f(`
- interval: 1m
without: [abc]
outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
foo:1m_without_abc_count_samples 2
foo:1m_without_abc_count_samples{de="fg"} 1
foo:1m_without_abc_count_series 1
foo:1m_without_abc_count_series{de="fg"} 1
foo:1m_without_abc_sum_samples 12.5
foo:1m_without_abc_sum_samples{de="fg"} 8
`)
// Special case: __name__ in without list
f(`
- interval: 1m
without: [__name__]
outputs: [count_samples, sum_samples, count_series]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `:1m_count_samples 1
:1m_count_samples{abc="123"} 2
:1m_count_samples{abc="456",de="fg"} 1
:1m_count_series 1
:1m_count_series{abc="123"} 1
:1m_count_series{abc="456",de="fg"} 1
:1m_sum_samples 5
:1m_sum_samples{abc="123"} 12.5
:1m_sum_samples{abc="456",de="fg"} 8
`)
// drop some input metrics
f(`
- interval: 1m
without: [abc]
outputs: [count_samples, sum_samples, count_series]
input_relabel_configs:
- if: 'foo'
action: drop
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
`)
// rename output metrics
f(`
- interval: 1m
without: [abc]
outputs: [count_samples, sum_samples, count_series]
output_relabel_configs:
- action: replace_all
source_labels: [__name__]
regex: ":|_"
replacement: "-"
target_label: __name__
- action: drop
source_labels: [de]
regex: fg
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar-1m-without-abc-count-samples 1
bar-1m-without-abc-count-series 1
bar-1m-without-abc-sum-samples 5
foo-1m-without-abc-count-samples 2
foo-1m-without-abc-count-series 1
foo-1m-without-abc-sum-samples 12.5
`)
// match doesn't match anything
f(`
- interval: 1m
without: [abc]
outputs: [count_samples, sum_samples, count_series]
match: '{non_existing_label!=""}'
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, ``)
// match matches foo series with non-empty abc label
f(`
- interval: 1m
by: [abc]
outputs: [count_samples, sum_samples, count_series]
match: 'foo{abc=~".+"}'
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `foo:1m_by_abc_count_samples{abc="123"} 2
foo:1m_by_abc_count_samples{abc="456"} 1
foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)
// total output for non-repeated series
f(`
- interval: 1m
outputs: [total]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`)
// total output for repeated series
f(`
- interval: 1m
outputs: [total]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 5.02
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
foo:1m_total{baz="qwe"} 15
`)
// total output for repeated series with group by __name__
f(`
- interval: 1m
by: [__name__]
outputs: [total]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`)
// increase output for non-repeated series
f(`
- interval: 1m
outputs: [increase]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`)
// increase output for repeated series
f(`
- interval: 1m
outputs: [increase]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 5.02
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`)
// multiple aggregate configs
f(`
- interval: 1m
outputs: [count_series, sum_samples]
- interval: 5m
by: [bar]
outputs: [sum_samples]
`, `
foo 1
foo{bar="baz"} 2
foo 3.3
`, `foo:1m_count_series 1
foo:1m_count_series{bar="baz"} 1
foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
`)
// min and max outputs
f(`
- interval: 1m
outputs: [min, max]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_max 5
bar:1m_min 5
foo:1m_max{abc="123"} 8.5
foo:1m_max{abc="456",de="fg"} 8
foo:1m_min{abc="123"} 4
foo:1m_min{abc="456",de="fg"} 8
`)
// avg output
f(`
- interval: 1m
outputs: [avg]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_avg 5
foo:1m_avg{abc="123"} 6.25
foo:1m_avg{abc="456",de="fg"} 8
`)
// stddev output
f(`
- interval: 1m
outputs: [stddev]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_stddev 0
foo:1m_stddev{abc="123"} 2.25
foo:1m_stddev{abc="456",de="fg"} 0
`)
// stdvar output
f(`
- interval: 1m
outputs: [stdvar]
`, `
foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_stdvar 0
foo:1m_stdvar{abc="123"} 5.0625
foo:1m_stdvar{abc="456",de="fg"} 0
`)
// histogram_bucket output
f(`
- interval: 1m
outputs: [histogram_bucket]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
`)
// histogram_bucket output without cpu
f(`
- interval: 1m
without: [cpu]
outputs: [histogram_bucket]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
`)
// quantiles output
f(`
- interval: 1m
outputs: ["quantiles(0, 0.5, 1)"]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12
cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3
cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
`)
// quantiles output without cpu
f(`
- interval: 1m
without: [cpu]
outputs: ["quantiles(0, 0.5, 1)"]
`, `
cpu_usage{cpu="1"} 12.5
cpu_usage{cpu="1"} 13.3
cpu_usage{cpu="1"} 13
cpu_usage{cpu="1"} 12
cpu_usage{cpu="1"} 14
cpu_usage{cpu="1"} 25
cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`)
}
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
if len(ts.Samples) != 1 {
panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples)))
}
return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
}
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
var rows prometheus.Rows
errLogger := func(s string) {
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
}
rows.UnmarshalWithErrLogger(s, errLogger)
var tss []prompbmarshal.TimeSeries
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
for _, row := range rows.Rows {
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: row.Metric,
})
for _, tag := range row.Tags {
labels = append(labels, prompbmarshal.Label{
Name: tag.Key,
Value: tag.Value,
})
}
samples = append(samples, prompbmarshal.Sample{
Value: row.Value,
Timestamp: row.Timestamp,
})
ts := prompbmarshal.TimeSeries{
Labels: labels,
Samples: samples[len(samples)-1:],
}
tss = append(tss, ts)
}
return tss
}

View File

@ -0,0 +1,73 @@
package streamaggr
import (
"fmt"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func BenchmarkAggregatorsPushByJobAvg(b *testing.B) {
for _, output := range []string{
"total",
"increase",
"count_series",
"count_samples",
"sum_samples",
"last",
"min",
"max",
"avg",
"stddev",
"stdvar",
"histogram_bucket",
"quantiles(0, 0.5, 1)",
} {
b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) {
benchmarkAggregatorsPush(b, output)
})
}
}
func benchmarkAggregatorsPush(b *testing.B, output string) {
config := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
without: [job]
outputs: [%q]
`, output)
pushFunc := func(tss []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("unexpected pushFunc call"))
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc)
if err != nil {
b.Fatalf("unexpected error when initializing aggregators: %s", err)
}
defer a.MustStop()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Push(benchSeries)
}
})
}
func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSeries {
a := make([]string, seriesCount*samplesPerSeries)
for i := 0; i < samplesPerSeries; i++ {
for j := 0; j < seriesCount; j++ {
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo",instance="bar"} %d`, j, i*10)
a = append(a, s)
}
}
metrics := strings.Join(a, "\n")
return mustParsePromMetrics(metrics)
}
const seriesCount = 10000
const samplesPerSeries = 10
var benchSeries = newBenchSeries(seriesCount, samplesPerSeries)

View File

@ -0,0 +1,71 @@
package streamaggr
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
type sumSamplesAggrState struct {
m sync.Map
}
type sumSamplesStateValue struct {
mu sync.Mutex
sum float64
deleted bool
}
func newSumSamplesAggrState() *sumSamplesAggrState {
return &sumSamplesAggrState{}
}
func (as *sumSamplesAggrState) pushSample(inputKey, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &sumSamplesStateValue{
sum: value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += value
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
return true
})
}

137
lib/streamaggr/total.go Normal file
View File

@ -0,0 +1,137 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// totalAggrState calculates output=total, e.g. the summary counter over input counters.
type totalAggrState struct {
m sync.Map
ignoreInputDeadline uint64
intervalSecs uint64
}
type totalStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
deleteDeadline uint64
deleted bool
}
type lastValueState struct {
value float64
deleteDeadline uint64
}
func newTotalAggrState(interval time.Duration) *totalAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := uint64(interval.Seconds() + 1)
return &totalAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
intervalSecs: intervalSecs,
}
}
func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &totalStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
sv.lastValues[inputKey] = lv
}
d := value
if ok && lv.value <= value {
d = value - lv.value
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total += d
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *totalAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
total := sv.total
if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, "total", currentTimeMsec, total)
}
return true
})
}