app/vmagent/remotewrite: add support for replication additionally to sharding when both -remoteWrite.shardByURL and -remoteWrite.shardByURLReplicas=RF command-line flags are set

This allows setting up data replication among failure domains if the replication factor is smaller than the number of failure domains.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054

See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages
This commit is contained in:
Aliaksandr Valialkin 2024-04-19 11:25:41 +02:00
parent 54605f5dd1
commit 8f535f9e76
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
4 changed files with 167 additions and 54 deletions

View File

@ -47,14 +47,19 @@ var (
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details")
shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages")
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . "+
"See also -remoteWrite.shardByURLReplicas")
shardByURLReplicas = flag.Int("remoteWrite.shardByURLReplicas", 1, "How many copies of data to make among remote storage systems enumerated via -remoteWrite.url "+
"when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages")
shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels")
shardByURLIgnoreLabels = flagutil.NewArrayString("remoteWrite.shardByURL.ignoreLabels", "Optional list of labels, which must be ignored when sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
@ -562,58 +567,17 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
// We need to push tssBlock to multiple remote storages.
// This is either sharding or replication depending on -remoteWrite.shardByURL command-line flag value.
if *shardByURL {
// Shard the data among rwctxs
tssByURL := make([][]prompbmarshal.TimeSeries, len(rwctxs))
tmpLabels := promutils.GetLabels()
for _, ts := range tssBlock {
hashLabels := ts.Labels
if len(shardByURLLabelsMap) > 0 {
hashLabels = tmpLabels.Labels[:0]
for _, label := range ts.Labels {
if _, ok := shardByURLLabelsMap[label.Name]; ok {
hashLabels = append(hashLabels, label)
}
}
tmpLabels.Labels = hashLabels
} else if len(shardByURLIgnoreLabelsMap) > 0 {
hashLabels = tmpLabels.Labels[:0]
for _, label := range ts.Labels {
if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok {
hashLabels = append(hashLabels, label)
}
}
tmpLabels.Labels = hashLabels
}
h := getLabelsHash(hashLabels)
idx := h % uint64(len(tssByURL))
tssByURL[idx] = append(tssByURL[idx], ts)
if *shardByURL && *shardByURLReplicas < len(rwctxs) {
// Shard tssBlock samples among rwctxs.
replicas := *shardByURLReplicas
if replicas <= 0 {
replicas = 1
}
promutils.PutLabels(tmpLabels)
// Push sharded data to remote storages in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
var anyPushFailed atomic.Bool
for i, rwctx := range rwctxs {
tssShard := tssByURL[i]
if len(tssShard) == 0 {
continue
}
wg.Add(1)
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
defer wg.Done()
if !rwctx.TryPush(tss) {
anyPushFailed.Store(true)
}
}(rwctx, tssShard)
}
wg.Wait()
return !anyPushFailed.Load()
return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas)
}
// Replicate data among rwctxs.
// Push block to remote storages in parallel in order to reduce
// Replicate tssBlock samples among rwctxs.
// Push tssBlock to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
wg.Add(len(rwctxs))
@ -630,6 +594,97 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
return !anyPushFailed.Load()
}
func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int) bool {
x := getTSSShards(len(rwctxs))
defer putTSSShards(x)
shards := x.shards
tmpLabels := promutils.GetLabels()
for _, ts := range tssBlock {
hashLabels := ts.Labels
if len(shardByURLLabelsMap) > 0 {
hashLabels = tmpLabels.Labels[:0]
for _, label := range ts.Labels {
if _, ok := shardByURLLabelsMap[label.Name]; ok {
hashLabels = append(hashLabels, label)
}
}
tmpLabels.Labels = hashLabels
} else if len(shardByURLIgnoreLabelsMap) > 0 {
hashLabels = tmpLabels.Labels[:0]
for _, label := range ts.Labels {
if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok {
hashLabels = append(hashLabels, label)
}
}
tmpLabels.Labels = hashLabels
}
h := getLabelsHash(hashLabels)
idx := h % uint64(len(shards))
i := 0
for {
shards[idx] = append(shards[idx], ts)
i++
if i >= replicas {
break
}
idx++
if idx >= uint64(len(shards)) {
idx = 0
}
}
}
promutils.PutLabels(tmpLabels)
// Push sharded samples to remote storage systems in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
var anyPushFailed atomic.Bool
for i, rwctx := range rwctxs {
shard := shards[i]
if len(shard) == 0 {
continue
}
wg.Add(1)
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
defer wg.Done()
if !rwctx.TryPush(tss) {
anyPushFailed.Store(true)
}
}(rwctx, shard)
}
wg.Wait()
return !anyPushFailed.Load()
}
type tssShards struct {
shards [][]prompbmarshal.TimeSeries
}
func getTSSShards(n int) *tssShards {
v := tssShardsPool.Get()
if v == nil {
v = &tssShards{}
}
x := v.(*tssShards)
if cap(x.shards) < n {
x.shards = make([][]prompbmarshal.TimeSeries, n)
}
x.shards = x.shards[:n]
return x
}
func putTSSShards(x *tssShards) {
shards := x.shards
for i := range shards {
clear(shards[i])
shards[i] = shards[i][:0]
}
tssShardsPool.Put(x)
}
var tssShardsPool sync.Pool
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
if !*sortLabels {

View File

@ -0,0 +1,48 @@
package remotewrite
import (
"fmt"
"math"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestGetLabelsHash_Distribution(t *testing.T) {
f := func(bucketsCount int) {
t.Helper()
// Distribute itemsCount hashes returned by getLabelsHash() across bucketsCount buckets.
itemsCount := 1_000 * bucketsCount
m := make([]int, bucketsCount)
var labels []prompbmarshal.Label
for i := 0; i < itemsCount; i++ {
labels = append(labels[:0], prompbmarshal.Label{
Name: "__name__",
Value: fmt.Sprintf("some_name_%d", i),
})
for j := 0; j < 10; j++ {
labels = append(labels, prompbmarshal.Label{
Name: fmt.Sprintf("label_%d", j),
Value: fmt.Sprintf("value_%d_%d", i, j),
})
}
h := getLabelsHash(labels)
m[h%uint64(bucketsCount)]++
}
// Verify that the distribution is even
expectedItemsPerBucket := itemsCount / bucketsCount
for _, n := range m {
if math.Abs(1-float64(n)/float64(expectedItemsPerBucket)) > 0.04 {
t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want around %d", bucketsCount, n, expectedItemsPerBucket)
}
}
}
f(2)
f(3)
f(4)
f(5)
f(10)
}

View File

@ -32,6 +32,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add support for fault domain awareness to `vmselect`. It can be configured to return full responses if up to `-globalReplicationFactor - 1` fault domains (aka `vmstorage` groups) are unavailable. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054) and [these docs](https://docs.victoriametrics.com/cluster-victoriametrics/#vmstorage-groups-at-vmselect).
* FEATURE: all VictoriaMetrics [enterprise](https://docs.victoriametrics.com/enterprise/) components: add support for automatic issuing of TLS certificates for HTTPS server at `-httpListenAddr` via [Let's Encrypt service](https://letsencrypt.org/). See [these docs](https://docs.victoriametrics.com/#automatic-issuing-of-tls-certificates) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5949).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support data replication additionally to sharding among remote storage systems if `-remoteWrite.shardByURLReplicas=N` command-line flag is set additionally to `-remoteWrite.shardByURL` command-line flag, where `N` is desired replication factor. This allows setting up data replication among failure domains when the replication factor is smaller than the number of failure domains. See [these docs](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6054).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): reduce CPU usage when [sharding among remote storage systems](https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages) is enabled.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) addresses in `-remoteWrite.url` command-line option and in scrape target urls. For example, `-remoteWrite.url=http://srv+victoria-metrics/api/v1/write` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends the collected metrics to these TCP addresses. See [these docs](https://docs.victoriametrics.com/vmagent/#srv-urls) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support automatic discovering and load balancing for TCP addresses behind DNS SRV addresses. These addresses can be put inside `url_prefix` urls in the form `http://srv+addr/path`, where the `addr` is the [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address, which is automatically resolved to hostnames with TCP ports. See [these docs](https://docs.victoriametrics.com/vmauth/#srv-urls) for details.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): support specifying client TLS certificates and TLS ServerName for requests to HTTPS backends. See [these docs](https://docs.victoriametrics.com/vmauth/#backend-tls-setup).

View File

@ -178,9 +178,15 @@ See [these docs](https://docs.victoriametrics.com/cluster-victoriametrics/#repli
By default `vmagent` replicates data among remote storage systems enumerated via `-remoteWrite.url` command-line flag.
If the `-remoteWrite.shardByURL` command-line flag is set, then `vmagent` spreads evenly
the outgoing [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
among all the remote storage systems enumerated via `-remoteWrite.url`. Note that samples for the same
time series are routed to the same remote storage system if `-remoteWrite.shardByURL` flag is specified.
the outgoing [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) among all the remote storage systems
enumerated via `-remoteWrite.url`.
It is possible to replicate samples among remote storage systems by passing `-remoteWrite.shardByURLReplicas=N`
command-line flag to `vmagent` additionally to `-remoteWrite.shardByURL` command-line flag.
This instructs `vmagent` writing every outgoing sample to `N` distinct remote storage systems enumerated via `-remoteWrite.url`
in addition to sharding.
Samples for the same time series are routed to the same remote storage system if `-remoteWrite.shardByURL` flag is specified.
This allows building scalable data processing pipelines when a single remote storage cannot keep up with the data ingestion workload.
For example, this allows building horizontally scalable [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/)
by routing outgoing samples for the same time series of [counter](https://docs.victoriametrics.com/keyconcepts/#counter)
@ -2131,7 +2137,7 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to default value.
-remoteWrite.shardByURL
Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages
Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages . See also -remoteWrite.shardByURLReplicas
-remoteWrite.shardByURL.ignoreLabels array
Optional list of labels, which must be ignored when sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.labels
Supports an array of values separated by comma or specified via multiple flags.
@ -2140,6 +2146,8 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Optional list of labels, which must be used for sharding outgoing samples among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain even distribution of series over the specified -remoteWrite.url systems. See also -remoteWrite.shardByURL.ignoreLabels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.shardByURLReplicas int
How many copies of data to make among remote storage systems enumerated via -remoteWrite.url when -remoteWrite.shardByURL is set. See https://docs.victoriametrics.com/vmagent/#sharding-among-remote-storages (default 1)
-remoteWrite.showURL
Whether to show -remoteWrite.url in the exported metrics. It is hidden by default, since it can contain sensitive info such as auth key
-remoteWrite.significantFigures array