diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 36a61bd1fd..7ef569abc2 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -19,7 +19,7 @@ var ( flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+ "Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+ "Minimum supported interval is 1 second") - maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+ + maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 8*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+ "It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics") ) @@ -127,7 +127,7 @@ func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) { for i := range src { tssDst = append(tssDst, prompbmarshal.TimeSeries{}) wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i]) - if len(tssDst) >= maxRowsPerBlock { + if len(wr.samples) >= maxRowsPerBlock { wr.tss = tssDst wr.flush() tssDst = wr.tss diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 8431eb3bc7..9b05b43418 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -153,10 +153,19 @@ func Push(wr *prompbmarshal.WriteRequest) { tss := wr.Timeseries for len(tss) > 0 { // Process big tss in smaller blocks in order to reduce the maximum memory usage + samplesCount := 0 + i := 0 + for i < len(tss) { + samplesCount += len(tss[i].Samples) + i++ + if samplesCount > maxRowsPerBlock { + break + } + } tssBlock := tss - if len(tssBlock) > maxRowsPerBlock { - tssBlock = tss[:maxRowsPerBlock] - tss = tss[maxRowsPerBlock:] + if i < len(tss) { + tssBlock = tss[:i] + tss = tss[i:] } else { tss = nil }