mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-12 13:32:25 +01:00
d577657fb7
- Make sure that the last successfully loaded config is used on hot-reload failure - Properly cleanup resources occupied by already initialized aggregators when the current aggregator fails to be initialized - Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config This should simplify monitoring and debugging failed reloads - Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop() while sa could be in use at realoadSaConfig() - Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact scalability on system with big number of CPU cores at hasState.Store(true) call inside aggregator.Push(). - Remove fine-grained aggregator reload - reload all the aggregators on config change instead. This simplifies the code a bit. The fine-grained aggregator reload may be returned back if there will be demand from real users for it. - Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs with -dryRun flag - Return back accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639
172 lines
5.3 KiB
Go
172 lines
5.3 KiB
Go
package relabel
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
|
|
"The path can point either to local file or to http url. "+
|
|
"See https://docs.victoriametrics.com/#relabeling for details. The config is reloaded on SIGHUP signal")
|
|
|
|
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
|
|
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
|
|
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
|
|
)
|
|
|
|
// Init must be called after flag.Parse and before using the relabel package.
|
|
func Init() {
|
|
// Register SIGHUP handler for config re-read just before loadRelabelConfig call.
|
|
// This guarantees that the config will be re-read if the signal arrives during loadRelabelConfig call.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
|
|
sighupCh := procutil.NewSighupChan()
|
|
|
|
pcs, err := loadRelabelConfig()
|
|
if err != nil {
|
|
logger.Fatalf("cannot load relabelConfig: %s", err)
|
|
}
|
|
pcsGlobal.Store(pcs)
|
|
configSuccess.Set(1)
|
|
configTimestamp.Set(fasttime.UnixTimestamp())
|
|
|
|
if len(*relabelConfig) == 0 {
|
|
return
|
|
}
|
|
go func() {
|
|
for range sighupCh {
|
|
configReloads.Inc()
|
|
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
|
|
pcs, err := loadRelabelConfig()
|
|
if err != nil {
|
|
configReloadErrors.Inc()
|
|
configSuccess.Set(0)
|
|
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
|
|
continue
|
|
}
|
|
pcsGlobal.Store(pcs)
|
|
configSuccess.Set(1)
|
|
configTimestamp.Set(fasttime.UnixTimestamp())
|
|
logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig)
|
|
}
|
|
}()
|
|
}
|
|
|
|
var (
|
|
configReloads = metrics.NewCounter(`vm_relabel_config_reloads_total`)
|
|
configReloadErrors = metrics.NewCounter(`vm_relabel_config_reloads_errors_total`)
|
|
configSuccess = metrics.NewCounter(`vm_relabel_config_last_reload_successful`)
|
|
configTimestamp = metrics.NewCounter(`vm_relabel_config_last_reload_success_timestamp_seconds`)
|
|
)
|
|
|
|
var pcsGlobal atomic.Value
|
|
|
|
// CheckRelabelConfig checks config pointed by -relabelConfig
|
|
func CheckRelabelConfig() error {
|
|
_, err := loadRelabelConfig()
|
|
return err
|
|
}
|
|
|
|
func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {
|
|
if len(*relabelConfig) == 0 {
|
|
return nil, nil
|
|
}
|
|
pcs, err := promrelabel.LoadRelabelConfigs(*relabelConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error when reading -relabelConfig=%q: %w", *relabelConfig, err)
|
|
}
|
|
return pcs, nil
|
|
}
|
|
|
|
// HasRelabeling returns true if there is global relabeling.
|
|
func HasRelabeling() bool {
|
|
pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
|
|
return pcs.Len() > 0 || *usePromCompatibleNaming
|
|
}
|
|
|
|
// Ctx holds relabeling context.
|
|
type Ctx struct {
|
|
// tmpLabels is used during ApplyRelabeling call.
|
|
tmpLabels []prompbmarshal.Label
|
|
}
|
|
|
|
// Reset resets ctx.
|
|
func (ctx *Ctx) Reset() {
|
|
promrelabel.CleanLabels(ctx.tmpLabels)
|
|
ctx.tmpLabels = ctx.tmpLabels[:0]
|
|
}
|
|
|
|
// ApplyRelabeling applies relabeling to the given labels and returns the result.
|
|
//
|
|
// The returned labels are valid until the next call to ApplyRelabeling.
|
|
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
|
|
pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
|
|
if pcs.Len() == 0 && !*usePromCompatibleNaming {
|
|
// There are no relabeling rules.
|
|
return labels
|
|
}
|
|
// Convert labels to prompbmarshal.Label format suitable for relabeling.
|
|
tmpLabels := ctx.tmpLabels[:0]
|
|
for _, label := range labels {
|
|
name := bytesutil.ToUnsafeString(label.Name)
|
|
if len(name) == 0 {
|
|
name = "__name__"
|
|
}
|
|
value := bytesutil.ToUnsafeString(label.Value)
|
|
tmpLabels = append(tmpLabels, prompbmarshal.Label{
|
|
Name: name,
|
|
Value: value,
|
|
})
|
|
}
|
|
|
|
if *usePromCompatibleNaming {
|
|
// Replace unsupported Prometheus chars in label names and metric names with underscores.
|
|
for i := range tmpLabels {
|
|
label := &tmpLabels[i]
|
|
if label.Name == "__name__" {
|
|
label.Value = promrelabel.SanitizeName(label.Value)
|
|
} else {
|
|
label.Name = promrelabel.SanitizeName(label.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
if pcs.Len() > 0 {
|
|
// Apply relabeling
|
|
tmpLabels = pcs.Apply(tmpLabels, 0)
|
|
tmpLabels = promrelabel.FinalizeLabels(tmpLabels[:0], tmpLabels)
|
|
if len(tmpLabels) == 0 {
|
|
metricsDropped.Inc()
|
|
}
|
|
}
|
|
|
|
ctx.tmpLabels = tmpLabels
|
|
|
|
// Return back labels to the desired format.
|
|
dst := labels[:0]
|
|
for _, label := range tmpLabels {
|
|
name := bytesutil.ToUnsafeBytes(label.Name)
|
|
if label.Name == "__name__" {
|
|
name = nil
|
|
}
|
|
value := bytesutil.ToUnsafeBytes(label.Value)
|
|
dst = append(dst, prompb.Label{
|
|
Name: name,
|
|
Value: value,
|
|
})
|
|
}
|
|
return dst
|
|
}
|
|
|
|
var metricsDropped = metrics.NewCounter(`vm_relabel_metrics_dropped_total`)
|