2020-02-23 12:35:47 +01:00
package remotewrite
import (
"flag"
"fmt"
2021-05-20 12:13:40 +02:00
"strconv"
2020-05-30 13:36:40 +02:00
"sync"
2020-02-23 12:35:47 +01:00
"sync/atomic"
2021-05-20 12:13:40 +02:00
"time"
2020-02-23 12:35:47 +01:00
2021-05-20 12:13:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-30 13:36:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2021-05-20 01:12:36 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/metrics"
xxhash "github.com/cespare/xxhash/v2"
)
var (
remoteWriteURLs = flagutil . NewArray ( "remoteWrite.url" , "Remote storage URL to write data to. It must support Prometheus remote_write API. " +
"It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . " +
"Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems" )
2021-03-23 15:34:20 +01:00
tmpDataPath = flag . String ( "remoteWrite.tmpDataPath" , "vmagent-remotewrite-data" , "Path to directory where temporary data for remote write component is stored. " +
"See also -remoteWrite.maxDiskUsagePerURL" )
queues = flag . Int ( "remoteWrite.queues" , 4 , "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues " +
2020-02-23 12:35:47 +01:00
"isn't enough for sending high volume of collected data to remote storage" )
showRemoteWriteURL = flag . Bool ( "remoteWrite.showURL" , false , "Whether to show -remoteWrite.url in the exported metrics. " +
2020-07-10 13:07:02 +02:00
"It is hidden by default, since it can contain sensitive info such as auth key" )
2020-08-16 16:05:52 +02:00
maxPendingBytesPerURL = flagutil . NewBytes ( "remoteWrite.maxDiskUsagePerURL" , 0 , "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath " +
2020-03-03 18:48:46 +01:00
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. " +
"Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. " +
"Disk usage is unlimited if the value is set to 0" )
2021-02-01 13:27:05 +01:00
significantFigures = flagutil . NewArrayInt ( "remoteWrite.significantFigures" , "The number of significant figures to leave in metric values before writing them " +
"to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. " +
"This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits" )
roundDigits = flagutil . NewArrayInt ( "remoteWrite.roundDigits" , "Round metric values to this number of decimal digits after the point before writing them to remote storage. " +
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. " +
"By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. " +
"This option may be used for improving data compression for the stored metrics" )
2021-05-20 01:12:36 +02:00
sortLabels = flag . Bool ( "sortLabels" , false , ` Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. ` +
` This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. ` +
` For example, if m { k1="v1",k2="v2"} may be sent as m { k2="v2",k1="v1"} ` +
` Enabled sorting for labels can slow down ingestion performance a bit ` )
2021-05-20 12:13:40 +02:00
maxHourlySeries = flag . Int ( "remoteWrite.maxHourlySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last hour. " +
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxDailySeries" )
maxDailySeries = flag . Int ( "remoteWrite.maxDailySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. " +
2021-05-20 13:15:19 +02:00
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries" )
2020-02-23 12:35:47 +01:00
)
2020-03-03 12:08:17 +01:00
var rwctxs [ ] * remoteWriteCtx
2020-05-30 13:36:40 +02:00
// Contains the current relabelConfigs.
var allRelabelConfigs atomic . Value
2020-08-30 20:23:38 +02:00
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
2021-04-23 21:01:57 +02:00
var maxQueues = cgroup . AvailableCPUs ( ) * 16
2020-08-30 20:23:38 +02:00
2020-09-29 18:48:53 +02:00
// InitSecretFlags must be called after flag.Parse and before any logging.
//
// It registers -remoteWrite.url as a secret flag unless -remoteWrite.showURL is set.
func InitSecretFlags() {
	if *showRemoteWriteURL {
		// The user explicitly asked to expose the urls.
		return
	}
	// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
	flagutil.RegisterSecretFlag("remoteWrite.url")
}
2020-02-23 12:35:47 +01:00
// Init initializes remotewrite.
//
// It must be called after flag.Parse().
//
// Stop must be called for graceful shutdown.
func Init ( ) {
if len ( * remoteWriteURLs ) == 0 {
2020-08-30 20:23:38 +02:00
logger . Fatalf ( "at least one `-remoteWrite.url` command-line flag must be set" )
}
2021-05-20 12:13:40 +02:00
if * maxHourlySeries > 0 {
hourlySeriesLimiter = bloomfilter . NewLimiter ( * maxHourlySeries , time . Hour )
2021-05-20 14:27:06 +02:00
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_max_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_current_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 12:13:40 +02:00
}
if * maxDailySeries > 0 {
dailySeriesLimiter = bloomfilter . NewLimiter ( * maxDailySeries , 24 * time . Hour )
2021-05-20 14:27:06 +02:00
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_max_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_current_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 12:13:40 +02:00
}
2020-08-30 20:23:38 +02:00
if * queues > maxQueues {
* queues = maxQueues
}
if * queues <= 0 {
* queues = 1
2020-02-23 12:35:47 +01:00
}
2020-05-30 13:36:40 +02:00
initLabelsGlobal ( )
2021-05-21 15:34:03 +02:00
// Register SIGHUP handler for config reload before loadRelabelConfigs.
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil . NewSighupChan ( )
2020-05-30 13:36:40 +02:00
rcs , err := loadRelabelConfigs ( )
if err != nil {
logger . Fatalf ( "cannot load relabel configs: %s" , err )
}
allRelabelConfigs . Store ( rcs )
2020-02-23 12:35:47 +01:00
maxInmemoryBlocks := memory . Allowed ( ) / len ( * remoteWriteURLs ) / maxRowsPerBlock / 100
2021-04-23 21:01:57 +02:00
if maxInmemoryBlocks > 400 {
2020-02-23 12:35:47 +01:00
// There is no much sense in keeping higher number of blocks in memory,
// since this means that the producer outperforms consumer and the queue
// will continue growing. It is better storing the queue to file.
2021-04-23 21:01:57 +02:00
maxInmemoryBlocks = 400
2020-02-23 12:35:47 +01:00
}
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
for i , remoteWriteURL := range * remoteWriteURLs {
2020-09-16 21:34:01 +02:00
sanitizedURL := fmt . Sprintf ( "%d:secret-url" , i + 1 )
2020-02-23 12:35:47 +01:00
if * showRemoteWriteURL {
2020-09-16 21:34:01 +02:00
sanitizedURL = fmt . Sprintf ( "%d:%s" , i + 1 , remoteWriteURL )
2020-02-23 12:35:47 +01:00
}
2020-09-16 21:34:01 +02:00
rwctx := newRemoteWriteCtx ( i , remoteWriteURL , maxInmemoryBlocks , sanitizedURL )
2020-03-03 12:08:17 +01:00
rwctxs = append ( rwctxs , rwctx )
2020-02-23 12:35:47 +01:00
}
2020-05-30 13:36:40 +02:00
// Start config reloader.
configReloaderWG . Add ( 1 )
go func ( ) {
defer configReloaderWG . Done ( )
for {
select {
case <- sighupCh :
case <- stopCh :
return
}
logger . Infof ( "SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig" )
rcs , err := loadRelabelConfigs ( )
if err != nil {
logger . Errorf ( "cannot reload relabel configs; preserving the previous configs; error: %s" , err )
continue
}
allRelabelConfigs . Store ( rcs )
logger . Infof ( "Successfully reloaded relabel configs" )
}
} ( )
2020-02-23 12:35:47 +01:00
}
2020-05-30 13:36:40 +02:00
var (
	// stopCh is closed by Stop in order to terminate the config reloader goroutine started by Init.
	stopCh = make(chan struct{})

	// configReloaderWG allows Stop to wait until the config reloader goroutine exits.
	configReloaderWG sync.WaitGroup
)
2020-02-23 12:35:47 +01:00
// Stop stops remotewrite.
//
// It is expected that nobody calls Push during and after the call to this func.
func Stop ( ) {
2020-05-30 13:36:40 +02:00
close ( stopCh )
configReloaderWG . Wait ( )
2020-03-03 12:08:17 +01:00
for _ , rwctx := range rwctxs {
rwctx . MustStop ( )
2020-02-23 12:35:47 +01:00
}
2020-03-03 12:08:17 +01:00
rwctxs = nil
2020-02-23 12:35:47 +01:00
}
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
//
2020-07-21 20:55:24 +02:00
// Note that wr may be modified by Push due to relabeling and rounding.
2020-02-23 12:35:47 +01:00
func Push ( wr * prompbmarshal . WriteRequest ) {
2020-03-03 12:08:17 +01:00
var rctx * relabelCtx
2020-05-30 13:36:40 +02:00
rcs := allRelabelConfigs . Load ( ) . ( * relabelConfigs )
2021-02-22 15:33:55 +01:00
pcsGlobal := rcs . global
if pcsGlobal . Len ( ) > 0 || len ( labelsGlobal ) > 0 {
2020-03-03 12:08:17 +01:00
rctx = getRelabelCtx ( )
}
2020-02-28 17:57:45 +01:00
tss := wr . Timeseries
for len ( tss ) > 0 {
2020-07-10 14:13:26 +02:00
// Process big tss in smaller blocks in order to reduce the maximum memory usage
2020-09-26 03:07:45 +02:00
samplesCount := 0
2021-03-30 23:44:31 +02:00
labelsCount := 0
2020-09-26 03:07:45 +02:00
i := 0
for i < len ( tss ) {
samplesCount += len ( tss [ i ] . Samples )
2021-03-30 23:44:31 +02:00
labelsCount += len ( tss [ i ] . Labels )
2020-09-26 03:07:45 +02:00
i ++
2021-03-30 23:44:31 +02:00
if samplesCount >= maxRowsPerBlock || labelsCount >= maxLabelsPerBlock {
2020-09-26 03:07:45 +02:00
break
}
}
2020-02-28 17:57:45 +01:00
tssBlock := tss
2020-09-26 03:07:45 +02:00
if i < len ( tss ) {
tssBlock = tss [ : i ]
tss = tss [ i : ]
2020-02-28 19:03:38 +01:00
} else {
tss = nil
2020-02-28 17:57:45 +01:00
}
2020-03-03 12:08:17 +01:00
if rctx != nil {
tssBlockLen := len ( tssBlock )
2021-02-22 15:33:55 +01:00
tssBlock = rctx . applyRelabeling ( tssBlock , labelsGlobal , pcsGlobal )
2020-03-03 12:08:17 +01:00
globalRelabelMetricsDropped . Add ( tssBlockLen - len ( tssBlock ) )
}
2021-05-20 01:12:36 +02:00
sortLabelsIfNeeded ( tssBlock )
2021-05-20 12:13:40 +02:00
tssBlock = limitSeriesCardinality ( tssBlock )
if len ( tssBlock ) > 0 {
for _ , rwctx := range rwctxs {
rwctx . Push ( tssBlock )
}
2020-03-03 12:08:17 +01:00
}
2020-03-03 14:00:52 +01:00
if rctx != nil {
rctx . reset ( )
}
2020-02-28 17:57:45 +01:00
}
2020-03-03 12:08:17 +01:00
if rctx != nil {
putRelabelCtx ( rctx )
}
2020-02-23 12:35:47 +01:00
}
2021-05-20 01:12:36 +02:00
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
//
// Labels are sorted in place for every time series in tss.
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
	if *sortLabels {
		for idx := range tss {
			promrelabel.SortLabels(tss[idx].Labels)
		}
	}
}
2021-05-20 12:13:40 +02:00
func limitSeriesCardinality ( tss [ ] prompbmarshal . TimeSeries ) [ ] prompbmarshal . TimeSeries {
if hourlySeriesLimiter == nil && dailySeriesLimiter == nil {
return tss
}
dst := make ( [ ] prompbmarshal . TimeSeries , 0 , len ( tss ) )
for i := range tss {
labels := tss [ i ] . Labels
h := getLabelsHash ( labels )
if hourlySeriesLimiter != nil && ! hourlySeriesLimiter . Add ( h ) {
2021-05-20 13:15:19 +02:00
hourlySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxHourlySeries" , hourlySeriesLimiter . MaxItems ( ) )
2021-05-20 12:13:40 +02:00
continue
}
if dailySeriesLimiter != nil && ! dailySeriesLimiter . Add ( h ) {
2021-05-20 13:15:19 +02:00
dailySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxDailySeries" , dailySeriesLimiter . MaxItems ( ) )
2021-05-20 12:13:40 +02:00
continue
}
dst = append ( dst , tss [ i ] )
}
return dst
}
var (
hourlySeriesLimiter * bloomfilter . Limiter
dailySeriesLimiter * bloomfilter . Limiter
2021-05-20 13:15:19 +02:00
hourlySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_hourly_series_limit_rows_dropped_total ` )
dailySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_daily_series_limit_rows_dropped_total ` )
2021-05-20 12:13:40 +02:00
)
// getLabelsHash returns the xxhash sum over the concatenated names and values of the given labels.
//
// A pooled byte buffer is used for building the hashed data.
//
// NOTE(review): names and values are concatenated without separators,
// so distinct label sets such as {a="bc"} and {ab="c"} produce the same hash.
func getLabelsHash(labels []prompbmarshal.Label) uint64 {
	bb := labelsHashBufPool.Get()
	buf := bb.B[:0]
	for i := range labels {
		buf = append(buf, labels[i].Name...)
		buf = append(buf, labels[i].Value...)
	}
	// Store the possibly grown buffer back, so its capacity is reused by subsequent calls.
	bb.B = buf
	sum := xxhash.Sum64(buf)
	labelsHashBufPool.Put(bb)
	return sum
}

// labelsHashBufPool is a pool of scratch buffers for getLabelsHash.
var labelsHashBufPool bytesutil.ByteBufferPool
// logSkippedSeries logs a warning about the series with the given labels,
// which has been skipped because the limit set via flagName has been reached.
//
// The warning is emitted only if a tick is pending on logSkippedSeriesTicker,
// otherwise the call is a no-op. This rate-limits the logging.
func logSkippedSeries(labels []prompbmarshal.Label, flagName string, flagValue int) {
	select {
	case <-logSkippedSeriesTicker.C:
	default:
		// No pending tick - skip logging.
		return
	}
	logger.Warnf("skip series %s because %s=%d reached", labelsToString(labels), flagName, flagValue)
}

// logSkippedSeriesTicker ticks every 5 seconds and paces the warnings in logSkippedSeries.
var logSkippedSeriesTicker = time.NewTicker(5 * time.Second)
// labelsToString returns labels formatted as {name1="value1",name2="value2"}.
//
// Label values are quoted with strconv.AppendQuote; label names are written as is.
func labelsToString(labels []prompbmarshal.Label) string {
	buf := []byte{'{'}
	for i := range labels {
		if i > 0 {
			// Separate the label from the previous one.
			buf = append(buf, ',')
		}
		buf = append(buf, labels[i].Name...)
		buf = append(buf, '=')
		buf = strconv.AppendQuote(buf, labels[i].Value)
	}
	buf = append(buf, '}')
	return string(buf)
}
2020-03-03 12:08:17 +01:00
// globalRelabelMetricsDropped counts time series dropped by the global relabeling in Push.
var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total")
type remoteWriteCtx struct {
2020-05-30 13:36:40 +02:00
idx int
2020-03-03 12:08:17 +01:00
fq * persistentqueue . FastQueue
c * client
pss [ ] * pendingSeries
pssNextIdx uint64
relabelMetricsDropped * metrics . Counter
}
2020-09-16 21:34:01 +02:00
func newRemoteWriteCtx ( argIdx int , remoteWriteURL string , maxInmemoryBlocks int , sanitizedURL string ) * remoteWriteCtx {
2020-03-03 12:08:17 +01:00
h := xxhash . Sum64 ( [ ] byte ( remoteWriteURL ) )
2020-09-11 14:16:02 +02:00
path := fmt . Sprintf ( "%s/persistent-queue/%d_%016X" , * tmpDataPath , argIdx + 1 , h )
2020-09-16 21:34:01 +02:00
fq := persistentqueue . MustOpenFastQueue ( path , sanitizedURL , maxInmemoryBlocks , maxPendingBytesPerURL . N )
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_data_bytes { path=%q, url=%q} ` , path , sanitizedURL ) , func ( ) float64 {
2020-03-03 12:08:17 +01:00
return float64 ( fq . GetPendingBytes ( ) )
} )
2020-09-16 21:34:01 +02:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_inmemory_blocks { path=%q, url=%q} ` , path , sanitizedURL ) , func ( ) float64 {
2020-03-03 12:08:17 +01:00
return float64 ( fq . GetInmemoryQueueLen ( ) )
} )
2020-09-16 21:34:01 +02:00
c := newClient ( argIdx , remoteWriteURL , sanitizedURL , fq , * queues )
2021-02-01 13:27:05 +01:00
sf := significantFigures . GetOptionalArgOrDefault ( argIdx , 0 )
rd := roundDigits . GetOptionalArgOrDefault ( argIdx , 100 )
2021-03-31 15:16:26 +02:00
pssLen := * queues
if n := cgroup . AvailableCPUs ( ) ; pssLen > n {
// There is no sense in running more than availableCPUs concurrent pendingSeries,
// since every pendingSeries can saturate up to a single CPU.
pssLen = n
}
pss := make ( [ ] * pendingSeries , pssLen )
2020-03-03 12:08:17 +01:00
for i := range pss {
2021-02-01 13:27:05 +01:00
pss [ i ] = newPendingSeries ( fq . MustWriteBlock , sf , rd )
2020-03-03 12:08:17 +01:00
}
return & remoteWriteCtx {
2020-05-30 13:36:40 +02:00
idx : argIdx ,
fq : fq ,
c : c ,
pss : pss ,
2020-03-03 12:08:17 +01:00
2020-09-16 21:34:01 +02:00
relabelMetricsDropped : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_relabel_metrics_dropped_total { path=%q, url=%q} ` , path , sanitizedURL ) ) ,
2020-02-23 12:35:47 +01:00
}
}
2020-03-03 12:08:17 +01:00
func ( rwctx * remoteWriteCtx ) MustStop ( ) {
for _ , ps := range rwctx . pss {
ps . MustStop ( )
}
2020-05-30 13:36:40 +02:00
rwctx . idx = 0
2020-03-03 12:08:17 +01:00
rwctx . pss = nil
2021-02-18 23:31:07 +01:00
rwctx . fq . UnblockAllReaders ( )
2020-03-03 12:08:17 +01:00
rwctx . c . MustStop ( )
rwctx . c = nil
2021-02-17 20:23:38 +01:00
rwctx . fq . MustClose ( )
rwctx . fq = nil
2020-03-03 12:08:17 +01:00
rwctx . relabelMetricsDropped = nil
}
2020-02-23 12:35:47 +01:00
2020-03-03 12:08:17 +01:00
func ( rwctx * remoteWriteCtx ) Push ( tss [ ] prompbmarshal . TimeSeries ) {
var rctx * relabelCtx
2020-07-10 14:13:26 +02:00
var v * [ ] prompbmarshal . TimeSeries
2020-05-30 13:36:40 +02:00
rcs := allRelabelConfigs . Load ( ) . ( * relabelConfigs )
2021-02-22 15:33:55 +01:00
pcs := rcs . perURL [ rwctx . idx ]
if pcs . Len ( ) > 0 {
2020-07-10 14:13:26 +02:00
rctx = getRelabelCtx ( )
2020-05-12 21:01:47 +02:00
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
2020-07-10 14:13:26 +02:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
v = tssRelabelPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
2020-03-03 12:08:17 +01:00
tssLen := len ( tss )
2021-02-22 15:33:55 +01:00
tss = rctx . applyRelabeling ( tss , nil , pcs )
2020-03-03 12:08:17 +01:00
rwctx . relabelMetricsDropped . Add ( tssLen - len ( tss ) )
}
pss := rwctx . pss
idx := atomic . AddUint64 ( & rwctx . pssNextIdx , 1 ) % uint64 ( len ( pss ) )
pss [ idx ] . Push ( tss )
if rctx != nil {
2020-07-10 14:13:26 +02:00
* v = prompbmarshal . ResetTimeSeries ( tss )
tssRelabelPool . Put ( v )
2020-03-03 12:08:17 +01:00
putRelabelCtx ( rctx )
}
}
2020-07-10 14:13:26 +02:00
var tssRelabelPool = & sync . Pool {
New : func ( ) interface { } {
2020-07-14 13:27:50 +02:00
a := [ ] prompbmarshal . TimeSeries { }
return & a
2020-07-10 14:13:26 +02:00
} ,
}