app/vmselect: add support for vmstorage groups with independent -replicationFactor per group

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5197

See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#vmstorage-groups-at-vmselect

Thanks to @zekker6 for the initial pull request at https://github.com/VictoriaMetrics/VictoriaMetrics-enterprise/pull/718
Aliaksandr Valialkin 2023-12-13 00:06:30 +02:00
parent 3b841fe9ce
commit e4bb2808f1
8 changed files with 462 additions and 59 deletions


@@ -556,6 +556,8 @@ The cluster works in the following way when some of `vmstorage` nodes are unavai
if less than `-replicationFactor` vmstorage nodes are unavailable during querying, since it assumes that the remaining
`vmstorage` nodes contain the full data. See [these docs](#replication-and-data-safety) for details.
It is also possible to configure an independent replication factor for distinct `vmstorage` groups - see [these docs](#vmstorage-groups-at-vmselect).
`vmselect` doesn't serve partial responses for API handlers returning raw datapoints - [`/api/v1/export*` endpoints](https://docs.victoriametrics.com/#how-to-export-time-series), since users usually expect this data to always be complete.
Data replication can be used for increasing storage durability. See [these docs](#replication-and-data-safety) for details.
@@ -667,17 +669,61 @@ Then an additional `vmselect` nodes can be configured for reading the data from
## Multi-level cluster setup
`vmselect` nodes can be queried by other `vmselect` nodes if they run with the `-clusternativeListenAddr` command-line flag.
For example, if `vmselect` is started with `-clusternativeListenAddr=:8401`, then it can accept queries from other `vmselect` nodes at TCP port 8401
in the same way as `vmstorage` nodes do. This allows chaining `vmselect` nodes and building multi-level cluster topologies.
For example, the top-level `vmselect` node can query second-level `vmselect` nodes in different availability zones (AZ),
while the second-level `vmselect` nodes can query `vmstorage` nodes in the local AZ. See also [vmstorage groups at vmselect](#vmstorage-groups-at-vmselect).
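For example, a minimal sketch of such chaining (host names are placeholders): second-level `vmselect` nodes expose `-clusternativeListenAddr`, while the top-level `vmselect` addresses them via `-storageNode`:
```
# second-level vmselect in every AZ accepts clusternative queries at TCP port 8401
/path/to/vmselect -clusternativeListenAddr=:8401 -storageNode=<vmstorage nodes in the local AZ>

# top-level vmselect queries the second-level vmselect nodes
/path/to/vmselect \
-storageNode=vmselect-az1:8401 \
-storageNode=vmselect-az2:8401
```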
`vminsert` nodes can accept data from other `vminsert` nodes if they run with the `-clusternativeListenAddr` command-line flag.
For example, if `vminsert` is started with `-clusternativeListenAddr=:8400`, then it can accept data from other `vminsert` nodes at TCP port 8400
in the same way as `vmstorage` nodes do. This allows chaining `vminsert` nodes and building multi-level cluster topologies.
For example, the top-level `vminsert` node can replicate data among the second level of `vminsert` nodes located in distinct availability zones (AZ),
while the second-level `vminsert` nodes can spread the data among `vmstorage` nodes in the local AZ.
The multi-level cluster setup for `vminsert` nodes has the following shortcomings because of synchronous replication and data sharding:
* Data ingestion speed is limited by the slowest link to AZ.
* `vminsert` nodes at top level re-route incoming data to the remaining AZs when some AZs are temporarily unavailable. This results in data gaps at AZs which were temporarily unavailable.
These issues are addressed by [vmagent](https://docs.victoriametrics.com/vmagent.html) when it runs in [multitenancy mode](https://docs.victoriametrics.com/vmagent.html#multitenancy).
`vmagent` buffers on disk the data that must be sent to a particular AZ while this AZ is temporarily unavailable, and sends the buffered data as soon as the AZ becomes available again.
## vmstorage groups at vmselect
`vmselect` can be configured to query multiple distinct groups of `vmstorage` nodes with an individual `-replicationFactor` per group.
To assign a particular `vmstorage` address `addr` to a particular group `groupName`, use the following format for the `-storageNode` command-line flag value:
`-storageNode=groupName/addr`. For example, the following command runs `vmselect`, which continues returning full responses if up to one node per group is temporarily unavailable,
because the given `-replicationFactor=2` is applied individually to each group:
```
/path/to/vmselect \
-replicationFactor=2 \
-storageNode=group1/host1 \
-storageNode=group1/host2 \
-storageNode=group1/host3 \
-storageNode=group2/host4 \
-storageNode=group2/host5 \
-storageNode=group2/host6 \
-storageNode=group3/host7 \
-storageNode=group3/host8 \
-storageNode=group3/host9
```
It is possible to specify a distinct `-replicationFactor` per group via the following format - `-replicationFactor=groupName:rf`.
For example, the following command runs `vmselect`, which uses `-replicationFactor=3` for `group1`, while it uses `-replicationFactor=1` for `group2`:
```
/path/to/vmselect \
-replicationFactor=group1:3 \
-storageNode=group1/host1 \
-storageNode=group1/host2 \
-storageNode=group1/host3 \
-replicationFactor=group2:1 \
-storageNode=group2/host4 \
-storageNode=group2/host5 \
-storageNode=group2/host6
```
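The `-replicationFactor` flag is backed by the new `DictInt` flag type (see `lib/flagutil/dict.go` below), whose description says it supports `key:value` entries separated by comma as well as repeated flags. So the per-group factors above could presumably also be passed as a single value:
```
/path/to/vmselect \
-replicationFactor=group1:3,group2:1 \
-storageNode=group1/host1 \
-storageNode=group1/host2 \
-storageNode=group1/host3 \
-storageNode=group2/host4 \
-storageNode=group2/host5 \
-storageNode=group2/host6
```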
## Helm
@@ -690,19 +736,32 @@ It is available in the [helm-charts](https://github.com/VictoriaMetrics/helm-cha
## Replication and data safety
By default, VictoriaMetrics offloads replication to the underlying storage pointed by `-storageDataPath` such as [Google compute persistent disk](https://cloud.google.com/compute/docs/disks#pdspecs),
which guarantees data durability. VictoriaMetrics supports application-level replication if replicated durable persistent disks cannot be used for some reason.
The replication can be enabled by passing `-replicationFactor=N` command-line flag to `vminsert`. This instructs `vminsert` to store `N` copies for every ingested sample
on `N` distinct `vmstorage` nodes. This guarantees that all the stored data remains available for querying if up to `N-1` `vmstorage` nodes are unavailable.
Passing the `-replicationFactor=N` command-line flag to `vmselect` instructs it to not mark responses as `partial` if fewer than `-replicationFactor` vmstorage nodes are unavailable during the query.
See [cluster availability docs](#cluster-availability) for details.
The cluster must contain at least `2*N-1` `vmstorage` nodes, where `N` is the replication factor, in order to maintain the given replication factor
for newly ingested data when `N-1` storage nodes are unavailable.
VictoriaMetrics stores timestamps with millisecond precision, so the `-dedup.minScrapeInterval=1ms` command-line flag must be passed to `vmselect` nodes when replication is enabled,
so they can de-duplicate replicated samples obtained from distinct `vmstorage` nodes during querying. If duplicate data is pushed to VictoriaMetrics
from identically configured [vmagent](https://docs.victoriametrics.com/vmagent.html) instances or Prometheus instances, then `-dedup.minScrapeInterval` must be set
to the `scrape_interval` from scrape configs according to [deduplication docs](#deduplication).
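Putting these flags together, a minimal sketch of a replicated setup (host names are placeholders): three `vmstorage` nodes satisfy the `2*N-1` requirement for `N=2`, and `vmselect` de-duplicates the replicated samples at query time:
```
/path/to/vminsert \
-replicationFactor=2 \
-storageNode=host1 \
-storageNode=host2 \
-storageNode=host3

/path/to/vmselect \
-replicationFactor=2 \
-dedup.minScrapeInterval=1ms \
-storageNode=host1 \
-storageNode=host2 \
-storageNode=host3
```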
Note that [replication doesn't save from disaster](https://medium.com/@valyala/speeding-up-backups-for-big-time-series-databases-533c1a927883),
so it is recommended to perform regular backups. See [these docs](#backups) for details.
Note that the replication increases resource usage - CPU, RAM, disk space, network bandwidth - by up to `-replicationFactor=N` times, because `vminsert` stores `N` copies
of incoming data to distinct `vmstorage` nodes and `vmselect` needs to de-duplicate the replicated data obtained from `vmstorage` nodes during querying.
So it is more cost-effective to offload the replication to underlying replicated durable storage pointed by `-storageDataPath`
such as [Google Compute Engine persistent disk](https://cloud.google.com/compute/docs/disks/#pdspecs), which is protected from data loss and data corruption.
It also provides consistently high performance and [may be resized](https://cloud.google.com/compute/docs/disks/add-persistent-disk) without downtime.
HDD-based persistent disks should be enough for the majority of use cases. It is recommended to use durable replicated persistent volumes in Kubernetes.
## Deduplication


@@ -16,24 +16,26 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
"github.com/cespare/xxhash/v2"
)
var (
replicationFactor = flagutil.NewDictInt("replicationFactor", 1, "How many copies of every time series is available on the provided -storageNode nodes. "+
"vmselect continues returning full responses when up to replicationFactor-1 vmstorage nodes are temporarily unavailable during querying. "+
"See also -search.skipSlowReplicas")
skipSlowReplicas = flag.Bool("search.skipSlowReplicas", false, "Whether to skip -replicationFactor - 1 slowest vmstorage nodes during querying. "+
@@ -1691,8 +1693,9 @@ type storageNodesRequest struct {
}
type rpcResult struct {
data interface{}
qt *querytracer.Tracer
group *storageNodesGroup
}
func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPartialResponse bool,
@@ -1705,8 +1708,9 @@ func startStorageNodesRequest(qt *querytracer.Tracer, sns []*storageNode, denyPa
go func(workerID uint, sn *storageNode) {
data := f(qtChild, workerID, sn)
resultsCh <- rpcResult{
data: data,
qt: qtChild,
group: sn.group,
}
}(uint(idx), sn)
}
@@ -1750,13 +1754,14 @@ func (snr *storageNodesRequest) collectAllResults(f func(result interface{}) err
}
func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Counter, f func(result interface{}) error) (bool, error) {
errsPartialPerGroup := make(map[*storageNodesGroup][]error)
resultsCollectedPerGroup := make(map[*storageNodesGroup]int)
sns := snr.sns
for i := 0; i < len(sns); i++ {
// There is no need in timer here, since all the goroutines executing the f function
// passed to startStorageNodesRequest must be finished until the deadline.
result := <-snr.resultsCh
group := result.group
if err := f(result.data); err != nil {
snr.finishQueryTracer(result.qt, fmt.Sprintf("error: %s", err))
var er *errRemote
@@ -1774,12 +1779,14 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
snr.finishQueryTracers("cancel request because query complexity limit was exceeded")
return false, err
}
errsPartialPerGroup[group] = append(errsPartialPerGroup[group], err)
if snr.denyPartialResponse && len(errsPartialPerGroup[group]) >= group.replicationFactor {
// Return the error to the caller if partial responses are denied
// and the number of partial responses for the given group reach its replicationFactor,
// since this means that the response is partial.
snr.finishQueryTracers("cancel request because partial responses are denied and some vmstorage nodes failed to return response")
snr.finishQueryTracers(fmt.Sprintf("cancel request because partial responses are denied and replicationFactor=%d vmstorage nodes at group %q failed to return response",
group.replicationFactor, group.name))
// Returns 503 status code for partial response, so the caller could retry it if needed.
err = &httpserver.ErrorWithStatusCode{
@@ -1791,47 +1798,102 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
continue
}
snr.finishQueryTracer(result.qt, "")
resultsCollectedPerGroup[group]++
if *skipSlowReplicas {
canSkipSlowReplicas := true
for g, n := range resultsCollectedPerGroup {
if n <= g.nodesCount-g.replicationFactor {
canSkipSlowReplicas = false
break
}
}
if canSkipSlowReplicas {
// There is no need in waiting for the remaining results,
// because the collected results contain all the data according to the given per-group replicationFactor.
// This should speed up responses when a part of vmstorage nodes are slow and/or temporarily unavailable.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
snr.finishQueryTracers("cancel request because -search.skipSlowReplicas is set and every group returned the needed number of responses according to replicationFactor")
return false, nil
}
}
}
// Verify whether the full result can be returned
isFullResponse := true
for g, errsPartial := range errsPartialPerGroup {
if len(errsPartial) >= g.replicationFactor {
isFullResponse = false
break
}
}
if isFullResponse {
// Assume that the result is full if the number of failing vmstorage nodes
// is smaller than the replicationFactor per each group.
return false, nil
}
// Verify whether there is at least a single node per each group, which successfully returned result,
// in order to return partial result.
for g, errsPartial := range errsPartialPerGroup {
if len(errsPartial) == g.nodesCount {
// All the vmstorage nodes at the given group g returned error.
// Return only the first error, since it has no sense in returning all errors.
// Returns 503 status code for partial response, so the caller could retry it if needed.
err := &httpserver.ErrorWithStatusCode{
Err: errsPartial[0],
StatusCode: http.StatusServiceUnavailable,
}
return false, err
}
if len(errsPartial) > 0 {
partialErrorsLogger.Warnf("%d out of %d vmstorage nodes at group %q were unavailable during the query; a sample error: %s", len(errsPartial), len(sns), g.name, errsPartial[0])
}
}
// Return partial results.
// This allows continuing returning responses in the case
// if a part of vmstorage nodes are temporarily unavailable.
partialResultsCounter.Inc()
// Do not return the error, since it may spam logs on busy vmselect
// serving high amounts of requests.
partialErrorsLogger.Warnf("%d out of %d vmstorage nodes were unavailable during the query; a sample error: %s", len(errsPartial), len(sns), errsPartial[0])
return true, nil
}
var partialErrorsLogger = logger.WithThrottler("partialErrors", 10*time.Second)
type storageNodesGroup struct {
// group name
name string
// replicationFactor for the given group
replicationFactor int
// the number of nodes in the group
nodesCount int
}
func initStorageNodeGroups(addrs []string) map[string]*storageNodesGroup {
m := make(map[string]*storageNodesGroup)
for _, addr := range addrs {
groupName, _ := netutil.ParseGroupAddr(addr)
g, ok := m[groupName]
if !ok {
g = &storageNodesGroup{
name: groupName,
replicationFactor: replicationFactor.Get(groupName),
}
m[groupName] = g
}
g.nodesCount++
}
return m
}
type storageNode struct {
// The group this storageNode belongs to.
group *storageNodesGroup
// Connection pool for the given storageNode.
connPool *netutil.ConnPool
// The number of concurrent queries to storageNode.
@@ -2765,6 +2827,9 @@ func initStorageNodes(addrs []string) *storageNodesBucket {
if len(addrs) == 0 {
logger.Panicf("BUG: addrs must be non-empty")
}
groupsMap := initStorageNodeGroups(addrs)
var snsLock sync.Mutex
sns := make([]*storageNode, 0, len(addrs))
var wg sync.WaitGroup
@@ -2773,10 +2838,14 @@ func initStorageNodes(addrs []string) *storageNodesBucket {
// for big number of storage nodes.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4364
for _, addr := range addrs {
var groupName string
groupName, addr = netutil.ParseGroupAddr(addr)
group := groupsMap[groupName]
wg.Add(1)
go func(addr string) {
defer wg.Done()
sn := newStorageNode(ms, group, addr)
snsLock.Lock()
sns = append(sns, sn)
snsLock.Unlock()
@@ -2790,7 +2859,7 @@ func initStorageNodes(addrs []string) *storageNodesBucket {
}
}
func newStorageNode(ms *metrics.Set, group *storageNodesGroup, addr string) *storageNode {
if _, _, err := net.SplitHostPort(addr); err != nil {
// Automatically add missing port.
addr += ":8401"
@@ -2799,6 +2868,7 @@ func newStorageNode(ms *metrics.Set, addr string) *storageNode {
connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout, *vmstorageUserTimeout)
sn := &storageNode{
group: group,
connPool: connPool,
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),


@@ -34,6 +34,7 @@ The sandbox cluster installation is running under the constant load generated by
* SECURITY: upgrade Go builder from Go1.21.4 to Go1.21.5. See [the list of issues addressed in Go1.21.5](https://github.com/golang/go/issues?q=milestone%3AGo1.21.5+label%3ACherryPickApproved).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to send requests to the first available backend and fall back to other `hot standby` backends when the first backend is unavailable. This allows building highly available setups as shown in [these docs](https://docs.victoriametrics.com/vmauth.html#high-availability). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4792).
* FEATURE: `vmselect`: allow specifying multiple groups of `vmstorage` nodes with independent `-replicationFactor` per each group. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#vmstorage-groups-at-vmselect) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5197) for details.
* FEATURE: `vmselect`: allow opening [vmui](https://docs.victoriametrics.com/#vmui) and investigating [Top queries](https://docs.victoriametrics.com/#top-queries) and [Active queries](https://docs.victoriametrics.com/#active-queries) when the `vmselect` is overloaded with concurrent queries (e.g. when more than `-search.maxConcurrentRequests` concurrent queries are executed). Previously an attempt to open `Top queries` or `Active queries` at `vmui` could result in `couldn't start executing the request in ... seconds, since -search.maxConcurrentRequests=... concurrent requests are executed` error, which could complicate debugging of overloaded `vmselect` or single-node VictoriaMetrics.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-enableMultitenantHandlers` command-line flag, which allows receiving data via [VictoriaMetrics cluster urls](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) at `vmagent` and converting [tenant ids](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) to (`vm_account_id`, `vm_project_id`) labels before sending the data to the configured `-remoteWrite.url`. See [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.disableOnDiskQueue` command-line flag, which can be used for disabling data queueing to disk when the remote storage cannot keep up with the data ingestion rate. See [these docs](https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2110).


lib/flagutil/dict.go (new file, 100 lines)

@@ -0,0 +1,100 @@
package flagutil
import (
"flag"
"fmt"
"strconv"
"strings"
)
// DictInt allows specifying a dictionary of named ints in the form `name1:value1,...,nameN:valueN`.
type DictInt struct {
defaultValue int
kvs []kIntValue
}
type kIntValue struct {
k string
v int
}
// NewDictInt creates DictInt with the given name, defaultValue and description.
func NewDictInt(name string, defaultValue int, description string) *DictInt {
description += fmt.Sprintf(" (default %d)", defaultValue)
description += "\nSupports an `array` of `key:value` entries separated by comma or specified via multiple flags."
di := &DictInt{
defaultValue: defaultValue,
}
flag.Var(di, name, description)
return di
}
// String implements flag.Value interface
func (di *DictInt) String() string {
kvs := di.kvs
if len(kvs) == 1 && kvs[0].k == "" {
// Short form - a single int value
return strconv.Itoa(kvs[0].v)
}
formattedResults := make([]string, len(kvs))
for i, kv := range kvs {
formattedResults[i] = fmt.Sprintf("%s:%d", kv.k, kv.v)
}
return strings.Join(formattedResults, ",")
}
// Set implements flag.Value interface
func (di *DictInt) Set(value string) error {
values := parseArrayValues(value)
if len(di.kvs) == 0 && len(values) == 1 && strings.IndexByte(values[0], ':') < 0 {
v, err := strconv.Atoi(values[0])
if err != nil {
return err
}
di.kvs = append(di.kvs, kIntValue{
v: v,
})
return nil
}
for _, x := range values {
n := strings.IndexByte(x, ':')
if n < 0 {
return fmt.Errorf("missing ':' in %q", x)
}
k := x[:n]
v, err := strconv.Atoi(x[n+1:])
if err != nil {
return fmt.Errorf("cannot parse value for key=%q: %w", k, err)
}
if di.contains(k) {
return fmt.Errorf("duplicate value for key=%q: %d", k, v)
}
di.kvs = append(di.kvs, kIntValue{
k: k,
v: v,
})
}
return nil
}
func (di *DictInt) contains(key string) bool {
for _, kv := range di.kvs {
if kv.k == key {
return true
}
}
return false
}
// Get returns value for the given key.
//
// Default value is returned if key isn't found in di.
func (di *DictInt) Get(key string) int {
for _, kv := range di.kvs {
if kv.k == key {
return kv.v
}
}
return di.defaultValue
}
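A minimal usage sketch for the new `DictInt` type (the flag name `example.factor` below is hypothetical and not part of this commit):
```go
package main

import (
	"flag"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)

// factor mirrors how vmselect registers -replicationFactor; the flag name is hypothetical.
var factor = flagutil.NewDictInt("example.factor", 1, "per-group factor")

func main() {
	// Equivalent to passing -example.factor=group1:3,group2:1 on the command line.
	if err := flag.Set("example.factor", "group1:3,group2:1"); err != nil {
		panic(err)
	}
	fmt.Println(factor.Get("group1")) // 3
	fmt.Println(factor.Get("group2")) // 1
	fmt.Println(factor.Get("group3")) // 1 - the default value passed to NewDictInt
}
```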

lib/flagutil/dict_test.go (new file, 69 lines)

@@ -0,0 +1,69 @@
package flagutil
import (
"testing"
)
func TestDictIntSetSuccess(t *testing.T) {
f := func(s string) {
t.Helper()
var di DictInt
if err := di.Set(s); err != nil {
t.Fatalf("unexpected error: %s", err)
}
result := di.String()
if result != s {
t.Fatalf("unexpected DictInt.String(); got %q; want %q", result, s)
}
}
f("")
f("123")
f("-234")
f("foo:123")
f("foo:123,bar:-42,baz:0,aa:43")
}
func TestDictIntFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var di DictInt
if err := di.Set(s); err == nil {
t.Fatalf("expecting non-nil error")
}
}
// missing values
f("foo")
f("foo:")
// non-integer values
f("foo:bar")
f("12.34")
f("foo:123.34")
// duplicate keys
f("a:234,k:123,k:432")
}
func TestDictIntGet(t *testing.T) {
f := func(s, key string, defaultValue, expectedValue int) {
t.Helper()
var di DictInt
di.defaultValue = defaultValue
if err := di.Set(s); err != nil {
t.Fatalf("unexpected error: %s", err)
}
value := di.Get(key)
if value != expectedValue {
t.Fatalf("unexpected value; got %d; want %d", value, expectedValue)
}
}
f("", "", 123, 123)
f("", "foo", 123, 123)
f("foo:42", "", 123, 123)
f("foo:42", "foo", 123, 42)
f("532", "", 123, 532)
f("532", "foo", 123, 123)
}

lib/netutil/dns.go (new file, 19 lines)

@@ -0,0 +1,19 @@
package netutil
import (
"strings"
)
// ParseGroupAddr parses `groupID/addrX` addr and returns (groupID, addrX).
//
// If addr doesn't contain `groupID/` prefix, then ("", addr) is returned.
func ParseGroupAddr(addr string) (string, string) {
n := strings.IndexByte(addr, '/')
if n < 0 {
return "", addr
}
if strings.HasPrefix(addr, "file:") {
return "", addr
}
return addr[:n], addr[n+1:]
}

lib/netutil/dns_test.go (new file, 26 lines)

@@ -0,0 +1,26 @@
package netutil
import (
"testing"
)
func TestParseGroupAddr(t *testing.T) {
f := func(s, groupIDExpected, addrExpected string) {
t.Helper()
groupID, addr := ParseGroupAddr(s)
if groupID != groupIDExpected {
t.Fatalf("unexpected groupID; got %q; want %q", groupID, groupIDExpected)
}
if addr != addrExpected {
t.Fatalf("unexpected addr; got %q; want %q", addr, addrExpected)
}
}
f("", "", "")
f("foo", "", "foo")
f("file:/foo/bar", "", "file:/foo/bar")
f("foo/bar", "foo", "bar")
f("foo/dns+srv:bar", "foo", "dns+srv:bar")
f("foo/file:/bar/baz", "foo", "file:/bar/baz")
}