From 5b29be1f4db4b281ca4fc2f6e9134dfe15f8566b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 19 Apr 2024 11:25:41 +0200 Subject: [PATCH] app/vmagent/remotewrite: add support for replication additionally to sharding when both -remoteWrite.shardByURL and -remoteWrite.shardByURLReplicas=RF command-line flags are set This allows setting up data replication among failure domains if the replication factor is smaller than the number of failure domains. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054 See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages --- app/vmagent/remotewrite/remotewrite.go | 155 +++++++++++++------- app/vmagent/remotewrite/remotewrite_test.go | 48 ++++++ docs/CHANGELOG.md | 2 + docs/vmagent.md | 16 +- 4 files changed, 167 insertions(+), 54 deletions(-) create mode 100644 app/vmagent/remotewrite/remotewrite_test.go diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 2dc6982a0c..41f41dc5f7 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -47,14 +47,19 @@ var ( "https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+ "according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+ "See https://docs.victoriametrics.com/vmagent/#multitenancy for details") + shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+ - "By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages") + "By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . "+ + "See also -remoteWrite.shardByURLReplicas") + shardByURLReplicas = flag.Int("remoteWrite.shardByURLReplicas", 1, "How many copies of data to make among remote storage systems enumerated via -remoteWrite.url "+ + "when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages") shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+ "among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+ "even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels") shardByURLIgnoreLabels = flagutil.NewArrayString("remoteWrite.shardByURL.ignoreLabels", "Optional list of labels, which must be ignored when sharding outgoing samples "+ "among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+ "even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels") + tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+ "See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue") keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+ @@ -562,58 +567,17 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar // We need to push tssBlock to multiple remote storages. // This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value. - if *shardByURL { - // Shard the data among rwctxs - tssByURL := make([][]prompbmarshal.TimeSeries, len(rwctxs)) - tmpLabels := promutils.GetLabels() - for _, ts := range tssBlock { - hashLabels := ts.Labels - if len(shardByURLLabelsMap) > 0 { - hashLabels = tmpLabels.Labels[:0] - for _, label := range ts.Labels { - if _, ok := shardByURLLabelsMap[label.Name]; ok { - hashLabels = append(hashLabels, label) - } - } - tmpLabels.Labels = hashLabels - } else if len(shardByURLIgnoreLabelsMap) > 0 { - hashLabels = tmpLabels.Labels[:0] - for _, label := range ts.Labels { - if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok { - hashLabels = append(hashLabels, label) - } - } - tmpLabels.Labels = hashLabels - } - h := getLabelsHash(hashLabels) - idx := h % uint64(len(tssByURL)) - tssByURL[idx] = append(tssByURL[idx], ts) + if *shardByURL && *shardByURLReplicas < len(rwctxs) { + // Shard tssBlock samples among rwctxs. + replicas := *shardByURLReplicas + if replicas <= 0 { + replicas = 1 } - promutils.PutLabels(tmpLabels) - - // Push sharded data to remote storages in parallel in order to reduce - // the time needed for sending the data to multiple remote storage systems. - var wg sync.WaitGroup - var anyPushFailed atomic.Bool - for i, rwctx := range rwctxs { - tssShard := tssByURL[i] - if len(tssShard) == 0 { - continue - } - wg.Add(1) - go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) { - defer wg.Done() - if !rwctx.TryPush(tss) { - anyPushFailed.Store(true) - } - }(rwctx, tssShard) - } - wg.Wait() - return !anyPushFailed.Load() + return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas) } - // Replicate data among rwctxs. - // Push block to remote storages in parallel in order to reduce + // Replicate tssBlock samples among rwctxs. + // Push tssBlock to remote storage systems in parallel in order to reduce // the time needed for sending the data to multiple remote storage systems. var wg sync.WaitGroup wg.Add(len(rwctxs)) @@ -630,6 +594,97 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar return !anyPushFailed.Load() } +func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int) bool { + x := getTSSShards(len(rwctxs)) + defer putTSSShards(x) + + shards := x.shards + tmpLabels := promutils.GetLabels() + for _, ts := range tssBlock { + hashLabels := ts.Labels + if len(shardByURLLabelsMap) > 0 { + hashLabels = tmpLabels.Labels[:0] + for _, label := range ts.Labels { + if _, ok := shardByURLLabelsMap[label.Name]; ok { + hashLabels = append(hashLabels, label) + } + } + tmpLabels.Labels = hashLabels + } else if len(shardByURLIgnoreLabelsMap) > 0 { + hashLabels = tmpLabels.Labels[:0] + for _, label := range ts.Labels { + if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok { + hashLabels = append(hashLabels, label) + } + } + tmpLabels.Labels = hashLabels + } + h := getLabelsHash(hashLabels) + idx := h % uint64(len(shards)) + i := 0 + for { + shards[idx] = append(shards[idx], ts) + i++ + if i >= replicas { + break + } + idx++ + if idx >= uint64(len(shards)) { + idx = 0 + } + } + } + promutils.PutLabels(tmpLabels) + + // Push sharded samples to remote storage systems in parallel in order to reduce + // the time needed for sending the data to multiple remote storage systems. + var wg sync.WaitGroup + var anyPushFailed atomic.Bool + for i, rwctx := range rwctxs { + shard := shards[i] + if len(shard) == 0 { + continue + } + wg.Add(1) + go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) { + defer wg.Done() + if !rwctx.TryPush(tss) { + anyPushFailed.Store(true) + } + }(rwctx, shard) + } + wg.Wait() + return !anyPushFailed.Load() +} + +type tssShards struct { + shards [][]prompbmarshal.TimeSeries +} + +func getTSSShards(n int) *tssShards { + v := tssShardsPool.Get() + if v == nil { + v = &tssShards{} + } + x := v.(*tssShards) + if cap(x.shards) < n { + x.shards = make([][]prompbmarshal.TimeSeries, n) + } + x.shards = x.shards[:n] + return x +} + +func putTSSShards(x *tssShards) { + shards := x.shards + for i := range shards { + clear(shards[i]) + shards[i] = shards[i][:0] + } + tssShardsPool.Put(x) +} + +var tssShardsPool sync.Pool + // sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) { if !*sortLabels { diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go new file mode 100644 index 0000000000..16c404ef2b --- /dev/null +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -0,0 +1,48 @@ +package remotewrite + +import ( + "fmt" + "math" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestGetLabelsHash_Distribution(t *testing.T) { + f := func(bucketsCount int) { + t.Helper() + + // Distribute itemsCount hashes returned by getLabelsHash() across bucketsCount buckets. + itemsCount := 1_000 * bucketsCount + m := make([]int, bucketsCount) + var labels []prompbmarshal.Label + for i := 0; i < itemsCount; i++ { + labels = append(labels[:0], prompbmarshal.Label{ + Name: "__name__", + Value: fmt.Sprintf("some_name_%d", i), + }) + for j := 0; j < 10; j++ { + labels = append(labels, prompbmarshal.Label{ + Name: fmt.Sprintf("label_%d", j), + Value: fmt.Sprintf("value_%d_%d", i, j), + }) + } + h := getLabelsHash(labels) + m[h%uint64(bucketsCount)]++ + } + + // Verify that the distribution is even + expectedItemsPerBucket := itemsCount / bucketsCount + for _, n := range m { + if math.Abs(1-float64(n)/float64(expectedItemsPerBucket)) > 0.04 { + t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want around %d", bucketsCount, n, expectedItemsPerBucket) + } + } + } + + f(2) + f(3) + f(4) + f(5) + f(10) +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 92a9a90b88..ef34af657d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -32,6 +32,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add support for fault domain awareness to `vmselect`. It can be configured to return full responses if up to `-globalReplicationFactor - 1` fault domains (aka `vmstorage` groups) are unavailable. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054) and [these docs](https://docs.victoriametrics.com/cluster-victoriametrics/#vmstorage-groups-at-vmselect). * FEATURE: all VictoriaMetrics [enterprise](https://docs.victoriametrics.com/enterprise/) components: add support for automatic issuing of TLS certificates for HTTPS server at `-httpListenAddr` via [Let's Encrypt service](https://letsencrypt.org/). See [these docs](https://docs.victoriametrics.com/#automatic-issuing-of-tls-certificates) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5949). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support data replication additionally to sharding among remote storage systems if `-remoteWrite.shardByURLReplicas=N` command-line flag is set additionally to `-remoteWrite.shardByURL` command-line flag, where `N` is desired replication factor. This allows setting up data replication among failure domains when the replication factor is smaller than the number of failure domains. See [these docs](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): reduce CPU usage when [sharding among remote storage systems](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) is enabled. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) addresses in `-remoteWrite.url` command-line option and in scrape target urls. For example, `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends the collected metrics to these TCP addresses. See [these docs](https://docs.victoriametrics.com/vmagent/#srv-urls) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support automatic discovering and load balancing for TCP addresses behind DNS SRV addresses. These addresses can be put inside `url_prefix` urls in the form `http://srv+addr/path`, where the `addr` is the [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address, which is automatically resolved to hostnames with TCP ports. See [these docs](https://docs.victoriametrics.com/vmauth/#srv-urls) for details. * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support specifying client TLS certificates and TLS ServerName for requests to HTTPS backends. See [these docs](https://docs.victoriametrics.com/vmauth/#backend-tls-setup). diff --git a/docs/vmagent.md b/docs/vmagent.md index f16a1115a3..f9de53726e 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -178,9 +178,15 @@ See [these docs](https://docs.victoriametrics.com/cluster-victoriametrics/#repli By default `vmagent` replicates data among remote storage systems enumerated via `-remoteWrite.url` command-line flag. If the `-remoteWrite.shardByURL` command-line flag is set, then `vmagent` spreads evenly -the outgoing [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) -among all the remote storage systems enumerated via `-remoteWrite.url`. Note that samples for the same -time series are routed to the same remote storage system if `-remoteWrite.shardByURL` flag is specified. +the outgoing [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) among all the remote storage systems +enumerated via `-remoteWrite.url`. + +It is possible to replicate samples among remote storage systems by passing `-remoteWrite.shardByURLReplicas=N` +command-line flag to `vmagent` additionally to `-remoteWrite.shardByURL` command-line flag. +This instructs `vmagent` writing every outgoing sample to `N` distinct remote storage systems enumerated via `-remoteWrite.url` +in addition to sharding. + +Samples for the same time series are routed to the same remote storage system if `-remoteWrite.shardByURL` flag is specified. This allows building scalable data processing pipelines when a single remote storage cannot keep up with the data ingestion workload. For example, this allows building horizontally scalable [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/) by routing outgoing samples for the same time series of [counter](https://docs.victoriametrics.com/keyconcepts/#counter) @@ -2131,7 +2137,7 @@ See the docs at https://docs.victoriametrics.com/vmagent/ . Supports array of values separated by comma or specified via multiple flags. Empty values are set to default value. -remoteWrite.shardByURL - Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages + Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . See also -remoteWrite.shardByURLReplicas -remoteWrite.shardByURL.ignoreLabels array Optional list of labels, which must be ignored when sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels Supports an array of values separated by comma or specified via multiple flags. @@ -2140,6 +2146,8 @@ See the docs at https://docs.victoriametrics.com/vmagent/ . Optional list of labels, which must be used for sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels Supports an array of values separated by comma or specified via multiple flags. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. + -remoteWrite.shardByURLReplicas int + How many copies of data to make among remote storage systems enumerated via -remoteWrite.url when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages (default 1) -remoteWrite.showURL Whether to show -remoteWrite.url in the exported metrics. It is hidden by default, since it can contain sensitive info such as auth key -remoteWrite.significantFigures array