app/vmselect/netstorage: optimize mergeSortBlocks function

- Use binary search instead of linear scan when locating the run of smallest timestamps
  in blocks with intersected time ranges. This should improve performance
  when merging blocks with big number of samples

- Skip samples with duplicate timestamps. This should increase query performance
  in cluster version of VictoriaMetrics with the enabled replication.
This commit is contained in:
Aliaksandr Valialkin 2022-07-09 00:14:48 +03:00
parent 78da074ad3
commit d442ee4610
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
3 changed files with 219 additions and 11 deletions

View File

@ -523,20 +523,20 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
}
sbNext := sbh[0]
tsNext := sbNext.Timestamps[sbNext.NextIdx]
idxNext := len(top.Timestamps)
if top.Timestamps[idxNext-1] > tsNext {
idxNext = top.NextIdx
for top.Timestamps[idxNext] <= tsNext {
idxNext++
topTimestamps := top.Timestamps
topNextIdx := top.NextIdx
if n := equalTimestampsPrefix(topTimestamps[topNextIdx:], sbNext.Timestamps[sbNext.NextIdx:]); n > 0 && dedupInterval > 0 {
// Skip n replicated samples at top if deduplication is enabled.
top.NextIdx = topNextIdx + n
} else {
// Copy samples from top to dst with timestamps not exceeding tsNext.
top.NextIdx = topNextIdx + binarySearchTimestamps(topTimestamps[topNextIdx:], tsNext)
dst.Timestamps = append(dst.Timestamps, topTimestamps[topNextIdx:top.NextIdx]...)
dst.Values = append(dst.Values, top.Values[topNextIdx:top.NextIdx]...)
}
}
dst.Timestamps = append(dst.Timestamps, top.Timestamps[top.NextIdx:idxNext]...)
dst.Values = append(dst.Values, top.Values[top.NextIdx:idxNext]...)
if idxNext < len(top.Timestamps) {
top.NextIdx = idxNext
if top.NextIdx < len(topTimestamps) {
heap.Push(&sbh, top)
} else {
// Return top to the pool.
putSortBlock(top)
}
}
@ -549,6 +549,34 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap, dedupInterval int64) {
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
func equalTimestampsPrefix(a, b []int64) int {
for i, v := range a {
if i >= len(b) || v != b[i] {
return i
}
}
return len(a)
}
func binarySearchTimestamps(timestamps []int64, ts int64) int {
// The code has been adapted from sort.Search.
n := len(timestamps)
if n > 0 && timestamps[n-1] < ts {
// Fast path for values scanned in ascending order.
return n
}
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1)
if h >= 0 && h < len(timestamps) && timestamps[h] <= ts {
i = h + 1
} else {
j = h
}
}
return i
}
type sortBlock struct {
Timestamps []int64
Values []float64

View File

@ -0,0 +1,179 @@
package netstorage
import (
"reflect"
"testing"
)
func TestMergeSortBlocks(t *testing.T) {
f := func(blocks []*sortBlock, dedupInterval int64, expectedResult *Result) {
t.Helper()
var result Result
mergeSortBlocks(&result, blocks, dedupInterval)
if !reflect.DeepEqual(result.Values, expectedResult.Values) {
t.Fatalf("unexpected values;\ngot\n%v\nwant\n%v", result.Values, expectedResult.Values)
}
if !reflect.DeepEqual(result.Timestamps, expectedResult.Timestamps) {
t.Fatalf("unexpected timestamps;\ngot\n%v\nwant\n%v", result.Timestamps, expectedResult.Timestamps)
}
}
// Zero blocks
f(nil, 1, &Result{})
// Single block without samples
f([]*sortBlock{{}}, 1, &Result{})
// Single block with a single samples.
f([]*sortBlock{
{
Timestamps: []int64{1},
Values: []float64{4.2},
},
}, 1, &Result{
Timestamps: []int64{1},
Values: []float64{4.2},
})
// Single block with multiple samples.
f([]*sortBlock{
{
Timestamps: []int64{1, 2, 3},
Values: []float64{4.2, 2.1, 10},
},
}, 1, &Result{
Timestamps: []int64{1, 2, 3},
Values: []float64{4.2, 2.1, 10},
})
// Single block with multiple samples with deduplication.
f([]*sortBlock{
{
Timestamps: []int64{1, 2, 3},
Values: []float64{4.2, 2.1, 10},
},
}, 2, &Result{
Timestamps: []int64{2, 3},
Values: []float64{2.1, 10},
})
// Multiple blocks without time range intersection.
f([]*sortBlock{
{
Timestamps: []int64{3, 5},
Values: []float64{5.2, 6.1},
},
{
Timestamps: []int64{1, 2},
Values: []float64{4.2, 2.1},
},
}, 1, &Result{
Timestamps: []int64{1, 2, 3, 5},
Values: []float64{4.2, 2.1, 5.2, 6.1},
})
// Multiple blocks with time range intersection.
f([]*sortBlock{
{
Timestamps: []int64{3, 5},
Values: []float64{5.2, 6.1},
},
{
Timestamps: []int64{1, 2, 4},
Values: []float64{4.2, 2.1, 42},
},
}, 1, &Result{
Timestamps: []int64{1, 2, 3, 4, 5},
Values: []float64{4.2, 2.1, 5.2, 42, 6.1},
})
// Multiple blocks with time range inclusion.
f([]*sortBlock{
{
Timestamps: []int64{0, 3, 5},
Values: []float64{9, 5.2, 6.1},
},
{
Timestamps: []int64{1, 2, 4},
Values: []float64{4.2, 2.1, 42},
},
}, 1, &Result{
Timestamps: []int64{0, 1, 2, 3, 4, 5},
Values: []float64{9, 4.2, 2.1, 5.2, 42, 6.1},
})
// Multiple blocks with identical timestamps.
f([]*sortBlock{
{
Timestamps: []int64{1, 2, 4},
Values: []float64{9, 5.2, 6.1},
},
{
Timestamps: []int64{1, 2, 4},
Values: []float64{4.2, 2.1, 42},
},
}, 1, &Result{
Timestamps: []int64{1, 2, 4},
Values: []float64{4.2, 2.1, 42},
})
// Multiple blocks with identical timestamps, disabled deduplication.
f([]*sortBlock{
{
Timestamps: []int64{1, 2, 4},
Values: []float64{9, 5.2, 6.1},
},
{
Timestamps: []int64{1, 2, 4},
Values: []float64{4.2, 2.1, 42},
},
}, 0, &Result{
Timestamps: []int64{1, 1, 2, 2, 4, 4},
Values: []float64{9, 4.2, 2.1, 5.2, 6.1, 42},
})
// Multiple blocks with identical timestamp ranges.
f([]*sortBlock{
{
Timestamps: []int64{1, 2, 5, 10, 11},
Values: []float64{9, 8, 7, 6, 5},
},
{
Timestamps: []int64{1, 2, 4, 10, 11, 12},
Values: []float64{21, 22, 23, 24, 25, 26},
},
}, 1, &Result{
Timestamps: []int64{1, 2, 4, 5, 10, 11, 12},
Values: []float64{21, 22, 23, 7, 24, 5, 26},
})
// Multiple blocks with identical timestamp ranges, no deduplication.
f([]*sortBlock{
{
Timestamps: []int64{1, 2, 5, 10, 11},
Values: []float64{9, 8, 7, 6, 5},
},
{
Timestamps: []int64{1, 2, 4, 10, 11, 12},
Values: []float64{21, 22, 23, 24, 25, 26},
},
}, 0, &Result{
Timestamps: []int64{1, 1, 2, 2, 4, 5, 10, 10, 11, 11, 12},
Values: []float64{9, 21, 22, 8, 23, 7, 6, 24, 25, 5, 26},
})
// Multiple blocks with identical timestamp ranges with deduplication.
f([]*sortBlock{
{
Timestamps: []int64{1, 2, 5, 10, 11},
Values: []float64{9, 8, 7, 6, 5},
},
{
Timestamps: []int64{1, 2, 4, 10, 11, 12},
Values: []float64{21, 22, 23, 24, 25, 26},
},
}, 5, &Result{
Timestamps: []int64{5, 10, 12},
Values: []float64{7, 24, 26},
})
}

View File

@ -39,6 +39,7 @@ scrape_configs:
* FEATURE: [query tracing](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#query-tracing): show timestamps in query traces in human-readable format (aka `RFC3339` in UTC timezone) instead of milliseconds since Unix epoch. For example, `2022-06-27T10:32:54.506Z` instead of `1656325974506`. This improves traces' readability.
* FEATURE: improve performance of [/api/v1/series](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers) requests, which return big number of time series.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve query performance when [replication is enabled](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#replication-and-data-safety).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle partial counter resets in [remove_resets](https://docs.victoriametrics.com/MetricsQL.html#remove_resets) function. Now `remove_resets(sum(m))` should returns the expected increasing line when some time series matching `m` disappear on the selected time range. Previously such a query would return horizontal line after the disappeared series.
* FEATURE: expose additional histogram metrics at `http://victoriametrics:8428/metrics`, which may help understanding query workload: