From 8f16388428614849d3ffa8f41b11bf20da629c92 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 5 Aug 2020 18:24:51 +0300 Subject: [PATCH] lib/storage: limit the number of concurrent calls to storage.searchTSIDs to GOMAXPROCS*2 This should limit the maximum memory usage and reduce CPU trashing on vmstorage when multiple heavy queries are executed. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 --- app/vmstorage/main.go | 13 +++++++++++ lib/storage/storage.go | 49 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 81027eb0a..f4e24cf5a 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -429,6 +429,19 @@ func registerStorageMetrics() { return float64(m().AddRowsConcurrencyCurrent) }) + metrics.NewGauge(`vm_concurrent_search_tsids_limit_reached_total`, func() float64 { + return float64(m().SearchTSIDsConcurrencyLimitReached) + }) + metrics.NewGauge(`vm_concurrent_search_tsids_limit_timeout_total`, func() float64 { + return float64(m().SearchTSIDsConcurrencyLimitTimeout) + }) + metrics.NewGauge(`vm_concurrent_search_tsids_capacity`, func() float64 { + return float64(m().SearchTSIDsConcurrencyCapacity) + }) + metrics.NewGauge(`vm_concurrent_search_tsids_current`, func() float64 { + return float64(m().SearchTSIDsConcurrencyCurrent) + }) + metrics.NewGauge(`vm_search_delays_total`, func() float64 { return float64(m().SearchDelays) }) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index ee590873d..795076dc5 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -40,6 +40,9 @@ type Storage struct { addRowsConcurrencyLimitTimeout uint64 addRowsConcurrencyDroppedRows uint64 + searchTSIDsConcurrencyLimitReached uint64 + searchTSIDsConcurrencyLimitTimeout uint64 + slowRowInserts uint64 slowPerDayIndexInserts uint64 slowMetricNameLoads uint64 @@ -328,6 +331,11 @@ type Metrics struct { AddRowsConcurrencyCapacity uint64 AddRowsConcurrencyCurrent uint64 + SearchTSIDsConcurrencyLimitReached uint64 + SearchTSIDsConcurrencyLimitTimeout uint64 + SearchTSIDsConcurrencyCapacity uint64 + SearchTSIDsConcurrencyCurrent uint64 + SearchDelays uint64 SlowRowInserts uint64 @@ -388,6 +396,11 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) + m.SearchTSIDsConcurrencyLimitReached += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitReached) + m.SearchTSIDsConcurrencyLimitTimeout += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitTimeout) + m.SearchTSIDsConcurrencyCapacity = uint64(cap(searchTSIDsConcurrencyCh)) + m.SearchTSIDsConcurrencyCurrent = uint64(len(searchTSIDsConcurrencyCh)) + m.SearchDelays = storagepacelimiter.Search.DelaysTotal() m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) @@ -802,13 +815,47 @@ func nextRetentionDuration(retentionMonths int) time.Duration { func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { // Do not cache tfss -> tsids here, since the caching is performed // on idb level. + + // Limit the number of concurrent goroutines that may search TSIDS in the storage. + // This should prevent from out of memory errors and CPU trashing when too many + // goroutines call searchTSIDs. + select { + case searchTSIDsConcurrencyCh <- struct{}{}: + default: + // Sleep for a while until giving up + atomic.AddUint64(&s.searchTSIDsConcurrencyLimitReached, 1) + currentTime := fasttime.UnixTimestamp() + timeoutSecs := uint64(0) + if currentTime < deadline { + timeoutSecs = deadline - currentTime + } + timeout := time.Second * time.Duration(timeoutSecs) + t := timerpool.Get(timeout) + select { + case searchTSIDsConcurrencyCh <- struct{}{}: + timerpool.Put(t) + case <-t.C: + timerpool.Put(t) + atomic.AddUint64(&s.searchTSIDsConcurrencyLimitTimeout, 1) + return nil, fmt.Errorf("cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load", + cap(searchTSIDsConcurrencyCh), timeout.Seconds()) + } + } tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics, deadline) + <-searchTSIDsConcurrencyCh if err != nil { return nil, fmt.Errorf("error when searching tsids for tfss %q: %w", tfss, err) } return tsids, nil } +var ( + // Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation + // is CPU bound and sometimes disk IO bound, so there is no sense in running more + // than GOMAXPROCS*2 concurrent goroutines for TSID searches. + searchTSIDsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2) +) + // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache. // // This should speed-up further searchMetricName calls for metricIDs from tsids. @@ -1023,7 +1070,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { timerpool.Put(t) atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs))) - return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load", + return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load", len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh)) } }