mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
app/vminsert: add ability to apply relabeling to all the incoming metrics if -relabelConfig
command-line arg points to a file with a list of relabel_config
entries
See https://victoriametrics.github.io/#relabeling
This commit is contained in:
parent
e5500bfcf2
commit
91b3482894
20
README.md
20
README.md
@ -79,6 +79,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||||||
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
|
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
|
||||||
* [/api/v1/import](#how-to-import-time-series-data).
|
* [/api/v1/import](#how-to-import-time-series-data).
|
||||||
* [Arbitrary CSV data](#how-to-import-csv-data).
|
* [Arbitrary CSV data](#how-to-import-csv-data).
|
||||||
|
* Supports metrics' relabeling. See [these docs](#relabeling) for details.
|
||||||
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
|
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
|
||||||
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
|
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
|
||||||
* See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles).
|
* See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles).
|
||||||
@ -111,6 +112,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||||||
* [How to delete time series](#how-to-delete-time-series)
|
* [How to delete time series](#how-to-delete-time-series)
|
||||||
* [How to export time series](#how-to-export-time-series)
|
* [How to export time series](#how-to-export-time-series)
|
||||||
* [How to import time series data](#how-to-import-time-series-data)
|
* [How to import time series data](#how-to-import-time-series-data)
|
||||||
|
* [Relabeling](#relabeling)
|
||||||
* [Federation](#federation)
|
* [Federation](#federation)
|
||||||
* [Capacity planning](#capacity-planning)
|
* [Capacity planning](#capacity-planning)
|
||||||
* [High availability](#high-availability)
|
* [High availability](#high-availability)
|
||||||
@ -650,7 +652,7 @@ The delete API is intended mainly for the following cases:
|
|||||||
It isn't recommended using delete API for the following cases, since it brings non-zero overhead:
|
It isn't recommended using delete API for the following cases, since it brings non-zero overhead:
|
||||||
|
|
||||||
* Regular cleanups for unneeded data. Just prevent writing unneeded data into VictoriaMetrics.
|
* Regular cleanups for unneeded data. Just prevent writing unneeded data into VictoriaMetrics.
|
||||||
This can be done with relabeling in [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md).
|
This can be done with [relabeling](#relabeling).
|
||||||
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
||||||
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
||||||
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
||||||
@ -724,6 +726,22 @@ Note that it could be required to flush response cache after importing historica
|
|||||||
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
|
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
|
||||||
and importing them concurrently. Note that the original file must be split on newlines.
|
and importing them concurrently. Note that the original file must be split on newlines.
|
||||||
|
|
||||||
|
|
||||||
|
### Relabeling
|
||||||
|
|
||||||
|
VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points
|
||||||
|
to a file containing a list of [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) entries.
|
||||||
|
|
||||||
|
Additionally VictoriaMetrics provides the following extra actions for relabeling rules:
|
||||||
|
|
||||||
|
* `replace_all`: replaces all the occurences of `regex` in the values of `source_labels` with the `replacement` and stores the result in the `target_label`.
|
||||||
|
* `labelmap_all`: replaces all the occurences of `regex` in all the label names with the `replacement`.
|
||||||
|
* `keep_if_equal`: keeps the entry if all label values from `source_labels` are equal.
|
||||||
|
* `drop_if_equal`: drops the entry if all the label values from `source_labels` are equal.
|
||||||
|
|
||||||
|
See also [relabeling in vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md#relabeling).
|
||||||
|
|
||||||
|
|
||||||
### Federation
|
### Federation
|
||||||
|
|
||||||
VictoriaMetrics exports [Prometheus-compatible federation data](https://prometheus.io/docs/prometheus/latest/federation/)
|
VictoriaMetrics exports [Prometheus-compatible federation data](https://prometheus.io/docs/prometheus/latest/federation/)
|
||||||
|
@ -16,7 +16,7 @@ var (
|
|||||||
unparsedLabelsGlobal = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
|
unparsedLabelsGlobal = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
|
||||||
"Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage")
|
"Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage")
|
||||||
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+
|
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+
|
||||||
"before sending them to -remoteWrite.url. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config for details")
|
"before sending them to -remoteWrite.url. See https://victoriametrics.github.io/vmagent.html#relabeling for details")
|
||||||
relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url")
|
relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
@ -17,11 +18,14 @@ type InsertCtx struct {
|
|||||||
|
|
||||||
mrs []storage.MetricRow
|
mrs []storage.MetricRow
|
||||||
metricNamesBuf []byte
|
metricNamesBuf []byte
|
||||||
|
|
||||||
|
relabelCtx relabel.Ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset resets ctx for future fill with rowsLen rows.
|
// Reset resets ctx for future fill with rowsLen rows.
|
||||||
func (ctx *InsertCtx) Reset(rowsLen int) {
|
func (ctx *InsertCtx) Reset(rowsLen int) {
|
||||||
for _, label := range ctx.Labels {
|
for i := range ctx.Labels {
|
||||||
|
label := &ctx.Labels[i]
|
||||||
label.Name = nil
|
label.Name = nil
|
||||||
label.Value = nil
|
label.Value = nil
|
||||||
}
|
}
|
||||||
@ -37,6 +41,8 @@ func (ctx *InsertCtx) Reset(rowsLen int) {
|
|||||||
}
|
}
|
||||||
ctx.mrs = ctx.mrs[:0]
|
ctx.mrs = ctx.mrs[:0]
|
||||||
ctx.metricNamesBuf = ctx.metricNamesBuf[:0]
|
ctx.metricNamesBuf = ctx.metricNamesBuf[:0]
|
||||||
|
|
||||||
|
ctx.relabelCtx.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte {
|
func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label) []byte {
|
||||||
@ -118,6 +124,11 @@ func (ctx *InsertCtx) AddLabel(name, value string) {
|
|||||||
ctx.Labels = labels
|
ctx.Labels = labels
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ApplyRelabeling applies relabeling to ic.Labels.
|
||||||
|
func (ctx *InsertCtx) ApplyRelabeling() {
|
||||||
|
ctx.Labels = ctx.relabelCtx.ApplyRelabeling(ctx.Labels)
|
||||||
|
}
|
||||||
|
|
||||||
// FlushBufs flushes buffered rows to the underlying storage.
|
// FlushBufs flushes buffered rows to the underlying storage.
|
||||||
func (ctx *InsertCtx) FlushBufs() error {
|
func (ctx *InsertCtx) FlushBufs() error {
|
||||||
if err := vmstorage.AddRows(ctx.mrs); err != nil {
|
if err := vmstorage.AddRows(ctx.mrs); err != nil {
|
||||||
|
@ -36,6 +36,11 @@ func insertRows(rows []parser.Row) error {
|
|||||||
tag := &r.Tags[j]
|
tag := &r.Tags[j]
|
||||||
ctx.AddLabel(tag.Key, tag.Value)
|
ctx.AddLabel(tag.Key, tag.Value)
|
||||||
}
|
}
|
||||||
|
ctx.ApplyRelabeling()
|
||||||
|
if len(ctx.Labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
|
@ -36,6 +36,11 @@ func insertRows(rows []parser.Row) error {
|
|||||||
tag := &r.Tags[j]
|
tag := &r.Tags[j]
|
||||||
ctx.AddLabel(tag.Key, tag.Value)
|
ctx.AddLabel(tag.Key, tag.Value)
|
||||||
}
|
}
|
||||||
|
ctx.ApplyRelabeling()
|
||||||
|
if len(ctx.Labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
|
@ -8,7 +8,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
@ -59,6 +61,7 @@ func insertRows(db string, rows []parser.Row) error {
|
|||||||
ic := &ctx.Common
|
ic := &ctx.Common
|
||||||
ic.Reset(rowsLen)
|
ic.Reset(rowsLen)
|
||||||
rowsTotal := 0
|
rowsTotal := 0
|
||||||
|
hasRelabeling := relabel.HasRelabeling()
|
||||||
for i := range rows {
|
for i := range rows {
|
||||||
r := &rows[i]
|
r := &rows[i]
|
||||||
ic.Labels = ic.Labels[:0]
|
ic.Labels = ic.Labels[:0]
|
||||||
@ -73,22 +76,37 @@ func insertRows(db string, rows []parser.Row) error {
|
|||||||
if len(db) > 0 && !hasDBLabel {
|
if len(db) > 0 && !hasDBLabel {
|
||||||
ic.AddLabel("db", db)
|
ic.AddLabel("db", db)
|
||||||
}
|
}
|
||||||
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
|
||||||
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:0], r.Measurement...)
|
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:0], r.Measurement...)
|
||||||
skipFieldKey := len(r.Fields) == 1 && *skipSingleField
|
skipFieldKey := len(r.Fields) == 1 && *skipSingleField
|
||||||
if len(ctx.metricGroupBuf) > 0 && !skipFieldKey {
|
if len(ctx.metricGroupBuf) > 0 && !skipFieldKey {
|
||||||
ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...)
|
ctx.metricGroupBuf = append(ctx.metricGroupBuf, *measurementFieldSeparator...)
|
||||||
}
|
}
|
||||||
metricGroupPrefixLen := len(ctx.metricGroupBuf)
|
metricGroupPrefixLen := len(ctx.metricGroupBuf)
|
||||||
|
ctx.metricNameBuf = ctx.metricNameBuf[:0]
|
||||||
|
if !hasRelabeling {
|
||||||
|
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf, ic.Labels)
|
||||||
|
}
|
||||||
|
labelsLen := len(ic.Labels)
|
||||||
for j := range r.Fields {
|
for j := range r.Fields {
|
||||||
f := &r.Fields[j]
|
f := &r.Fields[j]
|
||||||
if !skipFieldKey {
|
if !skipFieldKey {
|
||||||
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
|
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:metricGroupPrefixLen], f.Key...)
|
||||||
}
|
}
|
||||||
metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
|
metricGroup := bytesutil.ToUnsafeString(ctx.metricGroupBuf)
|
||||||
ic.Labels = ic.Labels[:0]
|
ic.Labels = ic.Labels[:labelsLen]
|
||||||
ic.AddLabel("", metricGroup)
|
ic.AddLabel("", metricGroup)
|
||||||
ic.WriteDataPoint(ctx.metricNameBuf, ic.Labels[:1], r.Timestamp, f.Value)
|
var labels []prompb.Label
|
||||||
|
if hasRelabeling {
|
||||||
|
ic.ApplyRelabeling()
|
||||||
|
labels = ic.Labels
|
||||||
|
if len(labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
labels = ic.Labels[labelsLen : labelsLen+1]
|
||||||
|
}
|
||||||
|
ic.WriteDataPoint(ctx.metricNameBuf, labels, r.Timestamp, f.Value)
|
||||||
}
|
}
|
||||||
rowsTotal += len(r.Fields)
|
rowsTotal += len(r.Fields)
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prompush"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prompush"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
|
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
|
||||||
@ -46,6 +47,7 @@ var (
|
|||||||
|
|
||||||
// Init initializes vminsert.
|
// Init initializes vminsert.
|
||||||
func Init() {
|
func Init() {
|
||||||
|
relabel.Init()
|
||||||
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
|
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
|
||||||
|
|
||||||
writeconcurrencylimiter.Init()
|
writeconcurrencylimiter.Init()
|
||||||
|
@ -36,6 +36,11 @@ func insertRows(rows []parser.Row) error {
|
|||||||
tag := &r.Tags[j]
|
tag := &r.Tags[j]
|
||||||
ctx.AddLabel(tag.Key, tag.Value)
|
ctx.AddLabel(tag.Key, tag.Value)
|
||||||
}
|
}
|
||||||
|
ctx.ApplyRelabeling()
|
||||||
|
if len(ctx.Labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
|
@ -42,6 +42,11 @@ func insertRows(rows []parser.Row) error {
|
|||||||
tag := &r.Tags[j]
|
tag := &r.Tags[j]
|
||||||
ctx.AddLabel(tag.Key, tag.Value)
|
ctx.AddLabel(tag.Key, tag.Value)
|
||||||
}
|
}
|
||||||
|
ctx.ApplyRelabeling()
|
||||||
|
if len(ctx.Labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
|
}
|
||||||
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value)
|
||||||
}
|
}
|
||||||
rowsInserted.Add(len(rows))
|
rowsInserted.Add(len(rows))
|
||||||
|
@ -1,13 +1,8 @@
|
|||||||
package prompush
|
package prompush
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"runtime"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
@ -21,8 +16,8 @@ const maxRowsPerBlock = 10000
|
|||||||
|
|
||||||
// Push pushes wr to storage.
|
// Push pushes wr to storage.
|
||||||
func Push(wr *prompbmarshal.WriteRequest) {
|
func Push(wr *prompbmarshal.WriteRequest) {
|
||||||
ctx := getPushCtx()
|
ctx := common.GetInsertCtx()
|
||||||
defer putPushCtx(ctx)
|
defer common.PutInsertCtx(ctx)
|
||||||
|
|
||||||
tss := wr.Timeseries
|
tss := wr.Timeseries
|
||||||
for len(tss) > 0 {
|
for len(tss) > 0 {
|
||||||
@ -34,80 +29,39 @@ func Push(wr *prompbmarshal.WriteRequest) {
|
|||||||
} else {
|
} else {
|
||||||
tss = nil
|
tss = nil
|
||||||
}
|
}
|
||||||
ctx.push(tssBlock)
|
push(ctx, tssBlock)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *pushCtx) push(tss []prompbmarshal.TimeSeries) {
|
func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) {
|
||||||
rowsLen := 0
|
rowsLen := 0
|
||||||
for i := range tss {
|
for i := range tss {
|
||||||
rowsLen += len(tss[i].Samples)
|
rowsLen += len(tss[i].Samples)
|
||||||
}
|
}
|
||||||
ic := &ctx.Common
|
ctx.Reset(rowsLen)
|
||||||
ic.Reset(rowsLen)
|
|
||||||
rowsTotal := 0
|
rowsTotal := 0
|
||||||
labels := ctx.labels[:0]
|
|
||||||
for i := range tss {
|
for i := range tss {
|
||||||
ts := &tss[i]
|
ts := &tss[i]
|
||||||
labels = labels[:0]
|
ctx.Labels = ctx.Labels[:0]
|
||||||
for j := range ts.Labels {
|
for j := range ts.Labels {
|
||||||
label := &ts.Labels[j]
|
label := &ts.Labels[j]
|
||||||
labels = append(labels, prompb.Label{
|
ctx.AddLabel(label.Name, label.Value)
|
||||||
Name: bytesutil.ToUnsafeBytes(label.Name),
|
}
|
||||||
Value: bytesutil.ToUnsafeBytes(label.Value),
|
ctx.ApplyRelabeling()
|
||||||
})
|
if len(ctx.Labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
var metricNameRaw []byte
|
var metricNameRaw []byte
|
||||||
for i := range ts.Samples {
|
for i := range ts.Samples {
|
||||||
r := &ts.Samples[i]
|
r := &ts.Samples[i]
|
||||||
metricNameRaw = ic.WriteDataPointExt(metricNameRaw, labels, r.Timestamp, r.Value)
|
metricNameRaw = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
||||||
}
|
}
|
||||||
rowsTotal += len(ts.Samples)
|
rowsTotal += len(ts.Samples)
|
||||||
}
|
}
|
||||||
ctx.labels = labels
|
|
||||||
rowsInserted.Add(rowsTotal)
|
rowsInserted.Add(rowsTotal)
|
||||||
rowsPerInsert.Update(float64(rowsTotal))
|
rowsPerInsert.Update(float64(rowsTotal))
|
||||||
if err := ic.FlushBufs(); err != nil {
|
if err := ctx.FlushBufs(); err != nil {
|
||||||
logger.Errorf("cannot flush promscrape data to storage: %s", err)
|
logger.Errorf("cannot flush promscrape data to storage: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type pushCtx struct {
|
|
||||||
Common common.InsertCtx
|
|
||||||
labels []prompb.Label
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ctx *pushCtx) reset() {
|
|
||||||
ctx.Common.Reset(0)
|
|
||||||
|
|
||||||
for i := range ctx.labels {
|
|
||||||
label := &ctx.labels[i]
|
|
||||||
label.Name = nil
|
|
||||||
label.Value = nil
|
|
||||||
}
|
|
||||||
ctx.labels = ctx.labels[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
func getPushCtx() *pushCtx {
|
|
||||||
select {
|
|
||||||
case ctx := <-pushCtxPoolCh:
|
|
||||||
return ctx
|
|
||||||
default:
|
|
||||||
if v := pushCtxPool.Get(); v != nil {
|
|
||||||
return v.(*pushCtx)
|
|
||||||
}
|
|
||||||
return &pushCtx{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func putPushCtx(ctx *pushCtx) {
|
|
||||||
ctx.reset()
|
|
||||||
select {
|
|
||||||
case pushCtxPoolCh <- ctx:
|
|
||||||
default:
|
|
||||||
pushCtxPool.Put(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var pushCtxPool sync.Pool
|
|
||||||
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
|
|
||||||
|
@ -34,10 +34,17 @@ func insertRows(timeseries []prompb.TimeSeries) error {
|
|||||||
rowsTotal := 0
|
rowsTotal := 0
|
||||||
for i := range timeseries {
|
for i := range timeseries {
|
||||||
ts := ×eries[i]
|
ts := ×eries[i]
|
||||||
|
// Make a shallow copy of ts.Labels before calling ctx.ApplyRelabeling, since ctx.ApplyRelabeling may modify labels.
|
||||||
|
ctx.Labels = append(ctx.Labels[:0], ts.Labels...)
|
||||||
|
ctx.ApplyRelabeling()
|
||||||
|
if len(ctx.Labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
|
}
|
||||||
var metricNameRaw []byte
|
var metricNameRaw []byte
|
||||||
for i := range ts.Samples {
|
for i := range ts.Samples {
|
||||||
r := &ts.Samples[i]
|
r := &ts.Samples[i]
|
||||||
metricNameRaw = ctx.WriteDataPointExt(metricNameRaw, ts.Labels, r.Timestamp, r.Value)
|
metricNameRaw = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
|
||||||
}
|
}
|
||||||
rowsTotal += len(ts.Samples)
|
rowsTotal += len(ts.Samples)
|
||||||
}
|
}
|
||||||
|
120
app/vminsert/relabel/relabel.go
Normal file
120
app/vminsert/relabel/relabel.go
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
package relabel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
|
)
|
||||||
|
|
||||||
|
var relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
|
||||||
|
"See https://victoriametrics.github.io/#relabeling for details")
|
||||||
|
|
||||||
|
// Init must be called after flag.Parse and before using the relabel package.
|
||||||
|
func Init() {
|
||||||
|
prcs, err := loadRelabelConfig()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("cannot load relabelConfig: %s", err)
|
||||||
|
}
|
||||||
|
prcsGlobal.Store(&prcs)
|
||||||
|
if len(*relabelConfig) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sighupCh := procutil.NewSighupChan()
|
||||||
|
go func() {
|
||||||
|
for range sighupCh {
|
||||||
|
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
|
||||||
|
prcs, err := loadRelabelConfig()
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
prcsGlobal.Store(&prcs)
|
||||||
|
logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var prcsGlobal atomic.Value
|
||||||
|
|
||||||
|
func loadRelabelConfig() ([]promrelabel.ParsedRelabelConfig, error) {
|
||||||
|
if len(*relabelConfig) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
prcs, err := promrelabel.LoadRelabelConfigs(*relabelConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error when reading -relabelConfig=%q: %w", *relabelConfig, err)
|
||||||
|
}
|
||||||
|
return prcs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasRelabeling returns true if there is global relabeling.
|
||||||
|
func HasRelabeling() bool {
|
||||||
|
prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig)
|
||||||
|
return len(*prcs) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ctx holds relabeling context.
|
||||||
|
type Ctx struct {
|
||||||
|
// tmpLabels is used during ApplyRelabeling call.
|
||||||
|
tmpLabels []prompbmarshal.Label
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset resets ctx.
|
||||||
|
func (ctx *Ctx) Reset() {
|
||||||
|
labels := ctx.tmpLabels
|
||||||
|
for i := range labels {
|
||||||
|
label := &labels[i]
|
||||||
|
label.Name = ""
|
||||||
|
label.Value = ""
|
||||||
|
}
|
||||||
|
ctx.tmpLabels = ctx.tmpLabels[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
// ApplyRelabeling applies relabeling to the given labels and returns the result.
|
||||||
|
//
|
||||||
|
// The returned labels are valid until the next call to ApplyRelabeling.
|
||||||
|
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
|
||||||
|
prcs := prcsGlobal.Load().(*[]promrelabel.ParsedRelabelConfig)
|
||||||
|
if len(*prcs) == 0 {
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
// Convert src to prompbmarshal.Label format suitable for relabeling.
|
||||||
|
tmpLabels := ctx.tmpLabels[:0]
|
||||||
|
for _, label := range labels {
|
||||||
|
name := bytesutil.ToUnsafeString(label.Name)
|
||||||
|
if len(name) == 0 {
|
||||||
|
name = "__name__"
|
||||||
|
}
|
||||||
|
value := bytesutil.ToUnsafeString(label.Value)
|
||||||
|
tmpLabels = append(tmpLabels, prompbmarshal.Label{
|
||||||
|
Name: name,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply relabeling
|
||||||
|
tmpLabels = promrelabel.ApplyRelabelConfigs(tmpLabels, 0, *prcs, true)
|
||||||
|
ctx.tmpLabels = tmpLabels
|
||||||
|
|
||||||
|
// Return back labels to the desired format.
|
||||||
|
dst := labels[:0]
|
||||||
|
for _, label := range tmpLabels {
|
||||||
|
name := bytesutil.ToUnsafeBytes(label.Name)
|
||||||
|
if label.Name == "__name__" {
|
||||||
|
name = nil
|
||||||
|
}
|
||||||
|
value := bytesutil.ToUnsafeBytes(label.Value)
|
||||||
|
dst = append(dst, prompb.Label{
|
||||||
|
Name: name,
|
||||||
|
Value: value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return dst
|
||||||
|
}
|
@ -44,6 +44,11 @@ func insertRows(rows []parser.Row) error {
|
|||||||
tag := &r.Tags[j]
|
tag := &r.Tags[j]
|
||||||
ic.AddLabelBytes(tag.Key, tag.Value)
|
ic.AddLabelBytes(tag.Key, tag.Value)
|
||||||
}
|
}
|
||||||
|
ic.ApplyRelabeling()
|
||||||
|
if len(ic.Labels) == 0 {
|
||||||
|
// Skip metric without labels.
|
||||||
|
continue
|
||||||
|
}
|
||||||
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
|
||||||
values := r.Values
|
values := r.Values
|
||||||
timestamps := r.Timestamps
|
timestamps := r.Timestamps
|
||||||
|
@ -79,6 +79,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||||||
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
|
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
|
||||||
* [/api/v1/import](#how-to-import-time-series-data).
|
* [/api/v1/import](#how-to-import-time-series-data).
|
||||||
* [Arbitrary CSV data](#how-to-import-csv-data).
|
* [Arbitrary CSV data](#how-to-import-csv-data).
|
||||||
|
* Supports metrics' relabeling. See [these docs](#relabeling) for details.
|
||||||
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
|
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
|
||||||
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
|
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
|
||||||
* See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles).
|
* See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles).
|
||||||
@ -111,6 +112,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||||||
* [How to delete time series](#how-to-delete-time-series)
|
* [How to delete time series](#how-to-delete-time-series)
|
||||||
* [How to export time series](#how-to-export-time-series)
|
* [How to export time series](#how-to-export-time-series)
|
||||||
* [How to import time series data](#how-to-import-time-series-data)
|
* [How to import time series data](#how-to-import-time-series-data)
|
||||||
|
* [Relabeling](#relabeling)
|
||||||
* [Federation](#federation)
|
* [Federation](#federation)
|
||||||
* [Capacity planning](#capacity-planning)
|
* [Capacity planning](#capacity-planning)
|
||||||
* [High availability](#high-availability)
|
* [High availability](#high-availability)
|
||||||
@ -650,7 +652,7 @@ The delete API is intended mainly for the following cases:
|
|||||||
It isn't recommended using delete API for the following cases, since it brings non-zero overhead:
|
It isn't recommended using delete API for the following cases, since it brings non-zero overhead:
|
||||||
|
|
||||||
* Regular cleanups for unneeded data. Just prevent writing unneeded data into VictoriaMetrics.
|
* Regular cleanups for unneeded data. Just prevent writing unneeded data into VictoriaMetrics.
|
||||||
This can be done with relabeling in [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md).
|
This can be done with [relabeling](#relabeling).
|
||||||
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
||||||
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
||||||
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
||||||
@ -724,6 +726,22 @@ Note that it could be required to flush response cache after importing historica
|
|||||||
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
|
Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
|
||||||
and importing them concurrently. Note that the original file must be split on newlines.
|
and importing them concurrently. Note that the original file must be split on newlines.
|
||||||
|
|
||||||
|
|
||||||
|
### Relabeling
|
||||||
|
|
||||||
|
VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points
|
||||||
|
to a file containing a list of [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) entries.
|
||||||
|
|
||||||
|
Additionally VictoriaMetrics provides the following extra actions for relabeling rules:
|
||||||
|
|
||||||
|
* `replace_all`: replaces all the occurences of `regex` in the values of `source_labels` with the `replacement` and stores the result in the `target_label`.
|
||||||
|
* `labelmap_all`: replaces all the occurences of `regex` in all the label names with the `replacement`.
|
||||||
|
* `keep_if_equal`: keeps the entry if all label values from `source_labels` are equal.
|
||||||
|
* `drop_if_equal`: drops the entry if all the label values from `source_labels` are equal.
|
||||||
|
|
||||||
|
See also [relabeling in vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md#relabeling).
|
||||||
|
|
||||||
|
|
||||||
### Federation
|
### Federation
|
||||||
|
|
||||||
VictoriaMetrics exports [Prometheus-compatible federation data](https://prometheus.io/docs/prometheus/latest/federation/)
|
VictoriaMetrics exports [Prometheus-compatible federation data](https://prometheus.io/docs/prometheus/latest/federation/)
|
||||||
|
@ -90,7 +90,7 @@ func parseRelabelConfig(dst []ParsedRelabelConfig, rc *RelabelConfig) ([]ParsedR
|
|||||||
return dst, fmt.Errorf("missing `source_labels` for `action=replace_all`")
|
return dst, fmt.Errorf("missing `source_labels` for `action=replace_all`")
|
||||||
}
|
}
|
||||||
if targetLabel == "" {
|
if targetLabel == "" {
|
||||||
return dst, fmt.Errorf("missing `target_label` for `action=replace`")
|
return dst, fmt.Errorf("missing `target_label` for `action=replace_all`")
|
||||||
}
|
}
|
||||||
case "keep_if_equal":
|
case "keep_if_equal":
|
||||||
if len(sourceLabels) < 2 {
|
if len(sourceLabels) < 2 {
|
||||||
|
Loading…
Reference in New Issue
Block a user