mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 16:30:55 +01:00
lib/storage: move series registration in caches from createAllIndexesForMetricName into a separate function - putSeriesToCache
This makes the code more clear and easier to read
This is a follow-up for 7094fa38bc
This commit is contained in:
parent
a15a66ee89
commit
3d23fd9853
@ -600,7 +600,6 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
|
||||
date := uint64(timestampFromTime(time.Now())) / msecPerDay
|
||||
|
||||
var metricNameBuf []byte
|
||||
var metricNameRawBuf []byte
|
||||
for i := 0; i < 401; i++ {
|
||||
var mn MetricName
|
||||
mn.AccountID = uint32((i + 2) % accountsCount)
|
||||
@ -620,14 +619,12 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
|
||||
}
|
||||
mn.sortTags()
|
||||
metricNameBuf = mn.Marshal(metricNameBuf[:0])
|
||||
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
||||
|
||||
// Create tsid for the metricName.
|
||||
var genTSID generationTSID
|
||||
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
|
||||
createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
|
||||
}
|
||||
if genTSID.TSID.AccountID != mn.AccountID {
|
||||
return nil, nil, nil, fmt.Errorf("unexpected TSID.AccountID; got %d; want %d; mn:\n%s\ntsid:\n%+v", genTSID.TSID.AccountID, mn.AccountID, &mn, &genTSID.TSID)
|
||||
@ -1638,7 +1635,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
now := uint64(timestampFromTime(theDay))
|
||||
baseDate := now / msecPerDay
|
||||
var metricNameBuf []byte
|
||||
var metricNameRawBuf []byte
|
||||
perDayMetricIDs := make(map[uint64]*uint64set.Set)
|
||||
var allMetricIDs uint64set.Set
|
||||
labelNames := []string{
|
||||
@ -1675,12 +1671,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
mn.sortTags()
|
||||
|
||||
metricNameBuf = mn.Marshal(metricNameBuf[:0])
|
||||
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
||||
var genTSID generationTSID
|
||||
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
|
||||
createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
|
||||
}
|
||||
if genTSID.TSID.AccountID != accountID {
|
||||
t.Fatalf("unexpected accountID; got %d; want %d", genTSID.TSID.AccountID, accountID)
|
||||
|
@ -80,7 +80,6 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
||||
|
||||
func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) {
|
||||
date := uint64(0)
|
||||
var metricNameRaw []byte
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
for i := 0; i < recordsPerLoop; i++ {
|
||||
@ -90,10 +89,8 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricNa
|
||||
}
|
||||
mn.sortTags()
|
||||
|
||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||
generateTSID(&genTSID.TSID, mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, mn, metricNameRaw, genTSID, date)
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,7 +107,6 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
var mn MetricName
|
||||
var metricNameRaw []byte
|
||||
var genTSID generationTSID
|
||||
date := uint64(0)
|
||||
addSeries := func(kvs ...string) {
|
||||
@ -121,10 +117,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||
mn.sortTags()
|
||||
mn.AccountID = accountID
|
||||
mn.ProjectID = projectID
|
||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
|
||||
createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
|
||||
}
|
||||
for n := 0; n < 10; n++ {
|
||||
ns := strconv.Itoa(n)
|
||||
@ -294,7 +288,6 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||
mn.sortTags()
|
||||
|
||||
var genTSID generationTSID
|
||||
var metricNameRaw []byte
|
||||
date := uint64(12345)
|
||||
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
@ -303,10 +296,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||
for i := 0; i < recordsCount; i++ {
|
||||
mn.AccountID = uint32(i % accountsCount)
|
||||
mn.ProjectID = uint32(i % projectsCount)
|
||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
|
||||
createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
|
||||
}
|
||||
db.s.DebugFlush()
|
||||
|
||||
|
@ -1687,8 +1687,9 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
||||
}
|
||||
mn.sortTags()
|
||||
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
seriesRepopulated++
|
||||
}
|
||||
continue
|
||||
@ -1719,13 +1720,11 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
||||
|
||||
if genTSID.generation != idb.generation {
|
||||
// The found TSID is from the previous indexdb. Create it in the current indexdb.
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
|
||||
seriesRepopulated++
|
||||
} else {
|
||||
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
|
||||
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
||||
}
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1739,8 +1738,9 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
||||
|
||||
// Schedule creating TSID indexes instead of creating them synchronously.
|
||||
// This should keep stable the ingestion rate when new time series are ingested.
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
}
|
||||
|
||||
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
|
||||
@ -1848,8 +1848,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
}
|
||||
mn.sortTags()
|
||||
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
seriesRepopulated++
|
||||
slowInsertsCount++
|
||||
}
|
||||
@ -1884,13 +1885,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
|
||||
if genTSID.generation != idb.generation {
|
||||
// The found TSID is from the previous indexdb. Create it in the current indexdb.
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
|
||||
seriesRepopulated++
|
||||
} else {
|
||||
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
|
||||
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
||||
}
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
|
||||
r.TSID = genTSID.TSID
|
||||
prevTSID = genTSID.TSID
|
||||
@ -1907,8 +1906,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
continue
|
||||
}
|
||||
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
newSeriesCount++
|
||||
|
||||
r.TSID = genTSID.TSID
|
||||
@ -1953,10 +1953,12 @@ func SetLogNewSeries(ok bool) {
|
||||
|
||||
var logNewSeries = false
|
||||
|
||||
func (s *Storage) createAllIndexesForMetricName(is *indexSearch, mn *MetricName, metricNameRaw []byte, genTSID *generationTSID, date uint64) {
|
||||
is.createGlobalIndexes(&genTSID.TSID, mn)
|
||||
is.createPerDayIndexes(date, &genTSID.TSID, mn)
|
||||
func createAllIndexesForMetricName(is *indexSearch, mn *MetricName, tsid *TSID, date uint64) {
|
||||
is.createGlobalIndexes(tsid, mn)
|
||||
is.createPerDayIndexes(date, tsid, mn)
|
||||
}
|
||||
|
||||
func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID, date uint64) {
|
||||
// Store the TSID for for the current indexdb into cache,
|
||||
// so future rows for that TSID are ingested via fast path.
|
||||
s.putTSIDToCache(genTSID, metricNameRaw)
|
||||
|
Loading…
Reference in New Issue
Block a user