diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 73083fa51a..6c4e8d5ec2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -23,6 +23,7 @@ sort: 15 * FEATURE: vmagent: add support for OAuth2 authorization for scrape targets and service discovery in the same way as Prometheus does. See [these docs](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#oauth2). * FEATURE: vmagent: add support for OAuth2 authorization when writing data to `-remoteWrite.url`. See `-remoteWrite.oauth2.*` config params in `/path/to/vmagent -help` output. * FEATURE: vmalert: add ability to set `extra_filter_labels` at alerting and recording group configs. See [these docs](https://docs.victoriametrics.com/vmalert.html#groups). +* FEATURE: vmstorage: reduce memory usage by up to 30% when ingesting a big number of active time series. * BUGFIX: vmagent: do not retry scraping targets, which don't support HTTP. This should reduce CPU load and network usage at `vmagent` and at scrape target. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1289). * BUGFIX: vmagent: fix possible race when refreshing `role: endpoints` and `role: endpointslices` scrape targets in `kubernetes_sd_config`. Prevoiusly `pod` objects could be updated after the related `endpoints` object update. This could lead to missing scrape targets. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240). 
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 0a7986d237..5248bcabbb 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -588,7 +588,6 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error { if err := db.generateTSID(dst, metricName, mn); err != nil { return fmt.Errorf("cannot generate TSID: %w", err) } - db.putMetricNameToCache(dst.MetricID, metricName) if err := db.createIndexes(dst, mn); err != nil { return fmt.Errorf("cannot create indexes: %w", err) } @@ -3072,7 +3071,7 @@ const ( int64Max = int64((1 << 63) - 1) ) -func (is *indexSearch) storeDateMetricID(date, metricID uint64) error { +func (is *indexSearch) storeDateMetricID(date, metricID uint64, mn *MetricName) error { ii := getIndexItems() defer putIndexItems(ii) @@ -3084,31 +3083,10 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64) error { // Create per-day inverted index entries for metricID. kb := kbPool.Get() defer kbPool.Put(kb) - mn := GetMetricName() - defer PutMetricName(mn) - var err error - // There is no need in searching for metric name in is.db.extDB, - // Since the storeDateMetricID function is called only after the metricID->metricName - // is added into the current is.db. 
- kb.B, err = is.searchMetricNameWithCache(kb.B[:0], metricID) - if err != nil { - if err == io.EOF { - logger.Errorf("missing metricName by metricID %d; this could be the case after unclean shutdown; "+ - "deleting the metricID, so it could be re-created next time", metricID) - if err := is.db.deleteMetricIDs([]uint64{metricID}); err != nil { - return fmt.Errorf("cannot delete metricID %d after unclean shutdown: %w", metricID, err) - } - return nil - } - return fmt.Errorf("cannot find metricName by metricID %d: %w", metricID, err) - } - if err = mn.Unmarshal(kb.B); err != nil { - return fmt.Errorf("cannot unmarshal metricName %q obtained by metricID %d: %w", metricID, kb.B, err) - } kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = encoding.MarshalUint64(kb.B, date) ii.registerTagIndexes(kb.B, mn, metricID) - if err = is.db.tb.AddItems(ii.Items); err != nil { + if err := is.db.tb.AddItems(ii.Items); err != nil { return fmt.Errorf("cannot add per-day entires for metricID %d: %w", metricID, err) } return nil diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 81843a9272..122824a950 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -725,7 +725,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, tsid := &tsids[i] is.accountID = tsid.AccountID is.projectID = tsid.ProjectID - if err := is.storeDateMetricID(date, tsid.MetricID); err != nil { + if err := is.storeDateMetricID(date, tsid.MetricID, &mns[i]); err != nil { return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d, %d, %d): %w", date, tsid.MetricID, tsid.AccountID, tsid.ProjectID, err) } } @@ -1575,6 +1575,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { sort.Strings(tagKeys) for day := 0; day < days; day++ { var tsids []TSID + var mns []MetricName for metric := 0; metric < metricsPerDay; metric++ { var mn MetricName mn.AccountID = accountID @@ -1605,6 +1606,7 @@ func 
TestSearchTSIDWithTimeRange(t *testing.T) { if tsid.ProjectID != projectID { t.Fatalf("unexpected accountID; got %d; want %d", tsid.ProjectID, projectID) } + mns = append(mns, mn) tsids = append(tsids, tsid) } @@ -1614,7 +1616,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { for i := range tsids { tsid := &tsids[i] metricIDs.Add(tsid.MetricID) - if err := is.storeDateMetricID(date, tsid.MetricID); err != nil { + if err := is.storeDateMetricID(date, tsid.MetricID, &mns[i]); err != nil { t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err) } } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 907e023e36..7c61d789f3 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1483,7 +1483,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { // Add rows to the storage. var err error rr := getRawRowsWithSize(len(mrs)) - rr.rows, err = s.add(rr.rows, mrs, precisionBits) + rr.rows, err = s.add(rr.rows[:0], mrs, precisionBits) putRawRows(rr) <-addRowsConcurrencyCh @@ -1507,9 +1507,10 @@ var ( func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { var ( tsid TSID - mn MetricName metricName []byte ) + mn := GetMetricName() + defer PutMetricName(mn) idb := s.idb() is := idb.getIndexSearch(0, 0, noDeadline) defer idb.putIndexSearch(is) @@ -1549,7 +1550,7 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { } if !ok { // The (date, metricID) entry is missing in the indexDB. Add it there. 
- if err := is.storeDateMetricID(date, metricID); err != nil { + if err := is.storeDateMetricID(date, metricID, mn); err != nil { return fmt.Errorf("cannot register the metric in per-date inverted index because of error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err) } @@ -1565,11 +1566,11 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { idb := s.idb() - rowsLen := len(rows) - if n := rowsLen + len(mrs) - cap(rows); n > 0 { + dstMrs := make([]*MetricRow, len(mrs)) + if n := len(mrs) - cap(rows); n > 0 { rows = append(rows[:cap(rows)], make([]rawRow, n)...) } - rows = rows[:rowsLen+len(mrs)] + rows = rows[:len(mrs)] j := 0 var ( // These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName. @@ -1608,7 +1609,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra atomic.AddUint64(&s.tooBigTimestampRows, 1) continue } - r := &rows[rowsLen+j] + dstMrs[j] = mr + r := &rows[j] j++ r.Timestamp = mr.Timestamp r.Value = mr.Value @@ -1661,8 +1663,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra var slowInsertsCount uint64 for i := range pendingMetricRows { pmr := &pendingMetricRows[i] - mr := &pmr.mr - r := &rows[rowsLen+j] + mr := pmr.mr + dstMrs[j] = mr + r := &rows[j] j++ r.Timestamp = mr.Timestamp r.Value = mr.Value @@ -1700,13 +1703,14 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra if firstWarn != nil { logger.Warnf("warn occurred during rows addition: %s", firstWarn) } - rows = rows[:rowsLen+j] + dstMrs = dstMrs[:j] + rows = rows[:j] var firstError error if err := s.tb.AddRows(rows); err != nil { firstError = fmt.Errorf("cannot add rows to table: %w", err) } - if err := s.updatePerDateData(rows); err != nil && firstError == nil { + if err := s.updatePerDateData(rows, dstMrs); err != nil && 
firstError == nil { firstError = fmt.Errorf("cannot update per-date data: %w", err) } if firstError != nil { @@ -1740,7 +1744,8 @@ func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) { var logSkippedSeriesTicker = time.NewTicker(5 * time.Second) func getUserReadableMetricName(metricNameRaw []byte) string { - var mn MetricName + mn := GetMetricName() + defer PutMetricName(mn) if err := mn.UnmarshalRaw(metricNameRaw); err != nil { return fmt.Sprintf("cannot unmarshal metricNameRaw %q: %s", metricNameRaw, err) } @@ -1749,7 +1754,7 @@ func getUserReadableMetricName(metricNameRaw []byte) string { type pendingMetricRow struct { MetricName []byte - mr MetricRow + mr *MetricRow } type pendingMetricRows struct { @@ -1764,7 +1769,7 @@ type pendingMetricRows struct { func (pmrs *pendingMetricRows) reset() { for _, pmr := range pmrs.pmrs { pmr.MetricName = nil - pmr.mr.MetricNameRaw = nil + pmr.mr = nil } pmrs.pmrs = pmrs.pmrs[:0] pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0] @@ -1788,7 +1793,7 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error { } pmrs.pmrs = append(pmrs.pmrs, pendingMetricRow{ MetricName: pmrs.lastMetricName, - mr: *mr, + mr: mr, }) return nil } @@ -1808,7 +1813,7 @@ func putPendingMetricRows(pmrs *pendingMetricRows) { var pendingMetricRowsPool sync.Pool -func (s *Storage) updatePerDateData(rows []rawRow) error { +func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { var date uint64 var hour uint64 var prevTimestamp int64 @@ -1828,6 +1833,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { metricID uint64 accountID uint32 projectID uint32 + mr *MetricRow } var pendingDateMetricIDs []pendingDateMetricID var pendingNextDayMetricIDs []uint64 @@ -1863,6 +1869,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { metricID: metricID, accountID: r.TSID.AccountID, projectID: r.TSID.ProjectID, + mr: mrs[i], }) pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID) } @@ 
-1890,6 +1897,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { metricID: metricID, accountID: r.TSID.AccountID, projectID: r.TSID.ProjectID, + mr: mrs[i], }) } if len(pendingNextDayMetricIDs) > 0 { @@ -1930,6 +1938,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { defer idb.putIndexSearch(is) var firstError error dateMetricIDsForCache := make([]dateMetricID, 0, len(pendingDateMetricIDs)) + mn := GetMetricName() for _, dmid := range pendingDateMetricIDs { date := dmid.date metricID := dmid.metricID @@ -1947,7 +1956,14 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { // The (date, metricID) entry is missing in the indexDB. Add it there. // It is OK if the (date, metricID) entry is added multiple times to db // by concurrent goroutines. - if err := is.storeDateMetricID(date, metricID); err != nil { + if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil { + if firstError == nil { + firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", dmid.mr.MetricNameRaw, err) + } + continue + } + mn.sortTags() + if err := is.storeDateMetricID(date, metricID, mn); err != nil { if firstError == nil { firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err) } @@ -1959,6 +1975,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { metricID: metricID, }) } + PutMetricName(mn) // The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB. s.dateMetricIDCache.Store(dateMetricIDsForCache) return firstError