diff --git a/README.md b/README.md index 46ad8330c..f85ee3078 100644 --- a/README.md +++ b/README.md @@ -2639,11 +2639,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -storageDataPath string Path to storage data (default "victoria-metrics-data") -streamAggr.config string - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + -streamAggr.dropInput + Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html -streamAggr.keepInput - Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/app/vmagent/README.md b/app/vmagent/README.md index c8418e911..bac17b84f 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -1548,13 +1548,16 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.config array - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.dedupInterval array Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero Supports array of values separated by comma or specified via multiple flags. 
+ -remoteWrite.streamAggr.dropInput array + Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html + Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.keepInput array - Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html Supports array of values separated by comma or specified via multiple flags. -remoteWrite.tlsCAFile array Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. By default, system CA is used diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 6770dfa0e..98d77b295 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -67,10 +67,13 @@ var ( streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ "See https://docs.victoriametrics.com/stream-aggregation.html . "+ - "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") - streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+ - "By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") + "See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval") + streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+ + "with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples "+ + "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+ + "with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples "+ + "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. 
"+ "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) @@ -507,6 +510,7 @@ type remoteWriteCtx struct { sas atomic.Pointer[streamaggr.Aggregators] streamAggrKeepInput bool + streamAggrDropInput bool pss []*pendingSeries pssNextIdx uint64 @@ -579,6 +583,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI } rwctx.sas.Store(sas) rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx) + rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) } @@ -630,41 +635,45 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { rowsCount := getRowsCount(tss) rwctx.rowsPushedAfterRelabel.Add(rowsCount) - defer func() { - // Return back relabeling contexts to the pool - if rctx != nil { - *v = prompbmarshal.ResetTimeSeries(tss) - tssPool.Put(v) - putRelabelCtx(rctx) - } - } - - // Load stream aggregagation config + // Apply stream aggregation if any sas := rwctx.sas.Load() - - // Fast path, no need to track used series - if sas == nil || rwctx.streamAggrKeepInput { - // Apply stream aggregation to the input samples - // it's safe to call sas.Push with sas == nil - sas.Push(tss, nil) - - // Push all samples to the remote storage - rwctx.pushInternal(tss) - - return + if sas != nil { + matchIdxs := matchIdxsPool.Get() + matchIdxs.B = sas.Push(tss, matchIdxs.B) + if !rwctx.streamAggrKeepInput { + if rctx == nil { + rctx = getRelabelCtx() + // Make a copy of tss before dropping aggregated series + v = tssPool.Get().(*[]prompbmarshal.TimeSeries) + tss = append(*v, tss...) + } + tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput) + } + matchIdxsPool.Put(matchIdxs) } + rwctx.pushInternal(tss) - // Track series which were used for stream aggregation. 
- ut := streamaggr.NewTssUsageTracker(len(tss)) - sas.Push(tss, ut.Matched) + // Return back relabeling contexts to the pool + if rctx != nil { + *v = prompbmarshal.ResetTimeSeries(tss) + tssPool.Put(v) + putRelabelCtx(rctx) + } +} - unmatchedSeries := tssPool.Get().(*[]prompbmarshal.TimeSeries) - // Push only unmatched series to the remote storage - *unmatchedSeries = ut.GetUnmatched(tss, *unmatchedSeries) - rwctx.pushInternal(*unmatchedSeries) +var matchIdxsPool bytesutil.ByteBufferPool - *unmatchedSeries = prompbmarshal.ResetTimeSeries(*unmatchedSeries) - tssPool.Put(unmatchedSeries) +func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, dropInput bool) []prompbmarshal.TimeSeries { + dst := src[:0] + if !dropInput { + for i, match := range matchIdxs { + if match == 1 { + continue + } + dst = append(dst, src[i]) + } + } + tail := src[len(dst):] + _ = prompbmarshal.ResetTimeSeries(tail) + return dst } func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) { diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index b2acad938..7e55e0430 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -34,11 +34,12 @@ func (ctx *InsertCtx) Reset(rowsLen int) { } ctx.Labels = ctx.Labels[:0] - for i := range ctx.mrs { - mr := &ctx.mrs[i] - mr.MetricNameRaw = nil + mrs := ctx.mrs + for i := range mrs { + cleanMetricRow(&mrs[i]) } - ctx.mrs = ctx.mrs[:0] + ctx.mrs = mrs[:0] + if n := rowsLen - cap(ctx.mrs); n > 0 { ctx.mrs = append(ctx.mrs[:cap(ctx.mrs)], make([]storage.MetricRow, n)...) } @@ -49,6 +50,10 @@ func (ctx *InsertCtx) Reset(rowsLen int) { ctx.skipStreamAggr = false } +func cleanMetricRow(mr *storage.MetricRow) { + mr.MetricNameRaw = nil +} + func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte { start := len(ctx.metricNamesBuf) ctx.metricNamesBuf = append(ctx.metricNamesBuf, prefix...) @@ -139,11 +144,13 @@ func (ctx *InsertCtx) ApplyRelabeling() { func (ctx *InsertCtx) FlushBufs() error { sas := sasGlobal.Load() if sas != nil && !ctx.skipStreamAggr { - ctx.streamAggrCtx.push(ctx.mrs) + matchIdxs := matchIdxsPool.Get() + matchIdxs.B = ctx.streamAggrCtx.push(ctx.mrs, matchIdxs.B) if !*streamAggrKeepInput { - ctx.Reset(0) - return nil + // Remove aggregated rows from ctx.mrs + ctx.dropAggregatedRows(matchIdxs.B) } + matchIdxsPool.Put(matchIdxs) } // There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here, // since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter @@ -158,3 +165,23 @@ func (ctx *InsertCtx) FlushBufs() error { StatusCode: http.StatusServiceUnavailable, } } + +func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) { + dst := ctx.mrs[:0] + src := ctx.mrs + if !*streamAggrDropInput { + for idx, match := range matchIdxs { + if match != 0 { + continue + } + dst = append(dst, src[idx]) + } + } + tail := src[len(dst):] + for i := range tail { + cleanMetricRow(&tail[i]) + } + ctx.mrs = dst +} + +var matchIdxsPool bytesutil.ByteBufferPool diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 868c57613..1d3fdc56a 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -21,10 +21,13 @@ import ( var ( streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+ "See https://docs.victoriametrics.com/stream-aggregation.html . 
"+ - "See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval") - streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep input samples after the aggregation with -streamAggr.config. "+ - "By default, the input is dropped after the aggregation, so only the aggregate data is stored. "+ - "See https://docs.victoriametrics.com/stream-aggregation.html") + "See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval") + streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation with -streamAggr.config. "+ + "By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+ + "See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation with -streamAggr.config. "+ + "By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+ + "See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+ "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) @@ -133,14 +136,20 @@ func (ctx *streamAggrCtx) Reset() { promrelabel.CleanLabels(ts.Labels) } -func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) { +func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte { + matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(mrs)) + for i := 0; i < len(matchIdxs); i++ { + matchIdxs[i] = 0 + } + mn := &ctx.mn tss := ctx.tss[:] ts := &tss[0] labels := ts.Labels samples := ts.Samples sas := sasGlobal.Load() - for _, mr := range mrs { + var matchIdxsLocal []byte + for idx, mr := range mrs { if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err) } @@ -164,8 +173,13 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) { ts.Labels = labels ts.Samples = samples - sas.Push(tss, nil) + matchIdxsLocal = sas.Push(tss, matchIdxsLocal) + if matchIdxsLocal[0] != 0 { + matchIdxs[idx] = 1 + } } + + return matchIdxs } func pushAggregateSeries(tss []prompbmarshal.TimeSeries) { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 15118e07a..6651ffc53 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,9 +24,18 @@ The following `tip` changes can be tested by building VictoriaMetrics components ## tip -**Update notes:** release contains breaking change to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) `-remoteWrite.streamAggr.keepInput` command-line flag. -Default behaviour has changed to keep metrics which were not matched by any aggregation rule when `-remoteWrite.streamAggr.keepInput` is set to false (default value). -See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) for details. 
+**Update note: starting from this release, [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) writes +to the configured storage the following samples by default: + +- aggregated samples; +- the original input samples, which match zero `match` options from the provided [config](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). + +Previously only aggregated samples were written to the storage by default. +The previous behavior can be restored in the following ways: + +- by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics; +- by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`. +** * SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html). * SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved). @@ -40,6 +49,9 @@ See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) - `WITH (w = 5m) m[w]` is automatically transformed to `m[5m]` - `WITH (f(window, step, off) = m[window:step] offset off) f(5m, 10s, 1h)` is automatically transformed to `m[5m:10s] offset 1h` Thanks to @lujiajing1126 for the initial idea and [implementation](https://github.com/VictoriaMetrics/metricsql/pull/13). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4025). +* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page with the list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598) and [these docs](https://docs.victoriametrics.com/#active-queries). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details. +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): preserve input samples, which match zero `match` options from the [configured aggregations](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). Previously all the input samples were dropped by default, so only the aggregated samples were written to the output storage. The previous behavior can be restored by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics or by passing `-remoteWrite.streamAggr.dropInput` command-line flag to `vmagent`. * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): update backoff policy on retries to reduce probability of overloading for `source` or `destination` databases. 
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4402). @@ -56,13 +68,11 @@ See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): expose `vmauth_user_request_duration_seconds` and `vmauth_unauthorized_user_request_duration_seconds` summary metrics for measuring requests latency per user. * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): show backup progress percentage in log during backup uploading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460). * FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): show restoring progress percentage in log during backup downloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460). -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details. * FEATURE: add ability to fine-tune Graphite API limits via the following command-line flags: `-search.maxGraphiteTagKeys` for limiting the number of tag keys returned from [Graphite API for tags](https://docs.victoriametrics.com/#graphite-tags-api-usage) `-search.maxGraphiteTagValues` for limiting the number of tag values returned from [Graphite API for tag values](https://docs.victoriametrics.com/#graphite-tags-api-usage) `-search.maxGraphiteSeries` for limiting the number of series (aka paths) returned from [Graphite API for series](https://docs.victoriametrics.com/#graphite-tags-api-usage) See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4339). -* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page to display a list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598). * BUGFIX: properly return series from [/api/v1/series](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#prometheus-querying-api-usage) if it finds more than the `limit` series (`limit` is an optional query arg passed to this API). Previously the `limit exceeded error` error was returned in this case. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2841#issuecomment-1560055631). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix application routing issues and problems with manual URL changes. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4408) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4604). diff --git a/docs/README.md b/docs/README.md index a5f9d261a..87c451aaf 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2642,11 +2642,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -storageDataPath string Path to storage data (default "victoria-metrics-data") -streamAggr.config string - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . 
See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + -streamAggr.dropInput + Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html -streamAggr.keepInput - Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 9478b61b6..f9e0a06cd 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -2650,11 +2650,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -storageDataPath string Path to storage data (default "victoria-metrics-data") -streamAggr.config string - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + -streamAggr.dropInput + Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html -streamAggr.keepInput - Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). 
-tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 09de6e768..c281e74e6 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -17,7 +17,10 @@ can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.ht The aggregation is applied to all the metrics received via any [supported data ingestion protocol](https://docs.victoriametrics.com/#how-to-import-time-series-data) and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter). -The stream aggregation is configured via the following command-line flags: +Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). +It expects that the ingested samples have timestamps close to the current time. + +Stream aggregation is configured via the following command-line flags: - `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html). This flag can be specified individually per each `-remoteWrite.url`. @@ -26,16 +29,21 @@ The stream aggregation is configured via the following command-line flags: These flags must point to a file containing [stream aggregation config](#stream-aggregation-config). -By default, only the aggregated data is written to the storage. If the original incoming samples must be written to the storage too, -then the following command-line flags must be specified: +By default, the following data is written to the storage when stream aggregation is enabled: -- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html). - This flag can be specified individually per each `-remoteWrite.url`. - This allows writing both raw and aggregate data to different remote storage destinations. -- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). +- the aggregated samples; +- the raw input samples, which didn't match any of the `match` options in the provided [config](#stream-aggregation-config). -Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). -It expects that the ingested samples have timestamps close to the current time. +This behaviour can be changed via the following command-line flags: + +- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.keepInput` + at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). + If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples. + The `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`. +- `-remoteWrite.streamAggr.dropInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.dropInput` + at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). + If one of these flags is set, then all the input samples are dropped, while only the aggregated samples are written to the storage. + The `-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`. By default, all the input samples are aggregated. 
Sometimes it is needed to de-duplicate samples before the aggregation. For example, if the samples are received from replicated sources. diff --git a/docs/vmagent.md b/docs/vmagent.md index bc25e2ab0..ae143f164 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1559,13 +1559,16 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.config array - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval Supports an array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.dedupInterval array Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero Supports array of values separated by comma or specified via multiple flags. + -remoteWrite.streamAggr.dropInput array + Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html + Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.keepInput array - Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html Supports array of values separated by comma or specified via multiple flags. -remoteWrite.tlsCAFile array Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. 
By default, system CA is used diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 939b33f7f..318c2b701 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -10,8 +10,6 @@ import ( "sync" "time" - "gopkg.in/yaml.v2" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -20,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "gopkg.in/yaml.v2" ) var supportedOutputs = []string{ @@ -195,13 +194,25 @@ func (a *Aggregators) Equal(b *Aggregators) bool { } // Push pushes tss to a. -func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) { - if a == nil { - return +// +// Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations. +// Otherwise matchIdxs[idx] is set to 0. +// +// Push returns matchIdxs with len equal to len(tss). +// It re-uses the matchIdxs if it has enough capacity to hold len(tss) items. +// Otherwise it allocates new matchIdxs. +func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte { + matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) + for i := 0; i < len(matchIdxs); i++ { + matchIdxs[i] = 0 } - for _, aggr := range a.as { - aggr.Push(tss, matched) + + if a != nil { + for _, aggr := range a.as { + aggr.Push(tss, matchIdxs) + } } + return matchIdxs } // aggregator aggregates input series according to the config passed to NewAggregator @@ -499,25 +510,34 @@ func (a *aggregator) MustStop() { } // Push pushes tss to a. -func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) { +func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { if a.dedupAggr == nil { - a.push(tss, matched) + // Deduplication is disabled. + a.push(tss, matchIdxs) return } - // deduplication is enabled. + // Deduplication is enabled. // push samples to dedupAggr, so later they will be pushed to the configured aggregators. pushSample := a.dedupAggr.pushSample inputKey := "" bb := bbPool.Get() + labels := promutils.GetLabels() for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue } - if matched != nil { - matched(idx) + matchIdxs[idx] = 1 + + labels.Labels = append(labels.Labels[:0], ts.Labels...) 
+ labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) + if len(labels.Labels) == 0 { + // The metric has been deleted by the relabeling + continue } - bb.B = marshalLabelsFast(bb.B[:0], ts.Labels) + labels.Sort() + + bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) outputKey := bytesutil.InternBytes(bb.B) for _, sample := range ts.Samples { pushSample(inputKey, outputKey, sample.Value) @@ -527,17 +547,19 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) bbPool.Put(bb) } -func (a *aggregator) push(tss []prompbmarshal.TimeSeries, tracker func(id int)) { +func (a *aggregator) push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { labels := promutils.GetLabels() tmpLabels := promutils.GetLabels() bb := bbPool.Get() + applyFilters := matchIdxs != nil for idx, ts := range tss { - if !a.match.Match(ts.Labels) { - continue - } - if tracker != nil { - tracker(idx) + if applyFilters { + if !a.match.Match(ts.Labels) { + continue + } + matchIdxs[idx] = 1 } + labels.Labels = append(labels.Labels[:0], ts.Labels...) if applyFilters { labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) @@ -802,30 +824,3 @@ func sortAndRemoveDuplicates(a []string) []string { } return dst } - -// TssUsageTracker tracks used series for streaming aggregation. -type TssUsageTracker struct { - usedSeries map[int]struct{} -} - -// NewTssUsageTracker returns new TssUsageTracker. -func NewTssUsageTracker(totalSeries int) *TssUsageTracker { - return &TssUsageTracker{usedSeries: make(map[int]struct{}, totalSeries)} -} - -// Matched marks series with id as used. -// Not safe for concurrent use. The caller must -// ensure that there are no concurrent calls to Matched. -func (tut *TssUsageTracker) Matched(id int) { - tut.usedSeries[id] = struct{}{} -} - -// GetUnmatched returns unused series from tss. 
-func (tut *TssUsageTracker) GetUnmatched(tss, dst []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { - for k := range tss { - if _, ok := tut.usedSeries[k]; !ok { - dst = append(dst, tss[k]) - } - } - return dst -} diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 311442791..ae987444b 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -3,6 +3,7 @@ package streamaggr import ( "fmt" "sort" + "strconv" "strings" "sync" "testing" @@ -158,7 +159,7 @@ func TestAggregatorsEqual(t *testing.T) { } func TestAggregatorsSuccess(t *testing.T) { - f := func(config, inputMetrics, outputMetricsExpected string) { + f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) { t.Helper() // Initialize Aggregators @@ -183,9 +184,18 @@ func TestAggregatorsSuccess(t *testing.T) { // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) - a.Push(tssInput, nil) + matchIdxs := a.Push(tssInput, nil) a.MustStop() + // Verify matchIdxs equals to matchIdxsExpected + matchIdxsStr := "" + for _, v := range matchIdxs { + matchIdxsStr += strconv.Itoa(int(v)) + } + if matchIdxsStr != matchIdxsStrExpected { + t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected) + } + // Verify the tssOutput contains the expected metrics tsStrings := make([]string, len(tssOutput)) for i, ts := range tssOutput { @@ -199,9 +209,9 @@ func TestAggregatorsSuccess(t *testing.T) { } // Empty config - f(``, ``, ``) - f(``, `foo{bar="baz"} 1`, ``) - f(``, "foo 1\nbaz 2", ``) + f(``, ``, ``, "") + f(``, `foo{bar="baz"} 1`, ``, "0") + f(``, "foo 1\nbaz 2", ``, "00") // Empty by list - aggregate only by time f(` @@ -224,7 +234,7 @@ foo:1m_last{abc="123"} 8.5 foo:1m_last{abc="456",de="fg"} 8 foo:1m_sum_samples{abc="123"} 12.5 foo:1m_sum_samples{abc="456",de="fg"} 8 -`) +`, "1111") // Special case: __name__ in `by` list - this is the same as empty `by` list f(` @@ -242,7 +252,7 @@ bar:1m_sum_samples 5 foo:1m_count_samples 3 foo:1m_count_series 2 foo:1m_sum_samples 20.5 -`) +`, "1111") // Non-empty `by` list with non-existing labels f(` @@ -260,7 +270,7 @@ bar:1m_by_bar_foo_sum_samples 5 foo:1m_by_bar_foo_count_samples 3 foo:1m_by_bar_foo_count_series 2 foo:1m_by_bar_foo_sum_samples 20.5 -`) +`, "1111") // Non-empty `by` list with existing label f(` @@ -281,7 +291,7 @@ foo:1m_by_abc_count_series{abc="123"} 1 foo:1m_by_abc_count_series{abc="456"} 1 foo:1m_by_abc_sum_samples{abc="123"} 12.5 foo:1m_by_abc_sum_samples{abc="456"} 8 -`) +`, "1111") // Non-empty `by` list with duplicate existing label f(` @@ -302,7 +312,7 @@ foo:1m_by_abc_count_series{abc="123"} 1 foo:1m_by_abc_count_series{abc="456"} 1 foo:1m_by_abc_sum_samples{abc="123"} 12.5 foo:1m_by_abc_sum_samples{abc="456"} 8 -`) +`, "1111") // Non-empty `without` list with non-existing labels f(` @@ -323,7 +333,7 @@ foo:1m_without_foo_count_series{abc="123"} 1 foo:1m_without_foo_count_series{abc="456",de="fg"} 1 foo:1m_without_foo_sum_samples{abc="123"} 12.5 foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8 -`) +`, "1111") // Non-empty `without` list with existing labels f(` @@ -344,7 +354,7 @@ foo:1m_without_abc_count_series 1 foo:1m_without_abc_count_series{de="fg"} 1 foo:1m_without_abc_sum_samples 12.5 foo:1m_without_abc_sum_samples{de="fg"} 8 -`) +`, "1111") // Special case: __name__ in `without` list f(` @@ -365,7 +375,7 @@ foo{abc="456",de="fg"} 8 :1m_sum_samples 5 :1m_sum_samples{abc="123"} 12.5 
:1m_sum_samples{abc="456",de="fg"} 8 -`) +`, "1111") // drop some input metrics f(` @@ -383,7 +393,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_without_abc_count_samples 1 bar:1m_without_abc_count_series 1 bar:1m_without_abc_sum_samples 5 -`) +`, "1111") // rename output metrics f(` @@ -410,7 +420,7 @@ bar-1m-without-abc-sum-samples 5 foo-1m-without-abc-count-samples 2 foo-1m-without-abc-count-series 1 foo-1m-without-abc-sum-samples 12.5 -`) +`, "1111") // match doesn't match anything f(` @@ -423,7 +433,7 @@ foo{abc="123"} 4 bar 5 foo{abc="123"} 8.5 foo{abc="456",de="fg"} 8 -`, ``) +`, ``, "0000") // match matches foo series with non-empty abc label f(` @@ -442,7 +452,7 @@ foo:1m_by_abc_count_series{abc="123"} 1 foo:1m_by_abc_count_series{abc="456"} 1 foo:1m_by_abc_sum_samples{abc="123"} 12.5 foo:1m_by_abc_sum_samples{abc="456"} 8 -`) +`, "1011") // total output for non-repeated series f(` @@ -453,7 +463,7 @@ foo 123 bar{baz="qwe"} 4.34 `, `bar:1m_total{baz="qwe"} 0 foo:1m_total 0 -`) +`, "11") // total output for repeated series f(` @@ -472,7 +482,7 @@ foo{baz="qwe"} 10 bar:1m_total{baz="qwer"} 1 foo:1m_total 0 foo:1m_total{baz="qwe"} 15 -`) +`, "11111111") // total output for repeated series with group by __name__ f(` @@ -490,7 +500,7 @@ bar{baz="qwer"} 344 foo{baz="qwe"} 10 `, `bar:1m_total 6.02 foo:1m_total 15 -`) +`, "11111111") // increase output for non-repeated series f(` @@ -501,7 +511,7 @@ foo 123 bar{baz="qwe"} 4.34 `, `bar:1m_increase{baz="qwe"} 0 foo:1m_increase 0 -`) +`, "11") // increase output for repeated series f(` @@ -520,7 +530,7 @@ foo{baz="qwe"} 10 bar:1m_increase{baz="qwer"} 1 foo:1m_increase 0 foo:1m_increase{baz="qwe"} 15 -`) +`, "11111111") // multiple aggregate configs f(` @@ -539,7 +549,7 @@ foo:1m_sum_samples 4.3 foo:1m_sum_samples{bar="baz"} 2 foo:5m_by_bar_sum_samples 4.3 foo:5m_by_bar_sum_samples{bar="baz"} 2 -`) +`, "111") // min and max outputs f(` @@ -556,7 +566,7 @@ foo:1m_max{abc="123"} 8.5 foo:1m_max{abc="456",de="fg"} 8 foo:1m_min{abc="123"} 4 foo:1m_min{abc="456",de="fg"} 8 -`) +`, "1111") // avg output f(` @@ -570,7 +580,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_avg 5 foo:1m_avg{abc="123"} 6.25 foo:1m_avg{abc="456",de="fg"} 8 -`) +`, "1111") // stddev output f(` @@ -584,7 +594,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_stddev 0 foo:1m_stddev{abc="123"} 2.25 foo:1m_stddev{abc="456",de="fg"} 0 -`) +`, "1111") // stdvar output f(` @@ -598,7 +608,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_stdvar 0 foo:1m_stdvar{abc="123"} 5.0625 foo:1m_stdvar{abc="456",de="fg"} 0 -`) +`, "1111") // histogram_bucket output f(` @@ -616,7 +626,7 @@ cpu_usage{cpu="2"} 90 cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3 cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1 cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1 -`) +`, "1111111") // histogram_bucket output without cpu f(` @@ -635,7 +645,7 @@ cpu_usage{cpu="2"} 90 cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3 cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1 cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1 -`) +`, "1111111") // quantiles output f(` @@ -655,7 +665,7 @@ cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25 cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90 cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90 cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90 -`) +`, "1111111") // quantiles output without cpu f(` @@ -673,11 +683,11 @@ cpu_usage{cpu="2"} 90 `, 
`cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12 cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3 cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 -`) +`, "1111111") } func TestAggregatorsWithDedupInterval(t *testing.T) { - f := func(config, inputMetrics, outputMetricsExpected string) { + f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) { t.Helper() // Initialize Aggregators @@ -703,7 +713,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) - a.Push(tssInput, nil) + matchIdxs := a.Push(tssInput, nil) if a != nil { for _, aggr := range a.as { aggr.dedupFlush() @@ -712,6 +722,15 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { } a.MustStop() + // Verify matchIdxs equals to matchIdxsExpected + matchIdxsStr := "" + for _, v := range matchIdxs { + matchIdxsStr += strconv.Itoa(int(v)) + } + if matchIdxsStr != matchIdxsStrExpected { + t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected) + } + // Verify the tssOutput contains the expected metrics tsStrings := make([]string, len(tssOutput)) for i, ts := range tssOutput { @@ -732,7 +751,7 @@ foo 123 bar 567 `, `bar:1m_sum_samples 567 foo:1m_sum_samples 123 -`) +`, "11") f(` - interval: 1m @@ -750,7 +769,7 @@ foo{baz="qwe"} 10 bar:1m_sum_samples{baz="qwer"} 344 foo:1m_sum_samples 123 foo:1m_sum_samples{baz="qwe"} 10 -`) +`, "11111111") } func timeSeriesToString(ts prompbmarshal.TimeSeries) string { diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index f3c81c3cd..51f95455b 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -37,12 +37,11 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { without: [job] outputs: [%q] `, output) - i := 0 + pushCalls := 0 pushFunc := func(tss []prompbmarshal.TimeSeries) { - i++ - if i > 1 { - // pushFunc is expected to be called exactly once at MustStop - panic(fmt.Errorf("unexpected pushFunc call")) + pushCalls++ + if pushCalls > 1 { + panic(fmt.Errorf("pushFunc is expected to be called exactly once at MustStop")) } } a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) @@ -54,79 +53,18 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { b.ReportAllocs() b.SetBytes(int64(len(benchSeries))) b.RunParallel(func(pb *testing.PB) { + var matchIdxs []byte for pb.Next() { - a.Push(benchSeries, nil) + matchIdxs = a.Push(benchSeries, matchIdxs) } }) } -func BenchmarkAggregatorsPushWithSeriesTracker(b *testing.B) { - config := fmt.Sprintf(` -- match: http_requests_total - interval: 24h - without: [job] - outputs: [%q] -`, "total") - i := 0 - pushFunc := func(tss []prompbmarshal.TimeSeries) { - i++ - if i > 1 { - // pushFunc is expected to be called exactly once at MustStop - panic(fmt.Errorf("unexpected pushFunc call")) - } - } - a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) - if err != nil { - b.Fatalf("unexpected error when initializing aggregators: %s", err) - } - defer a.MustStop() - - tests := []struct { - name string - series []prompbmarshal.TimeSeries - }{ - { - name: "all matches", - series: benchSeries, - }, - { - name: "no matches", - series: benchSeriesWithRandomNames100, - }, - { - name: "50% matches", - series: benchSeriesWithRandomNames50, - }, - { - name: "10% matches", - series: benchSeriesWithRandomNames10, - }, - } - - for _, tt := range tests { - b.Run(tt.name, func(b *testing.B) 
{ - b.ReportAllocs() - b.SetBytes(int64(len(tt.series))) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - ut := NewTssUsageTracker(len(tt.series)) - a.Push(tt.series, ut.Matched) - } - }) - }) - } -} - -func newBenchSeries(seriesCount, samplesPerSeries int, randomNameFraction float64) []prompbmarshal.TimeSeries { +func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSeries { a := make([]string, seriesCount*samplesPerSeries) for i := 0; i < samplesPerSeries; i++ { for j := 0; j < seriesCount; j++ { - metricName := "http_requests_total" - if randomNameFraction > 0 && j%int(1/randomNameFraction) == 0 { - metricName = fmt.Sprintf("random_other_name_%d", j) - } - - s := fmt.Sprintf(`%s{path="/foo/%d",job="foo",instance="bar"} %d`, metricName, j, i*10) + s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo",instance="bar"} %d`, j, i*10) a = append(a, s) } } @@ -137,7 +75,4 @@ func newBenchSeries(seriesCount, samplesPerSeries int, randomNameFraction float6 const seriesCount = 10000 const samplesPerSeries = 10 -var benchSeries = newBenchSeries(seriesCount, samplesPerSeries, 0) -var benchSeriesWithRandomNames10 = newBenchSeries(seriesCount, samplesPerSeries, 0.1) -var benchSeriesWithRandomNames50 = newBenchSeries(seriesCount, samplesPerSeries, 0.5) -var benchSeriesWithRandomNames100 = newBenchSeries(seriesCount, samplesPerSeries, 1) +var benchSeries = newBenchSeries(seriesCount, samplesPerSeries)
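
To make the new contract easier to follow, here is a minimal, self-contained sketch (not part of the diff) of how a caller may combine the matchIdxs slice returned by `Aggregators.Push` with the `keepInput`/`dropInput` behaviour described above. The `timeSeries` type, the `filterInput` helper and the sample metric names are illustrative stand-ins rather than identifiers from the repository:

```go
package main

import "fmt"

// timeSeries is a simplified stand-in for prompbmarshal.TimeSeries from the diff above.
type timeSeries struct {
	name string
}

// filterInput decides which input samples to forward to the storage:
// matchIdxs[i] == 1 means tss[i] matched at least one aggregation config.
func filterInput(tss []timeSeries, matchIdxs []byte, keepInput, dropInput bool) []timeSeries {
	if keepInput {
		// Forward all the input samples alongside the aggregated samples.
		return tss
	}
	if dropInput {
		// Drop all the input samples; only the aggregated samples remain.
		return nil
	}
	// Default: forward only the input samples which matched no aggregation config.
	// The real code filters the slice in place over a private copy; a fresh slice
	// is used here to keep the example free of side effects.
	dst := make([]timeSeries, 0, len(tss))
	for i, match := range matchIdxs {
		if match == 0 {
			dst = append(dst, tss[i])
		}
	}
	return dst
}

func main() {
	tss := []timeSeries{{"http_requests_total"}, {"process_resident_memory_bytes"}}
	// Suppose only http_requests_total matched a `match` option in the aggregation config.
	matchIdxs := []byte{1, 0}

	fmt.Println(filterInput(tss, matchIdxs, false, false)) // [{process_resident_memory_bytes}]
	fmt.Println(filterInput(tss, matchIdxs, true, false))  // [{http_requests_total} {process_resident_memory_bytes}]
	fmt.Println(filterInput(tss, matchIdxs, false, true))  // []
}
```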