lib/mergeset: add sparse indexdb cache (#7269)

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7182

- add a separate index cache for searches which might read through large
amounts of random entries. Primary use-case for this is retention and
downsampling filters, when applying filters background merge needs to
fetch large amount of random entries which pollutes an index cache.
Using different caches allows to reduce effect on memory usage and cache
efficiency of the main cache while still having high cache hit rate. A
separate cache size is 5% of allowed memory.

- reduce size of indexdb/dataBlocks cache in order to free memory for
new sparse cache. Reduced size by 5% and moved this to a separate cache.

- add a separate metricName search which does not cache metric names -
this is needed in order to allow disabling metric name caching when
applying downsampling/retention filters. Applying filters during
background merge accesses random entries, this fills up cache and does
not provide an actual improvement due to random access nature.


Merge performance and memory usage stats before and after the change:

- before

![image](https://github.com/user-attachments/assets/485fffbb-c225-47ae-b5c5-bc8a7c57b36e)


- after

![image](https://github.com/user-attachments/assets/f4ba3440-7c1c-4ec1-bc54-4d2ab431eef5)

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2024-10-24 10:21:17 -03:00 committed by GitHub
parent 5fecb77f69
commit 837d0d136d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 123 additions and 16 deletions

View File

@ -67,6 +67,8 @@ var (
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBDataBlocksSparse = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocksSparse", 0, "Overrides max size for indexdb/dataBlocksSparse cache. "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+
"See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning")
)
@ -100,6 +102,7 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.IntN())
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN())
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN())
mergeset.SetDataBlocksSparseCacheSize(cacheSizeIndexDBDataBlocksSparse.IntN())
if retentionPeriod.Duration() < 24*time.Hour {
logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod)
@ -581,6 +584,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="storage/next_day_metric_ids"}`, m.NextDayMetricIDCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="storage/indexBlocks"}`, tm.IndexBlocksCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheSize)
metrics.WriteGaugeUint64(w, `vm_cache_entries{type="storage/regexps"}`, uint64(storage.RegexpCacheSize()))
@ -592,6 +596,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/metricName"}`, m.MetricNameCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/indexBlocks"}`, tm.IndexBlocksCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/date_metricID"}`, m.DateMetricIDCacheSizeBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_bytes{type="storage/hour_metric_ids"}`, m.HourMetricIDCacheSizeBytes)
@ -606,6 +611,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="storage/metricName"}`, m.MetricNameCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="storage/indexBlocks"}`, tm.IndexBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheSizeMaxBytes)
metrics.WriteGaugeUint64(w, `vm_cache_size_max_bytes{type="storage/regexps"}`, uint64(storage.RegexpCacheMaxSizeBytes()))
@ -616,6 +622,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/metricName"}`, m.MetricNameCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/indexBlocks"}`, tm.IndexBlocksCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheRequests)
metrics.WriteCounterUint64(w, `vm_cache_requests_total{type="storage/regexps"}`, storage.RegexpCacheRequests())
@ -626,6 +633,7 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/metricName"}`, m.MetricNameCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/indexBlocks"}`, tm.IndexBlocksCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/dataBlocks"}`, idbm.DataBlocksCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/dataBlocksSparse"}`, idbm.DataBlocksSparseCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/indexBlocks"}`, idbm.IndexBlocksCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, idbm.TagFiltersToMetricIDsCacheMisses)
metrics.WriteCounterUint64(w, `vm_cache_misses_total{type="storage/regexps"}`, storage.RegexpCacheMisses())

View File

@ -1910,6 +1910,9 @@ Below is the output for `/path/to/vmstorage -help`:
-storage.cacheSizeIndexDBDataBlocks size
Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBDataBlocksSparse size
Overrides max size for indexdb/dataBlocksSparse cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBIndexBlocks size
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)

View File

@ -3258,6 +3258,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storage.cacheSizeIndexDBDataBlocks size
Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBDataBlocksSparse size
Overrides max size for indexdb/dataBlocksSparse cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
-storage.cacheSizeIndexDBIndexBlocks size
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)

View File

@ -19,6 +19,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP URL where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add a separate cache type for storing sparse entries when performing large index scans. This significantly reduces memory usage when applying [downsampling filters](https://docs.victoriametrics.com/#downsampling) and [retention filters](https://docs.victoriametrics.com/#retention-filters) during background merge. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7182) for the details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert): properly set `group_name` and `file` fields for recording rules in `/api/v1/rules`.
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl/): fix issue with series matching for `vmctl vm-native` with `--vm-native-disable-per-metric-migration` flag enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7309).

View File

@ -166,7 +166,7 @@ func (idb *indexdb) getIndexSearch() *indexSearch {
}
}
is := v.(*indexSearch)
is.ts.Init(idb.tb)
is.ts.Init(idb.tb, false)
return is
}

View File

@ -14,6 +14,7 @@ import (
var idxbCache = blockcache.NewCache(getMaxIndexBlocksCacheSize)
var ibCache = blockcache.NewCache(getMaxInmemoryBlocksCacheSize)
var ibSparseCache = blockcache.NewCache(getMaxInmemoryBlocksSparseCacheSize)
// SetIndexBlocksCacheSize overrides the default size of indexdb/indexBlocks cache
func SetIndexBlocksCacheSize(size int) {
@ -42,15 +43,32 @@ func SetDataBlocksCacheSize(size int) {
func getMaxInmemoryBlocksCacheSize() int {
maxInmemoryBlockCacheSizeOnce.Do(func() {
if maxInmemoryBlockCacheSize <= 0 {
maxInmemoryBlockCacheSize = int(0.25 * float64(memory.Allowed()))
maxInmemoryBlockCacheSize = int(0.20 * float64(memory.Allowed()))
}
})
return maxInmemoryBlockCacheSize
}
// SetDataBlocksSparseCacheSize overrides the default size of indexdb/dataBlocksSparse cache
func SetDataBlocksSparseCacheSize(size int) {
maxInmemorySparseMergeCacheSize = size
}
func getMaxInmemoryBlocksSparseCacheSize() int {
maxInmemoryBlockSparseCacheSizeOnce.Do(func() {
if maxInmemorySparseMergeCacheSize <= 0 {
maxInmemorySparseMergeCacheSize = int(0.05 * float64(memory.Allowed()))
}
})
return maxInmemorySparseMergeCacheSize
}
var (
maxInmemoryBlockCacheSize int
maxInmemoryBlockCacheSizeOnce sync.Once
maxInmemorySparseMergeCacheSize int
maxInmemoryBlockSparseCacheSizeOnce sync.Once
)
type part struct {
@ -118,6 +136,7 @@ func (p *part) MustClose() {
idxbCache.RemoveBlocksForPart(p)
ibCache.RemoveBlocksForPart(p)
ibSparseCache.RemoveBlocksForPart(p)
}
type indexBlock struct {

View File

@ -36,6 +36,8 @@ type partSearch struct {
ib *inmemoryBlock
ibItemIdx int
sparse bool
}
func (ps *partSearch) reset() {
@ -57,10 +59,11 @@ func (ps *partSearch) reset() {
// Init initializes ps for search in the p.
//
// Use Seek for search in p.
func (ps *partSearch) Init(p *part) {
func (ps *partSearch) Init(p *part, sparse bool) {
ps.reset()
ps.p = p
ps.sparse = sparse
}
// Seek seeks for the first item greater or equal to k in ps.
@ -299,18 +302,22 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
}
func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) {
cache := ibCache
if ps.sparse {
cache = ibSparseCache
}
ibKey := blockcache.Key{
Part: ps.p,
Offset: bh.itemsBlockOffset,
}
b := ibCache.GetBlock(ibKey)
b := cache.GetBlock(ibKey)
if b == nil {
ib, err := ps.readInmemoryBlock(bh)
if err != nil {
return nil, err
}
b = ib
ibCache.PutBlock(ibKey, b)
cache.PutBlock(ibKey, b)
}
ib := b.(*inmemoryBlock)
return ib, nil

View File

@ -54,7 +54,7 @@ func testPartSearchConcurrent(p *part, items []string) error {
func testPartSearchSerial(r *rand.Rand, p *part, items []string) error {
var ps partSearch
ps.Init(p)
ps.Init(p, true)
var k []byte
// Search for the item smaller than the items[0]

View File

@ -574,6 +574,12 @@ type TableMetrics struct {
DataBlocksCacheRequests uint64
DataBlocksCacheMisses uint64
DataBlocksSparseCacheSize uint64
DataBlocksSparseCacheSizeBytes uint64
DataBlocksSparseCacheSizeMaxBytes uint64
DataBlocksSparseCacheRequests uint64
DataBlocksSparseCacheMisses uint64
IndexBlocksCacheSize uint64
IndexBlocksCacheSizeBytes uint64
IndexBlocksCacheSizeMaxBytes uint64
@ -635,6 +641,12 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.DataBlocksCacheRequests = ibCache.Requests()
m.DataBlocksCacheMisses = ibCache.Misses()
m.DataBlocksSparseCacheSize = uint64(ibSparseCache.Len())
m.DataBlocksSparseCacheSizeBytes = uint64(ibSparseCache.SizeBytes())
m.DataBlocksSparseCacheSizeMaxBytes = uint64(ibSparseCache.SizeMaxBytes())
m.DataBlocksSparseCacheRequests = ibSparseCache.Requests()
m.DataBlocksSparseCacheMisses = ibSparseCache.Misses()
m.IndexBlocksCacheSize = uint64(idxbCache.Len())
m.IndexBlocksCacheSizeBytes = uint64(idxbCache.SizeBytes())
m.IndexBlocksCacheSizeMaxBytes = uint64(idxbCache.SizeMaxBytes())

View File

@ -59,7 +59,7 @@ func (ts *TableSearch) reset() {
// Init initializes ts for searching in the tb.
//
// MustClose must be called when the ts is no longer needed.
func (ts *TableSearch) Init(tb *Table) {
func (ts *TableSearch) Init(tb *Table, sparse bool) {
if ts.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init")
}
@ -74,7 +74,7 @@ func (ts *TableSearch) Init(tb *Table) {
// Initialize the psPool.
ts.psPool = slicesutil.SetLength(ts.psPool, len(ts.pws))
for i, pw := range ts.pws {
ts.psPool[i].Init(pw.p)
ts.psPool[i].Init(pw.p, sparse)
}
}

View File

@ -107,7 +107,7 @@ func testTableSearchConcurrent(tb *Table, items []string) error {
func testTableSearchSerial(tb *Table, items []string) error {
var ts TableSearch
ts.Init(tb)
ts.Init(tb, false)
for _, key := range []string{
"",
"123",

View File

@ -83,7 +83,7 @@ func benchmarkTableSearchKeysExt(b *testing.B, tb *Table, keys [][]byte, stripSu
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(1))
var ts TableSearch
ts.Init(tb)
ts.Init(tb, false)
defer ts.MustClose()
for pb.Next() {
startIdx := r.Intn(len(keys) - searchKeysCount)

View File

@ -139,9 +139,9 @@ func TestTableCreateSnapshotAt(t *testing.T) {
tb2 := MustOpenTable(snapshot2, nil, nil, &isReadOnly)
var ts, ts1, ts2 TableSearch
ts.Init(tb)
ts1.Init(tb1)
ts2.Init(tb2)
ts.Init(tb, false)
ts1.Init(tb1, false)
ts2.Init(tb2, false)
for i := 0; i < itemsCount; i++ {
key := []byte(fmt.Sprintf("item %d", i))
if err := ts.FirstItemWithPrefix(key); err != nil {

View File

@ -14,6 +14,9 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/fastcache"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -25,8 +28,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
"github.com/cespare/xxhash/v2"
)
const (
@ -520,7 +521,21 @@ type indexSearch struct {
deadline uint64
}
// getIndexSearch returns an indexSearch with default configuration
func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
return db.getIndexSearchInternal(deadline, false)
}
// getIndexSearchSparse returns an indexSearch with sparse cache
// It is useful for search operations that can scan through the large amount index entries
// Without the need to keep all the entries in the caches used for queries
// used in ENT version
// nolint:unused
func (db *indexDB) getIndexSearchSparse(deadline uint64) *indexSearch {
return db.getIndexSearchInternal(deadline, true)
}
func (db *indexDB) getIndexSearchInternal(deadline uint64, sparse bool) *indexSearch {
v := db.indexSearchPool.Get()
if v == nil {
v = &indexSearch{
@ -528,7 +543,7 @@ func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
}
}
is := v.(*indexSearch)
is.ts.Init(db.tb)
is.ts.Init(db.tb, sparse)
is.deadline = deadline
return is
}
@ -1564,6 +1579,45 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byt
return dst, false
}
// searchMetricNameWithoutCache appends metric name for the given metricID to dst
// and returns the result.
// It does not cache the result and uses sparse cache for index scan.
// used in ENT version
// nolint:unused
func (db *indexDB) searchMetricNameWithoutCache(dst []byte, metricID uint64) ([]byte, bool) {
is := db.getIndexSearchSparse(noDeadline)
var ok bool
dst, ok = is.searchMetricName(dst, metricID)
db.putIndexSearch(is)
if ok {
// There is no need in verifying whether the given metricID is deleted,
// since the filtering must be performed before calling this func.
return dst, true
}
// Try searching in the external indexDB.
db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearchSparse(noDeadline)
dst, ok = is.searchMetricName(dst, metricID)
extDB.putIndexSearch(is)
})
if ok {
return dst, true
}
if db.s.wasMetricIDMissingBefore(metricID) {
// Cannot find the MetricName for the given metricID for the last 60 seconds.
// It is likely the indexDB contains incomplete set of metricID -> metricName entries
// after unclean shutdown or after restoring from a snapshot.
// Mark the metricID as deleted, so it is created again when new sample
// for the given time series is ingested next time.
db.missingMetricNamesForMetricID.Add(1)
db.deleteMetricIDs([]uint64{metricID})
}
return dst, false
}
// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss and
// updates or resets all caches where TSIDs and the corresponding MetricIDs may
// be stored.