mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
vmagent: properly add extra labels before sending data to remote storage
labels from `remoteWrite.label` are now added to sent metrics just before they
are pushed to `remoteWrite.url` after all relabelings, including stream aggregation relabelings (#4247)
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247
Signed-off-by: Alexander Marshalov <_@marshalov.org>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
(cherry picked from commit a27c2f3773
)
This commit is contained in:
parent
80ec24a088
commit
58cf862b05
@ -87,8 +87,8 @@ func initLabelsGlobal() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries {
|
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries {
|
||||||
if len(extraLabels) == 0 && pcs.Len() == 0 && !*usePromCompatibleNaming {
|
if pcs.Len() == 0 && !*usePromCompatibleNaming {
|
||||||
// Nothing to change.
|
// Nothing to change.
|
||||||
return tss
|
return tss
|
||||||
}
|
}
|
||||||
@ -98,16 +98,6 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLab
|
|||||||
ts := &tss[i]
|
ts := &tss[i]
|
||||||
labelsLen := len(labels)
|
labelsLen := len(labels)
|
||||||
labels = append(labels, ts.Labels...)
|
labels = append(labels, ts.Labels...)
|
||||||
// extraLabels must be added before applying relabeling according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
|
|
||||||
for j := range extraLabels {
|
|
||||||
extraLabel := &extraLabels[j]
|
|
||||||
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
|
|
||||||
if tmp != nil {
|
|
||||||
tmp.Value = extraLabel.Value
|
|
||||||
} else {
|
|
||||||
labels = append(labels, *extraLabel)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if *usePromCompatibleNaming {
|
if *usePromCompatibleNaming {
|
||||||
// Replace unsupported Prometheus chars in label names and metric names with underscores.
|
// Replace unsupported Prometheus chars in label names and metric names with underscores.
|
||||||
tmpLabels := labels[labelsLen:]
|
tmpLabels := labels[labelsLen:]
|
||||||
@ -135,6 +125,38 @@ func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLab
|
|||||||
return tssDst
|
return tssDst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) []prompbmarshal.TimeSeries {
|
||||||
|
if len(extraLabels) == 0 {
|
||||||
|
return tss
|
||||||
|
}
|
||||||
|
tssDst := tss[:0]
|
||||||
|
labels := rctx.labels[:0]
|
||||||
|
for i := range tss {
|
||||||
|
ts := &tss[i]
|
||||||
|
labelsLen := len(labels)
|
||||||
|
labels = append(labels, ts.Labels...)
|
||||||
|
for j := range extraLabels {
|
||||||
|
extraLabel := extraLabels[j]
|
||||||
|
if *usePromCompatibleNaming {
|
||||||
|
extraLabel.Name = promrelabel.SanitizeLabelName(extraLabel.Name)
|
||||||
|
}
|
||||||
|
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
|
||||||
|
if tmp != nil {
|
||||||
|
tmp.Value = extraLabel.Value
|
||||||
|
} else {
|
||||||
|
labels = append(labels, extraLabel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:])
|
||||||
|
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
||||||
|
Labels: labels[labelsLen:],
|
||||||
|
Samples: ts.Samples,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
rctx.labels = labels
|
||||||
|
return tssDst
|
||||||
|
}
|
||||||
|
|
||||||
type relabelCtx struct {
|
type relabelCtx struct {
|
||||||
// pool for labels, which are used during the relabeling.
|
// pool for labels, which are used during the relabeling.
|
||||||
labels []prompbmarshal.Label
|
labels []prompbmarshal.Label
|
||||||
|
@ -10,18 +10,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestApplyRelabeling(t *testing.T) {
|
func TestApplyRelabeling(t *testing.T) {
|
||||||
f := func(extraLabels []prompbmarshal.Label, pcs *promrelabel.ParsedConfigs, sTss, sExpTss string) {
|
f := func(pcs *promrelabel.ParsedConfigs, sTss, sExpTss string) {
|
||||||
rctx := &relabelCtx{}
|
rctx := &relabelCtx{}
|
||||||
tss, expTss := parseSeries(sTss), parseSeries(sExpTss)
|
tss, expTss := parseSeries(sTss), parseSeries(sExpTss)
|
||||||
gotTss := rctx.applyRelabeling(tss, extraLabels, pcs)
|
gotTss := rctx.applyRelabeling(tss, pcs)
|
||||||
if !reflect.DeepEqual(gotTss, expTss) {
|
if !reflect.DeepEqual(gotTss, expTss) {
|
||||||
t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss)
|
t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
f(nil, nil, "up", "up")
|
f(nil, "up", "up")
|
||||||
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, nil, "up", `up{foo="bar"}`)
|
|
||||||
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, nil, `up{foo="baz"}`, `up{foo="bar"}`)
|
|
||||||
|
|
||||||
pcs, err := promrelabel.ParseRelabelConfigsData([]byte(`
|
pcs, err := promrelabel.ParseRelabelConfigsData([]byte(`
|
||||||
- target_label: "foo"
|
- target_label: "foo"
|
||||||
@ -32,11 +30,32 @@ func TestApplyRelabeling(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %s", err)
|
t.Fatalf("unexpected error: %s", err)
|
||||||
}
|
}
|
||||||
f(nil, pcs, `up{foo="baz", env="prod"}`, `up{foo="aaa"}`)
|
f(pcs, `up{foo="baz", env="prod"}`, `up{foo="aaa"}`)
|
||||||
|
|
||||||
oldVal := *usePromCompatibleNaming
|
oldVal := *usePromCompatibleNaming
|
||||||
*usePromCompatibleNaming = true
|
*usePromCompatibleNaming = true
|
||||||
f(nil, nil, `foo.bar`, `foo_bar`)
|
f(nil, `foo.bar`, `foo_bar`)
|
||||||
|
*usePromCompatibleNaming = oldVal
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAppendExtraLabels(t *testing.T) {
|
||||||
|
f := func(extraLabels []prompbmarshal.Label, sTss, sExpTss string) {
|
||||||
|
rctx := &relabelCtx{}
|
||||||
|
tss, expTss := parseSeries(sTss), parseSeries(sExpTss)
|
||||||
|
gotTss := rctx.appendExtraLabels(tss, extraLabels)
|
||||||
|
if !reflect.DeepEqual(gotTss, expTss) {
|
||||||
|
t.Fatalf("expected to have: \n%v;\ngot: \n%v", expTss, gotTss)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f(nil, "up", "up")
|
||||||
|
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, "up", `up{foo="bar"}`)
|
||||||
|
f([]prompbmarshal.Label{{Name: "foo", Value: "bar"}}, `up{foo="baz"}`, `up{foo="bar"}`)
|
||||||
|
f([]prompbmarshal.Label{{Name: "baz", Value: "qux"}}, `up{foo="baz"}`, `up{foo="baz",baz="qux"}`)
|
||||||
|
|
||||||
|
oldVal := *usePromCompatibleNaming
|
||||||
|
*usePromCompatibleNaming = true
|
||||||
|
f([]prompbmarshal.Label{{Name: "foo.bar", Value: "baz"}}, "up", `up{foo_bar="baz"}`)
|
||||||
*usePromCompatibleNaming = oldVal
|
*usePromCompatibleNaming = oldVal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,7 +386,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
|||||||
}
|
}
|
||||||
if rctx != nil {
|
if rctx != nil {
|
||||||
rowsCountBeforeRelabel := getRowsCount(tssBlock)
|
rowsCountBeforeRelabel := getRowsCount(tssBlock)
|
||||||
tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, pcsGlobal)
|
tssBlock = rctx.applyRelabeling(tssBlock, pcsGlobal)
|
||||||
rowsCountAfterRelabel := getRowsCount(tssBlock)
|
rowsCountAfterRelabel := getRowsCount(tssBlock)
|
||||||
rowsDroppedByGlobalRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
|
rowsDroppedByGlobalRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
|
||||||
}
|
}
|
||||||
@ -668,7 +668,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
|
|||||||
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
|
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
|
||||||
tss = append(*v, tss...)
|
tss = append(*v, tss...)
|
||||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||||
tss = rctx.applyRelabeling(tss, nil, pcs)
|
tss = rctx.applyRelabeling(tss, pcs)
|
||||||
rowsCountAfterRelabel := getRowsCount(tss)
|
rowsCountAfterRelabel := getRowsCount(tss)
|
||||||
rwctx.rowsDroppedByRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
|
rwctx.rowsDroppedByRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
|
||||||
}
|
}
|
||||||
@ -719,6 +719,11 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
|
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
|
||||||
|
if len(labelsGlobal) > 0 {
|
||||||
|
rctx := getRelabelCtx()
|
||||||
|
tss = rctx.appendExtraLabels(tss, labelsGlobal)
|
||||||
|
putRelabelCtx(rctx)
|
||||||
|
}
|
||||||
pss := rwctx.pss
|
pss := rwctx.pss
|
||||||
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
|
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
|
||||||
pss[idx].Push(tss)
|
pss[idx].Push(tss)
|
||||||
|
@ -26,6 +26,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
|
|||||||
|
|
||||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add support for server-side copy of existing backups. See [these docs](https://docs.victoriametrics.com/vmbackup.html#server-side-copy-of-the-existing-backup) for details.
|
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add support for server-side copy of existing backups. See [these docs](https://docs.victoriametrics.com/vmbackup.html#server-side-copy-of-the-existing-backup) for details.
|
||||||
|
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): according to [the docs](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics) labels from `-remoteWrite.label` cmd-line flag are now added to the sent metrics just before they are pushed to the `-remoteWrite.url` (after all relabeling, including stream aggregation relabeling). In addition, it allows adding labels for identifying vmagent instances when using streaming aggregation in vmagents [cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247) and [these docs](https://docs.victoriametrics.com/stream-aggregation.html#cluster-mode) for more details.
|
||||||
* BUGFIX: remove `DEBUG` logging when parsing `if` filters inside [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) and when parsing `match` filters inside [stream aggregation rules](https://docs.victoriametrics.com/stream-aggregation.html).
|
* BUGFIX: remove `DEBUG` logging when parsing `if` filters inside [relabeling rules](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) and when parsing `match` filters inside [stream aggregation rules](https://docs.victoriametrics.com/stream-aggregation.html).
|
||||||
* BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071).
|
* BUGFIX: properly replace `:` chars in label names with `_` when `-usePromCompatibleNaming` command-line flag is passed to `vmagent`, `vminsert` or single-node VictoriaMetrics. This addresses [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113#issuecomment-1275077071).
|
||||||
* BUGFIX: [Official Grafana dashboards for VictoriaMetrics](https://grafana.com/orgs/victoriametrics): fix display of ingested rows rate for `Samples ingested/s` and `Samples rate` panels for vmagent's dasbhoard. Previously, not all ingested protocols were accounted in these panels. An extra panel `Rows rate` was added to `Ingestion` section to display the split for rows ingested rate by protocol.
|
* BUGFIX: [Official Grafana dashboards for VictoriaMetrics](https://grafana.com/orgs/victoriametrics): fix display of ingested rows rate for `Samples ingested/s` and `Samples rate` panels for vmagent's dasbhoard. Previously, not all ingested protocols were accounted in these panels. An extra panel `Rows rate` was added to `Ingestion` section to display the split for rows ingested rate by protocol.
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
---
|
---
|
||||||
sort: 98
|
sort: 98
|
||||||
weight: 98
|
weight: 98
|
||||||
title: streaming aggregation
|
title: Streaming aggregation
|
||||||
menu:
|
menu:
|
||||||
docs:
|
docs:
|
||||||
parent: "victoriametrics"
|
parent: "victoriametrics"
|
||||||
@ -10,7 +10,7 @@ aliases:
|
|||||||
- /stream-aggregation.html
|
- /stream-aggregation.html
|
||||||
---
|
---
|
||||||
|
|
||||||
# streaming aggregation
|
# Streaming aggregation
|
||||||
|
|
||||||
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
|
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
|
||||||
can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage.
|
can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage.
|
||||||
@ -673,3 +673,13 @@ support the following approaches for hot reloading stream aggregation configs fr
|
|||||||
```
|
```
|
||||||
|
|
||||||
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload).
|
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload).
|
||||||
|
|
||||||
|
## Cluster mode
|
||||||
|
|
||||||
|
If you use [vmagent in cluster mode](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets) for streaming aggregation
|
||||||
|
(with `-promscrape.cluster.*` parameters or with `VMAgent.spec.shardCount > 1` for [vmoperator](https://docs.victoriametrics.com/operator))
|
||||||
|
then be careful when aggregating metrics via `by`, `without` or modifying via`*_relabel_configs` parameters, as incorrect usage may result in duplicates and data collision. For example, if more than one vmagent calculates `increase` for metric `http_requests_total` with `by: [path]` directive, then the resulting time series written to the remote database will be indistinguishable, as there is no way to tell to which `instance` the aggregation belongs. The proper fix would be to add a unique aggregation dimension: `by: [instance, path]`. With this change, aggregates between `instances` won't collide.
|
||||||
|
|
||||||
|
If adding a new aggregation dimension isn't feasible (due to cardinality reduction purposes), then it is worth at least differentiating by which vmagent the aggregation was performed. You can do it with `remoteWrite.label` [parameter in vmagent](https://docs.victoriametrics.com/vmagent.html#adding-labels-to-metrics).
|
||||||
|
For example, for running in docker or k8s you can use `remoteWrite.label` with `POD_NAME` or `HOSTNAME` environment variable: `remoteWrite.label='vmagent=%{HOSTNAME}'`.
|
||||||
|
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247#issue-1692894073) for details.
|
||||||
|
Loading…
Reference in New Issue
Block a user