From 0bff96fe4bc3e2c9716a2eaf3c59a8bb15b5aeb6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 5 Jul 2020 19:37:38 +0300 Subject: [PATCH] lib/storage: prioritize data ingestion over heavy queries Heavy queries could result in the lack of CPU resources for processing the current data ingestion stream. Prevent this by delaying queries' execution until free resources are available for data ingestion. Expose `vm_search_delays_total` metric, which may be used in for alerting when there is no enough CPU resources for data ingestion and/or for executing heavy queries. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 --- app/vmstorage/main.go | 4 ++++ docs/Single-server-VictoriaMetrics.md | 4 +++- lib/storage/storage.go | 28 +++++++++++++++++++++++++-- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index eb1cd85506..f0b3149885 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -360,6 +360,10 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(m().AddRowsConcurrencyCurrent) }) + metrics.NewGauge(`vm_search_delays_total`, func() float64 { + return float64(m().SearchDelays) + }) + metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { return float64(m().SlowRowInserts) }) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index e15addbf51..9c991b80f2 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -787,6 +787,8 @@ The required resources for query path: The higher number of scanned time series and lower `step` argument results in the higher RAM usage. * CPU cores: a CPU core per 30 millions of scanned data points per second. + This means that heavy queries that touch big number of time series (over 10K) and/or big number data points (over 100M) + usually require more CPU resources than tiny queries that touch a few time series with small number of data points. * Network usage: depends on the frequency and the type of incoming requests. Typical Grafana dashboards usually require negligible network bandwidth. @@ -975,7 +977,7 @@ The most interesting metrics are: of tweaking these flag values arises. * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases), - since the issue could be already fixed there. + since the encountered issue could be already fixed there. * If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second, then it is likely you have too many active time series for the current amount of RAM. diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 9d33d890ff..d13c093e3d 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -342,6 +342,8 @@ type Metrics struct { AddRowsConcurrencyCapacity uint64 AddRowsConcurrencyCurrent uint64 + SearchDelays uint64 + SlowRowInserts uint64 SlowPerDayIndexInserts uint64 SlowMetricNameLoads uint64 @@ -400,6 +402,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) + m.SearchDelays += atomic.LoadUint64(&searchDelays) + m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) @@ -856,8 +860,26 @@ func nextRetentionDuration(retentionMonths int) time.Duration { return deadline.Sub(t) } +var ( + searchTSIDsCondLock sync.Mutex + searchTSIDsCond = sync.NewCond(&searchTSIDsCondLock) + + searchDelays uint64 +) + // searchTSIDs returns sorted TSIDs for the given tfss and the given tr. func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { + // Make sure that there are enough resources for processing the ingested data via Storage.AddRows + // before starting the query. + // This should prevent from data ingestion starvation when provessing heavy queries. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 . + searchTSIDsCondLock.Lock() + for len(addRowsConcurrencyCh) >= cap(addRowsConcurrencyCh) { + atomic.AddUint64(&searchDelays, 1) + searchTSIDsCond.Wait() + } + searchTSIDsCondLock.Unlock() + // Do not cache tfss -> tsids here, since the caching is performed // on idb level. tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics) @@ -1069,7 +1091,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { // goroutines call AddRows. select { case addRowsConcurrencyCh <- struct{}{}: - defer func() { <-addRowsConcurrencyCh }() default: // Sleep for a while until giving up atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1) @@ -1077,7 +1098,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { select { case addRowsConcurrencyCh <- struct{}{}: timerpool.Put(t) - defer func() { <-addRowsConcurrencyCh }() case <-t.C: timerpool.Put(t) atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) @@ -1093,6 +1113,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { rr.rows, err = s.add(rr.rows, mrs, precisionBits) putRawRows(rr) + // Notify blocked goroutines at Storage.searchTSIDs that they may proceed with their work. + <-addRowsConcurrencyCh + searchTSIDsCond.Signal() + return err }