lib/streamaggr: follow-up for 736197179e

- Use a byte slice instead of a map for tracking indexes for matching series.
  This improves performance, since access by slice index is faster than access by map key.
- Re-use the byte slice for tracking indexes for matching series.
  This removes unnecessary memory allocations and improves stream aggregation performance a bit.
- Add the ability to return to the previous behaviour by specifying the -remoteWrite.streamAggr.dropInput command-line flag.
  In this case all the input samples are dropped when stream aggregation is enabled.
- Backport the new stream aggregation behaviour from vmagent to single-node VictoriaMetrics when -streamAggr.config
  option is set.
- Improve docs regarding this change at docs/CHANGELOG.md
- Document the new behavior at docs/stream-aggregation.md
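
A minimal sketch of the byte-slice index tracking described in the first two bullets. This is an illustration only, not the actual lib/streamaggr API; the real code resizes the slice via bytesutil.ResizeNoCopyMayOverallocate and zeroes it before each push:

package main

import "fmt"

// series stands in for prompbmarshal.TimeSeries in this sketch.
type series struct{ name string }

// push sets matchIdxs[i] to 1 for every series matched by an aggregation rule.
// The caller passes the slice returned by the previous call, so it is re-used
// whenever it already has enough capacity and no per-call map allocations are needed.
func push(tss []series, matchIdxs []byte, matches func(series) bool) []byte {
	if cap(matchIdxs) < len(tss) {
		matchIdxs = make([]byte, len(tss))
	}
	matchIdxs = matchIdxs[:len(tss)]
	for i := range matchIdxs {
		matchIdxs[i] = 0
	}
	for i, ts := range tss {
		if matches(ts) {
			matchIdxs[i] = 1
		}
	}
	return matchIdxs
}

func main() {
	tss := []series{{"http_requests_total"}, {"some_other_metric"}}
	var matchIdxs []byte // re-used across push calls
	matchIdxs = push(tss, matchIdxs, func(s series) bool { return s.name == "http_requests_total" })
	fmt.Println(matchIdxs) // prints [1 0]
}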

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4575
Aliaksandr Valialkin 2023-07-24 16:44:09 -07:00
parent 736197179e
commit 52c13e9515
13 changed files with 256 additions and 227 deletions


@ -2639,11 +2639,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.keepInput
Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls
Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
-tlsCertFile string
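
For clarity, the interaction of the -streamAggr.keepInput and -streamAggr.dropInput flags described above can be summarized as follows. This is a hedged sketch with illustrative names, not VictoriaMetrics code; matchIdxs[i] != 0 means tss[i] was matched by at least one aggregation rule:

package example

// selectInputForStorage returns the input samples which are written to storage
// in addition to the aggregated samples.
func selectInputForStorage[T any](tss []T, matchIdxs []byte, keepInput, dropInput bool) []T {
	if keepInput {
		// -streamAggr.keepInput: write all the input samples.
		return tss
	}
	if dropInput {
		// -streamAggr.dropInput: write aggregated samples only (the previous default).
		return nil
	}
	// Default: write only the input samples which matched no aggregation rule.
	kept := make([]T, 0, len(tss))
	for i, ts := range tss {
		if matchIdxs[i] == 0 {
			kept = append(kept, ts)
		}
	}
	return kept
}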


@ -1548,13 +1548,16 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.config array
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval
Supports an array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.dedupInterval array
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.dropInput array
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.keepInput array
Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.tlsCAFile array
Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. By default, system CA is used


@ -67,10 +67,13 @@ var (
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
"By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
)
@ -507,6 +510,7 @@ type remoteWriteCtx struct {
sas atomic.Pointer[streamaggr.Aggregators]
streamAggrKeepInput bool
streamAggrDropInput bool
pss []*pendingSeries
pssNextIdx uint64
@ -579,6 +583,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
}
rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
@ -630,41 +635,45 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
rowsCount := getRowsCount(tss)
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
defer func() {
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}
// Load stream aggregation config
// Apply stream aggregation if any
sas := rwctx.sas.Load()
// Fast path, no need to track used series
if sas == nil || rwctx.streamAggrKeepInput {
// Apply stream aggregation to the input samples
// it's safe to call sas.Push with sas == nil
sas.Push(tss, nil)
// Push all samples to the remote storage
rwctx.pushInternal(tss)
return
if sas != nil {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tss, matchIdxs.B)
if !rwctx.streamAggrKeepInput {
if rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
tss = append(*v, tss...)
}
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
}
matchIdxsPool.Put(matchIdxs)
}
rwctx.pushInternal(tss)
// Track series which were used for stream aggregation.
ut := streamaggr.NewTssUsageTracker(len(tss))
sas.Push(tss, ut.Matched)
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}
unmatchedSeries := tssPool.Get().(*[]prompbmarshal.TimeSeries)
// Push only unmatched series to the remote storage
*unmatchedSeries = ut.GetUnmatched(tss, *unmatchedSeries)
rwctx.pushInternal(*unmatchedSeries)
var matchIdxsPool bytesutil.ByteBufferPool
*unmatchedSeries = prompbmarshal.ResetTimeSeries(*unmatchedSeries)
tssPool.Put(unmatchedSeries)
func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, dropInput bool) []prompbmarshal.TimeSeries {
dst := src[:0]
if !dropInput {
// Keep only the input series which didn't match any aggregation rule.
for i, match := range matchIdxs {
if match != 0 {
continue
}
dst = append(dst, src[i])
}
}
// Reset the dropped tail, so the underlying memory can be garbage-collected.
tail := src[len(dst):]
_ = prompbmarshal.ResetTimeSeries(tail)
return dst
}
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {


@ -34,11 +34,12 @@ func (ctx *InsertCtx) Reset(rowsLen int) {
}
ctx.Labels = ctx.Labels[:0]
for i := range ctx.mrs {
mr := &ctx.mrs[i]
mr.MetricNameRaw = nil
mrs := ctx.mrs
for i := range mrs {
cleanMetricRow(&mrs[i])
}
ctx.mrs = ctx.mrs[:0]
ctx.mrs = mrs[:0]
if n := rowsLen - cap(ctx.mrs); n > 0 {
ctx.mrs = append(ctx.mrs[:cap(ctx.mrs)], make([]storage.MetricRow, n)...)
}
@ -49,6 +50,10 @@ func (ctx *InsertCtx) Reset(rowsLen int) {
ctx.skipStreamAggr = false
}
func cleanMetricRow(mr *storage.MetricRow) {
mr.MetricNameRaw = nil
}
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte {
start := len(ctx.metricNamesBuf)
ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...)
@ -139,11 +144,13 @@ func (ctx *InsertCtx) ApplyRelabeling() {
func (ctx *InsertCtx) FlushBufs() error {
sas := sasGlobal.Load()
if sas != nil && !ctx.skipStreamAggr {
ctx.streamAggrCtx.push(ctx.mrs)
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = ctx.streamAggrCtx.push(ctx.mrs, matchIdxs.B)
if !*streamAggrKeepInput {
ctx.Reset(0)
return nil
// Remove aggregated rows from ctx.mrs
ctx.dropAggregatedRows(matchIdxs.B)
}
matchIdxsPool.Put(matchIdxs)
}
// There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here,
// since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter
@ -158,3 +165,23 @@ func (ctx *InsertCtx) FlushBufs() error {
StatusCode: http.StatusServiceUnavailable,
}
}
func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) {
dst := ctx.mrs[:0]
src := ctx.mrs
if !*streamAggrDropInput {
for idx, match := range matchIdxs {
if match != 0 {
continue
}
dst = append(dst, src[idx])
}
}
tail := src[len(dst):]
for i := range tail {
cleanMetricRow(&tail[i])
}
ctx.mrs = dst
}
var matchIdxsPool bytesutil.ByteBufferPool
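
The matchIdxsPool above comes from lib/bytesutil and is what makes the match-index slice re-usable across FlushBufs calls. A minimal sketch of the borrow/zero/return pattern, assuming only the ByteBufferPool API already used in this diff (Get returning a *ByteBuffer with a B []byte field, Put returning it to the pool):

package example

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"

var pool bytesutil.ByteBufferPool

// withMatchIdxs borrows a zeroed byte slice of length n from the pool, passes it
// to f and returns it to the pool, so hot push paths avoid allocating a fresh
// slice on every call.
func withMatchIdxs(n int, f func(matchIdxs []byte)) {
	bb := pool.Get()
	if cap(bb.B) < n {
		bb.B = make([]byte, n)
	}
	bb.B = bb.B[:n]
	for i := range bb.B {
		bb.B[i] = 0
	}
	f(bb.B)
	pool.Put(bb)
}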


@ -21,10 +21,13 @@ import (
var (
streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
"See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval")
streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep input samples after the aggregation with -streamAggr.config. "+
"By default, the input is dropped after the aggregation, so only the aggregate data is stored. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation with -streamAggr.config. "+
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
"See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation with -streamAggr.config. "+
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
"See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
)
@ -133,14 +136,20 @@ func (ctx *streamAggrCtx) Reset() {
promrelabel.CleanLabels(ts.Labels)
}
func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(mrs))
for i := 0; i < len(matchIdxs); i++ {
matchIdxs[i] = 0
}
mn := &ctx.mn
tss := ctx.tss[:]
ts := &tss[0]
labels := ts.Labels
samples := ts.Samples
sas := sasGlobal.Load()
for _, mr := range mrs {
var matchIdxsLocal []byte
for idx, mr := range mrs {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err)
}
@ -164,8 +173,13 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
ts.Labels = labels
ts.Samples = samples
sas.Push(tss, nil)
matchIdxsLocal = sas.Push(tss, matchIdxsLocal)
if matchIdxsLocal[0] != 0 {
matchIdxs[idx] = 1
}
}
return matchIdxs
}
func pushAggregateSeries(tss []prompbmarshal.TimeSeries) {
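
A note on the loop above: ctx.tss holds a single re-used series per MetricRow, so sas.Push receives a one-element slice per call and the returned local match slice always has length 1; that single byte is then copied into matchIdxs at the row's index. A hedged sketch of this mapping with illustrative names, not the actual vminsert code:

package example

// pushRows feeds one row at a time into pushOne (which stands in for sas.Push
// over a one-element tss slice) and records whether each row matched any rule.
func pushRows(rows []string, matchIdxs []byte, pushOne func(row string, local []byte) []byte) []byte {
	if cap(matchIdxs) < len(rows) {
		matchIdxs = make([]byte, len(rows))
	}
	matchIdxs = matchIdxs[:len(rows)]
	var local []byte // re-used one-element buffer across rows
	for idx, row := range rows {
		matchIdxs[idx] = 0
		local = pushOne(row, local) // len(local) == 1 after the call
		if local[0] != 0 {
			matchIdxs[idx] = 1
		}
	}
	return matchIdxs
}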


@ -24,9 +24,18 @@ The following `tip` changes can be tested by building VictoriaMetrics components
## tip
**Update notes:** release contains breaking change to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) `-remoteWrite.streamAggr.keepInput` command-line flag.
Default behaviour has changed to keep metrics which were not matched by any aggregation rule when `-remoteWrite.streamAggr.keepInput` is set to false (default value).
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) for details.
**Update note: starting from this release, [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) writes
to the configured storage the following samples by default:
- aggregated samples;
- the original input samples, which match zero `match` options from the provided [config](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config).
Previously only aggregated samples were written to the storage by default.
The previous behavior can be restored in the following ways:
- by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics;
- by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`.
**
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
@ -40,6 +49,9 @@ See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243)
- `WITH (w = 5m) m[w]` is automatically transformed to `m[5m]`
- `WITH (f(window, step, off) = m[window:step] offset off) f(5m, 10s, 1h)` is automatically transformed to `m[5m:10s] offset 1h`
Thanks to @lujiajing1126 for the initial idea and [implementation](https://github.com/VictoriaMetrics/metricsql/pull/13). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4025).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page with the list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598) and [these docs](https://docs.victoriametrics.com/#active-queries).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): preserve input samples, which match zero `match` options from the [configured aggregations](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). Previously all the input samples were dropped by default, so only the aggregated samples are written to the output storage. The previous behavior can be restored by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics or by passing `-remoteWrite.streamAggr.dropInput` command-line flag to `vmagent`.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): update backoff policy on retries to reduce probability of overloading for `source` or `destination` databases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4402).
@ -56,13 +68,11 @@ See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243)
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): expose `vmauth_user_request_duration_seconds` and `vmauth_unauthorized_user_request_duration_seconds` summary metrics for measuring requests latency per user.
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): show backup progress percentage in log during backup uploading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): show restoring progress percentage in log during backup downloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details.
* FEATURE: add ability to fine-tune Graphite API limits via the following command-line flags:
`-search.maxGraphiteTagKeys` for limiting the number of tag keys returned from [Graphite API for tags](https://docs.victoriametrics.com/#graphite-tags-api-usage)
`-search.maxGraphiteTagValues` for limiting the number of tag values returned from [Graphite API for tag values](https://docs.victoriametrics.com/#graphite-tags-api-usage)
`-search.maxGraphiteSeries` for limiting the number of series (aka paths) returned from [Graphite API for series](https://docs.victoriametrics.com/#graphite-tags-api-usage)
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4339).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page to display a list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598).
* BUGFIX: properly return series from [/api/v1/series](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#prometheus-querying-api-usage) if it finds more than the `limit` series (`limit` is an optional query arg passed to this API). Previously the `limit exceeded error` error was returned in this case. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2841#issuecomment-1560055631).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix application routing issues and problems with manual URL changes. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4408) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4604).


@ -2642,11 +2642,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.keepInput
Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls
Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
-tlsCertFile string


@ -2650,11 +2650,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.keepInput
Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls
Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
-tlsCertFile string


@ -17,7 +17,10 @@ can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.ht
The aggregation is applied to all the metrics received via any [supported data ingestion protocol](https://docs.victoriametrics.com/#how-to-import-time-series-data)
and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter).
The stream aggregation is configured via the following command-line flags:
Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
It expects that the ingested samples have timestamps close to the current time.
Stream aggregation is configured via the following command-line flags:
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each `-remoteWrite.url`.
@ -26,16 +29,21 @@ The stream aggregation is configured via the following command-line flags:
These flags must point to a file containing [stream aggregation config](#stream-aggregation-config).
By default, only the aggregated data is written to the storage. If the original incoming samples must be written to the storage too,
then the following command-line flags must be specified:
By default, the following data is written to the storage when stream aggregation is enabled:
- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each `-remoteWrite.url`.
This allows writing both raw and aggregate data to different remote storage destinations.
- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
- the aggregated samples;
- the raw input samples, which didn't match any of the `match` options in the provided [config](#stream-aggregation-config).
Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
It expects that the ingested samples have timestamps close to the current time.
This behaviour can be changed via the following command-line flags:
- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.keepInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples.
The `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
- `-remoteWrite.streamAggr.dropInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.dropInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
If one of these flags is set, then all the input samples are dropped, while only the aggregated samples are written to the storage.
The `-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`.
By default, all the input samples are aggregated. Sometimes it is needed to de-duplicate samples before the aggregation.
For example, if the samples are received from replicated sources.


@ -1559,13 +1559,16 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.config array
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval
Supports an array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.dedupInterval array
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.dropInput array
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.keepInput array
Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.tlsCAFile array
Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. By default, system CA is used


@ -10,8 +10,6 @@ import (
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@ -20,6 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"gopkg.in/yaml.v2"
)
var supportedOutputs = []string{
@ -195,13 +194,25 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
}
// Push pushes tss to a.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) {
if a == nil {
return
//
// Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations.
// Otherwise matchIdxs[idx] is set to 0.
//
// Push returns matchIdxs with len equal to len(tss).
// It re-uses the matchIdxs if it has enough capacity to hold len(tss) items.
// Otherwise it allocates new matchIdxs.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
for i := 0; i < len(matchIdxs); i++ {
matchIdxs[i] = 0
}
for _, aggr := range a.as {
aggr.Push(tss, matched)
if a != nil {
for _, aggr := range a.as {
aggr.Push(tss, matchIdxs)
}
}
return matchIdxs
}
// aggregator aggregates input series according to the config passed to NewAggregator
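
A short usage sketch of the Push contract documented above: a nil *Aggregators receiver is allowed, the returned slice always has len(tss) entries, and it can be passed back in on the next call to avoid allocations. The import paths and the Push signature come from this diff; the surrounding loop is illustrative:

package example

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func pushLoop(a *streamaggr.Aggregators, batches [][]prompbmarshal.TimeSeries) {
	var matchIdxs []byte // re-used between Push calls
	for _, tss := range batches {
		// Safe even if a == nil: matchIdxs is still resized to len(tss) and zeroed.
		matchIdxs = a.Push(tss, matchIdxs)
		unmatched := 0
		for _, m := range matchIdxs {
			if m == 0 {
				unmatched++ // this series matched no aggregation rule
			}
		}
		_ = unmatched
	}
}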
@ -499,25 +510,34 @@ func (a *aggregator) MustStop() {
}
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) {
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
if a.dedupAggr == nil {
a.push(tss, matched)
// Deduplication is disabled.
a.push(tss, matchIdxs)
return
}
// deduplication is enabled.
// Deduplication is enabled.
// push samples to dedupAggr, so later they will be pushed to the configured aggregators.
pushSample := a.dedupAggr.pushSample
inputKey := ""
bb := bbPool.Get()
labels := promutils.GetLabels()
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
}
if matched != nil {
matched(idx)
matchIdxs[idx] = 1
labels.Labels = append(labels.Labels[:0], ts.Labels...)
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
if len(labels.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
}
bb.B = marshalLabelsFast(bb.B[:0], ts.Labels)
labels.Sort()
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
outputKey := bytesutil.InternBytes(bb.B)
for _, sample := range ts.Samples {
pushSample(inputKey, outputKey, sample.Value)
@ -527,17 +547,19 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matched func(id int))
bbPool.Put(bb)
}
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, tracker func(id int)) {
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
labels := promutils.GetLabels()
tmpLabels := promutils.GetLabels()
bb := bbPool.Get()
applyFilters := matchIdxs != nil
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
}
if tracker != nil {
tracker(idx)
if applyFilters {
if !a.match.Match(ts.Labels) {
continue
}
matchIdxs[idx] = 1
}
labels.Labels = append(labels.Labels[:0], ts.Labels...)
if applyFilters {
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
@ -802,30 +824,3 @@ func sortAndRemoveDuplicates(a []string) []string {
}
return dst
}
// TssUsageTracker tracks used series for streaming aggregation.
type TssUsageTracker struct {
usedSeries map[int]struct{}
}
// NewTssUsageTracker returns new TssUsageTracker.
func NewTssUsageTracker(totalSeries int) *TssUsageTracker {
return &TssUsageTracker{usedSeries: make(map[int]struct{}, totalSeries)}
}
// Matched marks series with id as used.
// Not safe for concurrent use. The caller must
// ensure that there are no concurrent calls to Matched.
func (tut *TssUsageTracker) Matched(id int) {
tut.usedSeries[id] = struct{}{}
}
// GetUnmatched returns unused series from tss.
func (tut *TssUsageTracker) GetUnmatched(tss, dst []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
for k := range tss {
if _, ok := tut.usedSeries[k]; !ok {
dst = append(dst, tss[k])
}
}
return dst
}


@ -3,6 +3,7 @@ package streamaggr
import (
"fmt"
"sort"
"strconv"
"strings"
"sync"
"testing"
@ -158,7 +159,7 @@ func TestAggregatorsEqual(t *testing.T) {
}
func TestAggregatorsSuccess(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected string) {
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
t.Helper()
// Initialize Aggregators
@ -183,9 +184,18 @@ func TestAggregatorsSuccess(t *testing.T) {
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput, nil)
matchIdxs := a.Push(tssInput, nil)
a.MustStop()
// Verify that matchIdxs matches matchIdxsStrExpected
matchIdxsStr := ""
for _, v := range matchIdxs {
matchIdxsStr += strconv.Itoa(int(v))
}
if matchIdxsStr != matchIdxsStrExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
}
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
@ -199,9 +209,9 @@ func TestAggregatorsSuccess(t *testing.T) {
}
// Empty config
f(``, ``, ``)
f(``, `foo{bar="baz"} 1`, ``)
f(``, "foo 1\nbaz 2", ``)
f(``, ``, ``, "")
f(``, `foo{bar="baz"} 1`, ``, "0")
f(``, "foo 1\nbaz 2", ``, "00")
// Empty by list - aggregate only by time
f(`
@ -224,7 +234,7 @@ foo:1m_last{abc="123"} 8.5
foo:1m_last{abc="456",de="fg"} 8
foo:1m_sum_samples{abc="123"} 12.5
foo:1m_sum_samples{abc="456",de="fg"} 8
`)
`, "1111")
// Special case: __name__ in `by` list - this is the same as empty `by` list
f(`
@ -242,7 +252,7 @@ bar:1m_sum_samples 5
foo:1m_count_samples 3
foo:1m_count_series 2
foo:1m_sum_samples 20.5
`)
`, "1111")
// Non-empty `by` list with non-existing labels
f(`
@ -260,7 +270,7 @@ bar:1m_by_bar_foo_sum_samples 5
foo:1m_by_bar_foo_count_samples 3
foo:1m_by_bar_foo_count_series 2
foo:1m_by_bar_foo_sum_samples 20.5
`)
`, "1111")
// Non-empty `by` list with existing label
f(`
@ -281,7 +291,7 @@ foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)
`, "1111")
// Non-empty `by` list with duplicate existing label
f(`
@ -302,7 +312,7 @@ foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)
`, "1111")
// Non-empty `without` list with non-existing labels
f(`
@ -323,7 +333,7 @@ foo:1m_without_foo_count_series{abc="123"} 1
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
foo:1m_without_foo_sum_samples{abc="123"} 12.5
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
`)
`, "1111")
// Non-empty `without` list with existing labels
f(`
@ -344,7 +354,7 @@ foo:1m_without_abc_count_series 1
foo:1m_without_abc_count_series{de="fg"} 1
foo:1m_without_abc_sum_samples 12.5
foo:1m_without_abc_sum_samples{de="fg"} 8
`)
`, "1111")
// Special case: __name__ in `without` list
f(`
@ -365,7 +375,7 @@ foo{abc="456",de="fg"} 8
:1m_sum_samples 5
:1m_sum_samples{abc="123"} 12.5
:1m_sum_samples{abc="456",de="fg"} 8
`)
`, "1111")
// drop some input metrics
f(`
@ -383,7 +393,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
`)
`, "1111")
// rename output metrics
f(`
@ -410,7 +420,7 @@ bar-1m-without-abc-sum-samples 5
foo-1m-without-abc-count-samples 2
foo-1m-without-abc-count-series 1
foo-1m-without-abc-sum-samples 12.5
`)
`, "1111")
// match doesn't match anything
f(`
@ -423,7 +433,7 @@ foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, ``)
`, ``, "0000")
// match matches foo series with non-empty abc label
f(`
@ -442,7 +452,7 @@ foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`)
`, "1011")
// total output for non-repeated series
f(`
@ -453,7 +463,7 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`)
`, "11")
// total output for repeated series
f(`
@ -472,7 +482,7 @@ foo{baz="qwe"} 10
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
foo:1m_total{baz="qwe"} 15
`)
`, "11111111")
// total output for repeated series with group by __name__
f(`
@ -490,7 +500,7 @@ bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`)
`, "11111111")
// increase output for non-repeated series
f(`
@ -501,7 +511,7 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`)
`, "11")
// increase output for repeated series
f(`
@ -520,7 +530,7 @@ foo{baz="qwe"} 10
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`)
`, "11111111")
// multiple aggregate configs
f(`
@ -539,7 +549,7 @@ foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
`)
`, "111")
// min and max outputs
f(`
@ -556,7 +566,7 @@ foo:1m_max{abc="123"} 8.5
foo:1m_max{abc="456",de="fg"} 8
foo:1m_min{abc="123"} 4
foo:1m_min{abc="456",de="fg"} 8
`)
`, "1111")
// avg output
f(`
@ -570,7 +580,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_avg 5
foo:1m_avg{abc="123"} 6.25
foo:1m_avg{abc="456",de="fg"} 8
`)
`, "1111")
// stddev output
f(`
@ -584,7 +594,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_stddev 0
foo:1m_stddev{abc="123"} 2.25
foo:1m_stddev{abc="456",de="fg"} 0
`)
`, "1111")
// stdvar output
f(`
@ -598,7 +608,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_stdvar 0
foo:1m_stdvar{abc="123"} 5.0625
foo:1m_stdvar{abc="456",de="fg"} 0
`)
`, "1111")
// histogram_bucket output
f(`
@ -616,7 +626,7 @@ cpu_usage{cpu="2"} 90
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
`)
`, "1111111")
// histogram_bucket output without cpu
f(`
@ -635,7 +645,7 @@ cpu_usage{cpu="2"} 90
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
`)
`, "1111111")
// quantiles output
f(`
@ -655,7 +665,7 @@ cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
`)
`, "1111111")
// quantiles output without cpu
f(`
@ -673,11 +683,11 @@ cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`)
`, "1111111")
}
func TestAggregatorsWithDedupInterval(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected string) {
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
t.Helper()
// Initialize Aggregators
@ -703,7 +713,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput, nil)
matchIdxs := a.Push(tssInput, nil)
if a != nil {
for _, aggr := range a.as {
aggr.dedupFlush()
@ -712,6 +722,15 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
}
a.MustStop()
// Verify that matchIdxs matches matchIdxsStrExpected
matchIdxsStr := ""
for _, v := range matchIdxs {
matchIdxsStr += strconv.Itoa(int(v))
}
if matchIdxsStr != matchIdxsStrExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
}
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
@ -732,7 +751,7 @@ foo 123
bar 567
`, `bar:1m_sum_samples 567
foo:1m_sum_samples 123
`)
`, "11")
f(`
- interval: 1m
@ -750,7 +769,7 @@ foo{baz="qwe"} 10
bar:1m_sum_samples{baz="qwer"} 344
foo:1m_sum_samples 123
foo:1m_sum_samples{baz="qwe"} 10
`)
`, "11111111")
}
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {


@ -37,12 +37,11 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
without: [job]
outputs: [%q]
`, output)
i := 0
pushCalls := 0
pushFunc := func(tss []prompbmarshal.TimeSeries) {
i++
if i > 1 {
// pushFunc is expected to be called exactly once at MustStop
panic(fmt.Errorf("unexpected pushFunc call"))
pushCalls++
if pushCalls > 1 {
panic(fmt.Errorf("pushFunc is expected to be called exactly once at MustStop"))
}
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
@ -54,79 +53,18 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries)))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []byte
for pb.Next() {
a.Push(benchSeries, nil)
matchIdxs = a.Push(benchSeries, matchIdxs)
}
})
}
func BenchmarkAggregatorsPushWithSeriesTracker(b *testing.B) {
config := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
without: [job]
outputs: [%q]
`, "total")
i := 0
pushFunc := func(tss []prompbmarshal.TimeSeries) {
i++
if i > 1 {
// pushFunc is expected to be called exactly once at MustStop
panic(fmt.Errorf("unexpected pushFunc call"))
}
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil {
b.Fatalf("unexpected error when initializing aggregators: %s", err)
}
defer a.MustStop()
tests := []struct {
name string
series []prompbmarshal.TimeSeries
}{
{
name: "all matches",
series: benchSeries,
},
{
name: "no matches",
series: benchSeriesWithRandomNames100,
},
{
name: "50% matches",
series: benchSeriesWithRandomNames50,
},
{
name: "10% matches",
series: benchSeriesWithRandomNames10,
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(len(tt.series)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ut := NewTssUsageTracker(len(tt.series))
a.Push(tt.series, ut.Matched)
}
})
})
}
}
func newBenchSeries(seriesCount, samplesPerSeries int, randomNameFraction float64) []prompbmarshal.TimeSeries {
func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSeries {
a := make([]string, seriesCount*samplesPerSeries)
for i := 0; i < samplesPerSeries; i++ {
for j := 0; j < seriesCount; j++ {
metricName := "http_requests_total"
if randomNameFraction > 0 && j%int(1/randomNameFraction) == 0 {
metricName = fmt.Sprintf("random_other_name_%d", j)
}
s := fmt.Sprintf(`%s{path="/foo/%d",job="foo",instance="bar"} %d`, metricName, j, i*10)
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo",instance="bar"} %d`, j, i*10)
a = append(a, s)
}
}
@ -137,7 +75,4 @@ func newBenchSeries(seriesCount, samplesPerSeries int, randomNameFraction float6
const seriesCount = 10000
const samplesPerSeries = 10
var benchSeries = newBenchSeries(seriesCount, samplesPerSeries, 0)
var benchSeriesWithRandomNames10 = newBenchSeries(seriesCount, samplesPerSeries, 0.1)
var benchSeriesWithRandomNames50 = newBenchSeries(seriesCount, samplesPerSeries, 0.5)
var benchSeriesWithRandomNames100 = newBenchSeries(seriesCount, samplesPerSeries, 1)
var benchSeries = newBenchSeries(seriesCount, samplesPerSeries)