2020-02-23 12:35:47 +01:00
package remotewrite
import (
"flag"
2020-05-30 13:36:40 +02:00
"fmt"
2020-02-23 12:35:47 +01:00
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
var (
2020-03-03 12:08:17 +01:00
unparsedLabelsGlobal = flagutil . NewArray ( "remoteWrite.label" , "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. " +
2020-02-23 12:35:47 +01:00
"Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage" )
2020-03-03 12:08:17 +01:00
relabelConfigPathGlobal = flag . String ( "remoteWrite.relabelConfig" , "" , "Optional path to file with relabel_config entries. These entries are applied to all the metrics " +
2020-02-23 12:35:47 +01:00
"before sending them to -remoteWrite.url. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config for details" )
2020-05-30 13:36:40 +02:00
relabelConfigPaths = flagutil . NewArray ( "remoteWrite.urlRelabelConfig" , "Optional path to relabel config for the corresponding -remoteWrite.url" )
2020-02-23 12:35:47 +01:00
)
2020-03-03 12:08:17 +01:00
var labelsGlobal [ ] prompbmarshal . Label
2020-02-23 12:35:47 +01:00
2020-05-30 13:36:40 +02:00
// CheckRelabelConfigs checks -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig.
func CheckRelabelConfigs ( ) error {
_ , err := loadRelabelConfigs ( )
return err
}
func loadRelabelConfigs ( ) ( * relabelConfigs , error ) {
var rcs relabelConfigs
if * relabelConfigPathGlobal != "" {
global , err := promrelabel . LoadRelabelConfigs ( * relabelConfigPathGlobal )
if err != nil {
return nil , fmt . Errorf ( "cannot load -remoteWrite.relabelConfig=%q: %s" , * relabelConfigPathGlobal , err )
}
rcs . global = global
}
if len ( * relabelConfigPaths ) > len ( * remoteWriteURLs ) {
return nil , fmt . Errorf ( "too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d" ,
len ( * relabelConfigPaths ) , len ( * remoteWriteURLs ) )
}
rcs . perURL = make ( [ ] [ ] promrelabel . ParsedRelabelConfig , len ( * remoteWriteURLs ) )
for i , path := range * relabelConfigPaths {
prc , err := promrelabel . LoadRelabelConfigs ( path )
if err != nil {
return nil , fmt . Errorf ( "cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %s" , path , err )
}
rcs . perURL [ i ] = prc
}
return & rcs , nil
}
type relabelConfigs struct {
global [ ] promrelabel . ParsedRelabelConfig
perURL [ ] [ ] promrelabel . ParsedRelabelConfig
}
// initLabelsGlobal must be called after parsing command-line flags.
func initLabelsGlobal ( ) {
2020-03-03 12:08:17 +01:00
// Init labelsGlobal
labelsGlobal = nil
for _ , s := range * unparsedLabelsGlobal {
2020-02-23 12:35:47 +01:00
n := strings . IndexByte ( s , '=' )
if n < 0 {
2020-05-30 13:36:40 +02:00
logger . Fatalf ( "missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q" , s )
2020-02-23 12:35:47 +01:00
}
2020-03-03 12:08:17 +01:00
labelsGlobal = append ( labelsGlobal , prompbmarshal . Label {
2020-02-23 12:35:47 +01:00
Name : s [ : n ] ,
Value : s [ n + 1 : ] ,
} )
}
}
2020-03-03 12:08:17 +01:00
func ( rctx * relabelCtx ) applyRelabeling ( tss [ ] prompbmarshal . TimeSeries , extraLabels [ ] prompbmarshal . Label , prcs [ ] promrelabel . ParsedRelabelConfig ) [ ] prompbmarshal . TimeSeries {
2020-02-23 12:35:47 +01:00
if len ( extraLabels ) == 0 && len ( prcs ) == 0 {
// Nothing to change.
2020-02-28 17:57:45 +01:00
return tss
2020-02-23 12:35:47 +01:00
}
tssDst := tss [ : 0 ]
labels := rctx . labels [ : 0 ]
for i := range tss {
ts := & tss [ i ]
labelsLen := len ( labels )
labels = append ( labels , ts . Labels ... )
// extraLabels must be added before applying relabeling according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
for j := range extraLabels {
extraLabel := & extraLabels [ j ]
tmp := promrelabel . GetLabelByName ( labels [ labelsLen : ] , extraLabel . Name )
if tmp != nil {
tmp . Value = extraLabel . Value
} else {
labels = append ( labels , * extraLabel )
}
}
labels = promrelabel . ApplyRelabelConfigs ( labels , labelsLen , prcs , true )
if len ( labels ) == labelsLen {
// Drop the current time series, since relabeling removed all the labels.
continue
}
tssDst = append ( tssDst , prompbmarshal . TimeSeries {
Labels : labels [ labelsLen : ] ,
Samples : ts . Samples ,
} )
}
rctx . labels = labels
2020-02-28 17:57:45 +01:00
return tssDst
2020-02-23 12:35:47 +01:00
}
type relabelCtx struct {
// pool for labels, which are used during the relabeling.
labels [ ] prompbmarshal . Label
}
func ( rctx * relabelCtx ) reset ( ) {
labels := rctx . labels
for i := range labels {
label := & labels [ i ]
label . Name = ""
label . Value = ""
}
rctx . labels = rctx . labels [ : 0 ]
}
var relabelCtxPool = & sync . Pool {
New : func ( ) interface { } {
return & relabelCtx { }
} ,
}
2020-03-03 12:08:17 +01:00
func getRelabelCtx ( ) * relabelCtx {
return relabelCtxPool . Get ( ) . ( * relabelCtx )
}
func putRelabelCtx ( rctx * relabelCtx ) {
rctx . labels = rctx . labels [ : 0 ]
relabelCtxPool . Put ( rctx )
}