diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index f1a5d0add4..878e10a395 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -26,6 +26,8 @@ var ( vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") + disableRecentHourIndex = flag.Bool("disableRecentHourIndex", false, "Whether to disable inmemory inverted index for recent hour. "+ + "This may be useful in order to reduce memory usage when working with high number of time series") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") ) @@ -35,6 +37,9 @@ func main() { buildinfo.Init() logger.Init() + if *disableRecentHourIndex { + storage.DisableRecentHourIndex() + } storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 9b116536d9..ac3bb9d803 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -2231,6 +2231,10 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, } func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool { + if disableRecentHourIndex { + return false + } + atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1) k := accountProjectKey{ AccountID: tfs.accountID, diff --git a/lib/storage/storage.go b/lib/storage/storage.go index b7aa806c54..856fff88e3 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -27,6 +27,17 @@ import ( const maxRetentionMonths = 12 * 100 +var disableRecentHourIndex = false + +// DisableRecentHourIndex disables in-memory inverted index for recent hour. +// +// This may be useful in order to save RAM for high cardinality data. +// +// This function must be called before OpenStorage. +func DisableRecentHourIndex() { + disableRecentHourIndex = true +} + // Storage represents TSDB storage. type Storage struct { // Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. @@ -593,19 +604,21 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs // Unmarshal hm.iidx iidx := newInmemoryInvertedIndex() - tail, err := iidx.Unmarshal(src) - if err != nil { - logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err) - return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), - hour: hour, + if !disableRecentHourIndex { + tail, err := iidx.Unmarshal(src) + if err != nil { + logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err) + return &hourMetricIDs{ + iidx: newInmemoryInvertedIndex(), + hour: hour, + } } - } - if len(tail) > 0 { - logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail)) - return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), - hour: hour, + if len(tail) > 0 { + logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail)) + return &hourMetricIDs{ + iidx: newInmemoryInvertedIndex(), + hour: hour, + } } } @@ -652,8 +665,10 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { } } - // Marshal hm.iidx - dst = hm.iidx.Marshal(dst) + if !disableRecentHourIndex { + // Marshal hm.iidx + dst = hm.iidx.Marshal(dst) + } if err := ioutil.WriteFile(path, dst, 0644); err != nil { logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) @@ -1015,7 +1030,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { } s.pendingHourEntries = append(s.pendingHourEntries, e) s.pendingHourEntriesLock.Unlock() - hm.iidx.AddMetricID(idb, e) + if !disableRecentHourIndex { + hm.iidx.AddMetricID(idb, e) + } } // Slower path: check global cache for (date, metricID) entry.