// equalTimestampsPrefix returns the length of the longest common prefix of a and b,
// i.e. the number of leading positions at which both slices hold the same timestamp.
//
// It returns 0 if either slice is empty or if the first items differ.
func equalTimestampsPrefix(a, b []int64) int {
	// Clamp both slices to the shorter length once, so the loop below
	// doesn't need to re-check the length of b on every iteration.
	if len(a) > len(b) {
		a = a[:len(b)]
	}
	b = b[:len(a)]
	for i, v := range a {
		if v != b[i] {
			return i
		}
	}
	return len(a)
}

// binarySearchTimestamps returns the number of leading items in timestamps,
// which do not exceed ts. In other words, it returns the smallest index i
// with timestamps[i] > ts, or len(timestamps) if there is no such index.
//
// timestamps must be sorted in ascending order for the result to be meaningful.
func binarySearchTimestamps(timestamps []int64, ts int64) int {
	n := len(timestamps)
	if n > 0 && timestamps[n-1] <= ts {
		// Fast path for timestamps scanned in ascending order:
		// every item fits, including the case when the last item equals ts.
		return n
	}
	// The code has been adapted from sort.Search.
	i, j := 0, n
	for i < j {
		// The uint conversion avoids overflow when calculating the middle point.
		h := int(uint(i+j) >> 1)
		// The explicit range check on h is always true here, since i <= h < j <= n.
		// NOTE(review): presumably kept as a bounds-check-elimination hint - confirm with benchmarks.
		if h >= 0 && h < len(timestamps) && timestamps[h] <= ts {
			i = h + 1
		} else {
			j = h
		}
	}
	return i
}
+ f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 3}, + Values: []float64{4.2, 2.1, 10}, + }, + }, 2, &Result{ + Timestamps: []int64{2, 3}, + Values: []float64{2.1, 10}, + }) + + // Multiple blocks without time range intersection. + f([]*sortBlock{ + { + Timestamps: []int64{3, 5}, + Values: []float64{5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2}, + Values: []float64{4.2, 2.1}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 3, 5}, + Values: []float64{4.2, 2.1, 5.2, 6.1}, + }) + + // Multiple blocks with time range intersection. + f([]*sortBlock{ + { + Timestamps: []int64{3, 5}, + Values: []float64{5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 3, 4, 5}, + Values: []float64{4.2, 2.1, 5.2, 42, 6.1}, + }) + + // Multiple blocks with time range inclusion. + f([]*sortBlock{ + { + Timestamps: []int64{0, 3, 5}, + Values: []float64{9, 5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 1, &Result{ + Timestamps: []int64{0, 1, 2, 3, 4, 5}, + Values: []float64{9, 4.2, 2.1, 5.2, 42, 6.1}, + }) + + // Multiple blocks with identical timestamps. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{9, 5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }) + + // Multiple blocks with identical timestamps, disabled deduplication. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{9, 5.2, 6.1}, + }, + { + Timestamps: []int64{1, 2, 4}, + Values: []float64{4.2, 2.1, 42}, + }, + }, 0, &Result{ + Timestamps: []int64{1, 1, 2, 2, 4, 4}, + Values: []float64{9, 4.2, 2.1, 5.2, 6.1, 42}, + }) + + // Multiple blocks with identical timestamp ranges. 
+ f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 5, 10, 11}, + Values: []float64{9, 8, 7, 6, 5}, + }, + { + Timestamps: []int64{1, 2, 4, 10, 11, 12}, + Values: []float64{21, 22, 23, 24, 25, 26}, + }, + }, 1, &Result{ + Timestamps: []int64{1, 2, 4, 5, 10, 11, 12}, + Values: []float64{21, 22, 23, 7, 24, 5, 26}, + }) + + // Multiple blocks with identical timestamp ranges, no deduplication. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 5, 10, 11}, + Values: []float64{9, 8, 7, 6, 5}, + }, + { + Timestamps: []int64{1, 2, 4, 10, 11, 12}, + Values: []float64{21, 22, 23, 24, 25, 26}, + }, + }, 0, &Result{ + Timestamps: []int64{1, 1, 2, 2, 4, 5, 10, 10, 11, 11, 12}, + Values: []float64{9, 21, 22, 8, 23, 7, 6, 24, 25, 5, 26}, + }) + + // Multiple blocks with identical timestamp ranges with deduplication. + f([]*sortBlock{ + { + Timestamps: []int64{1, 2, 5, 10, 11}, + Values: []float64{9, 8, 7, 6, 5}, + }, + { + Timestamps: []int64{1, 2, 4, 10, 11, 12}, + Values: []float64{21, 22, 23, 24, 25, 26}, + }, + }, 5, &Result{ + Timestamps: []int64{5, 10, 12}, + Values: []float64{7, 24, 26}, + }) +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b251c5e2f7..77561e1d10 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -39,6 +39,7 @@ scrape_configs: * FEATURE: [query tracing](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#query-tracing): show timestamps in query traces in human-readable format (aka `RFC3339` in UTC timezone) instead of milliseconds since Unix epoch. For example, `2022-06-27T10:32:54.506Z` instead of `1656325974506`. This improves traces' readability. * FEATURE: improve performance of [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) requests, which return big number of time series. 
+* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve query performance when [replication is enabled](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#replication-and-data-safety). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle partial counter resets in [remove_resets](https://docs.victoriametrics.com/MetricsQL.html#remove_resets) function. Now `remove_resets(sum(m))` should return the expected increasing line when some time series matching `m` disappear on the selected time range. Previously such a query would return a horizontal line after the disappeared series. * FEATURE: expose additional histogram metrics at `http://victoriametrics:8428/metrics`, which may help understanding query workload: