lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
package promutils
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
|
|
)
|
|
|
|
|
|
|
|
// LabelsCompressor compresses []prompbmarshal.Label into short binary strings
|
|
|
|
type LabelsCompressor struct {
|
|
|
|
labelToIdx sync.Map
|
2024-03-03 22:21:20 +01:00
|
|
|
idxToLabel labelsMap
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
|
|
|
|
nextIdx atomic.Uint64
|
|
|
|
|
|
|
|
totalSizeBytes atomic.Uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
// SizeBytes returns the size of lc data in bytes
|
|
|
|
func (lc *LabelsCompressor) SizeBytes() uint64 {
|
|
|
|
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
|
|
|
|
}
|
|
|
|
|
|
|
|
// ItemsCount returns the number of items in lc
|
|
|
|
func (lc *LabelsCompressor) ItemsCount() uint64 {
|
|
|
|
return lc.nextIdx.Load()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
2024-03-03 22:21:20 +01:00
|
|
|
//
|
|
|
|
// It is safe calling Compress from concurrent goroutines.
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label) []byte {
|
|
|
|
if len(labels) == 0 {
|
|
|
|
// Fast path
|
|
|
|
return append(dst, 0)
|
|
|
|
}
|
|
|
|
|
|
|
|
a := encoding.GetUint64s(len(labels) + 1)
|
|
|
|
a.A[0] = uint64(len(labels))
|
|
|
|
lc.compress(a.A[1:], labels)
|
|
|
|
dst = encoding.MarshalVarUint64s(dst, a.A)
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
return dst
|
|
|
|
}
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label) {
|
|
|
|
if len(labels) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
_ = dst[len(labels)-1]
|
2024-03-03 22:21:20 +01:00
|
|
|
for i, label := range labels {
|
|
|
|
v, ok := lc.labelToIdx.Load(label)
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
if !ok {
|
2024-03-03 22:21:20 +01:00
|
|
|
idx := lc.nextIdx.Add(1)
|
|
|
|
v = idx
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
labelCopy := cloneLabel(label)
|
2024-03-03 22:21:20 +01:00
|
|
|
lc.idxToLabel.Store(idx, labelCopy)
|
|
|
|
lc.labelToIdx.Store(labelCopy, v)
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
|
|
|
|
// Update lc.totalSizeBytes
|
|
|
|
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
|
2024-03-03 22:21:20 +01:00
|
|
|
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
lc.totalSizeBytes.Add(entrySizeBytes)
|
|
|
|
}
|
|
|
|
dst[i] = v.(uint64)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-03-03 22:21:20 +01:00
|
|
|
func cloneLabel(label prompbmarshal.Label) prompbmarshal.Label {
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
// pre-allocate memory for label name and value
|
|
|
|
n := len(label.Name) + len(label.Value)
|
|
|
|
buf := make([]byte, 0, n)
|
|
|
|
|
|
|
|
buf = append(buf, label.Name...)
|
|
|
|
labelName := bytesutil.ToUnsafeString(buf)
|
|
|
|
|
|
|
|
buf = append(buf, label.Value...)
|
|
|
|
labelValue := bytesutil.ToUnsafeString(buf[len(labelName):])
|
2024-03-03 22:21:20 +01:00
|
|
|
return prompbmarshal.Label{
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
Name: labelName,
|
|
|
|
Value: labelValue,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Decompress decompresses src into []prompbmarshal.Label, appends it to dst and returns the result.
|
2024-03-03 22:21:20 +01:00
|
|
|
//
|
|
|
|
// It is safe calling Decompress from concurrent goroutines.
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []prompbmarshal.Label {
|
|
|
|
tail, labelsLen, err := encoding.UnmarshalVarUint64(src)
|
|
|
|
if err != nil {
|
|
|
|
logger.Panicf("BUG: cannot unmarshal labels length: %s", err)
|
|
|
|
}
|
|
|
|
if labelsLen == 0 {
|
|
|
|
// fast path - nothing to decode
|
|
|
|
if len(tail) > 0 {
|
|
|
|
logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(tail), tail)
|
|
|
|
}
|
|
|
|
return dst
|
|
|
|
}
|
|
|
|
|
|
|
|
a := encoding.GetUint64s(int(labelsLen))
|
|
|
|
tail, err = encoding.UnmarshalVarUint64s(a.A, tail)
|
|
|
|
if err != nil {
|
|
|
|
logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
|
|
|
|
}
|
|
|
|
if len(tail) > 0 {
|
|
|
|
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
|
|
|
|
}
|
|
|
|
dst = lc.decompress(dst, a.A)
|
|
|
|
encoding.PutUint64s(a)
|
|
|
|
return dst
|
|
|
|
}
|
|
|
|
|
|
|
|
func (lc *LabelsCompressor) decompress(dst []prompbmarshal.Label, src []uint64) []prompbmarshal.Label {
|
|
|
|
for _, idx := range src {
|
2024-03-03 22:21:20 +01:00
|
|
|
label, ok := lc.idxToLabel.Load(idx)
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 01:42:26 +01:00
|
|
|
if !ok {
|
|
|
|
logger.Panicf("BUG: missing label for idx=%d", idx)
|
|
|
|
}
|
|
|
|
dst = append(dst, label)
|
|
|
|
}
|
|
|
|
return dst
|
|
|
|
}
|
2024-03-03 22:21:20 +01:00
|
|
|
|
|
|
|
// labelsMap maps uint64 key to prompbmarshal.Label
|
|
|
|
//
|
|
|
|
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
|
|
|
|
type labelsMap struct {
|
|
|
|
readOnly atomic.Pointer[[]*prompbmarshal.Label]
|
|
|
|
|
|
|
|
mutableLock sync.Mutex
|
|
|
|
mutable map[uint64]*prompbmarshal.Label
|
|
|
|
misses uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
// Store stores label under the given idx.
|
|
|
|
//
|
|
|
|
// It is safe calling Store from concurrent goroutines.
|
|
|
|
func (lm *labelsMap) Store(idx uint64, label prompbmarshal.Label) {
|
|
|
|
lm.mutableLock.Lock()
|
|
|
|
if lm.mutable == nil {
|
|
|
|
lm.mutable = make(map[uint64]*prompbmarshal.Label)
|
|
|
|
}
|
|
|
|
lm.mutable[idx] = &label
|
|
|
|
lm.mutableLock.Unlock()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Load returns the label for the given idx.
|
|
|
|
//
|
|
|
|
// Load returns false if lm doesn't contain label for the given idx.
|
|
|
|
//
|
|
|
|
// It is safe calling Load from concurrent goroutines.
|
|
|
|
//
|
|
|
|
// The performance of Load() scales linearly with CPU cores.
|
|
|
|
func (lm *labelsMap) Load(idx uint64) (prompbmarshal.Label, bool) {
|
|
|
|
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
|
|
|
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
|
|
|
// Fast path - the label for the given idx has been found in lm.readOnly.
|
|
|
|
return *pLabel, true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Slow path - search in lm.mutable.
|
|
|
|
return lm.loadSlow(idx)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (lm *labelsMap) loadSlow(idx uint64) (prompbmarshal.Label, bool) {
|
|
|
|
lm.mutableLock.Lock()
|
|
|
|
|
|
|
|
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
|
|
|
|
pReadOnly := lm.readOnly.Load()
|
|
|
|
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
|
|
|
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
|
|
|
lm.mutableLock.Unlock()
|
|
|
|
return *pLabel, true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// The label for the idx wasn't found in readOnly. Search it in mutable.
|
|
|
|
lm.misses++
|
|
|
|
pLabel := lm.mutable[idx]
|
|
|
|
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
|
|
|
|
lm.moveMutableToReadOnlyLocked(pReadOnly)
|
|
|
|
lm.misses = 0
|
|
|
|
}
|
|
|
|
lm.mutableLock.Unlock()
|
|
|
|
|
|
|
|
if pLabel == nil {
|
|
|
|
return prompbmarshal.Label{}, false
|
|
|
|
}
|
|
|
|
return *pLabel, true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompbmarshal.Label) {
|
|
|
|
if len(lm.mutable) == 0 {
|
|
|
|
// Nothing to move
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
var labels []*prompbmarshal.Label
|
|
|
|
if pReadOnly != nil {
|
|
|
|
labels = append(labels, *pReadOnly...)
|
|
|
|
}
|
|
|
|
for idx, pLabel := range lm.mutable {
|
|
|
|
if idx < uint64(len(labels)) {
|
|
|
|
labels[idx] = pLabel
|
|
|
|
} else {
|
|
|
|
for idx > uint64(len(labels)) {
|
|
|
|
labels = append(labels, nil)
|
|
|
|
}
|
|
|
|
labels = append(labels, pLabel)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
clear(lm.mutable)
|
|
|
|
lm.readOnly.Store(&labels)
|
|
|
|
}
|