2020-02-23 12:35:47 +01:00
package remotewrite
import (
"flag"
"fmt"
2023-11-24 13:42:11 +01:00
"net/http"
2021-09-28 23:52:07 +02:00
"net/url"
2023-03-28 03:15:28 +02:00
"path/filepath"
2024-07-13 01:55:26 +02:00
"slices"
2021-05-20 12:13:40 +02:00
"strconv"
2020-05-30 13:36:40 +02:00
"sync"
2020-02-23 12:35:47 +01:00
"sync/atomic"
2021-05-20 12:13:40 +02:00
"time"
2020-02-23 12:35:47 +01:00
2023-11-24 13:42:11 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2021-08-05 08:46:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2021-05-20 12:13:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2022-11-21 23:38:43 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2023-03-28 03:15:28 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-30 13:36:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2021-05-20 01:12:36 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2023-11-01 23:05:11 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
2024-03-30 05:38:29 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
2023-01-04 07:19:18 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
2023-08-11 14:38:28 +02:00
"github.com/VictoriaMetrics/metrics"
2023-08-11 15:23:00 +02:00
"github.com/cespare/xxhash/v2"
2020-02-23 12:35:47 +01:00
)
var (
2023-02-24 02:36:52 +01:00
remoteWriteURLs = flagutil . NewArrayString ( "remoteWrite.url" , "Remote storage URL to write data to. It must support either VictoriaMetrics remote write protocol " +
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . " +
2023-07-25 03:15:24 +02:00
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. " +
2023-12-05 00:20:44 +01:00
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set" )
enableMultitenantHandlers = flag . Bool ( "enableMultitenantHandlers" , false , "Whether to process incoming data via multitenant insert handlers according to " +
2024-04-18 02:54:20 +02:00
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers " +
2023-12-05 00:20:44 +01:00
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ." +
2024-04-18 01:31:37 +02:00
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details" )
2024-04-19 11:25:41 +02:00
2023-07-25 03:15:24 +02:00
shardByURL = flag . Bool ( "remoteWrite.shardByURL" , false , "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . " +
2024-04-19 11:25:41 +02:00
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . " +
"See also -remoteWrite.shardByURLReplicas" )
shardByURLReplicas = flag . Int ( "remoteWrite.shardByURLReplicas" , 1 , "How many copies of data to make among remote storage systems enumerated via -remoteWrite.url " +
"when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages" )
2023-11-01 23:05:11 +01:00
shardByURLLabels = flagutil . NewArrayString ( "remoteWrite.shardByURL.labels" , "Optional list of labels, which must be used for sharding outgoing samples " +
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain " +
2024-04-02 23:36:32 +02:00
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels" )
shardByURLIgnoreLabels = flagutil . NewArrayString ( "remoteWrite.shardByURL.ignoreLabels" , "Optional list of labels, which must be ignored when sharding outgoing samples " +
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain " +
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels" )
2024-04-19 11:25:41 +02:00
2024-04-02 23:36:32 +02:00
tmpDataPath = flag . String ( "remoteWrite.tmpDataPath" , "vmagent-remotewrite-data" , "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . " +
2023-11-25 10:31:30 +01:00
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue" )
2023-03-28 03:15:28 +02:00
keepDanglingQueues = flag . Bool ( "remoteWrite.keepDanglingQueues" , false , "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. " +
"Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on." )
2021-06-17 12:26:35 +02:00
queues = flag . Int ( "remoteWrite.queues" , cgroup . AvailableCPUs ( ) * 2 , "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues " +
2024-03-06 12:43:08 +01:00
"isn't enough for sending high volume of collected data to remote storage. " +
"Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage" )
2020-02-23 12:35:47 +01:00
showRemoteWriteURL = flag . Bool ( "remoteWrite.showURL" , false , "Whether to show -remoteWrite.url in the exported metrics. " +
2020-07-10 13:07:02 +02:00
"It is hidden by default, since it can contain sensitive info such as auth key" )
2023-08-12 13:17:55 +02:00
maxPendingBytesPerURL = flagutil . NewArrayBytes ( "remoteWrite.maxDiskUsagePerURL" , 0 , "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath " +
2020-03-03 18:48:46 +01:00
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. " +
2023-04-26 12:23:01 +02:00
"Buffered data is stored in ~500MB chunks. It is recommended to set the value for this flag to a multiple of the block size 500MB. " +
2020-03-03 18:48:46 +01:00
"Disk usage is unlimited if the value is set to 0" )
2023-08-12 13:17:55 +02:00
significantFigures = flagutil . NewArrayInt ( "remoteWrite.significantFigures" , 0 , "The number of significant figures to leave in metric values before writing them " +
2021-02-01 13:27:05 +01:00
"to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. " +
"This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits" )
2023-08-12 13:17:55 +02:00
roundDigits = flagutil . NewArrayInt ( "remoteWrite.roundDigits" , 100 , "Round metric values to this number of decimal digits after the point before " +
"writing them to remote storage. " +
2021-02-01 13:27:05 +01:00
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. " +
2023-05-10 09:50:41 +02:00
"By default, digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. " +
2021-02-01 13:27:05 +01:00
"This option may be used for improving data compression for the stored metrics" )
2021-05-20 01:12:36 +02:00
sortLabels = flag . Bool ( "sortLabels" , false , ` Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. ` +
` This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. ` +
` For example, if m { k1="v1",k2="v2"} may be sent as m { k2="v2",k1="v1"} ` +
` Enabled sorting for labels can slow down ingestion performance a bit ` )
2021-05-20 12:13:40 +02:00
maxHourlySeries = flag . Int ( "remoteWrite.maxHourlySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last hour. " +
2024-04-18 01:31:37 +02:00
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter" )
2021-05-20 12:13:40 +02:00
maxDailySeries = flag . Int ( "remoteWrite.maxDailySeries" , 0 , "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. " +
2024-04-18 01:31:37 +02:00
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter" )
2024-03-30 05:38:29 +01:00
maxIngestionRate = flag . Int ( "maxIngestionRate" , 0 , "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. " +
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit" )
2023-04-01 06:27:45 +02:00
2024-05-10 12:09:21 +02:00
disableOnDiskQueue = flagutil . NewArrayBool ( "remoteWrite.disableOnDiskQueue" , "Whether to disable storing pending data to -remoteWrite.tmpDataPath " +
2024-07-13 01:55:26 +02:00
"when the remote storage system at the corresponding -remoteWrite.url cannot keep up with the data ingestion rate. " +
"See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence . See also -remoteWrite.dropSamplesOnOverload" )
dropSamplesOnOverload = flag . Bool ( "remoteWrite.dropSamplesOnOverload" , false , "Whether to drop samples when -remoteWrite.disableOnDiskQueue is set and if the samples " +
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence" )
2020-02-23 12:35:47 +01:00
)
2021-08-05 08:46:19 +02:00
var (
2024-07-15 09:29:17 +02:00
// rwctxsGlobal contains statically populated entries when -remoteWrite.url is specified.
rwctxsGlobal [ ] * remoteWriteCtx
2021-08-05 08:46:19 +02:00
2024-05-13 15:22:37 +02:00
// Data without tenant id is written to defaultAuthToken if -enableMultitenantHandlers is specified.
2021-08-05 08:46:19 +02:00
defaultAuthToken = & auth . Token { }
2023-11-24 13:42:11 +01:00
2023-11-25 10:31:30 +01:00
// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
2023-11-24 13:42:11 +01:00
ErrQueueFullHTTPRetry = & httpserver . ErrorWithStatusCode {
2023-11-25 10:31:30 +01:00
Err : fmt . Errorf ( "remote storage systems cannot keep up with the data ingestion rate; retry the request later " +
"or remove -remoteWrite.disableOnDiskQueue from vmagent command-line flags, so it could save pending data to -remoteWrite.tmpDataPath; " +
2024-04-18 01:31:37 +02:00
"see https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence" ) ,
2023-11-24 13:42:11 +01:00
StatusCode : http . StatusTooManyRequests ,
}
2024-05-10 12:09:21 +02:00
2024-07-15 09:29:17 +02:00
// disableOnDiskQueueAny is set to true if at least a single -remoteWrite.url is configured with -remoteWrite.disableOnDiskQueue
disableOnDiskQueueAny bool
2024-07-13 01:55:26 +02:00
// dropSamplesOnFailureGlobal is set to true if -remoteWrite.dropSamplesOnOverload is set or if multiple -remoteWrite.disableOnDiskQueue options are set.
dropSamplesOnFailureGlobal bool
2021-08-05 08:46:19 +02:00
)
2024-05-13 15:22:37 +02:00
// MultitenancyEnabled returns true if -enableMultitenantHandlers is specified.
2021-08-05 08:46:19 +02:00
func MultitenancyEnabled ( ) bool {
2024-05-13 15:22:37 +02:00
return * enableMultitenantHandlers
2021-08-05 08:46:19 +02:00
}
2020-03-03 12:08:17 +01:00
2020-05-30 13:36:40 +02:00
// Contains the current relabelConfigs.
2023-07-20 02:37:49 +02:00
var allRelabelConfigs atomic . Pointer [ relabelConfigs ]
2020-05-30 13:36:40 +02:00
2024-05-17 14:00:47 +02:00
// Contains the current global stream aggregators.
var sasGlobal atomic . Pointer [ streamaggr . Aggregators ]
// Contains the current global deduplicator.
var deduplicatorGlobal * streamaggr . Deduplicator
2020-08-30 20:23:38 +02:00
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
2021-04-23 21:01:57 +02:00
var maxQueues = cgroup . AvailableCPUs ( ) * 16
2020-08-30 20:23:38 +02:00
2023-03-28 03:33:05 +02:00
const persistentQueueDirname = "persistent-queue"
2023-03-28 03:15:28 +02:00
2020-09-29 18:48:53 +02:00
// InitSecretFlags must be called after flag.Parse and before any logging.
//
// Unless -remoteWrite.showURL is set, it registers -remoteWrite.url as a secret flag.
func InitSecretFlags() {
	if *showRemoteWriteURL {
		// The user explicitly asked to expose the urls.
		return
	}
	// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
	flagutil.RegisterSecretFlag("remoteWrite.url")
}
2024-04-02 23:36:32 +02:00
// shardByURLLabelsMap is built by Init from -remoteWrite.shardByURL.labels.
var shardByURLLabelsMap map[string]struct{}

// shardByURLIgnoreLabelsMap is built by Init from -remoteWrite.shardByURL.ignoreLabels.
var shardByURLIgnoreLabelsMap map[string]struct{}
2023-11-01 23:05:11 +01:00
2020-02-23 12:35:47 +01:00
// Init initializes remotewrite.
//
// It must be called after flag.Parse().
//
// Stop must be called for graceful shutdown.
2021-08-05 08:46:19 +02:00
func Init ( ) {
2024-05-13 15:22:37 +02:00
if len ( * remoteWriteURLs ) == 0 {
logger . Fatalf ( "at least one `-remoteWrite.url` command-line flag must be set" )
2021-08-05 08:44:29 +02:00
}
2021-05-20 12:13:40 +02:00
if * maxHourlySeries > 0 {
hourlySeriesLimiter = bloomfilter . NewLimiter ( * maxHourlySeries , time . Hour )
2021-05-20 14:27:06 +02:00
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_max_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_hourly_series_limit_current_series ` , func ( ) float64 {
return float64 ( hourlySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 12:13:40 +02:00
}
if * maxDailySeries > 0 {
dailySeriesLimiter = bloomfilter . NewLimiter ( * maxDailySeries , 24 * time . Hour )
2021-05-20 14:27:06 +02:00
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_max_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . MaxItems ( ) )
} )
_ = metrics . NewGauge ( ` vmagent_daily_series_limit_current_series ` , func ( ) float64 {
return float64 ( dailySeriesLimiter . CurrentItems ( ) )
} )
2021-05-20 12:13:40 +02:00
}
2024-03-30 05:38:29 +01:00
2020-08-30 20:23:38 +02:00
if * queues > maxQueues {
* queues = maxQueues
}
if * queues <= 0 {
* queues = 1
2020-02-23 12:35:47 +01:00
}
2024-04-02 23:36:32 +02:00
if len ( * shardByURLLabels ) > 0 && len ( * shardByURLIgnoreLabels ) > 0 {
logger . Fatalf ( "-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
"see https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages" )
2023-11-01 23:05:11 +01:00
}
2024-04-02 23:36:32 +02:00
shardByURLLabelsMap = newMapFromStrings ( * shardByURLLabels )
shardByURLIgnoreLabelsMap = newMapFromStrings ( * shardByURLIgnoreLabels )
2020-05-30 13:36:40 +02:00
initLabelsGlobal ( )
2021-05-21 15:34:03 +02:00
// Register SIGHUP handler for config reload before loadRelabelConfigs.
// This guarantees that the config will be re-read if the signal arrives just after loadRelabelConfig.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
sighupCh := procutil . NewSighupChan ( )
2020-05-30 13:36:40 +02:00
rcs , err := loadRelabelConfigs ( )
if err != nil {
logger . Fatalf ( "cannot load relabel configs: %s" , err )
}
allRelabelConfigs . Store ( rcs )
2023-03-29 18:05:58 +02:00
relabelConfigSuccess . Set ( 1 )
relabelConfigTimestamp . Set ( fasttime . UnixTimestamp ( ) )
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 18:01:37 +02:00
initStreamAggrConfigGlobal ( )
2024-05-17 14:00:47 +02:00
2024-07-15 09:29:17 +02:00
rwctxsGlobal = newRemoteWriteCtxs ( nil , * remoteWriteURLs )
2024-05-10 12:09:21 +02:00
2024-07-13 01:55:26 +02:00
disableOnDiskQueues := [ ] bool ( * disableOnDiskQueue )
2024-07-15 09:29:17 +02:00
disableOnDiskQueueAny = slices . Contains ( disableOnDiskQueues , true )
2024-07-13 01:55:26 +02:00
// Samples must be dropped if multiple -remoteWrite.disableOnDiskQueue options are configured and at least a single is set to true.
// In this case it is impossible to prevent from sending many duplicates of samples passed to TryPush() to all the configured -remoteWrite.url
// if these samples couldn't be sent to the -remoteWrite.url with the disabled persistent queue. So it is better sending samples
// to the remaining -remoteWrite.url and dropping them on the blocked queue.
2024-07-15 09:29:17 +02:00
dropSamplesOnFailureGlobal = * dropSamplesOnOverload || disableOnDiskQueueAny && len ( disableOnDiskQueues ) > 1
2024-05-10 12:09:21 +02:00
2023-11-23 19:39:40 +01:00
dropDanglingQueues ( )
2021-08-05 08:44:29 +02:00
2020-05-30 13:36:40 +02:00
// Start config reloader.
configReloaderWG . Add ( 1 )
go func ( ) {
defer configReloaderWG . Done ( )
for {
select {
2023-04-01 06:27:45 +02:00
case <- configReloaderStopCh :
2020-05-30 13:36:40 +02:00
return
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 18:01:37 +02:00
case <- sighupCh :
2020-05-30 13:36:40 +02:00
}
2023-04-01 06:27:45 +02:00
reloadRelabelConfigs ( )
reloadStreamAggrConfigs ( )
2020-05-30 13:36:40 +02:00
}
} ( )
2020-02-23 12:35:47 +01:00
}
2023-11-23 19:39:40 +01:00
func dropDanglingQueues ( ) {
if * keepDanglingQueues {
return
}
// Remove dangling persistent queues, if any.
// This is required for the case when the number of queues has been changed or URL have been changed.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
//
2024-04-23 14:49:45 +02:00
// In case if there were many persistent queues with identical *remoteWriteURLs
// the queue with the last index will be dropped.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6140
2024-07-15 09:29:17 +02:00
existingQueues := make ( map [ string ] struct { } , len ( rwctxsGlobal ) )
for _ , rwctx := range rwctxsGlobal {
2023-11-23 19:39:40 +01:00
existingQueues [ rwctx . fq . Dirname ( ) ] = struct { } { }
}
queuesDir := filepath . Join ( * tmpDataPath , persistentQueueDirname )
files := fs . MustReadDir ( queuesDir )
removed := 0
for _ , f := range files {
dirname := f . Name ( )
if _ , ok := existingQueues [ dirname ] ; ! ok {
logger . Infof ( "removing dangling queue %q" , dirname )
fullPath := filepath . Join ( queuesDir , dirname )
fs . MustRemoveAll ( fullPath )
removed ++
}
}
if removed > 0 {
2024-07-15 09:29:17 +02:00
logger . Infof ( "removed %d dangling queues from %q, active queues: %d" , removed , * tmpDataPath , len ( rwctxsGlobal ) )
2023-11-23 19:39:40 +01:00
}
}
2023-04-01 06:27:45 +02:00
// reloadRelabelConfigs re-reads the relabel configs and replaces allRelabelConfigs on success.
//
// On failure the previous configs stay in effect. The reload counters and the
// last-reload-successful gauge are updated in both cases.
func reloadRelabelConfigs() {
	relabelConfigReloads.Inc()
	logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")

	rcs, err := loadRelabelConfigs()
	if err == nil {
		allRelabelConfigs.Store(rcs)
		relabelConfigSuccess.Set(1)
		relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
		logger.Infof("successfully reloaded relabel configs")
		return
	}

	relabelConfigReloadErrors.Inc()
	relabelConfigSuccess.Set(0)
	logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
}
2022-11-21 23:38:43 +01:00
var (
2023-03-29 18:05:58 +02:00
relabelConfigReloads = metrics . NewCounter ( ` vmagent_relabel_config_reloads_total ` )
relabelConfigReloadErrors = metrics . NewCounter ( ` vmagent_relabel_config_reloads_errors_total ` )
2023-12-20 13:23:38 +01:00
relabelConfigSuccess = metrics . NewGauge ( ` vmagent_relabel_config_last_reload_successful ` , nil )
2023-03-29 18:05:58 +02:00
relabelConfigTimestamp = metrics . NewCounter ( ` vmagent_relabel_config_last_reload_success_timestamp_seconds ` )
2022-11-21 23:38:43 +01:00
)
2021-08-05 08:46:19 +02:00
func newRemoteWriteCtxs ( at * auth . Token , urls [ ] string ) [ ] * remoteWriteCtx {
if len ( urls ) == 0 {
logger . Panicf ( "BUG: urls must be non-empty" )
}
2021-11-04 14:39:14 +01:00
maxInmemoryBlocks := memory . Allowed ( ) / len ( urls ) / * maxRowsPerBlock / 100
2021-11-05 14:14:49 +01:00
if maxInmemoryBlocks / * queues > 100 {
2021-08-05 08:46:19 +02:00
// There is no much sense in keeping higher number of blocks in memory,
// since this means that the producer outperforms consumer and the queue
// will continue growing. It is better storing the queue to file.
2021-11-05 14:14:49 +01:00
maxInmemoryBlocks = 100 * * queues
2021-08-05 08:46:19 +02:00
}
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
rwctxs := make ( [ ] * remoteWriteCtx , len ( urls ) )
2021-09-28 23:52:07 +02:00
for i , remoteWriteURLRaw := range urls {
remoteWriteURL , err := url . Parse ( remoteWriteURLRaw )
if err != nil {
logger . Fatalf ( "invalid -remoteWrite.url=%q: %s" , remoteWriteURL , err )
}
2021-08-05 08:46:19 +02:00
sanitizedURL := fmt . Sprintf ( "%d:secret-url" , i + 1 )
if at != nil {
2024-04-18 02:54:20 +02:00
// Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
2021-09-28 23:52:07 +02:00
remoteWriteURL . Path = fmt . Sprintf ( "%s/insert/%d:%d/prometheus/api/v1/write" , remoteWriteURL . Path , at . AccountID , at . ProjectID )
2021-08-05 08:46:19 +02:00
sanitizedURL = fmt . Sprintf ( "%s:%d:%d" , sanitizedURL , at . AccountID , at . ProjectID )
}
if * showRemoteWriteURL {
sanitizedURL = fmt . Sprintf ( "%d:%s" , i + 1 , remoteWriteURL )
}
2023-09-01 09:34:16 +02:00
rwctxs [ i ] = newRemoteWriteCtx ( i , remoteWriteURL , maxInmemoryBlocks , sanitizedURL )
2021-08-05 08:46:19 +02:00
}
return rwctxs
}
2024-05-16 09:25:42 +02:00
// configReloaderStopCh is closed by Stop to terminate the config reloader goroutine started by Init.
var configReloaderStopCh = make(chan struct{})

// configReloaderWG lets Stop wait until the config reloader goroutine exits.
var configReloaderWG sync.WaitGroup
2020-05-30 13:36:40 +02:00
2024-03-30 05:38:29 +01:00
// StartIngestionRateLimiter starts ingestion rate limiter.
//
// Ingestion rate limiter must be started before Init() call.
//
// StopIngestionRateLimiter must be called before Stop() call in order to unblock all the callers
// to ingestion rate limiter. Otherwise deadlock may occur at Stop() call.
//
// It does nothing unless -maxIngestionRate is positive.
func StartIngestionRateLimiter() {
	if *maxIngestionRate > 0 {
		limitReachedCounter := metrics.NewCounter(` vmagent_max_ingestion_rate_limit_reached_total `)
		ingestionRateLimiterStopCh = make(chan struct{})
		ingestionRateLimiter = ratelimiter.New(int64(*maxIngestionRate), limitReachedCounter, ingestionRateLimiterStopCh)
	}
}
// StopIngestionRateLimiter stops ingestion rate limiter.
//
// It closes the stop channel created by StartIngestionRateLimiter. It does nothing if the limiter
// was never started or has already been stopped.
func StopIngestionRateLimiter() {
	if stopCh := ingestionRateLimiterStopCh; stopCh != nil {
		close(stopCh)
		ingestionRateLimiterStopCh = nil
	}
}
// ingestionRateLimiter is created by StartIngestionRateLimiter when -maxIngestionRate is positive; otherwise it stays nil.
var ingestionRateLimiter *ratelimiter.RateLimiter

// ingestionRateLimiterStopCh is the stop channel passed to ingestionRateLimiter.
// StopIngestionRateLimiter closes it and resets it to nil.
var ingestionRateLimiterStopCh chan struct{}
2020-02-23 12:35:47 +01:00
// Stop stops remotewrite.
//
2023-11-25 10:31:30 +01:00
// It is expected that nobody calls TryPush during and after the call to this func.
2020-02-23 12:35:47 +01:00
func Stop ( ) {
2023-04-01 06:27:45 +02:00
close ( configReloaderStopCh )
2020-05-30 13:36:40 +02:00
configReloaderWG . Wait ( )
2021-08-05 08:46:19 +02:00
2024-05-17 14:00:47 +02:00
sasGlobal . Load ( ) . MustStop ( )
2024-05-20 13:23:09 +02:00
if deduplicatorGlobal != nil {
deduplicatorGlobal . MustStop ( )
deduplicatorGlobal = nil
}
2024-05-17 14:00:47 +02:00
2024-07-15 09:29:17 +02:00
for _ , rwctx := range rwctxsGlobal {
2021-08-05 08:46:19 +02:00
rwctx . MustStop ( )
}
2024-07-15 09:29:17 +02:00
rwctxsGlobal = nil
2021-09-01 13:14:37 +02:00
if sl := hourlySeriesLimiter ; sl != nil {
sl . MustStop ( )
}
if sl := dailySeriesLimiter ; sl != nil {
sl . MustStop ( )
}
2020-02-23 12:35:47 +01:00
}
2024-05-13 15:22:37 +02:00
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
2023-11-25 13:42:37 +01:00
//
2024-07-13 01:55:26 +02:00
// PushDropSamplesOnFailure drops wr samples if they cannot be sent to -remoteWrite.url by any reason.
//
2023-11-25 13:42:37 +01:00
// PushDropSamplesOnFailure can modify wr contents.
func PushDropSamplesOnFailure ( at * auth . Token , wr * prompbmarshal . WriteRequest ) {
_ = tryPush ( at , wr , true )
}
2024-05-13 15:22:37 +02:00
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url
2021-08-05 08:46:19 +02:00
//
2023-11-25 10:31:30 +01:00
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
//
// The caller must return ErrQueueFullHTTPRetry to the client, which sends wr, if TryPush returns false.
func TryPush ( at * auth . Token , wr * prompbmarshal . WriteRequest ) bool {
2024-07-13 01:55:26 +02:00
return tryPush ( at , wr , dropSamplesOnFailureGlobal )
2023-11-25 13:42:37 +01:00
}
2024-05-10 12:09:21 +02:00
func tryPush ( at * auth . Token , wr * prompbmarshal . WriteRequest , forceDropSamplesOnFailure bool ) bool {
2023-12-05 00:20:44 +01:00
tss := wr . Timeseries
if at == nil && MultitenancyEnabled ( ) {
// Write data to default tenant if at isn't set when multitenancy is enabled.
2021-08-05 08:46:19 +02:00
at = defaultAuthToken
2021-08-05 08:44:29 +02:00
}
2023-12-05 00:20:44 +01:00
var tenantRctx * relabelCtx
2024-05-13 15:22:37 +02:00
if at != nil {
2023-12-05 00:20:44 +01:00
// Convert at to (vm_account_id, vm_project_id) labels.
tenantRctx = getRelabelCtx ( )
defer putRelabelCtx ( tenantRctx )
2021-08-05 08:44:29 +02:00
}
2023-11-25 10:31:30 +01:00
2024-05-10 12:09:21 +02:00
// Quick check whether writes to configured remote storage systems are blocked.
// This allows saving CPU time spent on relabeling and block compression
// if some of remote storage systems cannot keep up with the data ingestion rate.
2024-07-15 09:29:17 +02:00
rwctxs , ok := getEligibleRemoteWriteCtxs ( tss , forceDropSamplesOnFailure )
if ! ok {
// At least a single remote write queue is blocked and dropSamplesOnFailure isn't set.
// Return false to the caller, so it could re-send samples again.
return false
}
if len ( rwctxs ) == 0 {
// All the remote write queues are skipped because they are blocked and dropSamplesOnFailure is set to true.
// Return true to the caller, so it doesn't re-send the samples again.
return true
2023-11-24 13:42:11 +01:00
}
2021-08-05 08:44:29 +02:00
2020-03-03 12:08:17 +01:00
var rctx * relabelCtx
2023-07-20 02:37:49 +02:00
rcs := allRelabelConfigs . Load ( )
2021-02-22 15:33:55 +01:00
pcsGlobal := rcs . global
2024-08-13 16:32:05 +02:00
if pcsGlobal . Len ( ) > 0 || * usePromCompatibleNaming {
2020-03-03 12:08:17 +01:00
rctx = getRelabelCtx ( )
2023-12-05 00:20:44 +01:00
defer putRelabelCtx ( rctx )
2020-03-03 12:08:17 +01:00
}
2024-07-15 09:29:17 +02:00
rowsCount := getRowsCount ( tss )
2022-05-06 14:28:59 +02:00
globalRowsPushedBeforeRelabel . Add ( rowsCount )
2021-11-04 14:39:14 +01:00
maxSamplesPerBlock := * maxRowsPerBlock
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
2023-11-25 10:31:30 +01:00
2024-05-17 14:00:47 +02:00
sas := sasGlobal . Load ( )
2020-02-28 17:57:45 +01:00
for len ( tss ) > 0 {
2020-07-10 14:13:26 +02:00
// Process big tss in smaller blocks in order to reduce the maximum memory usage
2020-09-26 03:07:45 +02:00
samplesCount := 0
2021-03-30 23:44:31 +02:00
labelsCount := 0
2020-09-26 03:07:45 +02:00
i := 0
for i < len ( tss ) {
samplesCount += len ( tss [ i ] . Samples )
2023-11-25 10:31:30 +01:00
labelsCount += len ( tss [ i ] . Samples ) * len ( tss [ i ] . Labels )
2020-09-26 03:07:45 +02:00
i ++
2021-11-04 14:39:14 +01:00
if samplesCount >= maxSamplesPerBlock || labelsCount >= maxLabelsPerBlock {
2020-09-26 03:07:45 +02:00
break
}
}
2024-03-21 17:14:49 +01:00
2024-03-30 05:38:29 +01:00
ingestionRateLimiter . Register ( samplesCount )
2024-03-21 17:14:49 +01:00
2020-02-28 17:57:45 +01:00
tssBlock := tss
2020-09-26 03:07:45 +02:00
if i < len ( tss ) {
tssBlock = tss [ : i ]
tss = tss [ i : ]
2020-02-28 19:03:38 +01:00
} else {
tss = nil
2020-02-28 17:57:45 +01:00
}
2023-12-05 00:20:44 +01:00
if tenantRctx != nil {
tenantRctx . tenantToLabels ( tssBlock , at . AccountID , at . ProjectID )
}
2020-03-03 12:08:17 +01:00
if rctx != nil {
2022-05-06 14:28:59 +02:00
rowsCountBeforeRelabel := getRowsCount ( tssBlock )
2023-08-15 13:47:48 +02:00
tssBlock = rctx . applyRelabeling ( tssBlock , pcsGlobal )
2022-05-06 14:28:59 +02:00
rowsCountAfterRelabel := getRowsCount ( tssBlock )
rowsDroppedByGlobalRelabel . Add ( rowsCountBeforeRelabel - rowsCountAfterRelabel )
2020-03-03 12:08:17 +01:00
}
2021-05-20 01:12:36 +02:00
sortLabelsIfNeeded ( tssBlock )
2021-05-20 12:13:40 +02:00
tssBlock = limitSeriesCardinality ( tssBlock )
2024-05-20 14:03:28 +02:00
if sas . IsEnabled ( ) {
2024-05-17 14:00:47 +02:00
matchIdxs := matchIdxsPool . Get ( )
matchIdxs . B = sas . Push ( tssBlock , matchIdxs . B )
if ! * streamAggrGlobalKeepInput {
tssBlock = dropAggregatedSeries ( tssBlock , matchIdxs . B , * streamAggrGlobalDropInput )
}
matchIdxsPool . Put ( matchIdxs )
2024-09-03 10:47:05 +02:00
}
if deduplicatorGlobal != nil {
2024-05-17 14:00:47 +02:00
deduplicatorGlobal . Push ( tssBlock )
tssBlock = tssBlock [ : 0 ]
}
2024-07-15 09:29:17 +02:00
if ! tryPushBlockToRemoteStorages ( rwctxs , tssBlock , forceDropSamplesOnFailure ) {
2023-11-24 13:42:11 +01:00
return false
}
2020-02-28 17:57:45 +01:00
}
2023-11-24 13:42:11 +01:00
return true
2020-02-23 12:35:47 +01:00
}
2024-07-15 09:29:17 +02:00
func getEligibleRemoteWriteCtxs ( tss [ ] prompbmarshal . TimeSeries , forceDropSamplesOnFailure bool ) ( [ ] * remoteWriteCtx , bool ) {
if ! disableOnDiskQueueAny {
return rwctxsGlobal , true
}
// This code is applicable if at least a single remote storage has -disableOnDiskQueue
rwctxs := make ( [ ] * remoteWriteCtx , 0 , len ( rwctxsGlobal ) )
for _ , rwctx := range rwctxsGlobal {
if ! rwctx . fq . IsWriteBlocked ( ) {
rwctxs = append ( rwctxs , rwctx )
} else {
rwctx . pushFailures . Inc ( )
if ! forceDropSamplesOnFailure {
return nil , false
}
rowsCount := getRowsCount ( tss )
rwctx . rowsDroppedOnPushFailure . Add ( rowsCount )
}
}
return rwctxs , true
}
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to the less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have a consistent set of labels compared to the rest of the streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Add missing URLs to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These URLs help users
figure out why VictoriaMetrics or vmagent generates the corresponding error messages. The URLs were removed for an unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 18:01:37 +02:00
func pushToRemoteStoragesTrackDropped ( tss [ ] prompbmarshal . TimeSeries ) {
2024-07-15 09:29:17 +02:00
rwctxs , _ := getEligibleRemoteWriteCtxs ( tss , true )
if len ( rwctxs ) == 0 {
2024-05-17 14:00:47 +02:00
return
}
2024-07-15 09:29:17 +02:00
if ! tryPushBlockToRemoteStorages ( rwctxs , tss , true ) {
logger . Panicf ( "BUG: tryPushBlockToRemoteStorages() must return true when forceDropSamplesOnFailure=true" )
}
2024-05-17 14:00:47 +02:00
}
2024-07-15 09:29:17 +02:00
func tryPushBlockToRemoteStorages ( rwctxs [ ] * remoteWriteCtx , tssBlock [ ] prompbmarshal . TimeSeries , forceDropSamplesOnFailure bool ) bool {
2021-11-04 14:00:51 +01:00
if len ( tssBlock ) == 0 {
// Nothing to push
2023-11-24 13:42:11 +01:00
return true
2021-11-04 14:00:51 +01:00
}
2023-11-24 13:42:11 +01:00
2023-07-25 03:15:24 +02:00
if len ( rwctxs ) == 1 {
// Fast path - just push data to the configured single remote storage
2024-05-10 12:09:21 +02:00
return rwctxs [ 0 ] . TryPush ( tssBlock , forceDropSamplesOnFailure )
2023-07-25 03:15:24 +02:00
}
// We need to push tssBlock to multiple remote storages.
// This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value.
2024-04-19 11:25:41 +02:00
if * shardByURL && * shardByURLReplicas < len ( rwctxs ) {
// Shard tssBlock samples among rwctxs.
replicas := * shardByURLReplicas
if replicas <= 0 {
replicas = 1
2023-07-25 03:15:24 +02:00
}
2024-07-15 09:29:17 +02:00
return tryShardingBlockAmongRemoteStorages ( rwctxs , tssBlock , replicas , forceDropSamplesOnFailure )
2023-07-25 03:15:24 +02:00
}
2024-04-19 11:25:41 +02:00
// Replicate tssBlock samples among rwctxs.
// Push tssBlock to remote storage systems in parallel in order to reduce
2023-07-25 03:15:24 +02:00
// the time needed for sending the data to multiple remote storage systems.
2021-11-04 14:00:51 +01:00
var wg sync . WaitGroup
2023-07-25 03:15:24 +02:00
wg . Add ( len ( rwctxs ) )
2024-02-24 01:44:19 +01:00
var anyPushFailed atomic . Bool
2021-11-04 14:00:51 +01:00
for _ , rwctx := range rwctxs {
2021-11-04 15:58:28 +01:00
go func ( rwctx * remoteWriteCtx ) {
2021-11-04 14:00:51 +01:00
defer wg . Done ( )
2024-05-10 12:09:21 +02:00
if ! rwctx . TryPush ( tssBlock , forceDropSamplesOnFailure ) {
2024-02-24 01:44:19 +01:00
anyPushFailed . Store ( true )
2023-11-24 13:42:11 +01:00
}
2021-11-04 15:58:28 +01:00
} ( rwctx )
2021-11-04 14:00:51 +01:00
}
2021-11-04 15:58:28 +01:00
wg . Wait ( )
2024-02-24 01:44:19 +01:00
return ! anyPushFailed . Load ( )
2021-11-04 14:00:51 +01:00
}
2024-07-15 09:29:17 +02:00
func tryShardingBlockAmongRemoteStorages ( rwctxs [ ] * remoteWriteCtx , tssBlock [ ] prompbmarshal . TimeSeries , replicas int , forceDropSamplesOnFailure bool ) bool {
2024-04-19 11:25:41 +02:00
x := getTSSShards ( len ( rwctxs ) )
defer putTSSShards ( x )
shards := x . shards
tmpLabels := promutils . GetLabels ( )
for _ , ts := range tssBlock {
hashLabels := ts . Labels
if len ( shardByURLLabelsMap ) > 0 {
hashLabels = tmpLabels . Labels [ : 0 ]
for _ , label := range ts . Labels {
if _ , ok := shardByURLLabelsMap [ label . Name ] ; ok {
hashLabels = append ( hashLabels , label )
}
}
tmpLabels . Labels = hashLabels
} else if len ( shardByURLIgnoreLabelsMap ) > 0 {
hashLabels = tmpLabels . Labels [ : 0 ]
for _ , label := range ts . Labels {
if _ , ok := shardByURLIgnoreLabelsMap [ label . Name ] ; ! ok {
hashLabels = append ( hashLabels , label )
}
}
tmpLabels . Labels = hashLabels
}
h := getLabelsHash ( hashLabels )
idx := h % uint64 ( len ( shards ) )
i := 0
for {
shards [ idx ] = append ( shards [ idx ] , ts )
i ++
if i >= replicas {
break
}
idx ++
if idx >= uint64 ( len ( shards ) ) {
idx = 0
}
}
}
promutils . PutLabels ( tmpLabels )
// Push sharded samples to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync . WaitGroup
var anyPushFailed atomic . Bool
for i , rwctx := range rwctxs {
shard := shards [ i ]
if len ( shard ) == 0 {
continue
}
wg . Add ( 1 )
go func ( rwctx * remoteWriteCtx , tss [ ] prompbmarshal . TimeSeries ) {
defer wg . Done ( )
2024-05-10 12:09:21 +02:00
if ! rwctx . TryPush ( tss , forceDropSamplesOnFailure ) {
2024-04-19 11:25:41 +02:00
anyPushFailed . Store ( true )
}
} ( rwctx , shard )
}
wg . Wait ( )
return ! anyPushFailed . Load ( )
}
// tssShards holds one time series slice per remote storage while a block is being sharded.
type tssShards struct {
	shards [][]prompbmarshal.TimeSeries
}

// getTSSShards returns a pooled tssShards with exactly n shards.
//
// Return it via putTSSShards when it is no longer needed.
func getTSSShards(n int) *tssShards {
	x, _ := tssShardsPool.Get().(*tssShards)
	if x == nil {
		x = &tssShards{}
	}
	if cap(x.shards) < n {
		x.shards = make([][]prompbmarshal.TimeSeries, n)
	}
	x.shards = x.shards[:n]
	return x
}

// putTSSShards empties every shard of x and returns x to the pool.
func putTSSShards(x *tssShards) {
	for i, shard := range x.shards {
		// Zero the series so the pooled slices don't keep references to them.
		clear(shard)
		x.shards[i] = shard[:0]
	}
	tssShardsPool.Put(x)
}

var tssShardsPool sync.Pool
2021-05-20 01:12:36 +02:00
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
func sortLabelsIfNeeded ( tss [ ] prompbmarshal . TimeSeries ) {
if ! * sortLabels {
return
}
for i := range tss {
promrelabel . SortLabels ( tss [ i ] . Labels )
}
}
2021-05-20 12:13:40 +02:00
func limitSeriesCardinality ( tss [ ] prompbmarshal . TimeSeries ) [ ] prompbmarshal . TimeSeries {
if hourlySeriesLimiter == nil && dailySeriesLimiter == nil {
return tss
}
dst := make ( [ ] prompbmarshal . TimeSeries , 0 , len ( tss ) )
for i := range tss {
labels := tss [ i ] . Labels
h := getLabelsHash ( labels )
if hourlySeriesLimiter != nil && ! hourlySeriesLimiter . Add ( h ) {
2021-05-20 13:15:19 +02:00
hourlySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxHourlySeries" , hourlySeriesLimiter . MaxItems ( ) )
2021-05-20 12:13:40 +02:00
continue
}
if dailySeriesLimiter != nil && ! dailySeriesLimiter . Add ( h ) {
2021-05-20 13:15:19 +02:00
dailySeriesLimitRowsDropped . Add ( len ( tss [ i ] . Samples ) )
logSkippedSeries ( labels , "-remoteWrite.maxDailySeries" , dailySeriesLimiter . MaxItems ( ) )
2021-05-20 12:13:40 +02:00
continue
}
dst = append ( dst , tss [ i ] )
}
return dst
}
var (
hourlySeriesLimiter * bloomfilter . Limiter
dailySeriesLimiter * bloomfilter . Limiter
2021-05-20 13:15:19 +02:00
hourlySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_hourly_series_limit_rows_dropped_total ` )
dailySeriesLimitRowsDropped = metrics . NewCounter ( ` vmagent_daily_series_limit_rows_dropped_total ` )
2021-05-20 12:13:40 +02:00
)
// getLabelsHash returns the xxhash of all label names and values concatenated in order.
//
// NOTE(review): names and values are concatenated without separators, so e.g. {a="bc"} and
// {ab="c"} hash to the same value. Changing this would reshard series among remote storages,
// so it is left as is.
func getLabelsHash(labels []prompbmarshal.Label) uint64 {
	bb := labelsHashBufPool.Get()
	buf := bb.B[:0]
	for i := range labels {
		buf = append(buf, labels[i].Name...)
		buf = append(buf, labels[i].Value...)
	}
	bb.B = buf
	h := xxhash.Sum64(bb.B)
	labelsHashBufPool.Put(bb)
	return h
}

// labelsHashBufPool holds scratch buffers for getLabelsHash.
var labelsHashBufPool bytesutil.ByteBufferPool
func logSkippedSeries ( labels [ ] prompbmarshal . Label , flagName string , flagValue int ) {
select {
case <- logSkippedSeriesTicker . C :
2021-12-21 16:03:25 +01:00
// Do not use logger.WithThrottler() here, since this will increase CPU usage
// because every call to logSkippedSeries will result to a call to labelsToString.
2021-05-20 12:13:40 +02:00
logger . Warnf ( "skip series %s because %s=%d reached" , labelsToString ( labels ) , flagName , flagValue )
default :
}
}
var logSkippedSeriesTicker = time . NewTicker ( 5 * time . Second )
// labelsToString formats labels as {name1="value1",name2="value2"},
// quoting the values with strconv.AppendQuote.
func labelsToString(labels []prompbmarshal.Label) string {
	b := []byte{'{'}
	for i, label := range labels {
		if i > 0 {
			b = append(b, ',')
		}
		b = append(b, label.Name...)
		b = append(b, '=')
		b = strconv.AppendQuote(b, label.Value)
	}
	b = append(b, '}')
	return string(b)
}
2022-05-06 14:28:59 +02:00
var (
2022-05-06 14:50:50 +02:00
globalRowsPushedBeforeRelabel = metrics . NewCounter ( "vmagent_remotewrite_global_rows_pushed_before_relabel_total" )
2022-05-06 14:28:59 +02:00
rowsDroppedByGlobalRelabel = metrics . NewCounter ( "vmagent_remotewrite_global_relabel_metrics_dropped_total" )
)
2020-03-03 12:08:17 +01:00
type remoteWriteCtx struct {
2023-01-04 07:19:18 +01:00
idx int
fq * persistentqueue . FastQueue
c * client
2024-03-04 23:45:22 +01:00
sas atomic . Pointer [ streamaggr . Aggregators ]
deduplicator * streamaggr . Deduplicator
2024-07-13 01:55:26 +02:00
streamAggrKeepInput bool
streamAggrDropInput bool
2023-01-04 07:19:18 +01:00
2020-03-03 12:08:17 +01:00
pss [ ] * pendingSeries
2024-02-24 01:44:19 +01:00
pssNextIdx atomic . Uint64
2020-03-03 12:08:17 +01:00
2023-11-25 10:31:30 +01:00
rowsPushedAfterRelabel * metrics . Counter
rowsDroppedByRelabel * metrics . Counter
2024-05-10 12:09:21 +02:00
pushFailures * metrics . Counter
rowsDroppedOnPushFailure * metrics . Counter
2020-03-03 12:08:17 +01:00
}
2023-09-01 09:34:16 +02:00
func newRemoteWriteCtx ( argIdx int , remoteWriteURL * url . URL , maxInmemoryBlocks int , sanitizedURL string ) * remoteWriteCtx {
2021-09-28 23:52:07 +02:00
// strip query params, otherwise changing params resets pq
pqURL := * remoteWriteURL
pqURL . RawQuery = ""
pqURL . Fragment = ""
h := xxhash . Sum64 ( [ ] byte ( pqURL . String ( ) ) )
2023-03-28 03:33:05 +02:00
queuePath := filepath . Join ( * tmpDataPath , persistentQueueDirname , fmt . Sprintf ( "%d_%016X" , argIdx + 1 , h ) )
2023-08-12 13:17:55 +02:00
maxPendingBytes := maxPendingBytesPerURL . GetOptionalArg ( argIdx )
2023-04-26 12:23:01 +02:00
if maxPendingBytes != 0 && maxPendingBytes < persistentqueue . DefaultChunkFileSize {
2023-05-09 00:42:28 +02:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4195
logger . Warnf ( "rounding the -remoteWrite.maxDiskUsagePerURL=%d to the minimum supported value: %d" , maxPendingBytes , persistentqueue . DefaultChunkFileSize )
maxPendingBytes = persistentqueue . DefaultChunkFileSize
2023-04-26 12:23:01 +02:00
}
2024-07-13 01:55:26 +02:00
2024-05-10 12:09:21 +02:00
isPQDisabled := disableOnDiskQueue . GetOptionalArg ( argIdx )
fq := persistentqueue . MustOpenFastQueue ( queuePath , sanitizedURL , maxInmemoryBlocks , maxPendingBytes , isPQDisabled )
2021-09-28 23:52:07 +02:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_data_bytes { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
2020-03-03 12:08:17 +01:00
return float64 ( fq . GetPendingBytes ( ) )
} )
2021-09-28 23:52:07 +02:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_inmemory_blocks { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
2020-03-03 12:08:17 +01:00
return float64 ( fq . GetInmemoryQueueLen ( ) )
} )
2023-11-25 10:31:30 +01:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_queue_blocked { path=%q, url=%q} ` , queuePath , sanitizedURL ) , func ( ) float64 {
if fq . IsWriteBlocked ( ) {
return 1
2023-11-24 13:42:11 +01:00
}
return 0
} )
2023-02-24 02:36:52 +01:00
2021-09-28 23:52:07 +02:00
var c * client
switch remoteWriteURL . Scheme {
case "http" , "https" :
2023-02-26 21:07:30 +01:00
c = newHTTPClient ( argIdx , remoteWriteURL . String ( ) , sanitizedURL , fq , * queues )
2021-09-28 23:52:07 +02:00
default :
logger . Fatalf ( "unsupported scheme: %s for remoteWriteURL: %s, want `http`, `https`" , remoteWriteURL . Scheme , sanitizedURL )
}
c . init ( argIdx , * queues , sanitizedURL )
2023-01-04 07:19:18 +01:00
// Initialize pss
2023-08-12 13:17:55 +02:00
sf := significantFigures . GetOptionalArg ( argIdx )
rd := roundDigits . GetOptionalArg ( argIdx )
2021-03-31 15:16:26 +02:00
pssLen := * queues
if n := cgroup . AvailableCPUs ( ) ; pssLen > n {
// There is no sense in running more than availableCPUs concurrent pendingSeries,
// since every pendingSeries can saturate up to a single CPU.
pssLen = n
}
pss := make ( [ ] * pendingSeries , pssLen )
2020-03-03 12:08:17 +01:00
for i := range pss {
2023-11-24 13:42:11 +01:00
pss [ i ] = newPendingSeries ( fq , c . useVMProto , sf , rd )
2020-03-03 12:08:17 +01:00
}
2023-01-04 07:19:18 +01:00
rwctx := & remoteWriteCtx {
2020-05-30 13:36:40 +02:00
idx : argIdx ,
fq : fq ,
c : c ,
pss : pss ,
2020-03-03 12:08:17 +01:00
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 18:01:37 +02:00
rowsPushedAfterRelabel : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_rows_pushed_after_relabel_total { path=%q,url=%q} ` , queuePath , sanitizedURL ) ) ,
rowsDroppedByRelabel : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_relabel_metrics_dropped_total { path=%q,url=%q} ` , queuePath , sanitizedURL ) ) ,
2024-05-10 12:09:21 +02:00
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 18:01:37 +02:00
pushFailures : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_push_failures_total { path=%q,url=%q} ` , queuePath , sanitizedURL ) ) ,
rowsDroppedOnPushFailure : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_samples_dropped_total { path=%q,url=%q} ` , queuePath , sanitizedURL ) ) ,
2023-01-04 07:19:18 +01:00
}
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 18:01:37 +02:00
rwctx . initStreamAggrConfig ( )
2023-01-04 07:19:18 +01:00
return rwctx
2020-02-23 12:35:47 +01:00
}
2020-03-03 12:08:17 +01:00
func ( rwctx * remoteWriteCtx ) MustStop ( ) {
2024-03-04 23:45:22 +01:00
// sas and deduplicator must be stopped before rwctx is closed
2024-07-03 14:12:41 +02:00
// because they can write pending series to rwctx.pss if there are any
2023-06-07 15:45:43 +02:00
sas := rwctx . sas . Swap ( nil )
sas . MustStop ( )
2024-03-04 23:45:22 +01:00
if rwctx . deduplicator != nil {
rwctx . deduplicator . MustStop ( )
rwctx . deduplicator = nil
}
2020-03-03 12:08:17 +01:00
for _ , ps := range rwctx . pss {
ps . MustStop ( )
}
2020-05-30 13:36:40 +02:00
rwctx . idx = 0
2020-03-03 12:08:17 +01:00
rwctx . pss = nil
2021-02-18 23:31:07 +01:00
rwctx . fq . UnblockAllReaders ( )
2020-03-03 12:08:17 +01:00
rwctx . c . MustStop ( )
rwctx . c = nil
2023-04-01 06:27:45 +02:00
2021-02-17 20:23:38 +01:00
rwctx . fq . MustClose ( )
rwctx . fq = nil
2020-03-03 12:08:17 +01:00
2022-05-06 14:28:59 +02:00
rwctx . rowsPushedAfterRelabel = nil
rwctx . rowsDroppedByRelabel = nil
2020-03-03 12:08:17 +01:00
}
2020-02-23 12:35:47 +01:00
2024-05-06 12:09:51 +02:00
// TryPush sends tss series to the configured remote write endpoint
//
2024-07-03 14:12:41 +02:00
// TryPush doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances.
2024-05-10 12:09:21 +02:00
func ( rwctx * remoteWriteCtx ) TryPush ( tss [ ] prompbmarshal . TimeSeries , forceDropSamplesOnFailure bool ) bool {
2020-03-03 12:08:17 +01:00
var rctx * relabelCtx
2020-07-10 14:13:26 +02:00
var v * [ ] prompbmarshal . TimeSeries
2024-07-03 14:12:41 +02:00
defer func ( ) {
if rctx == nil {
return
}
* v = prompbmarshal . ResetTimeSeries ( tss )
tssPool . Put ( v )
putRelabelCtx ( rctx )
} ( )
// Apply relabeling
2023-07-20 02:37:49 +02:00
rcs := allRelabelConfigs . Load ( )
2021-02-22 15:33:55 +01:00
pcs := rcs . perURL [ rwctx . idx ]
if pcs . Len ( ) > 0 {
2020-07-10 14:13:26 +02:00
rctx = getRelabelCtx ( )
2020-05-12 21:01:47 +02:00
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
2020-07-10 14:13:26 +02:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
2023-07-25 01:33:30 +02:00
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
2020-07-10 14:13:26 +02:00
tss = append ( * v , tss ... )
2022-05-06 14:28:59 +02:00
rowsCountBeforeRelabel := getRowsCount ( tss )
2023-08-15 13:47:48 +02:00
tss = rctx . applyRelabeling ( tss , pcs )
2022-05-06 14:28:59 +02:00
rowsCountAfterRelabel := getRowsCount ( tss )
rwctx . rowsDroppedByRelabel . Add ( rowsCountBeforeRelabel - rowsCountAfterRelabel )
2020-03-03 12:08:17 +01:00
}
2022-05-06 14:28:59 +02:00
rowsCount := getRowsCount ( tss )
rwctx . rowsPushedAfterRelabel . Add ( rowsCount )
2023-01-04 07:19:18 +01:00
2024-03-04 23:45:22 +01:00
// Apply stream aggregation or deduplication if they are configured
2023-07-25 01:44:09 +02:00
sas := rwctx . sas . Load ( )
2024-05-20 14:03:28 +02:00
if sas . IsEnabled ( ) {
2023-07-25 01:44:09 +02:00
matchIdxs := matchIdxsPool . Get ( )
matchIdxs . B = sas . Push ( tss , matchIdxs . B )
if ! rwctx . streamAggrKeepInput {
if rctx == nil {
rctx = getRelabelCtx ( )
// Make a copy of tss before dropping aggregated series
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
}
tss = dropAggregatedSeries ( tss , matchIdxs . B , rwctx . streamAggrDropInput )
2023-07-25 01:33:30 +02:00
}
2023-07-25 01:44:09 +02:00
matchIdxsPool . Put ( matchIdxs )
2024-09-03 10:47:05 +02:00
}
if rwctx . deduplicator != nil {
2024-03-04 23:45:22 +01:00
rwctx . deduplicator . Push ( tss )
2024-07-03 14:12:41 +02:00
return true
2023-07-25 01:33:30 +02:00
}
2024-07-03 14:12:41 +02:00
// Try pushing tss to remote storage
if rwctx . tryPushInternal ( tss ) {
return true
2023-11-25 10:31:30 +01:00
}
2024-07-03 14:12:41 +02:00
// Couldn't push tss to remote storage
rwctx . pushFailures . Inc ( )
2024-07-13 01:55:26 +02:00
if forceDropSamplesOnFailure {
rowsCount := getRowsCount ( tss )
rwctx . rowsDroppedOnPushFailure . Add ( rowsCount )
2024-07-03 14:12:41 +02:00
return true
2024-05-10 12:09:21 +02:00
}
2024-07-03 14:12:41 +02:00
return false
2023-07-25 01:44:09 +02:00
}
2023-07-25 01:33:30 +02:00
2023-07-25 01:44:09 +02:00
// matchIdxsPool holds reusable byte buffers that receive the per-series match
// flags from the stream aggregators in TryPush. Those flags are later consumed
// by dropAggregatedSeries.
var matchIdxsPool = bytesutil.ByteBufferPool{}
2023-07-25 01:33:30 +02:00
2023-07-25 01:44:09 +02:00
func dropAggregatedSeries ( src [ ] prompbmarshal . TimeSeries , matchIdxs [ ] byte , dropInput bool ) [ ] prompbmarshal . TimeSeries {
dst := src [ : 0 ]
2023-08-10 14:27:21 +02:00
if ! dropInput {
for i , match := range matchIdxs {
if match == 1 {
continue
}
dst = append ( dst , src [ i ] )
2023-07-25 01:44:09 +02:00
}
}
tail := src [ len ( dst ) : ]
2024-03-04 23:45:22 +01:00
clear ( tail )
2023-07-25 01:44:09 +02:00
return dst
2020-03-03 12:08:17 +01:00
}
2020-07-10 14:13:26 +02:00
2023-11-24 13:42:11 +01:00
func ( rwctx * remoteWriteCtx ) pushInternalTrackDropped ( tss [ ] prompbmarshal . TimeSeries ) {
2023-11-25 10:31:30 +01:00
if rwctx . tryPushInternal ( tss ) {
return
}
2024-07-13 01:55:26 +02:00
if ! rwctx . fq . IsPersistentQueueDisabled ( ) {
2023-11-25 10:31:30 +01:00
logger . Panicf ( "BUG: tryPushInternal must return true if -remoteWrite.disableOnDiskQueue isn't set" )
}
2024-05-10 12:09:21 +02:00
rwctx . pushFailures . Inc ( )
2024-07-13 01:55:26 +02:00
rowsCount := getRowsCount ( tss )
rwctx . rowsDroppedOnPushFailure . Add ( rowsCount )
2023-11-24 13:42:11 +01:00
}
2023-11-25 10:31:30 +01:00
func ( rwctx * remoteWriteCtx ) tryPushInternal ( tss [ ] prompbmarshal . TimeSeries ) bool {
2023-09-08 23:17:16 +02:00
var rctx * relabelCtx
var v * [ ] prompbmarshal . TimeSeries
2024-07-03 14:12:41 +02:00
defer func ( ) {
if rctx == nil {
return
}
* v = prompbmarshal . ResetTimeSeries ( tss )
tssPool . Put ( v )
putRelabelCtx ( rctx )
} ( )
2023-08-15 13:47:48 +02:00
if len ( labelsGlobal ) > 0 {
2023-09-08 23:17:16 +02:00
// Make a copy of tss before adding extra labels in order to prevent
// from affecting time series for other remoteWrite.url configs.
rctx = getRelabelCtx ( )
v = tssPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
2023-08-17 14:35:26 +02:00
rctx . appendExtraLabels ( tss , labelsGlobal )
2023-08-15 13:47:48 +02:00
}
2023-08-17 12:15:03 +02:00
2023-01-04 07:19:18 +01:00
pss := rwctx . pss
2024-02-24 01:44:19 +01:00
idx := rwctx . pssNextIdx . Add ( 1 ) % uint64 ( len ( pss ) )
2023-11-25 10:31:30 +01:00
2024-07-03 14:12:41 +02:00
return pss [ idx ] . TryPush ( tss )
2023-01-04 07:19:18 +01:00
}
2023-07-25 01:33:30 +02:00
var tssPool = & sync . Pool {
2024-07-10 00:14:15 +02:00
New : func ( ) any {
2020-07-14 13:27:50 +02:00
a := [ ] prompbmarshal . TimeSeries { }
return & a
2020-07-10 14:13:26 +02:00
} ,
}
2022-05-06 14:28:59 +02:00
// getRowsCount returns the total number of samples across all series in tss.
func getRowsCount(tss []prompbmarshal.TimeSeries) int {
	total := 0
	for i := range tss {
		total += len(tss[i].Samples)
	}
	return total
}
2023-04-01 06:27:45 +02:00
2024-04-02 23:36:32 +02:00
// newMapFromStrings returns a set that contains every string from a.
// Duplicate strings collapse into a single key. The result is never nil.
func newMapFromStrings(a []string) map[string]struct{} {
	set := make(map[string]struct{}, len(a))
	for i := range a {
		set[a[i]] = struct{}{}
	}
	return set
}