From 8fc9b77496aac742b54fc02abee778ca1054f646 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sat, 26 Sep 2020 04:07:45 +0300
Subject: [PATCH] app/vmagent: reduce memory usage when importing data via
 /api/v1/import

Previously vmagent could use big amounts of RAM when each ingested JSON line
contained many samples.
---
 app/vmagent/remotewrite/pendingseries.go |  4 ++--
 app/vmagent/remotewrite/remotewrite.go   | 15 ++++++++++++---
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go
index 36a61bd1fd..7ef569abc2 100644
--- a/app/vmagent/remotewrite/pendingseries.go
+++ b/app/vmagent/remotewrite/pendingseries.go
@@ -19,7 +19,7 @@ var (
 	flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
 		"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+
 		"Minimum supported interval is 1 second")
-	maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
+	maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 8*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
 		"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics")
 )
 
@@ -127,7 +127,7 @@ func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) {
 	for i := range src {
 		tssDst = append(tssDst, prompbmarshal.TimeSeries{})
 		wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
-		if len(tssDst) >= maxRowsPerBlock {
+		if len(wr.samples) >= maxRowsPerBlock {
 			wr.tss = tssDst
 			wr.flush()
 			tssDst = wr.tss
diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index 8431eb3bc7..9b05b43418 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -153,10 +153,19 @@ func Push(wr *prompbmarshal.WriteRequest) {
 	tss := wr.Timeseries
 	for len(tss) > 0 {
 		// Process big tss in smaller blocks in order to reduce the maximum memory usage
+		samplesCount := 0
+		i := 0
+		for i < len(tss) {
+			samplesCount += len(tss[i].Samples)
+			i++
+			if samplesCount > maxRowsPerBlock {
+				break
+			}
+		}
 		tssBlock := tss
-		if len(tssBlock) > maxRowsPerBlock {
-			tssBlock = tss[:maxRowsPerBlock]
-			tss = tss[maxRowsPerBlock:]
+		if i < len(tss) {
+			tssBlock = tss[:i]
+			tss = tss[i:]
 		} else {
 			tss = nil
 		}