app/vmagent/remotewrite: add support for replication additionally to sharding when both -remoteWrite.shardByURL and -remoteWrite.shardByURLReplicas=RF command-line flags are set

This allows setting up data replication among failure domains if the replication factor is smaller than the number of failure domains.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054

See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages
This commit is contained in:
Aliaksandr Valialkin 2024-04-19 11:25:41 +02:00
parent 0fde6f4b8f
commit 5b29be1f4d
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
4 changed files with 167 additions and 54 deletions

View File

@ -47,14 +47,19 @@ var (
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+ "https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+ "according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details") "See https://docs.victoriametrics.com/vmagent/#multitenancy for details")
shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+ shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages") "By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . "+
"See also -remoteWrite.shardByURLReplicas")
shardByURLReplicas = flag.Int("remoteWrite.shardByURLReplicas", 1, "How many copies of data to make among remote storage systems enumerated via -remoteWrite.url "+
"when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages")
shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+ shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+ "among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels") "even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels")
shardByURLIgnoreLabels = flagutil.NewArrayString("remoteWrite.shardByURL.ignoreLabels", "Optional list of labels, which must be ignored when sharding outgoing samples "+ shardByURLIgnoreLabels = flagutil.NewArrayString("remoteWrite.shardByURL.ignoreLabels", "Optional list of labels, which must be ignored when sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+ "among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels") "even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+ tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue") "See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+ keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
@ -562,9 +567,38 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
// We need to push tssBlock to multiple remote storages. // We need to push tssBlock to multiple remote storages.
// This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value. // This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value.
if *shardByURL { if *shardByURL && *shardByURLReplicas < len(rwctxs) {
// Shard the data among rwctxs // Shard tssBlock samples among rwctxs.
tssByURL := make([][]prompbmarshal.TimeSeries, len(rwctxs)) replicas := *shardByURLReplicas
if replicas <= 0 {
replicas = 1
}
return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas)
}
// Replicate tssBlock samples among rwctxs.
// Push tssBlock to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
wg.Add(len(rwctxs))
var anyPushFailed atomic.Bool
for _, rwctx := range rwctxs {
go func(rwctx *remoteWriteCtx) {
defer wg.Done()
if !rwctx.TryPush(tssBlock) {
anyPushFailed.Store(true)
}
}(rwctx)
}
wg.Wait()
return !anyPushFailed.Load()
}
func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int) bool {
x := getTSSShards(len(rwctxs))
defer putTSSShards(x)
shards := x.shards
tmpLabels := promutils.GetLabels() tmpLabels := promutils.GetLabels()
for _, ts := range tssBlock { for _, ts := range tssBlock {
hashLabels := ts.Labels hashLabels := ts.Labels
@ -586,18 +620,29 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
tmpLabels.Labels = hashLabels tmpLabels.Labels = hashLabels
} }
h := getLabelsHash(hashLabels) h := getLabelsHash(hashLabels)
idx := h % uint64(len(tssByURL)) idx := h % uint64(len(shards))
tssByURL[idx] = append(tssByURL[idx], ts) i := 0
for {
shards[idx] = append(shards[idx], ts)
i++
if i >= replicas {
break
}
idx++
if idx >= uint64(len(shards)) {
idx = 0
}
}
} }
promutils.PutLabels(tmpLabels) promutils.PutLabels(tmpLabels)
// Push sharded data to remote storages in parallel in order to reduce // Push sharded samples to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems. // the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup var wg sync.WaitGroup
var anyPushFailed atomic.Bool var anyPushFailed atomic.Bool
for i, rwctx := range rwctxs { for i, rwctx := range rwctxs {
tssShard := tssByURL[i] shard := shards[i]
if len(tssShard) == 0 { if len(shard) == 0 {
continue continue
} }
wg.Add(1) wg.Add(1)
@ -606,29 +651,39 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
if !rwctx.TryPush(tss) { if !rwctx.TryPush(tss) {
anyPushFailed.Store(true) anyPushFailed.Store(true)
} }
}(rwctx, tssShard) }(rwctx, shard)
} }
wg.Wait() wg.Wait()
return !anyPushFailed.Load() return !anyPushFailed.Load()
} }
// Replicate data among rwctxs. type tssShards struct {
// Push block to remote storages in parallel in order to reduce shards [][]prompbmarshal.TimeSeries
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
wg.Add(len(rwctxs))
var anyPushFailed atomic.Bool
for _, rwctx := range rwctxs {
go func(rwctx *remoteWriteCtx) {
defer wg.Done()
if !rwctx.TryPush(tssBlock) {
anyPushFailed.Store(true)
} }
}(rwctx)
func getTSSShards(n int) *tssShards {
v := tssShardsPool.Get()
if v == nil {
v = &tssShards{}
} }
wg.Wait() x := v.(*tssShards)
return !anyPushFailed.Load() if cap(x.shards) < n {
x.shards = make([][]prompbmarshal.TimeSeries, n)
} }
x.shards = x.shards[:n]
return x
}
func putTSSShards(x *tssShards) {
shards := x.shards
for i := range shards {
clear(shards[i])
shards[i] = shards[i][:0]
}
tssShardsPool.Put(x)
}
var tssShardsPool sync.Pool
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. // sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) { func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {

View File

@ -0,0 +1,48 @@
package remotewrite
import (
"fmt"
"math"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestGetLabelsHash_Distribution(t *testing.T) {
f := func(bucketsCount int) {
t.Helper()
// Distribute itemsCount hashes returned by getLabelsHash() across bucketsCount buckets.
itemsCount := 1_000 * bucketsCount
m := make([]int, bucketsCount)
var labels []prompbmarshal.Label
for i := 0; i < itemsCount; i++ {
labels = append(labels[:0], prompbmarshal.Label{
Name: "__name__",
Value: fmt.Sprintf("some_name_%d", i),
})
for j := 0; j < 10; j++ {
labels = append(labels, prompbmarshal.Label{
Name: fmt.Sprintf("label_%d", j),
Value: fmt.Sprintf("value_%d_%d", i, j),
})
}
h := getLabelsHash(labels)
m[h%uint64(bucketsCount)]++
}
// Verify that the distribution is even
expectedItemsPerBucket := itemsCount / bucketsCount
for _, n := range m {
if math.Abs(1-float64(n)/float64(expectedItemsPerBucket)) > 0.04 {
t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want around %d", bucketsCount, n, expectedItemsPerBucket)
}
}
}
f(2)
f(3)
f(4)
f(5)
f(10)
}

View File

@ -32,6 +32,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add support for fault domain awareness to `vmselect`. It can be configured to return full responses if up to `-globalReplicationFactor - 1` fault domains (aka `vmstorage` groups) are unavailable. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054) and [these docs](https://docs.victoriametrics.com/cluster-victoriametrics/#vmstorage-groups-at-vmselect). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add support for fault domain awareness to `vmselect`. It can be configured to return full responses if up to `-globalReplicationFactor - 1` fault domains (aka `vmstorage` groups) are unavailable. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054) and [these docs](https://docs.victoriametrics.com/cluster-victoriametrics/#vmstorage-groups-at-vmselect).
* FEATURE: all VictoriaMetrics [enterprise](https://docs.victoriametrics.com/enterprise/) components: add support for automatic issuing of TLS certificates for HTTPS server at `-httpListenAddr` via [Let's Encrypt service](https://letsencrypt.org/). See [these docs](https://docs.victoriametrics.com/#automatic-issuing-of-tls-certificates) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5949). * FEATURE: all VictoriaMetrics [enterprise](https://docs.victoriametrics.com/enterprise/) components: add support for automatic issuing of TLS certificates for HTTPS server at `-httpListenAddr` via [Let's Encrypt service](https://letsencrypt.org/). See [these docs](https://docs.victoriametrics.com/#automatic-issuing-of-tls-certificates) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5949).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support data replication additionally to sharding among remote storage systems if `-remoteWrite.shardByURLReplicas=N` command-line flag is set additionally to `-remoteWrite.shardByURL` command-line flag, where `N` is desired replication factor. This allows setting up data replication among failure domains when the replication factor is smaller than the number of failure domains. See [these docs](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): reduce CPU usage when [sharding among remote storage systems](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) is enabled.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) addresses in `-remoteWrite.url` command-line option and in scrape target urls. For example, `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends the collected metrics to these TCP addresses. See [these docs](https://docs.victoriametrics.com/vmagent/#srv-urls) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) addresses in `-remoteWrite.url` command-line option and in scrape target urls. For example, `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends the collected metrics to these TCP addresses. See [these docs](https://docs.victoriametrics.com/vmagent/#srv-urls) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support automatic discovering and load balancing for TCP addresses behind DNS SRV addresses. These addresses can be put inside `url_prefix` urls in the form `http://srv+addr/path`, where the `addr` is the [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address, which is automatically resolved to hostnames with TCP ports. See [these docs](https://docs.victoriametrics.com/vmauth/#srv-urls) for details. * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support automatic discovering and load balancing for TCP addresses behind DNS SRV addresses. These addresses can be put inside `url_prefix` urls in the form `http://srv+addr/path`, where the `addr` is the [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address, which is automatically resolved to hostnames with TCP ports. See [these docs](https://docs.victoriametrics.com/vmauth/#srv-urls) for details.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support specifying client TLS certificates and TLS ServerName for requests to HTTPS backends. See [these docs](https://docs.victoriametrics.com/vmauth/#backend-tls-setup). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support specifying client TLS certificates and TLS ServerName for requests to HTTPS backends. See [these docs](https://docs.victoriametrics.com/vmauth/#backend-tls-setup).

View File

@ -178,9 +178,15 @@ See [these docs](https://docs.victoriametrics.com/cluster-victoriametrics/#repli
By default `vmagent` replicates data among remote storage systems enumerated via `-remoteWrite.url` command-line flag. By default `vmagent` replicates data among remote storage systems enumerated via `-remoteWrite.url` command-line flag.
If the `-remoteWrite.shardByURL` command-line flag is set, then `vmagent` spreads evenly If the `-remoteWrite.shardByURL` command-line flag is set, then `vmagent` spreads evenly
the outgoing [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) the outgoing [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) among all the remote storage systems
among all the remote storage systems enumerated via `-remoteWrite.url`. Note that samples for the same enumerated via `-remoteWrite.url`.
time series are routed to the same remote storage system if `-remoteWrite.shardByURL` flag is specified.
It is possible to replicate samples among remote storage systems by passing `-remoteWrite.shardByURLReplicas=N`
command-line flag to `vmagent` additionally to `-remoteWrite.shardByURL` command-line flag.
This instructs `vmagent` writing every outgoing sample to `N` distinct remote storage systems enumerated via `-remoteWrite.url`
in addition to sharding.
Samples for the same time series are routed to the same remote storage system if `-remoteWrite.shardByURL` flag is specified.
This allows building scalable data processing pipelines when a single remote storage cannot keep up with the data ingestion workload. This allows building scalable data processing pipelines when a single remote storage cannot keep up with the data ingestion workload.
For example, this allows building horizontally scalable [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/) For example, this allows building horizontally scalable [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/)
by routing outgoing samples for the same time series of [counter](https://docs.victoriametrics.com/keyconcepts/#counter) by routing outgoing samples for the same time series of [counter](https://docs.victoriametrics.com/keyconcepts/#counter)
@ -2131,7 +2137,7 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Supports array of values separated by comma or specified via multiple flags. Supports array of values separated by comma or specified via multiple flags.
Empty values are set to default value. Empty values are set to default value.
-remoteWrite.shardByURL -remoteWrite.shardByURL
Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . See also -remoteWrite.shardByURLReplicas
-remoteWrite.shardByURL.ignoreLabels array -remoteWrite.shardByURL.ignoreLabels array
Optional list of labels, which must be ignored when sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels Optional list of labels, which must be ignored when sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
@ -2140,6 +2146,8 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Optional list of labels, which must be used for sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels Optional list of labels, which must be used for sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.shardByURLReplicas int
How many copies of data to make among remote storage systems enumerated via -remoteWrite.url when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages (default 1)
-remoteWrite.showURL -remoteWrite.showURL
Whether to show -remoteWrite.url in the exported metrics. It is hidden by default, since it can contain sensitive info such as auth key Whether to show -remoteWrite.url in the exported metrics. It is hidden by default, since it can contain sensitive info such as auth key
-remoteWrite.significantFigures array -remoteWrite.significantFigures array