lib/storage: invalidate tagFilters -> TSIDS cache when newly added index data becomes visible to search

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/163
This commit is contained in:
Aliaksandr Valialkin 2019-08-29 14:39:05 +03:00
parent 8504a38214
commit 0b0153ba3d
6 changed files with 75 additions and 70 deletions

View File

@ -59,6 +59,8 @@ const rawItemsFlushInterval = time.Second
type Table struct { type Table struct {
path string path string
flushCallback func()
partsLock sync.Mutex partsLock sync.Mutex
parts []*partWrapper parts []*partWrapper
@ -121,8 +123,11 @@ func (pw *partWrapper) decRef() {
// OpenTable opens a table on the given path. // OpenTable opens a table on the given path.
// //
// Optional flushCallback is called every time new data batch is flushed
// to the underlying storage and becomes visible to search.
//
// The table is created if it doesn't exist yet. // The table is created if it doesn't exist yet.
func OpenTable(path string) (*Table, error) { func OpenTable(path string, flushCallback func()) (*Table, error) {
path = filepath.Clean(path) path = filepath.Clean(path)
logger.Infof("opening table %q...", path) logger.Infof("opening table %q...", path)
startTime := time.Now() startTime := time.Now()
@ -145,11 +150,12 @@ func OpenTable(path string) (*Table, error) {
} }
tb := &Table{ tb := &Table{
path: path, path: path,
parts: pws, flushCallback: flushCallback,
mergeIdx: uint64(time.Now().UnixNano()), parts: pws,
flockF: flockF, mergeIdx: uint64(time.Now().UnixNano()),
stopCh: make(chan struct{}), flockF: flockF,
stopCh: make(chan struct{}),
} }
tb.startPartMergers() tb.startPartMergers()
tb.startRawItemsFlusher() tb.startRawItemsFlusher()
@ -444,6 +450,9 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
if err := tb.mergeParts(pws, nil, true); err != nil { if err := tb.mergeParts(pws, nil, true); err != nil {
logger.Panicf("FATAL: cannot merge raw parts: %s", err) logger.Panicf("FATAL: cannot merge raw parts: %s", err)
} }
if tb.flushCallback != nil {
tb.flushCallback()
}
} }
for { for {

View File

@ -5,6 +5,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"sort" "sort"
"sync/atomic"
"testing" "testing"
"time" "time"
) )
@ -39,7 +40,7 @@ func TestTableSearchSerial(t *testing.T) {
func() { func() {
// Re-open the table and verify the search works. // Re-open the table and verify the search works.
tb, err := OpenTable(path) tb, err := OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
@ -74,7 +75,7 @@ func TestTableSearchConcurrent(t *testing.T) {
// Re-open the table and verify the search works. // Re-open the table and verify the search works.
func() { func() {
tb, err := OpenTable(path) tb, err := OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
@ -146,7 +147,11 @@ func testTableSearchSerial(tb *Table, items []string) error {
} }
func newTestTable(path string, itemsCount int) (*Table, []string, error) { func newTestTable(path string, itemsCount int) (*Table, []string, error) {
tb, err := OpenTable(path) var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
tb, err := OpenTable(path, flushCallback)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("cannot open table: %s", err) return nil, nil, fmt.Errorf("cannot open table: %s", err)
} }
@ -159,6 +164,9 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
items[i] = item items[i] = item
} }
tb.DebugFlush() tb.DebugFlush()
if itemsCount > 0 && atomic.LoadUint64(&flushes) == 0 {
return nil, nil, fmt.Errorf("unexpeted zero flushes for itemsCount=%d", itemsCount)
}
sort.Strings(items) sort.Strings(items)
return tb, items, nil return tb, items, nil

View File

@ -32,7 +32,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) {
// Force finishing pending merges // Force finishing pending merges
tb.MustClose() tb.MustClose()
tb, err = OpenTable(path) tb, err = OpenTable(path, nil)
if err != nil { if err != nil {
b.Fatalf("unexpected error when re-opening table %q: %s", path, err) b.Fatalf("unexpected error when re-opening table %q: %s", path, err)
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -20,7 +21,7 @@ func TestTableOpenClose(t *testing.T) {
}() }()
// Create a new table // Create a new table
tb, err := OpenTable(path) tb, err := OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot create new table: %s", err) t.Fatalf("cannot create new table: %s", err)
} }
@ -30,7 +31,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times. // Re-open created table multiple times.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb, err := OpenTable(path) tb, err := OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open created table: %s", err) t.Fatalf("cannot open created table: %s", err)
} }
@ -44,14 +45,14 @@ func TestTableOpenMultipleTimes(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb1, err := OpenTable(path) tb1, err := OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open table: %s", err) t.Fatalf("cannot open table: %s", err)
} }
defer tb1.MustClose() defer tb1.MustClose()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb2, err := OpenTable(path) tb2, err := OpenTable(path, nil)
if err == nil { if err == nil {
tb2.MustClose() tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table") t.Fatalf("expecting non-nil error when opening already opened table")
@ -68,7 +69,11 @@ func TestTableAddItemSerial(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb, err := OpenTable(path) var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
tb, err := OpenTable(path, flushCallback)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -78,6 +83,9 @@ func TestTableAddItemSerial(t *testing.T) {
// Verify items count after pending items flush. // Verify items count after pending items flush.
tb.DebugFlush() tb.DebugFlush()
if atomic.LoadUint64(&flushes) == 0 {
t.Fatalf("unexpected zero flushes")
}
var m TableMetrics var m TableMetrics
tb.UpdateMetrics(&m) tb.UpdateMetrics(&m)
@ -91,7 +99,7 @@ func TestTableAddItemSerial(t *testing.T) {
testReopenTable(t, path, itemsCount) testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts. // Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path) tb, err = OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -124,7 +132,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb, err := OpenTable(path) tb, err := OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -155,13 +163,13 @@ func TestTableCreateSnapshotAt(t *testing.T) {
}() }()
// Verify snapshots contain all the data. // Verify snapshots contain all the data.
tb1, err := OpenTable(snapshot1) tb1, err := OpenTable(snapshot1, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
defer tb1.MustClose() defer tb1.MustClose()
tb2, err := OpenTable(snapshot2) tb2, err := OpenTable(snapshot2, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -205,7 +213,11 @@ func TestTableAddItemsConcurrent(t *testing.T) {
_ = os.RemoveAll(path) _ = os.RemoveAll(path)
}() }()
tb, err := OpenTable(path) var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
tb, err := OpenTable(path, flushCallback)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -215,6 +227,10 @@ func TestTableAddItemsConcurrent(t *testing.T) {
// Verify items count after pending items flush. // Verify items count after pending items flush.
tb.DebugFlush() tb.DebugFlush()
if atomic.LoadUint64(&flushes) == 0 {
t.Fatalf("unexpected zero flushes")
}
var m TableMetrics var m TableMetrics
tb.UpdateMetrics(&m) tb.UpdateMetrics(&m)
if m.ItemsCount != itemsCount { if m.ItemsCount != itemsCount {
@ -227,7 +243,7 @@ func TestTableAddItemsConcurrent(t *testing.T) {
testReopenTable(t, path, itemsCount) testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts. // Add more items in order to verify merge between inmemory parts and file-based parts.
tb, err = OpenTable(path) tb, err = OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot open %q: %s", path, err) t.Fatalf("cannot open %q: %s", path, err)
} }
@ -269,7 +285,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) {
t.Helper() t.Helper()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
tb, err := OpenTable(path) tb, err := OpenTable(path, nil)
if err != nil { if err != nil {
t.Fatalf("cannot re-open %q: %s", path, err) t.Fatalf("cannot re-open %q: %s", path, err)
} }

View File

@ -65,9 +65,6 @@ type indexDB struct {
// matching low number of metrics. // matching low number of metrics.
uselessTagFiltersCache *workingsetcache.Cache uselessTagFiltersCache *workingsetcache.Cache
tagCachePrefixes map[accountProjectKey]uint64
tagCachePrefixesLock sync.RWMutex
indexSearchPool sync.Pool indexSearchPool sync.Pool
// An inmemory map[uint64]struct{} of deleted metricIDs. // An inmemory map[uint64]struct{} of deleted metricIDs.
@ -104,12 +101,6 @@ type indexDB struct {
mustDrop uint64 mustDrop uint64
} }
// accountProjectKey is used for maps keyed by (AccountID, ProjectID).
type accountProjectKey struct {
AccountID uint32
ProjectID uint32
}
// openIndexDB opens index db from the given path with the given caches. // openIndexDB opens index db from the given path with the given caches.
func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) { func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
if metricIDCache == nil { if metricIDCache == nil {
@ -125,7 +116,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
logger.Panicf("BUG: prevHourMetricIDs must be non-nil") logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
} }
tb, err := mergeset.OpenTable(path) tb, err := mergeset.OpenTable(path, invalidateTagCache)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err) return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err)
} }
@ -145,8 +136,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
metricNameCache: metricNameCache, metricNameCache: metricNameCache,
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
tagCachePrefixes: make(map[accountProjectKey]uint64),
currHourMetricIDs: currHourMetricIDs, currHourMetricIDs: currHourMetricIDs,
prevHourMetricIDs: prevHourMetricIDs, prevHourMetricIDs: prevHourMetricIDs,
} }
@ -377,27 +366,17 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
db.metricNameCache.Set(key[:], metricName) db.metricNameCache.Set(key[:], metricName)
} }
func (db *indexDB) marshalTagFiltersKey(dst []byte, tfss []*TagFilters, versioned bool) []byte { func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, versioned bool) []byte {
if len(tfss) == 0 {
return nil
}
prefix := ^uint64(0) prefix := ^uint64(0)
if versioned { if versioned {
k := accountProjectKey{ prefix = atomic.LoadUint64(&tagFiltersKeyGen)
AccountID: tfss[0].accountID,
ProjectID: tfss[0].projectID,
}
db.tagCachePrefixesLock.RLock()
prefix = db.tagCachePrefixes[k]
db.tagCachePrefixesLock.RUnlock()
if prefix == 0 {
// Create missing prefix.
// It is OK if multiple concurrent goroutines call invalidateTagCache
// for the same (accountID, projectID).
prefix = db.invalidateTagCache(k.AccountID, k.ProjectID)
}
} }
dst = encoding.MarshalUint64(dst, prefix) dst = encoding.MarshalUint64(dst, prefix)
if len(tfss) == 0 {
return dst
}
dst = encoding.MarshalUint32(dst, tfss[0].accountID)
dst = encoding.MarshalUint32(dst, tfss[0].projectID)
for _, tfs := range tfss { for _, tfs := range tfss {
dst = append(dst, 0) // separator between tfs groups. dst = append(dst, 0) // separator between tfs groups.
for i := range tfs.tfs { for i := range tfs.tfs {
@ -439,21 +418,13 @@ func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
return dst, nil return dst, nil
} }
func (db *indexDB) invalidateTagCache(accountID, projectID uint32) uint64 { func invalidateTagCache() {
// This function must be fast, since it is called each // This function must be fast, since it is called each
// time new timeseries is added. // time new timeseries is added.
prefix := atomic.AddUint64(&tagCacheKeyPrefix, 1) atomic.AddUint64(&tagFiltersKeyGen, 1)
k := accountProjectKey{
AccountID: accountID,
ProjectID: projectID,
}
db.tagCachePrefixesLock.Lock()
db.tagCachePrefixes[k] = prefix
db.tagCachePrefixesLock.Unlock()
return prefix
} }
var tagCacheKeyPrefix uint64 var tagFiltersKeyGen uint64
// getTSIDByNameNoCreate fills the dst with TSID for the given metricName. // getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
// //
@ -555,9 +526,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
return fmt.Errorf("cannot create indexes: %s", err) return fmt.Errorf("cannot create indexes: %s", err)
} }
// Invalidate tag cache for the given (AccountID, ProjectID), since // There is no need in invalidating tag cache, since it is invalidated
// it doesn't contain tags for the created mn -> TSID mapping. // on db.tb flush via invalidateTagCache flushCallback passed to OpenTable.
_ = db.invalidateTagCache(mn.AccountID, mn.ProjectID)
return nil return nil
} }
@ -904,8 +874,6 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
if len(tfss) == 0 { if len(tfss) == 0 {
return 0, nil return 0, nil
} }
accountID := tfss[0].accountID
projectID := tfss[0].projectID
// Obtain metricIDs to delete. // Obtain metricIDs to delete.
is := db.getIndexSearch() is := db.getIndexSearch()
@ -937,7 +905,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
db.updateDeletedMetricIDs(metricIDs) db.updateDeletedMetricIDs(metricIDs)
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs. // Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
_ = db.invalidateTagCache(accountID, projectID) invalidateTagCache()
// Do not reset uselessTagFiltersCache, since the found metricIDs // Do not reset uselessTagFiltersCache, since the found metricIDs
// on cache miss are filtered out later with deletedMetricIDs. // on cache miss are filtered out later with deletedMetricIDs.
@ -1010,7 +978,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
tfKeyBuf := tagFiltersKeyBufPool.Get() tfKeyBuf := tagFiltersKeyBufPool.Get()
defer tagFiltersKeyBufPool.Put(tfKeyBuf) defer tagFiltersKeyBufPool.Put(tfKeyBuf)
tfKeyBuf.B = db.marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, true) tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, true)
tsids, ok := db.getFromTagCache(tfKeyBuf.B) tsids, ok := db.getFromTagCache(tfKeyBuf.B)
if ok { if ok {
// Fast path - tsids found in the cache. // Fast path - tsids found in the cache.
@ -1031,7 +999,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
defer tagFiltersKeyBufPool.Put(tfKeyExtBuf) defer tagFiltersKeyBufPool.Put(tfKeyExtBuf)
// Data in extDB cannot be changed, so use unversioned keys for tag cache. // Data in extDB cannot be changed, so use unversioned keys for tag cache.
tfKeyExtBuf.B = extDB.marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, false) tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, false)
tsids, ok := extDB.getFromTagCache(tfKeyExtBuf.B) tsids, ok := extDB.getFromTagCache(tfKeyExtBuf.B)
if ok { if ok {
extTSIDs = tsids extTSIDs = tsids

View File

@ -339,6 +339,10 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return false return false
} }
type accountProjectKey struct {
AccountID uint32
ProjectID uint32
}
allKeys := make(map[accountProjectKey]map[string]bool) allKeys := make(map[accountProjectKey]map[string]bool)
timeseriesCounters := make(map[accountProjectKey]map[uint64]bool) timeseriesCounters := make(map[accountProjectKey]map[uint64]bool)
var tsidCopy TSID var tsidCopy TSID