From 68e1cf8942bea3e9d53d6cf911f43ce9b3837213 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 9 Dec 2019 20:58:19 +0200 Subject: [PATCH] app/vminsert: add `/api/v1/import` handler Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 --- README.md | 37 +++- app/vminsert/common/insert_ctx.go | 22 +- app/vminsert/common/lines_reader.go | 15 +- app/vminsert/influx/request_handler.go | 2 +- app/vminsert/main.go | 12 ++ app/vminsert/vmimport/parser.go | 202 ++++++++++++++++++ app/vminsert/vmimport/parser_test.go | 216 ++++++++++++++++++++ app/vminsert/vmimport/parser_timing_test.go | 25 +++ app/vminsert/vmimport/request_handler.go | 160 +++++++++++++++ 9 files changed, 683 insertions(+), 8 deletions(-) create mode 100644 app/vminsert/vmimport/parser.go create mode 100644 app/vminsert/vmimport/parser_test.go create mode 100644 app/vminsert/vmimport/parser_timing_test.go create mode 100644 app/vminsert/vmimport/request_handler.go diff --git a/README.md b/README.md index 5d530a639..b482e4361 100644 --- a/README.md +++ b/README.md @@ -49,11 +49,12 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM * Storage is protected from corruption on unclean shutdown (i.e. OOM, hardware reset or `kill -9`) thanks to [the storage architecture](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). * Supports metrics' ingestion and [backfilling](#backfilling) via the following protocols: * [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) - * [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/) - * [Graphite plaintext protocol](https://graphite.readthedocs.io/en/latest/feeding-carbon.html) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon) + * [InfluxDB line protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) + * [Graphite plaintext protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon) if `-graphiteListenAddr` is set. - * [OpenTSDB put message](http://opentsdb.net/docs/build/html/api_telnet/put.html) if `-opentsdbListenAddr` is set. - * [HTTP OpenTSDB /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html) if `-opentsdbHTTPListenAddr` is set. + * [OpenTSDB put message](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set. + * [HTTP OpenTSDB /api/put requests](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set. + * [/api/v1/import](#how-to-import-time-series-data) * Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads. * Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster). 
@@ -84,6 +85,7 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM
 - [How to work with snapshots?](#how-to-work-with-snapshots)
 - [How to delete time series?](#how-to-delete-time-series)
 - [How to export time series?](#how-to-export-time-series)
+ - [How to import time series data?](#how-to-import-time-series-data)
 - [Federation](#federation)
 - [Capacity planning](#capacity-planning)
 - [High availability](#high-availability)
@@ -513,6 +515,33 @@ Each JSON line would contain data for a single time series. An example output:
 Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data.
 These args may contain either unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
 
+Exported data can be imported via POST'ing it to [/api/v1/import](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
+
+
+### How to import time series data?
+
+Time series data can be imported via any supported ingestion protocol:
+
+* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
+* [Influx line protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf)
+* [Graphite plaintext protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
+* [OpenTSDB telnet put protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-data-via-telnet-put-protocol)
+* [OpenTSDB http /api/put](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-opentsdb-data-via-http-apiput-requests)
+* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-export-time-series).
+
+The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
+
+```
+# Export the data from the source VictoriaMetrics instance:
+curl -s 'http://source-victoriametrics:8428/api/v1/export' -d 'match={__name__!=""}' > exported_data.jsonl
+
+# Import the data to the destination VictoriaMetrics instance:
+curl -X POST 'http://destination-victoriametrics:8428/api/v1/import' -T exported_data.jsonl
+```
+
+Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
+and importing them concurrently. Note that the original file must be split on newlines.
+
 ### Federation
 
diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go
index 47742b8f3..1de903808 100644
--- a/app/vminsert/common/insert_ctx.go
+++ b/app/vminsert/common/insert_ctx.go
@@ -47,7 +47,7 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label)
 	return metricNameRaw[:len(metricNameRaw):len(metricNameRaw)]
 }
 
-// WriteDataPoint writes (timestamp, value) with the given prefix and lables into ctx buffer.
+// WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer.
func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) { metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels) ctx.addRow(metricNameRaw, timestamp, value) @@ -78,6 +78,26 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6 mr.Value = value } +// AddLabelBytes adds (name, value) label to ctx.Labels. +// +// name and value must exist until ctx.Labels is used. +func (ctx *InsertCtx) AddLabelBytes(name, value []byte) { + labels := ctx.Labels + if cap(labels) > len(labels) { + labels = labels[:len(labels)+1] + } else { + labels = append(labels, prompb.Label{}) + } + label := &labels[len(labels)-1] + + // Do not copy name and value contents for performance reasons. + // This reduces GC overhead on the number of objects and allocations. + label.Name = name + label.Value = value + + ctx.Labels = labels +} + // AddLabel adds (name, value) label to ctx.Labels. // // name and value must exist until ctx.Labels is used. diff --git a/app/vminsert/common/lines_reader.go b/app/vminsert/common/lines_reader.go index 74973f126..260cdc4eb 100644 --- a/app/vminsert/common/lines_reader.go +++ b/app/vminsert/common/lines_reader.go @@ -20,6 +20,17 @@ const defaultBlockSize = 64 * 1024 // // Returns (dstBuf, tailBuf). func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) { + return ReadLinesBlockExt(r, dstBuf, tailBuf, maxLineSize) +} + +// ReadLinesBlockExt reads a block of lines delimited by '\n' from tailBuf and r into dstBuf. +// +// Trailing chars after the last newline are put into tailBuf. +// +// Returns (dstBuf, tailBuf). +// +// maxLineLen limits the maximum length of a single line. +func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) { if cap(dstBuf) < defaultBlockSize { dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize) } @@ -48,8 +59,8 @@ again: nn := bytes.LastIndexByte(dstBuf[len(dstBuf)-n:], '\n') if nn < 0 { // Didn't found at least a single line. - if len(dstBuf) > maxLineSize { - return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineSize) + if len(dstBuf) > maxLineLen { + return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineLen) } if cap(dstBuf) < 2*len(dstBuf) { // Increase dsbBuf capacity, so more data could be read into it. 
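The `ReadLinesBlockExt` helper added above consumes input in newline-delimited blocks, with `maxLineLen` bounding a single line; this is the mechanism behind the README advice earlier in this patch to split a large export on newline boundaries and import the parts concurrently. A minimal sketch of that workflow, reusing the host name from the README example; the part size and file names are illustrative:

```
# Split the export on line (newline) boundaries, then import the parts in parallel:
split -l 1000000 exported_data.jsonl import_part_
for f in import_part_*; do
  curl -s -X POST 'http://destination-victoriametrics:8428/api/v1/import' -T "$f" &
done
wait
```

Each concurrent request can then occupy its own vCPU core on the destination VictoriaMetrics instance.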
diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go
index d14937060..0e397c29d 100644
--- a/app/vminsert/influx/request_handler.go
+++ b/app/vminsert/influx/request_handler.go
@@ -82,7 +82,7 @@ func (ctx *pushCtx) InsertRows(db string) error {
 	rows := ctx.Rows.Rows
 	rowsLen := 0
 	for i := range rows {
-		rowsLen += len(rows[i].Tags)
+		rowsLen += len(rows[i].Fields)
 	}
 	ic := &ctx.Common
 	ic.Reset(rowsLen)
diff --git a/app/vminsert/main.go b/app/vminsert/main.go
index 23e9e27df..e16813df4 100644
--- a/app/vminsert/main.go
+++ b/app/vminsert/main.go
@@ -12,6 +12,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheus"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
@@ -67,6 +68,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 		w.WriteHeader(http.StatusNoContent)
 		return true
+	case "/api/v1/import":
+		vmimportRequests.Inc()
+		if err := vmimport.InsertHandler(r); err != nil {
+			vmimportErrors.Inc()
+			httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		return true
 	case "/write", "/api/v2/write":
 		influxWriteRequests.Inc()
 		if err := influx.InsertHandler(r); err != nil {
@@ -92,6 +101,9 @@ var (
 	prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/write", protocol="prometheus"}`)
 	prometheusWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/write", protocol="prometheus"}`)
 
+	vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import", protocol="vm"}`)
+	vmimportErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import", protocol="vm"}`)
+
 	influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`)
 	influxWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`)
 
diff --git a/app/vminsert/vmimport/parser.go b/app/vminsert/vmimport/parser.go
new file mode 100644
index 000000000..785b0845d
--- /dev/null
+++ b/app/vminsert/vmimport/parser.go
@@ -0,0 +1,202 @@
+package vmimport
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/metrics"
+	"github.com/valyala/fastjson"
+)
+
+// Rows contains parsed rows from `/api/v1/import` request.
+type Rows struct {
+	Rows []Row
+
+	tu tagsUnmarshaler
+}
+
+// Reset resets rs.
+func (rs *Rows) Reset() {
+	for i := range rs.Rows {
+		rs.Rows[i].reset()
+	}
+	rs.Rows = rs.Rows[:0]
+
+	rs.tu.reset()
+}
+
+// Unmarshal unmarshals JSON lines in the `/api/v1/export` format from s.
+//
+// See https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data
+//
+// s must be unchanged while rs is in use.
+func (rs *Rows) Unmarshal(s string) {
+	rs.tu.reset()
+	rs.Rows = unmarshalRows(rs.Rows[:0], s, &rs.tu)
+}
+
+// Row is a single row from `/api/v1/import` request.
+type Row struct {
+	Tags       []Tag
+	Values     []float64
+	Timestamps []int64
+}
+
+func (r *Row) reset() {
+	r.Tags = nil
+	r.Values = r.Values[:0]
+	r.Timestamps = r.Timestamps[:0]
+}
+
+func (r *Row) unmarshal(s string, tu *tagsUnmarshaler) error {
+	r.reset()
+	v, err := tu.p.Parse(s)
+	if err != nil {
+		return fmt.Errorf("cannot parse json line: %s", err)
+	}
+
+	// Unmarshal tags
+	metric := v.GetObject("metric")
+	if metric == nil {
+		return fmt.Errorf("missing `metric` object")
+	}
+	tagsStart := len(tu.tagsPool)
+	if err := tu.unmarshalTags(metric); err != nil {
+		return fmt.Errorf("cannot unmarshal `metric`: %s", err)
+	}
+	tags := tu.tagsPool[tagsStart:]
+	r.Tags = tags[:len(tags):len(tags)]
+	if len(r.Tags) == 0 {
+		return fmt.Errorf("missing tags")
+	}
+
+	// Unmarshal values
+	values := v.GetArray("values")
+	if len(values) == 0 {
+		return fmt.Errorf("missing `values` array")
+	}
+	for i, v := range values {
+		f, err := v.Float64()
+		if err != nil {
+			return fmt.Errorf("cannot unmarshal value at position %d: %s", i, err)
+		}
+		r.Values = append(r.Values, f)
+	}
+
+	// Unmarshal timestamps
+	timestamps := v.GetArray("timestamps")
+	if len(timestamps) == 0 {
+		return fmt.Errorf("missing `timestamps` array")
+	}
+	for i, v := range timestamps {
+		ts, err := v.Int64()
+		if err != nil {
+			return fmt.Errorf("cannot unmarshal timestamp at position %d: %s", i, err)
+		}
+		r.Timestamps = append(r.Timestamps, ts)
+	}
+
+	if len(r.Timestamps) != len(r.Values) {
+		return fmt.Errorf("`timestamps` array size must match `values` array size; got %d; want %d", len(r.Timestamps), len(r.Values))
+	}
+	return nil
+}
+
+// Tag represents `/api/v1/import` tag.
+type Tag struct {
+	Key   []byte
+	Value []byte
+}
+
+func (tag *Tag) reset() {
+	// tag.Key and tag.Value point to tu.bytesPool, so there is no need to keep these byte slices here.
+	tag.Key = nil
+	tag.Value = nil
+}
+
+type tagsUnmarshaler struct {
+	p         fastjson.Parser
+	tagsPool  []Tag
+	bytesPool []byte
+	err       error
+}
+
+func (tu *tagsUnmarshaler) reset() {
+	for i := range tu.tagsPool {
+		tu.tagsPool[i].reset()
+	}
+	tu.tagsPool = tu.tagsPool[:0]
+
+	tu.bytesPool = tu.bytesPool[:0]
+	tu.err = nil
+}
+
+func (tu *tagsUnmarshaler) addTag() *Tag {
+	dst := tu.tagsPool
+	if cap(dst) > len(dst) {
+		dst = dst[:len(dst)+1]
+	} else {
+		dst = append(dst, Tag{})
+	}
+	tag := &dst[len(dst)-1]
+	tu.tagsPool = dst
+	return tag
+}
+
+func (tu *tagsUnmarshaler) addBytes(b []byte) []byte {
+	bytesPoolLen := len(tu.bytesPool)
+	tu.bytesPool = append(tu.bytesPool, b...)
+	bCopy := tu.bytesPool[bytesPoolLen:]
+	return bCopy[:len(bCopy):len(bCopy)]
+}
+
+func (tu *tagsUnmarshaler) unmarshalTags(o *fastjson.Object) error {
+	tu.err = nil
+	o.Visit(func(key []byte, v *fastjson.Value) {
+		tag := tu.addTag()
+		tag.Key = tu.addBytes(key)
+		sb, err := v.StringBytes()
+		if err != nil && tu.err == nil {
+			tu.err = fmt.Errorf("cannot parse value for tag %q: %s", tag.Key, err)
+		}
+		tag.Value = tu.addBytes(sb)
+	})
+	return tu.err
+}
+
+func unmarshalRows(dst []Row, s string, tu *tagsUnmarshaler) []Row {
+	for len(s) > 0 {
+		n := strings.IndexByte(s, '\n')
+		if n < 0 {
+			// The last line.
+ return unmarshalRow(dst, s, tu) + } + dst = unmarshalRow(dst, s[:n], tu) + s = s[n+1:] + } + return dst +} + +func unmarshalRow(dst []Row, s string, tu *tagsUnmarshaler) []Row { + if len(s) > 0 && s[len(s)-1] == '\r' { + s = s[:len(s)-1] + } + if len(s) == 0 { + return dst + } + if cap(dst) > len(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, Row{}) + } + r := &dst[len(dst)-1] + if err := r.unmarshal(s, tu); err != nil { + dst = dst[:len(dst)-1] + logger.Errorf("cannot unmarshal json line %q: %s; skipping it", s, err) + invalidLines.Inc() + } + return dst +} + +var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="vmimport"}`) diff --git a/app/vminsert/vmimport/parser_test.go b/app/vminsert/vmimport/parser_test.go new file mode 100644 index 000000000..7ed445af1 --- /dev/null +++ b/app/vminsert/vmimport/parser_test.go @@ -0,0 +1,216 @@ +package vmimport + +import ( + "reflect" + "testing" +) + +func TestRowsUnmarshalFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var rows Rows + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows)) + } + + // Try again + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows)) + } + } + + // Invalid json line + f("") + f("\n") + f("foo\n") + f("123") + f("[1,3]") + f("{}") + f("[]") + f(`{"foo":"bar"}`) + + // Invalid metric + f(`{"metric":123,"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":[123],"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":[],"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":{},"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":null,"values":[1,2],"timestamps":[3,4]}`) + f(`{"values":[1,2],"timestamps":[3,4]}`) + + // Invalid values + f(`{"metric":{"foo":"bar"},"values":1,"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":{"x":1},"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":{"x":1},"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":null,"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"timestamps":[3,4]}`) + + // Invalid timestamps + f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":3}`) + f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":false}`) + f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":{}}`) + f(`{"metric":{"foo":"bar"},"values":[1,2]}`) + + // values and timestamps count mismatch + f(`{"metric":{"foo":"bar"},"values":[],"timestamps":[]}`) + f(`{"metric":{"foo":"bar"},"values":[],"timestamps":[1]}`) + f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[]}`) + f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":[2,3],"timestamps":[4]}`) + + // Garbage after the line + f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[4]}{}`) +} + +func TestRowsUnmarshalSuccess(t *testing.T) { + f := func(s string, rowsExpected *Rows) { + t.Helper() + var rows Rows + rows.Unmarshal(s) + if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { + t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) + } + + // Try unmarshaling again + rows.Unmarshal(s) + if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { + t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) + } + + rows.Reset() + if len(rows.Rows) != 0 { + t.Fatalf("non-empty rows after reset: %+v", rows.Rows) + } + } + + // Empty line + f("", &Rows{}) + f("\n\n", &Rows{}) + f("\n\r\n", &Rows{}) + + // Single line with a single tag + 
f(`{"metric":{"foo":"bar"},"values":[1.23],"timestamps":[456]}`, &Rows{ + Rows: []Row{{ + Tags: []Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }}, + Values: []float64{1.23}, + Timestamps: []int64{456}, + }}, + }) + + // Line with multiple tags + f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]}`, &Rows{ + Rows: []Row{{ + Tags: []Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.23, -3.21}, + Timestamps: []int64{456, 789}, + }}, + }) + + // Multiple lines + f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]} +{"metric":{"__name__":"xx"},"values":[34],"timestamps" : [11]} +`, &Rows{ + Rows: []Row{ + { + Tags: []Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.23, -3.21}, + Timestamps: []int64{456, 789}, + }, + { + Tags: []Tag{ + { + Key: []byte("__name__"), + Value: []byte("xx"), + }, + }, + Values: []float64{34}, + Timestamps: []int64{11}, + }, + }, + }) + + // Multiple lines with invalid line in the middle. + f(`{"metric":{"xfoo":"bar","baz":"xx"},"values":[1.232, -3.21],"timestamps" : [456,7890]} +garbage here +{"metric":{"__name__":"xxy"},"values":[34],"timestamps" : [111]}`, &Rows{ + Rows: []Row{ + { + Tags: []Tag{ + { + Key: []byte("xfoo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.232, -3.21}, + Timestamps: []int64{456, 7890}, + }, + { + Tags: []Tag{ + { + Key: []byte("__name__"), + Value: []byte("xxy"), + }, + }, + Values: []float64{34}, + Timestamps: []int64{111}, + }, + }, + }) + + // No newline after the second line. 
+ f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]} +{"metric":{"__name__":"xx"},"values":[34],"timestamps" : [11]}`, &Rows{ + Rows: []Row{ + { + Tags: []Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.23, -3.21}, + Timestamps: []int64{456, 789}, + }, + { + Tags: []Tag{ + { + Key: []byte("__name__"), + Value: []byte("xx"), + }, + }, + Values: []float64{34}, + Timestamps: []int64{11}, + }, + }, + }) +} diff --git a/app/vminsert/vmimport/parser_timing_test.go b/app/vminsert/vmimport/parser_timing_test.go new file mode 100644 index 000000000..e512a3a1d --- /dev/null +++ b/app/vminsert/vmimport/parser_timing_test.go @@ -0,0 +1,25 @@ +package vmimport + +import ( + "fmt" + "testing" +) + +func BenchmarkRowsUnmarshal(b *testing.B) { + s := `{"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]} +{"metric":{"__name__":"up","job":"prometheus","instance":"localhost:9090"},"values":[1,1,1],"timestamps":[1549891461511,1549891476511,1549891491511]} +{"metric":{"__name__":"up","job":"node_exporter","instance":"foobar.com:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]} +{"metric":{"__name__":"up","job":"prometheus","instance":"xxx.yyy.zzz:9090"},"values":[1,1,1],"timestamps":[1549891461511,1549891476511,1549891491511]} +` + b.SetBytes(int64(len(s))) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var rows Rows + for pb.Next() { + rows.Unmarshal(s) + if len(rows.Rows) != 4 { + panic(fmt.Errorf("unexpected number of rows parsed; got %d; want 4", len(rows.Rows))) + } + } + }) +} diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go new file mode 100644 index 000000000..d6fd76802 --- /dev/null +++ b/app/vminsert/vmimport/request_handler.go @@ -0,0 +1,160 @@ +package vmimport + +import ( + "flag" + "fmt" + "io" + "net/http" + "runtime" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" +) + +var maxLineLen = flag.Int("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by `/api/v1/import`") + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="vmimport"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="vmimport"}`) +) + +// InsertHandler processes `/api/v1/import` request. 
+// +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 +func InsertHandler(req *http.Request) error { + return concurrencylimiter.Do(func() error { + return insertHandlerInternal(req) + }) +} + +func insertHandlerInternal(req *http.Request) error { + readCalls.Inc() + + r := req.Body + if req.Header.Get("Content-Encoding") == "gzip" { + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped vmimport data: %s", err) + } + defer common.PutGzipReader(zr) + r = zr + } + + ctx := getPushCtx() + defer putPushCtx(ctx) + for ctx.Read(r) { + if err := ctx.InsertRows(); err != nil { + return err + } + } + return ctx.Error() +} + +func (ctx *pushCtx) InsertRows() error { + rows := ctx.Rows.Rows + rowsLen := 0 + for i := range rows { + rowsLen += len(rows[i].Values) + } + ic := &ctx.Common + ic.Reset(rowsLen) + rowsTotal := 0 + for i := range rows { + r := &rows[i] + ic.Labels = ic.Labels[:0] + for j := range r.Tags { + tag := &r.Tags[j] + ic.AddLabelBytes(tag.Key, tag.Value) + } + ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) + values := r.Values + timestamps := r.Timestamps + _ = timestamps[len(values)-1] + for j, value := range values { + timestamp := timestamps[j] + ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value) + } + rowsTotal += len(values) + } + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return ic.FlushBufs() +} + +func (ctx *pushCtx) Read(r io.Reader) bool { + if ctx.err != nil { + return false + } + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, *maxLineLen) + if ctx.err != nil { + if ctx.err != io.EOF { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot read vmimport data: %s", ctx.err) + } + return false + } + ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) + return true +} + +var ( + readCalls = metrics.NewCounter(`vm_read_calls_total{name="vmimport"}`) + readErrors = metrics.NewCounter(`vm_read_errors_total{name="vmimport"}`) +) + +type pushCtx struct { + Rows Rows + Common common.InsertCtx + + reqBuf []byte + tailBuf []byte + metricNameBuf []byte + + err error +} + +func (ctx *pushCtx) Error() error { + if ctx.err == io.EOF { + return nil + } + return ctx.err +} + +func (ctx *pushCtx) reset() { + ctx.Rows.Reset() + ctx.Common.Reset(0) + + ctx.reqBuf = ctx.reqBuf[:0] + ctx.tailBuf = ctx.tailBuf[:0] + ctx.metricNameBuf = ctx.metricNameBuf[:0] + + ctx.err = nil +} + +func getPushCtx() *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + return v.(*pushCtx) + } + return &pushCtx{} + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
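Since `insertHandlerInternal` above transparently unwraps request bodies sent with `Content-Encoding: gzip`, exported data can also be imported in compressed form. A minimal sketch; the sample series, file names and the `localhost:8428` address are illustrative, while the JSON line layout follows the `metric`/`values`/`timestamps` fields expected by the parser in this patch:

```
# A single series in the format accepted by /api/v1/import (and produced by /api/v1/export):
echo '{"metric":{"__name__":"up","job":"node_exporter"},"values":[0,1],"timestamps":[1549891472010,1549891487724]}' > data.jsonl

# Compress the data and import it with the matching Content-Encoding header:
gzip -c data.jsonl > data.jsonl.gz
curl -X POST -H 'Content-Encoding: gzip' --data-binary @data.jsonl.gz 'http://localhost:8428/api/v1/import'

# Verify that the series has been ingested:
curl -s 'http://localhost:8428/api/v1/export' -d 'match={job="node_exporter"}'
```

Lines longer than `-import.maxLineLen` (100MB by default, per the flag added above) cause the import request to fail with a `too long line` error.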