app/vminsert: add /api/v1/import handler

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6

parent 8501b4a48d
commit 68e1cf8942

README.md | 37
@@ -49,11 +49,12 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM
 * Storage is protected from corruption on unclean shutdown (i.e. OOM, hardware reset or `kill -9`) thanks to [the storage architecture](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
 * Supports metrics' ingestion and [backfilling](#backfilling) via the following protocols:
   * [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
-  * [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/)
+  * [InfluxDB line protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf)
-  * [Graphite plaintext protocol](https://graphite.readthedocs.io/en/latest/feeding-carbon.html) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon)
+  * [Graphite plaintext protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon)
     if `-graphiteListenAddr` is set.
-  * [OpenTSDB put message](http://opentsdb.net/docs/build/html/api_telnet/put.html) if `-opentsdbListenAddr` is set.
+  * [OpenTSDB put message](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set.
-  * [HTTP OpenTSDB /api/put requests](http://opentsdb.net/docs/build/html/api_http/put.html) if `-opentsdbHTTPListenAddr` is set.
+  * [HTTP OpenTSDB /api/put requests](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
+  * [/api/v1/import](#how-to-import-time-series-data)
 * Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
 * Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
 
@@ -84,6 +85,7 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM
 - [How to work with snapshots?](#how-to-work-with-snapshots)
 - [How to delete time series?](#how-to-delete-time-series)
 - [How to export time series?](#how-to-export-time-series)
+- [How to import time series data?](#how-to-import-time-series-data)
 - [Federation](#federation)
 - [Capacity planning](#capacity-planning)
 - [High availability](#high-availability)
@@ -513,6 +515,33 @@ Each JSON line would contain data for a single time series. An example output:
 Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
 unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
 
+Exported data can be imported via POST'ing it to [/api/v1/import](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
+
+
+### How to import time series data?
+
+Time series data can be imported via any supported ingestion protocol:
+
+* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
+* [Influx line protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf)
+* [Graphite plaintext protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
+* [OpenTSDB telnet put protocol](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-data-via-telnet-put-protocol)
+* [OpenTSDB http /api/put](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-opentsdb-data-via-http-apiput-requests)
+* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-export-time-series).
+
+The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
+
+```
+# Export the data from <source-victoriametrics>:
+curl -s 'http://source-victoriametrics:8428/api/v1/export' -d 'match={__name__!=""}' > exported_data.jsonl
+
+# Import the data to <destination-victoriametrics>:
+curl -X POST 'http://destination-victoriametrics:8428/api/v1/import' -T exported_data.jsonl
+```
+
+Each request to `/api/v1/import` can load up to a single vCPU core on VictoriaMetrics. Import speed can be improved by splitting the original file into smaller parts
+and importing them concurrently. Note that the original file must be split on newlines.
+
 
 ### Federation
 
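The concurrent-import advice in the README hunk above can be scripted. Below is a minimal Go sketch (an illustration, not part of this commit) that splits an exported `.jsonl` file into chunks on newline boundaries only and POSTs them to `/api/v1/import` in parallel. The `workers` count is arbitrary, and the file name and destination host are assumptions carried over from the README example.

```go
// Hedged sketch of concurrent /api/v1/import; not part of this commit.
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"
)

func main() {
	const workers = 4 // illustrative chunk count
	data, err := os.ReadFile("exported_data.jsonl")
	if err != nil {
		log.Fatalf("cannot read exported data: %s", err)
	}
	// SplitAfter keeps the trailing '\n' on each line, so chunks stay
	// valid jsonl: every /api/v1/import line must be a complete JSON object.
	lines := bytes.SplitAfter(data, []byte("\n"))
	chunks := make([][]byte, workers)
	for i, line := range lines {
		chunks[i%workers] = append(chunks[i%workers], line...)
	}
	var wg sync.WaitGroup
	for _, chunk := range chunks {
		if len(chunk) == 0 {
			continue
		}
		wg.Add(1)
		go func(body []byte) {
			defer wg.Done()
			// Hypothetical destination host; adjust to your setup.
			resp, err := http.Post("http://destination-victoriametrics:8428/api/v1/import",
				"", bytes.NewReader(body))
			if err != nil {
				log.Printf("import failed: %s", err)
				return
			}
			resp.Body.Close()
			fmt.Println("chunk imported, status:", resp.StatusCode)
		}(chunk)
	}
	wg.Wait()
}
```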
@@ -47,7 +47,7 @@ func (ctx *InsertCtx) marshalMetricNameRaw(prefix []byte, labels []prompb.Label)
 	return metricNameRaw[:len(metricNameRaw):len(metricNameRaw)]
 }
 
-// WriteDataPoint writes (timestamp, value) with the given prefix and lables into ctx buffer.
+// WriteDataPoint writes (timestamp, value) with the given prefix and labels into ctx buffer.
 func (ctx *InsertCtx) WriteDataPoint(prefix []byte, labels []prompb.Label, timestamp int64, value float64) {
 	metricNameRaw := ctx.marshalMetricNameRaw(prefix, labels)
 	ctx.addRow(metricNameRaw, timestamp, value)
@@ -78,6 +78,26 @@ func (ctx *InsertCtx) addRow(metricNameRaw []byte, timestamp int64, value float6
 	mr.Value = value
 }
 
+// AddLabelBytes adds (name, value) label to ctx.Labels.
+//
+// name and value must exist until ctx.Labels is used.
+func (ctx *InsertCtx) AddLabelBytes(name, value []byte) {
+	labels := ctx.Labels
+	if cap(labels) > len(labels) {
+		labels = labels[:len(labels)+1]
+	} else {
+		labels = append(labels, prompb.Label{})
+	}
+	label := &labels[len(labels)-1]
+
+	// Do not copy name and value contents for performance reasons.
+	// This reduces GC overhead on the number of objects and allocations.
+	label.Name = name
+	label.Value = value
+
+	ctx.Labels = labels
+}
+
 // AddLabel adds (name, value) label to ctx.Labels.
 //
 // name and value must exist until ctx.Labels is used.
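The `cap`/`len` dance in `AddLabelBytes` is a recurring VictoriaMetrics idiom: reuse a slot in a previously grown slice instead of appending a fresh struct, and keep references to the caller's bytes instead of copying them. A standalone sketch of the same pattern follows; all names in it are illustrative, not from the codebase.

```go
package main

import "fmt"

type label struct {
	name, value []byte
}

// appendLabel grows dst by one element, reusing spare capacity when
// possible, so repeated reset-and-refill cycles stop allocating once
// the capacity stabilizes.
func appendLabel(dst []label, name, value []byte) []label {
	if cap(dst) > len(dst) {
		dst = dst[:len(dst)+1] // reuse an already-allocated slot
	} else {
		dst = append(dst, label{}) // grow; happens only while warming up
	}
	l := &dst[len(dst)-1]
	// Keep references instead of copying: the caller must not mutate
	// name/value while dst is in use (same contract as AddLabelBytes).
	l.name = name
	l.value = value
	return dst
}

func main() {
	var labels []label
	labels = appendLabel(labels, []byte("job"), []byte("node_exporter"))
	labels = labels[:0] // "Reset": keeps capacity, drops length
	labels = appendLabel(labels, []byte("instance"), []byte("localhost:9100"))
	fmt.Printf("%s=%s cap=%d\n", labels[0].name, labels[0].value, cap(labels))
}
```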
@@ -20,6 +20,17 @@ const defaultBlockSize = 64 * 1024
 //
 // Returns (dstBuf, tailBuf).
 func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
+	return ReadLinesBlockExt(r, dstBuf, tailBuf, maxLineSize)
+}
+
+// ReadLinesBlockExt reads a block of lines delimited by '\n' from tailBuf and r into dstBuf.
+//
+// Trailing chars after the last newline are put into tailBuf.
+//
+// Returns (dstBuf, tailBuf).
+//
+// maxLineLen limits the maximum length of a single line.
+func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) {
 	if cap(dstBuf) < defaultBlockSize {
 		dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
 	}
@@ -48,8 +59,8 @@ again:
 	nn := bytes.LastIndexByte(dstBuf[len(dstBuf)-n:], '\n')
 	if nn < 0 {
 		// Didn't found at least a single line.
-		if len(dstBuf) > maxLineSize {
-			return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineSize)
+		if len(dstBuf) > maxLineLen {
+			return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineLen)
 		}
 		if cap(dstBuf) < 2*len(dstBuf) {
 			// Increase dsbBuf capacity, so more data could be read into it.
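The `(dstBuf, tailBuf)` contract of `ReadLinesBlockExt` is easiest to see from a caller's loop. Here is a self-contained sketch with a simplified, hypothetical stand-in for the real function, assuming only the behavior documented in the comments above: whole lines land in `dstBuf`, and the trailing partial line carries over in `tailBuf`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// readLinesBlock is a simplified stand-in for the ReadLinesBlockExt
// contract: dstBuf receives only complete lines; the partial trailing
// line is returned in tailBuf for the next call. Illustrative only.
func readLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
	chunk := make([]byte, 16) // tiny reads, to force several iterations
	dstBuf = append(dstBuf[:0], tailBuf...)
	tailBuf = tailBuf[:0]
	for {
		n, err := r.Read(chunk)
		dstBuf = append(dstBuf, chunk[:n]...)
		if nn := bytes.LastIndexByte(dstBuf, '\n'); nn >= 0 {
			tailBuf = append(tailBuf, dstBuf[nn+1:]...)
			return dstBuf[:nn], tailBuf, err
		}
		if err != nil {
			// On EOF the incomplete trailing line stays in tailBuf;
			// a real caller must decide how to handle it.
			tailBuf = append(tailBuf, dstBuf...)
			return dstBuf[:0], tailBuf, err
		}
	}
}

func main() {
	r := strings.NewReader("line one\nline two\nline three")
	var dstBuf, tailBuf []byte
	for {
		var err error
		dstBuf, tailBuf, err = readLinesBlock(r, dstBuf, tailBuf)
		if len(dstBuf) > 0 {
			fmt.Printf("block: %q, tail: %q\n", dstBuf, tailBuf)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
	}
}
```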
@@ -82,7 +82,7 @@ func (ctx *pushCtx) InsertRows(db string) error {
 	rows := ctx.Rows.Rows
 	rowsLen := 0
 	for i := range rows {
-		rowsLen += len(rows[i].Tags)
+		rowsLen += len(rows[i].Fields)
 	}
 	ic := &ctx.Common
 	ic.Reset(rowsLen)
@@ -12,6 +12,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheus"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
@@ -67,6 +68,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 		w.WriteHeader(http.StatusNoContent)
 		return true
+	case "/api/v1/import":
+		vmimportRequests.Inc()
+		if err := vmimport.InsertHandler(r); err != nil {
+			vmimportErrors.Inc()
+			httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
+			return true
+		}
+		return true
 	case "/write", "/api/v2/write":
 		influxWriteRequests.Inc()
 		if err := influx.InsertHandler(r); err != nil {
@@ -92,6 +101,9 @@ var (
 	prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/write", protocol="prometheus"}`)
 	prometheusWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/write", protocol="prometheus"}`)
 
+	vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import", protocol="vm"}`)
+	vmimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import", protocol="vm"}`)
+
 	influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`)
 	influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`)
app/vminsert/vmimport/parser.go | 202 (new file)
@@ -0,0 +1,202 @@
+package vmimport
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/metrics"
+	"github.com/valyala/fastjson"
+)
+
+// Rows contains parsed rows from `/api/v1/import` request.
+type Rows struct {
+	Rows []Row
+
+	tu tagsUnmarshaler
+}
+
+// Reset resets rs.
+func (rs *Rows) Reset() {
+	for i := range rs.Rows {
+		rs.Rows[i].reset()
+	}
+	rs.Rows = rs.Rows[:0]
+
+	rs.tu.reset()
+}
+
+// Unmarshal unmarshals jsonl rows from s, as produced by `/api/v1/export`.
+//
+// See https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data
+//
+// s must remain unchanged while rs is in use.
+func (rs *Rows) Unmarshal(s string) {
+	rs.tu.reset()
+	rs.Rows = unmarshalRows(rs.Rows[:0], s, &rs.tu)
+}
+
+// Row is a single row from `/api/v1/import` request.
+type Row struct {
+	Tags       []Tag
+	Values     []float64
+	Timestamps []int64
+}
+
+func (r *Row) reset() {
+	r.Tags = nil
+	r.Values = r.Values[:0]
+	r.Timestamps = r.Timestamps[:0]
+}
+
+func (r *Row) unmarshal(s string, tu *tagsUnmarshaler) error {
+	r.reset()
+	v, err := tu.p.Parse(s)
+	if err != nil {
+		return fmt.Errorf("cannot parse json line: %s", err)
+	}
+
+	// Unmarshal tags
+	metric := v.GetObject("metric")
+	if metric == nil {
+		return fmt.Errorf("missing `metric` object")
+	}
+	tagsStart := len(tu.tagsPool)
+	if err := tu.unmarshalTags(metric); err != nil {
+		return fmt.Errorf("cannot unmarshal `metric`: %s", err)
+	}
+	tags := tu.tagsPool[tagsStart:]
+	r.Tags = tags[:len(tags):len(tags)]
+	if len(r.Tags) == 0 {
+		return fmt.Errorf("missing tags")
+	}
+
+	// Unmarshal values
+	values := v.GetArray("values")
+	if len(values) == 0 {
+		return fmt.Errorf("missing `values` array")
+	}
+	for i, v := range values {
+		f, err := v.Float64()
+		if err != nil {
+			return fmt.Errorf("cannot unmarshal value at position %d: %s", i, err)
+		}
+		r.Values = append(r.Values, f)
+	}
+
+	// Unmarshal timestamps
+	timestamps := v.GetArray("timestamps")
+	if len(timestamps) == 0 {
+		return fmt.Errorf("missing `timestamps` array")
+	}
+	for i, v := range timestamps {
+		ts, err := v.Int64()
+		if err != nil {
+			return fmt.Errorf("cannot unmarshal timestamp at position %d: %s", i, err)
+		}
+		r.Timestamps = append(r.Timestamps, ts)
+	}
+
+	if len(r.Timestamps) != len(r.Values) {
+		return fmt.Errorf("`timestamps` array size must match `values` array size; got %d; want %d", len(r.Timestamps), len(r.Values))
+	}
+	return nil
+}
+
+// Tag represents `/api/v1/import` tag.
+type Tag struct {
+	Key   []byte
+	Value []byte
+}
+
+func (tag *Tag) reset() {
+	// tag.Key and tag.Value point to tu.bytesPool, so there is no need in keeping these byte slices here.
+	tag.Key = nil
+	tag.Value = nil
+}
+
+type tagsUnmarshaler struct {
+	p         fastjson.Parser
+	tagsPool  []Tag
+	bytesPool []byte
+	err       error
+}
+
+func (tu *tagsUnmarshaler) reset() {
+	for i := range tu.tagsPool {
+		tu.tagsPool[i].reset()
+	}
+	tu.tagsPool = tu.tagsPool[:0]
+
+	tu.bytesPool = tu.bytesPool[:0]
+	tu.err = nil
+}
+
+func (tu *tagsUnmarshaler) addTag() *Tag {
+	dst := tu.tagsPool
+	if cap(dst) > len(dst) {
+		dst = dst[:len(dst)+1]
+	} else {
+		dst = append(dst, Tag{})
+	}
+	tag := &dst[len(dst)-1]
+	tu.tagsPool = dst
+	return tag
+}
+
+func (tu *tagsUnmarshaler) addBytes(b []byte) []byte {
+	bytesPoolLen := len(tu.bytesPool)
+	tu.bytesPool = append(tu.bytesPool, b...)
+	bCopy := tu.bytesPool[bytesPoolLen:]
+	return bCopy[:len(bCopy):len(bCopy)]
+}
+
+func (tu *tagsUnmarshaler) unmarshalTags(o *fastjson.Object) error {
+	tu.err = nil
+	o.Visit(func(key []byte, v *fastjson.Value) {
+		tag := tu.addTag()
+		tag.Key = tu.addBytes(key)
+		sb, err := v.StringBytes()
+		// Record only the first error encountered during the visit.
+		if err != nil && tu.err == nil {
+			tu.err = fmt.Errorf("cannot parse value for tag %q: %s", tag.Key, err)
+		}
+		tag.Value = tu.addBytes(sb)
+	})
+	return tu.err
+}
+
+func unmarshalRows(dst []Row, s string, tu *tagsUnmarshaler) []Row {
+	for len(s) > 0 {
+		n := strings.IndexByte(s, '\n')
+		if n < 0 {
+			// The last line.
+			return unmarshalRow(dst, s, tu)
+		}
+		dst = unmarshalRow(dst, s[:n], tu)
+		s = s[n+1:]
+	}
+	return dst
+}
+
+func unmarshalRow(dst []Row, s string, tu *tagsUnmarshaler) []Row {
+	if len(s) > 0 && s[len(s)-1] == '\r' {
+		s = s[:len(s)-1]
+	}
+	if len(s) == 0 {
+		return dst
+	}
+
+	if cap(dst) > len(dst) {
+		dst = dst[:len(dst)+1]
+	} else {
+		dst = append(dst, Row{})
+	}
+	r := &dst[len(dst)-1]
+	if err := r.unmarshal(s, tu); err != nil {
+		dst = dst[:len(dst)-1]
+		logger.Errorf("cannot unmarshal json line %q: %s; skipping it", s, err)
+		invalidLines.Inc()
+	}
+	return dst
+}
+
+var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="vmimport"}`)
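A quick usage sketch for the parser above (an illustration, not part of the commit; it compiles inside the VictoriaMetrics module, where the `app/vminsert/vmimport` import path resolves):

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
)

func main() {
	// One /api/v1/export-style jsonl line: a metric object plus parallel
	// values and timestamps arrays.
	var rows vmimport.Rows
	rows.Unmarshal(`{"metric":{"__name__":"up","job":"prometheus"},"values":[1,0],"timestamps":[1549891461511,1549891476511]}`)
	for _, r := range rows.Rows {
		for _, tag := range r.Tags {
			fmt.Printf("%s=%s ", tag.Key, tag.Value)
		}
		fmt.Println(r.Values, r.Timestamps)
	}
}
```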
app/vminsert/vmimport/parser_test.go | 216 (new file)
@@ -0,0 +1,216 @@
+package vmimport
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestRowsUnmarshalFailure(t *testing.T) {
+	f := func(s string) {
+		t.Helper()
+		var rows Rows
+		rows.Unmarshal(s)
+		if len(rows.Rows) != 0 {
+			t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows))
+		}
+
+		// Try again
+		rows.Unmarshal(s)
+		if len(rows.Rows) != 0 {
+			t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows))
+		}
+	}
+
+	// Invalid json line
+	f("")
+	f("\n")
+	f("foo\n")
+	f("123")
+	f("[1,3]")
+	f("{}")
+	f("[]")
+	f(`{"foo":"bar"}`)
+
+	// Invalid metric
+	f(`{"metric":123,"values":[1,2],"timestamps":[3,4]}`)
+	f(`{"metric":[123],"values":[1,2],"timestamps":[3,4]}`)
+	f(`{"metric":[],"values":[1,2],"timestamps":[3,4]}`)
+	f(`{"metric":{},"values":[1,2],"timestamps":[3,4]}`)
+	f(`{"metric":null,"values":[1,2],"timestamps":[3,4]}`)
+	f(`{"values":[1,2],"timestamps":[3,4]}`)
+
+	// Invalid values
+	f(`{"metric":{"foo":"bar"},"values":1,"timestamps":[3,4]}`)
+	f(`{"metric":{"foo":"bar"},"values":{"x":1},"timestamps":[3,4]}`)
+	f(`{"metric":{"foo":"bar"},"values":{"x":1},"timestamps":[3,4]}`)
+	f(`{"metric":{"foo":"bar"},"values":null,"timestamps":[3,4]}`)
+	f(`{"metric":{"foo":"bar"},"timestamps":[3,4]}`)
+
+	// Invalid timestamps
+	f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":3}`)
+	f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":false}`)
+	f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":{}}`)
+	f(`{"metric":{"foo":"bar"},"values":[1,2]}`)
+
+	// values and timestamps count mismatch
+	f(`{"metric":{"foo":"bar"},"values":[],"timestamps":[]}`)
+	f(`{"metric":{"foo":"bar"},"values":[],"timestamps":[1]}`)
+	f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[]}`)
+	f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[3,4]}`)
+	f(`{"metric":{"foo":"bar"},"values":[2,3],"timestamps":[4]}`)
+
+	// Garbage after the line
+	f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[4]}{}`)
+}
+
+func TestRowsUnmarshalSuccess(t *testing.T) {
+	f := func(s string, rowsExpected *Rows) {
+		t.Helper()
+		var rows Rows
+		rows.Unmarshal(s)
+		if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
+			t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
+		}
+
+		// Try unmarshaling again
+		rows.Unmarshal(s)
+		if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
+			t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
+		}
+
+		rows.Reset()
+		if len(rows.Rows) != 0 {
+			t.Fatalf("non-empty rows after reset: %+v", rows.Rows)
+		}
+	}
+
+	// Empty line
+	f("", &Rows{})
+	f("\n\n", &Rows{})
+	f("\n\r\n", &Rows{})
+
+	// Single line with a single tag
+	f(`{"metric":{"foo":"bar"},"values":[1.23],"timestamps":[456]}`, &Rows{
+		Rows: []Row{{
+			Tags: []Tag{{
+				Key:   []byte("foo"),
+				Value: []byte("bar"),
+			}},
+			Values:     []float64{1.23},
+			Timestamps: []int64{456},
+		}},
+	})
+
+	// Line with multiple tags
+	f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]}`, &Rows{
+		Rows: []Row{{
+			Tags: []Tag{
+				{
+					Key:   []byte("foo"),
+					Value: []byte("bar"),
+				},
+				{
+					Key:   []byte("baz"),
+					Value: []byte("xx"),
+				},
+			},
+			Values:     []float64{1.23, -3.21},
+			Timestamps: []int64{456, 789},
+		}},
+	})
+
+	// Multiple lines
+	f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]}
+{"metric":{"__name__":"xx"},"values":[34],"timestamps" : [11]}
+`, &Rows{
+		Rows: []Row{
+			{
+				Tags: []Tag{
+					{
+						Key:   []byte("foo"),
+						Value: []byte("bar"),
+					},
+					{
+						Key:   []byte("baz"),
+						Value: []byte("xx"),
+					},
+				},
+				Values:     []float64{1.23, -3.21},
+				Timestamps: []int64{456, 789},
+			},
+			{
+				Tags: []Tag{
+					{
+						Key:   []byte("__name__"),
+						Value: []byte("xx"),
+					},
+				},
+				Values:     []float64{34},
+				Timestamps: []int64{11},
+			},
+		},
+	})
+
+	// Multiple lines with invalid line in the middle.
+	f(`{"metric":{"xfoo":"bar","baz":"xx"},"values":[1.232, -3.21],"timestamps" : [456,7890]}
+garbage here
+{"metric":{"__name__":"xxy"},"values":[34],"timestamps" : [111]}`, &Rows{
+		Rows: []Row{
+			{
+				Tags: []Tag{
+					{
+						Key:   []byte("xfoo"),
+						Value: []byte("bar"),
+					},
+					{
+						Key:   []byte("baz"),
+						Value: []byte("xx"),
+					},
+				},
+				Values:     []float64{1.232, -3.21},
+				Timestamps: []int64{456, 7890},
+			},
+			{
+				Tags: []Tag{
+					{
+						Key:   []byte("__name__"),
+						Value: []byte("xxy"),
+					},
+				},
+				Values:     []float64{34},
+				Timestamps: []int64{111},
+			},
+		},
+	})
+
+	// No newline after the second line.
+	f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]}
+{"metric":{"__name__":"xx"},"values":[34],"timestamps" : [11]}`, &Rows{
+		Rows: []Row{
+			{
+				Tags: []Tag{
+					{
+						Key:   []byte("foo"),
+						Value: []byte("bar"),
+					},
+					{
+						Key:   []byte("baz"),
+						Value: []byte("xx"),
+					},
+				},
+				Values:     []float64{1.23, -3.21},
+				Timestamps: []int64{456, 789},
+			},
+			{
+				Tags: []Tag{
+					{
+						Key:   []byte("__name__"),
+						Value: []byte("xx"),
+					},
+				},
+				Values:     []float64{34},
+				Timestamps: []int64{11},
+			},
+		},
+	})
+}
app/vminsert/vmimport/parser_timing_test.go | 25 (new file)
@@ -0,0 +1,25 @@
+package vmimport
+
+import (
+	"fmt"
+	"testing"
+)
+
+func BenchmarkRowsUnmarshal(b *testing.B) {
+	s := `{"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]}
+{"metric":{"__name__":"up","job":"prometheus","instance":"localhost:9090"},"values":[1,1,1],"timestamps":[1549891461511,1549891476511,1549891491511]}
+{"metric":{"__name__":"up","job":"node_exporter","instance":"foobar.com:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]}
+{"metric":{"__name__":"up","job":"prometheus","instance":"xxx.yyy.zzz:9090"},"values":[1,1,1],"timestamps":[1549891461511,1549891476511,1549891491511]}
+`
+	b.SetBytes(int64(len(s)))
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		var rows Rows
+		for pb.Next() {
+			rows.Unmarshal(s)
+			if len(rows.Rows) != 4 {
+				panic(fmt.Errorf("unexpected number of rows parsed; got %d; want 4", len(rows.Rows)))
+			}
+		}
+	})
+}
app/vminsert/vmimport/request_handler.go | 160 (new file)
@@ -0,0 +1,160 @@
+package vmimport
+
+import (
+	"flag"
+	"fmt"
+	"io"
+	"net/http"
+	"runtime"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+var maxLineLen = flag.Int("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by `/api/v1/import`")
+
+var (
+	rowsInserted  = metrics.NewCounter(`vm_rows_inserted_total{type="vmimport"}`)
+	rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="vmimport"}`)
+)
+
+// InsertHandler processes `/api/v1/import` request.
+//
+// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
+func InsertHandler(req *http.Request) error {
+	return concurrencylimiter.Do(func() error {
+		return insertHandlerInternal(req)
+	})
+}
+
+func insertHandlerInternal(req *http.Request) error {
+	readCalls.Inc()
+
+	r := req.Body
+	if req.Header.Get("Content-Encoding") == "gzip" {
+		zr, err := common.GetGzipReader(r)
+		if err != nil {
+			return fmt.Errorf("cannot read gzipped vmimport data: %s", err)
+		}
+		defer common.PutGzipReader(zr)
+		r = zr
+	}
+
+	ctx := getPushCtx()
+	defer putPushCtx(ctx)
+	for ctx.Read(r) {
+		if err := ctx.InsertRows(); err != nil {
+			return err
+		}
+	}
+	return ctx.Error()
+}
+
+func (ctx *pushCtx) InsertRows() error {
+	rows := ctx.Rows.Rows
+	rowsLen := 0
+	for i := range rows {
+		rowsLen += len(rows[i].Values)
+	}
+	ic := &ctx.Common
+	ic.Reset(rowsLen)
+	rowsTotal := 0
+	for i := range rows {
+		r := &rows[i]
+		ic.Labels = ic.Labels[:0]
+		for j := range r.Tags {
+			tag := &r.Tags[j]
+			ic.AddLabelBytes(tag.Key, tag.Value)
+		}
+		ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels)
+		values := r.Values
+		timestamps := r.Timestamps
+		_ = timestamps[len(values)-1]
+		for j, value := range values {
+			timestamp := timestamps[j]
+			ic.WriteDataPoint(ctx.metricNameBuf, nil, timestamp, value)
+		}
+		rowsTotal += len(values)
+	}
+	rowsInserted.Add(rowsTotal)
+	rowsPerInsert.Update(float64(rowsTotal))
+	return ic.FlushBufs()
+}
+
+func (ctx *pushCtx) Read(r io.Reader) bool {
+	if ctx.err != nil {
+		return false
+	}
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, *maxLineLen)
+	if ctx.err != nil {
+		if ctx.err != io.EOF {
+			readErrors.Inc()
+			ctx.err = fmt.Errorf("cannot read vmimport data: %s", ctx.err)
+		}
+		return false
+	}
+	ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
+	return true
+}
+
+var (
+	readCalls  = metrics.NewCounter(`vm_read_calls_total{name="vmimport"}`)
+	readErrors = metrics.NewCounter(`vm_read_errors_total{name="vmimport"}`)
+)
+
+type pushCtx struct {
+	Rows   Rows
+	Common common.InsertCtx
+
+	reqBuf        []byte
+	tailBuf       []byte
+	metricNameBuf []byte
+
+	err error
+}
+
+func (ctx *pushCtx) Error() error {
+	if ctx.err == io.EOF {
+		return nil
+	}
+	return ctx.err
+}
+
+func (ctx *pushCtx) reset() {
+	ctx.Rows.Reset()
+	ctx.Common.Reset(0)
+
+	ctx.reqBuf = ctx.reqBuf[:0]
+	ctx.tailBuf = ctx.tailBuf[:0]
+	ctx.metricNameBuf = ctx.metricNameBuf[:0]
+
+	ctx.err = nil
+}
+
+func getPushCtx() *pushCtx {
+	select {
+	case ctx := <-pushCtxPoolCh:
+		return ctx
+	default:
+		if v := pushCtxPool.Get(); v != nil {
+			return v.(*pushCtx)
+		}
+		return &pushCtx{}
+	}
+}
+
+func putPushCtx(ctx *pushCtx) {
+	ctx.reset()
+	select {
+	case pushCtxPoolCh <- ctx:
+	default:
+		pushCtxPool.Put(ctx)
+	}
+}
+
+var pushCtxPool sync.Pool
+var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
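Since `insertHandlerInternal` transparently unwraps gzip when the `Content-Encoding: gzip` header is present, bulk imports can be shipped compressed. A minimal client sketch (illustrative, not part of the commit; the host and file name are assumptions):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"log"
	"net/http"
	"os"
)

func main() {
	data, err := os.ReadFile("exported_data.jsonl")
	if err != nil {
		log.Fatalf("cannot read data: %s", err)
	}
	// Gzip the jsonl payload before sending; the handler wraps req.Body
	// in a gzip reader when the Content-Encoding header is present.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		log.Fatalf("cannot compress data: %s", err)
	}
	if err := zw.Close(); err != nil {
		log.Fatalf("cannot finalize gzip stream: %s", err)
	}
	req, err := http.NewRequest("POST", "http://localhost:8428/api/v1/import", &buf)
	if err != nil {
		log.Fatalf("cannot create request: %s", err)
	}
	req.Header.Set("Content-Encoding", "gzip")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatalf("import request failed: %s", err)
	}
	defer resp.Body.Close()
	log.Printf("import finished with status %d", resp.StatusCode)
}
```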