From dc9eafcd023d569b2283f9c77bfdecbbf79639f8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 31 Mar 2021 23:12:56 +0300 Subject: [PATCH] app/{vminsert,vmagent}: add `-sortLabels` command-line option for sorting time series labels before ingesting them in the storage This option can be useful when samples for the same time series are ingested with distinct order of labels. For example, metric{k1="v1",k2="v2"} and metric{k2="v2",k1="v1"}. --- README.md | 13 +++-- app/vmagent/README.md | 2 + app/vmagent/remotewrite/pendingseries.go | 1 + app/vmagent/remotewrite/sort_labels.go | 51 ++++++++++++++++++ app/vminsert/common/insert_ctx.go | 2 +- app/vminsert/common/sort_labels.go | 32 +++++++++++ app/vminsert/csvimport/request_handler.go | 1 + app/vminsert/graphite/request_handler.go | 1 + app/vminsert/influx/request_handler.go | 2 + app/vminsert/native/request_handler.go | 1 + app/vminsert/opentsdb/request_handler.go | 1 + app/vminsert/opentsdbhttp/request_handler.go | 1 + .../prometheusimport/request_handler.go | 1 + app/vminsert/prompush/push.go | 1 + .../promremotewrite/request_handler.go | 1 + app/vminsert/vmimport/request_handler.go | 1 + app/vmstorage/main.go | 3 -- docs/CHANGELOG.md | 2 +- docs/Single-server-VictoriaMetrics.md | 13 +++-- docs/vmagent.md | 2 + lib/storage/storage.go | 53 ++++++------------- 21 files changed, 136 insertions(+), 49 deletions(-) create mode 100644 app/vmagent/remotewrite/sort_labels.go create mode 100644 app/vminsert/common/sort_labels.go diff --git a/README.md b/README.md index 793818290..aed76c314 100644 --- a/README.md +++ b/README.md @@ -1324,6 +1324,8 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need of tweaking these flag values arises. +* It is recommended inspecting logs during troubleshooting, since they may contain useful information. + * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases), since the encountered issue could be already fixed there. @@ -1338,8 +1340,6 @@ See the example of alerting rules for VM components [here](https://github.com/Vi if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions, which would start background merge if they had more free disk space. -* It is recommended inspecting logs during troubleshooting, since they may contain useful information. - * VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage. This may lead to the following "issues": * Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to persistent storage @@ -1349,10 +1349,13 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second, then it is likely you have too many active time series for the current amount of RAM. - VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics, which could be used as an indicator of low amounts of RAM. - It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve + VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics such as `vm_slow_row_inserts_total` and `vm_slow_metric_name_loads_total`, which could be used + as an indicator of low amounts of RAM. It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve ingestion and query performance in this case. +* If the order of labels for the same metrics can change over time (e.g. if `metric{k1="v1",k2="v2"}` may become `metric{k2="v2",k1="v1"}`), + then it is recommended running VictoriaMetrics with `-sortLabels` command-line flag in order to reduce memory usage and CPU usage. + * VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion, then data querying may slow down significantly. @@ -1790,6 +1793,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum number of CPU cores to use for small merges. Default value is used if set to 0 -snapshotAuthKey string authKey, which must be passed in query string to /snapshot* pages + -sortLabels + Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit -storageDataPath string Path to storage data (default "victoria-metrics-data") -tls diff --git a/app/vmagent/README.md b/app/vmagent/README.md index cf52fb800..815a0d030 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -702,6 +702,8 @@ See the docs at https://victoriametrics.github.io/vmagent.html . -remoteWrite.urlRelabelConfig array Optional path to relabel config for the corresponding -remoteWrite.url Supports array of values separated by comma or specified via multiple flags. + -sortLabels + Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit -tls Whether to enable TLS (aka HTTPS) for incoming requests. -tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 23dc72a8a..2d6cf12c0 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -128,6 +128,7 @@ func (wr *writeRequest) reset() { } func (wr *writeRequest) flush() { + sortLabelsIfNeeded(wr.tss) wr.wr.Timeseries = wr.tss wr.adjustSampleValues() atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) diff --git a/app/vmagent/remotewrite/sort_labels.go b/app/vmagent/remotewrite/sort_labels.go new file mode 100644 index 000000000..e9ec252bd --- /dev/null +++ b/app/vmagent/remotewrite/sort_labels.go @@ -0,0 +1,51 @@ +package remotewrite + +import ( + "flag" + "sort" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+ + `This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+ + `For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+ + `Enabled sorting for labels can slow down ingestion performance a bit`) + +// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. +func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) { + if !*sortLabels { + return + } + // The slc is used for avoiding memory allocation when passing labels to sort.Sort. + slc := sortLabelsCtxPool.Get().(*sortLabelsCtx) + for i := range tss { + slc.labels = tss[i].Labels + sort.Sort(&slc.labels) + } + slc.labels = nil + sortLabelsCtxPool.Put(slc) +} + +type sortLabelsCtx struct { + labels sortedLabels +} + +var sortLabelsCtxPool = &sync.Pool{ + New: func() interface{} { + return &sortLabelsCtx{} + }, +} + +type sortedLabels []prompbmarshal.Label + +func (sl *sortedLabels) Len() int { return len(*sl) } +func (sl *sortedLabels) Less(i, j int) bool { + a := *sl + return a[i].Name < a[j].Name +} +func (sl *sortedLabels) Swap(i, j int) { + a := *sl + a[i], a[j] = a[j], a[i] +} diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index ce6ff849f..e62cf267c 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -14,7 +14,7 @@ import ( // InsertCtx contains common bits for data points insertion. type InsertCtx struct { - Labels []prompb.Label + Labels sortedLabels mrs []storage.MetricRow metricNamesBuf []byte diff --git a/app/vminsert/common/sort_labels.go b/app/vminsert/common/sort_labels.go new file mode 100644 index 000000000..16fa88cc0 --- /dev/null +++ b/app/vminsert/common/sort_labels.go @@ -0,0 +1,32 @@ +package common + +import ( + "flag" + "sort" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" +) + +var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to storage. `+ + `This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. `+ + `For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. `+ + `Enabled sorting for labels can slow down ingestion performance a bit`) + +// SortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set +func (ctx *InsertCtx) SortLabelsIfNeeded() { + if *sortLabels { + sort.Sort(&ctx.Labels) + } +} + +type sortedLabels []prompb.Label + +func (sl *sortedLabels) Len() int { return len(*sl) } +func (sl *sortedLabels) Less(i, j int) bool { + a := *sl + return string(a[i].Name) < string(a[j].Name) +} +func (sl *sortedLabels) Swap(i, j int) { + a := *sl + a[i], a[j] = a[j], a[i] +} diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index fc858936e..6997dd4cb 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -55,6 +55,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { // Skip metric without labels. continue } + ctx.SortLabelsIfNeeded() if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 84532485c..5aa24f929 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -45,6 +45,7 @@ func insertRows(rows []parser.Row) error { // Skip metric without labels. continue } + ctx.SortLabelsIfNeeded() if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 784c2c3ec..f1c81bf80 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -117,11 +117,13 @@ func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) // Skip metric without labels. continue } + ic.SortLabelsIfNeeded() if err := ic.WriteDataPoint(nil, ic.Labels, r.Timestamp, f.Value); err != nil { return err } } } else { + ic.SortLabelsIfNeeded() ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) labelsLen := len(ic.Labels) for j := range r.Fields { diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index cb36025dc..78cbe2d8e 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -65,6 +65,7 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { // Skip metric without labels. return nil } + ic.SortLabelsIfNeeded() ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) values := block.Values timestamps := block.Timestamps diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 852708862..49a6157a1 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -45,6 +45,7 @@ func insertRows(rows []parser.Row) error { // Skip metric without labels. continue } + ctx.SortLabelsIfNeeded() if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 83fc33729..b5927ecc7 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -63,6 +63,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { // Skip metric without labels. continue } + ctx.SortLabelsIfNeeded() if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index cc916e8b5..aa65b7da5 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -60,6 +60,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { // Skip metric without labels. continue } + ctx.SortLabelsIfNeeded() if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } diff --git a/app/vminsert/prompush/push.go b/app/vminsert/prompush/push.go index 1c6ebe0d7..40d495971 100644 --- a/app/vminsert/prompush/push.go +++ b/app/vminsert/prompush/push.go @@ -62,6 +62,7 @@ func push(ctx *common.InsertCtx, tss []prompbmarshal.TimeSeries) { // Skip metric without labels. continue } + ctx.SortLabelsIfNeeded() var metricNameRaw []byte var err error for i := range ts.Samples { diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 15f3895ed..e56f12e25 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -61,6 +61,7 @@ func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Labe // Skip metric without labels. continue } + ctx.SortLabelsIfNeeded() var metricNameRaw []byte var err error samples := ts.Samples diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 64002b4d4..5b4332ac9 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -67,6 +67,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { // Skip metric without labels. continue } + ic.SortLabelsIfNeeded() ctx.metricNameBuf = storage.MarshalMetricNameRaw(ctx.metricNameBuf[:0], ic.Labels) values := r.Values timestamps := r.Timestamps diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index b23140a8d..731fc0002 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -557,9 +557,6 @@ func registerStorageMetrics() { return float64(m().SearchDelays) }) - metrics.NewGauge(`vm_sorted_row_labels_inserts_total`, func() float64 { - return float64(m().SortedRowLabelsInserts) - }) metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { return float64(m().SlowRowInserts) }) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 00fea8f42..36285e2c8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,7 +2,7 @@ # tip -* FEATURE: reduce the size of `MetricName -> internal_series_id` cache (aka `vm_cache_size_bytes{type="storage/tsid"}`) when ingesting samples for the same time series with distinct order of labels. For example, `foo{k1="v1",k2="v2"}` and `foo{k2="v2",k1="v1"}` represent a single time series. Previously VictoriaMetrics could need additional memory when ingesting such samples. The number of ingested samples with distinct order of labels for the same time series can be monitored with `vm_sorted_row_labels_inserts_total` metric. +* FEATURE: vminsert and vmagent: add `-sortLabels` command-line flag for sorting metric labels before pushing them to `vmstorage`. This should reduce the size of `MetricName -> internal_series_id` cache (aka `vm_cache_size_bytes{type="storage/tsid"}`) when ingesting samples for the same time series with distinct order of labels. For example, `foo{k1="v1",k2="v2"}` and `foo{k2="v2",k1="v1"}` represent a single time series. * FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167). diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 793818290..aed76c314 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1324,6 +1324,8 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * It is recommended to use default command-line flag values (i.e. don't set them explicitly) until the need of tweaking these flag values arises. +* It is recommended inspecting logs during troubleshooting, since they may contain useful information. + * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases), since the encountered issue could be already fixed there. @@ -1338,8 +1340,6 @@ See the example of alerting rules for VM components [here](https://github.com/Vi if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions, which would start background merge if they had more free disk space. -* It is recommended inspecting logs during troubleshooting, since they may contain useful information. - * VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage. This may lead to the following "issues": * Data becomes available for querying in a few seconds after inserting. It is possible to flush in-memory buffers to persistent storage @@ -1349,10 +1349,13 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * If VictoriaMetrics works slowly and eats more than a CPU core per 100K ingested data points per second, then it is likely you have too many active time series for the current amount of RAM. - VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics, which could be used as an indicator of low amounts of RAM. - It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve + VictoriaMetrics [exposes](#monitoring) `vm_slow_*` metrics such as `vm_slow_row_inserts_total` and `vm_slow_metric_name_loads_total`, which could be used + as an indicator of low amounts of RAM. It is recommended increasing the amount of RAM on the node with VictoriaMetrics in order to improve ingestion and query performance in this case. +* If the order of labels for the same metrics can change over time (e.g. if `metric{k1="v1",k2="v2"}` may become `metric{k2="v2",k1="v1"}`), + then it is recommended running VictoriaMetrics with `-sortLabels` command-line flag in order to reduce memory usage and CPU usage. + * VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion, then data querying may slow down significantly. @@ -1790,6 +1793,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The maximum number of CPU cores to use for small merges. Default value is used if set to 0 -snapshotAuthKey string authKey, which must be passed in query string to /snapshot* pages + -sortLabels + Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit -storageDataPath string Path to storage data (default "victoria-metrics-data") -tls diff --git a/docs/vmagent.md b/docs/vmagent.md index cf52fb800..815a0d030 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -702,6 +702,8 @@ See the docs at https://victoriametrics.github.io/vmagent.html . -remoteWrite.urlRelabelConfig array Optional path to relabel config for the corresponding -remoteWrite.url Supports array of values separated by comma or specified via multiple flags. + -sortLabels + Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit -tls Whether to enable TLS (aka HTTPS) for incoming requests. -tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 8300400d5..7c5c82911 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -48,7 +48,6 @@ type Storage struct { searchTSIDsConcurrencyLimitReached uint64 searchTSIDsConcurrencyLimitTimeout uint64 - sortedRowLabelsInserts uint64 slowRowInserts uint64 slowPerDayIndexInserts uint64 slowMetricNameLoads uint64 @@ -359,7 +358,6 @@ type Metrics struct { SearchDelays uint64 - SortedRowLabelsInserts uint64 SlowRowInserts uint64 SlowPerDayIndexInserts uint64 SlowMetricNameLoads uint64 @@ -429,7 +427,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.SearchDelays = storagepacelimiter.Search.DelaysTotal() - m.SortedRowLabelsInserts += atomic.LoadUint64(&s.sortedRowLabelsInserts) m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) @@ -1321,8 +1318,6 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error { var rowsAddedTotal uint64 // AddRows adds the given mrs to s. -// -// AddRows can modify mrs contents. func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { if len(mrs) == 0 { return nil @@ -1447,9 +1442,6 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra prevMetricNameRaw []byte ) var pmrs *pendingMetricRows - var mn MetricName - var metricNameRawSorted []byte - var sortedRowLabelsInserts uint64 minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps() // Return only the first error, since it has no sense in returning all errors. var firstWarn error @@ -1502,40 +1494,22 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra continue } - // Slower path - sort labels in MetricNameRaw and check the cache again. - // This should limit the number of cache entries for metrics with distinct order of labels to 1. - if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil { - if firstWarn == nil { - firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) - } - j-- - continue - } - mn.sortTags() - metricNameRawSorted = mn.marshalRaw(metricNameRawSorted[:0]) - if s.getTSIDFromCache(&r.TSID, metricNameRawSorted) { - // The TSID for the given metricNameRawSorted has been found in cache and isn't deleted. - // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't - // contain MetricName->TSID entries for deleted time series. - // See Storage.DeleteMetrics code for details. - sortedRowLabelsInserts++ - prevTSID = r.TSID - prevMetricNameRaw = mr.MetricNameRaw - continue - } - // Slow path - the TSID is missing in the cache. // Postpone its search in the loop below. j-- if pmrs == nil { pmrs = getPendingMetricRows() } - if string(mr.MetricNameRaw) != string(metricNameRawSorted) { - mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRawSorted...) + if err := pmrs.addRow(mr); err != nil { + // Do not stop adding rows on error - just skip invalid row. + // This guarantees that invalid rows don't prevent + // from adding valid rows into the storage. + if firstWarn == nil { + firstWarn = err + } + continue } - pmrs.addRow(mr, &mn) } - atomic.AddUint64(&s.sortedRowLabelsInserts, sortedRowLabelsInserts) if pmrs != nil { // Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below. pendingMetricRows := pmrs.pmrs @@ -1615,6 +1589,7 @@ type pendingMetricRows struct { lastMetricNameRaw []byte lastMetricName []byte + mn MetricName } func (pmrs *pendingMetricRows) reset() { @@ -1626,14 +1601,19 @@ func (pmrs *pendingMetricRows) reset() { pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0] pmrs.lastMetricNameRaw = nil pmrs.lastMetricName = nil + pmrs.mn.Reset() } -func (pmrs *pendingMetricRows) addRow(mr *MetricRow, mn *MetricName) { +func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error { // Do not spend CPU time on re-calculating canonical metricName during bulk import // of many rows for the same metric. if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) { + if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil { + return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + } + pmrs.mn.sortTags() metricNamesBufLen := len(pmrs.metricNamesBuf) - pmrs.metricNamesBuf = mn.Marshal(pmrs.metricNamesBuf) + pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf) pmrs.lastMetricName = pmrs.metricNamesBuf[metricNamesBufLen:] pmrs.lastMetricNameRaw = mr.MetricNameRaw } @@ -1641,6 +1621,7 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow, mn *MetricName) { MetricName: pmrs.lastMetricName, mr: *mr, }) + return nil } func getPendingMetricRows() *pendingMetricRows {