app/vmagent: reduce memory usage when importing data via /api/v1/import

Previously vmagent could use large amounts of RAM when each ingested JSON line
contains many samples.
This commit is contained in:
Aliaksandr Valialkin 2020-09-26 04:07:45 +03:00
parent 973df09686
commit 8fc9b77496
2 changed files with 14 additions and 5 deletions

View File

@ -19,7 +19,7 @@ var (
flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+ flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+ "Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+
"Minimum supported interval is 1 second") "Minimum supported interval is 1 second")
maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+ maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 8*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics") "It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics")
) )
@ -127,7 +127,7 @@ func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) {
for i := range src { for i := range src {
tssDst = append(tssDst, prompbmarshal.TimeSeries{}) tssDst = append(tssDst, prompbmarshal.TimeSeries{})
wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i]) wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
if len(tssDst) >= maxRowsPerBlock { if len(wr.samples) >= maxRowsPerBlock {
wr.tss = tssDst wr.tss = tssDst
wr.flush() wr.flush()
tssDst = wr.tss tssDst = wr.tss

View File

@ -153,10 +153,19 @@ func Push(wr *prompbmarshal.WriteRequest) {
tss := wr.Timeseries tss := wr.Timeseries
for len(tss) > 0 { for len(tss) > 0 {
// Process big tss in smaller blocks in order to reduce the maximum memory usage // Process big tss in smaller blocks in order to reduce the maximum memory usage
samplesCount := 0
i := 0
for i < len(tss) {
samplesCount += len(tss[i].Samples)
i++
if samplesCount > maxRowsPerBlock {
break
}
}
tssBlock := tss tssBlock := tss
if len(tssBlock) > maxRowsPerBlock { if i < len(tss) {
tssBlock = tss[:maxRowsPerBlock] tssBlock = tss[:i]
tss = tss[maxRowsPerBlock:] tss = tss[i:]
} else { } else {
tss = nil tss = nil
} }