mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-16 17:21:04 +01:00
af19ca2483
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/320 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/308
114 lines
3.5 KiB
Go
114 lines
3.5 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"flag"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
)
|
|
|
|
var (
|
|
unparsedLabelsGlobal = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
|
|
"Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage")
|
|
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+
|
|
"before sending them to -remoteWrite.url. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config for details")
|
|
)
|
|
|
|
var labelsGlobal []prompbmarshal.Label
|
|
var prcsGlobal []promrelabel.ParsedRelabelConfig
|
|
|
|
// initRelabelGlobal must be called after parsing command-line flags.
|
|
func initRelabelGlobal() {
|
|
// Init labelsGlobal
|
|
labelsGlobal = nil
|
|
for _, s := range *unparsedLabelsGlobal {
|
|
n := strings.IndexByte(s, '=')
|
|
if n < 0 {
|
|
logger.Panicf("FATAL: missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s)
|
|
}
|
|
labelsGlobal = append(labelsGlobal, prompbmarshal.Label{
|
|
Name: s[:n],
|
|
Value: s[n+1:],
|
|
})
|
|
}
|
|
|
|
// Init prcsGlobal
|
|
prcsGlobal = nil
|
|
if len(*relabelConfigPathGlobal) > 0 {
|
|
var err error
|
|
prcsGlobal, err = promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: cannot load relabel configs from -remoteWrite.relabelConfig=%q: %s", *relabelConfigPathGlobal, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label, prcs []promrelabel.ParsedRelabelConfig) []prompbmarshal.TimeSeries {
|
|
if len(extraLabels) == 0 && len(prcs) == 0 {
|
|
// Nothing to change.
|
|
return tss
|
|
}
|
|
tssDst := tss[:0]
|
|
labels := rctx.labels[:0]
|
|
for i := range tss {
|
|
ts := &tss[i]
|
|
labelsLen := len(labels)
|
|
labels = append(labels, ts.Labels...)
|
|
// extraLabels must be added before applying relabeling according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
|
|
for j := range extraLabels {
|
|
extraLabel := &extraLabels[j]
|
|
tmp := promrelabel.GetLabelByName(labels[labelsLen:], extraLabel.Name)
|
|
if tmp != nil {
|
|
tmp.Value = extraLabel.Value
|
|
} else {
|
|
labels = append(labels, *extraLabel)
|
|
}
|
|
}
|
|
labels = promrelabel.ApplyRelabelConfigs(labels, labelsLen, prcs, true)
|
|
if len(labels) == labelsLen {
|
|
// Drop the current time series, since relabeling removed all the labels.
|
|
continue
|
|
}
|
|
tssDst = append(tssDst, prompbmarshal.TimeSeries{
|
|
Labels: labels[labelsLen:],
|
|
Samples: ts.Samples,
|
|
})
|
|
}
|
|
rctx.labels = labels
|
|
return tssDst
|
|
}
|
|
|
|
type relabelCtx struct {
|
|
// pool for labels, which are used during the relabeling.
|
|
labels []prompbmarshal.Label
|
|
}
|
|
|
|
func (rctx *relabelCtx) reset() {
|
|
labels := rctx.labels
|
|
for i := range labels {
|
|
label := &labels[i]
|
|
label.Name = ""
|
|
label.Value = ""
|
|
}
|
|
rctx.labels = rctx.labels[:0]
|
|
}
|
|
|
|
var relabelCtxPool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &relabelCtx{}
|
|
},
|
|
}
|
|
|
|
func getRelabelCtx() *relabelCtx {
|
|
return relabelCtxPool.Get().(*relabelCtx)
|
|
}
|
|
|
|
func putRelabelCtx(rctx *relabelCtx) {
|
|
rctx.labels = rctx.labels[:0]
|
|
relabelCtxPool.Put(rctx)
|
|
}
|