app/vmselect: add optional max_rows_per_line query arg to /api/v1/export

This arg allows limiting the number of data points that may be exported on a single line.
This commit is contained in:
Aliaksandr Valialkin 2020-03-10 21:45:15 +02:00
parent 1fe66fb3cc
commit e17702fada
2 changed files with 37 additions and 3 deletions

View File

@ -629,6 +629,9 @@ Each JSON line would contain data for a single time series. An example output:
Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values. unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
Optional `max_rows_per_line` arg may be added to the request in order to limit the maximum number of rows exported per each JSON line.
By default each JSON line contains all the rows for a single time series.
Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporing big amounts Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporing big amounts
of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data: of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data:

View File

@ -3,6 +3,7 @@ package prometheus
import ( import (
"flag" "flag"
"fmt" "fmt"
"io"
"math" "math"
"net/http" "net/http"
"runtime" "runtime"
@ -18,6 +19,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql" "github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
"github.com/valyala/quicktemplate" "github.com/valyala/quicktemplate"
) )
@ -129,11 +131,12 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
return err return err
} }
format := r.FormValue("format") format := r.FormValue("format")
maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
deadline := getDeadlineForExport(r) deadline := getDeadlineForExport(r)
if start >= end { if start >= end {
end = start + defaultStep end = start + defaultStep
} }
if err := exportHandler(w, matches, start, end, format, deadline); err != nil { if err := exportHandler(w, matches, start, end, format, maxRowsPerLine, deadline); err != nil {
return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %s", matches, start, end, err) return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %s", matches, start, end, err)
} }
exportDuration.UpdateDuration(startTime) exportDuration.UpdateDuration(startTime)
@ -142,9 +145,37 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request)
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, deadline netstorage.Deadline) error { func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error {
writeResponseFunc := WriteExportStdResponse writeResponseFunc := WriteExportStdResponse
writeLineFunc := WriteExportJSONLine writeLineFunc := WriteExportJSONLine
if maxRowsPerLine > 0 {
writeLineFunc = func(w io.Writer, rs *netstorage.Result) {
valuesOrig := rs.Values
timestampsOrig := rs.Timestamps
values := valuesOrig
timestamps := timestampsOrig
for len(values) > 0 {
var valuesChunk []float64
var timestampsChunk []int64
if len(values) > maxRowsPerLine {
valuesChunk = values[:maxRowsPerLine]
timestampsChunk = timestamps[:maxRowsPerLine]
values = values[maxRowsPerLine:]
timestamps = timestamps[maxRowsPerLine:]
} else {
valuesChunk = values
timestampsChunk = timestamps
values = nil
timestamps = nil
}
rs.Values = valuesChunk
rs.Timestamps = timestampsChunk
WriteExportJSONLine(w, rs)
}
rs.Values = valuesOrig
rs.Timestamps = timestampsOrig
}
}
contentType := "application/stream+json" contentType := "application/stream+json"
if format == "prometheus" { if format == "prometheus" {
contentType = "text/plain" contentType = "text/plain"
@ -576,7 +607,7 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e
start -= offset start -= offset
end := start end := start
start = end - window start = end - window
if err := exportHandler(w, []string{childQuery}, start, end, "promapi", deadline); err != nil { if err := exportHandler(w, []string{childQuery}, start, end, "promapi", 0, deadline); err != nil {
return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %s", childQuery, start, end, err) return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %s", childQuery, start, end, err)
} }
queryDuration.UpdateDuration(startTime) queryDuration.UpdateDuration(startTime)