mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/{vminsert,vmstorage}: follow-up after a171916ef5
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/269
This commit is contained in:
parent
4290b46e8c
commit
cf5cbd1c70
@ -1,6 +1,7 @@
|
||||
package vmstorage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@ -46,8 +47,7 @@ var (
|
||||
maxDailySeries = flag.Int("storage.maxDailySeries", 0, "The maximum number of unique series can be added to the storage during the last 24 hours. "+
|
||||
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries")
|
||||
|
||||
minFreeDiskSpaceSizeBytes = flagutil.NewBytes("storage.minFreeDiskSpaceSize", 0, "Defines minimum free disk space size for storageDataPath. "+
|
||||
"If limit is reached, storage becomes read-only and tells vminsert to reroute data for other storage nodes.")
|
||||
minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
|
||||
)
|
||||
|
||||
// CheckTimeRange returns true if the given tr is denied for querying.
|
||||
@ -84,7 +84,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
|
||||
storage.SetFinalMergeDelay(*finalMergeDelay)
|
||||
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
|
||||
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
|
||||
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceSizeBytes.N)
|
||||
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
|
||||
|
||||
logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
|
||||
startTime := time.Now()
|
||||
@ -122,6 +122,9 @@ var resetResponseCacheIfNeeded func(mrs []storage.MetricRow)
|
||||
|
||||
// AddRows adds mrs to the storage.
|
||||
func AddRows(mrs []storage.MetricRow) error {
|
||||
if Storage.IsReadOnly() {
|
||||
return errReadOnly
|
||||
}
|
||||
resetResponseCacheIfNeeded(mrs)
|
||||
WG.Add(1)
|
||||
err := Storage.AddRows(mrs, uint8(*precisionBits))
|
||||
@ -129,6 +132,8 @@ func AddRows(mrs []storage.MetricRow) error {
|
||||
return err
|
||||
}
|
||||
|
||||
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
|
||||
|
||||
// RegisterMetricNames registers all the metrics from mrs in the storage.
|
||||
func RegisterMetricNames(mrs []storage.MetricRow) error {
|
||||
WG.Add(1)
|
||||
@ -392,13 +397,11 @@ func registerStorageMetrics() {
|
||||
metrics.NewGauge(fmt.Sprintf(`vm_free_disk_space_bytes{path=%q}`, *DataPath), func() float64 {
|
||||
return float64(fs.MustGetFreeSpace(*DataPath))
|
||||
})
|
||||
|
||||
metrics.NewGauge(fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *storageDataPath), func() float64 {
|
||||
return float64(minFreeDiskSpaceSizeBytes.N)
|
||||
metrics.NewGauge(fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *DataPath), func() float64 {
|
||||
return float64(minFreeDiskSpaceBytes.N)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_storage_read_only`, func() float64 {
|
||||
if strg.IsReadOnly() {
|
||||
metrics.NewGauge(fmt.Sprintf(`vm_storage_is_read_only{path=%q}`, *DataPath), func() float64 {
|
||||
if Storage.IsReadOnly() {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
|
@ -8,6 +8,7 @@ sort: 15
|
||||
|
||||
* FEATURE: add ability to accept metrics from [DataDog agent](https://docs.datadoghq.com/agent/) and [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent). This option simplifies the migration path from DataDog to VictoriaMetrics. See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206).
|
||||
* FEATURE: vmagent [enterprise](https://victoriametrics.com/enterprise.html): add support for data reading and writing from/to [Apache Kafka](https://kafka.apache.org/). See [these docs](https://docs.victoriametrics.com/vmagent.html#kafka-integration).
|
||||
* FEATURE: vmstorage: stop accepting new data if `-storageDataPath` directory contains less than `-storage.minFreeDiskSpaceBytes` of free space. This should prevent from `out of disk space` crashes. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/269).
|
||||
* FEATURE: calculate quantiles in the same way as Prometheus does in such functions as [quantile_over_time](https://docs.victoriametrics.com/MetricsQL.html#quantile_over_time) and [quantile](https://docs.victoriametrics.com/MetricsQL.html#quantile). Previously results from VictoriaMetrics could be slightly different than results from Prometheus. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1625) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612) issues.
|
||||
* FEATURE: add `rollup_scrape_interval(m[d])` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which returns `min`, `max` and `avg` values for the interval between samples for `m` on the given lookbehind window `d`.
|
||||
* FEATURE: add `topk_last(k, q)` and `bottomk_last(k, q)` functions to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which return up to `k` time series from `q` with the maximum / minimum last value on the graph.
|
||||
|
@ -183,6 +183,10 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
|
||||
|
||||
It is recommended setting up alerts in [vmalert](https://docs.victoriametrics.com/vmalert.html) or in Prometheus from [this config](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/cluster/deployment/docker/alerts.yml).
|
||||
|
||||
## Readonly mode
|
||||
|
||||
`vmstorage` nodes automatically switch to readonly mode when the directory pointed by `-storageDataPath` contains less than `-storage.minFreeDiskSpaceBytes` of free space. `vminsert` nodes stop sending data to such nodes and start re-routing the data to the remaining `vmstorage` nodes.
|
||||
|
||||
|
||||
|
||||
## URL format
|
||||
@ -781,6 +785,9 @@ Below is the output for `/path/to/vmstorage -help`:
|
||||
The maximum number of unique series can be added to the storage during the last 24 hours. Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries
|
||||
-storage.maxHourlySeries int
|
||||
The maximum number of unique series can be added to the storage during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries
|
||||
-storage.minFreeDiskSpaceBytes size
|
||||
The minimum free disk space at -storageDataPath after which the storage stops accepting new data
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 10000000)
|
||||
-storageDataPath string
|
||||
Path to storage data (default "vmstorage-data")
|
||||
-tls
|
||||
|
@ -120,7 +120,7 @@ type Storage struct {
|
||||
currHourMetricIDsUpdaterWG sync.WaitGroup
|
||||
nextDayMetricIDsUpdaterWG sync.WaitGroup
|
||||
retentionWatcherWG sync.WaitGroup
|
||||
freeSpaceWatcherWG sync.WaitGroup
|
||||
freeDiskSpaceWatcherWG sync.WaitGroup
|
||||
|
||||
// The snapshotLock prevents from concurrent creation of snapshots,
|
||||
// since this may result in snapshots without recently added data,
|
||||
@ -562,17 +562,15 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
||||
s.tb.UpdateMetrics(&m.TableMetrics)
|
||||
}
|
||||
|
||||
var (
|
||||
storageFreeSpaceLimitBytes uint64
|
||||
)
|
||||
|
||||
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
|
||||
//
|
||||
// The function must be called before opening or creating any storage.
|
||||
func SetFreeDiskSpaceLimit(bytes int) {
|
||||
storageFreeSpaceLimitBytes = uint64(bytes)
|
||||
freeDiskSpaceLimitBytes = uint64(bytes)
|
||||
}
|
||||
|
||||
var freeDiskSpaceLimitBytes uint64
|
||||
|
||||
// IsReadOnly returns information is storage in read only mode
|
||||
func (s *Storage) IsReadOnly() bool {
|
||||
return atomic.LoadUint32(&s.isReadOnly) == 1
|
||||
@ -581,28 +579,24 @@ func (s *Storage) IsReadOnly() bool {
|
||||
func (s *Storage) startFreeDiskSpaceWatcher() {
|
||||
f := func() {
|
||||
freeSpaceBytes := fs.MustGetFreeSpace(s.path)
|
||||
// not enough free space
|
||||
if freeSpaceBytes < storageFreeSpaceLimitBytes {
|
||||
if freeSpaceBytes < freeDiskSpaceLimitBytes {
|
||||
// Switch the storage to readonly mode if there is no enough free space left at s.path
|
||||
atomic.StoreUint32(&s.isReadOnly, 1)
|
||||
return
|
||||
}
|
||||
atomic.StoreUint32(&s.isReadOnly, 0)
|
||||
}
|
||||
f()
|
||||
s.freeSpaceWatcherWG.Add(1)
|
||||
s.freeDiskSpaceWatcherWG.Add(1)
|
||||
go func() {
|
||||
defer s.freeSpaceWatcherWG.Done()
|
||||
// zero value disables limit.
|
||||
if storageFreeSpaceLimitBytes == 0 {
|
||||
return
|
||||
}
|
||||
t := time.NewTicker(time.Minute)
|
||||
defer t.Stop()
|
||||
defer s.freeDiskSpaceWatcherWG.Done()
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
case <-ticker.C:
|
||||
f()
|
||||
}
|
||||
}
|
||||
@ -723,7 +717,7 @@ func (s *Storage) resetAndSaveTSIDCache() {
|
||||
func (s *Storage) MustClose() {
|
||||
close(s.stop)
|
||||
|
||||
s.freeSpaceWatcherWG.Wait()
|
||||
s.freeDiskSpaceWatcherWG.Wait()
|
||||
s.retentionWatcherWG.Wait()
|
||||
s.currHourMetricIDsUpdaterWG.Wait()
|
||||
s.nextDayMetricIDsUpdaterWG.Wait()
|
||||
|
Loading…
Reference in New Issue
Block a user