all: add native format for data export and import

The data can be exported via [/api/v1/export/native](https://victoriametrics.github.io/#how-to-export-data-in-native-format) handler
and imported via [/api/v1/import/native](https://victoriametrics.github.io/#how-to-import-data-in-native-format) handler.
Aliaksandr Valialkin 2020-09-26 04:29:45 +03:00
parent b4bf722d8f
commit 95688cbfc5
18 changed files with 1523 additions and 505 deletions
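
For a quick picture of the round trip, the two new handlers can be chained directly (a sketch assuming both instances are reachable; `-T -` makes curl upload its stdin):

```bash
# Stream all series from one VictoriaMetrics instance into another
# without an intermediate file:
curl http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' |
    curl -X POST http://destination-victoriametrics:8428/api/v1/import/native -T -
```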

README.md

@@ -77,7 +77,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
  if `-graphiteListenAddr` is set.
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set.
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
-* [/api/v1/import](#how-to-import-time-series-data).
+* [JSON line format](#how-to-import-data-in-json-line-format).
+* [Native binary format](#how-to-import-data-in-native-format).
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [Arbitrary CSV data](#how-to-import-csv-data).
* Supports metrics' relabeling. See [these docs](#relabeling) for details.

@@ -100,8 +101,6 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
* [Querying Graphite data](#querying-graphite-data)
* [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents)
-* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
-* [How to import CSV data](#how-to-import-csv-data)
* [Prometheus querying API usage](#prometheus-querying-api-usage)
* [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
* [Graphite Metrics API usage](#graphite-metrics-api-usage)

@@ -117,7 +116,13 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to delete time series](#how-to-delete-time-series)
* [Forced merge](#forced-merge)
* [How to export time series](#how-to-export-time-series)
+* [How to export data in native format](#how-to-export-data-in-native-format)
+* [How to export data in JSON line format](#how-to-export-data-in-json-line-format)
* [How to import time series data](#how-to-import-time-series-data)
+* [How to import data in native format](#how-to-import-data-in-native-format)
+* [How to import data in JSON line format](#how-to-import-data-in-json-line-format)
+* [How to import CSV data](#how-to-import-csv-data)
+* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
* [Relabeling](#relabeling)
* [Federation](#federation)
* [Capacity planning](#capacity-planning)

@@ -345,7 +350,7 @@ curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'ht
```
An arbitrary number of lines delimited by '\n' (aka newline char) may be sent in a single request.
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'

@@ -381,7 +386,7 @@ echo "foo.bar.baz;tag1=value1;tag2=value2 123 `date +%s`" | nc -N localhost 2003
VictoriaMetrics sets the current time if the timestamp is omitted.
An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'

@@ -425,7 +430,7 @@ echo "put foo.bar.baz `date +%s` 123 tag1=value1 tag2=value2" | nc -N localhost
```
An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'

@@ -460,7 +465,7 @@ Example for writing multiple data points in a single request:
curl -H 'Content-Type: application/json' -d '[{"metric":"foo","value":45.34},{"metric":"bar","value":43}]' http://localhost:4242/api/put
```
-After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]=x.y.z' -d 'match[]=foo' -d 'match[]=bar'

@@ -475,91 +480,6 @@ The `/api/v1/export` endpoint should return the following response:
```

### Prometheus querying API usage
VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
@@ -756,11 +676,35 @@ when new data is ingested into it.
### How to export time series

-Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
+VictoriaMetrics provides the following handlers for exporting data:
+
+* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export.
+  See [these docs](#how-to-export-data-in-native-format) for details.
+* `/api/v1/export` for exporting data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details.
+
+#### How to export data in native format
+
+Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/native?match[]=<timeseries_selector_for_export>`,
+where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
+for metrics to export. Use `{__name__!=""}` selector for fetching all the time series.
+
+Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
+unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
+
+The exported data can be imported to VictoriaMetrics via [/api/v1/import/native](#how-to-import-data-in-native-format).
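
For instance, a time-bounded native export might look like this (the selector and RFC3339 timestamps are illustrative):

```bash
# Export everything that has samples within the given day
# into a local file in native binary format:
curl -G 'http://localhost:8428/api/v1/export/native' \
    -d 'match={__name__!=""}' \
    -d 'start=2020-09-01T00:00:00Z' \
    -d 'end=2020-09-02T00:00:00Z' > exported_data.bin
```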
+#### How to export data in JSON line format
+
+Consider [exporting data in native format](#how-to-export-data-in-native-format) if a big amount of data must be migrated between VictoriaMetrics instances,
+since the native format usually consumes less CPU and memory, and the exported data occupies less disk space.
+
+In order to export data in JSON line format, send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export. Use `{__name__!=""}` selector for fetching all the time series.
The response would contain all the data for the selected time series in [JSON streaming format](https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON).
-Each JSON line would contain data for a single time series. An example output:
+Each JSON line contains samples for a single time series. An example output:
```jsonl
{"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]}

@@ -770,8 +714,9 @@ Each JSON line would contain data for a single time series. An example output:
Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.

-Optional `max_rows_per_line` arg may be added to the request in order to limit the maximum number of rows exported per each JSON line.
-By default each JSON line contains all the rows for a single time series.
+Optional `max_rows_per_line` arg may be added to the request for limiting the maximum number of rows exported per each JSON line.
+Optional `reduce_mem_usage=1` arg may be added to the request for reducing memory usage when exporting a big number of time series.
+In this case the output may contain multiple lines with distinct samples for the same time series.
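
Both args can be combined in a single request; a sketch with an arbitrary illustrative row limit:

```bash
# Split long series into JSON lines of at most 1000 samples each,
# trading more output lines for lower memory usage on the server:
curl -G 'http://localhost:8428/api/v1/export' \
    -d 'match={__name__!=""}' \
    -d 'max_rows_per_line=1000' \
    -d 'reduce_mem_usage=1'
```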
Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporting big amounts
of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data:

@@ -782,22 +727,58 @@ curl -H 'Accept-Encoding: gzip' http://localhost:8428/api/v1/export -d 'match[]=
The maximum duration for each request to `/api/v1/export` is limited by `-search.maxExportDuration` command-line flag.

-Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-time-series-data).
+Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format).
### How to import time series data

Time series data can be imported via any supported ingestion protocol:

-* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
-* [Influx line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf)
-* [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
-* [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol)
-* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests)
-* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series).
-* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details.
-* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
+* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
+* Influx line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
+* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
+* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
+* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.
+* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format).
+  See [these docs](#how-to-import-data-in-json-line-format) for details.
+* `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format).
+  See [these docs](#how-to-import-data-in-native-format) for details.
+* `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details.
+* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
-The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
+#### How to import data in native format
+
+The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`.
+Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format):
+
+```bash
+# Export the data from <source-victoriametrics>:
+curl http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin
+
+# Import the data to <destination-victoriametrics>:
+curl -X POST http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin
+```
+Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/native` for importing gzipped data:
+
+```bash
+# Export gzipped data from <source-victoriametrics>:
+curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin.gz
+
+# Import gzipped data to <destination-victoriametrics>:
+curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin.gz
+```
+
+Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
+For example, `/api/v1/import/native?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series.
+
+Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
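
For example, a sketch combining `extra_label` with the file produced by the export example above:

```bash
# Import the previously exported native data, attaching the label
# foo=bar to every imported series:
curl -X POST 'http://destination-victoriametrics:8428/api/v1/import/native?extra_label=foo=bar' -T exported_data.bin
```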
+#### How to import data in JSON line format
+
+Example for importing data obtained via [/api/v1/export](#how-to-export-data-in-json-line-format):
+
```bash
# Export the data from <source-victoriametrics>:
@@ -823,6 +804,94 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
+#### How to import CSV data
+
+Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
+The `format` query arg must contain a comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:
+
+```
+<column_pos>:<type>:<context>
+```
+
+* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
+* `<type>` describes the column type. Supported types are:
+  * `metric` - the corresponding CSV column at `<column_pos>` contains a metric value, which must be an integer or floating-point number.
+    The metric name is read from the `<context>`. Each CSV line must have at least one metric field. Multiple metric fields per CSV line are OK.
+  * `label` - the corresponding CSV column at `<column_pos>` contains a label value. The label name is read from the `<context>`.
+    A CSV line may have an arbitrary number of label fields. All these labels are attached to all the configured metrics.
+  * `time` - the corresponding CSV column at `<column_pos>` contains the metric time. A CSV line may contain either one or zero columns with time.
+    If a CSV line has no time, then the current time is used. The time is applied to all the configured metrics.
+    The format of the time is configured via `<context>`. Supported time formats are:
+    * `unix_s` - unix timestamp in seconds.
+    * `unix_ms` - unix timestamp in milliseconds.
+    * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
+    * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
+    * `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).
+
+Each request to `/api/v1/import/csv` may contain an arbitrary number of CSV lines.
+
+Example for importing CSV data via `/api/v1/import/csv`:
+
+```bash
+curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
+curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
+```
+
+After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
+
+```bash
+curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
+```
+
+The following response should be returned:
+
+```bash
+{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
+{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
+{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
+{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
+```
+
+Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
+For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
+
+Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
+#### How to import data in Prometheus exposition format
+
+VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
+via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
+
+```bash
+curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
+```
+
+The following command may be used for verifying the imported data:
+
+```bash
+curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
+```
+
+It should return something like the following:
+
+```
+{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
+```
+
+Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
+For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
+
+If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
+It can be overridden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
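
For instance, reusing the metric line and timestamp from the examples above:

```bash
# Ingest the line with an explicit timestamp in milliseconds
# instead of the current time:
curl -d 'foo{bar="baz"} 123' 'http://localhost:8428/api/v1/import/prometheus?timestamp=1594370496905'
```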
+VictoriaMetrics accepts an arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
+
+Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
+
+VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
### Relabeling
VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points

app/vmagent/README.md

@@ -25,7 +25,8 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
-* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
+* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-json-line-format).
+* Native data import protocol via `http://<vmagent>:8429/api/v1/import/native`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-native-format).
* Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
* Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
* Can replicate collected metrics simultaneously to multiple remote storage systems.

app/vmagent/main.go

@@ -11,6 +11,7 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"

@@ -171,6 +172,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
		}
		w.WriteHeader(http.StatusNoContent)
		return true
+	case "/api/v1/import/native":
+		nativeimportRequests.Inc()
+		if err := native.InsertHandler(r); err != nil {
+			nativeimportErrors.Inc()
+			httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		w.WriteHeader(http.StatusNoContent)
+		return true
	case "/write", "/api/v2/write":
		influxWriteRequests.Inc()
		if err := influx.InsertHandlerForHTTP(r); err != nil {

@@ -213,6 +223,9 @@ var (
	prometheusimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
	prometheusimportErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
+	nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
+	nativeimportErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
	influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
	influxWriteErrors   = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)

app/vmagent/native/request_handler.go

@@ -0,0 +1,82 @@
package native

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
	"github.com/VictoriaMetrics/metrics"
)

var (
	rowsInserted  = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`)
	rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`)
)

// InsertHandler processes `/api/v1/import/native` request.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(req *http.Request) error {
	extraLabels, err := parserCommon.GetExtraLabels(req)
	if err != nil {
		return err
	}
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(req, func(block *parser.Block) error {
			return insertRows(block, extraLabels)
		})
	})
}

func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
	ctx := common.GetPushCtx()
	defer common.PutPushCtx(ctx)

	tssDst := ctx.WriteRequest.Timeseries[:0]
	labels := ctx.Labels[:0]
	samples := ctx.Samples[:0]
	mn := &block.MetricName
	labelsLen := len(labels)
	labels = append(labels, prompbmarshal.Label{
		Name:  "__name__",
		Value: bytesutil.ToUnsafeString(mn.MetricGroup),
	})
	for j := range mn.Tags {
		tag := &mn.Tags[j]
		labels = append(labels, prompbmarshal.Label{
			Name:  bytesutil.ToUnsafeString(tag.Key),
			Value: bytesutil.ToUnsafeString(tag.Value),
		})
	}
	labels = append(labels, extraLabels...)
	values := block.Values
	timestamps := block.Timestamps
	if len(timestamps) != len(values) {
		logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
	}
	samplesLen := len(samples)
	for j, value := range values {
		samples = append(samples, prompbmarshal.Sample{
			Value:     value,
			Timestamp: timestamps[j],
		})
	}
	tssDst = append(tssDst, prompbmarshal.TimeSeries{
		Labels:  labels[labelsLen:],
		Samples: samples[samplesLen:],
	})
	rowsTotal := len(values)
	ctx.WriteRequest.Timeseries = tssDst
	ctx.Labels = labels
	ctx.Samples = samples
	remotewrite.Push(&ctx.WriteRequest)
	rowsInserted.Add(rowsTotal)
	rowsPerInsert.Update(float64(rowsTotal))
	return nil
}
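
As a usage sketch against this handler (port 8429 is vmagent's address per the docs above; the file name mirrors the README examples):

```bash
# Replay a native export through vmagent so the data is relabeled
# and forwarded to the configured remote storage:
curl -X POST http://localhost:8429/api/v1/import/native -T exported_data.bin
```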

app/vmagent/vmimport/request_handler.go

@@ -6,6 +6,7 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"

@@ -54,7 +55,9 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
		labels = append(labels, extraLabels...)
		values := r.Values
		timestamps := r.Timestamps
-		_ = timestamps[len(values)-1]
+		if len(timestamps) != len(values) {
+			logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
+		}
		samplesLen := len(samples)
		for j, value := range values {
			samples = append(samples, prompbmarshal.Sample{

app/vminsert/main.go

@@ -10,6 +10,7 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"

@@ -125,6 +126,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
		}
		w.WriteHeader(http.StatusNoContent)
		return true
+	case "/api/v1/import/native":
+		nativeimportRequests.Inc()
+		if err := native.InsertHandler(r); err != nil {
+			nativeimportErrors.Inc()
+			httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		w.WriteHeader(http.StatusNoContent)
+		return true
	case "/write", "/api/v2/write":
		influxWriteRequests.Inc()
		if err := influx.InsertHandlerForHTTP(r); err != nil {

@@ -169,6 +179,9 @@ var (
	prometheusimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
	prometheusimportErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
+	nativeimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
+	nativeimportErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
	influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`)
	influxWriteErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`)

app/vminsert/native/request_handler.go

@@ -0,0 +1,113 @@
package native

import (
	"net/http"
	"runtime"
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
	"github.com/VictoriaMetrics/metrics"
)

var (
	rowsInserted  = metrics.NewCounter(`vm_rows_inserted_total{type="native"}`)
	rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="native"}`)
)

// InsertHandler processes `/api/v1/import/native` request.
func InsertHandler(req *http.Request) error {
	extraLabels, err := parserCommon.GetExtraLabels(req)
	if err != nil {
		return err
	}
	return writeconcurrencylimiter.Do(func() error {
		return parser.ParseStream(req, func(block *parser.Block) error {
			return insertRows(block, extraLabels)
		})
	})
}

func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
	ctx := getPushCtx()
	defer putPushCtx(ctx)

	rowsLen := len(block.Values)
	ic := &ctx.Common
	ic.Reset(rowsLen)
	hasRelabeling := relabel.HasRelabeling()
	mn := &block.MetricName
	ic.Labels = ic.Labels[:0]
	ic.AddLabelBytes(nil, mn.MetricGroup)
	for j := range mn.Tags {
		tag := &mn.Tags[j]
		ic.AddLabelBytes(tag.Key, tag.Value)
	}
	for j := range extraLabels {
		label := &extraLabels[j]
		ic.AddLabel(label.Name, label.Value)
	}
	if hasRelabeling {
		ic.ApplyRelabeling()
	}
	if len(ic.Labels) == 0 {
		// Skip metric without labels.
		return nil
	}
	ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
	values := block.Values
	timestamps := block.Timestamps
	if len(timestamps) != len(values) {
		logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
	}
	for j, value := range values {
		timestamp := timestamps[j]
		if err := ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value); err != nil {
			return err
		}
	}
	rowsTotal := len(values)
	rowsInserted.Add(rowsTotal)
	rowsPerInsert.Update(float64(rowsTotal))
	return ic.FlushBufs()
}

type pushCtx struct {
	Common        common.InsertCtx
	metricNameBuf []byte
}

func (ctx *pushCtx) reset() {
	ctx.Common.Reset(0)
	ctx.metricNameBuf = ctx.metricNameBuf[:0]
}

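// getPushCtx and putPushCtx reuse pushCtx objects across requests.
// The channel below serves as a fixed-size fast path sized to GOMAXPROCS,
// while sync.Pool absorbs any overflow and lets idle objects be GC'ed.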
func getPushCtx() *pushCtx {
	select {
	case ctx := <-pushCtxPoolCh:
		return ctx
	default:
		if v := pushCtxPool.Get(); v != nil {
			return v.(*pushCtx)
		}
		return &pushCtx{}
	}
}

func putPushCtx(ctx *pushCtx) {
	ctx.reset()
	select {
	case pushCtxPoolCh <- ctx:
	default:
		pushCtxPool.Put(ctx)
	}
}

var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))

app/vmselect/main.go

@@ -203,6 +203,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
			return true
		}
		return true
+	case "/api/v1/export/native":
+		exportNativeRequests.Inc()
+		if err := prometheus.ExportNativeHandler(startTime, w, r); err != nil {
+			exportNativeErrors.Inc()
+			httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		return true
	case "/federate":
		federateRequests.Inc()
		if err := prometheus.FederateHandler(startTime, w, r); err != nil {

@@ -321,6 +329,9 @@ var (
	exportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/export"}`)
	exportErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/export"}`)
+	exportNativeRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/export/native"}`)
+	exportNativeErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/export/native"}`)
	federateRequests = metrics.NewCounter(`vm_http_requests_total{path="/federate"}`)
	federateErrors   = metrics.NewCounter(`vm_http_request_errors_total{path="/federate"}`)

app/vmselect/netstorage/netstorage.go

@@ -8,11 +8,11 @@ import (
	"runtime"
	"sort"
	"sync"
+	"sync/atomic"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"

@@ -390,29 +390,9 @@ func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr
	if err := tmpBlock.UnmarshalData(); err != nil {
		return fmt.Errorf("cannot unmarshal block: %w", err)
	}
-	timestamps := tmpBlock.Timestamps()
-	// Skip timestamps smaller than tr.MinTimestamp.
-	i := 0
-	for i < len(timestamps) && timestamps[i] < tr.MinTimestamp {
-		i++
-	}
-	// Skip timestamps bigger than tr.MaxTimestamp.
-	j := len(timestamps)
-	for j > i && timestamps[j-1] > tr.MaxTimestamp {
-		j--
-	}
-	skippedRows := tmpBlock.RowsCount() - (j - i)
+	sb.Timestamps, sb.Values = tmpBlock.AppendRowsWithTimeRangeFilter(sb.Timestamps[:0], sb.Values[:0], tr)
+	skippedRows := tmpBlock.RowsCount() - len(sb.Timestamps)
	metricRowsSkipped.Add(skippedRows)
-	// Copy the remaining values.
-	if i == j {
-		return nil
-	}
-	values := tmpBlock.Values()
-	sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...)
-	sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], tmpBlock.Scale())
	return nil
}

@@ -581,7 +561,116 @@ func putStorageSearch(sr *storage.Search) {

var ssPool sync.Pool

-// ProcessSearchQuery performs sq on storage nodes until the given deadline.
+// ExportBlocks searches for time series matching sq and calls f for each found block.
//
+// f is called in parallel from multiple goroutines.
+// The process is stopped if f returns a non-nil error.
+// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
+// It is the responsibility of f to filter blocks according to the given tr.
+func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error {
+	if deadline.Exceeded() {
+		return fmt.Errorf("timeout exceeded before starting data export: %s", deadline.String())
+	}
+	tfss, err := setupTfss(sq.TagFilterss)
+	if err != nil {
+		return err
+	}
+	tr := storage.TimeRange{
+		MinTimestamp: sq.MinTimestamp,
+		MaxTimestamp: sq.MaxTimestamp,
+	}
+	if err := vmstorage.CheckTimeRange(tr); err != nil {
+		return err
+	}
+	vmstorage.WG.Add(1)
+	defer vmstorage.WG.Done()
+
+	sr := getStorageSearch()
+	defer putStorageSearch(sr)
+	sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
+
+	// Start workers that call f in parallel on available CPU cores.
+	gomaxprocs := runtime.GOMAXPROCS(-1)
+	workCh := make(chan *exportWork, gomaxprocs*8)
+	var (
+		errGlobal     error
+		errGlobalLock sync.Mutex
+		mustStop      uint32
+	)
+	var wg sync.WaitGroup
+	wg.Add(gomaxprocs)
+	for i := 0; i < gomaxprocs; i++ {
+		go func() {
+			defer wg.Done()
+			for xw := range workCh {
+				if err := f(&xw.mn, &xw.b, tr); err != nil {
+					errGlobalLock.Lock()
+					// Store only the first error and signal the feeder to stop.
+					if errGlobal == nil {
+						errGlobal = err
+						atomic.StoreUint32(&mustStop, 1)
+					}
+					errGlobalLock.Unlock()
+				}
+				xw.reset()
+				exportWorkPool.Put(xw)
+			}
+		}()
+	}
+
+	// Feed workers with work
+	blocksRead := 0
+	for sr.NextMetricBlock() {
+		blocksRead++
+		if deadline.Exceeded() {
+			return fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
+		}
+		if atomic.LoadUint32(&mustStop) != 0 {
+			break
+		}
+		xw := exportWorkPool.Get().(*exportWork)
+		if err := xw.mn.Unmarshal(sr.MetricBlockRef.MetricName); err != nil {
+			return fmt.Errorf("cannot unmarshal metricName for block #%d: %w", blocksRead, err)
+		}
+		sr.MetricBlockRef.BlockRef.MustReadBlock(&xw.b, true)
+		workCh <- xw
+	}
+	close(workCh)
+
+	// Wait for workers to finish.
+	wg.Wait()
+
+	// Check errors.
+	err = sr.Error()
+	if err == nil {
+		err = errGlobal
+	}
+	if err != nil {
+		if errors.Is(err, storage.ErrDeadlineExceeded) {
+			return fmt.Errorf("timeout exceeded during the query: %s", deadline.String())
+		}
+		return fmt.Errorf("search error after reading %d data blocks: %w", blocksRead, err)
+	}
+	return nil
+}
+
+type exportWork struct {
+	mn storage.MetricName
+	b  storage.Block
+}
+
+func (xw *exportWork) reset() {
+	xw.mn.Reset()
+	xw.b.Reset()
+}
+
+var exportWorkPool = &sync.Pool{
+	New: func() interface{} {
+		return &exportWork{}
+	},
+}
+
+// ProcessSearchQuery performs sq until the given deadline.
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline searchutils.Deadline) (*Results, error) {

app/vmselect/prometheus/export.qtpl

@@ -1,30 +1,29 @@
{% import (
	"github.com/valyala/quicktemplate"
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) %}

{% stripspace %}

-{% func ExportPrometheusLine(rs *netstorage.Result) %}
-	{% if len(rs.Timestamps) == 0 %}{% return %}{% endif %}
+{% func ExportPrometheusLine(xb *exportBlock) %}
+	{% if len(xb.timestamps) == 0 %}{% return %}{% endif %}
	{% code bb := quicktemplate.AcquireByteBuffer() %}
-	{% code writeprometheusMetricName(bb, &rs.MetricName) %}
-	{% for i, ts := range rs.Timestamps %}
+	{% code writeprometheusMetricName(bb, xb.mn) %}
+	{% for i, ts := range xb.timestamps %}
		{%z= bb.B %}{% space %}
-		{%f= rs.Values[i] %}{% space %}
+		{%f= xb.values[i] %}{% space %}
		{%dl= ts %}{% newline %}
	{% endfor %}
	{% code quicktemplate.ReleaseByteBuffer(bb) %}
{% endfunc %}

-{% func ExportJSONLine(rs *netstorage.Result) %}
-	{% if len(rs.Timestamps) == 0 %}{% return %}{% endif %}
+{% func ExportJSONLine(xb *exportBlock) %}
+	{% if len(xb.timestamps) == 0 %}{% return %}{% endif %}
	{
-		"metric":{%= metricNameObject(&rs.MetricName) %},
+		"metric":{%= metricNameObject(xb.mn) %},
		"values":[
-			{% if len(rs.Values) > 0 %}
-				{% code values := rs.Values %}
+			{% if len(xb.values) > 0 %}
+				{% code values := xb.values %}
				{%f= values[0] %}
				{% code values = values[1:] %}
				{% for _, v := range values %}

@@ -33,8 +32,8 @@
			{% endif %}
		],
		"timestamps":[
-			{% if len(rs.Timestamps) > 0 %}
-				{% code timestamps := rs.Timestamps %}
+			{% if len(xb.timestamps) > 0 %}
+				{% code timestamps := xb.timestamps %}
				{%dl= timestamps[0] %}
				{% code timestamps = timestamps[1:] %}
				{% for _, ts := range timestamps %}

@@ -45,10 +44,10 @@
	}{% newline %}
{% endfunc %}

-{% func ExportPromAPILine(rs *netstorage.Result) %}
+{% func ExportPromAPILine(xb *exportBlock) %}
{
-	"metric": {%= metricNameObject(&rs.MetricName) %},
-	"values": {%= valuesWithTimestamps(rs.Values, rs.Timestamps) %}
+	"metric": {%= metricNameObject(xb.mn) %},
+	"values": {%= valuesWithTimestamps(xb.values, xb.timestamps) %}
}
{% endfunc %}

app/vmselect/prometheus/export.qtpl.go

@@ -6,380 +6,379 @@ package prometheus

//line app/vmselect/prometheus/export.qtpl:1
import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/valyala/quicktemplate"
)

//line app/vmselect/prometheus/export.qtpl:8
import (
	qtio422016 "io"

	qt422016 "github.com/valyala/quicktemplate"
)

//line app/vmselect/prometheus/export.qtpl:8
var (
	_ = qtio422016.Copy
	_ = qt422016.AcquireByteBuffer
)

//line app/vmselect/prometheus/export.qtpl:8
func StreamExportPrometheusLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:9
	if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:9
		return
//line app/vmselect/prometheus/export.qtpl:9
	}
//line app/vmselect/prometheus/export.qtpl:10
	bb := quicktemplate.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:11
	writeprometheusMetricName(bb, xb.mn)
//line app/vmselect/prometheus/export.qtpl:12
	for i, ts := range xb.timestamps {
//line app/vmselect/prometheus/export.qtpl:13
		qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:13
		qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:14
		qw422016.N().F(xb.values[i])
//line app/vmselect/prometheus/export.qtpl:14
		qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:15
		qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:15
		qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:16
	}
//line app/vmselect/prometheus/export.qtpl:17
	quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:18
}

//line app/vmselect/prometheus/export.qtpl:18
func WriteExportPrometheusLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:18
	qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:18
	StreamExportPrometheusLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
	qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:18
}

//line app/vmselect/prometheus/export.qtpl:18
func ExportPrometheusLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:18
	qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:18
	WriteExportPrometheusLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
	qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:18
	qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:18
	return qs422016
//line app/vmselect/prometheus/export.qtpl:18
}

//line app/vmselect/prometheus/export.qtpl:20
func StreamExportJSONLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:21
	if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:21
		return
//line app/vmselect/prometheus/export.qtpl:21
	}
//line app/vmselect/prometheus/export.qtpl:21
	qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:23
	streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:23
	qw422016.N().S(`,"values":[`)
//line app/vmselect/prometheus/export.qtpl:25
	if len(xb.values) > 0 {
//line app/vmselect/prometheus/export.qtpl:26
		values := xb.values
//line app/vmselect/prometheus/export.qtpl:27
		qw422016.N().F(values[0])
//line app/vmselect/prometheus/export.qtpl:28
		values = values[1:]
//line app/vmselect/prometheus/export.qtpl:29
		for _, v := range values {
//line app/vmselect/prometheus/export.qtpl:29
			qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:30
			qw422016.N().F(v)
//line app/vmselect/prometheus/export.qtpl:31
		}
//line app/vmselect/prometheus/export.qtpl:32
	}
//line app/vmselect/prometheus/export.qtpl:32
	qw422016.N().S(`],"timestamps":[`)
//line app/vmselect/prometheus/export.qtpl:35
	if len(xb.timestamps) > 0 {
//line app/vmselect/prometheus/export.qtpl:36
		timestamps := xb.timestamps
//line app/vmselect/prometheus/export.qtpl:37
		qw422016.N().DL(timestamps[0])
//line app/vmselect/prometheus/export.qtpl:38
		timestamps = timestamps[1:]
//line app/vmselect/prometheus/export.qtpl:39
		for _, ts := range timestamps {
//line app/vmselect/prometheus/export.qtpl:40 //line app/vmselect/prometheus/export.qtpl:39
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:41 //line app/vmselect/prometheus/export.qtpl:40
qw422016.N().DL(ts) qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:42 //line app/vmselect/prometheus/export.qtpl:41
} }
//line app/vmselect/prometheus/export.qtpl:43 //line app/vmselect/prometheus/export.qtpl:42
} }
//line app/vmselect/prometheus/export.qtpl:43 //line app/vmselect/prometheus/export.qtpl:42
qw422016.N().S(`]}`) qw422016.N().S(`]}`)
//line app/vmselect/prometheus/export.qtpl:45 //line app/vmselect/prometheus/export.qtpl:44
qw422016.N().S(` qw422016.N().S(`
`) `)
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
} }
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
func WriteExportJSONLine(qq422016 qtio422016.Writer, rs *netstorage.Result) { func WriteExportJSONLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
StreamExportJSONLine(qw422016, rs) StreamExportJSONLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
} }
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
func ExportJSONLine(rs *netstorage.Result) string { func ExportJSONLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
WriteExportJSONLine(qb422016, rs) WriteExportJSONLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
return qs422016 return qs422016
//line app/vmselect/prometheus/export.qtpl:46 //line app/vmselect/prometheus/export.qtpl:45
} }
//line app/vmselect/prometheus/export.qtpl:48 //line app/vmselect/prometheus/export.qtpl:47
func StreamExportPromAPILine(qw422016 *qt422016.Writer, rs *netstorage.Result) { func StreamExportPromAPILine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:48 //line app/vmselect/prometheus/export.qtpl:47
qw422016.N().S(`{"metric":`) qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:50 //line app/vmselect/prometheus/export.qtpl:49
streammetricNameObject(qw422016, &rs.MetricName) streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:50 //line app/vmselect/prometheus/export.qtpl:49
qw422016.N().S(`,"values":`) qw422016.N().S(`,"values":`)
//line app/vmselect/prometheus/export.qtpl:51 //line app/vmselect/prometheus/export.qtpl:50
streamvaluesWithTimestamps(qw422016, rs.Values, rs.Timestamps) streamvaluesWithTimestamps(qw422016, xb.values, xb.timestamps)
//line app/vmselect/prometheus/export.qtpl:51 //line app/vmselect/prometheus/export.qtpl:50
qw422016.N().S(`}`) qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
} }
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
func WriteExportPromAPILine(qq422016 qtio422016.Writer, rs *netstorage.Result) { func WriteExportPromAPILine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
StreamExportPromAPILine(qw422016, rs) StreamExportPromAPILine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
} }
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
func ExportPromAPILine(rs *netstorage.Result) string { func ExportPromAPILine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
WriteExportPromAPILine(qb422016, rs) WriteExportPromAPILine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
return qs422016 return qs422016
//line app/vmselect/prometheus/export.qtpl:53 //line app/vmselect/prometheus/export.qtpl:52
} }
//line app/vmselect/prometheus/export.qtpl:55 //line app/vmselect/prometheus/export.qtpl:54
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:55 //line app/vmselect/prometheus/export.qtpl:54
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`) qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
//line app/vmselect/prometheus/export.qtpl:61 //line app/vmselect/prometheus/export.qtpl:60
bb, ok := <-resultsCh bb, ok := <-resultsCh
//line app/vmselect/prometheus/export.qtpl:62 //line app/vmselect/prometheus/export.qtpl:61
if ok { if ok {
//line app/vmselect/prometheus/export.qtpl:63 //line app/vmselect/prometheus/export.qtpl:62
qw422016.N().Z(bb.B) qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:64 //line app/vmselect/prometheus/export.qtpl:63
quicktemplate.ReleaseByteBuffer(bb) quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:65 //line app/vmselect/prometheus/export.qtpl:64
for bb := range resultsCh { for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:65 //line app/vmselect/prometheus/export.qtpl:64
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:66 //line app/vmselect/prometheus/export.qtpl:65
qw422016.N().Z(bb.B) qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:67 //line app/vmselect/prometheus/export.qtpl:66
quicktemplate.ReleaseByteBuffer(bb) quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:68 //line app/vmselect/prometheus/export.qtpl:67
} }
//line app/vmselect/prometheus/export.qtpl:69 //line app/vmselect/prometheus/export.qtpl:68
} }
//line app/vmselect/prometheus/export.qtpl:69 //line app/vmselect/prometheus/export.qtpl:68
qw422016.N().S(`]}}`) qw422016.N().S(`]}}`)
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
} }
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
StreamExportPromAPIResponse(qw422016, resultsCh) StreamExportPromAPIResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
} }
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string { func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
WriteExportPromAPIResponse(qb422016, resultsCh) WriteExportPromAPIResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
return qs422016 return qs422016
//line app/vmselect/prometheus/export.qtpl:73 //line app/vmselect/prometheus/export.qtpl:72
} }
//line app/vmselect/prometheus/export.qtpl:75 //line app/vmselect/prometheus/export.qtpl:74
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:76 //line app/vmselect/prometheus/export.qtpl:75
for bb := range resultsCh { for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:77 //line app/vmselect/prometheus/export.qtpl:76
qw422016.N().Z(bb.B) qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:78 //line app/vmselect/prometheus/export.qtpl:77
quicktemplate.ReleaseByteBuffer(bb) quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:79 //line app/vmselect/prometheus/export.qtpl:78
} }
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
} }
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) { func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
StreamExportStdResponse(qw422016, resultsCh) StreamExportStdResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
} }
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string { func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
WriteExportStdResponse(qb422016, resultsCh) WriteExportStdResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
return qs422016 return qs422016
//line app/vmselect/prometheus/export.qtpl:80 //line app/vmselect/prometheus/export.qtpl:79
} }
//line app/vmselect/prometheus/export.qtpl:82 //line app/vmselect/prometheus/export.qtpl:81
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) { func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:83 //line app/vmselect/prometheus/export.qtpl:82
qw422016.N().Z(mn.MetricGroup) qw422016.N().Z(mn.MetricGroup)
//line app/vmselect/prometheus/export.qtpl:84 //line app/vmselect/prometheus/export.qtpl:83
if len(mn.Tags) > 0 { if len(mn.Tags) > 0 {
//line app/vmselect/prometheus/export.qtpl:84 //line app/vmselect/prometheus/export.qtpl:83
qw422016.N().S(`{`) qw422016.N().S(`{`)
//line app/vmselect/prometheus/export.qtpl:86 //line app/vmselect/prometheus/export.qtpl:85
tags := mn.Tags tags := mn.Tags
//line app/vmselect/prometheus/export.qtpl:87 //line app/vmselect/prometheus/export.qtpl:86
qw422016.N().Z(tags[0].Key) qw422016.N().Z(tags[0].Key)
//line app/vmselect/prometheus/export.qtpl:87 //line app/vmselect/prometheus/export.qtpl:86
qw422016.N().S(`=`) qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:87 //line app/vmselect/prometheus/export.qtpl:86
qw422016.N().QZ(tags[0].Value) qw422016.N().QZ(tags[0].Value)
//line app/vmselect/prometheus/export.qtpl:88 //line app/vmselect/prometheus/export.qtpl:87
tags = tags[1:] tags = tags[1:]
//line app/vmselect/prometheus/export.qtpl:89 //line app/vmselect/prometheus/export.qtpl:88
for i := range tags { for i := range tags {
//line app/vmselect/prometheus/export.qtpl:90 //line app/vmselect/prometheus/export.qtpl:89
tag := &tags[i] tag := &tags[i]
//line app/vmselect/prometheus/export.qtpl:90 //line app/vmselect/prometheus/export.qtpl:89
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:91 //line app/vmselect/prometheus/export.qtpl:90
qw422016.N().Z(tag.Key) qw422016.N().Z(tag.Key)
//line app/vmselect/prometheus/export.qtpl:91 //line app/vmselect/prometheus/export.qtpl:90
qw422016.N().S(`=`) qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:91 //line app/vmselect/prometheus/export.qtpl:90
qw422016.N().QZ(tag.Value) qw422016.N().QZ(tag.Value)
//line app/vmselect/prometheus/export.qtpl:92 //line app/vmselect/prometheus/export.qtpl:91
} }
//line app/vmselect/prometheus/export.qtpl:92 //line app/vmselect/prometheus/export.qtpl:91
qw422016.N().S(`}`) qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:94 //line app/vmselect/prometheus/export.qtpl:93
} }
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
} }
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) { func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
streamprometheusMetricName(qw422016, mn) streamprometheusMetricName(qw422016, mn)
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
} }
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
func prometheusMetricName(mn *storage.MetricName) string { func prometheusMetricName(mn *storage.MetricName) string {
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
writeprometheusMetricName(qb422016, mn) writeprometheusMetricName(qb422016, mn)
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
return qs422016 return qs422016
//line app/vmselect/prometheus/export.qtpl:95 //line app/vmselect/prometheus/export.qtpl:94
} }

View File

@ -1,6 +1,7 @@
package prometheus

import (
	"bufio"
	"flag"
	"fmt"
	"math"
@ -14,6 +15,8 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -41,6 +44,11 @@ var (
// Default step used if not set.
const defaultStep = 5 * 60 * 1000

// Buffer size for big responses (i.e. /federate and /api/v1/export/* ).
// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses.
// These buffers may result in visible overhead for responses exceeding tens of megabytes.
const bigResponseBufferSize = 128 * 1024

// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
	ct := startTime.UnixNano() / 1e6
@ -97,10 +105,12 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
	}()

	w.Header().Set("Content-Type", "text/plain")
	bw := bufio.NewWriterSize(w, bigResponseBufferSize)
	for bb := range resultsCh {
		bw.Write(bb.B)
		quicktemplate.ReleaseByteBuffer(bb)
	}
	_ = bw.Flush()

	err = <-doneCh
	if err != nil {
@ -112,6 +122,85 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request

var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
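The buffered-writer pattern above recurs in every big-response handler touched by this commit: wrap the `http.ResponseWriter` in a 128KB `bufio.Writer`, stream into it, and flush once at the end. Below is a minimal self-contained sketch of that pattern; the handler name, route and payload are illustrative, not part of this commit:

```go
package main

import (
	"bufio"
	"net/http"
)

// Mirrors the bigResponseBufferSize constant introduced above.
const bigResponseBufferSize = 128 * 1024

func exportDemo(w http.ResponseWriter, r *http.Request) {
	// Buffer the response so it is flushed to the client in 128KB chunks
	// instead of the default 4KB chunks used by net/http.
	bw := bufio.NewWriterSize(w, bigResponseBufferSize)
	for i := 0; i < 1000; i++ {
		bw.WriteString("sample line\n") // stand-in for exported data
	}
	// The buffer must be flushed explicitly before the handler returns,
	// otherwise the tail of the response is lost.
	_ = bw.Flush()
}

func main() {
	http.HandleFunc("/export-demo", exportDemo)
	_ = http.ListenAndServe(":8428", nil)
}
```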
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
	ct := startTime.UnixNano() / 1e6
	if err := r.ParseForm(); err != nil {
		return fmt.Errorf("cannot parse request form values: %w", err)
	}
	matches := r.Form["match[]"]
	if len(matches) == 0 {
		// Maintain backwards compatibility
		match := r.FormValue("match")
		if len(match) == 0 {
			return fmt.Errorf("missing `match[]` arg")
		}
		matches = []string{match}
	}
	start, err := searchutils.GetTime(r, "start", 0)
	if err != nil {
		return err
	}
	end, err := searchutils.GetTime(r, "end", ct)
	if err != nil {
		return err
	}
	deadline := searchutils.GetDeadlineForExport(r, startTime)
	tagFilterss, err := getTagFilterssFromMatches(matches)
	if err != nil {
		return err
	}
	sq := &storage.SearchQuery{
		MinTimestamp: start,
		MaxTimestamp: end,
		TagFilterss:  tagFilterss,
	}
	w.Header().Set("Content-Type", "VictoriaMetrics/native")
	bw := bufio.NewWriterSize(w, bigResponseBufferSize)

	// Marshal tr
	trBuf := make([]byte, 0, 16)
	trBuf = encoding.MarshalInt64(trBuf, start)
	trBuf = encoding.MarshalInt64(trBuf, end)
	bw.Write(trBuf)

	var bwLock sync.Mutex
	err = netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
		dstBuf := bbPool.Get()
		tmpBuf := bbPool.Get()
		dst := dstBuf.B
		tmp := tmpBuf.B

		// Marshal mn
		tmp = mn.Marshal(tmp[:0])
		dst = encoding.MarshalUint32(dst, uint32(len(tmp)))
		dst = append(dst, tmp...)

		// Marshal b
		tmp = b.MarshalPortable(tmp[:0])
		dst = encoding.MarshalUint32(dst, uint32(len(tmp)))
		dst = append(dst, tmp...)

		tmpBuf.B = tmp
		bbPool.Put(tmpBuf)

		bwLock.Lock()
		_, err := bw.Write(dst)
		bwLock.Unlock()
		if err != nil {
			return fmt.Errorf("cannot write data to client: %w", err)
		}

		dstBuf.B = dst
		bbPool.Put(dstBuf)
		return nil
	})
	_ = bw.Flush()
	return err
}

var bbPool bytesutil.ByteBufferPool
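For reference, the native stream produced by `ExportNativeHandler` above is a 16-byte header carrying the `start` and `end` timestamps, followed by repeated pairs of length-prefixed `MetricName` and `Block` payloads. Below is a hedged decoding sketch of just this framing; it assumes the big-endian integer encoding used by `lib/encoding` and treats the two payloads as opaque bytes:

```go
package main

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

// readSection reads one length-prefixed section: a uint32 length followed
// by that many payload bytes.
func readSection(r io.Reader) ([]byte, error) {
	var lenBuf [4]byte
	if _, err := io.ReadFull(r, lenBuf[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(lenBuf[:]) // assumption: big-endian, as in lib/encoding
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, fmt.Errorf("cannot read %d payload bytes: %w", n, err)
	}
	return buf, nil
}

func main() {
	r := bufio.NewReader(os.Stdin)
	// The stream starts with the requested time range: two marshaled int64 values.
	var trBuf [16]byte
	if _, err := io.ReadFull(r, trBuf[:]); err != nil {
		panic(err)
	}
	start := int64(binary.BigEndian.Uint64(trBuf[:8]))
	end := int64(binary.BigEndian.Uint64(trBuf[8:]))
	fmt.Printf("time range: [%d .. %d]\n", start, end)
	// Then a sequence of (MetricName, Block) pairs until EOF.
	for i := 0; ; i++ {
		mn, err := readSection(r)
		if err == io.EOF {
			break // clean end of stream between records
		}
		if err != nil {
			panic(err)
		}
		b, err := readSection(r)
		if err != nil {
			panic(err)
		}
		fmt.Printf("block %d: metric name %d bytes, block data %d bytes\n", i, len(mn), len(b))
	}
}
```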
// ExportHandler exports data in raw format from /api/v1/export.
func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
	ct := startTime.UnixNano() / 1e6
@ -137,11 +226,12 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
	}
	format := r.FormValue("format")
	maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
	reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage")
	deadline := searchutils.GetDeadlineForExport(r, startTime)
	if start >= end {
		end = start + defaultStep
	}
	if err := exportHandler(w, matches, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil {
		return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err)
	}
	exportDuration.UpdateDuration(startTime)
@ -150,17 +240,34 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)

var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)

func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error {
	writeResponseFunc := WriteExportStdResponse
	writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
		bb := quicktemplate.AcquireByteBuffer()
		WriteExportJSONLine(bb, xb)
		resultsCh <- bb
	}
	contentType := "application/stream+json"
	if format == "prometheus" {
		contentType = "text/plain"
		writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
			bb := quicktemplate.AcquireByteBuffer()
			WriteExportPrometheusLine(bb, xb)
			resultsCh <- bb
		}
	} else if format == "promapi" {
		writeResponseFunc = WriteExportPromAPIResponse
		writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
			bb := quicktemplate.AcquireByteBuffer()
			WriteExportPromAPILine(bb, xb)
			resultsCh <- bb
		}
	}
	if maxRowsPerLine > 0 {
		writeLineFuncOrig := writeLineFunc
		writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
			valuesOrig := xb.values
			timestampsOrig := xb.timestamps
			values := valuesOrig
			timestamps := timestampsOrig
			for len(values) > 0 {
@ -177,30 +284,12 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
					values = nil
					timestamps = nil
				}
				xb.values = valuesChunk
				xb.timestamps = timestampsChunk
				writeLineFuncOrig(xb, resultsCh)
			}
			xb.values = valuesOrig
			xb.timestamps = timestampsOrig
		}
	}
@ -213,23 +302,52 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
		MaxTimestamp: end,
		TagFilterss:  tagFilterss,
	}
	resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
	doneCh := make(chan error)

	if !reduceMemUsage {
		rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
		if err != nil {
			return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
		}
		go func() {
			err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
				xb := exportBlockPool.Get().(*exportBlock)
				xb.mn = &rs.MetricName
				xb.timestamps = rs.Timestamps
				xb.values = rs.Values
				writeLineFunc(xb, resultsCh)
				xb.reset()
				exportBlockPool.Put(xb)
			})
			close(resultsCh)
			doneCh <- err
		}()
	} else {
		go func() {
			err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
				if err := b.UnmarshalData(); err != nil {
					return fmt.Errorf("cannot unmarshal block during export: %s", err)
				}
				xb := exportBlockPool.Get().(*exportBlock)
				xb.mn = mn
				xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
				if len(xb.timestamps) > 0 {
					writeLineFunc(xb, resultsCh)
				}
				xb.reset()
				exportBlockPool.Put(xb)
				return nil
			})
			close(resultsCh)
			doneCh <- err
		}()
	}

	w.Header().Set("Content-Type", contentType)
	bw := bufio.NewWriterSize(w, bigResponseBufferSize)
	writeResponseFunc(bw, resultsCh)
	_ = bw.Flush()

	// Consume all the data from resultsCh in the event writeResponseFunc
	// fails to consume all the data.
@ -243,6 +361,24 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
	return nil
}
type exportBlock struct {
	mn         *storage.MetricName
	timestamps []int64
	values     []float64
}

func (xb *exportBlock) reset() {
	xb.mn = nil
	xb.timestamps = xb.timestamps[:0]
	xb.values = xb.values[:0]
}

var exportBlockPool = &sync.Pool{
	New: func() interface{} {
		return &exportBlock{}
	},
}

// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
@ -662,7 +798,7 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e
	start -= offset
	end := start
	start = end - window
	if err := exportHandler(w, []string{childQuery}, start, end, "promapi", 0, false, deadline); err != nil {
		return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err)
	}
	queryDuration.UpdateDuration(startTime)

View File

@ -91,7 +91,7 @@ The main differences between Cortex and VictoriaMetrics:
  VictoriaMetrics may lose only a few seconds of recent data, which isn't synced to persistent storage yet.
  See [this article for details](https://medium.com/@valyala/wal-usage-looks-broken-in-modern-time-series-databases-b62a627ab704).
- Cortex is usually slower and requires more CPU and RAM than VictoriaMetrics. See [this talk from Adidas at PromCon 2019](https://promcon.io/2019-munich/talks/remote-write-storage-wars/) and [other case studies](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies).
- VictoriaMetrics accepts data in multiple popular data ingestion protocols in addition to the Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV, JSON, native binary.
  See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details.
@ -112,7 +112,7 @@ The main differences between Cortex and VictoriaMetrics:
- Thanos may be harder to set up and operate compared to VictoriaMetrics, since it has more moving parts, which can be connected with less reliable networks.
  See [this article for details](https://medium.com/faun/comparing-thanos-to-victoriametrics-cluster-b193bea1683).
- Thanos is usually slower and requires more CPU and RAM than VictoriaMetrics. See [this talk from Adidas at PromCon 2019](https://promcon.io/2019-munich/talks/remote-write-storage-wars/).
- VictoriaMetrics accepts data in multiple popular data ingestion protocols in addition to the Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV, JSON, native binary.
  See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details.
@ -120,7 +120,7 @@ The main differences between Cortex and VictoriaMetrics:
- VictoriaMetrics requires [10x less RAM](https://medium.com/@valyala/insert-benchmarks-with-inch-influxdb-vs-victoriametrics-e31a41ae2893) and it [works faster](https://medium.com/@valyala/measuring-vertical-scalability-for-time-series-databases-in-google-cloud-92550d78d8ae).
- VictoriaMetrics provides [a better query language](https://medium.com/@valyala/promql-tutorial-for-beginners-9ab455142085) than InfluxQL or Flux.
- VictoriaMetrics accepts data in multiple popular data ingestion protocols in addition to InfluxDB - Prometheus remote_write, OpenTSDB, Graphite, CSV, JSON, native binary.
  See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details.

View File

@ -77,7 +77,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
  if `-graphiteListenAddr` is set.
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set.
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
* [JSON line format](#how-to-import-data-in-json-line-format).
* [Native binary format](#how-to-import-data-in-native-format).
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [Arbitrary CSV data](#how-to-import-csv-data).
* Supports metrics' relabeling. See [these docs](#relabeling) for details.
@ -100,8 +101,6 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
* [Querying Graphite data](#querying-graphite-data)
* [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents)
* [Prometheus querying API usage](#prometheus-querying-api-usage)
* [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
* [Graphite Metrics API usage](#graphite-metrics-api-usage)
@ -117,7 +116,13 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to delete time series](#how-to-delete-time-series)
* [Forced merge](#forced-merge)
* [How to export time series](#how-to-export-time-series)
  * [How to export data in native format](#how-to-export-data-in-native-format)
  * [How to export data in JSON line format](#how-to-export-data-in-json-line-format)
* [How to import time series data](#how-to-import-time-series-data)
  * [How to import data in native format](#how-to-import-data-in-native-format)
  * [How to import data in JSON line format](#how-to-import-data-in-json-line-format)
  * [How to import CSV data](#how-to-import-csv-data)
  * [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
* [Relabeling](#relabeling)
* [Federation](#federation)
* [Capacity planning](#capacity-planning)
@ -345,7 +350,7 @@ curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'ht
```

An arbitrary number of lines delimited by '\n' (aka newline char) may be sent in a single request.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:

```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'
@ -381,7 +386,7 @@ echo "foo.bar.baz;tag1=value1;tag2=value2 123 `date +%s`" | nc -N localhost 2003
VictoriaMetrics sets the current time if the timestamp is omitted.
An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:

```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
@ -425,7 +430,7 @@ echo "put foo.bar.baz `date +%s` 123 tag1=value1 tag2=value2" | nc -N localhost
```

An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:

```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
@ -460,7 +465,7 @@ Example for writing multiple data points in a single request:
curl -H 'Content-Type: application/json' -d '[{"metric":"foo","value":45.34},{"metric":"bar","value":43}]' http://localhost:4242/api/put
```

After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:

```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]=x.y.z' -d 'match[]=foo' -d 'match[]=bar'
@ -475,91 +480,6 @@ The `/api/v1/export` endpoint should return the following response:
```
### How to import CSV data

Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
The `format` query arg must contain a comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:

```
<column_pos>:<type>:<context>
```

* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
* `<type>` describes the column type. Supported types are:
  * `metric` - the corresponding CSV column at `<column_pos>` contains the metric value, which must be an integer or a floating-point number.
    The metric name is read from the `<context>`. A CSV line must have at least a single metric field. Multiple metric fields per CSV line are OK.
  * `label` - the corresponding CSV column at `<column_pos>` contains a label value. The label name is read from the `<context>`.
    A CSV line may have an arbitrary number of label fields. All these labels are attached to all the configured metrics.
  * `time` - the corresponding CSV column at `<column_pos>` contains the metric time. A CSV line may contain either one or zero columns with time.
    If a CSV line has no time, then the current time is used. The time is applied to all the configured metrics.
    The format of the time is configured via `<context>`. Supported time formats are:
    * `unix_s` - unix timestamp in seconds.
    * `unix_ms` - unix timestamp in milliseconds.
    * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
    * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
    * `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).

Each request to `/api/v1/import/csv` may contain an arbitrary number of CSV lines.
Example for importing CSV data via `/api/v1/import/csv`:
```bash
curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
```
After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
```
The following response should be returned:
```bash
{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```
Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
Note that it could be required to flush the response cache after importing historical data. See [these docs](#backfilling) for details.
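For instance, a cache flush after a historical import could look roughly like the call below; the handler path is an assumption based on the backfilling docs, not something stated here:

```bash
# Assumed cache-reset handler from the backfilling docs; adjust the path
# if your VictoriaMetrics version exposes a different one.
curl 'http://localhost:8428/internal/resetRollupResultCache'
```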
### How to import data in Prometheus exposition format
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
via the `/api/v1/import/prometheus` path. For example, the following command imports a single line in Prometheus exposition format into VictoriaMetrics:
```bash
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
```
The following command may be used for verifying the imported data:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
```
It should return something like the following:
```
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
```
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
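Combining this with the import command above, a request with an extra label could look like this (the label name and value are illustrative):

```bash
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus?extra_label=env=dev'
```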
If the timestamp is missing in a `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
It can be overridden by passing a unix timestamp in *milliseconds* via the `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.

VictoriaMetrics accepts an arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.

VictoriaMetrics may also scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
### Prometheus querying API usage

VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
@ -756,11 +676,35 @@ when new data is ingested into it.
### How to export time series

VictoriaMetrics provides the following handlers for exporting data:

* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export.
  See [these docs](#how-to-export-data-in-native-format) for details.
* `/api/v1/export` for exporting data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details.

#### How to export data in native format

Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/native?match[]=<timeseries_selector_for_export>`,
where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export. Use the `{__name__!=""}` selector for fetching all the time series.

Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
a unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.

The exported data can be imported to VictoriaMetrics via [/api/v1/import/native](#how-to-import-data-in-native-format).
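For example, the following command exports all the time series in native format into a local file; the same file is then used by the import examples further below (the host name is illustrative):

```bash
curl http://localhost:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin
```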
#### How to export data in JSON line format

Consider [exporting data in native format](#how-to-export-data-in-native-format) if big amounts of data must be migrated between VictoriaMetrics instances,
since export in native format usually consumes less CPU and memory, while the resulting exported data occupies less disk space.

In order to export data in JSON line format, send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export. Use the `{__name__!=""}` selector for fetching all the time series.
The response would contain all the data for the selected time series in [JSON streaming format](https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON).
Each JSON line contains samples for a single time series. An example output:

```jsonl
{"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]}
@ -770,8 +714,9 @@ Each JSON line would contain data for a single time series. An example output:

Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
a unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.

Optional `max_rows_per_line` arg may be added to the request for limiting the maximum number of rows exported per each JSON line.
Optional `reduce_mem_usage=1` arg may be added to the request for reducing memory usage when exporting a big number of time series.
In this case the output may contain multiple lines with distinct samples for the same time series.
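For example, the following request combines both args; the exact values are illustrative:

```bash
# Export all series, at most 1000 samples per JSON line, via the
# memory-friendly code path:
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__!=""}' -d 'max_rows_per_line=1000' -d 'reduce_mem_usage=1'
```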
Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporing big amounts Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporing big amounts
of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data: of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data:
@ -782,22 +727,58 @@ curl -H 'Accept-Encoding: gzip' http://localhost:8428/api/v1/export -d 'match[]=
The maximum duration for each request to `/api/v1/export` is limited by `-search.maxExportDuration` command-line flag. The maximum duration for each request to `/api/v1/export` is limited by `-search.maxExportDuration` command-line flag.
Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-time-series-data). Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format).
### How to import time series data
Time series data can be imported via any supported ingestion protocol:
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
* Influx line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.
* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format).
  See [these docs](#how-to-import-data-in-json-line-format) for details.
* `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format).
  See [these docs](#how-to-import-data-in-native-format) for details.
* `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details.
* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
#### How to import data in native format
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`.
Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format):
```bash
# Export the data from <source-victoriametrics>:
curl http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin
# Import the data to <destination-victoriametrics>:
curl -X POST http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin
```
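The import can then be verified by reading the data back from the destination (a sketch using the export endpoint described above):
```bash
# Read back a sample of the imported data:
curl -G 'http://destination-victoriametrics:8428/api/v1/export' -d 'match[]={__name__!=""}' | head
```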
Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/native` for importing gzipped data:
```bash
# Export gzipped data from <source-victoriametrics>:
curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin.gz
# Import gzipped data to <destination-victoriametrics>:
curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin.gz
```
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
For example, `/api/v1/import/native?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series.
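For instance (a sketch; the endpoint and query arg are documented above, while the label name and value are illustrative):
```bash
# Attach source="old-cluster" to every imported time series:
curl -X POST 'http://destination-victoriametrics:8428/api/v1/import/native?extra_label=source=old-cluster' -T exported_data.bin
```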
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
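A minimal sketch of such a flush, assuming the `/internal/resetRollupResultCache` handler mentioned in the backfilling docs:
```bash
# Reset the rollup result cache so freshly imported historical data becomes queryable:
curl http://localhost:8428/internal/resetRollupResultCache
```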
#### How to import data in JSON line format
Example for importing data obtained via [/api/v1/export](#how-to-export-data-in-json-line-format):
```bash
# Export the data from <source-victoriametrics>:
```
@@ -823,6 +804,94 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
#### How to import CSV data
Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
The `format` query arg must contain a comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:
```
<column_pos>:<type>:<context>
```
* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
* `<type>` describes the column type. Supported types are:
* `metric` - the corresponding CSV column at `<column_pos>` contains the metric value, which must be an integer or floating-point number.
The metric name is read from the `<context>`. A CSV line must have at least one metric field. Multiple metric fields per CSV line are OK.
* `label` - the corresponding CSV column at `<column_pos>` contains the label value. The label name is read from the `<context>`.
A CSV line may have an arbitrary number of label fields. All these labels are attached to all the configured metrics.
* `time` - the corresponding CSV column at `<column_pos>` contains the metric time. A CSV line may contain either one or zero time columns.
If a CSV line has no time column, then the current time is used. The time is applied to all the configured metrics.
The format of the time is configured via `<context>`. Supported time formats are:
* `unix_s` - unix timestamp in seconds.
* `unix_ms` - unix timestamp in milliseconds.
* `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
* `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
* `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).
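For example, a CSV line with an RFC3339 time column could be imported as follows (a sketch; the column layout and values are illustrative):
```bash
# Column 1 is the ticker label, column 2 the ask metric, column 3 an RFC3339 timestamp:
curl -d 'GOOG,1.23,2020-09-26T01:00:00Z' 'http://localhost:8428/api/v1/import/csv?format=1:label:ticker,2:metric:ask,3:time:rfc3339'
```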
Each request to `/api/v1/import/csv` may contain an arbitrary number of CSV lines.
Example for importing CSV data via `/api/v1/import/csv`:
```bash
curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
```
After that the data may be read via the [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
```
The following response should be returned:
```jsonl
{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```
Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
#### How to import data in Prometheus exposition format
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
via `/api/v1/import/prometheus` path. For example, the following command imports a single line in Prometheus exposition format into VictoriaMetrics:
```bash
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
```
The following command may be used for verifying the imported data:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
```
It should return something like the following:
```jsonl
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
```
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
If the timestamp is missing in a `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
It can be overridden by passing a unix timestamp in *milliseconds* via the `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
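The following request stores the sample with that explicit timestamp instead of the ingestion time:
```bash
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus?timestamp=1594370496905'
```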
VictoriaMetrics accepts an arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
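This allows piping a whole scrape into the handler, e.g. (a sketch assuming a node_exporter instance listening on `localhost:9100`):
```bash
# Stream a node_exporter scrape straight into VictoriaMetrics:
curl -s http://localhost:9100/metrics | curl -X POST --data-binary @- 'http://localhost:8428/api/v1/import/prometheus'
```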
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for details.
VictoriaMetrics may also scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
### Relabeling
VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points


@@ -25,7 +25,8 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-json-line-format).
* Native data import protocol via `http://<vmagent>:8429/api/v1/import/native`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-native-format).
* Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
* Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
* Can replicate collected metrics simultaneously to multiple remote storage systems.


@@ -0,0 +1,197 @@
package native
import (
"bufio"
"fmt"
"io"
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks.
//
// The callback can be called multiple times for streamed data from req.
//
// callback shouldn't hold block after returning.
// callback can be called in parallel from multiple concurrent goroutines.
func ParseStream(req *http.Request, callback func(block *Block) error) error {
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped vmimport data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
// By default req.Body uses a 4KB buffer. This size is too small for a typical request to /api/v1/import/native,
// so use a slightly bigger buffer in order to reduce read syscall overhead.
br := bufio.NewReaderSize(r, 1024*1024)
// Read time range (tr)
trBuf := make([]byte, 16)
var tr storage.TimeRange
if _, err := io.ReadFull(br, trBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read time range: %w", err)
}
tr.MinTimestamp = encoding.UnmarshalInt64(trBuf)
tr.MaxTimestamp = encoding.UnmarshalInt64(trBuf[8:])
// Start GOMAXPROCS workers in order to process ingested data in parallel.
gomaxprocs := runtime.GOMAXPROCS(-1)
workCh := make(chan *unmarshalWork, 8*gomaxprocs)
var wg sync.WaitGroup
defer func() {
close(workCh)
wg.Wait()
}()
wg.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer wg.Done()
var tmpBlock storage.Block
for uw := range workCh {
if err := uw.unmarshal(&tmpBlock, tr); err != nil {
parseErrors.Inc()
logger.Errorf("error when unmarshaling native block: %s", err)
putUnmarshalWork(uw)
continue
}
if err := callback(&uw.block); err != nil {
processErrors.Inc()
logger.Errorf("error when processing native block: %s", err)
putUnmarshalWork(uw)
continue
}
putUnmarshalWork(uw)
}
}()
}
// Read native blocks and feed workers with work.
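// Each frame in the stream consists of a 4-byte metricName size, the marshaled metricName,
// a 4-byte block size and the marshaled block (decoded by the workers via Block.UnmarshalPortable).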
sizeBuf := make([]byte, 4)
for {
uw := getUnmarshalWork()
// Read uw.metricNameBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil {
if err == io.EOF {
// End of stream
return nil
}
readErrors.Inc()
return fmt.Errorf("cannot read metricName size: %w", err)
}
readCalls.Inc()
bufSize := encoding.UnmarshalUint32(sizeBuf)
if bufSize > 1024*1024 {
parseErrors.Inc()
return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
}
uw.metricNameBuf = bytesutil.Resize(uw.metricNameBuf, int(bufSize))
if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read metricName with size %d bytes: %w", bufSize, err)
}
readCalls.Inc()
// Read uw.blockBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read native block size: %w", err)
}
readCalls.Inc()
bufSize = encoding.UnmarshalUint32(sizeBuf)
if bufSize > 1024*1024 {
parseErrors.Inc()
return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
}
uw.blockBuf = bytesutil.Resize(uw.blockBuf, int(bufSize))
if _, err := io.ReadFull(br, uw.blockBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read native block with size %d bytes: %w", bufSize, err)
}
readCalls.Inc()
blocksRead.Inc()
// Feed workers with work.
workCh <- uw
}
}
// Block is a single block from `/api/v1/import/native` request.
type Block struct {
MetricName storage.MetricName
Values []float64
Timestamps []int64
}
func (b *Block) reset() {
b.MetricName.Reset()
b.Values = b.Values[:0]
b.Timestamps = b.Timestamps[:0]
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="native"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="native"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="native"}`)
blocksRead = metrics.NewCounter(`vm_protoparser_blocks_read_total{type="native"}`)
parseErrors = metrics.NewCounter(`vm_protoparser_parse_errors_total{type="native"}`)
processErrors = metrics.NewCounter(`vm_protoparser_process_errors_total{type="native"}`)
)
type unmarshalWork struct {
tr storage.TimeRange
metricNameBuf []byte
blockBuf []byte
block Block
}
func (uw *unmarshalWork) reset() {
uw.metricNameBuf = uw.metricNameBuf[:0]
uw.blockBuf = uw.blockBuf[:0]
uw.block.reset()
}
func (uw *unmarshalWork) unmarshal(tmpBlock *storage.Block, tr storage.TimeRange) error {
block := &uw.block
if err := block.MetricName.Unmarshal(uw.metricNameBuf); err != nil {
return fmt.Errorf("cannot unmarshal metricName from %d bytes: %w", len(uw.metricNameBuf), err)
}
tail, err := tmpBlock.UnmarshalPortable(uw.blockBuf)
if err != nil {
return fmt.Errorf("cannot unmarshal native block from %d bytes: %w", len(uw.blockBuf), err)
}
if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling native block from %d bytes; len(tail)=%d bytes", len(uw.blockBuf), len(tail))
}
block.Timestamps, block.Values = tmpBlock.AppendRowsWithTimeRangeFilter(block.Timestamps[:0], block.Values[:0], tr)
rowsRead.Add(len(block.Timestamps))
return nil
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool


@@ -2,9 +2,11 @@ package storage
import (
"fmt"
"math"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@@ -86,21 +88,6 @@ func (b *Block) RowsCount() int {
return int(b.bh.RowsCount)
}
// Values returns b values.
func (b *Block) Values() []int64 {
return b.values
}
// Timestamps returns b timestamps.
func (b *Block) Timestamps() []int64 {
return b.timestamps
}
// Scale returns the decimal scale used for encoding values in the block.
func (b *Block) Scale() int16 {
return b.bh.Scale
}
// Init initializes b with the given tsid, timestamps, values and scale.
func (b *Block) Init(tsid *TSID, timestamps, values []int64, scale int16, precisionBits uint8) {
b.Reset()
@@ -289,3 +276,123 @@ func (b *Block) UnmarshalData() error {
return nil
}
// AppendRowsWithTimeRangeFilter filters samples from b according to tr and appends them to dst*.
//
// It is expected that UnmarshalData has been already called on b.
func (b *Block) AppendRowsWithTimeRangeFilter(dstTimestamps []int64, dstValues []float64, tr TimeRange) ([]int64, []float64) {
timestamps, values := b.filterTimestamps(tr)
dstTimestamps = append(dstTimestamps, timestamps...)
dstValues = decimal.AppendDecimalToFloat(dstValues, values, b.bh.Scale)
return dstTimestamps, dstValues
}
func (b *Block) filterTimestamps(tr TimeRange) ([]int64, []int64) {
timestamps := b.timestamps
// Skip timestamps smaller than tr.MinTimestamp.
i := 0
for i < len(timestamps) && timestamps[i] < tr.MinTimestamp {
i++
}
// Skip timestamps bigger than tr.MaxTimestamp.
j := len(timestamps)
for j > i && timestamps[j-1] > tr.MaxTimestamp {
j--
}
if i == j {
return nil, nil
}
return timestamps[i:j], b.values[i:j]
}
// MarshalPortable marshals b to dst, so it could be portably migrated to another VictoriaMetrics instance.
//
// The marshaled value must be unmarshaled with UnmarshalPortable function.
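//
// The portable layout is: varint MinTimestamp, varint FirstValue, varuint RowsCount,
// varint Scale, one byte each for the timestamps and values marshal types, then
// length-prefixed timestampsData and valuesData.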
func (b *Block) MarshalPortable(dst []byte) []byte {
b.MarshalData(0, 0)
dst = encoding.MarshalVarInt64(dst, b.bh.MinTimestamp)
dst = encoding.MarshalVarInt64(dst, b.bh.FirstValue)
dst = encoding.MarshalVarUint64(dst, uint64(b.bh.RowsCount))
dst = encoding.MarshalVarInt64(dst, int64(b.bh.Scale))
dst = append(dst, byte(b.bh.TimestampsMarshalType))
dst = append(dst, byte(b.bh.ValuesMarshalType))
dst = encoding.MarshalBytes(dst, b.timestampsData)
dst = encoding.MarshalBytes(dst, b.valuesData)
return dst
}
// UnmarshalPortable unmarshals block from src to b and returns the remaining tail.
//
// It is assumed that the block has been marshaled with MarshalPortable.
func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
b.Reset()
// Read header
src, firstTimestamp, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
}
b.bh.MinTimestamp = firstTimestamp
src, firstValue, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
}
b.bh.FirstValue = firstValue
src, rowsCount, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
}
if rowsCount > math.MaxUint32 {
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, math.MaxUint32)
}
b.bh.RowsCount = uint32(rowsCount)
src, scale, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal scale: %w", err)
}
if scale < math.MinInt16 {
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
}
if scale > math.MaxInt16 {
return src, fmt.Errorf("got too big scale=%d; it mustn't exceeed %d", scale, math.MaxInt16)
}
b.bh.Scale = int16(scale)
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
}
b.bh.TimestampsMarshalType = encoding.MarshalType(src[0])
src = src[1:]
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
}
b.bh.ValuesMarshalType = encoding.MarshalType(src[0])
src = src[1:]
b.bh.PrecisionBits = 64
// Read data
src, timestampsData, err := encoding.UnmarshalBytes(src)
if err != nil {
return src, fmt.Errorf("cannot read timestampsData: %w", err)
}
b.timestampsData = append(b.timestampsData[:0], timestampsData...)
src, valuesData, err := encoding.UnmarshalBytes(src)
if err != nil {
return src, fmt.Errorf("cannot read valuesData: %w", err)
}
b.valuesData = append(b.valuesData[:0], valuesData...)
// Validate
if err := b.bh.validate(); err != nil {
return src, fmt.Errorf("invalid blockHeader: %w", err)
}
if err := b.UnmarshalData(); err != nil {
return src, fmt.Errorf("invalid data: %w", err)
}
return src, nil
}

lib/storage/block_test.go

@@ -0,0 +1,116 @@
package storage
import (
"math/rand"
"reflect"
"strings"
"testing"
)
func TestBlockMarshalUnmarshalPortable(t *testing.T) {
var b Block
for i := 0; i < 1000; i++ {
b.Reset()
rowsCount := rand.Intn(maxRowsPerBlock) + 1
b.timestamps = getRandTimestamps(rowsCount)
b.values = getRandValues(rowsCount)
b.bh.Scale = int16(rand.Intn(30) - 15)
b.bh.PrecisionBits = 64
testBlockMarshalUnmarshalPortable(t, &b)
}
}
func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
var b1, b2 Block
b1.CopyFrom(b)
rowsCount := len(b.values)
data := b1.MarshalPortable(nil)
if b1.bh.RowsCount != uint32(rowsCount) {
t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount)
}
tail, err := b2.UnmarshalPortable(data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(tail) > 0 {
t.Fatalf("unexpected non-empty tail: %X", tail)
}
compareBlocksPortable(t, &b2, b, &b1.bh)
// Verify non-empty prefix and suffix
prefix := "prefix"
suffix := "suffix"
data = append(data[:0], prefix...)
data = b1.MarshalPortable(data)
if b1.bh.RowsCount != uint32(rowsCount) {
t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount)
}
if !strings.HasPrefix(string(data), prefix) {
t.Fatalf("unexpected prefix in %X; want %X", data, prefix)
}
data = data[len(prefix):]
data = append(data, suffix...)
tail, err = b2.UnmarshalPortable(data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if string(tail) != suffix {
t.Fatalf("unexpected tail; got %X; want %X", tail, suffix)
}
compareBlocksPortable(t, &b2, b, &b1.bh)
}
func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader) {
t.Helper()
if b1.bh.MinTimestamp != bhExpected.MinTimestamp {
t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MinTimestamp, bhExpected.MinTimestamp)
}
if b1.bh.FirstValue != bhExpected.FirstValue {
t.Fatalf("unexpected FirstValue; got %d; want %d", b1.bh.FirstValue, bhExpected.FirstValue)
}
if b1.bh.RowsCount != bhExpected.RowsCount {
t.Fatalf("unexpected RowsCount; got %d; want %d", b1.bh.RowsCount, bhExpected.RowsCount)
}
if b1.bh.Scale != bhExpected.Scale {
t.Fatalf("unexpected Scale; got %d; want %d", b1.bh.Scale, bhExpected.Scale)
}
if b1.bh.TimestampsMarshalType != bhExpected.TimestampsMarshalType {
t.Fatalf("unexpected TimestampsMarshalType; got %d; want %d", b1.bh.TimestampsMarshalType, bhExpected.TimestampsMarshalType)
}
if b1.bh.ValuesMarshalType != bhExpected.ValuesMarshalType {
t.Fatalf("unexpected ValuesMarshalType; got %d; want %d", b1.bh.ValuesMarshalType, bhExpected.ValuesMarshalType)
}
if b1.bh.PrecisionBits != bhExpected.PrecisionBits {
t.Fatalf("unexpected PrecisionBits; got %d; want %d", b1.bh.PrecisionBits, bhExpected.PrecisionBits)
}
if !reflect.DeepEqual(b1.values, b2.values) {
t.Fatalf("unexpected values; got %d; want %d", b1.values, b2.values)
}
if !reflect.DeepEqual(b1.timestamps, b2.timestamps) {
t.Fatalf("unexpected timestamps; got %d; want %d", b1.timestamps, b2.timestamps)
}
if len(b1.values) != int(bhExpected.RowsCount) {
t.Fatalf("unexpected number of values; got %d; want %d", len(b1.values), bhExpected.RowsCount)
}
if len(b1.timestamps) != int(bhExpected.RowsCount) {
t.Fatalf("unexpected number of timestamps; got %d; want %d", len(b1.timestamps), bhExpected.RowsCount)
}
}
func getRandValues(rowsCount int) []int64 {
a := make([]int64, rowsCount)
for i := 0; i < rowsCount; i++ {
a[i] = int64(rand.Intn(1e5) - 0.5e5)
}
return a
}
func getRandTimestamps(rowsCount int) []int64 {
a := make([]int64, rowsCount)
ts := int64(rand.Intn(1e12))
for i := 0; i < rowsCount; i++ {
a[i] = ts
ts += int64(rand.Intn(1e5))
}
return a
}