lib/storage: properly take into account already registered series when -storage.maxHourlySeries or -storage.maxDailySeries limits are enabled

The commit 5fb45173ae takes into account only newly registered series when applying cardinality limits. This means the limits could be exceeded, since already registered series were not counted. This commit restores accounting for already registered series when applying cardinality limits.
Author: Aliaksandr Valialkin
Date: 2022-06-20 13:47:43 +03:00
Parent: 3ada676879
Commit: 270ad39359
4 changed files with 52 additions and 28 deletions
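
The gist of the change: the hourly/daily series limiters must be consulted for every incoming sample, including samples whose TSID is already present in the cache; otherwise previously registered series never count against the limits and the limits can be silently exceeded. Below is a minimal, self-contained Go sketch of that idea. The seriesLimiter type, newSeriesLimiter and the standalone registerSeriesCardinality helper here are hypothetical stand-ins for illustration only, not the VictoriaMetrics implementation shown in the diff.

package main

import (
    "errors"
    "fmt"
)

// errLimitExceeded mirrors the role of errSeriesCardinalityExceeded in the diff below.
var errLimitExceeded = errors.New("cannot create series because series cardinality limit exceeded")

// seriesLimiter is a hypothetical stand-in for the hourly/daily series limiters.
// Add reports whether metricID fits under the limit; IDs that are already
// registered are always accepted, so re-ingesting an existing series never fails.
type seriesLimiter struct {
    max  int
    seen map[uint64]struct{}
}

func newSeriesLimiter(max int) *seriesLimiter {
    return &seriesLimiter{max: max, seen: make(map[uint64]struct{})}
}

func (sl *seriesLimiter) Add(metricID uint64) bool {
    if _, ok := sl.seen[metricID]; ok {
        return true // already registered series occupy their slot and pass the check
    }
    if len(sl.seen) >= sl.max {
        return false // limit reached; only brand-new series are rejected
    }
    sl.seen[metricID] = struct{}{}
    return true
}

// registerSeriesCardinality shows the shape of the check after this commit:
// it runs for every sample, including ones whose TSID was found in the cache,
// so already registered series keep counting against the limit.
func registerSeriesCardinality(hourly *seriesLimiter, metricID uint64) error {
    if hourly != nil && !hourly.Add(metricID) {
        return errLimitExceeded
    }
    return nil
}

func main() {
    hourly := newSeriesLimiter(1)
    fmt.Println(registerSeriesCardinality(hourly, 100)) // <nil>: first series is registered
    fmt.Println(registerSeriesCardinality(hourly, 100)) // <nil>: same series, already counted
    fmt.Println(registerSeriesCardinality(hourly, 200)) // error: a second distinct series exceeds the limit
}

The diff applies the same rule at every call site: the cache-hit fast paths in Storage.RegisterMetricNames and Storage.add, and the slow path in indexSearch.GetOrCreateTSIDByName / createTSIDByName.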


@ -537,7 +537,7 @@ type indexSearch struct {
 //
 // It also registers the metricName in global and per-day indexes
 // for the given date if the metricName->TSID entry is missing in the index.
-func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date uint64) error {
+func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
     // A hack: skip searching for the TSID after many serial misses.
     // This should improve insertion performance for big batches
     // of new time series.
@ -545,7 +545,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date
     err := is.getTSIDByMetricName(dst, metricName)
     if err == nil {
         is.tsidByNameMisses = 0
-        return nil
+        return is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw)
     }
     if err != io.EOF {
         return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
@ -562,7 +562,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date
     // TSID for the given name wasn't found. Create it.
     // It is OK if duplicate TSID for mn is created by concurrent goroutines.
     // Metric results will be merged by mn after TableSearch.
-    if err := is.createTSIDByName(dst, metricName, date); err != nil {
+    if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
         return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err)
     }
     return nil
@ -597,7 +597,7 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
     db.indexSearchPool.Put(is)
 }
 
-func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint64) error {
+func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
     mn := GetMetricName()
     defer PutMetricName(mn)
     if err := mn.Unmarshal(metricName); err != nil {
@ -608,8 +608,8 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint6
     if err != nil {
         return fmt.Errorf("cannot generate TSID: %w", err)
     }
-    if !is.db.s.registerSeriesCardinality(dst.MetricID, mn) {
-        return errSeriesCardinalityExceeded
+    if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
+        return err
     }
     if err := is.createGlobalIndexes(dst, mn); err != nil {
         return fmt.Errorf("cannot create global indexes: %w", err)
@ -631,8 +631,6 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint6
     return nil
 }
 
-var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
-
 // SetLogNewSeries updates new series logging.
 //
 // This function must be called before any calling any storage functions.


@ -622,7 +622,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
     mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...)
     mn.sortTags()
     metricName := mn.Marshal(nil)
-    if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+    metricNameRaw := mn.marshalRaw(nil)
+    if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
         return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup")
     }
@ -635,7 +636,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
     }}
     mn.sortTags()
     metricName = mn.Marshal(nil)
-    if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+    metricNameRaw = mn.marshalRaw(nil)
+    if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
         return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key")
     }
@ -648,7 +650,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
     }}
     mn.sortTags()
     metricName = mn.Marshal(nil)
-    if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+    metricNameRaw = mn.marshalRaw(nil)
+    if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
         return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value")
     }
@ -663,7 +666,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
     }
     mn.sortTags()
     metricName = mn.Marshal(nil)
-    if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+    metricNameRaw = mn.marshalRaw(nil)
+    if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
         return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags")
     }
@ -679,6 +683,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
     defer db.putIndexSearch(is)
     var metricNameBuf []byte
+    var metricNameRawBuf []byte
     for i := 0; i < 4e2+1; i++ {
         var mn MetricName
         mn.AccountID = uint32((i + 2) % accountsCount)
@ -696,10 +701,11 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
         }
         mn.sortTags()
         metricNameBuf = mn.Marshal(metricNameBuf[:0])
+        metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
         // Create tsid for the metricName.
         var tsid TSID
-        if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
+        if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
             return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
         }
         if tsid.AccountID != mn.AccountID {
@ -1701,6 +1707,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
     now := uint64(timestampFromTime(theDay))
     baseDate := now / msecPerDay
     var metricNameBuf []byte
+    var metricNameRawBuf []byte
     perDayMetricIDs := make(map[uint64]*uint64set.Set)
     var allMetricIDs uint64set.Set
     labelNames := []string{
@ -1733,8 +1740,9 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
         mn.sortTags()
         metricNameBuf = mn.Marshal(metricNameBuf[:0])
+        metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
         var tsid TSID
-        if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
+        if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
             t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
         }
         if tsid.AccountID != accountID {


@ -88,6 +88,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
 func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
     var metricName []byte
+    var metricNameRaw []byte
     is := db.getIndexSearch(0, 0, noDeadline)
     defer db.putIndexSearch(is)
     for i := 0; i < recordsPerLoop; i++ {
@ -97,7 +98,8 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
         }
         mn.sortTags()
         metricName = mn.Marshal(metricName[:0])
-        if err := is.GetOrCreateTSIDByName(tsid, metricName, 0); err != nil {
+        metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
+        if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil {
             panic(fmt.Errorf("cannot insert record: %w", err))
         }
     }
@ -127,6 +129,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
     const projectID = 893433
     var mn MetricName
     var metricName []byte
+    var metricNameRaw []byte
     var tsid TSID
     is := db.getIndexSearch(0, 0, noDeadline)
     defer db.putIndexSearch(is)
@ -139,7 +142,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
         mn.AccountID = accountID
         mn.ProjectID = projectID
         metricName = mn.Marshal(metricName[:0])
-        if err := is.createTSIDByName(&tsid, metricName, 0); err != nil {
+        metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
+        if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
             b.Fatalf("cannot insert record: %s", err)
         }
     }
@ -319,6 +323,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
     }
     var tsid TSID
     var metricName []byte
+    var metricNameRaw []byte
     is := db.getIndexSearch(0, 0, noDeadline)
     defer db.putIndexSearch(is)
@ -327,7 +332,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
         mn.ProjectID = uint32(i % projectsCount)
         mn.sortTags()
         metricName = mn.Marshal(metricName[:0])
-        if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err != nil {
+        metricNameRaw = mn.marshalRaw(metricName[:0])
+        if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
             b.Fatalf("cannot insert record: %s", err)
         }
     }
@ -338,6 +344,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
     b.RunParallel(func(pb *testing.PB) {
         var tsidLocal TSID
         var metricNameLocal []byte
+        var metricNameLocalRaw []byte
         mnLocal := mn
         is := db.getIndexSearch(0, 0, noDeadline)
         defer db.putIndexSearch(is)
@ -347,7 +354,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
             mnLocal.ProjectID = uint32(i % projectsCount)
             mnLocal.sortTags()
             metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
-            if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, 0); err != nil {
+            metricNameLocalRaw = mnLocal.marshalRaw(metricNameLocalRaw[:0])
+            if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, metricNameLocalRaw, 0); err != nil {
                 panic(fmt.Errorf("cannot obtain tsid: %w", err))
             }
         }


@ -1786,6 +1786,9 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
     for i := range mrs {
         mr := &mrs[i]
         if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
+            if err := s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw); err != nil {
+                continue
+            }
             if genTSID.generation == idb.generation {
                 // Fast path - mr.MetricNameRaw has been already registered in the current idb.
                 continue
@ -1798,7 +1801,7 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
         mn.sortTags()
         metricName = mn.Marshal(metricName[:0])
         date := uint64(mr.Timestamp) / msecPerDay
-        if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, date); err != nil {
+        if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil {
             if errors.Is(err, errSeriesCardinalityExceeded) {
                 continue
             }
@ -1871,6 +1874,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
             continue
         }
         if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
+            if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil {
+                j--
+                continue
+            }
             r.TSID = genTSID.TSID
             // Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
             // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
@ -1938,7 +1945,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
         }
         slowInsertsCount++
         date := uint64(r.Timestamp) / msecPerDay
-        if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, date); err != nil {
+        if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
             j--
             if errors.Is(err, errSeriesCardinalityExceeded) {
                 continue
@ -1983,26 +1990,29 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
     return nil
 }
 
-func (s *Storage) registerSeriesCardinality(metricID uint64, mn *MetricName) bool {
+func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error {
     if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
         atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
-        logSkippedSeries(mn, "-storage.maxHourlySeries", sl.MaxItems())
-        return false
+        logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
+        return errSeriesCardinalityExceeded
     }
     if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
         atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
-        logSkippedSeries(mn, "-storage.maxDailySeries", sl.MaxItems())
-        return false
+        logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
+        return errSeriesCardinalityExceeded
     }
-    return true
+    return nil
 }
 
-func logSkippedSeries(mn *MetricName, flagName string, flagValue int) {
+var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
+
+func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
     select {
     case <-logSkippedSeriesTicker.C:
         // Do not use logger.WithThrottler() here, since this will result in increased CPU load
         // because of getUserReadableMetricName() calls per each logSkippedSeries call.
-        logger.Warnf("skip series %s because %s=%d reached", mn, flagName, flagValue)
+        userReadableMetricName := getUserReadableMetricName(metricNameRaw)
+        logger.Warnf("skip series %s because %s=%d reached", userReadableMetricName, flagName, flagValue)
     default:
     }
 }