diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index f7ec7b203a..e9373d6cb8 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -3,6 +3,7 @@ package prometheus import ( "flag" "fmt" + "io" "math" "net/http" "runtime" @@ -20,6 +21,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastjson/fastfloat" "github.com/valyala/quicktemplate" ) @@ -138,11 +140,12 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r return err } format := r.FormValue("format") + maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line"))) deadline := getDeadlineForExport(r) if start >= end { end = start + defaultStep } - if err := exportHandler(at, w, matches, start, end, format, deadline); err != nil { + if err := exportHandler(at, w, matches, start, end, format, maxRowsPerLine, deadline); err != nil { return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %s", matches, start, end, err) } exportDuration.UpdateDuration(startTime) @@ -151,9 +154,37 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) -func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, start, end int64, format string, deadline netstorage.Deadline) error { +func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error { writeResponseFunc := WriteExportStdResponse writeLineFunc := WriteExportJSONLine + if maxRowsPerLine > 0 { + writeLineFunc = func(w io.Writer, rs *netstorage.Result) { + valuesOrig := rs.Values + timestampsOrig := rs.Timestamps + values := valuesOrig + timestamps := timestampsOrig + for len(values) > 0 { + var valuesChunk []float64 + var timestampsChunk []int64 + if len(values) > maxRowsPerLine { + valuesChunk = values[:maxRowsPerLine] + timestampsChunk = timestamps[:maxRowsPerLine] + values = values[maxRowsPerLine:] + timestamps = timestamps[maxRowsPerLine:] + } else { + valuesChunk = values + timestampsChunk = timestamps + values = nil + timestamps = nil + } + rs.Values = valuesChunk + rs.Timestamps = timestampsChunk + WriteExportJSONLine(w, rs) + } + rs.Values = valuesOrig + rs.Timestamps = timestampsOrig + } + } contentType := "application/stream+json" if format == "prometheus" { contentType = "text/plain" @@ -653,7 +684,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r start -= offset end := start start = end - window - if err := exportHandler(at, w, []string{childQuery}, start, end, "promapi", deadline); err != nil { + if err := exportHandler(at, w, []string{childQuery}, start, end, "promapi", 0, deadline); err != nil { return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %s", childQuery, start, end, err) } queryDuration.UpdateDuration(startTime)