lib/storage: prioritize data ingestion over heavy queries

Heavy queries could result in the lack of CPU resources for processing the current data ingestion stream.
Prevent this by delaying queries' execution until free resources are available for data ingestion.

Expose `vm_search_delays_total` metric, which may be used in for alerting when there is no enough CPU resources
for data ingestion and/or for executing heavy queries.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
This commit is contained in:
Aliaksandr Valialkin 2020-07-05 19:37:38 +03:00
parent 703def4b2e
commit 6daa5f7500
4 changed files with 36 additions and 4 deletions

View File

@ -787,6 +787,8 @@ The required resources for query path:
The higher number of scanned time series and lower `step` argument results in the higher RAM usage. The higher number of scanned time series and lower `step` argument results in the higher RAM usage.
* CPU cores: a CPU core per 30 millions of scanned data points per second. * CPU cores: a CPU core per 30 millions of scanned data points per second.
This means that heavy queries that touch big number of time series (over 10K) and/or big number data points (over 100M)
usually require more CPU resources than tiny queries that touch a few time series with small number of data points.
* Network usage: depends on the frequency and the type of incoming requests. Typical Grafana dashboards usually * Network usage: depends on the frequency and the type of incoming requests. Typical Grafana dashboards usually
require negligible network bandwidth. require negligible network bandwidth.
@ -975,7 +977,7 @@ The most interesting metrics are:
of tweaking these flag values arises. of tweaking these flag values arises.
* It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases), * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases),
since the issue could be already fixed there. since the encountered issue could be already fixed there.
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second, * If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
then it is likely you have too many active time series for the current amount of RAM. then it is likely you have too many active time series for the current amount of RAM.

View File

@ -429,6 +429,10 @@ func registerStorageMetrics() {
return float64(m().AddRowsConcurrencyCurrent) return float64(m().AddRowsConcurrencyCurrent)
}) })
metrics.NewGauge(`vm_search_delays_total`, func() float64 {
return float64(m().SearchDelays)
})
metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
return float64(m().SlowRowInserts) return float64(m().SlowRowInserts)
}) })

View File

@ -787,6 +787,8 @@ The required resources for query path:
The higher number of scanned time series and lower `step` argument results in the higher RAM usage. The higher number of scanned time series and lower `step` argument results in the higher RAM usage.
* CPU cores: a CPU core per 30 millions of scanned data points per second. * CPU cores: a CPU core per 30 millions of scanned data points per second.
This means that heavy queries that touch big number of time series (over 10K) and/or big number data points (over 100M)
usually require more CPU resources than tiny queries that touch a few time series with small number of data points.
* Network usage: depends on the frequency and the type of incoming requests. Typical Grafana dashboards usually * Network usage: depends on the frequency and the type of incoming requests. Typical Grafana dashboards usually
require negligible network bandwidth. require negligible network bandwidth.
@ -975,7 +977,7 @@ The most interesting metrics are:
of tweaking these flag values arises. of tweaking these flag values arises.
* It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases), * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases),
since the issue could be already fixed there. since the encountered issue could be already fixed there.
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second, * If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
then it is likely you have too many active time series for the current amount of RAM. then it is likely you have too many active time series for the current amount of RAM.

View File

@ -327,6 +327,8 @@ type Metrics struct {
AddRowsConcurrencyCapacity uint64 AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64 AddRowsConcurrencyCurrent uint64
SearchDelays uint64
SlowRowInserts uint64 SlowRowInserts uint64
SlowPerDayIndexInserts uint64 SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64 SlowMetricNameLoads uint64
@ -385,6 +387,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
m.SearchDelays += atomic.LoadUint64(&searchDelays)
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
@ -793,8 +797,26 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
return deadline.Sub(t) return deadline.Sub(t)
} }
var (
searchTSIDsCondLock sync.Mutex
searchTSIDsCond = sync.NewCond(&searchTSIDsCondLock)
searchDelays uint64
)
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr. // searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
// Make sure that there are enough resources for processing the ingested data via Storage.AddRows
// before starting the query.
// This should prevent from data ingestion starvation when provessing heavy queries.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 .
searchTSIDsCondLock.Lock()
for len(addRowsConcurrencyCh) >= cap(addRowsConcurrencyCh) {
atomic.AddUint64(&searchDelays, 1)
searchTSIDsCond.Wait()
}
searchTSIDsCondLock.Unlock()
// Do not cache tfss -> tsids here, since the caching is performed // Do not cache tfss -> tsids here, since the caching is performed
// on idb level. // on idb level.
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics) tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics)
@ -998,7 +1020,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
// goroutines call AddRows. // goroutines call AddRows.
select { select {
case addRowsConcurrencyCh <- struct{}{}: case addRowsConcurrencyCh <- struct{}{}:
defer func() { <-addRowsConcurrencyCh }()
default: default:
// Sleep for a while until giving up // Sleep for a while until giving up
atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1) atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
@ -1006,7 +1027,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
select { select {
case addRowsConcurrencyCh <- struct{}{}: case addRowsConcurrencyCh <- struct{}{}:
timerpool.Put(t) timerpool.Put(t)
defer func() { <-addRowsConcurrencyCh }()
case <-t.C: case <-t.C:
timerpool.Put(t) timerpool.Put(t)
atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
@ -1022,6 +1042,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
rr.rows, err = s.add(rr.rows, mrs, precisionBits) rr.rows, err = s.add(rr.rows, mrs, precisionBits)
putRawRows(rr) putRawRows(rr)
// Notify blocked goroutines at Storage.searchTSIDs that they may proceed with their work.
<-addRowsConcurrencyCh
searchTSIDsCond.Signal()
return err return err
} }