app/vmagent: add -remoteWrite.maxBlockSize command-line flag for limiting the maximum size of unpacked block to send to remote storage

This commit is contained in:
Aliaksandr Valialkin 2020-02-25 19:57:47 +02:00
parent c4194020ef
commit c6c7843e93

View File

@ -13,8 +13,12 @@ import (
"github.com/golang/snappy" "github.com/golang/snappy"
) )
var flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+ var (
"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage") flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage")
maxUnpackedBlockSize = flag.Int("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics")
)
// the maximum number of rows to send per each block. // the maximum number of rows to send per each block.
const maxRowsPerBlock = 10000 const maxRowsPerBlock = 10000
@ -160,17 +164,21 @@ func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byt
} }
bb := writeRequestBufPool.Get() bb := writeRequestBufPool.Get()
bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr) bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
if len(bb.B) <= persistentqueue.MaxBlockSize { if len(bb.B) <= *maxUnpackedBlockSize {
zb := snappyBufPool.Get() zb := snappyBufPool.Get()
zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B) zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
writeRequestBufPool.Put(bb) writeRequestBufPool.Put(bb)
pushBlock(zb.B) if len(zb.B) <= persistentqueue.MaxBlockSize {
blockSizeRows.Update(float64(len(wr.Timeseries))) pushBlock(zb.B)
blockSizeBytes.Update(float64(len(zb.B))) blockSizeRows.Update(float64(len(wr.Timeseries)))
blockSizeBytes.Update(float64(len(zb.B)))
snappyBufPool.Put(zb)
return
}
snappyBufPool.Put(zb) snappyBufPool.Put(zb)
return } else {
writeRequestBufPool.Put(bb)
} }
writeRequestBufPool.Put(bb)
// Too big block. Recursively split it into smaller parts. // Too big block. Recursively split it into smaller parts.
timeseries := wr.Timeseries timeseries := wr.Timeseries