lib/{mergeset,storage}: do not cache inverted index blocks containing tag->metricIDs items

This should reduce the amounts of used RAM during queries with filters over big number of time series.
This commit is contained in:
Aliaksandr Valialkin 2019-09-25 13:47:06 +03:00
parent 8d398af92f
commit adc18c3ee6
7 changed files with 48 additions and 13 deletions

View File

@ -31,6 +31,8 @@ type partSearch struct {
// Pointer to inmemory block, which may be reused.
inmemoryBlockReuse *inmemoryBlock
shouldCacheBlock func(item []byte) bool
idxbCache *indexBlockCache
ibCache *inmemoryBlockCache
@ -59,6 +61,7 @@ func (ps *partSearch) reset() {
putInmemoryBlock(ps.inmemoryBlockReuse)
ps.inmemoryBlockReuse = nil
}
ps.shouldCacheBlock = nil
ps.idxbCache = nil
ps.ibCache = nil
ps.err = nil
@ -75,7 +78,7 @@ func (ps *partSearch) reset() {
// Init initializes ps for search in the p.
//
// Use Seek for search in p.
func (ps *partSearch) Init(p *part) {
func (ps *partSearch) Init(p *part, shouldCacheBlock func(item []byte) bool) {
ps.reset()
ps.p = p
@ -324,6 +327,16 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
}
func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, bool, error) {
if ps.shouldCacheBlock != nil {
if !ps.shouldCacheBlock(bh.firstItem) {
ib, err := ps.readInmemoryBlock(bh)
if err != nil {
return nil, false, err
}
return ib, true, nil
}
}
var ibKey inmemoryBlockCacheKey
ibKey.Init(bh)
ib := ps.ibCache.Get(ibKey)

View File

@ -51,7 +51,7 @@ func testPartSearchConcurrent(p *part, items []string) error {
func testPartSearchSerial(p *part, items []string) error {
var ps partSearch
ps.Init(p)
ps.Init(p, nil)
var k []byte
// Search for the item smaller than the items[0]

View File

@ -58,7 +58,7 @@ func (ts *TableSearch) reset() {
// Init initializes ts for searching in the tb.
//
// MustClose must be called when the ts is no longer needed.
func (ts *TableSearch) Init(tb *Table) {
func (ts *TableSearch) Init(tb *Table, shouldCacheBlock func(item []byte) bool) {
if ts.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init")
}
@ -76,7 +76,7 @@ func (ts *TableSearch) Init(tb *Table) {
}
ts.psPool = ts.psPool[:len(ts.pws)]
for i, pw := range ts.pws {
ts.psPool[i].Init(pw.p)
ts.psPool[i].Init(pw.p, shouldCacheBlock)
}
}

View File

@ -109,7 +109,7 @@ func testTableSearchConcurrent(tb *Table, items []string) error {
func testTableSearchSerial(tb *Table, items []string) error {
var ts TableSearch
ts.Init(tb)
ts.Init(tb, nil)
for _, key := range []string{
"",
"123",

View File

@ -81,7 +81,7 @@ func benchmarkTableSearchKeysExt(b *testing.B, tb *Table, keys [][]byte, stripSu
b.SetBytes(int64(searchKeysCount * rowsToScan))
b.RunParallel(func(pb *testing.PB) {
var ts TableSearch
ts.Init(tb)
ts.Init(tb, nil)
defer ts.MustClose()
for pb.Next() {
startIdx := rand.Intn(len(keys) - searchKeysCount)

View File

@ -176,10 +176,10 @@ func TestTableCreateSnapshotAt(t *testing.T) {
defer tb2.MustClose()
var ts, ts1, ts2 TableSearch
ts.Init(tb)
ts1.Init(tb1)
ts.Init(tb, nil)
ts1.Init(tb1, nil)
defer ts1.MustClose()
ts2.Init(tb2)
ts2.Init(tb2, nil)
defer ts2.MustClose()
for i := 0; i < itemsCount; i++ {
key := []byte(fmt.Sprintf("item %d", i))

View File

@ -38,12 +38,34 @@ const (
nsPrefixMetricIDToMetricName = 3
// Prefix for deleted MetricID entries.
nsPrefixDeteletedMetricID = 4
nsPrefixDeletedMetricID = 4
// Prefix for Date->MetricID entries.
nsPrefixDateToMetricID = 5
)
func shouldCacheBlock(item []byte) bool {
if len(item) == 0 {
return true
}
// Do not cache items starting from
switch item[0] {
case nsPrefixTagToMetricIDs:
// Do not cache blocks with tag->metricIDs items, since:
// - these blocks are scanned sequentially, so the overhead
// on their unmarshaling is amortized by the sequential scan.
// - these blocks can occupy high amounts of RAM in cache
// and evict other frequently accessed blocks.
return false
case nsPrefixDeletedMetricID:
// Do not cache blocks with deleted metricIDs,
// since these metricIDs are loaded only once during app start.
return false
default:
return true
}
}
// indexDB represents an index db.
type indexDB struct {
name string
@ -500,7 +522,7 @@ func (db *indexDB) getIndexSearch() *indexSearch {
}
}
is := v.(*indexSearch)
is.ts.Init(db.tb)
is.ts.Init(db.tb, shouldCacheBlock)
return is
}
@ -862,7 +884,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
// Mark the found metricIDs as deleted.
items := getIndexItems()
for _, metricID := range metricIDs {
items.B = append(items.B, nsPrefixDeteletedMetricID)
items.B = append(items.B, nsPrefixDeletedMetricID)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
@ -918,7 +940,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
dmis := &uint64set.Set{}
ts := &is.ts
kb := &is.kb
kb.B = append(kb.B[:0], nsPrefixDeteletedMetricID)
kb.B = append(kb.B[:0], nsPrefixDeletedMetricID)
ts.Seek(kb.B)
for ts.NextItem() {
item := ts.Item