lib/logstorage: optimize performance for top pipe when it is applied to a field with millions of unique values

- Use parallel merge of per-CPU shard results. This improves merge performance on multi-CPU systems.
- Use topN heap sort of per-shard results. This improves performance when results contain millions of entries.
This commit is contained in:
Aliaksandr Valialkin 2024-10-17 21:19:16 +02:00
parent 98fcd95438
commit 192c07f76a
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
2 changed files with 193 additions and 34 deletions

View File

@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on multi-CPU hosts when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster now when `user_id` field contains millions of unique values.
## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)

View File

@ -1,13 +1,17 @@
package logstorage
import (
"container/heap"
"fmt"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -243,43 +247,24 @@ func (ptp *pipeTopProcessor) flush() error {
if n := ptp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
// merge state across shards
shards := ptp.shards
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
if needStop(ptp.stopCh) {
limit := ptp.pt.limit
if limit == 0 {
return nil
}
for k, pHitsSrc := range shards[i].getM() {
pHits, ok := m[k]
if !ok {
m[k] = pHitsSrc
// merge state across shards in parallel
var entries []*pipeTopEntry
if len(ptp.shards) == 1 {
entries = getTopEntries(ptp.shards[0].getM(), limit, ptp.stopCh)
} else {
*pHits += *pHitsSrc
es, err := ptp.getTopEntriesParallel(limit)
if err != nil {
return err
}
entries = es
}
}
// select top entries with the biggest number of hits
entries := make([]pipeTopEntry, 0, len(m))
for k, pHits := range m {
entries = append(entries, pipeTopEntry{
k: k,
hits: *pHits,
})
}
sort.Slice(entries, func(i, j int) bool {
a, b := &entries[i], &entries[j]
if a.hits == b.hits {
return a.k < b.k
}
return a.hits > b.hits
})
if uint64(len(entries)) > ptp.pt.limit {
entries = entries[:ptp.pt.limit]
if needStop(ptp.stopCh) {
return nil
}
// write result
@ -373,11 +358,184 @@ func (ptp *pipeTopProcessor) flush() error {
return nil
}
func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntry, error) {
shards := ptp.shards
shardsLen := len(shards)
var wg sync.WaitGroup
perShardMaps := make([][]map[string]*uint64, shardsLen)
for i := range shards {
wg.Add(1)
go func(idx int) {
defer wg.Done()
shardMaps := make([]map[string]*uint64, shardsLen)
for i := range shardMaps {
shardMaps[i] = make(map[string]*uint64)
}
n := int64(0)
for k, pHitsSrc := range shards[idx].getM() {
if needStop(ptp.stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
m := shardMaps[h%uint64(len(shardMaps))]
n += updatePipeTopMap(m, k, pHitsSrc)
if n > stateSizeBudgetChunk {
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
n = 0
}
}
ptp.stateSizeBudget.Add(-n)
perShardMaps[idx] = shardMaps
shards[idx].m = nil
}(i)
}
wg.Wait()
if needStop(ptp.stopCh) {
return nil, nil
}
if n := ptp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
entriess := make([][]*pipeTopEntry, shardsLen)
for i := range entriess {
wg.Add(1)
go func(idx int) {
defer wg.Done()
m := perShardMaps[0][idx]
n := int64(0)
for _, shardMaps := range perShardMaps[1:] {
for k, pHitsSrc := range shardMaps[idx] {
if needStop(ptp.stopCh) {
return
}
n += updatePipeTopMap(m, k, pHitsSrc)
if n > stateSizeBudgetChunk {
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
n = 0
}
}
}
ptp.stateSizeBudget.Add(-n)
entriess[idx] = getTopEntries(m, ptp.pt.limit, ptp.stopCh)
}(i)
}
wg.Wait()
if needStop(ptp.stopCh) {
return nil, nil
}
if n := ptp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
// merge entriess
entries := entriess[0]
for _, es := range entriess[1:] {
entries = append(entries, es...)
}
sort.Slice(entries, func(i, j int) bool {
return entries[j].less(entries[i])
})
if uint64(len(entries)) > limit {
entries = entries[:limit]
}
return entries, nil
}
func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
if limit == 0 {
return nil
}
var eh topEntriesHeap
for k, pHits := range m {
if needStop(stopCh) {
return nil
}
e := pipeTopEntry{
k: k,
hits: *pHits,
}
if uint64(len(eh)) < limit {
eCopy := e
heap.Push(&eh, &eCopy)
continue
}
if eh[0].less(&e) {
eCopy := e
eh[0] = &eCopy
heap.Fix(&eh, 0)
}
}
result := ([]*pipeTopEntry)(eh)
for len(eh) > 0 {
x := heap.Pop(&eh)
result[len(eh)] = x.(*pipeTopEntry)
}
return result
}
func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
pHitsDst, ok := m[k]
if ok {
*pHitsDst += *pHitsSrc
return 0
}
m[k] = pHitsSrc
return int64(unsafe.Sizeof(k) + unsafe.Sizeof(pHitsSrc))
}
type topEntriesHeap []*pipeTopEntry
func (h *topEntriesHeap) Less(i, j int) bool {
a := *h
return a[i].less(a[j])
}
func (h *topEntriesHeap) Swap(i, j int) {
a := *h
a[i], a[j] = a[j], a[i]
}
func (h *topEntriesHeap) Len() int {
return len(*h)
}
func (h *topEntriesHeap) Push(v any) {
x := v.(*pipeTopEntry)
*h = append(*h, x)
}
func (h *topEntriesHeap) Pop() any {
a := *h
x := a[len(a)-1]
a[len(a)-1] = nil
*h = a[:len(a)-1]
return x
}
type pipeTopEntry struct {
k string
hits uint64
}
func (e *pipeTopEntry) less(r *pipeTopEntry) bool {
if e.hits == r.hits {
return e.k > r.k
}
return e.hits < r.hits
}
type pipeTopWriteContext struct {
ptp *pipeTopProcessor
rcs []resultColumn