diff --git a/README.md b/README.md index 419ed5072..46e403e29 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,8 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM if `-graphiteListenAddr` is set. * [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set. * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set. - * [/api/v1/import](#how-to-import-time-series-data) + * [/api/v1/import](#how-to-import-time-series-data). + * [Arbitrary CSV data](#how-to-import-csv-data). * Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads. * Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster). * See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles). @@ -425,6 +426,55 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]} ``` + +### How to import CSV data + +Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg. +The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon: + +``` +:: +``` + +* `` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary. +* `` describes the column type. Supported types are: + * `metric` - the corresponding CSV column at `` contains metric value. The metric name is read from the ``. + CSV line must have at least a single metric field. + * `label` - the corresponding CSV column at `` contains label value. The label name is read from the ``. + CSV line may have arbitrary number of label fields. All these fields are attached to all the configured metrics. + * `time` - the corresponding CSV column at `` contains metric time. CSV line may contain either one or zero columns with time. + If CSV line has no time, then the current time is used. The time is applied to all the configured metrics. + The format of the time is configured via ``. Supported time formats are: + * `unix_s` - unix timestamp in seconds. + * `unix_ms` - unix timestamp in milliseconds. + * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds. + * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`. + * `custom:` - custom layout for the timestamp. The `` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse). + +Each request to `/api/v1/import/csv` can contain arbitrary number of CSV lines. + +Example for importing CSV data via `/api/v1/import/csv`: + +```bash +curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' +curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' +``` + +After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint: + +```bash +curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}' +``` + +The following response should be returned: +```bash +{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]} +{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]} +{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]} +{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]} +``` + + ### Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): @@ -600,6 +650,7 @@ Time series data can be imported via any supported ingestion protocol: * [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol) * [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests) * `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series). +* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details. The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`: diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 10ef9116a..ea216f089 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -21,10 +21,11 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did * Can add, remove and modify labels via Prometheus relabeling. See [these docs](#relabeling) for details. * Accepts data via all the ingestion protocols supported by VictoriaMetrics: * Influx line protocol via `http://:8429/write`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). - * JSON lines import protocol via `http://:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data). * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents). * Prometheus remote write protocol via `http://:8429/api/v1/write`. + * JSON lines import protocol via `http://:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data). + * Arbitrary CSV data via `http://:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data). * Can replicate collected metrics simultaneously to multiple remote storage systems. * Works in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as connection @@ -53,7 +54,7 @@ If you need collecting only Influx data, then the following command line would b /path/to/vmagent -remoteWrite.url=https://victoria-metrics-host:8428/api/v1/write ``` -Then send Influx data to `http://vmagent-host:8429/write`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for more details. +Then send Influx data to `http://vmagent-host:8429`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for more details. `vmagent` is also available in [docker images](https://hub.docker.com/r/victoriametrics/vmagent/). diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go new file mode 100644 index 000000000..247c998e3 --- /dev/null +++ b/app/vmagent/csvimport/request_handler.go @@ -0,0 +1,63 @@ +package csvimport + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`) +) + +// InsertHandler processes csv data from req. +func InsertHandler(req *http.Request) error { + return writeconcurrencylimiter.Do(func() error { + return parser.ParseStream(req, insertRows) + }) +} + +func insertRows(rows []parser.Row) error { + ctx := common.GetPushCtx() + defer common.PutPushCtx(ctx) + + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range rows { + r := &rows[i] + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: r.Metric, + }) + for j := range r.Tags { + tag := &r.Tags[j] + labels = append(labels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: r.Value, + Timestamp: r.Timestamp, + }) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[len(samples)-1:], + }) + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + remotewrite.Push(&ctx.WriteRequest) + rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) + return nil +} diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 37ae801b8..b7fc4630e 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb" @@ -127,6 +128,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true + case "/api/v1/import/csv": + csvimportRequests.Inc() + if err := csvimport.InsertHandler(r); err != nil { + csvimportErrors.Inc() + httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true case "/write", "/api/v2/write": influxWriteRequests.Inc() if err := influx.InsertHandlerForHTTP(r); err != nil { @@ -152,11 +162,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } var ( - prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="prometheus"}`) - prometheusWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="prometheus"}`) + prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`) + prometheusWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="promremotewrite"}`) - vmimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import", protocol="vm"}`) - vmimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import", protocol="vm"}`) + vmimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import", protocol="vmimport"}`) + vmimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import", protocol="vmimport"}`) + + csvimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`) + csvimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`) influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`) influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`) diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go new file mode 100644 index 000000000..4d2644175 --- /dev/null +++ b/app/vminsert/csvimport/request_handler.go @@ -0,0 +1,44 @@ +package csvimport + +import ( + "net/http" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="csvimport"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="csvimport"}`) +) + +// InsertHandler processes /api/v1/import/csv requests. +func InsertHandler(req *http.Request) error { + return writeconcurrencylimiter.Do(func() error { + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows) + }) + }) +} + +func insertRows(rows []parser.Row) error { + ctx := common.GetInsertCtx() + defer common.PutInsertCtx(ctx) + + ctx.Reset(len(rows)) + for i := range rows { + r := &rows[i] + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", r.Metric) + for j := range r.Tags { + tag := &r.Tags[j] + ctx.AddLabel(tag.Key, tag.Value) + } + ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value) + } + rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) + return ctx.FlushBufs() +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 80c4359f2..3b889a499 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -6,6 +6,7 @@ import ( "net/http" "strings" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" @@ -100,6 +101,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true + case "/api/v1/import/csv": + csvimportRequests.Inc() + if err := csvimport.InsertHandler(r); err != nil { + csvimportErrors.Inc() + httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true case "/write", "/api/v2/write": influxWriteRequests.Inc() if err := influx.InsertHandlerForHTTP(r); err != nil { @@ -127,11 +137,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } var ( - prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/write", protocol="prometheus"}`) - prometheusWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/write", protocol="prometheus"}`) + prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`) + prometheusWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/write", protocol="promremotewrite"}`) - vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import", protocol="vm"}`) - vmimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import", protocol="vm"}`) + vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import", protocol="vmimport"}`) + vmimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import", protocol="vmimport"}`) + + csvimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/import/csv", protocol="csvimport"}`) + csvimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/import/csv", protocol="csvimport"}`) influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/write", protocol="influx"}`) influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/write", protocol="influx"}`) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index be0a0f1ef..11d4a9066 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -52,7 +52,8 @@ Cluster version is available [here](https://github.com/VictoriaMetrics/VictoriaM if `-graphiteListenAddr` is set. * [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set. * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set. - * [/api/v1/import](#how-to-import-time-series-data) + * [/api/v1/import](#how-to-import-time-series-data). + * [Arbitrary CSV data](#how-to-import-csv-data). * Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads. * Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster). * See also technical [Articles about VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/Articles). @@ -415,6 +416,55 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"x.y.z","t1":"v1","t2":"v2"},"values":[45.34],"timestamps":[1566464763000]} ``` + +### How to import CSV data + +Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg. +The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon: + +``` +:: +``` + +* `` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary. +* `` describes the column type. Supported types are: + * `metric` - the corresponding CSV column at `` contains metric value. The metric name is read from the ``. + CSV line must have at least a single metric field. + * `label` - the corresponding CSV column at `` contains label value. The label name is read from the ``. + CSV line may have arbitrary number of label fields. All these fields are attached to all the configured metrics. + * `time` - the corresponding CSV column at `` contains metric time. CSV line may contain either one or zero columns with time. + If CSV line has no time, then the current time is used. The time is applied to all the configured metrics. + The format of the time is configured via ``. Supported time formats are: + * `unix_s` - unix timestamp in seconds. + * `unix_ms` - unix timestamp in milliseconds. + * `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds. + * `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`. + * `custom:` - custom layout for the timestamp. The `` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse). + +Each request to `/api/v1/import/csv` can contain arbitrary number of CSV lines. + +Example for importing CSV data via `/api/v1/import/csv`: + +```bash +curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' +curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market' +``` + +After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint: + +```bash +curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}' +``` + +The following response should be returned: +```bash +{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]} +{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]} +{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]} +{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]} +``` + + ### Prometheus querying API usage VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/): @@ -590,6 +640,7 @@ Time series data can be imported via any supported ingestion protocol: * [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol) * [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests) * `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series). +* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details. The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`: diff --git a/lib/protoparser/csvimport/column_descriptor.go b/lib/protoparser/csvimport/column_descriptor.go new file mode 100644 index 000000000..1ee35088b --- /dev/null +++ b/lib/protoparser/csvimport/column_descriptor.go @@ -0,0 +1,172 @@ +package csvimport + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/valyala/fastjson/fastfloat" +) + +// ColumnDescriptor represents parsing rules for a single csv column. +// +// The column is transformed to either timestamp, tag or metric value +// depending on the corresponding non-empty field. +// +// If all the fields are empty, then the given column is ignored. +type ColumnDescriptor struct { + // ParseTimestamp is set to a function, which is used for timestamp + // parsing from the given column. + ParseTimestamp func(s string) (int64, error) + + // TagName is set to tag name for tag value, which should be obtained + // from the given column. + TagName string + + // MetricName is set to metric name for value obtained from the given column. + MetricName string +} + +const maxColumnsPerRow = 64 * 1024 + +// ParseColumnDescriptors parses column descriptors from s. +// +// s must have comma-separated list of the following entries: +// +// :: +// +// Where: +// +// - is numeric csv column position. The first column has position 1. +// - is one of the following types: +// - time - the corresponding column contains timestamp. Timestamp format is determined by . The following formats are supported: +// - unix_s - unix timestamp in seconds +// - unix_ms - unix timestamp in milliseconds +// - unix_ns - unix_timestamp in nanoseconds +// - rfc3339 - RFC3339 format in the form `2006-01-02T15:04:05Z07:00` +// - label - the corresponding column contains metric label with the name set in . +// - metric - the corresponding column contains metric value with the name set in . +// +// s must contain at least a single 'metric' column and no more than a single `time` column. +func ParseColumnDescriptors(s string) ([]ColumnDescriptor, error) { + m := make(map[int]ColumnDescriptor) + cols := strings.Split(s, ",") + hasValueCol := false + hasTimeCol := false + maxPos := 0 + for i, col := range cols { + var cd ColumnDescriptor + a := strings.SplitN(col, ":", 3) + if len(a) != 3 { + return nil, fmt.Errorf("entry #%d must have the following form: ::; got %q", i+1, a) + } + pos, err := strconv.Atoi(a[0]) + if err != nil { + return nil, fmt.Errorf("cannot parse part from the entry #%d %q: %s", i+1, col, err) + } + if pos <= 0 { + return nil, fmt.Errorf(" cannot be smaller than 1; got %d for entry #%d %q", pos, i+1, col) + } + if pos > maxColumnsPerRow { + return nil, fmt.Errorf(" cannot be bigger than %d; got %d for entry #%d %q", maxColumnsPerRow, pos, i+1, col) + } + if pos > maxPos { + maxPos = pos + } + typ := a[1] + switch typ { + case "time": + if hasTimeCol { + return nil, fmt.Errorf("duplicate time column has been found at entry #%d %q for %q", i+1, col, s) + } + parseTimestamp, err := parseTimeFormat(a[2]) + if err != nil { + return nil, fmt.Errorf("cannot parse time format from the entry #%d %q: %s", i+1, col, err) + } + cd.ParseTimestamp = parseTimestamp + hasTimeCol = true + case "label": + cd.TagName = a[2] + if len(cd.TagName) == 0 { + return nil, fmt.Errorf("label name cannot be empty in the entry #%d %q", i+1, col) + } + case "metric": + cd.MetricName = a[2] + if len(cd.MetricName) == 0 { + return nil, fmt.Errorf("metric name cannot be empty in the entry #%d %q", i+1, col) + } + hasValueCol = true + default: + return nil, fmt.Errorf("unknown : %q; allowed values: time, metric, label", typ) + } + pos-- + if _, ok := m[pos]; ok { + return nil, fmt.Errorf("duplicate %d for the entry #%d %q", pos, i+1, col) + } + m[pos] = cd + } + if !hasValueCol { + return nil, fmt.Errorf("missing 'metric' column in %q", s) + } + cds := make([]ColumnDescriptor, maxPos) + for pos, cd := range m { + cds[pos] = cd + } + return cds, nil +} + +func parseTimeFormat(format string) (func(s string) (int64, error), error) { + if strings.HasPrefix(format, "custom:") { + format = format[len("custom:"):] + return newParseCustomTimeFunc(format), nil + } + switch format { + case "unix_s": + return parseUnixTimestampSeconds, nil + case "unix_ms": + return parseUnixTimestampMilliseconds, nil + case "unix_ns": + return parseUnixTimestampNanoseconds, nil + case "rfc3339": + return parseRFC3339, nil + default: + return nil, fmt.Errorf("unknown format for time parsing: %q; supported formats: unix_s, unix_ms, unix_ns, rfc3339", format) + } +} + +func parseUnixTimestampSeconds(s string) (int64, error) { + n := fastfloat.ParseInt64BestEffort(s) + if n > int64(1<<63-1)/1e3 { + return 0, fmt.Errorf("too big unix timestamp in seconds: %d; must be smaller than %d", n, int64(1<<63-1)/1e3) + } + return n * 1e3, nil +} + +func parseUnixTimestampMilliseconds(s string) (int64, error) { + n := fastfloat.ParseInt64BestEffort(s) + return n, nil +} + +func parseUnixTimestampNanoseconds(s string) (int64, error) { + n := fastfloat.ParseInt64BestEffort(s) + return n / 1e6, nil +} + +func parseRFC3339(s string) (int64, error) { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + return 0, fmt.Errorf("cannot parse time in RFC3339 from %q: %s", s, err) + } + return t.UnixNano() / 1e6, nil +} + +func newParseCustomTimeFunc(format string) func(s string) (int64, error) { + return func(s string) (int64, error) { + t, err := time.Parse(format, s) + if err != nil { + return 0, fmt.Errorf("cannot parse time in custom format %q from %q: %s", format, s, err) + } + return t.UnixNano() / 1e6, nil + } +} diff --git a/lib/protoparser/csvimport/column_descriptor_test.go b/lib/protoparser/csvimport/column_descriptor_test.go new file mode 100644 index 000000000..9e996c3d8 --- /dev/null +++ b/lib/protoparser/csvimport/column_descriptor_test.go @@ -0,0 +1,226 @@ +package csvimport + +import ( + "bytes" + "fmt" + "reflect" + "testing" + "time" + "unsafe" +) + +func TestParseColumnDescriptorsSuccess(t *testing.T) { + f := func(s string, cdsExpected []ColumnDescriptor) { + t.Helper() + cds, err := ParseColumnDescriptors(s) + if err != nil { + t.Fatalf("unexpected error on ParseColumnDescriptors(%q): %s", s, err) + } + if !equalColumnDescriptors(cds, cdsExpected) { + t.Fatalf("unexpected cds returned from ParseColumnDescriptors(%q);\ngot\n%v\nwant\n%v", s, cds, cdsExpected) + } + } + f("1:time:unix_s,3:metric:temperature", []ColumnDescriptor{ + { + ParseTimestamp: parseUnixTimestampSeconds, + }, + {}, + { + MetricName: "temperature", + }, + }) + f("2:time:unix_ns,1:metric:temperature,3:label:city,4:label:country", []ColumnDescriptor{ + { + MetricName: "temperature", + }, + { + ParseTimestamp: parseUnixTimestampNanoseconds, + }, + { + TagName: "city", + }, + { + TagName: "country", + }, + }) + f("2:time:unix_ms,1:metric:temperature", []ColumnDescriptor{ + { + MetricName: "temperature", + }, + { + ParseTimestamp: parseUnixTimestampMilliseconds, + }, + }) + f("2:time:rfc3339,1:metric:temperature", []ColumnDescriptor{ + { + MetricName: "temperature", + }, + { + ParseTimestamp: parseRFC3339, + }, + }) +} + +func TestParseColumnDescriptorsFailure(t *testing.T) { + f := func(s string) { + t.Helper() + cds, err := ParseColumnDescriptors(s) + if err == nil { + t.Fatalf("expecting non-nil error for ParseColumnDescriptors(%q)", s) + } + if cds != nil { + t.Fatalf("expecting nil cds; got %v", cds) + } + } + // Empty string + f("") + + // Missing metric column + f("1:time:unix_s") + f("1:label:aaa") + + // Invalid column number + f("foo:time:unix_s,bar:metric:temp") + f("0:metric:aaa") + f("-123:metric:aaa") + f(fmt.Sprintf("%d:metric:aaa", maxColumnsPerRow+10)) + + // Duplicate time column + f("1:time:unix_s,2:time:rfc3339,3:metric:aaa") + f("1:time:custom:2006,2:time:rfc3339,3:metric:aaa") + + // Invalid time format + f("1:time:foobar,2:metric:aaa") + f("1:time:,2:metric:aaa") + f("1:time:sss:sss,2:metric:aaa") + + // empty label name + f("2:label:,1:metric:aaa") + + // Empty metric name + f("1:metric:") + + // Unknown type + f("1:metric:aaa,2:aaaa:bbb") + + // duplicate column number + f("1:metric:a,1:metric:b") +} + +func TestParseUnixTimestampSeconds(t *testing.T) { + f := func(s string, tsExpected int64) { + t.Helper() + ts, err := parseUnixTimestampSeconds(s) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", s, err) + } + if ts != tsExpected { + t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected) + } + } + f("0", 0) + f("123", 123000) + f("-123", -123000) +} + +func TestParseUnixTimestampMilliseconds(t *testing.T) { + f := func(s string, tsExpected int64) { + t.Helper() + ts, err := parseUnixTimestampMilliseconds(s) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", s, err) + } + if ts != tsExpected { + t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected) + } + } + f("0", 0) + f("123", 123) + f("-123", -123) +} + +func TestParseUnixTimestampNanoseconds(t *testing.T) { + f := func(s string, tsExpected int64) { + t.Helper() + ts, err := parseUnixTimestampNanoseconds(s) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", s, err) + } + if ts != tsExpected { + t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected) + } + } + f("0", 0) + f("123", 0) + f("12343567", 12) + f("-12343567", -12) +} + +func TestParseRFC3339(t *testing.T) { + f := func(s string, tsExpected int64) { + t.Helper() + ts, err := parseRFC3339(s) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", s, err) + } + if ts != tsExpected { + t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected) + } + } + f("2006-01-02T15:04:05Z", 1136214245000) + f("2020-03-11T18:23:46Z", 1583951026000) +} + +func TestParseCustomTimeFunc(t *testing.T) { + f := func(format, s string, tsExpected int64) { + t.Helper() + f := newParseCustomTimeFunc(format) + ts, err := f(s) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", s, err) + } + if ts != tsExpected { + t.Fatalf("unexpected ts when parsing %q; got %d; want %d", s, ts, tsExpected) + } + } + f(time.RFC1123, "Mon, 29 Oct 2018 07:50:37 GMT", 1540799437000) + f("2006-01-02 15:04:05.999Z", "2015-08-10 20:04:40.123Z", 1439237080123) +} + +func equalColumnDescriptors(a, b []ColumnDescriptor) bool { + if len(a) != len(b) { + return false + } + for i, x := range a { + y := b[i] + if !equalColumnDescriptor(x, y) { + return false + } + } + return true +} + +func equalColumnDescriptor(x, y ColumnDescriptor) bool { + sh1 := &reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&x.ParseTimestamp)), + Len: int(unsafe.Sizeof(x.ParseTimestamp)), + Cap: int(unsafe.Sizeof(x.ParseTimestamp)), + } + b1 := *(*[]byte)(unsafe.Pointer(sh1)) + sh2 := &reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&y.ParseTimestamp)), + Len: int(unsafe.Sizeof(y.ParseTimestamp)), + Cap: int(unsafe.Sizeof(y.ParseTimestamp)), + } + b2 := *(*[]byte)(unsafe.Pointer(sh2)) + if !bytes.Equal(b1, b2) { + return false + } + if x.TagName != y.TagName { + return false + } + if x.MetricName != y.MetricName { + return false + } + return true +} diff --git a/lib/protoparser/csvimport/parser.go b/lib/protoparser/csvimport/parser.go new file mode 100644 index 000000000..3a2efde79 --- /dev/null +++ b/lib/protoparser/csvimport/parser.go @@ -0,0 +1,141 @@ +package csvimport + +import ( + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/valyala/fastjson/fastfloat" +) + +// Rows represents csv rows. +type Rows struct { + // Rows contains parsed csv rows after the call to Unmarshal. + Rows []Row + + sc scanner + tagsPool []Tag + metricsPool []metric +} + +// Reset resets rs. +func (rs *Rows) Reset() { + rows := rs.Rows + for i := range rows { + r := &rows[i] + r.Metric = "" + r.Tags = nil + r.Value = 0 + r.Timestamp = 0 + } + rs.Rows = rs.Rows[:0] + + rs.sc.Init("") + + tags := rs.tagsPool + for i := range tags { + t := &tags[i] + t.Key = "" + t.Value = "" + } + rs.tagsPool = rs.tagsPool[:0] + + metrics := rs.metricsPool + for i := range metrics { + m := &metrics[i] + m.Name = "" + m.Value = 0 + } + rs.metricsPool = rs.metricsPool[:0] +} + +// Row represents a single metric row +type Row struct { + Metric string + Tags []Tag + Value float64 + Timestamp int64 +} + +// Tag represents metric tag +type Tag struct { + Key string + Value string +} + +type metric struct { + Name string + Value float64 +} + +// Unmarshal unmarshal csv lines from s according to the given cds. +func (rs *Rows) Unmarshal(s string, cds []ColumnDescriptor) { + rs.sc.Init(s) + rs.Rows, rs.tagsPool, rs.metricsPool = parseRows(&rs.sc, rs.Rows[:0], rs.tagsPool[:0], rs.metricsPool[:0], cds) +} + +func parseRows(sc *scanner, dst []Row, tags []Tag, metrics []metric, cds []ColumnDescriptor) ([]Row, []Tag, []metric) { + for sc.NextLine() { + line := sc.Line + var r Row + col := uint(0) + metrics = metrics[:0] + tagsLen := len(tags) + for sc.NextColumn() { + if col >= uint(len(cds)) { + // Skip superflouous column. + continue + } + cd := &cds[col] + col++ + if parseTimestamp := cd.ParseTimestamp; parseTimestamp != nil { + timestamp, err := parseTimestamp(sc.Column) + if err != nil { + sc.Error = fmt.Errorf("cannot parse timestamp from %q: %s", sc.Column, err) + break + } + r.Timestamp = timestamp + continue + } + if tagName := cd.TagName; tagName != "" { + tags = append(tags, Tag{ + Key: tagName, + Value: sc.Column, + }) + continue + } + metricName := cd.MetricName + if metricName == "" { + // The given field is ignored. + continue + } + value := fastfloat.ParseBestEffort(sc.Column) + metrics = append(metrics, metric{ + Name: metricName, + Value: value, + }) + } + if col < uint(len(cds)) && sc.Error == nil { + sc.Error = fmt.Errorf("missing columns in the csv line %q; got %d columns; want at least %d columns", line, col, len(cds)) + } + if sc.Error != nil { + logger.Errorf("error when parsing csv line %q: %s; skipping this line", line, sc.Error) + continue + } + if len(metrics) == 0 { + logger.Panicf("BUG: expecting at least a single metric in columnDescriptors=%#v", cds) + } + r.Metric = metrics[0].Name + r.Tags = tags[tagsLen:] + r.Value = metrics[0].Value + dst = append(dst, r) + for _, m := range metrics[1:] { + dst = append(dst, Row{ + Metric: m.Name, + Tags: r.Tags, + Value: m.Value, + Timestamp: r.Timestamp, + }) + } + } + return dst, tags, metrics +} diff --git a/lib/protoparser/csvimport/parser_test.go b/lib/protoparser/csvimport/parser_test.go new file mode 100644 index 000000000..81dfa8918 --- /dev/null +++ b/lib/protoparser/csvimport/parser_test.go @@ -0,0 +1,171 @@ +package csvimport + +import ( + "reflect" + "testing" +) + +func TestRowsUnmarshalFailure(t *testing.T) { + f := func(format, s string) { + t.Helper() + cds, err := ParseColumnDescriptors(format) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", format, err) + } + var rs Rows + rs.Unmarshal(s, cds) + if len(rs.Rows) != 0 { + t.Fatalf("unexpected rows unmarshaled: %#v", rs.Rows) + } + } + // Invalid timestamp + f("1:metric:foo,2:time:rfc3339", "234,foobar") + + // Missing columns + f("3:metric:aaa", "123,456") +} + +func TestRowsUnmarshalSuccess(t *testing.T) { + f := func(format, s string, rowsExpected []Row) { + t.Helper() + cds, err := ParseColumnDescriptors(format) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", format, err) + } + var rs Rows + rs.Unmarshal(s, cds) + if !reflect.DeepEqual(rs.Rows, rowsExpected) { + t.Fatalf("unexpected rows;\ngot\n%v\nwant\n%v", rs.Rows, rowsExpected) + } + rs.Reset() + + // Unmarshal rows the second time + rs.Unmarshal(s, cds) + if !reflect.DeepEqual(rs.Rows, rowsExpected) { + t.Fatalf("unexpected rows on the second unmarshal;\ngot\n%v\nwant\n%v", rs.Rows, rowsExpected) + } + } + f("1:metric:foo", "", nil) + f("1:metric:foo", `123`, []Row{ + { + Metric: "foo", + Value: 123, + }, + }) + f("1:metric:foo,2:time:unix_s,3:label:foo,4:label:bar", `123,456,xxx,yy`, []Row{ + { + Metric: "foo", + Tags: []Tag{ + { + Key: "foo", + Value: "xxx", + }, + { + Key: "bar", + Value: "yy", + }, + }, + Value: 123, + Timestamp: 456000, + }, + }) + + // Multiple metrics + f("2:metric:bar,1:metric:foo,3:label:foo,4:label:bar,5:time:custom:2006-01-02 15:04:05.999Z", + `"2.34",5.6,"foo"",bar","aa",2015-08-10 20:04:40.123Z`, []Row{ + { + Metric: "foo", + Tags: []Tag{ + { + Key: "foo", + Value: "foo\",bar", + }, + { + Key: "bar", + Value: "aa", + }, + }, + Value: 2.34, + Timestamp: 1439237080123, + }, + { + Metric: "bar", + Tags: []Tag{ + { + Key: "foo", + Value: "foo\",bar", + }, + { + Key: "bar", + Value: "aa", + }, + }, + Value: 5.6, + Timestamp: 1439237080123, + }, + }) + f("2:label:symbol,3:time:custom:2006-01-02 15:04:05.999Z,4:metric:bid,5:metric:ask", + ` +"aaa","AUDCAD","2015-08-10 00:00:01.000Z",0.9725,0.97273 +"aaa","AUDCAD","2015-08-10 00:00:02.000Z",0.97253,0.97276 +`, []Row{ + { + Metric: "bid", + Tags: []Tag{ + { + Key: "symbol", + Value: "AUDCAD", + }, + }, + Value: 0.9725, + Timestamp: 1439164801000, + }, + { + Metric: "ask", + Tags: []Tag{ + { + Key: "symbol", + Value: "AUDCAD", + }, + }, + Value: 0.97273, + Timestamp: 1439164801000, + }, + { + Metric: "bid", + Tags: []Tag{ + { + Key: "symbol", + Value: "AUDCAD", + }, + }, + Value: 0.97253, + Timestamp: 1439164802000, + }, + { + Metric: "ask", + Tags: []Tag{ + { + Key: "symbol", + Value: "AUDCAD", + }, + }, + Value: 0.97276, + Timestamp: 1439164802000, + }, + }) + + // Superflouos columns + f("1:metric:foo", `123,456,foo,bar`, []Row{ + { + Metric: "foo", + Value: 123, + }, + }) + f("2:metric:foo", `123,-45.6,foo,bar`, []Row{ + { + Metric: "foo", + Value: -45.6, + }, + }) +} diff --git a/lib/protoparser/csvimport/parser_timing_test.go b/lib/protoparser/csvimport/parser_timing_test.go new file mode 100644 index 000000000..9e47fcb51 --- /dev/null +++ b/lib/protoparser/csvimport/parser_timing_test.go @@ -0,0 +1,31 @@ +package csvimport + +import ( + "fmt" + "testing" +) + +func BenchmarkRowsUnmarshal(b *testing.B) { + cds, err := ParseColumnDescriptors("1:label:symbol,2:metric:bid,3:metric:ask,4:time:unix_ms") + if err != nil { + b.Fatalf("cannot parse column descriptors: %s", err) + } + s := `GOOG,123.456,789.234,1345678999003 +GOOG,223.456,889.234,1345678939003 +GOOG,323.456,989.234,1345678949003 +MSFT,423.456,189.234,1345678959003 +AMZN,523.456,189.234,1345678959005 +` + const rowsExpected = 10 + b.SetBytes(int64(len(s))) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var rs Rows + for pb.Next() { + rs.Unmarshal(s, cds) + if len(rs.Rows) != rowsExpected { + panic(fmt.Errorf("unexpected rows parsed; got %d; want %d; rows: %v", len(rs.Rows), rowsExpected, rs.Rows)) + } + } + }) +} diff --git a/lib/protoparser/csvimport/scanner.go b/lib/protoparser/csvimport/scanner.go new file mode 100644 index 000000000..c1ab924d4 --- /dev/null +++ b/lib/protoparser/csvimport/scanner.go @@ -0,0 +1,127 @@ +package csvimport + +import ( + "fmt" + "strings" +) + +// scanner is csv scanner +type scanner struct { + // The line value read after the call to NextLine() + Line string + + // The column value read after the call to NextColumn() + Column string + + // Error may be set only on NextColumn call. + // It is cleared on NextLine call. + Error error + + s string +} + +// Init initializes sc with s +func (sc *scanner) Init(s string) { + sc.Line = "" + sc.Column = "" + sc.Error = nil + sc.s = s +} + +// NextLine advances csv scanner to the next line and sets cs.Line to it. +// +// It clears sc.Error. +// +// false is returned if no more lines left in sc.s +func (sc *scanner) NextLine() bool { + s := sc.s + sc.Error = nil + for len(s) > 0 { + n := strings.IndexByte(s, '\n') + var line string + if n >= 0 { + line = trimTrailingSpace(s[:n]) + s = s[n+1:] + } else { + line = trimTrailingSpace(s) + s = "" + } + sc.Line = line + sc.s = s + if len(line) > 0 { + return true + } + } + return false +} + +// NextColumn advances sc.Line to the next Column and sets sc.Column to it. +// +// false is returned if no more columns left in sc.Line or if any error occurs. +// sc.Error is set to error in the case of error. +func (sc *scanner) NextColumn() bool { + s := sc.Line + if len(s) == 0 { + return false + } + if sc.Error != nil { + return false + } + if s[0] == '"' { + sc.Column, sc.Line, sc.Error = readQuotedField(s) + return sc.Error == nil + } + n := strings.IndexByte(s, ',') + if n >= 0 { + sc.Column = s[:n] + sc.Line = s[n+1:] + } else { + sc.Column = s + sc.Line = "" + } + return true +} + +func trimTrailingSpace(s string) string { + if len(s) > 0 && s[len(s)-1] == '\r' { + return s[:len(s)-1] + } + return s +} + +func readQuotedField(s string) (string, string, error) { + sOrig := s + if len(s) == 0 || s[0] != '"' { + return "", sOrig, fmt.Errorf("missing opening quote for %q", sOrig) + } + s = s[1:] + hasEscapedQuote := false + for { + n := strings.IndexByte(s, '"') + if n < 0 { + return "", sOrig, fmt.Errorf("missing closing quote for %q", sOrig) + } + s = s[n+1:] + if len(s) == 0 { + // The end of string found + return unquote(sOrig[1:len(sOrig)-1], hasEscapedQuote), "", nil + } + if s[0] == '"' { + // Take into account escaped quote + s = s[1:] + hasEscapedQuote = true + continue + } + if s[0] != ',' { + return "", sOrig, fmt.Errorf("missing comma after quoted field in %q", sOrig) + } + return unquote(sOrig[1:len(sOrig)-len(s)-1], hasEscapedQuote), s[1:], nil + } +} + +func unquote(s string, hasEscapedQuote bool) string { + if !hasEscapedQuote { + return s + } + return strings.ReplaceAll(s, `""`, `"`) +} diff --git a/lib/protoparser/csvimport/scanner_test.go b/lib/protoparser/csvimport/scanner_test.go new file mode 100644 index 000000000..8e4381ec0 --- /dev/null +++ b/lib/protoparser/csvimport/scanner_test.go @@ -0,0 +1,85 @@ +package csvimport + +import ( + "testing" +) + +func TestScannerSuccess(t *testing.T) { + var sc scanner + sc.Init("foo,bar\n\"aa,\"\"bb\",\"\"") + if !sc.NextLine() { + t.Fatalf("expecting the first line") + } + if sc.Line != "foo,bar" { + t.Fatalf("unexpected line; got %q; want %q", sc.Line, "foo,bar") + } + if !sc.NextColumn() { + t.Fatalf("expecting the first column") + } + if sc.Column != "foo" { + t.Fatalf("unexpected first column; got %q; want %q", sc.Column, "foo") + } + if !sc.NextColumn() { + t.Fatalf("expecting the second column") + } + if sc.Column != "bar" { + t.Fatalf("unexpected second column; got %q; want %q", sc.Column, "bar") + } + if sc.NextColumn() { + t.Fatalf("unexpected next column: %q", sc.Column) + } + if sc.Error != nil { + t.Fatalf("unexpected error: %s", sc.Error) + } + if !sc.NextLine() { + t.Fatalf("expecting the second line") + } + if sc.Line != "\"aa,\"\"bb\",\"\"" { + t.Fatalf("unexpected the second line; got %q; want %q", sc.Line, "\"aa,\"\"bb\",\"\"") + } + if !sc.NextColumn() { + t.Fatalf("expecting the first column on the second line") + } + if sc.Column != "aa,\"bb" { + t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "aa,\"bb") + } + if !sc.NextColumn() { + t.Fatalf("expecting the second column on the second line") + } + if sc.Column != "" { + t.Fatalf("unexpected column on the second line; got %q; want %q", sc.Column, "") + } + if sc.NextColumn() { + t.Fatalf("unexpected next column on the second line: %q", sc.Column) + } + if sc.Error != nil { + t.Fatalf("unexpected error: %s", sc.Error) + } + if sc.NextLine() { + t.Fatalf("unexpected next line: %q", sc.Line) + } +} + +func TestScannerFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var sc scanner + sc.Init(s) + for sc.NextLine() { + for sc.NextColumn() { + } + if sc.Error != nil { + if sc.NextColumn() { + t.Fatalf("unexpected NextColumn success after the error %v", sc.Error) + } + return + } + } + t.Fatalf("expecting at least a single error") + } + // Unclosed quote + f("foo\r\n\"bar,") + f(`"foo,"bar`) + f(`foo,"bar",""a`) + f(`foo,"bar","a""`) +} diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go new file mode 100644 index 000000000..9c6eb30b8 --- /dev/null +++ b/lib/protoparser/csvimport/streamparser.go @@ -0,0 +1,124 @@ +package csvimport + +import ( + "fmt" + "io" + "net/http" + "runtime" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/metrics" +) + +// ParseStream parses csv from req and calls callback for the parsed rows. +// +// The callback can be called multiple times for streamed data from req. +// +// callback shouldn't hold rows after returning. +func ParseStream(req *http.Request, callback func(rows []Row) error) error { + readCalls.Inc() + q := req.URL.Query() + format := q.Get("format") + cds, err := ParseColumnDescriptors(format) + if err != nil { + return fmt.Errorf("cannot parse the provided csv format: %s", err) + } + r := req.Body + if req.Header.Get("Content-Encoding") == "gzip" { + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped csv data: %s", err) + } + defer common.PutGzipReader(zr) + r = zr + } + + ctx := getStreamContext() + defer putStreamContext(ctx) + for ctx.Read(r, cds) { + if err := callback(ctx.Rows.Rows); err != nil { + return err + } + } + return ctx.Error() +} + +func (ctx *streamContext) Read(r io.Reader, cds []ColumnDescriptor) bool { + if ctx.err != nil { + return false + } + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) + if ctx.err != nil { + if ctx.err != io.EOF { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot read csv data: %s", ctx.err) + } + return false + } + ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds) + rowsRead.Add(len(ctx.Rows.Rows)) + + // Set missing timestamps + currentTs := time.Now().UnixNano() / 1e6 + for i := range ctx.Rows.Rows { + row := &ctx.Rows.Rows[i] + if row.Timestamp == 0 { + row.Timestamp = currentTs + } + } + return true +} + +var ( + readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="csvimport"}`) + readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="csvimport"}`) + rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="csvimport"}`) +) + +type streamContext struct { + Rows Rows + reqBuf []byte + tailBuf []byte + err error +} + +func (ctx *streamContext) Error() error { + if ctx.err == io.EOF { + return nil + } + return ctx.err +} + +func (ctx *streamContext) reset() { + ctx.Rows.Reset() + ctx.reqBuf = ctx.reqBuf[:0] + ctx.tailBuf = ctx.tailBuf[:0] + ctx.err = nil +} + +func getStreamContext() *streamContext { + select { + case ctx := <-streamContextPoolCh: + return ctx + default: + if v := streamContextPool.Get(); v != nil { + return v.(*streamContext) + } + return &streamContext{} + } +} + +func putStreamContext(ctx *streamContext) { + ctx.reset() + select { + case streamContextPoolCh <- ctx: + default: + streamContextPool.Put(ctx) + } +} + +var streamContextPool sync.Pool +var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))