lib/storage: limit the number of concurrent calls to storage.searchTSIDs to GOMAXPROCS*2

This should limit the maximum memory usage and reduce CPU thrashing on vmstorage when multiple heavy queries are executed.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
This commit is contained in:
parent 76064ba9e7
commit a3e91c593b
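The mechanism behind this commit is the standard Go idiom of a buffered channel used as a counting semaphore: a goroutine must place a token into the channel before doing the guarded work and takes it back afterwards, so at most cap(channel) goroutines run the section at once. The following is a minimal standalone sketch of that pattern, not code from the diff; all names are illustrative:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// sem is a counting semaphore: at most GOMAXPROCS*2 goroutines
// may hold a slot at any moment.
var sem = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)

func limitedWork(id int) {
	sem <- struct{}{}        // acquire a slot (blocks when the limit is reached)
	defer func() { <-sem }() // release the slot when done

	fmt.Printf("worker %d running; %d/%d slots in use\n", id, len(sem), cap(sem))
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			limitedWork(id)
		}(i)
	}
	wg.Wait()
}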
@@ -360,6 +360,19 @@ func registerStorageMetrics(strg *storage.Storage) {
 		return float64(m().AddRowsConcurrencyCurrent)
 	})
 
+	metrics.NewGauge(`vm_concurrent_search_tsids_limit_reached_total`, func() float64 {
+		return float64(m().SearchTSIDsConcurrencyLimitReached)
+	})
+	metrics.NewGauge(`vm_concurrent_search_tsids_limit_timeout_total`, func() float64 {
+		return float64(m().SearchTSIDsConcurrencyLimitTimeout)
+	})
+	metrics.NewGauge(`vm_concurrent_search_tsids_capacity`, func() float64 {
+		return float64(m().SearchTSIDsConcurrencyCapacity)
+	})
+	metrics.NewGauge(`vm_concurrent_search_tsids_current`, func() float64 {
+		return float64(m().SearchTSIDsConcurrencyCurrent)
+	})
+
 	metrics.NewGauge(`vm_search_delays_total`, func() float64 {
 		return float64(m().SearchDelays)
 	})
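For context, metrics.NewGauge comes from github.com/VictoriaMetrics/metrics: it registers a gauge in the default metrics set and invokes the supplied callback on every export, so the reported value is always current. A small self-contained sketch of the same idiom; the channel, metric names, and HTTP setup here are illustrative, not part of the diff:

package main

import (
	"net/http"

	"github.com/VictoriaMetrics/metrics"
)

var workCh = make(chan struct{}, 8)

func main() {
	// The callback runs on every scrape, so the gauge always reflects
	// the current channel occupancy, analogous to
	// vm_concurrent_search_tsids_current in the diff.
	metrics.NewGauge(`example_concurrent_work_current`, func() float64 {
		return float64(len(workCh))
	})
	metrics.NewGauge(`example_concurrent_work_capacity`, func() float64 {
		return float64(cap(workCh))
	})

	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		metrics.WritePrometheus(w, true)
	})
	http.ListenAndServe(":8080", nil)
}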
@@ -40,6 +40,9 @@ type Storage struct {
 	addRowsConcurrencyLimitTimeout uint64
 	addRowsConcurrencyDroppedRows uint64
 
+	searchTSIDsConcurrencyLimitReached uint64
+	searchTSIDsConcurrencyLimitTimeout uint64
+
 	slowRowInserts uint64
 	slowPerDayIndexInserts uint64
 	slowMetricNameLoads uint64
@@ -343,6 +346,11 @@ type Metrics struct {
 	AddRowsConcurrencyCapacity uint64
 	AddRowsConcurrencyCurrent uint64
 
+	SearchTSIDsConcurrencyLimitReached uint64
+	SearchTSIDsConcurrencyLimitTimeout uint64
+	SearchTSIDsConcurrencyCapacity uint64
+	SearchTSIDsConcurrencyCurrent uint64
+
 	SearchDelays uint64
 
 	SlowRowInserts uint64
@@ -403,6 +411,11 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
 	m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
 
+	m.SearchTSIDsConcurrencyLimitReached += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitReached)
+	m.SearchTSIDsConcurrencyLimitTimeout += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitTimeout)
+	m.SearchTSIDsConcurrencyCapacity = uint64(cap(searchTSIDsConcurrencyCh))
+	m.SearchTSIDsConcurrencyCurrent = uint64(len(searchTSIDsConcurrencyCh))
+
 	m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
 
 	m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
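Note the asymmetry in the new UpdateMetrics lines above: the *_total fields are monotonic counters accumulated with += from atomically maintained struct fields, while capacity and current occupancy are point-in-time gauges overwritten with = from cap() and len() of the channel. A tiny sketch of that counter-versus-gauge idiom, with illustrative type and field names:

package main

import "sync/atomic"

// stats holds hot-path counters, bumped with atomic.AddUint64.
type stats struct {
	limitReached uint64
}

// snapshot mirrors the Metrics struct in the diff: totals are monotonic
// counters, capacity/current are point-in-time gauges.
type snapshot struct {
	LimitReached uint64
	Capacity     uint64
	Current      uint64
}

func (s *stats) update(m *snapshot, ch chan struct{}) {
	m.LimitReached += atomic.LoadUint64(&s.limitReached) // counter: accumulate with +=
	m.Capacity = uint64(cap(ch))                         // gauge: overwrite with =
	m.Current = uint64(len(ch))                          // gauge: overwrite with =
}

func main() {}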
@@ -865,13 +878,47 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
 func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
 	// Do not cache tfss -> tsids here, since the caching is performed
 	// on idb level.
+
+	// Limit the number of concurrent goroutines that may search TSIDS in the storage.
+	// This should prevent from out of memory errors and CPU trashing when too many
+	// goroutines call searchTSIDs.
+	select {
+	case searchTSIDsConcurrencyCh <- struct{}{}:
+	default:
+		// Sleep for a while until giving up
+		atomic.AddUint64(&s.searchTSIDsConcurrencyLimitReached, 1)
+		currentTime := fasttime.UnixTimestamp()
+		timeoutSecs := uint64(0)
+		if currentTime < deadline {
+			timeoutSecs = deadline - currentTime
+		}
+		timeout := time.Second * time.Duration(timeoutSecs)
+		t := timerpool.Get(timeout)
+		select {
+		case searchTSIDsConcurrencyCh <- struct{}{}:
+			timerpool.Put(t)
+		case <-t.C:
+			timerpool.Put(t)
+			atomic.AddUint64(&s.searchTSIDsConcurrencyLimitTimeout, 1)
+			return nil, fmt.Errorf("cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load",
+				cap(searchTSIDsConcurrencyCh), timeout.Seconds())
+		}
+	}
 	tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics, deadline)
+	<-searchTSIDsConcurrencyCh
 	if err != nil {
 		return nil, fmt.Errorf("error when searching tsids for tfss %q: %w", tfss, err)
 	}
 	return tsids, nil
 }
 
+var (
+	// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
+	// is CPU bound and sometimes disk IO bound, so there is no sense in running more
+	// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
+	searchTSIDsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)
+)
+
 // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
 //
 // It is expected that all the tsdis have the same (accountID, projectID)
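The acquisition logic in the hunk above is worth seeing in isolation: try a non-blocking send first, and only fall back to a timer-bounded wait when the semaphore is full, so the fast path never allocates a timer. Below is a self-contained sketch of that two-stage acquire, assuming a plain time.Timer in place of VictoriaMetrics' internal timerpool and fasttime helpers; all names are illustrative:

package main

import (
	"errors"
	"fmt"
	"runtime"
	"time"
)

var semCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)

// acquire takes a semaphore slot, giving up once the deadline passes.
// deadline is a unix timestamp in seconds, as in the diff.
func acquire(deadline uint64) error {
	select {
	case semCh <- struct{}{}: // fast path: a slot is free right now
		return nil
	default:
	}
	// Slow path: wait until a slot frees up or the deadline expires.
	timeoutSecs := uint64(0)
	if now := uint64(time.Now().Unix()); now < deadline {
		timeoutSecs = deadline - now
	}
	t := time.NewTimer(time.Duration(timeoutSecs) * time.Second)
	defer t.Stop()
	select {
	case semCh <- struct{}{}:
		return nil
	case <-t.C:
		return errors.New("too many concurrent searches; try again later")
	}
}

func release() { <-semCh }

func main() {
	deadline := uint64(time.Now().Unix()) + 3
	if err := acquire(deadline); err != nil {
		fmt.Println(err)
		return
	}
	defer release()
	fmt.Println("got a slot, doing work")
}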
@@ -1098,7 +1145,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
 		timerpool.Put(t)
 		atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
 		atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
-		return fmt.Errorf("Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load",
+		return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load",
 			len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
 	}
 }