From cba820e390ed4e862611cc182b7ed4a08415e151 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 10 Jul 2020 12:00:35 +0300 Subject: [PATCH] app/{vminsert,vmagent}: add ability to import data in Prometheus exposition format via `/api/v1/import/prometheus` --- README.md | 31 ++++ app/vmagent/README.md | 1 + app/vmagent/main.go | 13 ++ .../prometheusimport/request_handler.go | 64 +++++++++ app/vminsert/main.go | 13 ++ .../prometheusimport/request_handler.go | 48 +++++++ docs/Single-server-VictoriaMetrics.md | 31 ++++ docs/vmagent.md | 1 + lib/protoparser/prometheus/streamparser.go | 134 ++++++++++++++++++ .../prometheus/streamparser_test.go | 92 ++++++++++++ 10 files changed, 428 insertions(+) create mode 100644 app/vmagent/prometheusimport/request_handler.go create mode 100644 app/vminsert/prometheusimport/request_handler.go create mode 100644 lib/protoparser/prometheus/streamparser.go create mode 100644 lib/protoparser/prometheus/streamparser_test.go diff --git a/README.md b/README.md index 7d667304a..e61a51da8 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) * [Querying Graphite data](#querying-graphite-data) * [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents) +* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format) +* [How to import CSV data](#how-to-import-csv-data) * [Prometheus querying API usage](#prometheus-querying-api-usage) * [How to build from sources](#how-to-build-from-sources) * [Development build](#development-build) @@ -284,6 +286,8 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la In the future other `*_sd_config` types will be supported. +VictoriaMetrics also supports [importing data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). + See also [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md), which can be used as drop-in replacement for Prometheus. ### How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) @@ -509,6 +513,32 @@ The following response should be returned: Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. +### How to import data in Prometheus exposition format + +VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) +via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics: + +```bash +curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus' +``` + +The following command may be used for verifying the imported data: + +```bash +curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}' +``` + +It should return somethins like the following: + +``` +{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} +``` + +VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. + +VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). + + ### Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): @@ -701,6 +731,7 @@ Time series data can be imported via any supported ingestion protocol: * [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests) * `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series). * `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details. +* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`: diff --git a/app/vmagent/README.md b/app/vmagent/README.md index ea2801ba5..27aced8ef 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -26,6 +26,7 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents). * Prometheus remote write protocol via `http://:8429/api/v1/write`. * JSON lines import protocol via `http://:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data). + * Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details. * Arbitrary CSV data via `http://:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data). * Can replicate collected metrics simultaneously to multiple remote storage systems. * Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 68a06ddf5..6552ca7b0 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport" @@ -158,6 +159,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true + case "/api/v1/import/prometheus": + prometheusimportRequests.Inc() + if err := prometheusimport.InsertHandler(r); err != nil { + prometheusimportErrors.Inc() + httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true case "/write", "/api/v2/write": influxWriteRequests.Inc() if err := influx.InsertHandlerForHTTP(r); err != nil { @@ -197,6 +207,9 @@ var ( csvimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`) csvimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`) + prometheusimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) + prometheusimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) + influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`) influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`) diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go new file mode 100644 index 000000000..af672e3c5 --- /dev/null +++ b/app/vmagent/prometheusimport/request_handler.go @@ -0,0 +1,64 @@ +package prometheusimport + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`) +) + +// InsertHandler processes `/api/v1/import/prometheus` request. +func InsertHandler(req *http.Request) error { + return writeconcurrencylimiter.Do(func() error { + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + return parser.ParseStream(req.Body, isGzipped, insertRows) + }) +} + +func insertRows(rows []parser.Row) error { + ctx := common.GetPushCtx() + defer common.PutPushCtx(ctx) + + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range rows { + r := &rows[i] + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: r.Metric, + }) + for j := range r.Tags { + tag := &r.Tags[j] + labels = append(labels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: r.Value, + Timestamp: r.Timestamp, + }) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[len(samples)-1:], + }) + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + remotewrite.Push(&ctx.WriteRequest) + rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) + return nil +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index bcbf7ec05..61587a97e 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prompush" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" @@ -114,6 +115,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true + case "/api/v1/import/prometheus": + prometheusimportRequests.Inc() + if err := prometheusimport.InsertHandler(r); err != nil { + prometheusimportErrors.Inc() + httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true case "/write", "/api/v2/write": influxWriteRequests.Inc() if err := influx.InsertHandlerForHTTP(r); err != nil { @@ -155,6 +165,9 @@ var ( csvimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`) csvimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`) + prometheusimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) + prometheusimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`) + influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`) influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`) diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go new file mode 100644 index 000000000..98054b22e --- /dev/null +++ b/app/vminsert/prometheusimport/request_handler.go @@ -0,0 +1,48 @@ +package prometheusimport + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="prometheus"}`) +) + +// InsertHandler processes `/api/v1/import/prometheus` request. +func InsertHandler(req *http.Request) error { + return writeconcurrencylimiter.Do(func() error { + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + return parser.ParseStream(req.Body, isGzipped, insertRows) + }) +} + +func insertRows(rows []parser.Row) error { + ctx := common.GetInsertCtx() + defer common.PutInsertCtx(ctx) + + ctx.Reset(len(rows)) + for i := range rows { + r := &rows[i] + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", r.Metric) + for j := range r.Tags { + tag := &r.Tags[j] + ctx.AddLabel(tag.Key, tag.Value) + } + ctx.ApplyRelabeling() + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value) + } + rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) + return ctx.FlushBufs() +} diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 7d667304a..e61a51da8 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -99,6 +99,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) * [Querying Graphite data](#querying-graphite-data) * [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents) +* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format) +* [How to import CSV data](#how-to-import-csv-data) * [Prometheus querying API usage](#prometheus-querying-api-usage) * [How to build from sources](#how-to-build-from-sources) * [Development build](#development-build) @@ -284,6 +286,8 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la In the future other `*_sd_config` types will be supported. +VictoriaMetrics also supports [importing data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). + See also [vmagent](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmagent/README.md), which can be used as drop-in replacement for Prometheus. ### How to send data from InfluxDB-compatible agents such as [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) @@ -509,6 +513,32 @@ The following response should be returned: Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail. +### How to import data in Prometheus exposition format + +VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format) +via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics: + +```bash +curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus' +``` + +The following command may be used for verifying the imported data: + +```bash +curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}' +``` + +It should return somethins like the following: + +``` +{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]} +``` + +VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming. + +VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter). + + ### Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): @@ -701,6 +731,7 @@ Time series data can be imported via any supported ingestion protocol: * [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests) * `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series). * `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details. +* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details. The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`: diff --git a/docs/vmagent.md b/docs/vmagent.md index ea2801ba5..27aced8ef 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -26,6 +26,7 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents). * Prometheus remote write protocol via `http://:8429/api/v1/write`. * JSON lines import protocol via `http://:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data). + * Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details. * Arbitrary CSV data via `http://:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data). * Can replicate collected metrics simultaneously to multiple remote storage systems. * Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go new file mode 100644 index 000000000..6d51170f4 --- /dev/null +++ b/lib/protoparser/prometheus/streamparser.go @@ -0,0 +1,134 @@ +package prometheus + +import ( + "errors" + "fmt" + "io" + "net" + "runtime" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/metrics" +) + +// ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. +// +// The callback can be called multiple times for streamed data from r. +// +// callback shouldn't hold rows after returning. +func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error { + if isGzipped { + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped lines with Prometheus exposition format: %w", err) + } + defer common.PutGzipReader(zr) + r = zr + } + ctx := getStreamContext() + defer putStreamContext(ctx) + for ctx.Read(r) { + if err := callback(ctx.Rows.Rows); err != nil { + return err + } + } + return ctx.Error() +} + +const flushTimeout = 3 * time.Second + +func (ctx *streamContext) Read(r io.Reader) bool { + readCalls.Inc() + if ctx.err != nil { + return false + } + if c, ok := r.(net.Conn); ok { + if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot set read deadline: %w", err) + return false + } + } + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) + if ctx.err != nil { + var ne net.Error + if errors.As(ctx.err, &ne) && ne.Timeout() { + // Flush the read data on timeout and try reading again. + ctx.err = nil + } else { + if ctx.err != io.EOF { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %w", ctx.err) + } + return false + } + } + ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) + rowsRead.Add(len(ctx.Rows.Rows)) + + rows := ctx.Rows.Rows + + // Fill missing timestamps with the current timestamp. + currentTimestamp := int64(time.Now().UnixNano() / 1e6) + for i := range rows { + r := &rows[i] + if r.Timestamp == 0 { + r.Timestamp = currentTimestamp + } + } + return true +} + +type streamContext struct { + Rows Rows + reqBuf []byte + tailBuf []byte + err error +} + +func (ctx *streamContext) Error() error { + if ctx.err == io.EOF { + return nil + } + return ctx.err +} + +func (ctx *streamContext) reset() { + ctx.Rows.Reset() + ctx.reqBuf = ctx.reqBuf[:0] + ctx.tailBuf = ctx.tailBuf[:0] + ctx.err = nil +} + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="prometheus"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="prometheus"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="prometheus"}`) +) + +func getStreamContext() *streamContext { + select { + case ctx := <-streamContextPoolCh: + return ctx + default: + if v := streamContextPool.Get(); v != nil { + return v.(*streamContext) + } + return &streamContext{} + } +} + +func putStreamContext(ctx *streamContext) { + ctx.reset() + select { + case streamContextPoolCh <- ctx: + default: + streamContextPool.Put(ctx) + } +} + +var streamContextPool sync.Pool +var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) diff --git a/lib/protoparser/prometheus/streamparser_test.go b/lib/protoparser/prometheus/streamparser_test.go new file mode 100644 index 000000000..3a120d590 --- /dev/null +++ b/lib/protoparser/prometheus/streamparser_test.go @@ -0,0 +1,92 @@ +package prometheus + +import ( + "bytes" + "compress/gzip" + "reflect" + "testing" +) + +func TestParseStream(t *testing.T) { + f := func(s string, rowsExpected []Row) { + t.Helper() + bb := bytes.NewBufferString(s) + var result []Row + err := ParseStream(bb, false, func(rows []Row) error { + result = appendRowCopies(result, rows) + return nil + }) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", s, err) + } + if !reflect.DeepEqual(result, rowsExpected) { + t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected) + } + + // Parse compressed stream. + bb.Reset() + zw := gzip.NewWriter(bb) + if _, err := zw.Write([]byte(s)); err != nil { + t.Fatalf("unexpected error when gzipping %q: %s", s, err) + } + if err := zw.Close(); err != nil { + t.Fatalf("unexpected error when closing gzip writer: %s", err) + } + result = nil + err = ParseStream(bb, true, func(rows []Row) error { + result = appendRowCopies(result, rows) + return nil + }) + if err != nil { + t.Fatalf("unexpected error when parsing compressed %q: %s", s, err) + } + if !reflect.DeepEqual(result, rowsExpected) { + t.Fatalf("unexpected rows parsed; got\n%v\nwant\n%v", result, rowsExpected) + } + } + + f("", nil) + f("foo 123 456", []Row{{ + Metric: "foo", + Value: 123, + Timestamp: 456, + }}) + f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []Row{ + { + Metric: "foo", + Tags: []Tag{{ + Key: "bar", + Value: "baz", + }}, + Value: 1, + Timestamp: 2, + }, + { + Metric: "aaa", + Value: 3, + Timestamp: 4, + }, + }) +} + +func appendRowCopies(dst, src []Row) []Row { + for _, r := range src { + // Make a copy of r, since r may contain garbage after returning from the callback to ParseStream. + var rCopy Row + rCopy.Metric = copyString(r.Metric) + rCopy.Value = r.Value + rCopy.Timestamp = r.Timestamp + for _, tag := range r.Tags { + rCopy.Tags = append(rCopy.Tags, Tag{ + Key: copyString(tag.Key), + Value: copyString(tag.Value), + }) + } + dst = append(dst, rCopy) + } + return dst +} + +func copyString(s string) string { + return string(append([]byte(nil), s...)) +}