From e969ef26399e1aa8adbfae1d8a7b03369e38d0bc Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 10 Aug 2020 20:57:18 +0300 Subject: [PATCH] app/vmselect: reduce memory usage when exporting time series with big number of samples via `/api/v1/export` if `max_rows_per_line` is set to non-zero value Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/685 --- app/vmselect/prometheus/prometheus.go | 29 ++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index f4238166a..940be3792 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -3,7 +3,6 @@ package prometheus import ( "flag" "fmt" - "io" "math" "net/http" "runtime" @@ -154,9 +153,13 @@ var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error { writeResponseFunc := WriteExportStdResponse - writeLineFunc := WriteExportJSONLine + writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportJSONLine(bb, rs) + resultsCh <- bb + } if maxRowsPerLine > 0 { - writeLineFunc = func(w io.Writer, rs *netstorage.Result) { + writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { valuesOrig := rs.Values timestampsOrig := rs.Timestamps values := valuesOrig @@ -177,7 +180,9 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo } rs.Values = valuesChunk rs.Timestamps = timestampsChunk - WriteExportJSONLine(w, rs) + bb := quicktemplate.AcquireByteBuffer() + WriteExportJSONLine(bb, rs) + resultsCh <- bb } rs.Values = valuesOrig rs.Timestamps = timestampsOrig @@ -186,10 +191,18 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo contentType := "application/stream+json" if format == "prometheus" { contentType = "text/plain" - writeLineFunc = WriteExportPrometheusLine + writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportPrometheusLine(bb, rs) + resultsCh <- bb + } } else if format == "promapi" { writeResponseFunc = WriteExportPromAPIResponse - writeLineFunc = WriteExportPromAPILine + writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportPromAPILine(bb, rs) + resultsCh <- bb + } } tagFilterss, err := getTagFilterssFromMatches(matches) @@ -210,9 +223,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo doneCh := make(chan error) go func() { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { - bb := quicktemplate.AcquireByteBuffer() - writeLineFunc(bb, rs) - resultsCh <- bb + writeLineFunc(rs, resultsCh) }) close(resultsCh) doneCh <- err