mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 00:00:39 +01:00
lib/storage: reset MetricName->TSID cache after deleting time series
This should prevent from adding new data points to deleted time series without the need to check for the deleted time series. This improves ingestion performance a bit when the `deleted time series ids` aka `dmis` set contains big number of time series. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/596 Based on the idea from @n4mine at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/604
This commit is contained in:
parent
77bb0e6595
commit
fe58462bef
@ -942,12 +942,13 @@ func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
||||||
}
|
}
|
||||||
// Do not reset MetricName -> TSID cache (tsidCache), since the obtained
|
// Reset MetricName->TSID cache in order to prevent from adding new data points
|
||||||
// entries must be checked against deleted metricIDs.
|
// to deleted time series in Storage.add.
|
||||||
// See Storage.add for details.
|
s.tsidCache.Reset()
|
||||||
//
|
|
||||||
// Do not reset MetricID -> MetricName cache, since it must be used only
|
// Do not reset MetricID -> MetricName cache, since it must be used only
|
||||||
// after filtering out deleted metricIDs.
|
// after filtering out deleted metricIDs.
|
||||||
|
|
||||||
return deletedCount, nil
|
return deletedCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1127,7 +1128,6 @@ var (
|
|||||||
|
|
||||||
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
|
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
dmis := idb.getDeletedMetricIDs()
|
|
||||||
rowsLen := len(rows)
|
rowsLen := len(rows)
|
||||||
if n := rowsLen + len(mrs) - cap(rows); n > 0 {
|
if n := rowsLen + len(mrs) - cap(rows); n > 0 {
|
||||||
rows = append(rows[:cap(rows)], make([]rawRow, n)...)
|
rows = append(rows[:cap(rows)], make([]rawRow, n)...)
|
||||||
@ -1179,7 +1179,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
|||||||
r.TSID = prevTSID
|
r.TSID = prevTSID
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) && !dmis.Has(r.TSID.MetricID) {
|
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
|
||||||
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
||||||
prevTSID = r.TSID
|
prevTSID = r.TSID
|
||||||
prevMetricNameRaw = mr.MetricNameRaw
|
prevMetricNameRaw = mr.MetricNameRaw
|
||||||
@ -1225,7 +1225,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
|||||||
r.TSID = prevTSID
|
r.TSID = prevTSID
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) && !dmis.Has(r.TSID.MetricID) {
|
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
|
||||||
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
|
||||||
prevTSID = r.TSID
|
prevTSID = r.TSID
|
||||||
prevMetricNameRaw = mr.MetricNameRaw
|
prevMetricNameRaw = mr.MetricNameRaw
|
||||||
|
Loading…
Reference in New Issue
Block a user