From f41b36bb9a162e5cc184da5e0236e3c441776c85 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 2 Sep 2020 19:41:12 +0300 Subject: [PATCH] app/{vminsert,vmagent}: allow adding extra labels when importing data via Prometheus, CSV and JSON line formats Extra labels may be added to the imported data by passing `extra_label=name=value` query args. Multiple query args may be passed in order to add multiple extra labels. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/719 --- README.md | 9 +++++++ app/vmagent/csvimport/request_handler.go | 12 +++++++-- .../prometheusimport/request_handler.go | 12 +++++++-- app/vmagent/vmimport/request_handler.go | 12 +++++++-- app/vminsert/csvimport/request_handler.go | 14 ++++++++-- .../prometheusimport/request_handler.go | 16 ++++++++++-- app/vminsert/vmimport/request_handler.go | 16 ++++++++++-- docs/Single-server-VictoriaMetrics.md | 9 +++++++ lib/protoparser/common/extra_labels.go | 26 +++++++++++++++++++ 9 files changed, 114 insertions(+), 12 deletions(-) create mode 100644 lib/protoparser/common/extra_labels.go diff --git a/README.md b/README.md index 5a6212d49..007a9e5e1 100644 --- a/README.md +++ b/README.md @@ -516,6 +516,9 @@ The following response should be returned: {"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]} ``` +Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args. +For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines. + Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. @@ -540,6 +543,9 @@ It should return somethins like the following: {"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} ``` +Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. +For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. + VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). @@ -762,6 +768,9 @@ curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import -T exported_data.jsonl.gz ``` +Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. +For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series. + Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index 247c998e3..3c5a70819 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -18,12 +19,18 @@ var ( // InsertHandler processes csv data from req. func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -44,6 +51,7 @@ func insertRows(rows []parser.Row) error { Value: tag.Value, }) } + labels = append(labels, extraLabels...) samples = append(samples, prompbmarshal.Sample{ Value: r.Value, Timestamp: r.Timestamp, diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index af672e3c5..b5ef3b833 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -18,13 +19,19 @@ var ( // InsertHandler processes `/api/v1/import/prometheus` request. func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, insertRows) + return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -45,6 +52,7 @@ func insertRows(rows []parser.Row) error { Value: tag.Value, }) } + labels = append(labels, extraLabels...) samples = append(samples, prompbmarshal.Sample{ Value: r.Value, Timestamp: r.Timestamp, diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index 852747ce6..a02fde0d9 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -21,12 +22,18 @@ var ( // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -44,6 +51,7 @@ func insertRows(rows []parser.Row) error { Value: bytesutil.ToUnsafeString(tag.Value), }) } + labels = append(labels, extraLabels...) values := r.Values timestamps := r.Timestamps _ = timestamps[len(values)-1] diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index 23590476d..fc858936e 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -5,6 +5,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -17,14 +19,18 @@ var ( // InsertHandler processes /api/v1/import/csv requests. func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(rows) + return insertRows(rows, extraLabels) }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) @@ -38,6 +44,10 @@ func insertRows(rows []parser.Row) error { tag := &r.Tags[j] ctx.AddLabel(tag.Key, tag.Value) } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } if hasRelabeling { ctx.ApplyRelabeling() } diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index 13e779fca..fa7d1068a 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -5,6 +5,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -17,13 +19,19 @@ var ( // InsertHandler processes `/api/v1/import/prometheus` request. func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, insertRows) + return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) @@ -37,6 +45,10 @@ func insertRows(rows []parser.Row) error { tag := &r.Tags[j] ctx.AddLabel(tag.Key, tag.Value) } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } if hasRelabeling { ctx.ApplyRelabeling() } diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index c015cede2..c5ccac205 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -7,6 +7,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -22,12 +24,18 @@ var ( // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) @@ -46,6 +54,10 @@ func insertRows(rows []parser.Row) error { tag := &r.Tags[j] ic.AddLabelBytes(tag.Key, tag.Value) } + for j := range extraLabels { + label := &extraLabels[j] + ic.AddLabel(label.Name, label.Value) + } if hasRelabeling { ic.ApplyRelabeling() } diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 5a6212d49..007a9e5e1 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -516,6 +516,9 @@ The following response should be returned: {"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]} ``` +Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args. +For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines. + Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. @@ -540,6 +543,9 @@ It should return somethins like the following: {"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} ``` +Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. +For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. + VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). @@ -762,6 +768,9 @@ curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import -T exported_data.jsonl.gz ``` +Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. +For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series. + Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts diff --git a/lib/protoparser/common/extra_labels.go b/lib/protoparser/common/extra_labels.go new file mode 100644 index 000000000..18e40f065 --- /dev/null +++ b/lib/protoparser/common/extra_labels.go @@ -0,0 +1,26 @@ +package common + +import ( + "fmt" + "net/http" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +// GetExtraLabels extracts name:value labels from `extra_label=name=value` query args from req. +func GetExtraLabels(req *http.Request) ([]prompbmarshal.Label, error) { + q := req.URL.Query() + var result []prompbmarshal.Label + for _, label := range q["extra_label"] { + tmp := strings.SplitN(label, "=", 2) + if len(tmp) != 2 { + return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", label) + } + result = append(result, prompbmarshal.Label{ + Name: tmp[0], + Value: tmp[1], + }) + } + return result, nil +}