app/vminsert: add -replicationFactor command-line flag for enabling data replication among available -storageNode instances

Aliaksandr Valialkin 2020-05-27 17:29:37 +03:00
parent b4e3bffe4b
commit 77e5165e7b
3 changed files with 60 additions and 23 deletions


@@ -16,8 +16,9 @@ Join [our Slack](http://slack.victoriametrics.com/) or [contact us](mailto:info@
 ## Prominent features
 - Supports all the features of [single-node version](https://github.com/VictoriaMetrics/VictoriaMetrics).
-- Performance and capacity scales horizontally.
-- Supports multiple independent namespaces for time series data (aka multi-tenancy).
+- Performance and capacity scales horizontally. See [these docs for details](#cluster-resizing-and-scalability).
+- Supports multiple independent namespaces for time series data (aka multi-tenancy). See [these docs for details](#multitenancy).
+- Supports replication. See [these docs for details](#replication-and-data-safety).
 ## Architecture overview
@@ -203,7 +204,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
 across `vmstorage` nodes.
-### Cluster resizing and scalability.
+### Cluster resizing and scalability
 Cluster performance and capacity scales with adding new nodes.
@@ -283,7 +284,7 @@ Upgrade follows `Cluster resizing procedure` under the hood.
 ### Replication and data safety
-VictoriaMetrics offloads replication to the underlying storage pointed by `-storageDataPath`.
+By default VictoriaMetrics offloads replication to the underlying storage pointed by `-storageDataPath`.
 It is recommended storing data on [Google Compute Engine persistent disks](https://cloud.google.com/compute/docs/disks/#pdspecs),
 since they are protected from data loss and data corruption. They also provide consistently high performance
 and [may be resized](https://cloud.google.com/compute/docs/disks/add-persistent-disk) without downtime.
@@ -291,7 +292,13 @@ HDD-based persistent disks should be enough for the majority of use cases.
 It is recommended using durable replicated persistent volumes in Kubernetes.
-Note that [replication doesn't save from disaster](https://medium.com/@valyala/speeding-up-backups-for-big-time-series-databases-533c1a927883).
+If `-replicationFactor=N` command-line flag is passed to `vminsert`, then `vminsert` puts `N` copies of the ingested data to distinct `vmstorage` nodes.
+This guarantees that all the data remains available for querying if up to `N-1` `vmstorage` nodes are unavailable. Note that `-dedup.minScrapeInterval=1ms` command-line
+flag must be passed to `vmselect` if `-replicationFactor` exceeds 1 in order to de-duplicate replicated data during queries.
+It is OK if `-dedup.minScrapeInterval` exceeds 1ms.
+Note that [replication doesn't save from disaster](https://medium.com/@valyala/speeding-up-backups-for-big-time-series-databases-533c1a927883),
+so it is recommended performing regular backups. See [these docs](#backups) for details.
 ### Backups
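
The `-dedup.minScrapeInterval=1ms` requirement exists because, with replication enabled, every point of a series comes back at query time from up to `N` `vmstorage` nodes. The sketch below (a hypothetical `sample` type and `dedupSamples` helper, not vmselect's actual code) illustrates the effect: merging two replicas of the same series yields every point twice, and de-duplication collapses the result back to one point per timestamp.

```go
package main

import "fmt"

// sample is one (timestamp, value) point of a time series.
type sample struct {
	timestampMs int64
	value       float64
}

// dedupSamples keeps at most one sample per timestamp in a slice that is
// already sorted by timestamp. It only illustrates why query-time
// de-duplication is required once replicas from several vmstorage nodes
// are merged; it is not vmselect's actual implementation.
func dedupSamples(sorted []sample) []sample {
	var out []sample
	for _, s := range sorted {
		if len(out) > 0 && out[len(out)-1].timestampMs == s.timestampMs {
			continue // duplicate point produced by replication
		}
		out = append(out, s)
	}
	return out
}

func main() {
	// With -replicationFactor=2 the same points come back from two
	// vmstorage nodes, so the merged response contains each point twice.
	merged := []sample{
		{1000, 42}, {1000, 42},
		{2000, 43}, {2000, 43},
	}
	fmt.Println(dedupSamples(merged)) // [{1000 42} {2000 43}]
}
```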


@@ -1,6 +1,7 @@
 package netstorage
 import (
+	"flag"
 	"fmt"
 	"net/http"
@@ -14,6 +15,10 @@ import (
 	jump "github.com/lithammer/go-jump-consistent-hash"
 )
+var replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
+	"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
+	"Higher values for -dedup.minScrapeInterval at vmselect is OK")
+
 // InsertCtx is a generic context for inserting data.
 //
 // InsertCtx.Reset must be called before the first usage.
@@ -115,20 +120,38 @@ func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, time
 // WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
 func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
-	br := &ctx.bufRowss[storageNodeIdx]
-	sn := storageNodes[storageNodeIdx]
-	bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
-	if len(bufNew) >= maxBufSizePerStorageNode {
-		// Send buf to storageNode, since it is too big.
-		if err := br.pushTo(sn); err != nil {
-			return err
-		}
-		br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
-	} else {
-		br.buf = bufNew
-	}
-	br.rows++
-	return nil
+	idx := storageNodeIdx
+	replicas := *replicationFactor
+	if replicas <= 0 {
+		replicas = 1
+	}
+	if replicas > len(storageNodes) {
+		replicas = len(storageNodes)
+	}
+	for {
+		br := &ctx.bufRowss[idx]
+		sn := storageNodes[idx]
+		bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
+		if len(bufNew) >= maxBufSizePerStorageNode {
+			// Send buf to storageNode, since it is too big.
+			if err := br.pushTo(sn); err != nil {
+				return err
+			}
+			br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
+		} else {
+			br.buf = bufNew
+		}
+		br.rows++
+		replicas--
+		if replicas == 0 {
+			return nil
+		}
+		idx++
+		if idx >= len(storageNodes) {
+			idx = 0
+		}
+	}
 }
 // FlushBufs flushes ctx bufs to remote storage nodes.
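
The replica placement implemented by the new `WriteDataPointExt` loop can be read as a standalone rule: the replication factor is clamped to the range `[1, len(storageNodes)]`, and the copies go to consecutive storage node indices starting from the hashed index, wrapping around at the end of the node list. The sketch below (a hypothetical `replicaNodes` helper, not part of the codebase) mirrors that logic.

```go
package main

import "fmt"

// replicaNodes mirrors the placement logic of the new WriteDataPointExt loop:
// the replication factor is clamped to [1, numNodes] and the copies are sent
// to consecutive storage node indices, wrapping around at the end of the list.
func replicaNodes(startIdx, replicationFactor, numNodes int) []int {
	replicas := replicationFactor
	if replicas <= 0 {
		replicas = 1
	}
	if replicas > numNodes {
		replicas = numNodes
	}
	nodes := make([]int, 0, replicas)
	idx := startIdx
	for {
		nodes = append(nodes, idx)
		replicas--
		if replicas == 0 {
			return nodes
		}
		idx++
		if idx >= numNodes {
			idx = 0
		}
	}
}

func main() {
	// A row hashed to node 2 in a 3-node cluster with -replicationFactor=2
	// is also written to node 0 (wrap-around), so it survives one node outage.
	fmt.Println(replicaNodes(2, 2, 3)) // [2 0]
	// Asking for more replicas than there are nodes degrades to one copy per node.
	fmt.Println(replicaNodes(0, 5, 3)) // [0 1 2]
}
```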
