From 95688cbfc50cfc9dde6b1615c9e862b86967697a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 26 Sep 2020 04:29:45 +0300 Subject: [PATCH] all: add native format for data export and import The data can be exported via [/api/v1/export/native](https://victoriametrics.github.io/#how-to-export-data-in-native-format) handler and imported via [/api/v1/import/native](https://victoriametrics.github.io/#how-to-import-data-in-native-format) handler. --- README.md | 281 +++++++++++------- app/vmagent/README.md | 3 +- app/vmagent/main.go | 13 + app/vmagent/native/request_handler.go | 82 ++++++ app/vmagent/vmimport/request_handler.go | 5 +- app/vminsert/main.go | 13 + app/vminsert/native/request_handler.go | 113 +++++++ app/vmselect/main.go | 11 + app/vmselect/netstorage/netstorage.go | 137 +++++++-- app/vmselect/prometheus/export.qtpl | 31 +- app/vmselect/prometheus/export.qtpl.go | 373 ++++++++++++------------ app/vmselect/prometheus/prometheus.go | 226 +++++++++++--- docs/FAQ.md | 6 +- docs/Single-server-VictoriaMetrics.md | 281 +++++++++++------- docs/vmagent.md | 3 +- lib/protoparser/native/streamparser.go | 197 +++++++++++++ lib/storage/block.go | 137 ++++++++- lib/storage/block_test.go | 116 ++++++++ 18 files changed, 1523 insertions(+), 505 deletions(-) create mode 100644 app/vmagent/native/request_handler.go create mode 100644 app/vminsert/native/request_handler.go create mode 100644 lib/protoparser/native/streamparser.go create mode 100644 lib/storage/block_test.go diff --git a/README.md b/README.md index f46d550d8..ba1eb3558 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet if `-graphiteListenAddr` is set. * [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set. * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set. - * [/api/v1/import](#how-to-import-time-series-data). + * [JSON line format](#how-to-import-data-in-json-line-format). + * [Native binary format](#how-to-import-data-in-native-format). * [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). * [Arbitrary CSV data](#how-to-import-csv-data). * Supports metrics' relabeling. See [these docs](#relabeling) for details. 
@@ -100,8 +101,6 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
 * [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
 * [Querying Graphite data](#querying-graphite-data)
 * [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents)
-* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
-* [How to import CSV data](#how-to-import-csv-data)
 * [Prometheus querying API usage](#prometheus-querying-api-usage)
 * [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
 * [Graphite Metrics API usage](#graphite-metrics-api-usage)
@@ -117,7 +116,13 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
 * [How to delete time series](#how-to-delete-time-series)
 * [Forced merge](#forced-merge)
 * [How to export time series](#how-to-export-time-series)
+  * [How to export data in native format](#how-to-export-data-in-native-format)
+  * [How to export data in JSON line format](#how-to-export-data-in-json-line-format)
 * [How to import time series data](#how-to-import-time-series-data)
+  * [How to import data in native format](#how-to-import-data-in-native-format)
+  * [How to import data in JSON line format](#how-to-import-data-in-json-line-format)
+  * [How to import CSV data](#how-to-import-csv-data)
+  * [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
 * [Relabeling](#relabeling)
 * [Federation](#federation)
 * [Capacity planning](#capacity-planning)
@@ -345,7 +350,7 @@ curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'ht
 ```
 
 An arbitrary number of lines delimited by '\n' (aka newline char) may be sent in a single request.
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
 
 ```bash
 curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'
@@ -381,7 +386,7 @@ echo "foo.bar.baz;tag1=value1;tag2=value2 123 `date +%s`" | nc -N localhost 2003
 
 VictoriaMetrics sets the current time if the timestamp is omitted.
 An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
 
 ```bash
 curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
@@ -425,7 +430,7 @@ echo "put foo.bar.baz `date +%s` 123 tag1=value1 tag2=value2" | nc -N localhost
 ```
 
 An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
 
 ```bash
 curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
@@ -460,7 +465,7 @@ Example for writing multiple data points in a single request:
 curl -H 'Content-Type: application/json' -d '[{"metric":"foo","value":45.34},{"metric":"bar","value":43}]' http://localhost:4242/api/put
 ```
 
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
 
 ```bash
 curl -G 'http://localhost:8428/api/v1/export' -d 'match[]=x.y.z' -d 'match[]=foo' -d 'match[]=bar'
@@ -475,91 +480,6 @@ The `/api/v1/export` endpoint should return the following response:
 ```
 
-
-### How to import CSV data
-
-Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
-The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:
-
-```
-<column_pos>:<type>:<context>
-```
-
-* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
-* `<type>` describes the column type. Supported types are:
-  * `metric` - the corresponding CSV column at `<column_pos>` contains metric value, which must be integer or floating-point number.
-    The metric name is read from the `<context>`. CSV line must have at least a single metric field. Multiple metric fields per CSV line is OK.
-  * `label` - the corresponding CSV column at `<column_pos>` contains label value. The label name is read from the `<context>`.
-    CSV line may have arbitrary number of label fields. All these labels are attached to all the configured metrics.
-  * `time` - the corresponding CSV column at `<column_pos>` contains metric time. CSV line may contain either one or zero columns with time.
-    If CSV line has no time, then the current time is used. The time is applied to all the configured metrics.
-    The format of the time is configured via `<time_format>`. Supported time formats are:
-    * `unix_s` - unix timestamp in seconds.
-    * `unix_ms` - unix timestamp in milliseconds.
-    * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
-    * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
-    * `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).
-
-Each request to `/api/v1/import/csv` may contain arbitrary number of CSV lines.
-
-Example for importing CSV data via `/api/v1/import/csv`:
-
-```bash
-curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
-curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
-```
-
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
-
-```bash
-curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
-```
-
-The following response should be returned:
-```bash
-{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
-{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
-{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
-{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
-```
-
-Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
-For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
-
-Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
-
-
-### How to import data in Prometheus exposition format
-
-VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
-via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
-
-```bash
-curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
-```
-
-The following command may be used for verifying the imported data:
-
-```bash
-curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
-```
-
-It should return something like the following:
-
-```
-{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
-```
-
-Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
-For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
-
-If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
-It can be overriden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
-
-VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
-
-VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
-
-
 ### Prometheus querying API usage
 
 VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
@@ -756,11 +676,35 @@ when new data is ingested into it.
 
 ### How to export time series
 
-Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
+VictoriaMetrics provides the following handlers for exporting data:
+
+* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export.
+  See [these docs](#how-to-export-data-in-native-format) for details.
+* `/api/v1/export` for exporting data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details.
+
+
+#### How to export data in native format
+
+Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/native?match[]=<timeseries_selector_for_export>`,
+where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
+for metrics to export. Use `{__name__!=""}` selector for fetching all the time series.
+
+Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
+unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
+
+The exported data can be imported to VictoriaMetrics via [/api/v1/import/native](#how-to-import-data-in-native-format).
+
+
+#### How to export data in JSON line format
+
+Consider [exporting data in native format](#how-to-export-data-in-native-format) if big amounts of data must be migrated between VictoriaMetrics instances,
+since the native export usually consumes less CPU and memory, while the exported data occupies less disk space.
+
+In order to export data in JSON line format, send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
+where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
+for metrics to export. Use `{__name__!=""}` selector for fetching all the time series.
 The response would contain all the data for the selected time series in [JSON streaming format](https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON).
-Each JSON line would contain data for a single time series. An example output:
+Each JSON line contains samples for a single time series. An example output:
 
 ```jsonl
 {"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]}
@@ -770,8 +714,9 @@ Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
 unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
 
-Optional `max_rows_per_line` arg may be added to the request in order to limit the maximum number of rows exported per each JSON line.
-By default each JSON line contains all the rows for a single time series.
+Optional `max_rows_per_line` arg may be added to the request for limiting the maximum number of rows exported per each JSON line.
+Optional `reduce_mem_usage=1` arg may be added to the request for reducing memory usage when exporting a big number of time series.
+In this case the output may contain multiple lines with distinct samples for the same time series.
 
 Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth when exporting big amounts of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data:
 
@@ -782,22 +727,58 @@ curl -H 'Accept-Encoding: gzip' http://localhost:8428/api/v1/export -d 'match[]=
 
 The maximum duration for each request to `/api/v1/export` is limited by `-search.maxExportDuration` command-line flag.
 
-Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-time-series-data).
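+
+For example, a sketch of exporting a single day of data with at most 1000 samples per JSON line (the time range values below are illustrative):
+
+```bash
+curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={__name__!=""}' -d 'start=2020-09-01T00:00:00Z' -d 'end=2020-09-02T00:00:00Z' -d 'max_rows_per_line=1000'
+```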
+Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format).
+
 
 ### How to import time series data
 
 Time series data can be imported via any supported ingestion protocol:
 
-* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
-* [Influx line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf)
-* [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
-* [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol)
-* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests)
-* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series).
-* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details.
-* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
+* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
+* Influx line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
+* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
+* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
+* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.
+* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format).
+  See [these docs](#how-to-import-data-in-json-line-format) for details.
+* `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format).
+  See [these docs](#how-to-import-data-in-native-format) for details.
+* `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details.
+* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
 
-The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
+
+#### How to import data in native format
+
+The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`.
+Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format):
+
+```bash
+# Export the data from <source-victoriametrics>:
+curl http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin
+
+# Import the data to <destination-victoriametrics>:
+curl -X POST http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin
+```
+
+Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/native` for importing gzipped data:
+
+```bash
+# Export gzipped data from <source-victoriametrics>:
+curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin.gz
+
+# Import gzipped data to <destination-victoriametrics>:
+curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin.gz
+```
+
+Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
+For example, `/api/v1/import/native?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series.
+
+Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
+
+
+#### How to import data in JSON line format
+
+Example for importing data obtained via [/api/v1/export](#how-to-export-data-in-json-line-format):
 
 ```bash
 # Export the data from <source-victoriametrics>:
 curl http://source-victoriametrics:8428/api/v1/export -d 'match={__name__!=""}' > exported_data.jsonl
 
 # Import the data to <destination-victoriametrics>:
 curl -X POST http://destination-victoriametrics:8428/api/v1/import -T exported_data.jsonl
 ```
 
@@ -823,6 +804,94 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label
 
 Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
 
+#### How to import CSV data
+
+Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
+The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:
+
+```
+<column_pos>:<type>:<context>
+```
+
+* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
+* `<type>` describes the column type. Supported types are:
+  * `metric` - the corresponding CSV column at `<column_pos>` contains metric value, which must be integer or floating-point number.
+    The metric name is read from the `<context>`. CSV line must have at least a single metric field. Multiple metric fields per CSV line is OK.
+  * `label` - the corresponding CSV column at `<column_pos>` contains label value. The label name is read from the `<context>`.
+    CSV line may have arbitrary number of label fields. All these labels are attached to all the configured metrics.
+  * `time` - the corresponding CSV column at `<column_pos>` contains metric time. CSV line may contain either one or zero columns with time.
+    If CSV line has no time, then the current time is used. The time is applied to all the configured metrics.
+    The format of the time is configured via `<time_format>`. Supported time formats are:
+    * `unix_s` - unix timestamp in seconds.
+    * `unix_ms` - unix timestamp in milliseconds.
+    * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
+    * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
+    * `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).
+
+Each request to `/api/v1/import/csv` may contain arbitrary number of CSV lines.
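+
+For instance, a minimal sketch of a rule set with an RFC3339 `time` column (the ticker and price values below are illustrative):
+
+```bash
+curl -d "GOOG,1.23,2020-09-26T01:02:03Z" 'http://localhost:8428/api/v1/import/csv?format=1:label:ticker,2:metric:price,3:time:rfc3339'
+```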
+
+Example for importing CSV data via `/api/v1/import/csv`:
+
+```bash
+curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
+curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
+```
+
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
+
+```bash
+curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
+```
+
+The following response should be returned:
+```bash
+{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
+{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
+{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
+{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
+```
+
+Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
+For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
+
+Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
+
+
+#### How to import data in Prometheus exposition format
+
+VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
+via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
+
+```bash
+curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
+```
+
+The following command may be used for verifying the imported data:
+
+```bash
+curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
+```
+
+It should return something like the following:
+
+```
+{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
+```
+
+Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
+For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
+
+If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
+It can be overridden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
+
+VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
+
+Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
+
+VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
+
+
+
 ### Relabeling
 
 VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points
diff --git a/app/vmagent/README.md b/app/vmagent/README.md
index 0ed6484b0..fa859959d 100644
--- a/app/vmagent/README.md
+++ b/app/vmagent/README.md
@@ -25,7 +25,8 @@ to `vmagent` (like the ability to push metrics instead of pulling them). 
We did
 * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
 * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
 * Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
- * JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-json-line-format).
+ * JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-json-line-format).
+ * Native data import protocol via `http://<vmagent>:8429/api/v1/import/native`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-native-format).
 * Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
 * Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
 * Can replicate collected metrics simultaneously to multiple remote storage systems.
diff --git a/app/vmagent/main.go b/app/vmagent/main.go
index e4d1c905b..473a5fc89 100644
--- a/app/vmagent/main.go
+++ b/app/vmagent/main.go
@@ -11,6 +11,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
@@ -171,6 +172,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 		w.WriteHeader(http.StatusNoContent)
 		return true
+	case "/api/v1/import/native":
+		nativeimportRequests.Inc()
+		if err := native.InsertHandler(r); err != nil {
+			nativeimportErrors.Inc()
+			httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		w.WriteHeader(http.StatusNoContent)
+		return true
 	case "/write", "/api/v2/write":
 		influxWriteRequests.Inc()
 		if err := influx.InsertHandlerForHTTP(r); err != nil {
@@ -213,6 +223,9 @@ var (
 	prometheusimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
 	prometheusimportErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
 
+	nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
+	nativeimportErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
+
 	influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
 	influxWriteErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)
 
diff --git 
a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go
new file mode 100644
index 000000000..bda1220d4
--- /dev/null
+++ b/app/vmagent/native/request_handler.go
@@ -0,0 +1,82 @@
+package native
+
+import (
+	"net/http"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
+	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+var (
+	rowsInserted  = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`)
+	rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`)
+)
+
+// InsertHandler processes `/api/v1/import/native` request.
+//
+// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
+func InsertHandler(req *http.Request) error {
+	extraLabels, err := parserCommon.GetExtraLabels(req)
+	if err != nil {
+		return err
+	}
+	return writeconcurrencylimiter.Do(func() error {
+		return parser.ParseStream(req, func(block *parser.Block) error {
+			return insertRows(block, extraLabels)
+		})
+	})
+}
+
+func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
+	ctx := common.GetPushCtx()
+	defer common.PutPushCtx(ctx)
+
+	tssDst := ctx.WriteRequest.Timeseries[:0]
+	labels := ctx.Labels[:0]
+	samples := ctx.Samples[:0]
+	mn := &block.MetricName
+	labelsLen := len(labels)
+	labels = append(labels, prompbmarshal.Label{
+		Name:  "__name__",
+		Value: bytesutil.ToUnsafeString(mn.MetricGroup),
+	})
+	for j := range mn.Tags {
+		tag := &mn.Tags[j]
+		labels = append(labels, prompbmarshal.Label{
+			Name:  bytesutil.ToUnsafeString(tag.Key),
+			Value: bytesutil.ToUnsafeString(tag.Value),
+		})
+	}
+	labels = append(labels, extraLabels...)
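+	// block.Timestamps and block.Values must have identical lengths;
+	// the length check below guards against bugs in the native stream parser.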
+ values := block.Values + timestamps := block.Timestamps + if len(timestamps) != len(values) { + logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values)) + } + samplesLen := len(samples) + for j, value := range values { + samples = append(samples, prompbmarshal.Sample{ + Value: value, + Timestamp: timestamps[j], + }) + } + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[samplesLen:], + }) + rowsTotal := len(values) + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + remotewrite.Push(&ctx.WriteRequest) + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return nil +} diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index a02fde0d9..572ce6682 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" @@ -54,7 +55,9 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { labels = append(labels, extraLabels...) values := r.Values timestamps := r.Timestamps - _ = timestamps[len(values)-1] + if len(timestamps) != len(values) { + logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values)) + } samplesLen := len(samples) for j, value := range values { samples = append(samples, prompbmarshal.Sample{ diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 274f10e0f..b4639e077 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport" @@ -125,6 +126,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true + case "/api/v1/import/native": + nativeimportRequests.Inc() + if err := native.InsertHandler(r); err != nil { + nativeimportErrors.Inc() + httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true case "/write", "/api/v2/write": influxWriteRequests.Inc() if err := influx.InsertHandlerForHTTP(r); err != nil { @@ -169,6 +179,9 @@ var ( prometheusimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) prometheusimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) + nativeimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`) + nativeimportErrors = 
metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`) + influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`) influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`) diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go new file mode 100644 index 000000000..c991d3891 --- /dev/null +++ b/app/vminsert/native/request_handler.go @@ -0,0 +1,113 @@ +package native + +import ( + "net/http" + "runtime" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="native"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="native"}`) +) + +// InsertHandler processes `/api/v1/import/native` request. +func InsertHandler(req *http.Request) error { + extraLabels, err := parserCommon.GetExtraLabels(req) + if err != nil { + return err + } + return writeconcurrencylimiter.Do(func() error { + return parser.ParseStream(req, func(block *parser.Block) error { + return insertRows(block, extraLabels) + }) + }) +} + +func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { + ctx := getPushCtx() + defer putPushCtx(ctx) + + rowsLen := len(block.Values) + ic := &ctx.Common + ic.Reset(rowsLen) + hasRelabeling := relabel.HasRelabeling() + mn := &block.MetricName + ic.Labels = ic.Labels[:0] + ic.AddLabelBytes(nil, mn.MetricGroup) + for j := range mn.Tags { + tag := &mn.Tags[j] + ic.AddLabelBytes(tag.Key, tag.Value) + } + for j := range extraLabels { + label := &extraLabels[j] + ic.AddLabel(label.Name, label.Value) + } + if hasRelabeling { + ic.ApplyRelabeling() + } + if len(ic.Labels) == 0 { + // Skip metric without labels. 
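+		// This may happen when relabeling via -relabelConfig drops all the labels for the metric.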
+ return nil + } + ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) + values := block.Values + timestamps := block.Timestamps + if len(timestamps) != len(values) { + logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values)) + } + for j, value := range values { + timestamp := timestamps[j] + if err := ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value); err != nil { + return err + } + } + rowsTotal := len(values) + rowsInserted.Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return ic.FlushBufs() +} + +type pushCtx struct { + Common common.InsertCtx + metricNameBuf []byte +} + +func (ctx *pushCtx) reset() { + ctx.Common.Reset(0) + ctx.metricNameBuf = ctx.metricNameBuf[:0] +} + +func getPushCtx() *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + return v.(*pushCtx) + } + return &pushCtx{} + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 305149a58..a1dfd3375 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -203,6 +203,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true } return true + case "/api/v1/export/native": + exportNativeRequests.Inc() + if err := prometheus.ExportNativeHandler(startTime, w, r); err != nil { + exportNativeErrors.Inc() + httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err) + return true + } + return true case "/federate": federateRequests.Inc() if err := prometheus.FederateHandler(startTime, w, r); err != nil { @@ -321,6 +329,9 @@ var ( exportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/export"}`) exportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/export"}`) + exportNativeRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/export/native"}`) + exportNativeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/export/native"}`) + federateRequests = metrics.NewCounter(`vm_http_requests_total{path="/federate"}`) federateErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/federate"}`) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 6cfa861f2..9f89cc029 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -8,11 +8,11 @@ import ( "runtime" "sort" "sync" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -390,29 +390,9 @@ func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr if err := tmpBlock.UnmarshalData(); err != nil { return fmt.Errorf("cannot unmarshal block: %w", err) } - timestamps := tmpBlock.Timestamps() - - // Skip timestamps smaller than tr.MinTimestamp. - i := 0 - for i < len(timestamps) && timestamps[i] < tr.MinTimestamp { - i++ - } - - // Skip timestamps bigger than tr.MaxTimestamp. 
-	j := len(timestamps)
-	for j > i && timestamps[j-1] > tr.MaxTimestamp {
-		j--
-	}
-	skippedRows := tmpBlock.RowsCount() - (j - i)
+	sb.Timestamps, sb.Values = tmpBlock.AppendRowsWithTimeRangeFilter(sb.Timestamps[:0], sb.Values[:0], tr)
+	skippedRows := tmpBlock.RowsCount() - len(sb.Timestamps)
 	metricRowsSkipped.Add(skippedRows)
-
-	// Copy the remaining values.
-	if i == j {
-		return nil
-	}
-	values := tmpBlock.Values()
-	sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...)
-	sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], tmpBlock.Scale())
 	return nil
 }
 
@@ -581,7 +561,116 @@ func putStorageSearch(sr *storage.Search) {
 
 var ssPool sync.Pool
 
-// ProcessSearchQuery performs sq on storage nodes until the given deadline.
+// ExportBlocks searches for time series matching sq and calls f for each found block.
+//
+// f is called in parallel from multiple goroutines.
+// The process is stopped if f returns non-nil error.
+// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
+// It is the responsibility of f to filter blocks according to the given tr.
+func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error {
+	if deadline.Exceeded() {
+		return fmt.Errorf("timeout exceeded before starting data export: %s", deadline.String())
+	}
+	tfss, err := setupTfss(sq.TagFilterss)
+	if err != nil {
+		return err
+	}
+	tr := storage.TimeRange{
+		MinTimestamp: sq.MinTimestamp,
+		MaxTimestamp: sq.MaxTimestamp,
+	}
+	if err := vmstorage.CheckTimeRange(tr); err != nil {
+		return err
+	}
+
+	vmstorage.WG.Add(1)
+	defer vmstorage.WG.Done()
+
+	sr := getStorageSearch()
+	defer putStorageSearch(sr)
+	sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
+
+	// Start workers that call f in parallel on available CPU cores.
+	gomaxprocs := runtime.GOMAXPROCS(-1)
+	workCh := make(chan *exportWork, gomaxprocs*8)
+	var (
+		errGlobal     error
+		errGlobalLock sync.Mutex
+		mustStop      uint32
+	)
+	var wg sync.WaitGroup
+	wg.Add(gomaxprocs)
+	for i := 0; i < gomaxprocs; i++ {
+		go func() {
+			defer wg.Done()
+			for xw := range workCh {
+				if err := f(&xw.mn, &xw.b, tr); err != nil {
+					errGlobalLock.Lock()
+					if errGlobal == nil {
+						errGlobal = err
+						atomic.StoreUint32(&mustStop, 1)
+					}
+					errGlobalLock.Unlock()
+				}
+				xw.reset()
+				exportWorkPool.Put(xw)
+			}
+		}()
+	}
+
+	// Feed workers with work
+	blocksRead := 0
+	for sr.NextMetricBlock() {
+		blocksRead++
+		if deadline.Exceeded() {
+			return fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
+		}
+		if atomic.LoadUint32(&mustStop) != 0 {
+			break
+		}
+		xw := exportWorkPool.Get().(*exportWork)
+		if err := xw.mn.Unmarshal(sr.MetricBlockRef.MetricName); err != nil {
+			return fmt.Errorf("cannot unmarshal metricName for block #%d: %w", blocksRead, err)
+		}
+		sr.MetricBlockRef.BlockRef.MustReadBlock(&xw.b, true)
+		workCh <- xw
+	}
+	close(workCh)
+
+	// Wait for workers to finish.
+	wg.Wait()
+
+	// Check errors.
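+	// sr.Error() reports storage-level search errors, while errGlobal holds
+	// the first error returned by f in the worker goroutines above.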
+ err = sr.Error() + if err == nil { + err = errGlobal + } + if err != nil { + if errors.Is(err, storage.ErrDeadlineExceeded) { + return fmt.Errorf("timeout exceeded during the query: %s", deadline.String()) + } + return fmt.Errorf("search error after reading %d data blocks: %w", blocksRead, err) + } + return nil +} + +type exportWork struct { + mn storage.MetricName + b storage.Block +} + +func (xw *exportWork) reset() { + xw.mn.Reset() + xw.b.Reset() +} + +var exportWorkPool = &sync.Pool{ + New: func() interface{} { + return &exportWork{} + }, +} + +// ProcessSearchQuery performs sq until the given deadline. // // Results.RunParallel or Results.Cancel must be called on the returned Results. func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline searchutils.Deadline) (*Results, error) { diff --git a/app/vmselect/prometheus/export.qtpl b/app/vmselect/prometheus/export.qtpl index 70e0cdee1..990120e40 100644 --- a/app/vmselect/prometheus/export.qtpl +++ b/app/vmselect/prometheus/export.qtpl @@ -1,30 +1,29 @@ {% import ( "github.com/valyala/quicktemplate" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) %} {% stripspace %} -{% func ExportPrometheusLine(rs *netstorage.Result) %} - {% if len(rs.Timestamps) == 0 %}{% return %}{% endif %} +{% func ExportPrometheusLine(xb *exportBlock) %} + {% if len(xb.timestamps) == 0 %}{% return %}{% endif %} {% code bb := quicktemplate.AcquireByteBuffer() %} - {% code writeprometheusMetricName(bb, &rs.MetricName) %} - {% for i, ts := range rs.Timestamps %} + {% code writeprometheusMetricName(bb, xb.mn) %} + {% for i, ts := range xb.timestamps %} {%z= bb.B %}{% space %} - {%f= rs.Values[i] %}{% space %} + {%f= xb.values[i] %}{% space %} {%dl= ts %}{% newline %} {% endfor %} {% code quicktemplate.ReleaseByteBuffer(bb) %} {% endfunc %} -{% func ExportJSONLine(rs *netstorage.Result) %} - {% if len(rs.Timestamps) == 0 %}{% return %}{% endif %} +{% func ExportJSONLine(xb *exportBlock) %} + {% if len(xb.timestamps) == 0 %}{% return %}{% endif %} { - "metric":{%= metricNameObject(&rs.MetricName) %}, + "metric":{%= metricNameObject(xb.mn) %}, "values":[ - {% if len(rs.Values) > 0 %} - {% code values := rs.Values %} + {% if len(xb.values) > 0 %} + {% code values := xb.values %} {%f= values[0] %} {% code values = values[1:] %} {% for _, v := range values %} @@ -33,8 +32,8 @@ {% endif %} ], "timestamps":[ - {% if len(rs.Timestamps) > 0 %} - {% code timestamps := rs.Timestamps %} + {% if len(xb.timestamps) > 0 %} + {% code timestamps := xb.timestamps %} {%dl= timestamps[0] %} {% code timestamps = timestamps[1:] %} {% for _, ts := range timestamps %} @@ -45,10 +44,10 @@ }{% newline %} {% endfunc %} -{% func ExportPromAPILine(rs *netstorage.Result) %} +{% func ExportPromAPILine(xb *exportBlock) %} { - "metric": {%= metricNameObject(&rs.MetricName) %}, - "values": {%= valuesWithTimestamps(rs.Values, rs.Timestamps) %} + "metric": {%= metricNameObject(xb.mn) %}, + "values": {%= valuesWithTimestamps(xb.values, xb.timestamps) %} } {% endfunc %} diff --git a/app/vmselect/prometheus/export.qtpl.go b/app/vmselect/prometheus/export.qtpl.go index b44bde06e..2082b9020 100644 --- a/app/vmselect/prometheus/export.qtpl.go +++ b/app/vmselect/prometheus/export.qtpl.go @@ -6,380 +6,379 @@ package prometheus //line app/vmselect/prometheus/export.qtpl:1 import ( - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" 
"github.com/valyala/quicktemplate" ) -//line app/vmselect/prometheus/export.qtpl:9 +//line app/vmselect/prometheus/export.qtpl:8 import ( qtio422016 "io" qt422016 "github.com/valyala/quicktemplate" ) -//line app/vmselect/prometheus/export.qtpl:9 +//line app/vmselect/prometheus/export.qtpl:8 var ( _ = qtio422016.Copy _ = qt422016.AcquireByteBuffer ) +//line app/vmselect/prometheus/export.qtpl:8 +func StreamExportPrometheusLine(qw422016 *qt422016.Writer, xb *exportBlock) { +//line app/vmselect/prometheus/export.qtpl:9 + if len(xb.timestamps) == 0 { //line app/vmselect/prometheus/export.qtpl:9 -func StreamExportPrometheusLine(qw422016 *qt422016.Writer, rs *netstorage.Result) { -//line app/vmselect/prometheus/export.qtpl:10 - if len(rs.Timestamps) == 0 { -//line app/vmselect/prometheus/export.qtpl:10 return -//line app/vmselect/prometheus/export.qtpl:10 +//line app/vmselect/prometheus/export.qtpl:9 } -//line app/vmselect/prometheus/export.qtpl:11 +//line app/vmselect/prometheus/export.qtpl:10 bb := quicktemplate.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:12 - writeprometheusMetricName(bb, &rs.MetricName) +//line app/vmselect/prometheus/export.qtpl:11 + writeprometheusMetricName(bb, xb.mn) +//line app/vmselect/prometheus/export.qtpl:12 + for i, ts := range xb.timestamps { //line app/vmselect/prometheus/export.qtpl:13 - for i, ts := range rs.Timestamps { -//line app/vmselect/prometheus/export.qtpl:14 qw422016.N().Z(bb.B) +//line app/vmselect/prometheus/export.qtpl:13 + qw422016.N().S(` `) +//line app/vmselect/prometheus/export.qtpl:14 + qw422016.N().F(xb.values[i]) //line app/vmselect/prometheus/export.qtpl:14 qw422016.N().S(` `) //line app/vmselect/prometheus/export.qtpl:15 - qw422016.N().F(rs.Values[i]) -//line app/vmselect/prometheus/export.qtpl:15 - qw422016.N().S(` `) -//line app/vmselect/prometheus/export.qtpl:16 qw422016.N().DL(ts) -//line app/vmselect/prometheus/export.qtpl:16 +//line app/vmselect/prometheus/export.qtpl:15 qw422016.N().S(` `) -//line app/vmselect/prometheus/export.qtpl:17 +//line app/vmselect/prometheus/export.qtpl:16 } -//line app/vmselect/prometheus/export.qtpl:18 +//line app/vmselect/prometheus/export.qtpl:17 quicktemplate.ReleaseByteBuffer(bb) -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 } -//line app/vmselect/prometheus/export.qtpl:19 -func WriteExportPrometheusLine(qq422016 qtio422016.Writer, rs *netstorage.Result) { -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 +func WriteExportPrometheusLine(qq422016 qtio422016.Writer, xb *exportBlock) { +//line app/vmselect/prometheus/export.qtpl:18 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:19 - StreamExportPrometheusLine(qw422016, rs) -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 + StreamExportPrometheusLine(qw422016, xb) +//line app/vmselect/prometheus/export.qtpl:18 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 } -//line app/vmselect/prometheus/export.qtpl:19 -func ExportPrometheusLine(rs *netstorage.Result) string { -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 +func ExportPrometheusLine(xb *exportBlock) string { +//line app/vmselect/prometheus/export.qtpl:18 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:19 - WriteExportPrometheusLine(qb422016, rs) 
-//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 + WriteExportPrometheusLine(qb422016, xb) +//line app/vmselect/prometheus/export.qtpl:18 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 return qs422016 -//line app/vmselect/prometheus/export.qtpl:19 +//line app/vmselect/prometheus/export.qtpl:18 } +//line app/vmselect/prometheus/export.qtpl:20 +func StreamExportJSONLine(qw422016 *qt422016.Writer, xb *exportBlock) { +//line app/vmselect/prometheus/export.qtpl:21 + if len(xb.timestamps) == 0 { //line app/vmselect/prometheus/export.qtpl:21 -func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) { -//line app/vmselect/prometheus/export.qtpl:22 - if len(rs.Timestamps) == 0 { -//line app/vmselect/prometheus/export.qtpl:22 return -//line app/vmselect/prometheus/export.qtpl:22 +//line app/vmselect/prometheus/export.qtpl:21 } -//line app/vmselect/prometheus/export.qtpl:22 +//line app/vmselect/prometheus/export.qtpl:21 qw422016.N().S(`{"metric":`) -//line app/vmselect/prometheus/export.qtpl:24 - streammetricNameObject(qw422016, &rs.MetricName) -//line app/vmselect/prometheus/export.qtpl:24 +//line app/vmselect/prometheus/export.qtpl:23 + streammetricNameObject(qw422016, xb.mn) +//line app/vmselect/prometheus/export.qtpl:23 qw422016.N().S(`,"values":[`) +//line app/vmselect/prometheus/export.qtpl:25 + if len(xb.values) > 0 { //line app/vmselect/prometheus/export.qtpl:26 - if len(rs.Values) > 0 { -//line app/vmselect/prometheus/export.qtpl:27 - values := rs.Values + values := xb.values -//line app/vmselect/prometheus/export.qtpl:28 +//line app/vmselect/prometheus/export.qtpl:27 qw422016.N().F(values[0]) -//line app/vmselect/prometheus/export.qtpl:29 +//line app/vmselect/prometheus/export.qtpl:28 values = values[1:] -//line app/vmselect/prometheus/export.qtpl:30 +//line app/vmselect/prometheus/export.qtpl:29 for _, v := range values { -//line app/vmselect/prometheus/export.qtpl:30 +//line app/vmselect/prometheus/export.qtpl:29 qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:31 +//line app/vmselect/prometheus/export.qtpl:30 qw422016.N().F(v) -//line app/vmselect/prometheus/export.qtpl:32 +//line app/vmselect/prometheus/export.qtpl:31 } -//line app/vmselect/prometheus/export.qtpl:33 +//line app/vmselect/prometheus/export.qtpl:32 } -//line app/vmselect/prometheus/export.qtpl:33 +//line app/vmselect/prometheus/export.qtpl:32 qw422016.N().S(`],"timestamps":[`) +//line app/vmselect/prometheus/export.qtpl:35 + if len(xb.timestamps) > 0 { //line app/vmselect/prometheus/export.qtpl:36 - if len(rs.Timestamps) > 0 { -//line app/vmselect/prometheus/export.qtpl:37 - timestamps := rs.Timestamps + timestamps := xb.timestamps -//line app/vmselect/prometheus/export.qtpl:38 +//line app/vmselect/prometheus/export.qtpl:37 qw422016.N().DL(timestamps[0]) -//line app/vmselect/prometheus/export.qtpl:39 +//line app/vmselect/prometheus/export.qtpl:38 timestamps = timestamps[1:] -//line app/vmselect/prometheus/export.qtpl:40 +//line app/vmselect/prometheus/export.qtpl:39 for _, ts := range timestamps { -//line app/vmselect/prometheus/export.qtpl:40 +//line app/vmselect/prometheus/export.qtpl:39 qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:41 +//line app/vmselect/prometheus/export.qtpl:40 qw422016.N().DL(ts) -//line 
app/vmselect/prometheus/export.qtpl:42 +//line app/vmselect/prometheus/export.qtpl:41 } -//line app/vmselect/prometheus/export.qtpl:43 +//line app/vmselect/prometheus/export.qtpl:42 } -//line app/vmselect/prometheus/export.qtpl:43 +//line app/vmselect/prometheus/export.qtpl:42 qw422016.N().S(`]}`) -//line app/vmselect/prometheus/export.qtpl:45 +//line app/vmselect/prometheus/export.qtpl:44 qw422016.N().S(` `) -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 } -//line app/vmselect/prometheus/export.qtpl:46 -func WriteExportJSONLine(qq422016 qtio422016.Writer, rs *netstorage.Result) { -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 +func WriteExportJSONLine(qq422016 qtio422016.Writer, xb *exportBlock) { +//line app/vmselect/prometheus/export.qtpl:45 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:46 - StreamExportJSONLine(qw422016, rs) -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 + StreamExportJSONLine(qw422016, xb) +//line app/vmselect/prometheus/export.qtpl:45 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 } -//line app/vmselect/prometheus/export.qtpl:46 -func ExportJSONLine(rs *netstorage.Result) string { -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 +func ExportJSONLine(xb *exportBlock) string { +//line app/vmselect/prometheus/export.qtpl:45 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:46 - WriteExportJSONLine(qb422016, rs) -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 + WriteExportJSONLine(qb422016, xb) +//line app/vmselect/prometheus/export.qtpl:45 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 return qs422016 -//line app/vmselect/prometheus/export.qtpl:46 +//line app/vmselect/prometheus/export.qtpl:45 } -//line app/vmselect/prometheus/export.qtpl:48 -func StreamExportPromAPILine(qw422016 *qt422016.Writer, rs *netstorage.Result) { -//line app/vmselect/prometheus/export.qtpl:48 +//line app/vmselect/prometheus/export.qtpl:47 +func StreamExportPromAPILine(qw422016 *qt422016.Writer, xb *exportBlock) { +//line app/vmselect/prometheus/export.qtpl:47 qw422016.N().S(`{"metric":`) -//line app/vmselect/prometheus/export.qtpl:50 - streammetricNameObject(qw422016, &rs.MetricName) -//line app/vmselect/prometheus/export.qtpl:50 +//line app/vmselect/prometheus/export.qtpl:49 + streammetricNameObject(qw422016, xb.mn) +//line app/vmselect/prometheus/export.qtpl:49 qw422016.N().S(`,"values":`) -//line app/vmselect/prometheus/export.qtpl:51 - streamvaluesWithTimestamps(qw422016, rs.Values, rs.Timestamps) -//line app/vmselect/prometheus/export.qtpl:51 +//line app/vmselect/prometheus/export.qtpl:50 + streamvaluesWithTimestamps(qw422016, xb.values, xb.timestamps) +//line app/vmselect/prometheus/export.qtpl:50 qw422016.N().S(`}`) -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 } -//line app/vmselect/prometheus/export.qtpl:53 -func WriteExportPromAPILine(qq422016 qtio422016.Writer, rs *netstorage.Result) { -//line app/vmselect/prometheus/export.qtpl:53 +//line 
app/vmselect/prometheus/export.qtpl:52 +func WriteExportPromAPILine(qq422016 qtio422016.Writer, xb *exportBlock) { +//line app/vmselect/prometheus/export.qtpl:52 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:53 - StreamExportPromAPILine(qw422016, rs) -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 + StreamExportPromAPILine(qw422016, xb) +//line app/vmselect/prometheus/export.qtpl:52 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 } -//line app/vmselect/prometheus/export.qtpl:53 -func ExportPromAPILine(rs *netstorage.Result) string { -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 +func ExportPromAPILine(xb *exportBlock) string { +//line app/vmselect/prometheus/export.qtpl:52 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:53 - WriteExportPromAPILine(qb422016, rs) -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 + WriteExportPromAPILine(qb422016, xb) +//line app/vmselect/prometheus/export.qtpl:52 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 return qs422016 -//line app/vmselect/prometheus/export.qtpl:53 +//line app/vmselect/prometheus/export.qtpl:52 } -//line app/vmselect/prometheus/export.qtpl:55 +//line app/vmselect/prometheus/export.qtpl:54 func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { -//line app/vmselect/prometheus/export.qtpl:55 +//line app/vmselect/prometheus/export.qtpl:54 qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`) -//line app/vmselect/prometheus/export.qtpl:61 +//line app/vmselect/prometheus/export.qtpl:60 bb, ok := <-resultsCh -//line app/vmselect/prometheus/export.qtpl:62 +//line app/vmselect/prometheus/export.qtpl:61 if ok { -//line app/vmselect/prometheus/export.qtpl:63 +//line app/vmselect/prometheus/export.qtpl:62 qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:64 +//line app/vmselect/prometheus/export.qtpl:63 quicktemplate.ReleaseByteBuffer(bb) -//line app/vmselect/prometheus/export.qtpl:65 +//line app/vmselect/prometheus/export.qtpl:64 for bb := range resultsCh { -//line app/vmselect/prometheus/export.qtpl:65 +//line app/vmselect/prometheus/export.qtpl:64 qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:66 +//line app/vmselect/prometheus/export.qtpl:65 qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:67 +//line app/vmselect/prometheus/export.qtpl:66 quicktemplate.ReleaseByteBuffer(bb) -//line app/vmselect/prometheus/export.qtpl:68 +//line app/vmselect/prometheus/export.qtpl:67 } -//line app/vmselect/prometheus/export.qtpl:69 +//line app/vmselect/prometheus/export.qtpl:68 } -//line app/vmselect/prometheus/export.qtpl:69 +//line app/vmselect/prometheus/export.qtpl:68 qw422016.N().S(`]}}`) -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 } -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { -//line app/vmselect/prometheus/export.qtpl:73 +//line 
app/vmselect/prometheus/export.qtpl:72 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 StreamExportPromAPIResponse(qw422016, resultsCh) -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 } -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string { -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 WriteExportPromAPIResponse(qb422016, resultsCh) -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 return qs422016 -//line app/vmselect/prometheus/export.qtpl:73 +//line app/vmselect/prometheus/export.qtpl:72 } -//line app/vmselect/prometheus/export.qtpl:75 +//line app/vmselect/prometheus/export.qtpl:74 func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { -//line app/vmselect/prometheus/export.qtpl:76 +//line app/vmselect/prometheus/export.qtpl:75 for bb := range resultsCh { -//line app/vmselect/prometheus/export.qtpl:77 +//line app/vmselect/prometheus/export.qtpl:76 qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:78 +//line app/vmselect/prometheus/export.qtpl:77 quicktemplate.ReleaseByteBuffer(bb) -//line app/vmselect/prometheus/export.qtpl:79 +//line app/vmselect/prometheus/export.qtpl:78 } -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 } -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 StreamExportStdResponse(qw422016, resultsCh) -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 } -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string { -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 WriteExportStdResponse(qb422016, resultsCh) -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 qt422016.ReleaseByteBuffer(qb422016) -//line 
app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 return qs422016 -//line app/vmselect/prometheus/export.qtpl:80 +//line app/vmselect/prometheus/export.qtpl:79 } -//line app/vmselect/prometheus/export.qtpl:82 +//line app/vmselect/prometheus/export.qtpl:81 func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) { -//line app/vmselect/prometheus/export.qtpl:83 +//line app/vmselect/prometheus/export.qtpl:82 qw422016.N().Z(mn.MetricGroup) -//line app/vmselect/prometheus/export.qtpl:84 +//line app/vmselect/prometheus/export.qtpl:83 if len(mn.Tags) > 0 { -//line app/vmselect/prometheus/export.qtpl:84 +//line app/vmselect/prometheus/export.qtpl:83 qw422016.N().S(`{`) -//line app/vmselect/prometheus/export.qtpl:86 +//line app/vmselect/prometheus/export.qtpl:85 tags := mn.Tags -//line app/vmselect/prometheus/export.qtpl:87 +//line app/vmselect/prometheus/export.qtpl:86 qw422016.N().Z(tags[0].Key) -//line app/vmselect/prometheus/export.qtpl:87 +//line app/vmselect/prometheus/export.qtpl:86 qw422016.N().S(`=`) -//line app/vmselect/prometheus/export.qtpl:87 +//line app/vmselect/prometheus/export.qtpl:86 qw422016.N().QZ(tags[0].Value) -//line app/vmselect/prometheus/export.qtpl:88 +//line app/vmselect/prometheus/export.qtpl:87 tags = tags[1:] -//line app/vmselect/prometheus/export.qtpl:89 +//line app/vmselect/prometheus/export.qtpl:88 for i := range tags { -//line app/vmselect/prometheus/export.qtpl:90 +//line app/vmselect/prometheus/export.qtpl:89 tag := &tags[i] -//line app/vmselect/prometheus/export.qtpl:90 +//line app/vmselect/prometheus/export.qtpl:89 qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:91 +//line app/vmselect/prometheus/export.qtpl:90 qw422016.N().Z(tag.Key) -//line app/vmselect/prometheus/export.qtpl:91 +//line app/vmselect/prometheus/export.qtpl:90 qw422016.N().S(`=`) -//line app/vmselect/prometheus/export.qtpl:91 +//line app/vmselect/prometheus/export.qtpl:90 qw422016.N().QZ(tag.Value) -//line app/vmselect/prometheus/export.qtpl:92 +//line app/vmselect/prometheus/export.qtpl:91 } -//line app/vmselect/prometheus/export.qtpl:92 +//line app/vmselect/prometheus/export.qtpl:91 qw422016.N().S(`}`) -//line app/vmselect/prometheus/export.qtpl:94 +//line app/vmselect/prometheus/export.qtpl:93 } -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 } -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) { -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 streamprometheusMetricName(qw422016, mn) -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 } -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 func prometheusMetricName(mn *storage.MetricName) string { -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 writeprometheusMetricName(qb422016, mn) -//line app/vmselect/prometheus/export.qtpl:95 
+//line app/vmselect/prometheus/export.qtpl:94 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 return qs422016 -//line app/vmselect/prometheus/export.qtpl:95 +//line app/vmselect/prometheus/export.qtpl:94 } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index ab074e9f1..8a7499910 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -1,6 +1,7 @@ package prometheus import ( + "bufio" "flag" "fmt" "math" @@ -14,6 +15,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -41,6 +44,11 @@ var ( // Default step used if not set. const defaultStep = 5 * 60 * 1000 +// Buffer size for big responses (i.e. /federate and /api/v1/export/* ) +// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses. +// These buffers may result in visible overhead for responses exceeding tens of megabytes. +const bigResponseBufferSize = 128 * 1024 + // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { ct := startTime.UnixNano() / 1e6 @@ -97,10 +105,12 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request }() w.Header().Set("Content-Type", "text/plain") + bw := bufio.NewWriterSize(w, bigResponseBufferSize) for bb := range resultsCh { - w.Write(bb.B) + bw.Write(bb.B) quicktemplate.ReleaseByteBuffer(bb) } + _ = bw.Flush() err = <-doneCh if err != nil { @@ -112,6 +122,85 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`) +// ExportNativeHandler exports data in native format from /api/v1/export/native. 
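+// +// The response stream starts with a 16-byte header (MinTimestamp followed by MaxTimestamp, both marshaled via encoding.MarshalInt64). +// After the header, the stream is a sequence of blocks, each encoded as a uint32 length plus the storage.MetricName marshaled via MetricName.Marshal, +// followed by a uint32 length plus the storage.Block marshaled via Block.MarshalPortable.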
+func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { + ct := startTime.UnixNano() / 1e6 + if err := r.ParseForm(); err != nil { + return fmt.Errorf("cannot parse request form values: %w", err) + } + matches := r.Form["match[]"] + if len(matches) == 0 { + // Maintain backwards compatibility + match := r.FormValue("match") + if len(match) == 0 { + return fmt.Errorf("missing `match[]` arg") + } + matches = []string{match} + } + start, err := searchutils.GetTime(r, "start", 0) + if err != nil { + return err + } + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return err + } + deadline := searchutils.GetDeadlineForExport(r, startTime) + tagFilterss, err := getTagFilterssFromMatches(matches) + if err != nil { + return err + } + sq := &storage.SearchQuery{ + MinTimestamp: start, + MaxTimestamp: end, + TagFilterss: tagFilterss, + } + w.Header().Set("Content-Type", "VictoriaMetrics/native") + bw := bufio.NewWriterSize(w, bigResponseBufferSize) + + // Marshal tr + trBuf := make([]byte, 0, 16) + trBuf = encoding.MarshalInt64(trBuf, start) + trBuf = encoding.MarshalInt64(trBuf, end) + bw.Write(trBuf) + + var bwLock sync.Mutex + err = netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + dstBuf := bbPool.Get() + tmpBuf := bbPool.Get() + dst := dstBuf.B + tmp := tmpBuf.B + + // Marshal mn + tmp = mn.Marshal(tmp[:0]) + dst = encoding.MarshalUint32(dst, uint32(len(tmp))) + dst = append(dst, tmp...) + + // Marshal b + tmp = b.MarshalPortable(tmp[:0]) + dst = encoding.MarshalUint32(dst, uint32(len(tmp))) + dst = append(dst, tmp...) + + tmpBuf.B = tmp + bbPool.Put(tmpBuf) + + bwLock.Lock() + _, err := bw.Write(dst) + bwLock.Unlock() + if err != nil { + return fmt.Errorf("cannot write data to client: %w", err) + } + + dstBuf.B = dst + bbPool.Put(dstBuf) + return nil + }) + _ = bw.Flush() + return err +} + +var bbPool bytesutil.ByteBufferPool + // ExportHandler exports data in raw format from /api/v1/export. 
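+// Unlike ExportNativeHandler, this handler decodes storage blocks and renders them as JSON lines, +// Prometheus text exposition format or Prometheus query API JSON, depending on the `format` query arg.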
func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { ct := startTime.UnixNano() / 1e6 @@ -137,11 +226,12 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) } format := r.FormValue("format") maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line"))) + reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage") deadline := searchutils.GetDeadlineForExport(r, startTime) if start >= end { end = start + defaultStep } - if err := exportHandler(w, matches, start, end, format, maxRowsPerLine, deadline); err != nil { + if err := exportHandler(w, matches, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil { return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err) } exportDuration.UpdateDuration(startTime) @@ -150,17 +240,34 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) -func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline searchutils.Deadline) error { +func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error { writeResponseFunc := WriteExportStdResponse - writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { + writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { bb := quicktemplate.AcquireByteBuffer() - WriteExportJSONLine(bb, rs) + WriteExportJSONLine(bb, xb) resultsCh <- bb } + contentType := "application/stream+json" + if format == "prometheus" { + contentType = "text/plain" + writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportPrometheusLine(bb, xb) + resultsCh <- bb + } + } else if format == "promapi" { + writeResponseFunc = WriteExportPromAPIResponse + writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportPromAPILine(bb, xb) + resultsCh <- bb + } + } if maxRowsPerLine > 0 { - writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { - valuesOrig := rs.Values - timestampsOrig := rs.Timestamps + writeLineFuncOrig := writeLineFunc + writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { + valuesOrig := xb.values + timestampsOrig := xb.timestamps values := valuesOrig timestamps := timestampsOrig for len(values) > 0 { @@ -177,30 +284,12 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo values = nil timestamps = nil } - rs.Values = valuesChunk - rs.Timestamps = timestampsChunk - bb := quicktemplate.AcquireByteBuffer() - WriteExportJSONLine(bb, rs) - resultsCh <- bb + xb.values = valuesChunk + xb.timestamps = timestampsChunk + writeLineFuncOrig(xb, resultsCh) } - rs.Values = valuesOrig - rs.Timestamps = timestampsOrig - } - } - contentType := "application/stream+json" - if format == "prometheus" { - contentType = "text/plain" - writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() - WriteExportPrometheusLine(bb, rs) - resultsCh <- bb - } - } else if format == "promapi" { - writeResponseFunc = 
WriteExportPromAPIResponse - writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() - WriteExportPromAPILine(bb, rs) - resultsCh <- bb + xb.values = valuesOrig + xb.timestamps = timestampsOrig } } @@ -213,23 +302,52 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo MaxTimestamp: end, TagFilterss: tagFilterss, } - rss, err := netstorage.ProcessSearchQuery(sq, true, deadline) - if err != nil { - return fmt.Errorf("cannot fetch data for %q: %w", sq, err) - } - resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1)) doneCh := make(chan error) - go func() { - err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { - writeLineFunc(rs, resultsCh) - }) - close(resultsCh) - doneCh <- err - }() + + if !reduceMemUsage { + rss, err := netstorage.ProcessSearchQuery(sq, true, deadline) + if err != nil { + return fmt.Errorf("cannot fetch data for %q: %w", sq, err) + } + go func() { + err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { + xb := exportBlockPool.Get().(*exportBlock) + xb.mn = &rs.MetricName + xb.timestamps = rs.Timestamps + xb.values = rs.Values + writeLineFunc(xb, resultsCh) + xb.reset() + exportBlockPool.Put(xb) + }) + close(resultsCh) + doneCh <- err + }() + } else { + go func() { + err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + if err := b.UnmarshalData(); err != nil { + return fmt.Errorf("cannot unmarshal block during export: %s", err) + } + xb := exportBlockPool.Get().(*exportBlock) + xb.mn = mn + xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) + if len(xb.timestamps) > 0 { + writeLineFunc(xb, resultsCh) + } + xb.reset() + exportBlockPool.Put(xb) + return nil + }) + close(resultsCh) + doneCh <- err + }() + } w.Header().Set("Content-Type", contentType) - writeResponseFunc(w, resultsCh) + bw := bufio.NewWriterSize(w, bigResponseBufferSize) + writeResponseFunc(bw, resultsCh) + _ = bw.Flush() // Consume all the data from resultsCh in the event writeResponseFunc // fails to consume all the data. @@ -243,6 +361,24 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo return nil } +type exportBlock struct { + mn *storage.MetricName + timestamps []int64 + values []float64 +} + +func (xb *exportBlock) reset() { + xb.mn = nil + xb.timestamps = xb.timestamps[:0] + xb.values = xb.values[:0] +} + +var exportBlockPool = &sync.Pool{ + New: func() interface{} { + return &exportBlock{} + }, +} + // DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request. 
// // See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series @@ -662,7 +798,7 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e start -= offset end := start start = end - window - if err := exportHandler(w, []string{childQuery}, start, end, "promapi", 0, deadline); err != nil { + if err := exportHandler(w, []string{childQuery}, start, end, "promapi", 0, false, deadline); err != nil { return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err) } queryDuration.UpdateDuration(startTime) diff --git a/docs/FAQ.md b/docs/FAQ.md index 81f71d6ab..dcc46cc30 100644 --- a/docs/FAQ.md +++ b/docs/FAQ.md @@ -91,7 +91,7 @@ The main differences between Cortex and VictoriaMetrics: VictoriaMetrics may lose only a few seconds of recent data, which isn't synced to persistent storage yet. See [this article for details](https://medium.com/@valyala/wal-usage-looks-broken-in-modern-time-series-databases-b62a627ab704). - Cortex is usually slower and requires more CPU and RAM than VictoriaMetrics. See [this talk from Adidas at PromCon 2019](https://promcon.io/2019-munich/talks/remote-write-storage-wars/) and [other case studies](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies). -- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV. +- VictoriaMetrics accepts data in multiple popular data ingestion protocols in addition to the Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV, JSON, native binary. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details. @@ -112,7 +112,7 @@ The main differences between Cortex and VictoriaMetrics: - Thanos may be harder to setup and operate comparing to VictoriaMetrics, since it has more moving parts, which can be connected with less reliable networks. See [this article for details](https://medium.com/faun/comparing-thanos-to-victoriametrics-cluster-b193bea1683). - Thanos is usually slower and requires more CPU and RAM than VictoriaMetrics. See [this talk from Adidas at PromCon 2019](https://promcon.io/2019-munich/talks/remote-write-storage-wars/). -- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV. +- VictoriaMetrics accepts data in multiple popular data ingestion protocols in addition to the Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV, JSON, native binary. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details. @@ -120,7 +120,7 @@ The main differences between Cortex and VictoriaMetrics: - VictoriaMetrics requires [10x less RAM](https://medium.com/@valyala/insert-benchmarks-with-inch-influxdb-vs-victoriametrics-e31a41ae2893) and it [works faster](https://medium.com/@valyala/measuring-vertical-scalability-for-time-series-databases-in-google-cloud-92550d78d8ae). - VictoriaMetrics provides [better query language](https://medium.com/@valyala/promql-tutorial-for-beginners-9ab455142085) than InfluxQL or Flux. -- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to InfluxDB - Prometheus remote_write, OpenTSDB, Graphite, CSV.
+- VictoriaMetrics accepts data in multiple popular data ingestion protocols in addition to InfluxDB - Prometheus remote_write, OpenTSDB, Graphite, CSV, JSON, native binary. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index f46d550d8..ba1eb3558 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -77,7 +77,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet if `-graphiteListenAddr` is set. * [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set. * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set. - * [/api/v1/import](#how-to-import-time-series-data). + * [JSON line format](#how-to-import-data-in-json-line-format). + * [Native binary format](#how-to-import-data-in-native-format). * [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). * [Arbitrary CSV data](#how-to-import-csv-data). * Supports metrics' relabeling. See [these docs](#relabeling) for details. @@ -100,8 +101,6 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) * [Querying Graphite data](#querying-graphite-data) * [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents) -* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format) -* [How to import CSV data](#how-to-import-csv-data) * [Prometheus querying API usage](#prometheus-querying-api-usage) * [Prometheus querying API enhancements](#prometheus-querying-api-enhancements) * [Graphite Metrics API usage](#graphite-metrics-api-usage) @@ -117,7 +116,13 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [How to delete time series](#how-to-delete-time-series) * [Forced merge](#forced-merge) * [How to export time series](#how-to-export-time-series) + * [How to export data in native format](#how-to-export-data-in-native-format) + * [How to export data in JSON line format](#how-to-export-data-in-json-line-format) * [How to import time series data](#how-to-import-time-series-data) + * [How to import data in native format](#how-to-import-data-in-native-format) + * [How to import data in JSON line format](#how-to-import-data-in-json-line-format) + * [How to import CSV data](#how-to-import-csv-data) + * [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format) * [Relabeling](#relabeling) * [Federation](#federation) * [Capacity planning](#capacity-planning) @@ -345,7 +350,7 @@ curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'ht ``` An arbitrary number of lines delimited by '\n' (aka newline char) may be sent in a single request.
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint: +After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: ```bash curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}' @@ -381,7 +386,7 @@ echo "foo.bar.baz;tag1=value1;tag2=value2 123 `date +%s`" | nc -N localhost 2003 VictoriaMetrics sets the current time if the timestamp is omitted. An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go. -After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint: +After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: ```bash curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz' @@ -425,7 +430,7 @@ echo "put foo.bar.baz `date +%s` 123 tag1=value1 tag2=value2" | nc -N localhost ``` An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go. -After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint: +After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: ```bash curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz' @@ -460,7 +465,7 @@ Example for writing multiple data points in a single request: curl -H 'Content-Type: application/json' -d '[{"metric":"foo","value":45.34},{"metric":"bar","value":43}]' http://localhost:4242/api/put ``` -After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint: +After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: ```bash curl -G 'http://localhost:8428/api/v1/export' -d 'match[]=x.y.z' -d 'match[]=foo' -d 'match[]=bar' @@ -475,91 +480,6 @@ The `/api/v1/export` endpoint should return the following response: ``` -### How to import CSV data - -Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg. -The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon: - -``` -<column_pos>:<type>:<context> -``` - -* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary. -* `<type>` describes the column type. Supported types are: - * `metric` - the corresponding CSV column at `<column_pos>` contains metric value, which must be integer or floating-point number. - The metric name is read from the `<context>`. CSV line must have at least a single metric field. Multiple metric fields per CSV line is OK. - * `label` - the corresponding CSV column at `<column_pos>` contains label value. The label name is read from the `<context>`. - CSV line may have arbitrary number of label fields. All these labels are attached to all the configured metrics. - * `time` - the corresponding CSV column at `<column_pos>` contains metric time. CSV line may contain either one or zero columns with time. - If CSV line has no time, then the current time is used. The time is applied to all the configured metrics. - The format of the time is configured via `<context>`. Supported time formats are: - * `unix_s` - unix timestamp in seconds. - * `unix_ms` - unix timestamp in milliseconds. - * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds. - * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
- * `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse). - -Each request to `/api/v1/import/csv` may contain arbitrary number of CSV lines. - -Example for importing CSV data via `/api/v1/import/csv`: - -```bash -curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' -curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' -``` - -After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint: - -```bash -curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}' -``` - -The following response should be returned: -```bash -{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]} -{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]} -{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]} -{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]} -``` - -Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args. -For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines. - -Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. - - -### How to import data in Prometheus exposition format - -VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) -via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics: - -```bash -curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus' -``` - -The following command may be used for verifying the imported data: - -```bash -curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}' -``` - -It should return something like the following: - -``` -{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} -``` - -Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. -For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. - -If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion. -It can be overriden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`. - -VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. - -VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). - - ### Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): @@ -756,11 +676,35 @@ when new data is ingested into it.
### How to export time series -Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`, +VictoriaMetrics provides the following handlers for exporting data: + +* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export. + See [these docs](#how-to-export-data-in-native-format) for details. +* `/api/v1/export` for exporting data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details. + + +#### How to export data in native format + +Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/native?match[]=<timeseries_selector_for_export>`, +where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) +for metrics to export. Use `{__name__!=""}` selector for fetching all the time series. + +Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either +unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values. + +The exported data can be imported to VictoriaMetrics via [/api/v1/import/native](#how-to-import-data-in-native-format). + + +#### How to export data in JSON line format + +Consider [exporting data in native format](#how-to-export-data-in-native-format) if big amounts of data must be migrated between VictoriaMetrics instances, +since native export usually consumes less CPU and memory, while the resulting exported data occupies less disk space. + +In order to export data in JSON line format, send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`, where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for metrics to export. Use `{__name__!=""}` selector for fetching all the time series. The response would contain all the data for the selected time series in [JSON streaming format](https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON). -Each JSON line would contain data for a single time series. An example output: +Each JSON line contains samples for a single time series. An example output: ```jsonl {"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]} @@ -770,8 +714,9 @@ Each JSON line would contain data for a single time series. An example output: Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values. -Optional `max_rows_per_line` arg may be added to the request in order to limit the maximum number of rows exported per each JSON line. -By default each JSON line contains all the rows for a single time series. +Optional `max_rows_per_line` arg may be added to the request for limiting the maximum number of rows exported per JSON line. +Optional `reduce_mem_usage=1` arg may be added to the request for reducing memory usage when exporting a big number of time series. +In this case the output may contain multiple lines with distinct samples for the same time series. Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporting big amounts of time series data. This enables gzip compression for the exported data.
Example for exporting gzipped data: @@ -782,22 +727,58 @@ curl -H 'Accept-Encoding: gzip' http://localhost:8428/api/v1/export -d 'match[]= The maximum duration for each request to `/api/v1/export` is limited by `-search.maxExportDuration` command-line flag. -Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-time-series-data). +Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format). + ### How to import time series data Time series data can be imported via any supported ingestion protocol: -* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) -* [Influx line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) -* [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) -* [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol) -* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests) -* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series). -* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details. -* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. +* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). +* Influx line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. +* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details. +* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details. +* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details. +* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format). + See [these docs](#how-to-import-data-in-json-line-format) for details. +* `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format). + See [these docs](#how-to-import-data-in-native-format) for details. +* `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details. +* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. + +#### How to import data in native format + +The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`.
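+
+Both endpoints stream data, so a migration may also be piped without an intermediate file.
+A minimal Go sketch (host names are placeholders and error handling is reduced to panics):
+
+```go
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"net/url"
+)
+
+func main() {
+	// Stream /api/v1/export/native from the source straight into
+	// /api/v1/import/native on the destination.
+	match := url.QueryEscape(`{__name__!=""}`)
+	src := "http://source-victoriametrics:8428/api/v1/export/native?match=" + match
+	resp, err := http.Get(src)
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+	dst := "http://destination-victoriametrics:8428/api/v1/import/native"
+	resp2, err := http.Post(dst, "VictoriaMetrics/native", resp.Body)
+	if err != nil {
+		panic(err)
+	}
+	defer resp2.Body.Close()
+	fmt.Println("import status:", resp2.Status)
+}
+```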
+Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format): + +```bash +# Export the data from <source-victoriametrics>: +curl http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin + +# Import the data to <destination-victoriametrics>: +curl -X POST http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin +``` + +Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/native` for importing gzipped data: + +```bash +# Export gzipped data from <source-victoriametrics>: +curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin.gz + +# Import gzipped data to <destination-victoriametrics>: +curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin.gz +``` + +Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args. +For example, `/api/v1/import/native?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series. + +Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details. + + +#### How to import data in JSON line format + +Example for importing data obtained via [/api/v1/export](#how-to-export-data-in-json-line-format): ```bash # Export the data from <source-victoriametrics>: @@ -823,6 +804,94 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. +#### How to import CSV data + +Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg. +The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon: + +``` +<column_pos>:<type>:<context> +``` + +* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary. +* `<type>` describes the column type. Supported types are: + * `metric` - the corresponding CSV column at `<column_pos>` contains metric value, which must be integer or floating-point number. + The metric name is read from the `<context>`. CSV line must have at least a single metric field. Multiple metric fields per CSV line is OK. + * `label` - the corresponding CSV column at `<column_pos>` contains label value. The label name is read from the `<context>`. + CSV line may have arbitrary number of label fields. All these labels are attached to all the configured metrics. + * `time` - the corresponding CSV column at `<column_pos>` contains metric time. CSV line may contain either one or zero columns with time. + If CSV line has no time, then the current time is used. The time is applied to all the configured metrics. + The format of the time is configured via `<context>`. Supported time formats are: + * `unix_s` - unix timestamp in seconds. + * `unix_ms` - unix timestamp in milliseconds. + * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds. + * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`. + * `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse). + +Each request to `/api/v1/import/csv` may contain arbitrary number of CSV lines.
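+
+The same import can be driven programmatically; a minimal Go sketch using the format string from the curl example below (illustrative only, not part of this patch):
+
+```go
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"net/url"
+	"strings"
+)
+
+func main() {
+	// Two CSV lines: ask/bid metric columns plus ticker and market labels.
+	format := "2:metric:ask,3:metric:bid,1:label:ticker,4:label:market"
+	csv := "GOOG,1.23,4.56,NYSE\nMSFT,3.21,1.67,NASDAQ\n"
+	u := "http://localhost:8428/api/v1/import/csv?format=" + url.QueryEscape(format)
+	resp, err := http.Post(u, "text/csv", strings.NewReader(csv))
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+	fmt.Println("import status:", resp.Status)
+}
+```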
+ +Example for importing CSV data via `/api/v1/import/csv`: + +```bash +curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' +curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' +``` + +After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: + +```bash +curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}' +``` + +The following response should be returned: +```bash +{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]} +{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]} +{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]} +{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]} +``` + +Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args. +For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines. + +Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details. + + +#### How to import data in Prometheus exposition format + +VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) +via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics: + +```bash +curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus' +``` + +The following command may be used for verifying the imported data: + +```bash +curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}' +``` + +It should return something like the following: + +``` +{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} +``` + +Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args. +For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics. + +If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion. +It can be overridden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`. + +VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. + +Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details. + +VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). + + + ### Relabeling VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points diff --git a/docs/vmagent.md b/docs/vmagent.md index 0ed6484b0..fa859959d 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -25,7 +25,8 @@ to `vmagent` (like the ability to push metrics instead of pulling them).
We did * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents). * Prometheus remote write protocol via `http://<vmagent-host>:8429/api/v1/write`. - * JSON lines import protocol via `http://<vmagent-host>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data). + * JSON lines import protocol via `http://<vmagent-host>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-json-line-format). + * Native data import protocol via `http://<vmagent-host>:8429/api/v1/import/native`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-native-format). * Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details. * Arbitrary CSV data via `http://<vmagent-host>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data). * Can replicate collected metrics simultaneously to multiple remote storage systems. diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go new file mode 100644 index 000000000..cc3bcb18f --- /dev/null +++ b/lib/protoparser/native/streamparser.go @@ -0,0 +1,197 @@ +package native + +import ( + "bufio" + "fmt" + "io" + "net/http" + "runtime" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metrics" +) + +// ParseStream parses /api/v1/import/native data from req and calls callback for parsed blocks. +// +// The callback can be called multiple times for streamed data from req. +// +// callback shouldn't hold the block after returning. +// callback can be called in parallel from multiple concurrent goroutines. +func ParseStream(req *http.Request, callback func(block *Block) error) error { + r := req.Body + if req.Header.Get("Content-Encoding") == "gzip" { + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped native data: %w", err) + } + defer common.PutGzipReader(zr) + r = zr + } + // By default req.Body uses a 4KB buffer. This size is too small for a typical request to /api/v1/import/native, + // so use a slightly bigger buffer in order to reduce read syscall overhead. + br := bufio.NewReaderSize(r, 1024*1024) + + // Read time range (tr) + trBuf := make([]byte, 16) + var tr storage.TimeRange + if _, err := io.ReadFull(br, trBuf); err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read time range: %w", err) + } + tr.MinTimestamp = encoding.UnmarshalInt64(trBuf) + tr.MaxTimestamp = encoding.UnmarshalInt64(trBuf[8:]) + + // Start GOMAXPROCS workers in order to process ingested data in parallel.
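+	// The workCh buffer (8*gomaxprocs) lets the reader below stay ahead of unmarshaling
+	// without unbounded memory growth.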
+ gomaxprocs := runtime.GOMAXPROCS(-1) + workCh := make(chan *unmarshalWork, 8*gomaxprocs) + var wg sync.WaitGroup + defer func() { + close(workCh) + wg.Wait() + }() + wg.Add(gomaxprocs) + for i := 0; i < gomaxprocs; i++ { + go func() { + defer wg.Done() + var tmpBlock storage.Block + for uw := range workCh { + if err := uw.unmarshal(&tmpBlock, tr); err != nil { + parseErrors.Inc() + logger.Errorf("error when unmarshaling native block: %s", err) + putUnmarshalWork(uw) + continue + } + if err := callback(&uw.block); err != nil { + processErrors.Inc() + logger.Errorf("error when processing native block: %s", err) + putUnmarshalWork(uw) + continue + } + putUnmarshalWork(uw) + } + }() + } + + // Read native blocks and feed workers with work. + sizeBuf := make([]byte, 4) + for { + uw := getUnmarshalWork() + + // Read uw.metricNameBuf + if _, err := io.ReadFull(br, sizeBuf); err != nil { + if err == io.EOF { + // End of stream + return nil + } + readErrors.Inc() + return fmt.Errorf("cannot read metricName size: %w", err) + } + readCalls.Inc() + bufSize := encoding.UnmarshalUint32(sizeBuf) + if bufSize > 1024*1024 { + parseErrors.Inc() + return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024) + } + uw.metricNameBuf = bytesutil.Resize(uw.metricNameBuf, int(bufSize)) + if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read metricName with size %d bytes: %w", bufSize, err) + } + readCalls.Inc() + + // Read uw.blockBuf + if _, err := io.ReadFull(br, sizeBuf); err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read native block size: %w", err) + } + readCalls.Inc() + bufSize = encoding.UnmarshalUint32(sizeBuf) + if bufSize > 1024*1024 { + parseErrors.Inc() + return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024) + } + uw.blockBuf = bytesutil.Resize(uw.blockBuf, int(bufSize)) + if _, err := io.ReadFull(br, uw.blockBuf); err != nil { + readErrors.Inc() + return fmt.Errorf("cannot read native block with size %d bytes: %w", bufSize, err) + } + readCalls.Inc() + blocksRead.Inc() + + // Feed workers with work. + workCh <- uw + } +} + +// Block is a single block from `/api/v1/import/native` request. 
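+// +// A single time series may arrive split across multiple Blocks, since the exporter sends storage blocks as-is, +// so callback implementations shouldn't assume one Block per MetricName.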
+type Block struct { + MetricName storage.MetricName + Values []float64 + Timestamps []int64 +} + +func (b *Block) reset() { + b.MetricName.Reset() + b.Values = b.Values[:0] + b.Timestamps = b.Timestamps[:0] +} + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="native"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="native"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="native"}`) + blocksRead = metrics.NewCounter(`vm_protoparser_blocks_read_total{type="native"}`) + + parseErrors = metrics.NewCounter(`vm_protoparser_parse_errors_total{type="native"}`) + processErrors = metrics.NewCounter(`vm_protoparser_process_errors_total{type="native"}`) +) + +type unmarshalWork struct { + tr storage.TimeRange + metricNameBuf []byte + blockBuf []byte + block Block +} + +func (uw *unmarshalWork) reset() { + uw.metricNameBuf = uw.metricNameBuf[:0] + uw.blockBuf = uw.blockBuf[:0] + uw.block.reset() +} + +func (uw *unmarshalWork) unmarshal(tmpBlock *storage.Block, tr storage.TimeRange) error { + block := &uw.block + if err := block.MetricName.Unmarshal(uw.metricNameBuf); err != nil { + return fmt.Errorf("cannot unmarshal metricName from %d bytes: %w", len(uw.metricNameBuf), err) + } + tail, err := tmpBlock.UnmarshalPortable(uw.blockBuf) + if err != nil { + return fmt.Errorf("cannot unmarshal native block from %d bytes: %w", len(uw.blockBuf), err) + } + if len(tail) > 0 { + return fmt.Errorf("unexpected non-empty tail left after unmarshaling native block from %d bytes; len(tail)=%d bytes", len(uw.blockBuf), len(tail)) + } + block.Timestamps, block.Values = tmpBlock.AppendRowsWithTimeRangeFilter(block.Timestamps[:0], block.Values[:0], tr) + rowsRead.Add(len(block.Timestamps)) + return nil +} + +func getUnmarshalWork() *unmarshalWork { + v := unmarshalWorkPool.Get() + if v == nil { + return &unmarshalWork{} + } + return v.(*unmarshalWork) +} + +func putUnmarshalWork(uw *unmarshalWork) { + uw.reset() + unmarshalWorkPool.Put(uw) +} + +var unmarshalWorkPool sync.Pool diff --git a/lib/storage/block.go b/lib/storage/block.go index ee8e158fe..311677674 100644 --- a/lib/storage/block.go +++ b/lib/storage/block.go @@ -2,9 +2,11 @@ package storage import ( "fmt" + "math" "sync" "sync/atomic" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -86,21 +88,6 @@ func (b *Block) RowsCount() int { return int(b.bh.RowsCount) } -// Values returns b values. -func (b *Block) Values() []int64 { - return b.values -} - -// Timestamps returns b timestamps. -func (b *Block) Timestamps() []int64 { - return b.timestamps -} - -// Scale returns the decimal scale used for encoding values in the block. -func (b *Block) Scale() int16 { - return b.bh.Scale -} - // Init initializes b with the given tsid, timestamps, values and scale. func (b *Block) Init(tsid *TSID, timestamps, values []int64, scale int16, precisionBits uint8) { b.Reset() @@ -289,3 +276,123 @@ func (b *Block) UnmarshalData() error { return nil } + +// AppendRowsWithTimeRangeFilter filters samples from b according to tr and appends them to dst*. +// +// It is expected that UnmarshalData has been already called on b. +func (b *Block) AppendRowsWithTimeRangeFilter(dstTimestamps []int64, dstValues []float64, tr TimeRange) ([]int64, []float64) { + timestamps, values := b.filterTimestamps(tr) + dstTimestamps = append(dstTimestamps, timestamps...) 
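+	// Values are stored as scaled integers; convert them to float64 using the block's decimal scale.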
+ dstValues = decimal.AppendDecimalToFloat(dstValues, values, b.bh.Scale)
+ return dstTimestamps, dstValues
+}
+
+func (b *Block) filterTimestamps(tr TimeRange) ([]int64, []int64) {
+ timestamps := b.timestamps
+
+ // Skip timestamps smaller than tr.MinTimestamp.
+ i := 0
+ for i < len(timestamps) && timestamps[i] < tr.MinTimestamp {
+ i++
+ }
+
+ // Skip timestamps bigger than tr.MaxTimestamp.
+ j := len(timestamps)
+ for j > i && timestamps[j-1] > tr.MaxTimestamp {
+ j--
+ }
+
+ if i == j {
+ return nil, nil
+ }
+ return timestamps[i:j], b.values[i:j]
+}
+
+// MarshalPortable marshals b to dst, so it can be portably migrated to another VictoriaMetrics instance.
+//
+// The marshaled value must be unmarshaled with the UnmarshalPortable function.
+func (b *Block) MarshalPortable(dst []byte) []byte {
+ b.MarshalData(0, 0)
+
+ dst = encoding.MarshalVarInt64(dst, b.bh.MinTimestamp)
+ dst = encoding.MarshalVarInt64(dst, b.bh.FirstValue)
+ dst = encoding.MarshalVarUint64(dst, uint64(b.bh.RowsCount))
+ dst = encoding.MarshalVarInt64(dst, int64(b.bh.Scale))
+ dst = append(dst, byte(b.bh.TimestampsMarshalType))
+ dst = append(dst, byte(b.bh.ValuesMarshalType))
+ dst = encoding.MarshalBytes(dst, b.timestampsData)
+ dst = encoding.MarshalBytes(dst, b.valuesData)
+
+ return dst
+}
+
+// UnmarshalPortable unmarshals block from src to b and returns the remaining tail.
+//
+// It is assumed that the block has been marshaled with MarshalPortable.
+func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
+ b.Reset()
+
+ // Read header
+ src, firstTimestamp, err := encoding.UnmarshalVarInt64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
+ }
+ b.bh.MinTimestamp = firstTimestamp
+ src, firstValue, err := encoding.UnmarshalVarInt64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
+ }
+ b.bh.FirstValue = firstValue
+ src, rowsCount, err := encoding.UnmarshalVarUint64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
+ }
+ if rowsCount > math.MaxUint32 {
+ return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, math.MaxUint32)
+ }
+ b.bh.RowsCount = uint32(rowsCount)
+ src, scale, err := encoding.UnmarshalVarInt64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal scale: %w", err)
+ }
+ if scale < math.MinInt16 {
+ return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
+ }
+ if scale > math.MaxInt16 {
+ return src, fmt.Errorf("got too big scale=%d; it mustn't exceed %d", scale, math.MaxInt16)
+ }
+ b.bh.Scale = int16(scale)
+ if len(src) < 1 {
+ return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
+ }
+ b.bh.TimestampsMarshalType = encoding.MarshalType(src[0])
+ src = src[1:]
+ if len(src) < 1 {
+ return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
+ }
+ b.bh.ValuesMarshalType = encoding.MarshalType(src[0])
+ src = src[1:]
+ b.bh.PrecisionBits = 64
+
+ // Read data
+ src, timestampsData, err := encoding.UnmarshalBytes(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot read timestampsData: %w", err)
+ }
+ b.timestampsData = append(b.timestampsData[:0], timestampsData...)
+ src, valuesData, err := encoding.UnmarshalBytes(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot read valuesData: %w", err)
+ }
+ b.valuesData = append(b.valuesData[:0], valuesData...)
+ + // Validate + if err := b.bh.validate(); err != nil { + return src, fmt.Errorf("invalid blockHeader: %w", err) + } + if err := b.UnmarshalData(); err != nil { + return src, fmt.Errorf("invalid data: %w", err) + } + + return src, nil +} diff --git a/lib/storage/block_test.go b/lib/storage/block_test.go new file mode 100644 index 000000000..586995b86 --- /dev/null +++ b/lib/storage/block_test.go @@ -0,0 +1,116 @@ +package storage + +import ( + "math/rand" + "reflect" + "strings" + "testing" +) + +func TestBlockMarshalUnmarshalPortable(t *testing.T) { + var b Block + for i := 0; i < 1000; i++ { + b.Reset() + rowsCount := rand.Intn(maxRowsPerBlock) + 1 + b.timestamps = getRandTimestamps(rowsCount) + b.values = getRandValues(rowsCount) + b.bh.Scale = int16(rand.Intn(30) - 15) + b.bh.PrecisionBits = 64 + testBlockMarshalUnmarshalPortable(t, &b) + } +} + +func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) { + var b1, b2 Block + b1.CopyFrom(b) + rowsCount := len(b.values) + data := b1.MarshalPortable(nil) + if b1.bh.RowsCount != uint32(rowsCount) { + t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount) + } + tail, err := b2.UnmarshalPortable(data) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if len(tail) > 0 { + t.Fatalf("unexpected non-empty tail: %X", tail) + } + compareBlocksPortable(t, &b2, b, &b1.bh) + + // Verify non-empty prefix and suffix + prefix := "prefix" + suffix := "suffix" + data = append(data[:0], prefix...) + data = b1.MarshalPortable(data) + if b1.bh.RowsCount != uint32(rowsCount) { + t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount) + } + if !strings.HasPrefix(string(data), prefix) { + t.Fatalf("unexpected prefix in %X; want %X", data, prefix) + } + data = data[len(prefix):] + data = append(data, suffix...) 
+ tail, err = b2.UnmarshalPortable(data) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if string(tail) != suffix { + t.Fatalf("unexpected tail; got %X; want %X", tail, suffix) + } + compareBlocksPortable(t, &b2, b, &b1.bh) +} + +func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader) { + t.Helper() + if b1.bh.MinTimestamp != bhExpected.MinTimestamp { + t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MinTimestamp, bhExpected.MinTimestamp) + } + if b1.bh.FirstValue != bhExpected.FirstValue { + t.Fatalf("unexpected FirstValue; got %d; want %d", b1.bh.FirstValue, bhExpected.FirstValue) + } + if b1.bh.RowsCount != bhExpected.RowsCount { + t.Fatalf("unexpected RowsCount; got %d; want %d", b1.bh.RowsCount, bhExpected.RowsCount) + } + if b1.bh.Scale != bhExpected.Scale { + t.Fatalf("unexpected Scale; got %d; want %d", b1.bh.Scale, bhExpected.Scale) + } + if b1.bh.TimestampsMarshalType != bhExpected.TimestampsMarshalType { + t.Fatalf("unexpected TimestampsMarshalType; got %d; want %d", b1.bh.TimestampsMarshalType, bhExpected.TimestampsMarshalType) + } + if b1.bh.ValuesMarshalType != bhExpected.ValuesMarshalType { + t.Fatalf("unexpected ValuesMarshalType; got %d; want %d", b1.bh.ValuesMarshalType, bhExpected.ValuesMarshalType) + } + if b1.bh.PrecisionBits != bhExpected.PrecisionBits { + t.Fatalf("unexpected PrecisionBits; got %d; want %d", b1.bh.PrecisionBits, bhExpected.PrecisionBits) + } + if !reflect.DeepEqual(b1.values, b2.values) { + t.Fatalf("unexpected values; got %d; want %d", b1.values, b2.values) + } + if !reflect.DeepEqual(b1.timestamps, b2.timestamps) { + t.Fatalf("unexpected timestamps; got %d; want %d", b1.timestamps, b2.timestamps) + } + if len(b1.values) != int(bhExpected.RowsCount) { + t.Fatalf("unexpected number of values; got %d; want %d", len(b1.values), bhExpected.RowsCount) + } + if len(b1.timestamps) != int(bhExpected.RowsCount) { + t.Fatalf("unexpected number of timestamps; got %d; want %d", len(b1.timestamps), bhExpected.RowsCount) + } +} + +func getRandValues(rowsCount int) []int64 { + a := make([]int64, rowsCount) + for i := 0; i < rowsCount; i++ { + a[i] = int64(rand.Intn(1e5) - 0.5e5) + } + return a +} + +func getRandTimestamps(rowsCount int) []int64 { + a := make([]int64, rowsCount) + ts := int64(rand.Intn(1e12)) + for i := 0; i < rowsCount; i++ { + a[i] = ts + ts += int64(rand.Intn(1e5)) + } + return a +}
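
Editor's note on the wire format, for readers following ParseStream above: the native import stream is a sequence of length-prefixed pairs — a uint32 size followed by a marshaled MetricName, then a uint32 size followed by a MarshalPortable-encoded block — with both payloads capped at 1 MiB and the stream terminated by EOF in place of the next size prefix. The sketch below is an illustration, not part of this patch; it assumes the size prefixes are big-endian (mirroring how encoding.UnmarshalUint32 reads them) and uses placeholder payloads where a real client would pass the output of MetricName.Marshal and Block.MarshalPortable.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// writeNativeBlock appends one framed (metricName, block) pair in the layout
// consumed by ParseStream: [uint32 size][metricName][uint32 size][block].
// It enforces the same 1 MiB per-payload limit that the parser checks.
// Big-endian byte order is an assumption here, matching encoding.UnmarshalUint32.
func writeNativeBlock(dst *bytes.Buffer, metricName, block []byte) error {
	const maxSize = 1024 * 1024
	if len(metricName) > maxSize || len(block) > maxSize {
		return fmt.Errorf("payload size exceeds %d bytes", maxSize)
	}
	var sizeBuf [4]byte
	binary.BigEndian.PutUint32(sizeBuf[:], uint32(len(metricName)))
	dst.Write(sizeBuf[:])
	dst.Write(metricName)
	binary.BigEndian.PutUint32(sizeBuf[:], uint32(len(block)))
	dst.Write(sizeBuf[:])
	dst.Write(block)
	return nil
}

func main() {
	var buf bytes.Buffer
	// Placeholder payloads; real data would come from MetricName.Marshal
	// and Block.MarshalPortable respectively.
	if err := writeNativeBlock(&buf, []byte("metricName"), []byte("blockData")); err != nil {
		panic(err)
	}
	fmt.Printf("framed stream of %d bytes\n", buf.Len())
}
```

Repeating writeNativeBlock once per block and closing the connection yields a stream that the parser drains until io.EOF, which is why no explicit trailer or block count appears anywhere in the format.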