diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index a04361ffb..f79d4b42f 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -211,7 +211,7 @@ func Init() { } sasGlobal.Store(sas) } else if sasOpts.DedupInterval > 0 { - deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels) + deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias) } if len(*remoteWriteURLs) > 0 { @@ -840,7 +840,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) } else if sasOpts.DedupInterval > 0 { - rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, sasOpts.DedupInterval, sasOpts.DropInputLabels) + rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias) } return rwctx diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go index 6c384ba2e..7fd2179a1 100644 --- a/app/vmagent/remotewrite/remotewrite_test.go +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -79,12 +79,12 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`), } if dedupInterval > 0 { - rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil) + rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "global") } if len(streamAggrConfig) > 0 { f := createFile(t, []byte(streamAggrConfig)) - sas, err := streamaggr.LoadFromFile(f.Name(), nil, nil) + sas, err := streamaggr.LoadFromFile(f.Name(), nil, streamaggr.Options{}) if err != nil { t.Fatalf("cannot load streamaggr configs: %s", err) } diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go index 454d14bf8..b8ea11a16 100644 --- a/app/vmagent/remotewrite/streamaggr.go +++ b/app/vmagent/remotewrite/streamaggr.go @@ -125,28 +125,35 @@ func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp()) } -func getStreamAggrOpts(idx int) (string, *streamaggr.Options) { +func getStreamAggrOpts(idx int) (string, streamaggr.Options) { if idx < 0 { - return *streamAggrGlobalConfig, &streamaggr.Options{ + return *streamAggrGlobalConfig, streamaggr.Options{ DedupInterval: streamAggrGlobalDedupInterval.Duration(), DropInputLabels: *streamAggrGlobalDropInputLabels, IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals, + Alias: "global", } } + url := fmt.Sprintf("%d:secret-url", idx+1) + if *showRemoteWriteURL { + url = fmt.Sprintf("%d:%s", idx+1, remoteWriteURLs.GetOptionalArg(idx)) + } opts := streamaggr.Options{ DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, + Alias: url, } + if len(*streamAggrConfig) == 0 { - return "", &opts + return "", opts } - return streamAggrConfig.GetOptionalArg(idx), &opts + return streamAggrConfig.GetOptionalArg(idx), opts } -func newStreamAggrConfigWithOpts(pushFunc streamaggr.PushFunc, path string, opts *streamaggr.Options) (*streamaggr.Aggregators, error) { +func newStreamAggrConfigWithOpts(pushFunc streamaggr.PushFunc, path string, opts streamaggr.Options) (*streamaggr.Aggregators, error) { if len(path) == 0 { // Skip empty stream aggregation config. return nil, nil diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 39b4478c9..c5ec26ce9 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -57,11 +57,12 @@ func CheckStreamAggrConfig() error { return nil } pushNoop := func(_ []prompbmarshal.TimeSeries) {} - opts := &streamaggr.Options{ + opts := streamaggr.Options{ DedupInterval: *streamAggrDedupInterval, DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, + Alias: "global", } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts) if err != nil { @@ -81,21 +82,23 @@ func HasStreamAggrConfigured() bool { // MustStopStreamAggr must be called when stream aggr is no longer needed. func InitStreamAggr() { saCfgReloaderStopCh = make(chan struct{}) + rwctx := "global" if *streamAggrConfig == "" { if *streamAggrDedupInterval > 0 { - deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels) + deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels, rwctx) } return } sighupCh := procutil.NewSighupChan() - opts := &streamaggr.Options{ + opts := streamaggr.Options{ DedupInterval: *streamAggrDedupInterval, DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, + Alias: rwctx, } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { @@ -125,11 +128,12 @@ func reloadStreamAggrConfig() { logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig) saCfgReloads.Inc() - opts := &streamaggr.Options{ + opts := streamaggr.Options{ DedupInterval: *streamAggrDedupInterval, DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, + Alias: "global", } sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { diff --git a/dashboards/vm/vmagent.json b/dashboards/vm/vmagent.json index c10b92e03..80d5ee771 100644 --- a/dashboards/vm/vmagent.json +++ b/dashboards/vm/vmagent.json @@ -4575,7 +4575,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4591,7 +4592,7 @@ "h": 8, "w": 12, "x": 12, - "y": 14 + "y": 6 }, "id": 131, "options": { @@ -4679,7 +4680,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4695,7 +4697,7 @@ "h": 8, "w": 12, "x": 0, - "y": 22 + "y": 14 }, "id": 130, "options": { @@ -4796,7 +4798,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4812,7 +4815,7 @@ "h": 8, "w": 12, "x": 12, - "y": 22 + "y": 14 }, "id": 77, "options": { @@ -4887,7 +4890,7 @@ "type": "victoriametrics-datasource", "uid": "$ds" }, - "description": "The 99th percentile of avg flush duration for the aggregated data. \n\nSmaller is better.\n\nAggregation can produce incorrect results ff flush duration exceeds configured deduplication interval. See \"Flush Timeouts\" panel.", + "description": "Shows the number of matched samples by the aggregation rule. \n\nThe more samples is matched, the more work this aggregation rule does. The matching rule is specified via `match` param.\n\nSee more details in [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). ", "fieldConfig": { "defaults": { "color": { @@ -4939,8 +4942,7 @@ "value": 80 } ] - }, - "unit": "s" + } }, "overrides": [] }, @@ -4950,13 +4952,19 @@ "x": 0, "y": 7 }, - "id": 137, + "id": 146, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "min", + "lastNotNull", + "max" + ], + "displayMode": "table", "placement": "bottom", - "showLegend": true + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true }, "tooltip": { "mode": "single", @@ -4970,14 +4978,14 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "histogram_quantile(0.99, vm_streamaggr_dedup_flush_duration_seconds_bucket{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval])", + "expr": "rate(vm_streamaggr_matched_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Dedup flush duration (0.99)", + "title": "Matched samples ($instance)", "type": "timeseries" }, { @@ -4985,7 +4993,7 @@ "type": "victoriametrics-datasource", "uid": "$ds" }, - "description": "Amount of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples ", + "description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples ", "fieldConfig": { "defaults": { "color": { @@ -5067,14 +5075,117 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "increase(vm_streamaggr_ignored_samples_total[5m]) > 0", + "expr": "rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", "instant": false, - "legendFormat": "{{reason}}: {{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Ignored samples", + "title": "Ignored samples ($instance)", + "type": "timeseries" + }, + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "description": "Shows the number of produced samples by the aggregation rule. \n\nNumber of produced samples depend on params like `by`, `without`, `interval`, etc.\n\nSee more details in [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). ", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMin": 0, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 147, + "options": { + "legend": { + "calcs": [ + "min", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "editorMode": "code", + "expr": "rate(vm_streamaggr_flushed_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", + "instant": false, + "legendFormat": "{{url}} ({{job}}): match={{match}}; output={{output}}", + "range": true, + "refId": "A" + } + ], + "title": "Produced samples ($instance)", "type": "timeseries" }, { @@ -5142,7 +5253,7 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 15 }, "id": 139, @@ -5165,9 +5276,9 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "increase(vm_streamaggr_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval]) > 0", + "expr": "increase(vm_streamaggr_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])", "instant": false, - "legendFormat": "aggregate: {{instance}} ({{job}})", + "legendFormat": "aggregation: {{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" }, @@ -5177,15 +5288,15 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "increase(vm_streamaggr_dedup_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval]) > 0", + "expr": "increase(vm_streamaggr_dedup_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])", "hide": false, "instant": false, - "legendFormat": "dedup: {{instance}} ({{job}})", + "legendFormat": "deduplication: {{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "B" } ], - "title": "Flush timeouts", + "title": "Flush timeouts ($instance)", "type": "timeseries" }, { @@ -5193,7 +5304,7 @@ "type": "victoriametrics-datasource", "uid": "$ds" }, - "description": "Shows the lag between average samples timestamp and aggregation interval.\n\nLower is better.\n\nToo high lag or lag exceeding the interval might be a sign of data delay before aggregation or resource insufficiency on aggregator. Samples with high lag may affect accuracy of aggregation.\n\nSee https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples", + "description": "Shows the max lag between samples timestamps within one batch passed to the aggregation input.\n\nLower is better.\n\nToo high lag or lag exceeding the interval might be a sign that data was delayed before aggregation or resource insufficiency on aggregator. Samples with high lag may affect accuracy of aggregation.\n\nSee https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples", "fieldConfig": { "defaults": { "color": { @@ -5254,8 +5365,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 15 + "x": 0, + "y": 23 }, "id": 142, "options": { @@ -5277,14 +5388,14 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "histogram_quantile(0.99, vm_streamaggr_samples_lag_seconds_bucket{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval])", + "expr": "histogram_quantile(0.99, rate(vm_streamaggr_samples_lag_seconds_bucket{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]))", "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Aggregated samples lag", + "title": "Samples lag 0.99 quantile ($instance)", "type": "timeseries" }, { @@ -5292,7 +5403,7 @@ "type": "victoriametrics-datasource", "uid": "$ds" }, - "description": "Shows the size of Label Compressor in bytes.\n\nLabels compressor encodes label-value pairs during aggregation to optimise memory usage. It is expected for its size to grow with time and to reset on vmagent restarts.\n\nRapid spikes in Label compressor size might be a sign of significant changes in labels of received samples.", + "description": "The 99th percentile of avg flush duration for the aggregated data. \n\nSmaller is better.\n\nAggregation can produce incorrect results ff flush duration exceeds configured deduplication interval. See \"Flush Timeouts\" panel.", "fieldConfig": { "defaults": { "color": { @@ -5345,17 +5456,17 @@ } ] }, - "unit": "decbytes" + "unit": "s" }, "overrides": [] }, "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 23 }, - "id": 140, + "id": 137, "options": { "legend": { "calcs": [], @@ -5375,14 +5486,111 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "vm_streamaggr_labels_compressor_size_bytes{job=~\"$job\",instance=~\"$instance\"} > 0", + "expr": "histogram_quantile(0.99, rate(vm_streamaggr_dedup_flush_duration_seconds_bucket{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]))", "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Labels compressor bytes", + "title": "Dedup flush duration 0.99 quantile ($instance)", + "type": "timeseries" + }, + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "description": "Shows the eviction rate of time series because of staleness.\n\nThere are two stages where series can be marked as stale.\n1. Input. Aggregator keeps in memory each received unique time series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for this series. \n\n2. Output. The output key is a resulting time series produced by aggregating many input series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for any of input series for this aggregation.\n\nIncrease in `input` keys shows that series previously matched by the aggregation rule now became stale.\n\nIncrease in `output` keys shows that series previously produced by the aggregation rule now became stale.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMin": 0, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 31 + }, + "id": 144, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "editorMode": "code", + "expr": "increase(vm_streamaggr_stale_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", + "instant": false, + "legendFormat": "{{url}} ({{job}}): match={{match}}; key={{key}}", + "range": true, + "refId": "A" + } + ], + "title": "Staleness rate ($instance)", "type": "timeseries" }, { @@ -5444,13 +5652,30 @@ ] } }, - "overrides": [] + "overrides": [ + { + "matcher": { + "id": "byRegexp", + "options": "/bytes.*/" + }, + "properties": [ + { + "id": "custom.axisPlacement", + "value": "right" + }, + { + "id": "unit", + "value": "bytes" + } + ] + } + ] }, "gridPos": { "h": 8, "w": 12, "x": 12, - "y": 23 + "y": 31 }, "id": 141, "options": { @@ -5472,14 +5697,28 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "vm_streamaggr_labels_compressor_items_count{job=~\"$job\",instance=~\"$instance\"} > 0", + "expr": "max(vm_streamaggr_labels_compressor_items_count{job=~\"$job\",instance=~\"$instance\"}) by(job, instance)", + "hide": false, "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "items: {{instance}} ({{job}})", "range": true, "refId": "A" + }, + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "editorMode": "code", + "expr": "max(vm_streamaggr_labels_compressor_size_bytes{job=~\"$job\", instance=~\"$instance\"}) by(job, instance)", + "hide": false, + "instant": false, + "legendFormat": "bytes: {{instance}} ({{job}})", + "range": true, + "refId": "B" } ], - "title": "Labels compressor items count", + "title": "Labels compressor ($instance)", "type": "timeseries" } ], @@ -5551,7 +5790,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5567,7 +5807,7 @@ "h": 8, "w": 12, "x": 0, - "y": 16 + "y": 8 }, "id": 60, "options": { @@ -5655,7 +5895,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5671,7 +5912,7 @@ "h": 8, "w": 12, "x": 12, - "y": 16 + "y": 8 }, "id": 66, "options": { @@ -5759,7 +6000,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5775,7 +6017,7 @@ "h": 8, "w": 12, "x": 0, - "y": 24 + "y": 16 }, "id": 61, "options": { @@ -5863,7 +6105,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5879,7 +6122,7 @@ "h": 8, "w": 12, "x": 12, - "y": 24 + "y": 16 }, "id": 65, "options": { @@ -5929,6 +6172,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -5942,6 +6186,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -5964,7 +6209,8 @@ "mode": "absolute", "steps": [ { - "color": "transparent" + "color": "transparent", + "value": null }, { "color": "red", @@ -5980,7 +6226,7 @@ "h": 8, "w": 12, "x": 0, - "y": 32 + "y": 24 }, "id": 88, "options": { @@ -6026,6 +6272,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -6078,7 +6325,7 @@ "h": 8, "w": 12, "x": 12, - "y": 32 + "y": 24 }, "id": 84, "options": { @@ -6128,6 +6375,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -6141,6 +6389,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -6163,7 +6412,8 @@ "mode": "absolute", "steps": [ { - "color": "transparent" + "color": "transparent", + "value": null }, { "color": "red", @@ -6179,7 +6429,7 @@ "h": 8, "w": 12, "x": 0, - "y": 40 + "y": 32 }, "id": 90, "options": { @@ -6984,4 +7234,4 @@ "uid": "G7Z9GzMGz_vm", "version": 1, "weekStart": "" -} \ No newline at end of file +} diff --git a/dashboards/vmagent.json b/dashboards/vmagent.json index 5b02b43cc..fffa75163 100644 --- a/dashboards/vmagent.json +++ b/dashboards/vmagent.json @@ -4574,7 +4574,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4590,7 +4591,7 @@ "h": 8, "w": 12, "x": 12, - "y": 14 + "y": 6 }, "id": 131, "options": { @@ -4678,7 +4679,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4694,7 +4696,7 @@ "h": 8, "w": 12, "x": 0, - "y": 22 + "y": 14 }, "id": 130, "options": { @@ -4795,7 +4797,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -4811,7 +4814,7 @@ "h": 8, "w": 12, "x": 12, - "y": 22 + "y": 14 }, "id": 77, "options": { @@ -4886,7 +4889,7 @@ "type": "prometheus", "uid": "$ds" }, - "description": "The 99th percentile of avg flush duration for the aggregated data. \n\nSmaller is better.\n\nAggregation can produce incorrect results ff flush duration exceeds configured deduplication interval. See \"Flush Timeouts\" panel.", + "description": "Shows the number of matched samples by the aggregation rule. \n\nThe more samples is matched, the more work this aggregation rule does. The matching rule is specified via `match` param.\n\nSee more details in [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). ", "fieldConfig": { "defaults": { "color": { @@ -4938,8 +4941,7 @@ "value": 80 } ] - }, - "unit": "s" + } }, "overrides": [] }, @@ -4949,13 +4951,19 @@ "x": 0, "y": 7 }, - "id": 137, + "id": 146, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "min", + "lastNotNull", + "max" + ], + "displayMode": "table", "placement": "bottom", - "showLegend": true + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true }, "tooltip": { "mode": "single", @@ -4969,14 +4977,14 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "histogram_quantile(0.99, vm_streamaggr_dedup_flush_duration_seconds_bucket{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval])", + "expr": "rate(vm_streamaggr_matched_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Dedup flush duration (0.99)", + "title": "Matched samples ($instance)", "type": "timeseries" }, { @@ -4984,7 +4992,7 @@ "type": "prometheus", "uid": "$ds" }, - "description": "Amount of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples ", + "description": "The rate of ignored samples during aggregation. \nStream aggregation will drop samples with NaN values, or samples with too old timestamps. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples ", "fieldConfig": { "defaults": { "color": { @@ -5066,14 +5074,117 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "increase(vm_streamaggr_ignored_samples_total[5m]) > 0", + "expr": "rate(vm_streamaggr_ignored_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", "instant": false, - "legendFormat": "{{reason}}: {{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Ignored samples", + "title": "Ignored samples ($instance)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "description": "Shows the number of produced samples by the aggregation rule. \n\nNumber of produced samples depend on params like `by`, `without`, `interval`, etc.\n\nSee more details in [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). ", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMin": 0, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 147, + "options": { + "legend": { + "calcs": [ + "min", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "editorMode": "code", + "expr": "rate(vm_streamaggr_flushed_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", + "instant": false, + "legendFormat": "{{url}} ({{job}}): match={{match}}; output={{output}}", + "range": true, + "refId": "A" + } + ], + "title": "Produced samples ($instance)", "type": "timeseries" }, { @@ -5141,7 +5252,7 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 15 }, "id": 139, @@ -5164,9 +5275,9 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "increase(vm_streamaggr_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval]) > 0", + "expr": "increase(vm_streamaggr_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])", "instant": false, - "legendFormat": "aggregate: {{instance}} ({{job}})", + "legendFormat": "aggregation: {{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" }, @@ -5176,15 +5287,15 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "increase(vm_streamaggr_dedup_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval]) > 0", + "expr": "increase(vm_streamaggr_dedup_flush_timeouts_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])", "hide": false, "instant": false, - "legendFormat": "dedup: {{instance}} ({{job}})", + "legendFormat": "deduplication: {{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "B" } ], - "title": "Flush timeouts", + "title": "Flush timeouts ($instance)", "type": "timeseries" }, { @@ -5192,7 +5303,7 @@ "type": "prometheus", "uid": "$ds" }, - "description": "Shows the lag between average samples timestamp and aggregation interval.\n\nLower is better.\n\nToo high lag or lag exceeding the interval might be a sign of data delay before aggregation or resource insufficiency on aggregator. Samples with high lag may affect accuracy of aggregation.\n\nSee https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples", + "description": "Shows the max lag between samples timestamps within one batch passed to the aggregation input.\n\nLower is better.\n\nToo high lag or lag exceeding the interval might be a sign that data was delayed before aggregation or resource insufficiency on aggregator. Samples with high lag may affect accuracy of aggregation.\n\nSee https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples", "fieldConfig": { "defaults": { "color": { @@ -5253,8 +5364,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 15 + "x": 0, + "y": 23 }, "id": 142, "options": { @@ -5276,14 +5387,14 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "histogram_quantile(0.99, vm_streamaggr_samples_lag_seconds_bucket{job=~\"$job\",instance=~\"$instance\"}[$__rate_interval])", + "expr": "histogram_quantile(0.99, rate(vm_streamaggr_samples_lag_seconds_bucket{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]))", "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Aggregated samples lag", + "title": "Samples lag 0.99 quantile ($instance)", "type": "timeseries" }, { @@ -5291,7 +5402,7 @@ "type": "prometheus", "uid": "$ds" }, - "description": "Shows the size of Label Compressor in bytes.\n\nLabels compressor encodes label-value pairs during aggregation to optimise memory usage. It is expected for its size to grow with time and to reset on vmagent restarts.\n\nRapid spikes in Label compressor size might be a sign of significant changes in labels of received samples.", + "description": "The 99th percentile of avg flush duration for the aggregated data. \n\nSmaller is better.\n\nAggregation can produce incorrect results ff flush duration exceeds configured deduplication interval. See \"Flush Timeouts\" panel.", "fieldConfig": { "defaults": { "color": { @@ -5344,17 +5455,17 @@ } ] }, - "unit": "decbytes" + "unit": "s" }, "overrides": [] }, "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 23 }, - "id": 140, + "id": 137, "options": { "legend": { "calcs": [], @@ -5374,14 +5485,111 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "vm_streamaggr_labels_compressor_size_bytes{job=~\"$job\",instance=~\"$instance\"} > 0", + "expr": "histogram_quantile(0.99, rate(vm_streamaggr_dedup_flush_duration_seconds_bucket{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]))", "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "{{url}} ({{job}}): match={{match}}; outputs={{outputs}}", "range": true, "refId": "A" } ], - "title": "Labels compressor bytes", + "title": "Dedup flush duration 0.99 quantile ($instance)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "description": "Shows the eviction rate of time series because of staleness.\n\nThere are two stages where series can be marked as stale.\n1. Input. Aggregator keeps in memory each received unique time series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for this series. \n\n2. Output. The output key is a resulting time series produced by aggregating many input series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for any of input series for this aggregation.\n\nIncrease in `input` keys shows that series previously matched by the aggregation rule now became stale.\n\nIncrease in `output` keys shows that series previously produced by the aggregation rule now became stale.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMin": 0, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 31 + }, + "id": 144, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "editorMode": "code", + "expr": "increase(vm_streamaggr_stale_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0", + "instant": false, + "legendFormat": "{{url}} ({{job}}): match={{match}}; key={{key}}", + "range": true, + "refId": "A" + } + ], + "title": "Staleness rate ($instance)", "type": "timeseries" }, { @@ -5443,13 +5651,30 @@ ] } }, - "overrides": [] + "overrides": [ + { + "matcher": { + "id": "byRegexp", + "options": "/bytes.*/" + }, + "properties": [ + { + "id": "custom.axisPlacement", + "value": "right" + }, + { + "id": "unit", + "value": "bytes" + } + ] + } + ] }, "gridPos": { "h": 8, "w": 12, "x": 12, - "y": 23 + "y": 31 }, "id": 141, "options": { @@ -5471,14 +5696,28 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "vm_streamaggr_labels_compressor_items_count{job=~\"$job\",instance=~\"$instance\"} > 0", + "expr": "max(vm_streamaggr_labels_compressor_items_count{job=~\"$job\",instance=~\"$instance\"}) by(job, instance)", + "hide": false, "instant": false, - "legendFormat": "{{instance}} ({{job}})", + "legendFormat": "items: {{instance}} ({{job}})", "range": true, "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "editorMode": "code", + "expr": "max(vm_streamaggr_labels_compressor_size_bytes{job=~\"$job\", instance=~\"$instance\"}) by(job, instance)", + "hide": false, + "instant": false, + "legendFormat": "bytes: {{instance}} ({{job}})", + "range": true, + "refId": "B" } ], - "title": "Labels compressor items count", + "title": "Labels compressor ($instance)", "type": "timeseries" } ], @@ -5550,7 +5789,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5566,7 +5806,7 @@ "h": 8, "w": 12, "x": 0, - "y": 16 + "y": 8 }, "id": 60, "options": { @@ -5654,7 +5894,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5670,7 +5911,7 @@ "h": 8, "w": 12, "x": 12, - "y": 16 + "y": 8 }, "id": 66, "options": { @@ -5758,7 +5999,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5774,7 +6016,7 @@ "h": 8, "w": 12, "x": 0, - "y": 24 + "y": 16 }, "id": 61, "options": { @@ -5862,7 +6104,8 @@ "mode": "absolute", "steps": [ { - "color": "green" + "color": "green", + "value": null }, { "color": "red", @@ -5878,7 +6121,7 @@ "h": 8, "w": 12, "x": 12, - "y": 24 + "y": 16 }, "id": 65, "options": { @@ -5928,6 +6171,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -5941,6 +6185,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -5963,7 +6208,8 @@ "mode": "absolute", "steps": [ { - "color": "transparent" + "color": "transparent", + "value": null }, { "color": "red", @@ -5979,7 +6225,7 @@ "h": 8, "w": 12, "x": 0, - "y": 32 + "y": 24 }, "id": 88, "options": { @@ -6025,6 +6271,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -6077,7 +6324,7 @@ "h": 8, "w": 12, "x": 12, - "y": 32 + "y": 24 }, "id": 84, "options": { @@ -6127,6 +6374,7 @@ "mode": "palette-classic" }, "custom": { + "axisBorderShow": false, "axisCenteredZero": false, "axisColorMode": "text", "axisLabel": "", @@ -6140,6 +6388,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -6162,7 +6411,8 @@ "mode": "absolute", "steps": [ { - "color": "transparent" + "color": "transparent", + "value": null }, { "color": "red", @@ -6178,7 +6428,7 @@ "h": 8, "w": 12, "x": 0, - "y": 40 + "y": 32 }, "id": 90, "options": { @@ -6983,4 +7233,4 @@ "uid": "G7Z9GzMGz", "version": 1, "weekStart": "" -} \ No newline at end of file +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3565afe96..55e420143 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -32,6 +32,13 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with a target host before sending to a downstream. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453) * FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): added `yandexcloud_sd` AWS API IMDSv2 support. +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): expose metrics related to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): + * `vm_streamaggr_matched_samples_total` - shows the number of samples matched by the aggregation rule; + * `vm_streamaggr_flushed_samples_total` - shows the number of samples produced by the aggregation rule; + * `vm_streamaggr_samples_lag_seconds` - shows the max lag between samples timestamps within one batch received by the aggregation; + * `vm_streamaggr_stale_samples_total` - shows the number of time series that became [stale](https://docs.victoriametrics.com/stream-aggregation/#staleness) during aggregation; + * metrics related to stream aggregation got additional labels `match` (matching param), `group` (`by` or `without` param), `url` (address of `remoteWrite.url` where aggregation is applied), `position` (the position of the aggregation rule in config file). + * These and other metrics were reflected on the [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json) in `stream aggregation` section. * BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix the dateMetricIDCache consistency issue that leads to duplicate per-day index entries when new time series are inserted concurrently. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6534) for details. diff --git a/lib/promrelabel/if_expression.go b/lib/promrelabel/if_expression.go index 7d102a1f6..90790c244 100644 --- a/lib/promrelabel/if_expression.go +++ b/lib/promrelabel/if_expression.go @@ -1,6 +1,7 @@ package promrelabel import ( + "bytes" "encoding/json" "fmt" @@ -143,7 +144,13 @@ func (ie *IfExpression) String() string { if len(ie.ies) == 1 { return ie.ies[0].String() } - return fmt.Sprintf("%s", ie.ies) + var buf bytes.Buffer + buf.WriteString(ie.ies[0].String()) + for _, e := range ie.ies[1:] { + buf.WriteString(",") + buf.WriteString(e.String()) + } + return buf.String() } type ifExpression struct { diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index 0212bfd9e..c49580dcc 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -1,6 +1,7 @@ package streamaggr import ( + "fmt" "slices" "sync" "time" @@ -35,7 +36,7 @@ type Deduplicator struct { // Common case is to drop `replica`-like labels from samples received from HA datasources. // // MustStop must be called on the returned deduplicator in order to free up occupied resources. -func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator { +func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator { d := &Deduplicator{ da: newDedupAggr(), dropLabels: dropLabels, @@ -45,15 +46,17 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels } ms := d.ms - _ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 { + + metricLabels := fmt.Sprintf(`url=%q`, alias) + _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { return float64(d.da.sizeBytes()) }) - _ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 { + _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 { return float64(d.da.itemsCount()) }) - d.dedupFlushDuration = ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`) - d.dedupFlushTimeouts = ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`) + d.dedupFlushDuration = ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels)) + d.dedupFlushTimeouts = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels)) metrics.RegisterSet(ms) diff --git a/lib/streamaggr/deduplicator_test.go b/lib/streamaggr/deduplicator_test.go index 96be55ef8..fe32cbb7a 100644 --- a/lib/streamaggr/deduplicator_test.go +++ b/lib/streamaggr/deduplicator_test.go @@ -29,7 +29,7 @@ foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="as baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 `) - d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}) + d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global") for i := 0; i < 10; i++ { d.Push(tss) } diff --git a/lib/streamaggr/deduplicator_timing_test.go b/lib/streamaggr/deduplicator_timing_test.go index e2b764041..ed98b638c 100644 --- a/lib/streamaggr/deduplicator_timing_test.go +++ b/lib/streamaggr/deduplicator_timing_test.go @@ -9,7 +9,7 @@ import ( func BenchmarkDeduplicatorPush(b *testing.B) { pushFunc := func(_ []prompbmarshal.TimeSeries) {} - d := NewDeduplicator(pushFunc, time.Hour, nil) + d := NewDeduplicator(pushFunc, time.Hour, nil, "global") b.ReportAllocs() b.SetBytes(int64(len(benchSeries))) diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 4cce32c2d..afb7ac8b7 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -66,8 +66,9 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) { } } -func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { +func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { m := &as.m + var staleOutputSamples int m.Range(func(k, v interface{}) bool { sv := v.(*histogramBucketStateValue) @@ -76,6 +77,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { if deleted { // Mark the current entry as deleted sv.deleted = deleted + staleOutputSamples++ } sv.mu.Unlock() @@ -84,14 +86,14 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { } return true }) + ctx.a.staleOutputSamples["histogram_bucket"].Add(staleOutputSamples) } -func (as *histogramBucketAggrState) flushState(ctx *flushCtx, resetState bool) { - _ = resetState // it isn't used here +func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 - as.removeOldEntries(currentTime) + as.removeOldEntries(ctx, currentTime) m := &as.m m.Range(func(k, v interface{}) bool { diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index 33d81bc2f..07fd2783b 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -108,6 +108,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 + var staleOutputSamples, staleInputSamples int m := &as.m m.Range(func(k, v interface{}) bool { @@ -120,6 +121,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { // Mark the current entry as deleted sv.deleted = deleted sv.mu.Unlock() + staleOutputSamples++ m.Delete(k) return true } @@ -130,6 +132,7 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { for k1, v1 := range lvs { if currentTime > v1.deleteDeadline { delete(lvs, k1) + staleInputSamples++ continue } rateInterval := v1.timestamp - v1.prevTimestamp @@ -153,4 +156,6 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { ctx.appendSeries(key, as.suffix, currentTimeMsec, rate) return true }) + ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples) + ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 34ad118cf..831cd500f 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -47,6 +47,9 @@ var supportedOutputs = []string{ "quantiles(phi1, ..., phiN)", } +// maxLabelValueLen is maximum match expression label value length in stream aggregation metrics +const maxLabelValueLen = 64 + var ( // lc contains information about all compressed labels for streaming aggregation lc promutils.LabelsCompressor @@ -65,7 +68,7 @@ var ( // opts can contain additional options. If opts is nil, then default options are used. // // The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, error) { +func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) { data, err := fscore.ReadFileOrHTTP(path) if err != nil { return nil, fmt.Errorf("cannot load aggregators: %w", err) @@ -133,6 +136,12 @@ type Options struct { // // This option can be overridden individually per each aggregation via ignore_first_intervals option. IgnoreFirstIntervals int + + // Alias is name or url of remote write context + Alias string + + // aggrID is aggregators id number starting from 1, which is used in metrics labels + aggrID int } // Config is a configuration for a single stream aggregation. @@ -243,7 +252,7 @@ type Aggregators struct { ms *metrics.Set } -func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) { +func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) { var cfgs []*Config if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) @@ -252,6 +261,7 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg ms := metrics.NewSet() as := make([]*aggregator, len(cfgs)) for i, cfg := range cfgs { + opts.aggrID = i + 1 a, err := newAggregator(cfg, pushFunc, ms, opts) if err != nil { // Stop already initialized aggregators before returning the error. @@ -267,7 +277,8 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg logger.Panicf("BUG: cannot marshal the provided configs: %s", err) } - _ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 { + metricLabels := fmt.Sprintf("url=%q", opts.Alias) + _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { n := uint64(0) for _, aggr := range as { if aggr.da != nil { @@ -276,7 +287,7 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg } return float64(n) }) - _ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 { + _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 { n := uint64(0) for _, aggr := range as { if aggr.da != nil { @@ -395,6 +406,10 @@ type aggregator struct { dedupFlushTimeouts *metrics.Counter ignoredOldSamples *metrics.Counter ignoredNanSamples *metrics.Counter + matchedSamples *metrics.Counter + staleInputSamples map[string]*metrics.Counter + staleOutputSamples map[string]*metrics.Counter + flushedSamples map[string]*metrics.Counter } type aggrState interface { @@ -414,11 +429,7 @@ type PushFunc func(tss []prompbmarshal.TimeSeries) // opts can contain additional options. If opts is nil, then default options are used. // // The returned aggregator must be stopped when no longer needed by calling MustStop(). -func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Options) (*aggregator, error) { - if opts == nil { - opts = &Options{} - } - +func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options) (*aggregator, error) { // check cfg.Interval if cfg.Interval == "" { return nil, fmt.Errorf("missing `interval` option") @@ -586,14 +597,26 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option // initialize suffix to add to metric names after aggregation suffix := ":" + cfg.Interval + group := "none" if labels := removeUnderscoreName(by); len(labels) > 0 { + group = fmt.Sprintf("by: %s", strings.Join(labels, ",")) suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_")) } if labels := removeUnderscoreName(without); len(labels) > 0 { + group = fmt.Sprintf("without: %s", strings.Join(labels, ",")) suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_")) } suffix += "_" + outputs := strings.Join(cfg.Outputs, ",") + + matchExpr := cfg.Match.String() + if len(matchExpr) > maxLabelValueLen { + matchExpr = matchExpr[:maxLabelValueLen-3] + "..." + } + + metricLabels := fmt.Sprintf(`match=%q, group=%q, url=%q, position="%d"`, matchExpr, group, opts.Alias, opts.aggrID) + // initialize the aggregator a := &aggregator{ match: cfg.Match, @@ -615,14 +638,27 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option stopCh: make(chan struct{}), - flushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_flush_duration_seconds`), - dedupFlushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`), - samplesLag: ms.GetOrCreateHistogram(`vm_streamaggr_samples_lag_seconds`), + flushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)), + dedupFlushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)), + samplesLag: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_samples_lag_seconds{outputs=%q, %s}`, outputs, metricLabels)), - flushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`), - dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`), - ignoredNanSamples: ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="nan"}`), - ignoredOldSamples: ms.GetOrCreateCounter(`vm_streamaggr_ignored_samples_total{reason="too_old"}`), + matchedSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_matched_samples_total{outputs=%q, %s}`, outputs, metricLabels)), + flushTimeouts: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_flush_timeouts_total{outputs=%q, %s}`, outputs, metricLabels)), + dedupFlushTimeouts: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{outputs=%q, %s}`, outputs, metricLabels)), + ignoredNanSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="nan", outputs=%q, %s}`, outputs, metricLabels)), + ignoredOldSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="too_old", outputs=%q, %s}`, outputs, metricLabels)), + staleInputSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)), + staleOutputSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)), + flushedSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)), + } + for _, output := range cfg.Outputs { + // Removing output args for metric label value in outputs like quantile(arg1, arg2) + if ri := strings.IndexRune(output, '('); ri >= 0 { + output = output[:ri] + } + a.staleInputSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_stale_samples_total{key="input", output=%q, %s}`, output, metricLabels)) + a.staleOutputSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_stale_samples_total{key="output", output=%q, %s}`, output, metricLabels)) + a.flushedSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_flushed_samples_total{output=%q, %s}`, output, metricLabels)) } if dedupInterval > 0 { a.da = newDedupAggr() @@ -821,8 +857,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { dropLabels := a.dropInputLabels ignoreOldSamples := a.ignoreOldSamples minTimestamp := a.minTimestamp.Load() - var totalLag int64 - var totalSamples int + var maxLag int64 for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue @@ -865,8 +900,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { // Skip old samples outside the current aggregation interval continue } - totalLag += now - sample.Timestamp - totalSamples++ + if maxLag < now-sample.Timestamp { + maxLag = now - sample.Timestamp + } samples = append(samples, pushSample{ key: key, value: sample.Value, @@ -874,8 +910,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { }) } } - if totalSamples > 0 { - a.samplesLag.Update(float64(totalLag/int64(totalSamples)) / 1000) + if len(samples) > 0 { + a.matchedSamples.Add(len(samples)) + a.samplesLag.Update(float64(maxLag) / 1_000) } ctx.samples = samples ctx.buf = buf @@ -1088,6 +1125,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo Labels: ctx.labels[labelsLen:], Samples: ctx.samples[samplesLen:], }) + ctx.a.flushedSamples[suffix].Add(len(ctx.tss)) // Limit the maximum length of ctx.tss in order to limit memory usage. if len(ctx.tss) >= 10_000 { @@ -1115,6 +1153,7 @@ func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp in Labels: ctx.labels[labelsLen:], Samples: ctx.samples[samplesLen:], }) + ctx.a.flushedSamples[suffix].Add(len(ctx.tss)) } func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label { diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 5090ee31c..c6d17b832 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -20,7 +20,7 @@ func TestAggregatorsFailure(t *testing.T) { pushFunc := func(_ []prompbmarshal.TimeSeries) { panic(fmt.Errorf("pushFunc shouldn't be called")) } - a, err := newAggregatorsFromData([]byte(config), pushFunc, nil) + a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{}) if err == nil { t.Fatalf("expecting non-nil error") } @@ -158,11 +158,11 @@ func TestAggregatorsEqual(t *testing.T) { t.Helper() pushFunc := func(_ []prompbmarshal.TimeSeries) {} - aa, err := newAggregatorsFromData([]byte(a), pushFunc, nil) + aa, err := newAggregatorsFromData([]byte(a), pushFunc, Options{}) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - ab, err := newAggregatorsFromData([]byte(b), pushFunc, nil) + ab, err := newAggregatorsFromData([]byte(b), pushFunc, Options{}) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -221,7 +221,7 @@ func TestAggregatorsSuccess(t *testing.T) { tssOutput = appendClonedTimeseries(tssOutput, tss) tssOutputLock.Unlock() } - opts := &Options{ + opts := Options{ FlushOnShutdown: true, NoAlignFlushToInterval: true, } @@ -917,7 +917,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { } tssOutputLock.Unlock() } - opts := &Options{ + opts := Options{ DedupInterval: 30 * time.Second, FlushOnShutdown: true, } diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index f06da26cc..b328a61a0 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -92,7 +92,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { outputs: [%s] `, strings.Join(outputsQuoted, ",")) - a, err := newAggregatorsFromData([]byte(config), pushFunc, nil) + a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{}) if err != nil { panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) } diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index 365fdb35d..6598d7c15 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -124,8 +124,9 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { } } -func (as *totalAggrState) removeOldEntries(currentTime uint64) { +func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) { m := &as.m + var staleInputSamples, staleOutputSamples int m.Range(func(k, v interface{}) bool { sv := v.(*totalStateValue) @@ -134,12 +135,14 @@ func (as *totalAggrState) removeOldEntries(currentTime uint64) { if deleted { // Mark the current entry as deleted sv.deleted = deleted + staleOutputSamples++ } else { // Delete outdated entries in sv.lastValues m := sv.lastValues for k1, v1 := range m { if currentTime > v1.deleteDeadline { delete(m, k1) + staleInputSamples++ } } } @@ -150,13 +153,15 @@ func (as *totalAggrState) removeOldEntries(currentTime uint64) { } return true }) + ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples) + ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples) } func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000 - as.removeOldEntries(currentTime) + as.removeOldEntries(ctx, currentTime) m := &as.m m.Range(func(k, v interface{}) bool {