From e1f699bb6ca22f31030ea6802c112a5072cc9b97 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 31 Mar 2021 21:22:40 +0300
Subject: [PATCH] lib/storage: reduce memory usage when ingesting samples for
 the same time series with distinct order of labels

---
 app/vmstorage/main.go  |  3 ++
 docs/CHANGELOG.md      |  1 +
 lib/storage/storage.go | 66 +++++++++++++++++++++++++-----------------
 3 files changed, 43 insertions(+), 27 deletions(-)

diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index 731fc0002..b23140a8d 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -557,6 +557,9 @@ func registerStorageMetrics() {
         return float64(m().SearchDelays)
     })
 
+    metrics.NewGauge(`vm_sorted_row_labels_inserts_total`, func() float64 {
+        return float64(m().SortedRowLabelsInserts)
+    })
     metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
         return float64(m().SlowRowInserts)
     })

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index c72b82f73..00fea8f42 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -2,6 +2,7 @@
 
 # tip
 
+* FEATURE: reduce the size of `MetricName -> internal_series_id` cache (aka `vm_cache_size_bytes{type="storage/tsid"}`) when ingesting samples for the same time series with distinct order of labels. For example, `foo{k1="v1",k2="v2"}` and `foo{k2="v2",k1="v1"}` represent a single time series. Previously VictoriaMetrics could need additional memory when ingesting such samples. The number of ingested samples with distinct order of labels for the same time series can be monitored with `vm_sorted_row_labels_inserts_total` metric.
 * FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167).

diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 48af275c1..8300400d5 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -48,6 +48,7 @@ type Storage struct {
     searchTSIDsConcurrencyLimitReached uint64
     searchTSIDsConcurrencyLimitTimeout uint64
 
+    sortedRowLabelsInserts uint64
     slowRowInserts uint64
     slowPerDayIndexInserts uint64
     slowMetricNameLoads uint64
@@ -358,6 +359,7 @@ type Metrics struct {
     SearchDelays uint64
 
+    SortedRowLabelsInserts uint64
     SlowRowInserts uint64
     SlowPerDayIndexInserts uint64
     SlowMetricNameLoads uint64
@@ -427,6 +429,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
     m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
 
+    m.SortedRowLabelsInserts += atomic.LoadUint64(&s.sortedRowLabelsInserts)
     m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
     m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
     m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
@@ -1318,6 +1321,8 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
 var rowsAddedTotal uint64
 
 // AddRows adds the given mrs to s.
+//
+// AddRows can modify mrs contents.
 func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
     if len(mrs) == 0 {
         return nil
     }
@@ -1442,6 +1447,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
         prevMetricNameRaw []byte
     )
     var pmrs *pendingMetricRows
+    var mn MetricName
+    var metricNameRawSorted []byte
+    var sortedRowLabelsInserts uint64
     minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
     // Return only the first error, since it has no sense in returning all errors.
     var firstWarn error
@@ -1485,7 +1493,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
             continue
         }
         if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
-            // Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
+            // Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
             // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
             // contain MetricName->TSID entries for deleted time series.
             // See Storage.DeleteMetrics code for details.
@@ -1494,22 +1502,40 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
             continue
         }
 
+        // Slower path - sort labels in MetricNameRaw and check the cache again.
+        // This should limit the number of cache entries for metrics with distinct order of labels to 1.
+        if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
+            if firstWarn == nil {
+                firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
+            }
+            j--
+            continue
+        }
+        mn.sortTags()
+        metricNameRawSorted = mn.marshalRaw(metricNameRawSorted[:0])
+        if s.getTSIDFromCache(&r.TSID, metricNameRawSorted) {
+            // The TSID for the given metricNameRawSorted has been found in cache and isn't deleted.
+            // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
+            // contain MetricName->TSID entries for deleted time series.
+            // See Storage.DeleteMetrics code for details.
+            sortedRowLabelsInserts++
+            prevTSID = r.TSID
+            prevMetricNameRaw = mr.MetricNameRaw
+            continue
+        }
+
         // Slow path - the TSID is missing in the cache.
         // Postpone its search in the loop below.
         j--
         if pmrs == nil {
             pmrs = getPendingMetricRows()
         }
-        if err := pmrs.addRow(mr); err != nil {
-            // Do not stop adding rows on error - just skip invalid row.
-            // This guarantees that invalid rows don't prevent
-            // from adding valid rows into the storage.
-            if firstWarn == nil {
-                firstWarn = err
-            }
-            continue
+        if string(mr.MetricNameRaw) != string(metricNameRawSorted) {
+            mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRawSorted...)
         }
+        pmrs.addRow(mr, &mn)
     }
+    atomic.AddUint64(&s.sortedRowLabelsInserts, sortedRowLabelsInserts)
     if pmrs != nil {
         // Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
         pendingMetricRows := pmrs.pmrs
@@ -1533,15 +1559,6 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
             r.TSID = prevTSID
             continue
         }
-        if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
-            // Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
-            // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
-            // contain MetricName->TSID entries for deleted time series.
-            // See Storage.DeleteMetrics code for details.
-            prevTSID = r.TSID
-            prevMetricNameRaw = mr.MetricNameRaw
-            continue
-        }
         slowInsertsCount++
         if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {
             // Do not stop adding rows on error - just skip invalid row.
@@ -1554,6 +1571,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
             continue
         }
         s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
+        prevTSID = r.TSID
+        prevMetricNameRaw = mr.MetricNameRaw
     }
     idb.putIndexSearch(is)
     putPendingMetricRows(pmrs)
@@ -1596,7 +1615,6 @@ type pendingMetricRows struct {
 
     lastMetricNameRaw []byte
     lastMetricName []byte
-    mn MetricName
 }
 
 func (pmrs *pendingMetricRows) reset() {
@@ -1608,19 +1626,14 @@ func (pmrs *pendingMetricRows) reset() {
     pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0]
     pmrs.lastMetricNameRaw = nil
     pmrs.lastMetricName = nil
-    pmrs.mn.Reset()
 }
 
-func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
+func (pmrs *pendingMetricRows) addRow(mr *MetricRow, mn *MetricName) {
     // Do not spend CPU time on re-calculating canonical metricName during bulk import
     // of many rows for the same metric.
     if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) {
-        if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
-            return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
-        }
-        pmrs.mn.sortTags()
         metricNamesBufLen := len(pmrs.metricNamesBuf)
-        pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf)
+        pmrs.metricNamesBuf = mn.Marshal(pmrs.metricNamesBuf)
         pmrs.lastMetricName = pmrs.metricNamesBuf[metricNamesBufLen:]
         pmrs.lastMetricNameRaw = mr.MetricNameRaw
     }
@@ -1628,7 +1641,6 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
         MetricName: pmrs.lastMetricName,
         mr: *mr,
     })
-    return nil
 }
 
 func getPendingMetricRows() *pendingMetricRows {
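
Illustration (not part of the patch): a minimal, self-contained Go sketch of the idea this change relies on, namely sorting labels before building the cache key so that foo{k1="v1",k2="v2"} and foo{k2="v2",k1="v1"} resolve to a single cache entry. The label, canonicalKey and tsidCache names below are hypothetical and exist only for this sketch; in the patch itself the canonicalization is done with MetricName.unmarshalRaw, sortTags, marshalRaw and getTSIDFromCache.

package main

import (
    "fmt"
    "sort"
    "strings"
)

// label is a simplified stand-in for a single metric label (tag).
type label struct {
    name, value string
}

// canonicalKey builds a cache key with labels sorted by name, so rows that
// carry the same labels in a different order produce the same key.
func canonicalKey(metric string, labels []label) string {
    sort.Slice(labels, func(i, j int) bool { return labels[i].name < labels[j].name })
    var b strings.Builder
    b.WriteString(metric)
    for _, l := range labels {
        b.WriteByte(0) // simple separator for this sketch
        b.WriteString(l.name)
        b.WriteByte(0)
        b.WriteString(l.value)
    }
    return b.String()
}

func main() {
    tsidCache := map[string]uint64{} // hypothetical MetricName -> internal_series_id cache

    k1 := canonicalKey("foo", []label{{"k1", "v1"}, {"k2", "v2"}})
    k2 := canonicalKey("foo", []label{{"k2", "v2"}, {"k1", "v1"}})

    tsidCache[k1] = 42
    _, ok := tsidCache[k2]
    fmt.Println(k1 == k2, ok) // true true: one cache entry serves both label orders
}

With the key canonicalized this way, ingesting the same series with shuffled label order hits the existing cache entry instead of creating a second one, which is exactly what the vm_sorted_row_labels_inserts_total counter in the patch tracks.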