diff --git a/README.md b/README.md index f4b64cc22..e60d13534 100644 --- a/README.md +++ b/README.md @@ -409,6 +409,8 @@ The `/api/v1/export` endpoint should return the following response: Note that Influx line protocol expects [timestamps in *nanoseconds* by default](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp), while VictoriaMetrics stores them with *milliseconds* precision. +Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. +For example, `/write?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. ## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd) @@ -524,6 +526,8 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]} ``` +Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. +For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. ## Prometheus querying API usage diff --git a/app/victoria-metrics/main_test.go b/app/victoria-metrics/main_test.go index 926157623..1497668ee 100644 --- a/app/victoria-metrics/main_test.go +++ b/app/victoria-metrics/main_test.go @@ -58,6 +58,7 @@ var ( type test struct { Name string `json:"name"` Data []string `json:"data"` + InsertQuery string `json:"insert_query"` Query []string `json:"query"` ResultMetrics []Metric `json:"result_metrics"` ResultSeries Series `json:"result_series"` @@ -209,7 +210,7 @@ func testWrite(t *testing.T) { t.Errorf("error compressing %v %s", r, err) t.Fail() } - httpWrite(t, testPromWriteHTTPPath, bytes.NewBuffer(data)) + httpWrite(t, testPromWriteHTTPPath, test.InsertQuery, bytes.NewBuffer(data)) } }) @@ -218,7 +219,7 @@ func testWrite(t *testing.T) { test := x t.Run(test.Name, func(t *testing.T) { t.Parallel() - httpWrite(t, testWriteHTTPPath, bytes.NewBufferString(strings.Join(test.Data, "\n"))) + httpWrite(t, testWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n"))) }) } }) @@ -246,7 +247,7 @@ func testWrite(t *testing.T) { t.Run(test.Name, func(t *testing.T) { t.Parallel() logger.Infof("writing %s", test.Data) - httpWrite(t, testOpenTSDBWriteHTTPPath, bytes.NewBufferString(strings.Join(test.Data, "\n"))) + httpWrite(t, testOpenTSDBWriteHTTPPath, test.InsertQuery, bytes.NewBufferString(strings.Join(test.Data, "\n"))) }) } }) @@ -324,10 +325,10 @@ func readIn(readFor string, t *testing.T, insertTime time.Time) []test { return tt } -func httpWrite(t *testing.T, address string, r io.Reader) { +func httpWrite(t *testing.T, address, query string, r io.Reader) { t.Helper() s := newSuite(t) - resp, err := http.Post(address, "", r) + resp, err := http.Post(address+query, "", r) s.noError(err) s.noError(resp.Body.Close()) s.equalInt(resp.StatusCode, 204) diff --git a/app/victoria-metrics/testdata/influxdb/with_extra_labels.json b/app/victoria-metrics/testdata/influxdb/with_extra_labels.json new file mode 100644 index 000000000..e28fd4976 --- /dev/null +++ b/app/victoria-metrics/testdata/influxdb/with_extra_labels.json @@ -0,0 +1,10 @@ +{ + "name": "insert_with_extra_labels", + "data": ["measurement,tag1=value1,tag2=value2 field6=1.23,field5=123 {TIME_NS}"], + "insert_query": "?extra_label=job=test&extra_label=tag2=value10", + "query": ["/api/v1/export?match={__name__!=''}"], + "result_metrics": [ + {"metric":{"__name__":"measurement_field5","tag1":"value1","job": "test","tag2":"value10"},"values":[123], "timestamps": ["{TIME_MS}"]}, + {"metric":{"__name__":"measurement_field6","tag1":"value1","job": "test","tag2":"value10"},"values":[1.23], "timestamps": ["{TIME_MS}"]} + ] +} diff --git a/app/victoria-metrics/testdata/opentsdbhttp/with_extra_labels.json b/app/victoria-metrics/testdata/opentsdbhttp/with_extra_labels.json new file mode 100644 index 000000000..3fa71a2e8 --- /dev/null +++ b/app/victoria-metrics/testdata/opentsdbhttp/with_extra_labels.json @@ -0,0 +1,9 @@ +{ + "name": "insert_with_extra_labels", + "data": ["{\"metric\": \"opentsdbhttp.foobar\", \"value\": 1001, \"timestamp\": {TIME_S}, \"tags\": {\"bar\":\"baz\", \"x\": \"y\"}}"], + "insert_query": "?extra_label=job=open-test&extra_label=x=z", + "query": ["/api/v1/export?match={__name__!=''}"], + "result_metrics": [ + {"metric":{"__name__":"opentsdbhttp.foobar","bar":"baz","x":"z","job": "open-test"},"values":[1001], "timestamps": ["{TIME_MSZ}"]} + ] +} diff --git a/app/victoria-metrics/testdata/prometheus/with_extra_labels.json b/app/victoria-metrics/testdata/prometheus/with_extra_labels.json new file mode 100644 index 000000000..19c4cbb62 --- /dev/null +++ b/app/victoria-metrics/testdata/prometheus/with_extra_labels.json @@ -0,0 +1,9 @@ +{ + "name": "basic_insertion_with_extra_labels", + "insert_query": "?extra_label=job=prom-test&extra_label=baz=bar", + "data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.foobar\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"], + "query": ["/api/v1/export?match={__name__!=''}"], + "result_metrics": [ + {"metric":{"__name__":"prometheus.foobar","baz":"bar","job": "prom-test"},"values":[100000], "timestamps": ["{TIME_MS}"]} + ] +} diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 6235e61f9..b39d365f3 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -33,7 +34,9 @@ var ( // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(r io.Reader) error { return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, false, "", "", insertRows) + return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { + return insertRows(db, rows, nil) + }) }) } @@ -41,17 +44,23 @@ func InsertHandlerForReader(r io.Reader) error { // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md func InsertHandlerForHTTP(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" q := req.URL.Query() precision := q.Get("precision") // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") - return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows) + return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { + return insertRows(db, rows, extraLabels) + }) }) } -func insertRows(db string, rows []parser.Row) error { +func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) @@ -82,6 +91,7 @@ func insertRows(db string, rows []parser.Row) error { Value: db, }) } + commonLabels = append(commonLabels, extraLabels...) ctx.metricGroupBuf = ctx.metricGroupBuf[:0] if !*skipMeasurement { ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...) diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index 365cbe0aa..b3026ab86 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -19,12 +20,18 @@ var ( // InsertHandler processes HTTP OpenTSDB put requests. // See http://opentsdb.net/docs/build/html/api_http/put.html func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -45,6 +52,7 @@ func insertRows(rows []parser.Row) error { Value: tag.Value, }) } + labels = append(labels, extraLabels...) samples = append(samples, prompbmarshal.Sample{ Value: r.Value, Timestamp: r.Timestamp, diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 00dfcd614..0039e4ef4 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -20,12 +21,18 @@ var ( // InsertHandler processes remote write for prometheus. func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { + return insertRows(tss, extraLabels) + }) }) } -func insertRows(timeseries []prompb.TimeSeries) error { +func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -44,6 +51,7 @@ func insertRows(timeseries []prompb.TimeSeries) error { Value: bytesutil.ToUnsafeString(label.Value), }) } + labels = append(labels, extraLabels...) samplesLen := len(samples) for i := range ts.Samples { sample := &ts.Samples[i] diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index b841dd0d2..4e4e17c28 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -11,6 +11,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -33,7 +35,9 @@ var ( // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(r io.Reader) error { return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, false, "", "", insertRows) + return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { + return insertRows(db, rows, nil) + }) }) } @@ -41,17 +45,23 @@ func InsertHandlerForReader(r io.Reader) error { // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md func InsertHandlerForHTTP(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" q := req.URL.Query() precision := q.Get("precision") // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") - return parser.ParseStream(req.Body, isGzipped, precision, db, insertRows) + return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { + return insertRows(db, rows, extraLabels) + }) }) } -func insertRows(db string, rows []parser.Row) error { +func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) @@ -78,6 +88,10 @@ func insertRows(db string, rows []parser.Row) error { if !hasDBKey { ic.AddLabel("db", db) } + for j := range extraLabels { + label := &extraLabels[j] + ic.AddLabel(label.Name, label.Value) + } ctx.metricGroupBuf = ctx.metricGroupBuf[:0] if !*skipMeasurement { ctx.metricGroupBuf = append(ctx.metricGroupBuf, r.Measurement...) diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 927d083b2..f468d88d6 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -6,6 +6,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -22,15 +24,21 @@ func InsertHandler(req *http.Request) error { path := req.URL.Path switch path { case "/api/put": + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }) }) default: return fmt.Errorf("unexpected path requested on HTTP OpenTSDB server: %q", path) } } -func insertRows(rows []parser.Row) error { +func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) @@ -44,6 +52,10 @@ func insertRows(rows []parser.Row) error { tag := &r.Tags[j] ctx.AddLabel(tag.Key, tag.Value) } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } if hasRelabeling { ctx.ApplyRelabeling() } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index f4ff53814..15f3895ed 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -6,6 +6,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" @@ -18,12 +20,18 @@ var ( // InsertHandler processes remote write for prometheus. func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, insertRows) + return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { + return insertRows(tss, extraLabels) + }) }) } -func insertRows(timeseries []prompb.TimeSeries) error { +func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { ctx := common.GetInsertCtx() defer common.PutInsertCtx(ctx) @@ -42,6 +50,10 @@ func insertRows(timeseries []prompb.TimeSeries) error { for _, srcLabel := range srcLabels { ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value) } + for j := range extraLabels { + label := &extraLabels[j] + ctx.AddLabel(label.Name, label.Value) + } if hasRelabeling { ctx.ApplyRelabeling() } diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index f4b64cc22..e60d13534 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -409,6 +409,8 @@ The `/api/v1/export` endpoint should return the following response: Note that Influx line protocol expects [timestamps in *nanoseconds* by default](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp), while VictoriaMetrics stores them with *milliseconds* precision. +Extra labels may be added to all the written time series by passing `extra_label=name=value` query args. +For example, `/write?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. ## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd) @@ -524,6 +526,8 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]} ``` +Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. +For example, `/api/put?extra_label=foo=bar` would add `{foo="bar"}` label to all the ingested metrics. ## Prometheus querying API usage