From d442ee46107b8ef7bdaebc561a57a8f0f59a8487 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 9 Jul 2022 00:14:48 +0300 Subject: [PATCH] app/vmselect/netstorage: optimize mergeSortBlocks function - Use binary search instead of linear scan when locating the run of smallest timestamps in blocks with intersected time ranges. This should improve performance when merging blocks with big number of samples - Skip samples with duplicate timestamps. This should increase query performance in cluster version of VictoriaMetrics with the enabled replication. --- app/vmselect/netstorage/netstorage.go | 50 ++++-- app/vmselect/netstorage/netstorage_test.go | 179 +++++++++++++++++++++ docs/CHANGELOG.md | 1 + 3 files changed, 219 insertions(+), 11 deletions(-) create mode 100644 app/vmselect/netstorage/netstorage_test.go diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index e237289780..01755db7c4 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -523,20 +523,20 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) { } sbNext := sbh[0] tsNext := sbNext.Timestamps[sbNext.NextIdx] - idxNext := len(top.Timestamps) - if top.Timestamps[idxNext-1] > tsNext { - idxNext = top.NextIdx - for top.Timestamps[idxNext] <= tsNext { - idxNext++ - } + topTimestamps := top.Timestamps + topNextIdx := top.NextIdx + if n := equalTimestampsPrefix(topTimestamps[topNextIdx:], sbNext.Timestamps[sbNext.NextIdx:]); n > 0 && dedupInterval > 0 { + // Skip n replicated samples at top if deduplication is enabled. + top.NextIdx = topNextIdx + n + } else { + // Copy samples from top to dst with timestamps not exceeding tsNext. + top.NextIdx = topNextIdx + binarySearchTimestamps(topTimestamps[topNextIdx:], tsNext) + dst.Timestamps = append(dst.Timestamps, topTimestamps[topNextIdx:top.NextIdx]...) + dst.Values = append(dst.Values, top.Values[topNextIdx:top.NextIdx]...) } - dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:idxNext]...) - dst.Values = append(dst.Values, top.Values[top.NextIdx:idxNext]...) - if idxNext < len(top.Timestamps) { - top.NextIdx = idxNext + if top.NextIdx < len(topTimestamps) { heap.Push(&sbh, top) } else { - // Return top to the pool. putSortBlock(top) } } @@ -549,6 +549,34 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) { var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`) +func equalTimestampsPrefix(a, b []int64) int { + for i, v := range a { + if i >= len(b) || v != b[i] { + return i + } + } + return len(a) +} + +func binarySearchTimestamps(timestamps []int64, ts int64) int { + // The code has been adapted from sort.Search. + n := len(timestamps) + if n > 0 && timestamps[n-1] < ts { + // Fast path for values scanned in ascending order. + return n + } + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) + if h >= 0 && h < len(timestamps) && timestamps[h] <= ts { + i = h + 1 + } else { + j = h + } + } + return i +} + type sortBlock struct { Timestamps []int64 Values []float64 diff --git a/app/vmselect/netstorage/netstorage_test.go b/app/vmselect/netstorage/netstorage_test.go new file mode 100644 index 0000000000..6de0d268d7 --- /dev/null +++ b/app/vmselect/netstorage/netstorage_test.go @@ -0,0 +1,179 @@ +package netstorage + +import ( + "reflect" + "testing" +) + +func TestMergeSortBlocks(t *testing.T) { + f := func(blocks []*sortBlock, dedupInterval int64, expectedResult *Result) { + t.Helper() + var result Result + mergeSortBlocks(&result, blocks, dedupInterval) + if !reflect.DeepEqual(result.Values, expectedResult.Values) { + t.Fatalf("unexpected values;\ngot\n%v\nwant\n%v", result.Values, expectedResult.Values) + } + if !reflect.DeepEqual(result.Timestamps, expectedResult.Timestamps) { + t.Fatalf("unexpected timestamps;\ngot\n%v\nwant\n%v", result.Timestamps, expectedResult.Timestamps) + } + } + + // Zero blocks + f(nil, 1, &Result{}) + + // Single block without samples + f([]*sortBlock{{}}, 1, &Result{}) + + // Single block with a single samples. + f([]*sortBlock{ + { + Timestamps: []int64{1}, + Values: []float64{4.2}, + }, + }, 1, &Result{ + Timestamps: []int64{1}, + Values: []float64{4.2}, + }) + + // Single block with multiple samples. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 3}, + Values: []float64{4.2, 2.1, 10}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 3}, + Values: []float64{4.2, 2.1, 10}, + }) + + // Single block with multiple samples with deduplication. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 3}, + Values: []float64{4.2, 2.1, 10}, + }, + }, 2, &Result{ + Timestamps: []int64{2, 3}, + Values: []float64{2.1, 10}, + }) + + // Multiple blocks without time range intersection. + f([]*sortBlock{ + { + Timestamps: []int64{3, 5}, + Values: []float64{5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2}, + Values: []float64{4.2, 2.1}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 3, 5}, + Values: []float64{4.2, 2.1, 5.2, 6.1}, + }) + + // Multiple blocks with time range intersection. + f([]*sortBlock{ + { + Timestamps: []int64{3, 5}, + Values: []float64{5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 3, 4, 5}, + Values: []float64{4.2, 2.1, 5.2, 42, 6.1}, + }) + + // Multiple blocks with time range inclusion. + f([]*sortBlock{ + { + Timestamps: []int64{0, 3, 5}, + Values: []float64{9, 5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 1, &Result{ + Timestamps: []int64{0, 1, 2, 3, 4, 5}, + Values: []float64{9, 4.2, 2.1, 5.2, 42, 6.1}, + }) + + // Multiple blocks with identical timestamps. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{9, 5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }) + + // Multiple blocks with identical timestamps, disabled deduplication. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{9, 5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 0, &Result{ + Timestamps: []int64{1, 1, 2, 2, 4, 4}, + Values: []float64{9, 4.2, 2.1, 5.2, 6.1, 42}, + }) + + // Multiple blocks with identical timestamp ranges. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 5, 10, 11}, + Values: []float64{9, 8, 7, 6, 5}, + }, + { + Timestamps: []int64{1, 2, 4, 10, 11, 12}, + Values: []float64{21, 22, 23, 24, 25, 26}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 4, 5, 10, 11, 12}, + Values: []float64{21, 22, 23, 7, 24, 5, 26}, + }) + + // Multiple blocks with identical timestamp ranges, no deduplication. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 5, 10, 11}, + Values: []float64{9, 8, 7, 6, 5}, + }, + { + Timestamps: []int64{1, 2, 4, 10, 11, 12}, + Values: []float64{21, 22, 23, 24, 25, 26}, + }, + }, 0, &Result{ + Timestamps: []int64{1, 1, 2, 2, 4, 5, 10, 10, 11, 11, 12}, + Values: []float64{9, 21, 22, 8, 23, 7, 6, 24, 25, 5, 26}, + }) + + // Multiple blocks with identical timestamp ranges with deduplication. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 5, 10, 11}, + Values: []float64{9, 8, 7, 6, 5}, + }, + { + Timestamps: []int64{1, 2, 4, 10, 11, 12}, + Values: []float64{21, 22, 23, 24, 25, 26}, + }, + }, 5, &Result{ + Timestamps: []int64{5, 10, 12}, + Values: []float64{7, 24, 26}, + }) +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b251c5e2f7..77561e1d10 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -39,6 +39,7 @@ scrape_configs: * FEATURE: [query tracing](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#query-tracing): show timestamps in query traces in human-readable format (aka `RFC3339` in UTC timezone) instead of milliseconds since Unix epoch. For example, `2022-06-27T10:32:54.506Z` instead of `1656325974506`. This improves traces' readability. * FEATURE: improve performance of [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) requests, which return big number of time series. +* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve query performance when [replication is enabled](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#replication-and-data-safety). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle partial counter resets in [remove_resets](https://docs.victoriametrics.com/MetricsQL.html#remove_resets) function. Now `remove_resets(sum(m))` should returns the expected increasing line when some time series matching `m` disappear on the selected time range. Previously such a query would return horizontal line after the disappeared series. * FEATURE: expose additional histogram metrics at `http://victoriametrics:8428/metrics`, which may help understanding query workload: