diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index e109ae81fc..c41936060e 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) @@ -1397,6 +1398,174 @@ func TestMatchTagFilters(t *testing.T) { } } +func TestSearchTSIDWithTimeRange(t *testing.T) { + metricIDCache := workingsetcache.New(1234, time.Hour) + metricNameCache := workingsetcache.New(1234, time.Hour) + defer metricIDCache.Stop() + defer metricNameCache.Stop() + + currMetricIDs := &hourMetricIDs{ + isFull: true, + m: &uint64set.Set{}, + } + + var hmCurr atomic.Value + hmCurr.Store(currMetricIDs) + + prevMetricIDs := &hourMetricIDs{ + isFull: true, + m: &uint64set.Set{}, + } + var hmPrev atomic.Value + hmPrev.Store(prevMetricIDs) + + dbName := "test-index-db-ts-range" + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) + if err != nil { + t.Fatalf("cannot open indexDB: %s", err) + } + defer func() { + db.MustClose() + if err := os.RemoveAll(dbName); err != nil { + t.Fatalf("cannot remove indexDB: %s", err) + } + }() + + is := db.getIndexSearch() + defer db.putIndexSearch(is) + + // Create a bunch of per-day time series + const accountID = 12345 + const projectID = 85453 + const days = 5 + const metricsPerDay = 1000 + theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC) + now := uint64(timestampFromTime(theDay)) + currMetricIDs.hour = now / msecPerHour + prevMetricIDs.hour = (now - msecPerHour) / msecPerHour + baseDate := now / msecPerDay + var metricNameBuf []byte + for day := 0; day < days; day++ { + var tsids []TSID + for metric := 0; metric < metricsPerDay; metric++ { + var mn MetricName + mn.AccountID = accountID + mn.ProjectID = projectID + mn.MetricGroup = []byte("testMetric") + mn.AddTag( + "constant", + "const", + ) + mn.AddTag( + "day", + fmt.Sprintf("%v", day), + ) + mn.AddTag( + "uniqueid", + fmt.Sprintf("%v", metric), + ) + mn.sortTags() + + metricNameBuf = mn.Marshal(metricNameBuf[:0]) + var tsid TSID + if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil { + t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err) + } + if tsid.AccountID != accountID { + t.Fatalf("unexpected accountID; got %d; want %d", tsid.AccountID, accountID) + } + if tsid.ProjectID != projectID { + t.Fatalf("unexpected accountID; got %d; want %d", tsid.ProjectID, projectID) + } + tsids = append(tsids, tsid) + } + + // Add the metrics to the per-day stores + date := baseDate - uint64(day*msecPerDay) + for i := range tsids { + tsid := &tsids[i] + if err := db.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil { + t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err) + } + } + + // Add the the hour metrics caches + if day == 0 { + for i := 0; i < 256; i++ { + prevMetricIDs.m.Add(tsids[i].MetricID) + currMetricIDs.m.Add(tsids[i].MetricID) + } + k := accountProjectKey{ + AccountID: accountID, + ProjectID: projectID, + } + prevMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: prevMetricIDs.m} + prevMetricIDs.iidx = newInmemoryInvertedIndex() + prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.byTenant) + if len(prevMetricIDs.iidx.pendingEntries) > 0 { + t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the previous hour", len(prevMetricIDs.iidx.pendingEntries)) + } + currMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: currMetricIDs.m} + currMetricIDs.iidx = newInmemoryInvertedIndex() + currMetricIDs.iidx.MustUpdate(db, currMetricIDs.byTenant) + if len(currMetricIDs.iidx.pendingEntries) > 0 { + t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the current hour", len(currMetricIDs.iidx.pendingEntries)) + } + } + } + + // Flush index to disk, so it becomes visible for search + db.tb.DebugFlush() + + // Create a filter that will match series that occur across multiple days + tfs := NewTagFilters(accountID, projectID) + if err := tfs.Add([]byte("constant"), []byte("const"), false, false); err != nil { + t.Fatalf("cannot add filter: %s", err) + } + + // Perform a search that can be fulfilled out of the hour metrics cache. + // This should return the metrics in the hourly cache + tr := TimeRange{ + MinTimestamp: int64(now - msecPerHour + 1), + MaxTimestamp: int64(now), + } + matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) + if err != nil { + t.Fatalf("error searching tsids: %v", err) + } + if len(matchedTSIDs) != 256 { + t.Fatalf("Expected %d time series for current hour, got %d", 256, len(matchedTSIDs)) + } + + // Perform a search within a day that falls out out of the hour metrics cache. + // This should return the metrics for the day + tr = TimeRange{ + MinTimestamp: int64(now - 2*msecPerHour - 1), + MaxTimestamp: int64(now), + } + matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) + if err != nil { + t.Fatalf("error searching tsids: %v", err) + } + if len(matchedTSIDs) != metricsPerDay { + t.Fatalf("Expected %d time series for current day, got %d", metricsPerDay, len(matchedTSIDs)) + } + + // Perform a search across all the days, should match all metrics + tr = TimeRange{ + MinTimestamp: int64(now - msecPerDay*days), + MaxTimestamp: int64(now), + } + + matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) + if err != nil { + t.Fatalf("error searching tsids: %v", err) + } + if len(matchedTSIDs) != metricsPerDay*days { + t.Fatalf("Expected %d time series for all days, got %d", metricsPerDay*days, len(matchedTSIDs)) + } +} + func toTFPointers(tfs []tagFilter) []*tagFilter { tfps := make([]*tagFilter, len(tfs)) for i := range tfs {