2020-02-23 12:35:47 +01:00
package remotewrite
import (
"flag"
"fmt"
2023-11-24 13:42:11 +01:00
"net/http"
2021-09-28 23:52:07 +02:00
"net/url"
2023-03-28 03:15:28 +02:00
"path/filepath"
2021-05-20 12:13:40 +02:00
"strconv"
2020-05-30 13:36:40 +02:00
"sync"
2020-02-23 12:35:47 +01:00
"sync/atomic"
2021-05-20 12:13:40 +02:00
"time"
2020-02-23 12:35:47 +01:00
2023-11-24 13:42:11 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2021-08-05 08:46:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2021-05-20 12:13:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2022-11-21 23:38:43 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2023-03-28 03:15:28 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-30 13:36:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2021-05-20 01:12:36 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2023-11-01 23:05:11 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
2023-01-04 07:19:18 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
2021-08-05 08:46:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
2023-08-11 14:38:28 +02:00
"github.com/VictoriaMetrics/metrics"
2023-08-11 15:23:00 +02:00
"github.com/cespare/xxhash/v2"
2020-02-23 12:35:47 +01:00
)
var (
2023-02-24 02:36:52 +01:00
remoteWriteURLs = flagutil . NewArrayString ( "remoteWrite.url" , "Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol " +
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . " +
2023-07-25 03:15:24 +02:00
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. " +
2023-12-05 00:20:44 +01:00
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set" )
2022-10-01 17:26:05 +02:00
remoteWriteMultitenantURLs = flagutil . NewArrayString ( "remoteWrite.multitenantURL" , "Base path for multitenant remote storage URL to write data to. " +
2021-08-05 08:46:19 +02:00
"See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://<vminsert>:8480 . " +
2023-12-05 00:20:44 +01:00
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. " +
"This flag is deprecated in favor of -enableMultitenantHandlers . See https://docs.victoriametrics.com/vmagent.html#multitenancy" )
enableMultitenantHandlers = flag . Bool ( "enableMultitenantHandlers" , false , "Whether to process incoming data via multitenant insert handlers according to " +
"https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format . By default incoming data is processed via single-node insert handlers " +
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ." +
"See https://docs.victoriametrics.com/vmagent.html#multitenancy for details" )
2023-07-25 03:15:24 +02:00
shardByURL = flag . Bool ( "remoteWrite.shardByURL" , false , "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . " +
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#sharding-among-remote-storages" )
2023-11-01 23:05:11 +01:00
shardByURLLabels = flagutil . NewArrayString ( "remoteWrite.shardByURL.labels" , "Optional list of labels, which must be used for sharding outgoing samples " +
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain " +
"even distribution of series over the specified -remoteWrite.url systems" )
2023-11-25 10:31:30 +01:00
tmpDataPath = flag . String ( "remoteWrite.tmpDataPath" , "vmagent-remotewrite-data" , "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . " +
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue" )
2023-03-28 03:15:28 +02:00
keepDanglingQueues = flag . Bool ( "remoteWrite.keepDanglingQueues" , false , "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. " +
"Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on." )
2021-06-17 12:26:35 +02:00
queues = flag . Int ( "remoteWrite.queues" , cgroup . AvailableCPUs ( ) * 2 , "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues " +
2024-03-06 12:43:08 +01:00
"isn't enough for sending high volume of collected data to remote storage. " +
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage" )
2020-02-23 12:35:47 +01:00
showRemoteWriteURL = flag . Bool ( "remoteWrite.showURL" , false , "Whether to show -remoteWrite.url in the exported metrics. " +
2020-07-10 13:07:02 +02:00
"It is hidden by default, since it can contain sensitive info such as auth key" )
2023-08-12 13:17:55 +02:00
maxPendingBytesPerURL = flagutil . NewArrayBytes ( "remoteWrite.maxDiskUsagePerURL" , 0 , "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath " +
2020-03-03 18:48:46 +01:00
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. " +
2023-04-26 12:23:01 +02:00
"Buffered data is stored in ~500MB chunks. It is recommended to set the value for this flag to a multiple of the block size 500MB. " +
2020-03-03 18:48:46 +01:00
"Disk usage is unlimited if the value is set to 0" )
2023-08-12 13:17:55 +02:00
significantFigures = flagutil . NewArrayInt ( "remoteWrite.significantFigures" , 0 , "The number of significant figures to leave in metric values before writing them " +
2021-02-01 13:27:05 +01:00
"to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. " +
"This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits" )
2023-08-12 13:17:55 +02:00
roundDigits = flagutil . NewArrayInt ( "remoteWrite.roundDigits" , 100 , "Round metric values to this number of decimal digits after the point before " +
"writing them to remote storage. " +
2021-02-01 13:27:05 +01:00
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. " +
2023-05-10 09:50:41 +02:00
"By default, digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. " +
2021-02-01 13:27:05 +01:00
"This option may be used for improving data compression for the stored metrics" )
2021-05-20 01:12:36 +02:00
sortLabels = flag . Bool ( "sortLabels" , false , ` Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. ` +
` This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. ` +
` For example, if m { k1="v1",k2="v2"} may be sent as m { k2="v2",k1="v1"} ` +
` Enabled sorting for labels can slow down ingestion performance a bit ` )
2021-05-20 12:13:40 +02:00
maxHourlySeries = flag . Int ( "remoteWrite.maxHourlySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last hour. " +
2021-09-01 15:34:32 +02:00
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter" )
2021-05-20 12:13:40 +02:00
maxDailySeries = flag . Int ( "remoteWrite.maxDailySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. " +
2021-09-01 15:34:32 +02:00
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter" )
2024-03-21 17:14:49 +01:00
maxIngestionRate = flag . Int ( "maxIngestionRate" , 0 , "The maximum number of samples vmagent can receive per second. " +
"If the limit is exceeded, the ingestion rate will be throttled." )
2023-04-01 06:27:45 +02:00
streamAggrConfig = flagutil . NewArrayString ( "remoteWrite.streamAggr.config" , "Optional path to file with stream aggregation config. " +
"See https://docs.victoriametrics.com/stream-aggregation.html . " +
2023-07-25 01:44:09 +02:00
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval" )
streamAggrKeepInput = flagutil . NewArrayBool ( "remoteWrite.streamAggr.keepInput" , "Whether to keep all the input samples after the aggregation " +
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples " +
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html" )
streamAggrDropInput = flagutil . NewArrayBool ( "remoteWrite.streamAggr.dropInput" , "Whether to drop all the input samples after the aggregation " +
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples " +
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html" )
2024-03-04 23:45:22 +01:00
streamAggrDedupInterval = flagutil . NewArrayDuration ( "remoteWrite.streamAggr.dedupInterval" , 0 , "Input samples are de-duplicated with this interval before optional aggregation " +
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication" )
2024-03-17 22:01:44 +01:00
streamAggrIgnoreOldSamples = flagutil . NewArrayBool ( "remoteWrite.streamAggr.ignoreOldSamples" , "Whether to ignore input samples with old timestamps outside the current aggregation interval " +
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples" )
2024-03-05 01:13:21 +01:00
streamAggrDropInputLabels = flagutil . NewArrayString ( "streamAggr.dropInputLabels" , "An optional list of labels to drop from samples " +
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels" )
2023-11-25 10:31:30 +01:00
disableOnDiskQueue = flag . Bool ( "remoteWrite.disableOnDiskQueue" , false , "Whether to disable storing pending data to -remoteWrite.tmpDataPath " +
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ." +
"See also -remoteWrite.dropSamplesOnOverload" )
dropSamplesOnOverload = flag . Bool ( "remoteWrite.dropSamplesOnOverload" , false , "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples " +
"cannot be pushed into the configured remote storage systems in a timely manner. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence" )
2020-02-23 12:35:47 +01:00
)
2021-08-05 08:46:19 +02:00
var (
// rwctxsDefault contains statically populated entries when -remoteWrite.url is specified.
rwctxsDefault [ ] * remoteWriteCtx
// rwctxsMap contains dynamically populated entries when -remoteWrite.multitenantURL is specified.
rwctxsMap = make ( map [ tenantmetrics . TenantID ] [ ] * remoteWriteCtx )
rwctxsMapLock sync . Mutex
2021-08-05 08:44:29 +02:00
2021-08-05 08:46:19 +02:00
// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
defaultAuthToken = & auth . Token { }
2023-11-24 13:42:11 +01:00
2023-11-25 10:31:30 +01:00
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
2023-11-24 13:42:11 +01:00
ErrQueueFullHTTPRetry = & httpserver . ErrorWithStatusCode {
2023-11-25 10:31:30 +01:00
Err : fmt . Errorf ( "remote storage systems cannot keep up with the data ingestion rate; retry the request later " +
"or remove -remoteWrite.disableOnDiskQueue from vmagent command-line flags, so it could save pending data to -remoteWrite.tmpDataPath; " +
"see https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence" ) ,
2023-11-24 13:42:11 +01:00
StatusCode : http . StatusTooManyRequests ,
}
2021-08-05 08:46:19 +02:00
)
2023-12-05 00:20:44 +01:00
// MultitenancyEnabled returns true if -enableMultitenantHandlers or -remoteWrite.multitenantURL is specified.
2021-08-05 08:46:19 +02:00
func MultitenancyEnabled ( ) bool {
2023-12-05 00:20:44 +01:00
return * enableMultitenantHandlers || len ( * remoteWriteMultitenantURLs ) > 0
2021-08-05 08:46:19 +02:00
}
2020-03-03 12:08:17 +01:00
2020-05-30 13:36:40 +02:00
// Contains the current relabelConfigs.
2023-07-20 02:37:49 +02:00
var allRelabelConfigs atomic . Pointer [ relabelConfigs ]
2020-05-30 13:36:40 +02:00
2020-08-30 20:23:38 +02:00
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
2021-04-23 21:01:57 +02:00
var maxQueues = cgroup . AvailableCPUs ( ) * 16
2020-08-30 20:23:38 +02:00
2023-03-28 03:33:05 +02:00
const persistentQueueDirname = "persistent-queue"
2023-03-28 03:15:28 +02:00
2020-09-29 18:48:53 +02:00
// InitSecretFlags must be called after flag.Parse and before any logging.
func InitSecretFlags ( ) {
if ! * showRemoteWriteURL {
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
flagutil . RegisterSecretFlag ( "remoteWrite.url" )
}
}
2023-11-01 23:05:11 +01:00
var shardByURLLabelsMap map [ string ] struct { }
2020-02-23 12:35:47 +01:00
// Init initializes remotewrite.
//
// It must be called after flag.Parse().
//
// Stop must be called for graceful shutdown.
2021-08-05 08:46:19 +02:00
func Init ( ) {
2021-08-05 08:44:29 +02:00
if len ( * remoteWriteURLs ) == 0 && len ( * remoteWriteMultitenantURLs ) == 0 {
logger . Fatalf ( "at least one `-remoteWrite.url` or `-remoteWrite.multitenantURL` command-line flag must be set" )
}
2021-08-05 08:46:19 +02:00
if len ( * remoteWriteURLs ) > 0 && len ( * remoteWriteMultitenantURLs ) > 0 {
logger . Fatalf ( "cannot set both `-remoteWrite.url` and `-remoteWrite.multitenantURL` command-line flags" )
2021-08-05 08:44:29 +02:00
}
2021-05-20 12:13:40 +02:00
if * maxHourlySeries > 0 {
hourlySeriesLimiter = bloomfilter . NewLimiter ( * maxHourlySeries , time . Hour )
2021-05-20 14:27:06 +02:00
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_max_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_current_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 12:13:40 +02:00
}
if * maxDailySeries > 0 {
dailySeriesLimiter = bloomfilter . NewLimiter ( * maxDailySeries , 24 * time . Hour )
2021-05-20 14:27:06 +02:00
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_max_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_current_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 12:13:40 +02:00
}
2024-03-21 17:14:49 +01:00
if * maxIngestionRate > 0 {
// Start ingestion rate limiter.
ingestionRateLimitReached = metrics . NewCounter ( ` vmagent_max_ingestion_rate_limit_reached_total ` )
ingestionRateLimiterStopCh = make ( chan struct { } )
ingestionRateLimiter = newRateLimiter ( time . Second , int64 ( * maxIngestionRate ) , ingestionRateLimitReached , ingestionRateLimiterStopCh )
}
2020-08-30 20:23:38 +02:00
if * queues > maxQueues {
* queues = maxQueues
}
if * queues <= 0 {
* queues = 1
2020-02-23 12:35:47 +01:00
}
2023-11-01 23:05:11 +01:00
if len ( * shardByURLLabels ) > 0 {
m := make ( map [ string ] struct { } , len ( * shardByURLLabels ) )
for _ , label := range * shardByURLLabels {
m [ label ] = struct { } { }
}
shardByURLLabelsMap = m
}
2020-05-30 13:36:40 +02:00
initLabelsGlobal ( )
2021-05-21 15:34:03 +02:00
// Register SIGHUP handler for config reload before loadRelabelConfigs.
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil . NewSighupChan ( )
2020-05-30 13:36:40 +02:00
rcs , err := loadRelabelConfigs ( )
if err != nil {
logger . Fatalf ( "cannot load relabel configs: %s" , err )
}
allRelabelConfigs . Store ( rcs )
2023-03-29 18:05:58 +02:00
relabelConfigSuccess . Set ( 1 )
relabelConfigTimestamp . Set ( fasttime . UnixTimestamp ( ) )
2021-08-05 08:46:19 +02:00
if len ( * remoteWriteURLs ) > 0 {
rwctxsDefault = newRemoteWriteCtxs ( nil , * remoteWriteURLs )
2021-08-05 08:44:29 +02:00
}
2023-11-23 19:39:40 +01:00
dropDanglingQueues ( )
2021-08-05 08:44:29 +02:00
2020-05-30 13:36:40 +02:00
// Start config reloader.
configReloaderWG . Add ( 1 )
go func ( ) {
defer configReloaderWG . Done ( )
for {
select {
case <- sighupCh :
2023-04-01 06:27:45 +02:00
case <- configReloaderStopCh :
2020-05-30 13:36:40 +02:00
return
}
2023-04-01 06:27:45 +02:00
reloadRelabelConfigs ( )
reloadStreamAggrConfigs ( )
2020-05-30 13:36:40 +02:00
}
} ( )
2020-02-23 12:35:47 +01:00
}
2023-11-23 19:39:40 +01:00
func dropDanglingQueues ( ) {
if * keepDanglingQueues {
return
}
if len ( * remoteWriteMultitenantURLs ) > 0 {
// Do not drop dangling queues for *remoteWriteMultitenantURLs, since it is impossible to determine
// unused queues for multitenant urls - they are created on demand when new sample for the given
// tenant is pushed to remote storage.
return
}
// Remove dangling persistent queues, if any.
// This is required for the case when the number of queues has been changed or URL have been changed.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
//
existingQueues := make ( map [ string ] struct { } , len ( rwctxsDefault ) )
for _ , rwctx := range rwctxsDefault {
existingQueues [ rwctx . fq . Dirname ( ) ] = struct { } { }
}
queuesDir := filepath . Join ( * tmpDataPath , persistentQueueDirname )
files := fs . MustReadDir ( queuesDir )
removed := 0
for _ , f := range files {
dirname := f . Name ( )
if _ , ok := existingQueues [ dirname ] ; ! ok {
logger . Infof ( "removing dangling queue %q" , dirname )
fullPath := filepath . Join ( queuesDir , dirname )
fs . MustRemoveAll ( fullPath )
removed ++
}
}
if removed > 0 {
logger . Infof ( "removed %d dangling queues from %q, active queues: %d" , removed , * tmpDataPath , len ( rwctxsDefault ) )
}
}
2023-04-01 06:27:45 +02:00
func reloadRelabelConfigs ( ) {
relabelConfigReloads . Inc ( )
logger . Infof ( "reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig" )
rcs , err := loadRelabelConfigs ( )
if err != nil {
relabelConfigReloadErrors . Inc ( )
relabelConfigSuccess . Set ( 0 )
logger . Errorf ( "cannot reload relabel configs; preserving the previous configs; error: %s" , err )
return
}
allRelabelConfigs . Store ( rcs )
relabelConfigSuccess . Set ( 1 )
relabelConfigTimestamp . Set ( fasttime . UnixTimestamp ( ) )
logger . Infof ( "successfully reloaded relabel configs" )
}
2022-11-21 23:38:43 +01:00
var (
2023-03-29 18:05:58 +02:00
relabelConfigReloads = metrics . NewCounter ( ` vmagent_relabel_config_reloads_total ` )
relabelConfigReloadErrors = metrics . NewCounter ( ` vmagent_relabel_config_reloads_errors_total ` )
2023-12-20 13:23:38 +01:00
relabelConfigSuccess = metrics . NewGauge ( ` vmagent_relabel_config_last_reload_successful ` , nil )
2023-03-29 18:05:58 +02:00
relabelConfigTimestamp = metrics . NewCounter ( ` vmagent_relabel_config_last_reload_success_timestamp_seconds ` )
2022-11-21 23:38:43 +01:00
)
2023-04-01 06:27:45 +02:00
func reloadStreamAggrConfigs ( ) {
if len ( * remoteWriteMultitenantURLs ) > 0 {
rwctxsMapLock . Lock ( )
for _ , rwctxs := range rwctxsMap {
reinitStreamAggr ( rwctxs )
}
rwctxsMapLock . Unlock ( )
} else {
reinitStreamAggr ( rwctxsDefault )
}
}
func reinitStreamAggr ( rwctxs [ ] * remoteWriteCtx ) {
for _ , rwctx := range rwctxs {
rwctx . reinitStreamAggr ( )
}
}
2021-08-05 08:46:19 +02:00
func newRemoteWriteCtxs ( at * auth . Token , urls [ ] string ) [ ] * remoteWriteCtx {
if len ( urls ) == 0 {
logger . Panicf ( "BUG: urls must be non-empty" )
}
2021-11-04 14:39:14 +01:00
maxInmemoryBlocks := memory . Allowed ( ) / len ( urls ) / * maxRowsPerBlock / 100
2021-11-05 14:14:49 +01:00
if maxInmemoryBlocks / * queues > 100 {
2021-08-05 08:46:19 +02:00
// There is no much sense in keeping higher number of blocks in memory,
// since this means that the producer outperforms consumer and the queue
// will continue growing. It is better storing the queue to file.
2021-11-05 14:14:49 +01:00
maxInmemoryBlocks = 100 * * queues
2021-08-05 08:46:19 +02:00
}
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
rwctxs := make ( [ ] * remoteWriteCtx , len ( urls ) )
2021-09-28 23:52:07 +02:00
for i , remoteWriteURLRaw := range urls {
remoteWriteURL , err := url . Parse ( remoteWriteURLRaw )
if err != nil {
logger . Fatalf ( "invalid -remoteWrite.url=%q: %s" , remoteWriteURL , err )
}
2021-08-05 08:46:19 +02:00
sanitizedURL := fmt . Sprintf ( "%d:secret-url" , i + 1 )
if at != nil {
// Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
2021-09-28 23:52:07 +02:00
remoteWriteURL . Path = fmt . Sprintf ( "%s/insert/%d:%d/prometheus/api/v1/write" , remoteWriteURL . Path , at . AccountID , at . ProjectID )
2021-08-05 08:46:19 +02:00
sanitizedURL = fmt . Sprintf ( "%s:%d:%d" , sanitizedURL , at . AccountID , at . ProjectID )
}
if * showRemoteWriteURL {
sanitizedURL = fmt . Sprintf ( "%d:%s" , i + 1 , remoteWriteURL )
}
2023-09-01 09:34:16 +02:00
rwctxs [ i ] = newRemoteWriteCtx ( i , remoteWriteURL , maxInmemoryBlocks , sanitizedURL )
2021-08-05 08:46:19 +02:00
}
return rwctxs
}
2023-04-01 06:27:45 +02:00
var configReloaderStopCh = make ( chan struct { } )
2020-05-30 13:36:40 +02:00
var configReloaderWG sync . WaitGroup
2020-02-23 12:35:47 +01:00
// Stop stops remotewrite.
//
2023-11-25 10:31:30 +01:00
// It is expected that nobody calls TryPush during and after the call to this func.
2020-02-23 12:35:47 +01:00
func Stop ( ) {
2024-03-21 17:14:49 +01:00
if ingestionRateLimiterStopCh != nil {
close ( ingestionRateLimiterStopCh )
}
2023-04-01 06:27:45 +02:00
close ( configReloaderStopCh )
2020-05-30 13:36:40 +02:00
configReloaderWG . Wait ( )
2021-08-05 08:46:19 +02:00
for _ , rwctx := range rwctxsDefault {
rwctx . MustStop ( )
}
rwctxsDefault = nil
2023-11-25 10:31:30 +01:00
// There is no need in locking rwctxsMapLock here, since nobody should call TryPush during the Stop call.
2021-08-05 08:44:29 +02:00
for _ , rwctxs := range rwctxsMap {
for _ , rwctx := range rwctxs {
rwctx . MustStop ( )
}
2020-02-23 12:35:47 +01:00
}
2021-08-05 08:44:29 +02:00
rwctxsMap = nil
2021-09-01 13:14:37 +02:00
if sl := hourlySeriesLimiter ; sl != nil {
sl . MustStop ( )
}
if sl := dailySeriesLimiter ; sl != nil {
sl . MustStop ( )
}
2020-02-23 12:35:47 +01:00
}
2023-11-25 13:42:37 +01:00
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL
//
// If at is nil, then the data is pushed to the configured -remoteWrite.url.
// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL.
//
// PushDropSamplesOnFailure can modify wr contents.
func PushDropSamplesOnFailure ( at * auth . Token , wr * prompbmarshal . WriteRequest ) {
_ = tryPush ( at , wr , true )
}
2023-11-25 10:31:30 +01:00
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL
2020-02-23 12:35:47 +01:00
//
2023-11-25 10:31:30 +01:00
// If at is nil, then the data is pushed to the configured -remoteWrite.url.
// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL.
2021-08-05 08:46:19 +02:00
//
2023-11-25 10:31:30 +01:00
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
//
// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
func TryPush ( at * auth . Token , wr * prompbmarshal . WriteRequest ) bool {
2023-11-25 13:42:37 +01:00
return tryPush ( at , wr , * dropSamplesOnOverload )
}
func tryPush ( at * auth . Token , wr * prompbmarshal . WriteRequest , dropSamplesOnFailure bool ) bool {
2023-12-05 00:20:44 +01:00
tss := wr . Timeseries
if at == nil && MultitenancyEnabled ( ) {
// Write data to default tenant if at isn't set when multitenancy is enabled.
2021-08-05 08:46:19 +02:00
at = defaultAuthToken
2021-08-05 08:44:29 +02:00
}
2023-12-05 00:20:44 +01:00
var tenantRctx * relabelCtx
2021-08-05 08:46:19 +02:00
var rwctxs [ ] * remoteWriteCtx
if at == nil {
rwctxs = rwctxsDefault
2023-12-05 00:20:44 +01:00
} else if len ( * remoteWriteMultitenantURLs ) == 0 {
// Convert at to (vm_account_id, vm_project_id) labels.
tenantRctx = getRelabelCtx ( )
defer putRelabelCtx ( tenantRctx )
rwctxs = rwctxsDefault
2021-08-05 08:46:19 +02:00
} else {
rwctxsMapLock . Lock ( )
tenantID := tenantmetrics . TenantID {
AccountID : at . AccountID ,
ProjectID : at . ProjectID ,
}
rwctxs = rwctxsMap [ tenantID ]
if rwctxs == nil {
rwctxs = newRemoteWriteCtxs ( at , * remoteWriteMultitenantURLs )
rwctxsMap [ tenantID ] = rwctxs
}
rwctxsMapLock . Unlock ( )
2021-08-05 08:44:29 +02:00
}
2023-11-25 10:31:30 +01:00
rowsCount := getRowsCount ( tss )
if * disableOnDiskQueue {
// Quick check whether writes to configured remote storage systems are blocked.
// This allows saving CPU time spent on relabeling and block compression
// if some of remote storage systems cannot keep up with the data ingestion rate.
for _ , rwctx := range rwctxs {
if rwctx . fq . IsWriteBlocked ( ) {
pushFailures . Inc ( )
2023-11-25 13:42:37 +01:00
if dropSamplesOnFailure {
2023-11-25 10:31:30 +01:00
// Just drop samples
samplesDropped . Add ( rowsCount )
return true
}
return false
}
2023-11-24 13:42:11 +01:00
}
}
2021-08-05 08:44:29 +02:00
2020-03-03 12:08:17 +01:00
var rctx * relabelCtx
2023-07-20 02:37:49 +02:00
rcs := allRelabelConfigs . Load ( )
2021-02-22 15:33:55 +01:00
pcsGlobal := rcs . global
2023-08-17 14:35:26 +02:00
if pcsGlobal . Len ( ) > 0 {
2020-03-03 12:08:17 +01:00
rctx = getRelabelCtx ( )
2023-12-05 00:20:44 +01:00
defer putRelabelCtx ( rctx )
2020-03-03 12:08:17 +01:00
}
2022-05-06 14:28:59 +02:00
globalRowsPushedBeforeRelabel . Add ( rowsCount )
2021-11-04 14:39:14 +01:00
maxSamplesPerBlock := * maxRowsPerBlock
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
2023-11-25 10:31:30 +01:00
2020-02-28 17:57:45 +01:00
for len ( tss ) > 0 {
2020-07-10 14:13:26 +02:00
// Process big tss in smaller blocks in order to reduce the maximum memory usage
2020-09-26 03:07:45 +02:00
samplesCount := 0
2021-03-30 23:44:31 +02:00
labelsCount := 0
2020-09-26 03:07:45 +02:00
i := 0
for i < len ( tss ) {
samplesCount += len ( tss [ i ] . Samples )
2023-11-25 10:31:30 +01:00
labelsCount += len ( tss [ i ] . Samples ) * len ( tss [ i ] . Labels )
2020-09-26 03:07:45 +02:00
i ++
2021-11-04 14:39:14 +01:00
if samplesCount >= maxSamplesPerBlock || labelsCount >= maxLabelsPerBlock {
2020-09-26 03:07:45 +02:00
break
}
}
2024-03-21 17:14:49 +01:00
ingestionRateLimiter . register ( samplesCount )
2020-02-28 17:57:45 +01:00
tssBlock := tss
2020-09-26 03:07:45 +02:00
if i < len ( tss ) {
tssBlock = tss [ : i ]
tss = tss [ i : ]
2020-02-28 19:03:38 +01:00
} else {
tss = nil
2020-02-28 17:57:45 +01:00
}
2023-12-05 00:20:44 +01:00
if tenantRctx != nil {
tenantRctx . tenantToLabels ( tssBlock , at . AccountID , at . ProjectID )
}
2020-03-03 12:08:17 +01:00
if rctx != nil {
2022-05-06 14:28:59 +02:00
rowsCountBeforeRelabel := getRowsCount ( tssBlock )
2023-08-15 13:47:48 +02:00
tssBlock = rctx . applyRelabeling ( tssBlock , pcsGlobal )
2022-05-06 14:28:59 +02:00
rowsCountAfterRelabel := getRowsCount ( tssBlock )
rowsDroppedByGlobalRelabel . Add ( rowsCountBeforeRelabel - rowsCountAfterRelabel )
2020-03-03 12:08:17 +01:00
}
2021-05-20 01:12:36 +02:00
sortLabelsIfNeeded ( tssBlock )
2021-05-20 12:13:40 +02:00
tssBlock = limitSeriesCardinality ( tssBlock )
2023-11-25 10:31:30 +01:00
if ! tryPushBlockToRemoteStorages ( rwctxs , tssBlock ) {
if ! * disableOnDiskQueue {
logger . Panicf ( "BUG: tryPushBlockToRemoteStorages must return true if -remoteWrite.disableOnDiskQueue isn't set" )
}
pushFailures . Inc ( )
2023-11-25 13:42:37 +01:00
if dropSamplesOnFailure {
2023-11-25 10:31:30 +01:00
samplesDropped . Add ( rowsCount )
return true
2023-11-24 13:42:11 +01:00
}
return false
}
2020-02-28 17:57:45 +01:00
}
2023-11-24 13:42:11 +01:00
return true
2020-02-23 12:35:47 +01:00
}
2023-11-25 10:31:30 +01:00
var (
samplesDropped = metrics . NewCounter ( ` vmagent_remotewrite_samples_dropped_total ` )
pushFailures = metrics . NewCounter ( ` vmagent_remotewrite_push_failures_total ` )
)
func tryPushBlockToRemoteStorages ( rwctxs [ ] * remoteWriteCtx , tssBlock [ ] prompbmarshal . TimeSeries ) bool {
2021-11-04 14:00:51 +01:00
if len ( tssBlock ) == 0 {
// Nothing to push
2023-11-24 13:42:11 +01:00
return true
2021-11-04 14:00:51 +01:00
}
2023-11-24 13:42:11 +01:00
2023-07-25 03:15:24 +02:00
if len ( rwctxs ) == 1 {
// Fast path - just push data to the configured single remote storage
2023-11-25 10:31:30 +01:00
return rwctxs [ 0 ] . TryPush ( tssBlock )
2023-07-25 03:15:24 +02:00
}
// We need to push tssBlock to multiple remote storages.
// This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value.
if * shardByURL {
// Shard the data among rwctxs
tssByURL := make ( [ ] [ ] prompbmarshal . TimeSeries , len ( rwctxs ) )
2023-11-01 23:05:11 +01:00
tmpLabels := promutils . GetLabels ( )
2023-07-25 03:15:24 +02:00
for _ , ts := range tssBlock {
2023-11-01 23:05:11 +01:00
hashLabels := ts . Labels
if len ( shardByURLLabelsMap ) > 0 {
hashLabels = tmpLabels . Labels [ : 0 ]
for _ , label := range ts . Labels {
if _ , ok := shardByURLLabelsMap [ label . Name ] ; ok {
hashLabels = append ( hashLabels , label )
}
}
}
h := getLabelsHash ( hashLabels )
2023-07-25 03:15:24 +02:00
idx := h % uint64 ( len ( tssByURL ) )
tssByURL [ idx ] = append ( tssByURL [ idx ] , ts )
}
2023-11-01 23:05:11 +01:00
promutils . PutLabels ( tmpLabels )
2023-07-25 03:15:24 +02:00
// Push sharded data to remote storages in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync . WaitGroup
2024-02-24 01:44:19 +01:00
var anyPushFailed atomic . Bool
2023-07-25 03:15:24 +02:00
for i , rwctx := range rwctxs {
tssShard := tssByURL [ i ]
if len ( tssShard ) == 0 {
continue
}
2024-02-22 10:54:53 +01:00
wg . Add ( 1 )
2023-07-25 03:15:24 +02:00
go func ( rwctx * remoteWriteCtx , tss [ ] prompbmarshal . TimeSeries ) {
defer wg . Done ( )
2023-11-25 10:31:30 +01:00
if ! rwctx . TryPush ( tss ) {
2024-02-24 01:44:19 +01:00
anyPushFailed . Store ( true )
2023-11-24 13:42:11 +01:00
}
2023-07-25 03:15:24 +02:00
} ( rwctx , tssShard )
}
wg . Wait ( )
2024-02-24 01:44:19 +01:00
return ! anyPushFailed . Load ( )
2023-07-25 03:15:24 +02:00
}
// Replicate data among rwctxs.
// Push block to remote storages in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
2021-11-04 14:00:51 +01:00
var wg sync . WaitGroup
2023-07-25 03:15:24 +02:00
wg . Add ( len ( rwctxs ) )
2024-02-24 01:44:19 +01:00
var anyPushFailed atomic . Bool
2021-11-04 14:00:51 +01:00
for _ , rwctx := range rwctxs {
2021-11-04 15:58:28 +01:00
go func ( rwctx * remoteWriteCtx ) {
2021-11-04 14:00:51 +01:00
defer wg . Done ( )
2023-11-25 10:31:30 +01:00
if ! rwctx . TryPush ( tssBlock ) {
2024-02-24 01:44:19 +01:00
anyPushFailed . Store ( true )
2023-11-24 13:42:11 +01:00
}
2021-11-04 15:58:28 +01:00
} ( rwctx )
2021-11-04 14:00:51 +01:00
}
2021-11-04 15:58:28 +01:00
wg . Wait ( )
2024-02-24 01:44:19 +01:00
return ! anyPushFailed . Load ( )
2021-11-04 14:00:51 +01:00
}
2021-05-20 01:12:36 +02:00
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
func sortLabelsIfNeeded ( tss [ ] prompbmarshal . TimeSeries ) {
if ! * sortLabels {
return
}
for i := range tss {
promrelabel . SortLabels ( tss [ i ] . Labels )
}
}
2021-05-20 12:13:40 +02:00
func limitSeriesCardinality ( tss [ ] prompbmarshal . TimeSeries ) [ ] prompbmarshal . TimeSeries {
if hourlySeriesLimiter == nil && dailySeriesLimiter == nil {
return tss
}
dst := make ( [ ] prompbmarshal . TimeSeries , 0 , len ( tss ) )
for i := range tss {
labels := tss [ i ] . Labels
h := getLabelsHash ( labels )
if hourlySeriesLimiter != nil && ! hourlySeriesLimiter . Add ( h ) {
2021-05-20 13:15:19 +02:00
hourlySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxHourlySeries" , hourlySeriesLimiter . MaxItems ( ) )
2021-05-20 12:13:40 +02:00
continue
}
if dailySeriesLimiter != nil && ! dailySeriesLimiter . Add ( h ) {
2021-05-20 13:15:19 +02:00
dailySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxDailySeries" , dailySeriesLimiter . MaxItems ( ) )
2021-05-20 12:13:40 +02:00
continue
}
dst = append ( dst , tss [ i ] )
}
return dst
}
var (
2024-03-21 17:14:49 +01:00
ingestionRateLimiter * rateLimiter
ingestionRateLimiterStopCh chan struct { }
ingestionRateLimitReached * metrics . Counter
2021-05-20 12:13:40 +02:00
hourlySeriesLimiter * bloomfilter . Limiter
dailySeriesLimiter * bloomfilter . Limiter
2021-05-20 13:15:19 +02:00
hourlySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_hourly_series_limit_rows_dropped_total ` )
dailySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_daily_series_limit_rows_dropped_total ` )
2021-05-20 12:13:40 +02:00
)
func getLabelsHash ( labels [ ] prompbmarshal . Label ) uint64 {
bb := labelsHashBufPool . Get ( )
b := bb . B [ : 0 ]
for _ , label := range labels {
b = append ( b , label . Name ... )
b = append ( b , label . Value ... )
}
h := xxhash . Sum64 ( b )
bb . B = b
labelsHashBufPool . Put ( bb )
return h
}
var labelsHashBufPool bytesutil . ByteBufferPool
func logSkippedSeries ( labels [ ] prompbmarshal . Label , flagName string , flagValue int ) {
select {
case <- logSkippedSeriesTicker . C :
2021-12-21 16:03:25 +01:00
// Do not use logger.WithThrottler() here, since this will increase CPU usage
// because every call to logSkippedSeries will result to a call to labelsToString.
2021-05-20 12:13:40 +02:00
logger . Warnf ( "skip series %s because %s=%d reached" , labelsToString ( labels ) , flagName , flagValue )
default :
}
}
var logSkippedSeriesTicker = time . NewTicker ( 5 * time . Second )
func labelsToString ( labels [ ] prompbmarshal . Label ) string {
var b [ ] byte
b = append ( b , '{' )
for i , label := range labels {
b = append ( b , label . Name ... )
b = append ( b , '=' )
b = strconv . AppendQuote ( b , label . Value )
if i + 1 < len ( labels ) {
b = append ( b , ',' )
}
}
b = append ( b , '}' )
return string ( b )
}
2022-05-06 14:28:59 +02:00
var (
2022-05-06 14:50:50 +02:00
globalRowsPushedBeforeRelabel = metrics . NewCounter ( "vmagent_remotewrite_global_rows_pushed_before_relabel_total" )
2022-05-06 14:28:59 +02:00
rowsDroppedByGlobalRelabel = metrics . NewCounter ( "vmagent_remotewrite_global_relabel_metrics_dropped_total" )
)
2020-03-03 12:08:17 +01:00
type remoteWriteCtx struct {
2023-01-04 07:19:18 +01:00
idx int
fq * persistentqueue . FastQueue
c * client
2024-03-04 23:45:22 +01:00
sas atomic . Pointer [ streamaggr . Aggregators ]
deduplicator * streamaggr . Deduplicator
2023-01-04 07:19:18 +01:00
streamAggrKeepInput bool
2023-07-25 01:44:09 +02:00
streamAggrDropInput bool
2023-01-04 07:19:18 +01:00
2020-03-03 12:08:17 +01:00
pss [ ] * pendingSeries
2024-02-24 01:44:19 +01:00
pssNextIdx atomic . Uint64
2020-03-03 12:08:17 +01:00
2023-11-25 10:31:30 +01:00
rowsPushedAfterRelabel * metrics . Counter
rowsDroppedByRelabel * metrics . Counter
2020-03-03 12:08:17 +01:00
}
2023-09-01 09:34:16 +02:00
func newRemoteWriteCtx ( argIdx int , remoteWriteURL * url . URL , maxInmemoryBlocks int , sanitizedURL string ) * remoteWriteCtx {
2021-09-28 23:52:07 +02:00
// strip query params, otherwise changing params resets pq
pqURL := * remoteWriteURL
pqURL . RawQuery = ""
pqURL . Fragment = ""
h := xxhash . Sum64 ( [ ] byte ( pqURL . String ( ) ) )
2023-03-28 03:33:05 +02:00
queuePath := filepath . Join ( * tmpDataPath , persistentQueueDirname , fmt . Sprintf ( "%d_%016X" , argIdx + 1 , h ) )
2023-08-12 13:17:55 +02:00
maxPendingBytes := maxPendingBytesPerURL . GetOptionalArg ( argIdx )
2023-04-26 12:23:01 +02:00
if maxPendingBytes != 0 && maxPendingBytes < persistentqueue . DefaultChunkFileSize {
2023-05-09 00:42:28 +02:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4195
logger . Warnf ( "rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d" , maxPendingBytes , persistentqueue . DefaultChunkFileSize )
maxPendingBytes = persistentqueue . DefaultChunkFileSize
2023-04-26 12:23:01 +02:00
}
2023-11-24 13:42:11 +01:00
fq := persistentqueue . MustOpenFastQueue ( queuePath , sanitizedURL , maxInmemoryBlocks , maxPendingBytes , * disableOnDiskQueue )
2021-09-28 23:52:07 +02:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_data_bytes { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
2020-03-03 12:08:17 +01:00
return float64 ( fq . GetPendingBytes ( ) )
} )
2021-09-28 23:52:07 +02:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_inmemory_blocks { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
2020-03-03 12:08:17 +01:00
return float64 ( fq . GetInmemoryQueueLen ( ) )
} )
2023-11-25 10:31:30 +01:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_queue_blocked { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
if fq . IsWriteBlocked ( ) {
return 1
2023-11-24 13:42:11 +01:00
}
return 0
} )
2023-02-24 02:36:52 +01:00
2021-09-28 23:52:07 +02:00
var c * client
switch remoteWriteURL . Scheme {
case "http" , "https" :
2023-02-26 21:07:30 +01:00
c = newHTTPClient ( argIdx , remoteWriteURL . String ( ) , sanitizedURL , fq , * queues )
2021-09-28 23:52:07 +02:00
default :
logger . Fatalf ( "unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`" , remoteWriteURL . Scheme , sanitizedURL )
}
c . init ( argIdx , * queues , sanitizedURL )
2023-01-04 07:19:18 +01:00
// Initialize pss
2023-08-12 13:17:55 +02:00
sf := significantFigures . GetOptionalArg ( argIdx )
rd := roundDigits . GetOptionalArg ( argIdx )
2021-03-31 15:16:26 +02:00
pssLen := * queues
if n := cgroup . AvailableCPUs ( ) ; pssLen > n {
// There is no sense in running more than availableCPUs concurrent pendingSeries,
// since every pendingSeries can saturate up to a single CPU.
pssLen = n
}
pss := make ( [ ] * pendingSeries , pssLen )
2020-03-03 12:08:17 +01:00
for i := range pss {
2023-11-24 13:42:11 +01:00
pss [ i ] = newPendingSeries ( fq , c . useVMProto , sf , rd )
2020-03-03 12:08:17 +01:00
}
2023-01-04 07:19:18 +01:00
rwctx := & remoteWriteCtx {
2020-05-30 13:36:40 +02:00
idx : argIdx ,
fq : fq ,
c : c ,
pss : pss ,
2020-03-03 12:08:17 +01:00
2023-11-25 10:31:30 +01:00
rowsPushedAfterRelabel : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_rows_pushed_after_relabel_total { path=%q, url=%q} ` , queuePath , sanitizedURL ) ) ,
rowsDroppedByRelabel : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_relabel_metrics_dropped_total { path=%q, url=%q} ` , queuePath , sanitizedURL ) ) ,
2020-02-23 12:35:47 +01:00
}
2023-01-04 07:19:18 +01:00
// Initialize sas
2023-04-01 06:27:45 +02:00
sasFile := streamAggrConfig . GetOptionalArg ( argIdx )
2024-03-04 23:45:22 +01:00
dedupInterval := streamAggrDedupInterval . GetOptionalArg ( argIdx )
2024-03-17 22:01:44 +01:00
ignoreOldSamples := streamAggrIgnoreOldSamples . GetOptionalArg ( argIdx )
2023-04-01 06:27:45 +02:00
if sasFile != "" {
2024-03-04 04:42:55 +01:00
opts := & streamaggr . Options {
2024-03-17 22:01:44 +01:00
DedupInterval : dedupInterval ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : ignoreOldSamples ,
2024-03-04 04:42:55 +01:00
}
sas , err := streamaggr . LoadFromFile ( sasFile , rwctx . pushInternalTrackDropped , opts )
2023-01-04 07:19:18 +01:00
if err != nil {
2023-04-01 06:27:45 +02:00
logger . Fatalf ( "cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s" , sasFile , err )
2023-01-04 07:19:18 +01:00
}
2023-04-01 06:27:45 +02:00
rwctx . sas . Store ( sas )
2023-01-04 07:19:18 +01:00
rwctx . streamAggrKeepInput = streamAggrKeepInput . GetOptionalArg ( argIdx )
2023-07-25 01:44:09 +02:00
rwctx . streamAggrDropInput = streamAggrDropInput . GetOptionalArg ( argIdx )
2023-04-01 06:27:45 +02:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_successful { path=%q} ` , sasFile ) ) . Set ( 1 )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_success_timestamp_seconds { path=%q} ` , sasFile ) ) . Set ( fasttime . UnixTimestamp ( ) )
2024-03-04 23:45:22 +01:00
} else if dedupInterval > 0 {
2024-03-05 01:13:21 +01:00
rwctx . deduplicator = streamaggr . NewDeduplicator ( rwctx . pushInternalTrackDropped , dedupInterval , * streamAggrDropInputLabels )
2023-01-04 07:19:18 +01:00
}
return rwctx
2020-02-23 12:35:47 +01:00
}
2020-03-03 12:08:17 +01:00
func ( rwctx * remoteWriteCtx ) MustStop ( ) {
2024-03-04 23:45:22 +01:00
// sas and deduplicator must be stopped before rwctx is closed
2023-06-07 15:45:43 +02:00
// because sas can write pending series to rwctx.pss if there are any
sas := rwctx . sas . Swap ( nil )
sas . MustStop ( )
2024-03-04 23:45:22 +01:00
if rwctx . deduplicator != nil {
rwctx . deduplicator . MustStop ( )
rwctx . deduplicator = nil
}
2020-03-03 12:08:17 +01:00
for _ , ps := range rwctx . pss {
ps . MustStop ( )
}
2020-05-30 13:36:40 +02:00
rwctx . idx = 0
2020-03-03 12:08:17 +01:00
rwctx . pss = nil
2021-02-18 23:31:07 +01:00
rwctx . fq . UnblockAllReaders ( )
2020-03-03 12:08:17 +01:00
rwctx . c . MustStop ( )
rwctx . c = nil
2023-04-01 06:27:45 +02:00
2021-02-17 20:23:38 +01:00
rwctx . fq . MustClose ( )
rwctx . fq = nil
2020-03-03 12:08:17 +01:00
2022-05-06 14:28:59 +02:00
rwctx . rowsPushedAfterRelabel = nil
rwctx . rowsDroppedByRelabel = nil
2020-03-03 12:08:17 +01:00
}
2020-02-23 12:35:47 +01:00
2023-11-25 10:31:30 +01:00
func ( rwctx * remoteWriteCtx ) TryPush ( tss [ ] prompbmarshal . TimeSeries ) bool {
2023-01-04 07:19:18 +01:00
// Apply relabeling
2020-03-03 12:08:17 +01:00
var rctx * relabelCtx
2020-07-10 14:13:26 +02:00
var v * [ ] prompbmarshal . TimeSeries
2023-07-20 02:37:49 +02:00
rcs := allRelabelConfigs . Load ( )
2021-02-22 15:33:55 +01:00
pcs := rcs . perURL [ rwctx . idx ]
if pcs . Len ( ) > 0 {
2020-07-10 14:13:26 +02:00
rctx = getRelabelCtx ( )
2020-05-12 21:01:47 +02:00
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
2020-07-10 14:13:26 +02:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
2023-07-25 01:33:30 +02:00
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
2020-07-10 14:13:26 +02:00
tss = append ( * v , tss ... )
2022-05-06 14:28:59 +02:00
rowsCountBeforeRelabel := getRowsCount ( tss )
2023-08-15 13:47:48 +02:00
tss = rctx . applyRelabeling ( tss , pcs )
2022-05-06 14:28:59 +02:00
rowsCountAfterRelabel := getRowsCount ( tss )
rwctx . rowsDroppedByRelabel . Add ( rowsCountBeforeRelabel - rowsCountAfterRelabel )
2020-03-03 12:08:17 +01:00
}
2022-05-06 14:28:59 +02:00
rowsCount := getRowsCount ( tss )
rwctx . rowsPushedAfterRelabel . Add ( rowsCount )
2023-01-04 07:19:18 +01:00
2024-03-04 23:45:22 +01:00
// Apply stream aggregation or deduplication if they are configured
2023-07-25 01:44:09 +02:00
sas := rwctx . sas . Load ( )
if sas != nil {
matchIdxs := matchIdxsPool . Get ( )
matchIdxs . B = sas . Push ( tss , matchIdxs . B )
if ! rwctx . streamAggrKeepInput {
if rctx == nil {
rctx = getRelabelCtx ( )
// Make a copy of tss before dropping aggregated series
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
}
tss = dropAggregatedSeries ( tss , matchIdxs . B , rwctx . streamAggrDropInput )
2023-07-25 01:33:30 +02:00
}
2023-07-25 01:44:09 +02:00
matchIdxsPool . Put ( matchIdxs )
2024-03-04 23:45:22 +01:00
} else if rwctx . deduplicator != nil {
rwctx . deduplicator . Push ( tss )
clear ( tss )
tss = tss [ : 0 ]
2023-07-25 01:33:30 +02:00
}
2023-11-25 10:31:30 +01:00
// Try pushing the data to remote storage
ok := rwctx . tryPushInternal ( tss )
// Return back relabeling contexts to the pool
if rctx != nil {
* v = prompbmarshal . ResetTimeSeries ( tss )
tssPool . Put ( v )
putRelabelCtx ( rctx )
}
return ok
2023-07-25 01:44:09 +02:00
}
2023-07-25 01:33:30 +02:00
2023-07-25 01:44:09 +02:00
var matchIdxsPool bytesutil . ByteBufferPool
2023-07-25 01:33:30 +02:00
2023-07-25 01:44:09 +02:00
func dropAggregatedSeries ( src [ ] prompbmarshal . TimeSeries , matchIdxs [ ] byte , dropInput bool ) [ ] prompbmarshal . TimeSeries {
dst := src [ : 0 ]
2023-08-10 14:27:21 +02:00
if ! dropInput {
for i , match := range matchIdxs {
if match == 1 {
continue
}
dst = append ( dst , src [ i ] )
2023-07-25 01:44:09 +02:00
}
}
tail := src [ len ( dst ) : ]
2024-03-04 23:45:22 +01:00
clear ( tail )
2023-07-25 01:44:09 +02:00
return dst
2020-03-03 12:08:17 +01:00
}
2020-07-10 14:13:26 +02:00
2023-11-24 13:42:11 +01:00
func ( rwctx * remoteWriteCtx ) pushInternalTrackDropped ( tss [ ] prompbmarshal . TimeSeries ) {
2023-11-25 10:31:30 +01:00
if rwctx . tryPushInternal ( tss ) {
return
}
if ! * disableOnDiskQueue {
logger . Panicf ( "BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set" )
}
pushFailures . Inc ( )
if * dropSamplesOnOverload {
rowsCount := getRowsCount ( tss )
samplesDropped . Add ( rowsCount )
2023-11-24 13:42:11 +01:00
}
}
2023-11-25 10:31:30 +01:00
func ( rwctx * remoteWriteCtx ) tryPushInternal ( tss [ ] prompbmarshal . TimeSeries ) bool {
2023-09-08 23:17:16 +02:00
var rctx * relabelCtx
var v * [ ] prompbmarshal . TimeSeries
2023-08-15 13:47:48 +02:00
if len ( labelsGlobal ) > 0 {
2023-09-08 23:17:16 +02:00
// Make a copy of tss before adding extra labels in order to prevent
// from affecting time series for other remoteWrite.url configs.
rctx = getRelabelCtx ( )
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
2023-08-17 14:35:26 +02:00
rctx . appendExtraLabels ( tss , labelsGlobal )
2023-08-15 13:47:48 +02:00
}
2023-08-17 12:15:03 +02:00
2023-01-04 07:19:18 +01:00
pss := rwctx . pss
2024-02-24 01:44:19 +01:00
idx := rwctx . pssNextIdx . Add ( 1 ) % uint64 ( len ( pss ) )
2023-11-25 10:31:30 +01:00
ok := pss [ idx ] . TryPush ( tss )
if rctx != nil {
* v = prompbmarshal . ResetTimeSeries ( tss )
tssPool . Put ( v )
putRelabelCtx ( rctx )
}
return ok
2023-01-04 07:19:18 +01:00
}
2023-03-29 18:05:58 +02:00
func ( rwctx * remoteWriteCtx ) reinitStreamAggr ( ) {
2023-10-16 15:57:24 +02:00
sasFile := streamAggrConfig . GetOptionalArg ( rwctx . idx )
if sasFile == "" {
2023-04-01 06:27:45 +02:00
// There is no stream aggregation for rwctx
2023-03-29 18:05:58 +02:00
return
}
2023-10-16 16:00:24 +02:00
2023-04-01 06:27:45 +02:00
logger . Infof ( "reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q" , sasFile )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reloads_total { path=%q} ` , sasFile ) ) . Inc ( )
2024-03-04 04:42:55 +01:00
opts := & streamaggr . Options {
2024-03-17 22:01:44 +01:00
DedupInterval : streamAggrDedupInterval . GetOptionalArg ( rwctx . idx ) ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : streamAggrIgnoreOldSamples . GetOptionalArg ( rwctx . idx ) ,
2024-03-04 04:42:55 +01:00
}
sasNew , err := streamaggr . LoadFromFile ( sasFile , rwctx . pushInternalTrackDropped , opts )
2023-04-01 06:27:45 +02:00
if err != nil {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reloads_errors_total { path=%q} ` , sasFile ) ) . Inc ( )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_successful { path=%q} ` , sasFile ) ) . Set ( 0 )
logger . Errorf ( "cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s" , sasFile , err )
2023-03-29 18:05:58 +02:00
return
}
2023-10-16 20:52:52 +02:00
sas := rwctx . sas . Load ( )
2023-04-01 06:27:45 +02:00
if ! sasNew . Equal ( sas ) {
sasOld := rwctx . sas . Swap ( sasNew )
sasOld . MustStop ( )
logger . Infof ( "successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q" , sasFile )
} else {
sasNew . MustStop ( )
logger . Infof ( "the config at -remoteWrite.streamAggr.config=%q wasn't changed" , sasFile )
2023-03-29 18:05:58 +02:00
}
2023-04-01 06:27:45 +02:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_successful { path=%q} ` , sasFile ) ) . Set ( 1 )
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_streamaggr_config_reload_success_timestamp_seconds { path=%q} ` , sasFile ) ) . Set ( fasttime . UnixTimestamp ( ) )
2023-03-29 18:05:58 +02:00
}
2023-07-25 01:33:30 +02:00
var tssPool = & sync . Pool {
2020-07-10 14:13:26 +02:00
New : func ( ) interface { } {
2020-07-14 13:27:50 +02:00
a := [ ] prompbmarshal . TimeSeries { }
return & a
2020-07-10 14:13:26 +02:00
} ,
}
2022-05-06 14:28:59 +02:00
func getRowsCount ( tss [ ] prompbmarshal . TimeSeries ) int {
rowsCount := 0
for _ , ts := range tss {
rowsCount += len ( ts . Samples )
}
return rowsCount
}
2023-04-01 06:27:45 +02:00
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
func CheckStreamAggrConfigs ( ) error {
pushNoop := func ( tss [ ] prompbmarshal . TimeSeries ) { }
for idx , sasFile := range * streamAggrConfig {
if sasFile == "" {
continue
}
2024-03-04 04:42:55 +01:00
opts := & streamaggr . Options {
2024-03-17 22:01:44 +01:00
DedupInterval : streamAggrDedupInterval . GetOptionalArg ( idx ) ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : streamAggrIgnoreOldSamples . GetOptionalArg ( idx ) ,
2024-03-04 04:42:55 +01:00
}
sas , err := streamaggr . LoadFromFile ( sasFile , pushNoop , opts )
2023-04-01 06:27:45 +02:00
if err != nil {
return fmt . Errorf ( "cannot load -remoteWrite.streamAggr.config=%q: %w" , sasFile , err )
}
sas . MustStop ( )
}
return nil
}