mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
141 lines
4.4 KiB
Go
141 lines
4.4 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
)
|
|
|
|
var (
|
|
unparsedLabelsGlobal = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
|
|
"Pass multiple -remoteWrite.label flags in order to add multiple labels to metrics before sending them to remote storage")
|
|
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+
|
|
"before sending them to -remoteWrite.url. See https://victoriametrics.github.io/vmagent.html#relabeling for details")
|
|
relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url")
|
|
)
|
|
|
|
var labelsGlobal []prompbmarshal.Label
|
|
|
|
// CheckRelabelConfigs checks -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig.
|
|
func CheckRelabelConfigs() error {
|
|
_, err := loadRelabelConfigs()
|
|
return err
|
|
}
|
|
|
|
func loadRelabelConfigs() (*relabelConfigs, error) {
|
|
var rcs relabelConfigs
|
|
if *relabelConfigPathGlobal != "" {
|
|
global, err := promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load -remoteWrite.relabelConfig=%q: %w", *relabelConfigPathGlobal, err)
|
|
}
|
|
rcs.global = global
|
|
}
|
|
if len(*relabelConfigPaths) > len(*remoteWriteURLs) {
|
|
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
|
|
len(*relabelConfigPaths), len(*remoteWriteURLs))
|
|
}
|
|
rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs))
|
|
for i, path := range *relabelConfigPaths {
|
|
if len(path) == 0 {
|
|
// Skip empty relabel config.
|
|
continue
|
|
}
|
|
prc, err := promrelabel.LoadRelabelConfigs(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %w", path, err)
|
|
}
|
|
rcs.perURL[i] = prc
|
|
}
|
|
return &rcs, nil
|
|
}
|
|
|
|
type relabelConfigs struct {
|
|
global *promrelabel.ParsedConfigs
|
|
perURL []*promrelabel.ParsedConfigs
|
|
}
|
|
|
|
// initLabelsGlobal must be called after parsing command-line flags.
|
|
func initLabelsGlobal() {
|
|
labelsGlobal = nil
|
|
for _, s := range *unparsedLabelsGlobal {
|
|
if len(s) == 0 {
|
|
continue
|
|
}
|
|
n := strings.IndexByte(s, '=')
|
|
if n < 0 {
|
|
logger.Fatalf("missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s)
|
|
}
|
|
labelsGlobal = append(labelsGlobal, prompbmarshal.Label{
|
|
Name: s[:n],
|
|
Value: s[n+1:],
|
|
})
|
|
}
|
|
}
|
|
|
|
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label, pcs *promrelabel.ParsedConfigs) []prompbmarshal.TimeSeries {
|
|
if len(extraLabels) == 0 && pcs.Len() == 0 {
|
|
// Nothing to change.
|
|
return tss
|
|
}
|
|
tssDst := tss[:0]
|
|
labels := rctx.labels[:0]
|
|
for i := range tss {
|
|
ts := &tss[i]
|
|
labelsLen := len(labels)
|
|
labels = append(labels, ts.Labels...)
|
|
// extraLabels must be added before applying relabeling according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
|
|
for j := range extraLabels {
|
|
extraLabel := &extraLabels[j]
|
|
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
|
|
if tmp != nil {
|
|
tmp.Value = extraLabel.Value
|
|
} else {
|
|
labels = append(labels, *extraLabel)
|
|
}
|
|
}
|
|
labels = pcs.Apply(labels, labelsLen, true)
|
|
if len(labels) == labelsLen {
|
|
// Drop the current time series, since relabeling removed all the labels.
|
|
continue
|
|
}
|
|
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
|
Labels: labels[labelsLen:],
|
|
Samples: ts.Samples,
|
|
})
|
|
}
|
|
rctx.labels = labels
|
|
return tssDst
|
|
}
|
|
|
|
type relabelCtx struct {
|
|
// pool for labels, which are used during the relabeling.
|
|
labels []prompbmarshal.Label
|
|
}
|
|
|
|
func (rctx *relabelCtx) reset() {
|
|
promrelabel.CleanLabels(rctx.labels)
|
|
rctx.labels = rctx.labels[:0]
|
|
}
|
|
|
|
var relabelCtxPool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &relabelCtx{}
|
|
},
|
|
}
|
|
|
|
func getRelabelCtx() *relabelCtx {
|
|
return relabelCtxPool.Get().(*relabelCtx)
|
|
}
|
|
|
|
func putRelabelCtx(rctx *relabelCtx) {
|
|
rctx.labels = rctx.labels[:0]
|
|
relabelCtxPool.Put(rctx)
|
|
}
|