app/vmselect: reduce memory usage when exporting time series with big number of samples via /api/v1/export if max_rows_per_line is set to non-zero value

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/685
This commit is contained in:
Aliaksandr Valialkin 2020-08-10 20:57:18 +03:00
parent 890cfe5b61
commit 62b6e54622

View File

@ -3,7 +3,6 @@ package prometheus
import ( import (
"flag" "flag"
"fmt" "fmt"
"io"
"math" "math"
"net/http" "net/http"
"runtime" "runtime"
@ -163,9 +162,13 @@ var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/
func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error { func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error {
writeResponseFunc := WriteExportStdResponse writeResponseFunc := WriteExportStdResponse
writeLineFunc := WriteExportJSONLine writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportJSONLine(bb, rs)
resultsCh <- bb
}
if maxRowsPerLine > 0 { if maxRowsPerLine > 0 {
writeLineFunc = func(w io.Writer, rs *netstorage.Result) { writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
valuesOrig := rs.Values valuesOrig := rs.Values
timestampsOrig := rs.Timestamps timestampsOrig := rs.Timestamps
values := valuesOrig values := valuesOrig
@ -186,7 +189,9 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
} }
rs.Values = valuesChunk rs.Values = valuesChunk
rs.Timestamps = timestampsChunk rs.Timestamps = timestampsChunk
WriteExportJSONLine(w, rs) bb := quicktemplate.AcquireByteBuffer()
WriteExportJSONLine(bb, rs)
resultsCh <- bb
} }
rs.Values = valuesOrig rs.Values = valuesOrig
rs.Timestamps = timestampsOrig rs.Timestamps = timestampsOrig
@ -195,10 +200,18 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
contentType := "application/stream+json" contentType := "application/stream+json"
if format == "prometheus" { if format == "prometheus" {
contentType = "text/plain" contentType = "text/plain"
writeLineFunc = WriteExportPrometheusLine writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportPrometheusLine(bb, rs)
resultsCh <- bb
}
} else if format == "promapi" { } else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = WriteExportPromAPILine writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportPromAPILine(bb, rs)
resultsCh <- bb
}
} }
tagFilterss, err := getTagFilterssFromMatches(matches) tagFilterss, err := getTagFilterssFromMatches(matches)
@ -225,9 +238,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
doneCh := make(chan error) doneCh := make(chan error)
go func() { go func() {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
bb := quicktemplate.AcquireByteBuffer() writeLineFunc(rs, resultsCh)
writeLineFunc(bb, rs)
resultsCh <- bb
}) })
close(resultsCh) close(resultsCh)
doneCh <- err doneCh <- err