Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
app/vmstorage: add ability to limit series cardinality via -storage.maxHourlySeries and -storage.maxDailySeries command-line flags
parent 7e526effaa
commit ad73f226ff
@@ -93,6 +93,7 @@ Alphabetically sorted links to case studies:
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [Arbitrary CSV data](#how-to-import-csv-data).
* Supports metrics' relabeling. See [these docs](#relabeling) for details.
* Can deal with high cardinality and high churn rate issues using the [series limiter](#cardinality-limiter).
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
* See also technical [Articles about VictoriaMetrics](https://docs.victoriametrics.com/Articles.html).
@@ -154,6 +155,8 @@ Alphabetically sorted links to case studies:
* [Security](#security)
* [Tuning](#tuning)
* [Monitoring](#monitoring)
* [TSDB stats](#tsdb-stats)
* [Cardinality limiter](#cardinality-limiter)
* [Troubleshooting](#troubleshooting)
* [Data migration](#data-migration)
* [Backfilling](#backfilling)
@@ -1334,6 +1337,23 @@ VictoriaMetrics returns TSDB stats at `/api/v1/status/tsdb` page in the way simi
* `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details.

## Cardinality limiter

By default, VictoriaMetrics doesn't limit the number of stored time series. The limit can be enforced by setting the following command-line flags:

* `-storage.maxHourlySeries` - limits the number of time series that can be added during the last hour. Useful for limiting the number of active time series.
* `-storage.maxDailySeries` - limits the number of time series that can be added during the last day. Useful for limiting daily churn rate.

Both limits can be set simultaneously. If any of these limits is reached, then incoming samples for new time series are dropped. A sample of dropped series is put in the log with `WARNING` level.

The exceeded limits can be [monitored](#monitoring) with the following metrics:

* `vm_hourly_series_limit_rows_dropped_total` - the number of metrics dropped because the hourly limit on the number of unique time series has been exceeded.
* `vm_daily_series_limit_rows_dropped_total` - the number of metrics dropped because the daily limit on the number of unique time series has been exceeded.

These limits are approximate, so VictoriaMetrics can underflow/overflow the limit by a small percentage (usually less than 1%).
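The drop rule described above can be sketched in a few lines of Go. This is a simplified, map-based illustration of the documented behaviour, not the actual implementation (VictoriaMetrics uses bloom-filter based limiters, as the `lib/bloomfilter` and `lib/storage` changes further down show); the series hashes and limits are made up for the example:

```go
package main

import "fmt"

// limiter is a simplified stand-in for the hourly/daily series limiters:
// it remembers which series it has already seen and rejects new series
// once maxItems distinct series have been registered.
type limiter struct {
	seen     map[uint64]struct{}
	maxItems int
}

func (l *limiter) add(h uint64) bool {
	if _, ok := l.seen[h]; ok {
		return true // samples for already-registered series are always accepted
	}
	if len(l.seen) >= l.maxItems {
		return false // a new series above the limit is rejected
	}
	l.seen[h] = struct{}{}
	return true
}

func main() {
	hourly := &limiter{seen: map[uint64]struct{}{}, maxItems: 2}
	daily := &limiter{seen: map[uint64]struct{}{}, maxItems: 3}

	// Series 1 and 2 are accepted; series 3 exceeds the hourly limit,
	// so its samples are dropped while series 1 and 2 keep flowing.
	for _, seriesHash := range []uint64{1, 2, 3, 1, 2} {
		accepted := hourly.add(seriesHash) && daily.add(seriesHash)
		fmt.Printf("series %d accepted: %v\n", seriesHash, accepted)
	}
}
```

Samples for series registered before the limit was hit keep flowing; only samples for series first seen after the limit are dropped, which is why the limits bound active series and churn rather than the total ingestion rate.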

## Troubleshooting

* It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need
@@ -1807,6 +1827,10 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
authKey, which must be passed in query string to /snapshot* pages
-sortLabels
Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit
-storage.maxDailySeries int
The maximum number of unique series that can be added to the storage during the last 24 hours. Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries
-storage.maxHourlySeries int
The maximum number of unique series that can be added to the storage during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-tls
@@ -325,12 +325,12 @@ By default `vmagent` doesn't limit the number of time series written to remote s
* `-remoteWrite.maxHourlySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last hour. Useful for limiting the number of active time series.
* `-remoteWrite.maxDailySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last day. Useful for limiting daily churn rate.

Both limits can be set simultaneously. It any of these limits is reached, then new time series are dropped before sending the data to remote storage systems. A sample of dropped series is put in the log with `WARNING` level.
Both limits can be set simultaneously. If any of these limits is reached, then samples for new time series are dropped instead of being sent to remote storage systems. A sample of dropped series is put in the log with `WARNING` level.

The exceeded limits can be [monitored](#monitoring) with the following metrics:

* `vmagent_hourly_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding hourly limit on the number of unique time series.
* `vmagent_daily_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding daily limit on the number of unique time series.
* `vmagent_hourly_series_limit_rows_dropped_total` - the number of metrics dropped because the hourly limit on the number of unique time series has been exceeded.
* `vmagent_daily_series_limit_rows_dropped_total` - the number of metrics dropped because the daily limit on the number of unique time series has been exceeded.

These limits are approximate, so `vmagent` can underflow/overflow the limit by a small percentage (usually less than 1%).

@@ -683,9 +683,13 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-remoteWrite.maxBlockSize size
The maximum size in bytes of unpacked request to send to remote storage. It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 8388608)
-remoteWrite.maxDailySeries int
The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries
-remoteWrite.maxDiskUsagePerURL size
The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. Disk usage is unlimited if the value is set to 0
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-remoteWrite.maxHourlySeries int
The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxDailySeries
-remoteWrite.proxyURL array
Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234
Supports an array of values separated by comma or specified via multiple flags.
@@ -50,7 +50,7 @@ var (
maxHourlySeries = flag.Int("remoteWrite.maxHourlySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last hour. "+
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxDailySeries")
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxHourlySeries")
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries")
)

var rwctxs []*remoteWriteCtx
@@ -227,13 +227,13 @@ func limitSeriesCardinality(tss []prompbmarshal.TimeSeries) []prompbmarshal.Time
labels := tss[i].Labels
h := getLabelsHash(labels)
if hourlySeriesLimiter != nil && !hourlySeriesLimiter.Add(h) {
hourlySeriesLimit.Add(len(tss[i].Samples))
logSkippedSeries(labels, "-remoteWrite.maxHourlySeries", *maxHourlySeries)
hourlySeriesLimitRowsDropped.Add(len(tss[i].Samples))
logSkippedSeries(labels, "-remoteWrite.maxHourlySeries", hourlySeriesLimiter.MaxItems())
continue
}
if dailySeriesLimiter != nil && !dailySeriesLimiter.Add(h) {
dailySeriesLimit.Add(len(tss[i].Samples))
logSkippedSeries(labels, "-remoteWrite.maxDailySeries", *maxDailySeries)
dailySeriesLimitRowsDropped.Add(len(tss[i].Samples))
logSkippedSeries(labels, "-remoteWrite.maxDailySeries", dailySeriesLimiter.MaxItems())
continue
}
dst = append(dst, tss[i])
@@ -245,8 +245,8 @@ var (
hourlySeriesLimiter *bloomfilter.Limiter
dailySeriesLimiter *bloomfilter.Limiter

hourlySeriesLimit = metrics.NewCounter(`vmagent_hourly_series_limit_samples_dropped_total`)
dailySeriesLimit = metrics.NewCounter(`vmagent_daily_series_limit_samples_dropped_total`)
hourlySeriesLimitRowsDropped = metrics.NewCounter(`vmagent_hourly_series_limit_rows_dropped_total`)
dailySeriesLimitRowsDropped = metrics.NewCounter(`vmagent_daily_series_limit_rows_dropped_total`)
)

func getLabelsHash(labels []prompbmarshal.Label) uint64 {
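The hunk above ends at the `getLabelsHash` signature, so its body isn't shown. A plausible stand-in, for illustration only (the xxhash choice, the separators and the local `Label` struct below are assumptions, not necessarily the code vmagent ships), hashes the label name/value pairs of a series; the limiters themselves are presumably created from the two flags elsewhere in remotewrite.go, mirroring the `bloomfilter.NewLimiter` calls shown in storage.go further down:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// Label is a simplified stand-in for prompbmarshal.Label.
type Label struct {
	Name  string
	Value string
}

// getLabelsHash returns a hash identifying a time series by its labels.
// Illustrative implementation only; it assumes labels arrive in a canonical order.
func getLabelsHash(labels []Label) uint64 {
	d := xxhash.New()
	for _, label := range labels {
		// Write name and value with separators so that
		// {a="bc"} and {ab="c"} hash differently.
		d.WriteString(label.Name)
		d.WriteString("=")
		d.WriteString(label.Value)
		d.WriteString(",")
	}
	return d.Sum64()
}

func main() {
	h := getLabelsHash([]Label{
		{Name: "__name__", Value: "http_requests_total"},
		{Name: "job", Value: "api"},
	})
	fmt.Printf("series hash: %d\n", h)
}
```

Any stable 64-bit hash over the canonical label set would work here; the important property is that the same series always maps to the same value handed to the limiter's Add method.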
@@ -41,6 +41,10 @@ var (
denyQueriesOutsideRetention = flag.Bool("denyQueriesOutsideRetention", false, "Whether to deny queries outside of the configured -retentionPeriod. "+
"When set, then /api/v1/query_range would return '503 Service Unavailable' error for queries with 'from' value outside -retentionPeriod. "+
"This may be useful when multiple data sources with distinct retentions are hidden behind query-tee")
maxHourlySeries = flag.Int("storage.maxHourlySeries", 0, "The maximum number of unique series that can be added to the storage during the last hour. "+
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries")
maxDailySeries = flag.Int("storage.maxDailySeries", 0, "The maximum number of unique series that can be added to the storage during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries")
)

// CheckTimeRange returns true if the given tr is denied for querying.
@@ -81,7 +85,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
startTime := time.Now()
WG = syncwg.WaitGroup{}
strg, err := storage.OpenStorage(*DataPath, retentionPeriod.Msecs)
strg, err := storage.OpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries)
if err != nil {
logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err)
}
@@ -575,6 +579,13 @@ func registerStorageMetrics() {
return float64(m().SlowMetricNameLoads)
})

metrics.NewGauge(`vm_hourly_series_limit_rows_dropped_total`, func() float64 {
return float64(m().HourlySeriesLimitRowsDropped)
})
metrics.NewGauge(`vm_daily_series_limit_rows_dropped_total`, func() float64 {
return float64(m().DailySeriesLimitRowsDropped)
})

metrics.NewGauge(`vm_timestamps_blocks_merged_total`, func() float64 {
return float64(m().TimestampsBlocksMerged)
})
@@ -4,6 +4,7 @@ sort: 15

# CHANGELOG

* FEATURE: add the ability to limit the number of unique time series that can be added to storage per hour and per day. This can help deal with high cardinality and high churn rate issues. See [these docs](https://docs.victoriametrics.com/#cardinality-limiter).
* FEATURE: vmagent: add the ability to limit the number of unique time series that can be sent to remote storage systems per hour and per day. This can help deal with high cardinality and high churn rate issues. See [these docs](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter).
* FEATURE: vmalert: add ability to run alerting and recording rules for multiple tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/740) and [these docs](https://docs.victoriametrics.com/vmalert.html#multitenancy).
* FEATURE: vminsert: add support for data ingestion via other `vminsert` nodes. This allows building multi-level data ingestion paths in VictoriaMetrics cluster by writing data from one level of `vminsert` nodes to another level of `vminsert` nodes. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) and [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/541#issuecomment-835487858) for details.
@@ -97,6 +97,7 @@ Alphabetically sorted links to case studies:
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [Arbitrary CSV data](#how-to-import-csv-data).
* Supports metrics' relabeling. See [these docs](#relabeling) for details.
* Can deal with high cardinality and high churn rate issues using the [series limiter](#cardinality-limiter).
* Ideally works with big amounts of time series data from Kubernetes, IoT sensors, connected cars, industrial telemetry, financial data and various Enterprise workloads.
* Has open source [cluster version](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/cluster).
* See also technical [Articles about VictoriaMetrics](https://docs.victoriametrics.com/Articles.html).
@@ -158,6 +159,8 @@ Alphabetically sorted links to case studies:
* [Security](#security)
* [Tuning](#tuning)
* [Monitoring](#monitoring)
* [TSDB stats](#tsdb-stats)
* [Cardinality limiter](#cardinality-limiter)
* [Troubleshooting](#troubleshooting)
* [Data migration](#data-migration)
* [Backfilling](#backfilling)
@@ -1338,6 +1341,23 @@ VictoriaMetrics returns TSDB stats at `/api/v1/status/tsdb` page in the way simi
* `extra_label=LABEL=VALUE`. See [these docs](#prometheus-querying-api-enhancements) for more details.

## Cardinality limiter

By default, VictoriaMetrics doesn't limit the number of stored time series. The limit can be enforced by setting the following command-line flags:

* `-storage.maxHourlySeries` - limits the number of time series that can be added during the last hour. Useful for limiting the number of active time series.
* `-storage.maxDailySeries` - limits the number of time series that can be added during the last day. Useful for limiting daily churn rate.

Both limits can be set simultaneously. If any of these limits is reached, then incoming samples for new time series are dropped. A sample of dropped series is put in the log with `WARNING` level.

The exceeded limits can be [monitored](#monitoring) with the following metrics:

* `vm_hourly_series_limit_rows_dropped_total` - the number of metrics dropped because the hourly limit on the number of unique time series has been exceeded.
* `vm_daily_series_limit_rows_dropped_total` - the number of metrics dropped because the daily limit on the number of unique time series has been exceeded.

These limits are approximate, so VictoriaMetrics can underflow/overflow the limit by a small percentage (usually less than 1%).

## Troubleshooting

* It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need
@@ -1811,6 +1831,10 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
authKey, which must be passed in query string to /snapshot* pages
-sortLabels
Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit
-storage.maxDailySeries int
The maximum number of unique series that can be added to the storage during the last 24 hours. Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries
-storage.maxHourlySeries int
The maximum number of unique series that can be added to the storage during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-tls
@@ -329,12 +329,12 @@ By default `vmagent` doesn't limit the number of time series written to remote s
* `-remoteWrite.maxHourlySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last hour. Useful for limiting the number of active time series.
* `-remoteWrite.maxDailySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last day. Useful for limiting daily churn rate.

Both limits can be set simultaneously. It any of these limits is reached, then new time series are dropped before sending the data to remote storage systems. A sample of dropped series is put in the log with `WARNING` level.
Both limits can be set simultaneously. If any of these limits is reached, then samples for new time series are dropped instead of being sent to remote storage systems. A sample of dropped series is put in the log with `WARNING` level.

The exceeded limits can be [monitored](#monitoring) with the following metrics:

* `vmagent_hourly_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding hourly limit on the number of unique time series.
* `vmagent_daily_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding daily limit on the number of unique time series.
* `vmagent_hourly_series_limit_rows_dropped_total` - the number of metrics dropped because the hourly limit on the number of unique time series has been exceeded.
* `vmagent_daily_series_limit_rows_dropped_total` - the number of metrics dropped because the daily limit on the number of unique time series has been exceeded.

These limits are approximate, so `vmagent` can underflow/overflow the limit by a small percentage (usually less than 1%).

@@ -687,9 +687,13 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-remoteWrite.maxBlockSize size
The maximum size in bytes of unpacked request to send to remote storage. It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 8388608)
-remoteWrite.maxDailySeries int
The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries
-remoteWrite.maxDiskUsagePerURL size
The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. Disk usage is unlimited if the value is set to 0
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-remoteWrite.maxHourlySeries int
The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxDailySeries
-remoteWrite.proxyURL array
Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234
Supports an array of values separated by comma or specified via multiple flags.
@@ -9,12 +9,15 @@ import (
//
// It is safe using the Limiter from concurrent goroutines.
type Limiter struct {
v atomic.Value
maxItems int
v atomic.Value
}

// NewLimiter creates new Limiter, which can hold up to maxItems unique items during the given refreshInterval.
func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
var l Limiter
l := &Limiter{
maxItems: maxItems,
}
l.v.Store(newLimiter(maxItems))
go func() {
for {
@@ -22,7 +25,12 @@ func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
l.v.Store(newLimiter(maxItems))
}
}()
return &l
return l
}

// MaxItems returns the maxItems passed to NewLimiter.
func (l *Limiter) MaxItems() int {
return l.maxItems
}

// Add adds h to the limiter.
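Putting the pieces above together, a usage sketch of the limiter might look like the following. The `NewLimiter` and `MaxItems` signatures come from the diff; the exact return values of `Add` are an assumption drawn from how remotewrite.go and storage.go treat a `false` result (drop samples for the new series):

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
)

func main() {
	// Allow at most 2 unique series hashes per hour.
	l := bloomfilter.NewLimiter(2, time.Hour)
	fmt.Println(l.MaxItems()) // 2

	// The first two distinct hashes are admitted; the third is rejected,
	// which is the signal the callers use to drop samples for new series.
	fmt.Println(l.Add(111)) // true
	fmt.Println(l.Add(222)) // true
	fmt.Println(l.Add(333)) // false - limit reached for a new series
	fmt.Println(l.Add(111)) // true - already-registered series keep passing
}
```

The background goroutine in `NewLimiter` swaps in a fresh inner limiter every refreshInterval, which is why exceeding a limit only blocks new series until the next hourly or daily reset.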
@@ -16,6 +16,9 @@ func TestLimiter(t *testing.T) {
func testLimiter(t *testing.T, maxItems int) {
r := rand.New(rand.NewSource(int64(0)))
l := NewLimiter(maxItems, time.Hour)
if n := l.MaxItems(); n != maxItems {
t.Fatalf("unexpected maxItems returned; got %d; want %d", n, maxItems)
}
items := make(map[uint64]struct{}, maxItems)

// Populate the l with new items.
@@ -57,6 +60,9 @@ func TestLimiterConcurrent(t *testing.T) {
doneCh := make(chan struct{}, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
if n := l.MaxItems(); n != maxItems {
panic(fmt.Errorf("unexpected maxItems returned; got %d; want %d", n, maxItems))
}
r := rand.New(rand.NewSource(0))
for i := 0; i < maxItems; i++ {
h := r.Uint64()
@@ -72,7 +72,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {

func TestSearch(t *testing.T) {
path := "TestSearch"
st, err := OpenStorage(path, 0)
st, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage %q: %s", path, err)
}
@@ -121,7 +121,7 @@ func TestSearch(t *testing.T) {

// Re-open the storage in order to flush all the pending cached data.
st.MustClose()
st, err = OpenStorage(path, 0)
st, err = OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot re-open storage %q: %s", path, err)
}
@@ -16,6 +16,7 @@ import (
"time"
"unsafe"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@@ -52,6 +53,9 @@ type Storage struct {
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64

hourlySeriesLimitRowsDropped uint64
dailySeriesLimitRowsDropped uint64

path string
cachePath string
retentionMsecs int64
@@ -63,6 +67,10 @@ type Storage struct {

tb *table

// Series cardinality limiters.
hourlySeriesLimiter *bloomfilter.Limiter
dailySeriesLimiter *bloomfilter.Limiter

// tsidCache is MetricName -> TSID cache.
tsidCache *workingsetcache.Cache

@@ -115,7 +123,7 @@ type Storage struct {
}

// OpenStorage opens storage on the given path with the given retentionMsecs.
func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {
func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) (*Storage, error) {
path, err := filepath.Abs(path)
if err != nil {
return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err)
@@ -150,6 +158,14 @@ func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {
return nil, fmt.Errorf("cannot create %q: %w", snapshotsPath, err)
}

// Initialize series cardinality limiter.
if maxHourlySeries > 0 {
s.hourlySeriesLimiter = bloomfilter.NewLimiter(maxHourlySeries, time.Hour)
}
if maxDailySeries > 0 {
s.dailySeriesLimiter = bloomfilter.NewLimiter(maxDailySeries, 24*time.Hour)
}

// Load caches.
mem := memory.Allowed()
s.tsidCache = s.mustLoadCache("MetricName->TSID", "metricName_tsid", mem/3)
@@ -362,6 +378,9 @@ type Metrics struct {
SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64

HourlySeriesLimitRowsDropped uint64
DailySeriesLimitRowsDropped uint64

TimestampsBlocksMerged uint64
TimestampsBytesSaved uint64

@@ -431,6 +450,9 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)

m.HourlySeriesLimitRowsDropped += atomic.LoadUint64(&s.hourlySeriesLimitRowsDropped)
m.DailySeriesLimitRowsDropped += atomic.LoadUint64(&s.dailySeriesLimitRowsDropped)

m.TimestampsBlocksMerged = atomic.LoadUint64(&timestampsBlocksMerged)
m.TimestampsBytesSaved = atomic.LoadUint64(&timestampsBytesSaved)

@@ -1485,6 +1507,11 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
continue
}
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since the limit on the number of unique series has been exceeded.
j--
continue
}
// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
@@ -1544,6 +1571,11 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
j--
continue
}
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since the limit on the number of unique series has been exceeded.
j--
continue
}
s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
@@ -1570,6 +1602,30 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
return rows, nil
}

func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool {
if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
return true
}
if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
return true
}
return false
}

func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
select {
case <-logSkippedSeriesTicker.C:
logger.Warnf("skip series %s because %s=%d reached", getUserReadableMetricName(metricNameRaw), flagName, flagValue)
default:
}
}

var logSkippedSeriesTicker = time.NewTicker(5 * time.Second)

func getUserReadableMetricName(metricNameRaw []byte) string {
var mn MetricName
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
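The logSkippedSeries helper above rate-limits its own output: the non-blocking select with a default branch only logs when the 5-second ticker has a pending tick, so a flood of dropped series produces at most one warning per interval. The same idiom in isolation (the names and interval below are illustrative, with log.Printf standing in for logger.Warnf):

```go
package main

import (
	"log"
	"time"
)

// logTicker fires at most once per interval; warnIfDue logs only when a tick
// is pending, so a hot loop cannot flood the log.
var logTicker = time.NewTicker(500 * time.Millisecond)

func warnIfDue(msg string) {
	select {
	case <-logTicker.C:
		log.Printf("WARNING: %s", msg)
	default:
		// Ticker hasn't fired yet - drop the message silently.
	}
}

func main() {
	for i := 0; i < 20; i++ {
		warnIfDue("series limit reached")
		time.Sleep(100 * time.Millisecond)
	}
	// Roughly one warning every 500ms is printed instead of 20 messages.
}
```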
@@ -348,7 +348,7 @@ func TestNextRetentionDuration(t *testing.T) {
func TestStorageOpenClose(t *testing.T) {
path := "TestStorageOpenClose"
for i := 0; i < 10; i++ {
s, err := OpenStorage(path, -1)
s, err := OpenStorage(path, -1, 1e5, 1e6)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@@ -361,13 +361,13 @@ func TestStorageOpenClose(t *testing.T) {

func TestStorageOpenMultipleTimes(t *testing.T) {
path := "TestStorageOpenMultipleTimes"
s1, err := OpenStorage(path, -1)
s1, err := OpenStorage(path, -1, 0, 0)
if err != nil {
t.Fatalf("cannot open storage the first time: %s", err)
}

for i := 0; i < 10; i++ {
s2, err := OpenStorage(path, -1)
s2, err := OpenStorage(path, -1, 0, 0)
if err == nil {
s2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened storage")
@@ -382,7 +382,7 @@ func TestStorageOpenMultipleTimes(t *testing.T) {
func TestStorageRandTimestamps(t *testing.T) {
path := "TestStorageRandTimestamps"
retentionMsecs := int64(60 * msecsPerMonth)
s, err := OpenStorage(path, retentionMsecs)
s, err := OpenStorage(path, retentionMsecs, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@@ -392,7 +392,7 @@ func TestStorageRandTimestamps(t *testing.T) {
t.Fatal(err)
}
s.MustClose()
s, err = OpenStorage(path, retentionMsecs)
s, err = OpenStorage(path, retentionMsecs, 0, 0)
}
})
t.Run("concurrent", func(t *testing.T) {
@@ -475,7 +475,7 @@ func testStorageRandTimestamps(s *Storage) error {

func TestStorageDeleteMetrics(t *testing.T) {
path := "TestStorageDeleteMetrics"
s, err := OpenStorage(path, 0)
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@@ -498,7 +498,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
// Re-open the storage in order to check how deleted metricIDs
// are persisted.
s.MustClose()
s, err = OpenStorage(path, 0)
s, err = OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
}
@@ -694,7 +694,7 @@ func checkTagKeys(tks []string, tksExpected map[string]bool) error {

func TestStorageRegisterMetricNamesSerial(t *testing.T) {
path := "TestStorageRegisterMetricNamesSerial"
s, err := OpenStorage(path, 0)
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@@ -709,7 +709,7 @@ func TestStorageRegisterMetricNamesSerial(t *testing.T) {

func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
path := "TestStorageRegisterMetricNamesConcurrent"
s, err := OpenStorage(path, 0)
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@@ -855,7 +855,7 @@ func testStorageRegisterMetricNames(s *Storage) error {

func TestStorageAddRowsSerial(t *testing.T) {
path := "TestStorageAddRowsSerial"
s, err := OpenStorage(path, 0)
s, err := OpenStorage(path, 0, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@@ -870,7 +870,7 @@ func TestStorageAddRowsSerial(t *testing.T) {

func TestStorageAddRowsConcurrent(t *testing.T) {
path := "TestStorageAddRowsConcurrent"
s, err := OpenStorage(path, 0)
s, err := OpenStorage(path, 0, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@@ -950,7 +950,7 @@ func testStorageAddRows(s *Storage) error {

// Try opening the storage from snapshot.
snapshotPath := s.path + "/snapshots/" + snapshotName
s1, err := OpenStorage(snapshotPath, 0)
s1, err := OpenStorage(snapshotPath, 0, 0, 0)
if err != nil {
return fmt.Errorf("cannot open storage from snapshot: %w", err)
}
@@ -997,7 +997,7 @@ func testStorageAddRows(s *Storage) error {

func TestStorageRotateIndexDB(t *testing.T) {
path := "TestStorageRotateIndexDB"
s, err := OpenStorage(path, 0)
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}

@@ -17,7 +17,7 @@ func BenchmarkStorageAddRows(b *testing.B) {

func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) {
path := fmt.Sprintf("BenchmarkStorageAddRows_%d", rowsPerBatch)
s, err := OpenStorage(path, 0)
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
b.Fatalf("cannot open storage at %q: %s", path, err)
}