app/vmselect: add optional max_rows_per_line query arg to /api/v1/export

This arg allows limiting the number of data points that may be exported on a single line.
This commit is contained in:
Aliaksandr Valialkin 2020-03-10 21:45:15 +02:00
parent 2f0a36044c
commit f6410ff2bf

View File

@ -3,6 +3,7 @@ package prometheus
import ( import (
"flag" "flag"
"fmt" "fmt"
"io"
"math" "math"
"net/http" "net/http"
"runtime" "runtime"
@ -20,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql" "github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
"github.com/valyala/quicktemplate" "github.com/valyala/quicktemplate"
) )
@ -138,11 +140,12 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
return err return err
} }
format := r.FormValue("format") format := r.FormValue("format")
maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
deadline := getDeadlineForExport(r) deadline := getDeadlineForExport(r)
if start >= end { if start >= end {
end = start + defaultStep end = start + defaultStep
} }
if err := exportHandler(at, w, matches, start, end, format, deadline); err != nil { if err := exportHandler(at, w, matches, start, end, format, maxRowsPerLine, deadline); err != nil {
return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %s", matches, start, end, err) return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %s", matches, start, end, err)
} }
exportDuration.UpdateDuration(startTime) exportDuration.UpdateDuration(startTime)
@ -151,9 +154,37 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, start, end int64, format string, deadline netstorage.Deadline) error { func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error {
writeResponseFunc := WriteExportStdResponse writeResponseFunc := WriteExportStdResponse
writeLineFunc := WriteExportJSONLine writeLineFunc := WriteExportJSONLine
if maxRowsPerLine > 0 {
writeLineFunc = func(w io.Writer, rs *netstorage.Result) {
valuesOrig := rs.Values
timestampsOrig := rs.Timestamps
values := valuesOrig
timestamps := timestampsOrig
for len(values) > 0 {
var valuesChunk []float64
var timestampsChunk []int64
if len(values) > maxRowsPerLine {
valuesChunk = values[:maxRowsPerLine]
timestampsChunk = timestamps[:maxRowsPerLine]
values = values[maxRowsPerLine:]
timestamps = timestamps[maxRowsPerLine:]
} else {
valuesChunk = values
timestampsChunk = timestamps
values = nil
timestamps = nil
}
rs.Values = valuesChunk
rs.Timestamps = timestampsChunk
WriteExportJSONLine(w, rs)
}
rs.Values = valuesOrig
rs.Timestamps = timestampsOrig
}
}
contentType := "application/stream+json" contentType := "application/stream+json"
if format == "prometheus" { if format == "prometheus" {
contentType = "text/plain" contentType = "text/plain"
@ -653,7 +684,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
start -= offset start -= offset
end := start end := start
start = end - window start = end - window
if err := exportHandler(at, w, []string{childQuery}, start, end, "promapi", deadline); err != nil { if err := exportHandler(at, w, []string{childQuery}, start, end, "promapi", 0, deadline); err != nil {
return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %s", childQuery, start, end, err) return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %s", childQuery, start, end, err)
} }
queryDuration.UpdateDuration(startTime) queryDuration.UpdateDuration(startTime)