package remotewrite

import (
	"flag"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
	"github.com/VictoriaMetrics/metrics"
	"github.com/golang/snappy"
)

var (
	flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
		"This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url")
	maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 8*1024*1024, "The maximum block size to send to remote storage. Bigger blocks may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxRowsPerBlock")
	maxRowsPerBlock      = flag.Int("remoteWrite.maxRowsPerBlock", 10000, "The maximum number of samples to send in each block to remote storage. Higher number may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxBlockSize")
	vmProtoCompressLevel = flag.Int("remoteWrite.vmProtoCompressLevel", 0, "The compression level for VictoriaMetrics remote write protocol. "+
		"Higher values reduce network traffic at the cost of higher CPU usage. Negative values reduce CPU usage at the cost of increased network traffic. "+
		"See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol")
)
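
// A minimal tuning sketch for the flags above (the values are illustrative only,
// not recommendations): larger and less frequent blocks trade extra memory usage
// for fewer requests to the remote storage:
//
//	-remoteWrite.flushInterval=5s -remoteWrite.maxRowsPerBlock=20000 -remoteWrite.maxBlockSize=16777216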

// pendingSeries accumulates pushed time series and periodically flushes them
// as compressed blocks via writeRequest.pushBlock.
type pendingSeries struct {
	mu sync.Mutex
	wr writeRequest

	stopCh            chan struct{}
	periodicFlusherWG sync.WaitGroup
}

func newPendingSeries(pushBlock func(block []byte), isVMRemoteWrite bool, significantFigures, roundDigits int) *pendingSeries {
	var ps pendingSeries
	ps.wr.pushBlock = pushBlock
	ps.wr.isVMRemoteWrite = isVMRemoteWrite
	ps.wr.significantFigures = significantFigures
	ps.wr.roundDigits = roundDigits
	ps.stopCh = make(chan struct{})
	ps.periodicFlusherWG.Add(1)
	go func() {
		defer ps.periodicFlusherWG.Done()
		ps.periodicFlusher()
	}()
	return &ps
}

// MustStop stops the periodic flusher. It must be called when ps is no longer needed.
func (ps *pendingSeries) MustStop() {
	close(ps.stopCh)
	ps.periodicFlusherWG.Wait()
}

// Push adds tss to the pending set, flushing intermediate blocks if size limits are reached.
func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) {
	ps.mu.Lock()
	ps.wr.push(tss)
	ps.mu.Unlock()
}
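
// Minimal usage sketch (the callback body is hypothetical; inside vmagent the
// compressed block is normally pushed to a persistent queue):
//
//	ps := newPendingSeries(func(block []byte) {
//		// A real callback would persist or transmit the compressed block.
//		_ = block
//	}, false, 0, 100) // isVMRemoteWrite=false; 0 and 100 disable value rounding
//	ps.Push([]prompbmarshal.TimeSeries{{
//		Labels:  []prompbmarshal.Label{{Name: "__name__", Value: "up"}},
//		Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: 0}},
//	}})
//	ps.MustStop()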

func (ps *pendingSeries) periodicFlusher() {
	flushSeconds := int64(flushInterval.Seconds())
	if flushSeconds <= 0 {
		flushSeconds = 1
	}
	ticker := time.NewTicker(*flushInterval)
	defer ticker.Stop()
	mustStop := false
	for !mustStop {
		select {
		case <-ps.stopCh:
			mustStop = true
		case <-ticker.C:
			// Skip the periodic flush if the data has been flushed recently,
			// e.g. by a size-based flush in push().
			if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) {
				continue
			}
		}
		ps.mu.Lock()
		ps.wr.flush()
		ps.mu.Unlock()
	}
}

type writeRequest struct {
	// Keep lastFlushTime at the top of the struct in order to guarantee atomic access on 32-bit architectures.
	lastFlushTime uint64

	// pushBlock is called when the write request is ready to be sent.
	pushBlock func(block []byte)

	// Whether to encode the write request with the VictoriaMetrics remote write protocol.
	isVMRemoteWrite bool

	// How many significant figures must be left before sending the writeRequest to pushBlock.
	significantFigures int

	// How many decimal digits after the point must be left before sending the writeRequest to pushBlock.
	roundDigits int

	wr prompbmarshal.WriteRequest

	tss []prompbmarshal.TimeSeries

	labels  []prompbmarshal.Label
	samples []prompbmarshal.Sample
	buf     []byte
}

func (wr *writeRequest) reset() {
	// Do not reset lastFlushTime, pushBlock, isVMRemoteWrite, significantFigures and roundDigits, since they are re-used.

	wr.wr.Timeseries = nil

	for i := range wr.tss {
		ts := &wr.tss[i]
		ts.Labels = nil
		ts.Samples = nil
	}
	wr.tss = wr.tss[:0]

	promrelabel.CleanLabels(wr.labels)
	wr.labels = wr.labels[:0]

	wr.samples = wr.samples[:0]
	wr.buf = wr.buf[:0]
}

func (wr *writeRequest) flush() {
	wr.wr.Timeseries = wr.tss
	wr.adjustSampleValues()
	atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
	pushWriteRequest(&wr.wr, wr.pushBlock, wr.isVMRemoteWrite)
	wr.reset()
}

func (wr *writeRequest) adjustSampleValues() {
	samples := wr.samples
	if n := wr.significantFigures; n > 0 {
		for i := range samples {
			s := &samples[i]
			s.Value = decimal.RoundToSignificantFigures(s.Value, n)
		}
	}
	if n := wr.roundDigits; n < 100 {
		for i := range samples {
			s := &samples[i]
			s.Value = decimal.RoundToDecimalDigits(s.Value, n)
		}
	}
}
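
// A rough illustration of the rounding above (results are approximate, since the
// rounding operates on float64 values; roundDigits values of 100 or more leave
// the sample values untouched):
//
//	decimal.RoundToSignificantFigures(1234.5678, 3) // ~1230
//	decimal.RoundToDecimalDigits(1234.5678, 2)      // ~1234.57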

func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) {
	tssDst := wr.tss
	maxSamplesPerBlock := *maxRowsPerBlock
	// Allow up to 10x as many labels as samples per block on average.
	maxLabelsPerBlock := 10 * maxSamplesPerBlock
	for i := range src {
		tssDst = append(tssDst, prompbmarshal.TimeSeries{})
		wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
		if len(wr.samples) >= maxSamplesPerBlock || len(wr.labels) >= maxLabelsPerBlock {
			wr.tss = tssDst
			wr.flush()
			tssDst = wr.tss
		}
	}
	wr.tss = tssDst
}

// copyTimeSeries copies src into dst, storing label names and values in wr.buf,
// so dst doesn't reference memory owned by the caller.
func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
	labelsDst := wr.labels
	labelsLen := len(wr.labels)
	samplesDst := wr.samples
	buf := wr.buf
	for i := range src.Labels {
		labelsDst = append(labelsDst, prompbmarshal.Label{})
		dstLabel := &labelsDst[len(labelsDst)-1]
		srcLabel := &src.Labels[i]
		buf = append(buf, srcLabel.Name...)
		dstLabel.Name = bytesutil.ToUnsafeString(buf[len(buf)-len(srcLabel.Name):])
		buf = append(buf, srcLabel.Value...)
		dstLabel.Value = bytesutil.ToUnsafeString(buf[len(buf)-len(srcLabel.Value):])
	}
	dst.Labels = labelsDst[labelsLen:]

	samplesDst = append(samplesDst, src.Samples...)
	dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):]

	wr.samples = samplesDst
	wr.labels = labelsDst
	wr.buf = buf
}

// pushWriteRequest marshals wr, compresses the result with zstd (VictoriaMetrics
// remote write protocol) or snappy (standard Prometheus remote write protocol)
// and passes the compressed block to pushBlock. Too big requests are recursively
// split into smaller parts.
func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte), isVMRemoteWrite bool) {
	if len(wr.Timeseries) == 0 {
		// Nothing to push
		return
	}
	bb := writeRequestBufPool.Get()
	bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
	if len(bb.B) <= maxUnpackedBlockSize.IntN() {
		zb := snappyBufPool.Get()
		if isVMRemoteWrite {
			zb.B = zstd.CompressLevel(zb.B[:0], bb.B, *vmProtoCompressLevel)
		} else {
			zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
		}
		writeRequestBufPool.Put(bb)
		if len(zb.B) <= persistentqueue.MaxBlockSize {
			pushBlock(zb.B)
			blockSizeRows.Update(float64(len(wr.Timeseries)))
			blockSizeBytes.Update(float64(len(zb.B)))
			snappyBufPool.Put(zb)
			return
		}
		snappyBufPool.Put(zb)
	} else {
		writeRequestBufPool.Put(bb)
	}

	// Too big block. Recursively split it into smaller parts if possible.
	if len(wr.Timeseries) == 1 {
		// A single time series left. Recursively split its samples into smaller parts if possible.
		samples := wr.Timeseries[0].Samples
		if len(samples) == 1 {
			logger.Warnf("dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes", maxUnpackedBlockSize.N)
			return
		}
		n := len(samples) / 2
		wr.Timeseries[0].Samples = samples[:n]
		pushWriteRequest(wr, pushBlock, isVMRemoteWrite)
		wr.Timeseries[0].Samples = samples[n:]
		pushWriteRequest(wr, pushBlock, isVMRemoteWrite)
		wr.Timeseries[0].Samples = samples
		return
	}
	timeseries := wr.Timeseries
	n := len(timeseries) / 2
	wr.Timeseries = timeseries[:n]
	pushWriteRequest(wr, pushBlock, isVMRemoteWrite)
	wr.Timeseries = timeseries[n:]
	pushWriteRequest(wr, pushBlock, isVMRemoteWrite)
	wr.Timeseries = timeseries
}
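
// Minimal sketch of calling pushWriteRequest directly (the callback is hypothetical;
// in this package the function is normally reached via writeRequest.flush):
//
//	var wreq prompbmarshal.WriteRequest
//	wreq.Timeseries = []prompbmarshal.TimeSeries{{
//		Labels:  []prompbmarshal.Label{{Name: "job", Value: "demo"}},
//		Samples: []prompbmarshal.Sample{{Value: 42, Timestamp: 0}},
//	}}
//	pushWriteRequest(&wreq, func(block []byte) { _ = block }, false)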

var (
	blockSizeBytes = metrics.NewHistogram(`vmagent_remotewrite_block_size_bytes`)
	blockSizeRows  = metrics.NewHistogram(`vmagent_remotewrite_block_size_rows`)
)

var writeRequestBufPool bytesutil.ByteBufferPool
var snappyBufPool bytesutil.ByteBufferPool