mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 14:22:15 +01:00
50820ab1aa
- Fix Prometheus-compatible naming after applying the relabeling if the -usePromCompatibleNaming command-line flag is set. This should prevent Prometheus-incompatible metric names and label names from being generated by the relabeling. - Do not return anything from the relabelCtx.appendExtraLabels() function, since it cannot change the number of time series passed to it. Append labels to the passed time series in-place. - Remove the promrelabel.FinalizeLabels() call after adding extra labels to time series, since this call has already been made in relabelCtx.applyRelabeling(). It is the user's responsibility if they pass labels with double underscore prefixes to -remoteWrite.label. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4247
181 lines
5.9 KiB
Go
181 lines
5.9 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
)
|
|
|
|
var (
|
|
unparsedLabelsGlobal = flagutil.NewArrayString("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
|
|
"Pass multiple -remoteWrite.label flags in order to add multiple labels to metrics before sending them to remote storage")
|
|
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabeling configs, which are applied "+
|
|
"to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. "+
|
|
"The path can point either to local file or to http url. "+
|
|
"See https://docs.victoriametrics.com/vmagent.html#relabeling")
|
|
relabelConfigPaths = flagutil.NewArrayString("remoteWrite.urlRelabelConfig", "Optional path to relabel configs for the corresponding -remoteWrite.url. "+
|
|
"See also -remoteWrite.relabelConfig. The path can point either to local file or to http url. "+
|
|
"See https://docs.victoriametrics.com/vmagent.html#relabeling")
|
|
|
|
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
|
|
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
|
|
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
|
|
)
|
|
|
|
// labelsGlobal contains labels parsed from -remoteWrite.label.
//
// It is (re)built by initLabelsGlobal.
var labelsGlobal []prompbmarshal.Label
|
|
|
|
// CheckRelabelConfigs checks -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig.
|
|
func CheckRelabelConfigs() error {
|
|
_, err := loadRelabelConfigs()
|
|
return err
|
|
}
|
|
|
|
func loadRelabelConfigs() (*relabelConfigs, error) {
|
|
var rcs relabelConfigs
|
|
if *relabelConfigPathGlobal != "" {
|
|
global, err := promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load -remoteWrite.relabelConfig=%q: %w", *relabelConfigPathGlobal, err)
|
|
}
|
|
rcs.global = global
|
|
}
|
|
if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) {
|
|
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url or -remoteWrite.multitenantURL args: %d",
|
|
len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
|
}
|
|
rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
|
for i, path := range *relabelConfigPaths {
|
|
if len(path) == 0 {
|
|
// Skip empty relabel config.
|
|
continue
|
|
}
|
|
prc, err := promrelabel.LoadRelabelConfigs(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %w", path, err)
|
|
}
|
|
rcs.perURL[i] = prc
|
|
}
|
|
return &rcs, nil
|
|
}
|
|
|
|
type relabelConfigs struct {
|
|
global *promrelabel.ParsedConfigs
|
|
perURL []*promrelabel.ParsedConfigs
|
|
}
|
|
|
|
// initLabelsGlobal must be called after parsing command-line flags.
|
|
func initLabelsGlobal() {
|
|
labelsGlobal = nil
|
|
for _, s := range *unparsedLabelsGlobal {
|
|
if len(s) == 0 {
|
|
continue
|
|
}
|
|
n := strings.IndexByte(s, '=')
|
|
if n < 0 {
|
|
logger.Fatalf("missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s)
|
|
}
|
|
labelsGlobal = append(labelsGlobal, prompbmarshal.Label{
|
|
Name: s[:n],
|
|
Value: s[n+1:],
|
|
})
|
|
}
|
|
}
|
|
|
|
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries {
|
|
if pcs.Len() == 0 && !*usePromCompatibleNaming {
|
|
// Nothing to change.
|
|
return tss
|
|
}
|
|
tssDst := tss[:0]
|
|
labels := rctx.labels[:0]
|
|
for i := range tss {
|
|
ts := &tss[i]
|
|
labelsLen := len(labels)
|
|
labels = append(labels, ts.Labels...)
|
|
labels = pcs.Apply(labels, labelsLen)
|
|
labels = promrelabel.FinalizeLabels(labels[:labelsLen], labels[labelsLen:])
|
|
if len(labels) == labelsLen {
|
|
// Drop the current time series, since relabeling removed all the labels.
|
|
continue
|
|
}
|
|
if *usePromCompatibleNaming {
|
|
fixPromCompatibleNaming(labels[labelsLen:])
|
|
}
|
|
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
|
Labels: labels[labelsLen:],
|
|
Samples: ts.Samples,
|
|
})
|
|
}
|
|
rctx.labels = labels
|
|
return tssDst
|
|
}
|
|
|
|
func (rctx *relabelCtx) appendExtraLabels(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label) {
|
|
if len(extraLabels) == 0 {
|
|
return
|
|
}
|
|
labels := rctx.labels[:0]
|
|
for i := range tss {
|
|
ts := &tss[i]
|
|
labelsLen := len(labels)
|
|
labels = append(labels, ts.Labels...)
|
|
for j := range extraLabels {
|
|
extraLabel := extraLabels[j]
|
|
if *usePromCompatibleNaming {
|
|
extraLabel.Name = promrelabel.SanitizeLabelName(extraLabel.Name)
|
|
}
|
|
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
|
|
if tmp != nil {
|
|
tmp.Value = extraLabel.Value
|
|
} else {
|
|
labels = append(labels, extraLabel)
|
|
}
|
|
}
|
|
ts.Labels = labels[labelsLen:]
|
|
}
|
|
rctx.labels = labels
|
|
}
|
|
|
|
type relabelCtx struct {
|
|
// pool for labels, which are used during the relabeling.
|
|
labels []prompbmarshal.Label
|
|
}
|
|
|
|
func (rctx *relabelCtx) reset() {
|
|
promrelabel.CleanLabels(rctx.labels)
|
|
rctx.labels = rctx.labels[:0]
|
|
}
|
|
|
|
var relabelCtxPool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &relabelCtx{}
|
|
},
|
|
}
|
|
|
|
func getRelabelCtx() *relabelCtx {
|
|
return relabelCtxPool.Get().(*relabelCtx)
|
|
}
|
|
|
|
func putRelabelCtx(rctx *relabelCtx) {
|
|
rctx.labels = rctx.labels[:0]
|
|
relabelCtxPool.Put(rctx)
|
|
}
|
|
|
|
func fixPromCompatibleNaming(labels []prompbmarshal.Label) {
|
|
// Replace unsupported Prometheus chars in label names and metric names with underscores.
|
|
for i := range labels {
|
|
label := &labels[i]
|
|
if label.Name == "__name__" {
|
|
label.Value = promrelabel.SanitizeMetricName(label.Value)
|
|
} else {
|
|
label.Name = promrelabel.SanitizeLabelName(label.Name)
|
|
}
|
|
}
|
|
}
|