2020-02-23 12:35:47 +01:00
package remotewrite
import (
"flag"
2020-05-30 13:36:40 +02:00
"fmt"
2020-02-23 12:35:47 +01:00
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
var (
2022-10-01 17:26:05 +02:00
unparsedLabelsGlobal = flagutil . NewArrayString ( "remoteWrite.label" , "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. " +
2021-03-25 16:54:55 +01:00
"Pass multiple -remoteWrite.label flags in order to add multiple labels to metrics before sending them to remote storage" )
2023-01-04 07:04:46 +01:00
relabelConfigPathGlobal = flag . String ( "remoteWrite.relabelConfig" , "" , "Optional path to file with relabeling configs, which are applied " +
"to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. " +
"The path can point either to local file or to http url. " +
"See https://docs.victoriametrics.com/vmagent.html#relabeling" )
relabelConfigPaths = flagutil . NewArrayString ( "remoteWrite.urlRelabelConfig" , "Optional path to relabel configs for the corresponding -remoteWrite.url. " +
"See also -remoteWrite.relabelConfig. The path can point either to local file or to http url. " +
"See https://docs.victoriametrics.com/vmagent.html#relabeling" )
2022-09-26 12:11:37 +02:00
usePromCompatibleNaming = flag . Bool ( "usePromCompatibleNaming" , false , "Whether to replace characters unsupported by Prometheus with underscores " +
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. " +
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels" )
2020-02-23 12:35:47 +01:00
)
2020-03-03 12:08:17 +01:00
// labelsGlobal holds the labels parsed from -remoteWrite.label flags.
//
// It is (re)built by initLabelsGlobal.
var labelsGlobal []prompbmarshal.Label
2020-02-23 12:35:47 +01:00
2020-05-30 13:36:40 +02:00
// CheckRelabelConfigs verifies that the configs referenced by
// -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig can be loaded.
//
// The loaded configs are discarded; only the loading error, if any, is returned.
func CheckRelabelConfigs() error {
	if _, err := loadRelabelConfigs(); err != nil {
		return err
	}
	return nil
}
func loadRelabelConfigs ( ) ( * relabelConfigs , error ) {
var rcs relabelConfigs
if * relabelConfigPathGlobal != "" {
2022-12-10 11:09:21 +01:00
global , err := promrelabel . LoadRelabelConfigs ( * relabelConfigPathGlobal )
2020-05-30 13:36:40 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot load -remoteWrite.relabelConfig=%q: %w" , * relabelConfigPathGlobal , err )
2020-05-30 13:36:40 +02:00
}
rcs . global = global
}
2021-08-05 08:44:29 +02:00
if len ( * relabelConfigPaths ) > ( len ( * remoteWriteURLs ) + len ( * remoteWriteMultitenantURLs ) ) {
2021-08-05 08:46:19 +02:00
return nil , fmt . Errorf ( "too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url or -remoteWrite.multitenantURL args: %d" ,
2021-08-05 08:44:29 +02:00
len ( * relabelConfigPaths ) , ( len ( * remoteWriteURLs ) + len ( * remoteWriteMultitenantURLs ) ) )
2020-05-30 13:36:40 +02:00
}
2021-08-05 08:44:29 +02:00
rcs . perURL = make ( [ ] * promrelabel . ParsedConfigs , ( len ( * remoteWriteURLs ) + len ( * remoteWriteMultitenantURLs ) ) )
2020-05-30 13:36:40 +02:00
for i , path := range * relabelConfigPaths {
2020-07-20 14:49:05 +02:00
if len ( path ) == 0 {
// Skip empty relabel config.
continue
}
2022-12-10 11:09:21 +01:00
prc , err := promrelabel . LoadRelabelConfigs ( path )
2020-05-30 13:36:40 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %w" , path , err )
2020-05-30 13:36:40 +02:00
}
rcs . perURL [ i ] = prc
}
return & rcs , nil
}
type relabelConfigs struct {
2021-02-22 15:33:55 +01:00
global * promrelabel . ParsedConfigs
perURL [ ] * promrelabel . ParsedConfigs
2020-05-30 13:36:40 +02:00
}
// initLabelsGlobal must be called after parsing command-line flags.
func initLabelsGlobal ( ) {
2020-03-03 12:08:17 +01:00
labelsGlobal = nil
for _ , s := range * unparsedLabelsGlobal {
2020-12-08 13:53:41 +01:00
if len ( s ) == 0 {
continue
}
2020-02-23 12:35:47 +01:00
n := strings . IndexByte ( s , '=' )
if n < 0 {
2020-05-30 13:36:40 +02:00
logger . Fatalf ( "missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q" , s )
2020-02-23 12:35:47 +01:00
}
2020-03-03 12:08:17 +01:00
labelsGlobal = append ( labelsGlobal , prompbmarshal . Label {
2020-02-23 12:35:47 +01:00
Name : s [ : n ] ,
Value : s [ n + 1 : ] ,
} )
}
}
2023-08-15 13:47:48 +02:00
func ( rctx * relabelCtx ) applyRelabeling ( tss [ ] prompbmarshal . TimeSeries , pcs * promrelabel . ParsedConfigs ) [ ] prompbmarshal . TimeSeries {
if pcs . Len ( ) == 0 && ! * usePromCompatibleNaming {
2020-02-23 12:35:47 +01:00
// Nothing to change.
2020-02-28 17:57:45 +01:00
return tss
2020-02-23 12:35:47 +01:00
}
tssDst := tss [ : 0 ]
labels := rctx . labels [ : 0 ]
for i := range tss {
ts := & tss [ i ]
labelsLen := len ( labels )
labels = append ( labels , ts . Labels ... )
2022-10-09 13:51:14 +02:00
labels = pcs . Apply ( labels , labelsLen )
2022-10-13 11:04:10 +02:00
labels = promrelabel . FinalizeLabels ( labels [ : labelsLen ] , labels [ labelsLen : ] )
2020-02-23 12:35:47 +01:00
if len ( labels ) == labelsLen {
// Drop the current time series, since relabeling removed all the labels.
continue
}
2023-08-17 14:35:26 +02:00
if * usePromCompatibleNaming {
fixPromCompatibleNaming ( labels [ labelsLen : ] )
}
2020-02-23 12:35:47 +01:00
tssDst = append ( tssDst , prompbmarshal . TimeSeries {
Labels : labels [ labelsLen : ] ,
Samples : ts . Samples ,
} )
}
rctx . labels = labels
2020-02-28 17:57:45 +01:00
return tssDst
2020-02-23 12:35:47 +01:00
}
2023-08-17 14:35:26 +02:00
func ( rctx * relabelCtx ) appendExtraLabels ( tss [ ] prompbmarshal . TimeSeries , extraLabels [ ] prompbmarshal . Label ) {
2023-08-15 13:47:48 +02:00
if len ( extraLabels ) == 0 {
2023-08-17 14:35:26 +02:00
return
2023-08-15 13:47:48 +02:00
}
labels := rctx . labels [ : 0 ]
for i := range tss {
ts := & tss [ i ]
labelsLen := len ( labels )
labels = append ( labels , ts . Labels ... )
for j := range extraLabels {
extraLabel := extraLabels [ j ]
tmp := promrelabel . GetLabelByName ( labels [ labelsLen : ] , extraLabel . Name )
if tmp != nil {
tmp . Value = extraLabel . Value
} else {
labels = append ( labels , extraLabel )
}
}
2023-08-17 14:35:26 +02:00
ts . Labels = labels [ labelsLen : ]
2023-08-15 13:47:48 +02:00
}
rctx . labels = labels
}
2020-02-23 12:35:47 +01:00
// relabelCtx is a reusable scratch context for applyRelabeling and appendExtraLabels.
//
// Instances are obtained via getRelabelCtx and returned via putRelabelCtx.
type relabelCtx struct {
	// labels is the pool of labels used during the relabeling.
	// Labels of the processed time series point into this slice.
	labels []prompbmarshal.Label
}
func ( rctx * relabelCtx ) reset ( ) {
2020-11-07 15:16:56 +01:00
promrelabel . CleanLabels ( rctx . labels )
2020-02-23 12:35:47 +01:00
rctx . labels = rctx . labels [ : 0 ]
}
// relabelCtxPool is the pool of relabelCtx objects shared by getRelabelCtx and putRelabelCtx.
// It creates an empty relabelCtx when the pool has nothing to hand out.
var relabelCtxPool = &sync.Pool{
	New: func() interface{} {
		return new(relabelCtx)
	},
}
2020-03-03 12:08:17 +01:00
// getRelabelCtx obtains a relabelCtx from relabelCtxPool.
//
// Return the context to the pool with putRelabelCtx when it is no longer needed.
func getRelabelCtx() *relabelCtx {
	v := relabelCtxPool.Get()
	return v.(*relabelCtx)
}
// putRelabelCtx returns rctx to relabelCtxPool.
//
// rctx is reset before being put to the pool, so the pooled labels are passed
// through promrelabel.CleanLabels instead of being merely truncated. This presumably
// drops references to label names and values, which would otherwise stay reachable
// via the pooled backing array.
//
// rctx and the labels obtained through it mustn't be used after the call.
func putRelabelCtx(rctx *relabelCtx) {
	rctx.reset()
	relabelCtxPool.Put(rctx)
}
2023-08-17 14:35:26 +02:00
// fixPromCompatibleNaming sanitizes labels in place for Prometheus-compatible naming.
//
// The value of the `__name__` label is passed through promrelabel.SanitizeMetricName,
// while names of all the other labels are passed through promrelabel.SanitizeLabelName.
func fixPromCompatibleNaming(labels []prompbmarshal.Label) {
	for i := range labels {
		l := &labels[i]
		switch l.Name {
		case "__name__":
			// The metric name is stored in the label value.
			l.Value = promrelabel.SanitizeMetricName(l.Value)
		default:
			l.Name = promrelabel.SanitizeLabelName(l.Name)
		}
	}
}