From 62b6e5462298e208db3f196bb43ae79972e23e09 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 10 Aug 2020 20:57:18 +0300 Subject: [PATCH] app/vmselect: reduce memory usage when exporting time series with big number of samples via `/api/v1/export` if `max_rows_per_line` is set to non-zero value Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/685 --- app/vmselect/prometheus/prometheus.go | 29 ++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 6393743f61..a7b2072ae0 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -3,7 +3,6 @@ package prometheus import ( "flag" "fmt" - "io" "math" "net/http" "runtime" @@ -163,9 +162,13 @@ var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error { writeResponseFunc := WriteExportStdResponse - writeLineFunc := WriteExportJSONLine + writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportJSONLine(bb, rs) + resultsCh <- bb + } if maxRowsPerLine > 0 { - writeLineFunc = func(w io.Writer, rs *netstorage.Result) { + writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { valuesOrig := rs.Values timestampsOrig := rs.Timestamps values := valuesOrig @@ -186,7 +189,9 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match } rs.Values = valuesChunk rs.Timestamps = timestampsChunk - WriteExportJSONLine(w, rs) + bb := quicktemplate.AcquireByteBuffer() + WriteExportJSONLine(bb, rs) + resultsCh <- bb } rs.Values = valuesOrig rs.Timestamps = timestampsOrig @@ -195,10 +200,18 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match contentType := "application/stream+json" if format == "prometheus" { contentType = "text/plain" - writeLineFunc = WriteExportPrometheusLine + writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportPrometheusLine(bb, rs) + resultsCh <- bb + } } else if format == "promapi" { writeResponseFunc = WriteExportPromAPIResponse - writeLineFunc = WriteExportPromAPILine + writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { + bb := quicktemplate.AcquireByteBuffer() + WriteExportPromAPILine(bb, rs) + resultsCh <- bb + } } tagFilterss, err := getTagFilterssFromMatches(matches) @@ -225,9 +238,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match doneCh := make(chan error) go func() { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { - bb := quicktemplate.AcquireByteBuffer() - writeLineFunc(bb, rs) - resultsCh <- bb + writeLineFunc(rs, resultsCh) }) close(resultsCh) doneCh <- err