lib/storage: prioritize data ingestion over heavy queries

Heavy queries could result in the lack of CPU resources for processing the current data ingestion stream.
Prevent this by delaying queries' execution until free resources are available for data ingestion.

Expose the `vm_search_delays_total` metric, which may be used for alerting when there are not enough CPU resources
for data ingestion and/or for executing heavy queries.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
This commit is contained in:
Aliaksandr Valialkin 2020-07-05 19:37:38 +03:00
parent 9afd19d375
commit 0bff96fe4b
3 changed files with 33 additions and 3 deletions

View File

@ -360,6 +360,10 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().AddRowsConcurrencyCurrent)
})
metrics.NewGauge(`vm_search_delays_total`, func() float64 {
return float64(m().SearchDelays)
})
metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
return float64(m().SlowRowInserts)
})

View File

@ -787,6 +787,8 @@ The required resources for query path:
The higher number of scanned time series and lower `step` argument results in the higher RAM usage.
* CPU cores: a CPU core per 30 millions of scanned data points per second.
This means that heavy queries that touch a big number of time series (over 10K) and/or a big number of data points (over 100M)
usually require more CPU resources than tiny queries that touch a few time series with small number of data points.
* Network usage: depends on the frequency and the type of incoming requests. Typical Grafana dashboards usually
require negligible network bandwidth.
@ -975,7 +977,7 @@ The most interesting metrics are:
of tweaking these flag values arises.
* It is recommended to upgrade to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases),
since the issue could be already fixed there.
since the encountered issue could be already fixed there.
* If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second,
then it is likely you have too many active time series for the current amount of RAM.

View File

@ -342,6 +342,8 @@ type Metrics struct {
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
SearchDelays uint64
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64
@ -400,6 +402,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
m.SearchDelays += atomic.LoadUint64(&searchDelays)
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
@ -856,8 +860,26 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
return deadline.Sub(t)
}
var (
	// searchDelays counts how many times Storage.searchTSIDs had to wait
	// because addRowsConcurrencyCh was full. It is updated with atomic
	// operations and reported via Metrics.SearchDelays.
	searchDelays uint64

	// searchTSIDsCondLock is the locker backing searchTSIDsCond.
	searchTSIDsCondLock sync.Mutex

	// searchTSIDsCond is waited on by Storage.searchTSIDs while the data
	// ingestion concurrency limit is reached, and is signaled by
	// Storage.AddRows after it frees its slot in addRowsConcurrencyCh.
	searchTSIDsCond = sync.NewCond(&searchTSIDsCondLock)
)
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
// Make sure that there are enough resources for processing the ingested data via Storage.AddRows
// before starting the query.
// This should prevent data ingestion starvation when processing heavy queries.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 .
searchTSIDsCondLock.Lock()
for len(addRowsConcurrencyCh) >= cap(addRowsConcurrencyCh) {
atomic.AddUint64(&searchDelays, 1)
searchTSIDsCond.Wait()
}
searchTSIDsCondLock.Unlock()
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics)
@ -1069,7 +1091,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
// goroutines call AddRows.
select {
case addRowsConcurrencyCh <- struct{}{}:
defer func() { <-addRowsConcurrencyCh }()
default:
// Sleep for a while until giving up
atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
@ -1077,7 +1098,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
select {
case addRowsConcurrencyCh <- struct{}{}:
timerpool.Put(t)
defer func() { <-addRowsConcurrencyCh }()
case <-t.C:
timerpool.Put(t)
atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
@ -1093,6 +1113,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
rr.rows, err = s.add(rr.rows, mrs, precisionBits)
putRawRows(rr)
// Notify blocked goroutines at Storage.searchTSIDs that they may proceed with their work.
<-addRowsConcurrencyCh
searchTSIDsCond.Signal()
return err
}