app/vmagent: add -remoteWrite.maxBlockSize command-line flag for limiting the maximum size of unpacked block to send to remote storage

This commit is contained in:
Aliaksandr Valialkin 2020-02-25 19:57:47 +02:00
parent 4e24839a2c
commit be37d762cd

View File

@ -13,8 +13,12 @@ import (
"github.com/golang/snappy"
)
var flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage")
var (
flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage")
maxUnpackedBlockSize = flag.Int("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics")
)
// the maximum number of rows to send per each block.
const maxRowsPerBlock = 10000
@ -160,17 +164,21 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
}
bb := writeRequestBufPool.Get()
bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
if len(bb.B) <= persistentqueue.MaxBlockSize {
if len(bb.B) <= *maxUnpackedBlockSize {
zb := snappyBufPool.Get()
zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
writeRequestBufPool.Put(bb)
pushBlock(zb.B)
blockSizeRows.Update(float64(len(wr.Timeseries)))
blockSizeBytes.Update(float64(len(zb.B)))
if len(zb.B) <= persistentqueue.MaxBlockSize {
pushBlock(zb.B)
blockSizeRows.Update(float64(len(wr.Timeseries)))
blockSizeBytes.Update(float64(len(zb.B)))
snappyBufPool.Put(zb)
return
}
snappyBufPool.Put(zb)
return
} else {
writeRequestBufPool.Put(bb)
}
writeRequestBufPool.Put(bb)
// Too big block. Recursively split it into smaller parts.
timeseries := wr.Timeseries