mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 08:10:44 +01:00
lib/logstorage: optimize 'stats by(...)' calculations for by(...) fields with millions of unique values on multi-CPU systems
- Parallelize merging of per-CPU `stats by(...)` result shards.
- Parallelize writing `stats by(...)` results to the next pipe.
(cherry picked from commit c4b2fdff70
)
This commit is contained in:
parent
1000ae437c
commit
cd7823a310
@ -16,7 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||||||
## tip
|
## tip
|
||||||
|
|
||||||
* FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
|
* FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
|
||||||
* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on multi-CPU hosts when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster now when `user_id` field contains millions of unique values.
|
* FEATURE: improve [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) performance on systems with many CPU cores when `by(...)` fields contain big number of unique values. For example, `_time:1d | stats by (user_id) count() x` should be executed much faster when `user_id` field contains millions of unique values.
|
||||||
|
* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on systems with many CPU cores when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster when `user_id` field contains millions of unique values.
|
||||||
|
|
||||||
## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)
|
## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)
|
||||||
|
|
||||||
|
@ -3,9 +3,12 @@ package logstorage
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/cespare/xxhash/v2"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
@ -425,40 +428,38 @@ func (psp *pipeStatsProcessor) flush() error {
|
|||||||
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
|
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge states across shards
|
// Merge states across shards in parallel
|
||||||
shards := psp.shards
|
ms, err := psp.mergeShardsParallel()
|
||||||
shardMain := &shards[0]
|
if err != nil {
|
||||||
shardMain.init()
|
return err
|
||||||
m := shardMain.m
|
}
|
||||||
shards = shards[1:]
|
if needStop(psp.stopCh) {
|
||||||
for i := range shards {
|
return nil
|
||||||
shard := &shards[i]
|
|
||||||
for key, psg := range shard.m {
|
|
||||||
// shard.m may be quite big, so this loop can take a lot of time and CPU.
|
|
||||||
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
|
|
||||||
if needStop(psp.stopCh) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
spgBase := m[key]
|
|
||||||
if spgBase == nil {
|
|
||||||
m[key] = psg
|
|
||||||
} else {
|
|
||||||
for i, sfp := range spgBase.sfps {
|
|
||||||
sfp.mergeState(psg.sfps[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write per-group states to ppNext
|
if len(psp.ps.byFields) == 0 && len(ms) == 0 {
|
||||||
byFields := psp.ps.byFields
|
|
||||||
if len(byFields) == 0 && len(m) == 0 {
|
|
||||||
// Special case - zero matching rows.
|
// Special case - zero matching rows.
|
||||||
_ = shardMain.getPipeStatsGroup(nil)
|
psp.shards[0].init()
|
||||||
m = shardMain.m
|
_ = psp.shards[0].getPipeStatsGroup(nil)
|
||||||
|
ms = append(ms, psp.shards[0].m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write the calculated stats in parallel to the next pipe.
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i, m := range ms {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(workerID uint) {
|
||||||
|
defer wg.Done()
|
||||||
|
psp.writeShardData(workerID, m)
|
||||||
|
}(uint(i))
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (psp *pipeStatsProcessor) writeShardData(workerID uint, m map[string]*pipeStatsGroup) {
|
||||||
|
byFields := psp.ps.byFields
|
||||||
rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
|
rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
|
||||||
for _, bf := range byFields {
|
for _, bf := range byFields {
|
||||||
rcs = appendResultColumnWithName(rcs, bf.name)
|
rcs = appendResultColumnWithName(rcs, bf.name)
|
||||||
@ -475,7 +476,7 @@ func (psp *pipeStatsProcessor) flush() error {
|
|||||||
// m may be quite big, so this loop can take a lot of time and CPU.
|
// m may be quite big, so this loop can take a lot of time and CPU.
|
||||||
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
|
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
|
||||||
if needStop(psp.stopCh) {
|
if needStop(psp.stopCh) {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal values for byFields from key.
|
// Unmarshal values for byFields from key.
|
||||||
@ -511,7 +512,7 @@ func (psp *pipeStatsProcessor) flush() error {
|
|||||||
if valuesLen >= 1_000_000 {
|
if valuesLen >= 1_000_000 {
|
||||||
br.setResultColumns(rcs, rowsCount)
|
br.setResultColumns(rcs, rowsCount)
|
||||||
rowsCount = 0
|
rowsCount = 0
|
||||||
psp.ppNext.writeBlock(0, &br)
|
psp.ppNext.writeBlock(workerID, &br)
|
||||||
br.reset()
|
br.reset()
|
||||||
for i := range rcs {
|
for i := range rcs {
|
||||||
rcs[i].resetValues()
|
rcs[i].resetValues()
|
||||||
@ -521,9 +522,134 @@ func (psp *pipeStatsProcessor) flush() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
br.setResultColumns(rcs, rowsCount)
|
br.setResultColumns(rcs, rowsCount)
|
||||||
psp.ppNext.writeBlock(0, &br)
|
psp.ppNext.writeBlock(workerID, &br)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
func (psp *pipeStatsProcessor) mergeShardsParallel() ([]map[string]*pipeStatsGroup, error) {
|
||||||
|
shards := psp.shards
|
||||||
|
shardsLen := len(shards)
|
||||||
|
|
||||||
|
if shardsLen == 1 {
|
||||||
|
var ms []map[string]*pipeStatsGroup
|
||||||
|
shards[0].init()
|
||||||
|
if len(shards[0].m) > 0 {
|
||||||
|
ms = append(ms, shards[0].m)
|
||||||
|
}
|
||||||
|
return ms, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
perShardMaps := make([][]map[string]*pipeStatsGroup, shardsLen)
|
||||||
|
for i := range shards {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(idx int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
shardMaps := make([]map[string]*pipeStatsGroup, shardsLen)
|
||||||
|
for i := range shardMaps {
|
||||||
|
shardMaps[i] = make(map[string]*pipeStatsGroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
shards[idx].init()
|
||||||
|
|
||||||
|
n := int64(0)
|
||||||
|
nTotal := int64(0)
|
||||||
|
for k, psg := range shards[idx].m {
|
||||||
|
if needStop(psp.stopCh) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
|
||||||
|
m := shardMaps[h%uint64(len(shardMaps))]
|
||||||
|
n += updatePipeStatsMap(m, k, psg)
|
||||||
|
if n > stateSizeBudgetChunk {
|
||||||
|
if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
nTotal += n
|
||||||
|
n = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nTotal += n
|
||||||
|
psp.stateSizeBudget.Add(-n)
|
||||||
|
|
||||||
|
perShardMaps[idx] = shardMaps
|
||||||
|
|
||||||
|
// Clean the original map and return its state size budget back.
|
||||||
|
shards[idx].m = nil
|
||||||
|
psp.stateSizeBudget.Add(nTotal)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if needStop(psp.stopCh) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if n := psp.stateSizeBudget.Load(); n < 0 {
|
||||||
|
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge per-shard entries into perShardMaps[0]
|
||||||
|
for i := range perShardMaps {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(idx int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
m := perShardMaps[0][idx]
|
||||||
|
for i := 1; i < len(perShardMaps); i++ {
|
||||||
|
n := int64(0)
|
||||||
|
nTotal := int64(0)
|
||||||
|
for k, psg := range perShardMaps[i][idx] {
|
||||||
|
if needStop(psp.stopCh) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n += updatePipeStatsMap(m, k, psg)
|
||||||
|
if n > stateSizeBudgetChunk {
|
||||||
|
if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
nTotal += n
|
||||||
|
n = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nTotal += n
|
||||||
|
psp.stateSizeBudget.Add(-n)
|
||||||
|
|
||||||
|
// Clean the original map and return its state size budget back.
|
||||||
|
perShardMaps[i][idx] = nil
|
||||||
|
psp.stateSizeBudget.Add(nTotal)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if needStop(psp.stopCh) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if n := psp.stateSizeBudget.Load(); n < 0 {
|
||||||
|
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter out maps without entries
|
||||||
|
ms := perShardMaps[0]
|
||||||
|
result := ms[:0]
|
||||||
|
for _, m := range ms {
|
||||||
|
if len(m) > 0 {
|
||||||
|
result = append(result, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func updatePipeStatsMap(m map[string]*pipeStatsGroup, k string, psgSrc *pipeStatsGroup) int64 {
|
||||||
|
psgDst := m[k]
|
||||||
|
if psgDst != nil {
|
||||||
|
for i, sfp := range psgDst.sfps {
|
||||||
|
sfp.mergeState(psgSrc.sfps[i])
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
m[k] = psgSrc
|
||||||
|
return int64(unsafe.Sizeof(k) + unsafe.Sizeof(psgSrc))
|
||||||
}
|
}
|
||||||
|
|
||||||
func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
|
func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
|
||||||
|
@ -201,8 +201,8 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
|||||||
|
|
||||||
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
|
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
|
||||||
m := shard.getM()
|
m := shard.getM()
|
||||||
pHits, ok := m[v]
|
pHits := m[v]
|
||||||
if !ok {
|
if pHits == nil {
|
||||||
vCopy := strings.Clone(v)
|
vCopy := strings.Clone(v)
|
||||||
hits := uint64(0)
|
hits := uint64(0)
|
||||||
pHits = &hits
|
pHits = &hits
|
||||||
@ -247,21 +247,11 @@ func (ptp *pipeTopProcessor) flush() error {
|
|||||||
if n := ptp.stateSizeBudget.Load(); n <= 0 {
|
if n := ptp.stateSizeBudget.Load(); n <= 0 {
|
||||||
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
||||||
}
|
}
|
||||||
limit := ptp.pt.limit
|
|
||||||
if limit == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// merge state across shards in parallel
|
// merge state across shards in parallel
|
||||||
var entries []*pipeTopEntry
|
entries, err := ptp.mergeShardsParallel()
|
||||||
if len(ptp.shards) == 1 {
|
if err != nil {
|
||||||
entries = getTopEntries(ptp.shards[0].getM(), limit, ptp.stopCh)
|
return err
|
||||||
} else {
|
|
||||||
es, err := ptp.getTopEntriesParallel(limit)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
entries = es
|
|
||||||
}
|
}
|
||||||
if needStop(ptp.stopCh) {
|
if needStop(ptp.stopCh) {
|
||||||
return nil
|
return nil
|
||||||
@ -358,9 +348,18 @@ func (ptp *pipeTopProcessor) flush() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntry, error) {
|
func (ptp *pipeTopProcessor) mergeShardsParallel() ([]*pipeTopEntry, error) {
|
||||||
|
limit := ptp.pt.limit
|
||||||
|
if limit == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
shards := ptp.shards
|
shards := ptp.shards
|
||||||
shardsLen := len(shards)
|
shardsLen := len(shards)
|
||||||
|
if shardsLen == 1 {
|
||||||
|
entries := getTopEntries(shards[0].getM(), limit, ptp.stopCh)
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
perShardMaps := make([][]map[string]*uint64, shardsLen)
|
perShardMaps := make([][]map[string]*uint64, shardsLen)
|
||||||
@ -375,24 +374,30 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
|
|||||||
}
|
}
|
||||||
|
|
||||||
n := int64(0)
|
n := int64(0)
|
||||||
for k, pHitsSrc := range shards[idx].getM() {
|
nTotal := int64(0)
|
||||||
|
for k, pHits := range shards[idx].getM() {
|
||||||
if needStop(ptp.stopCh) {
|
if needStop(ptp.stopCh) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
|
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
|
||||||
m := shardMaps[h%uint64(len(shardMaps))]
|
m := shardMaps[h%uint64(len(shardMaps))]
|
||||||
n += updatePipeTopMap(m, k, pHitsSrc)
|
n += updatePipeTopMap(m, k, pHits)
|
||||||
if n > stateSizeBudgetChunk {
|
if n > stateSizeBudgetChunk {
|
||||||
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
nTotal += n
|
||||||
n = 0
|
n = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
nTotal += n
|
||||||
ptp.stateSizeBudget.Add(-n)
|
ptp.stateSizeBudget.Add(-n)
|
||||||
|
|
||||||
perShardMaps[idx] = shardMaps
|
perShardMaps[idx] = shardMaps
|
||||||
|
|
||||||
|
// Clean the original map and return its state size budget back.
|
||||||
shards[idx].m = nil
|
shards[idx].m = nil
|
||||||
|
ptp.stateSizeBudget.Add(nTotal)
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -403,6 +408,7 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
|
|||||||
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Obtain topN entries per each shard
|
||||||
entriess := make([][]*pipeTopEntry, shardsLen)
|
entriess := make([][]*pipeTopEntry, shardsLen)
|
||||||
for i := range entriess {
|
for i := range entriess {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -410,22 +416,30 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
m := perShardMaps[0][idx]
|
m := perShardMaps[0][idx]
|
||||||
n := int64(0)
|
for i := 1; i < len(perShardMaps); i++ {
|
||||||
for _, shardMaps := range perShardMaps[1:] {
|
n := int64(0)
|
||||||
for k, pHitsSrc := range shardMaps[idx] {
|
nTotal := int64(0)
|
||||||
|
for k, pHits := range perShardMaps[i][idx] {
|
||||||
if needStop(ptp.stopCh) {
|
if needStop(ptp.stopCh) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
n += updatePipeTopMap(m, k, pHitsSrc)
|
n += updatePipeTopMap(m, k, pHits)
|
||||||
if n > stateSizeBudgetChunk {
|
if n > stateSizeBudgetChunk {
|
||||||
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
nTotal += n
|
||||||
n = 0
|
n = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
nTotal += n
|
||||||
|
ptp.stateSizeBudget.Add(-n)
|
||||||
|
|
||||||
|
// Clean the original map and return its state size budget back.
|
||||||
|
perShardMaps[i][idx] = nil
|
||||||
|
ptp.stateSizeBudget.Add(nTotal)
|
||||||
}
|
}
|
||||||
ptp.stateSizeBudget.Add(-n)
|
perShardMaps[0][idx] = nil
|
||||||
|
|
||||||
entriess[idx] = getTopEntries(m, ptp.pt.limit, ptp.stopCh)
|
entriess[idx] = getTopEntries(m, ptp.pt.limit, ptp.stopCh)
|
||||||
}(i)
|
}(i)
|
||||||
@ -489,8 +503,8 @@ func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
|
func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
|
||||||
pHitsDst, ok := m[k]
|
pHitsDst := m[k]
|
||||||
if ok {
|
if pHitsDst != nil {
|
||||||
*pHitsDst += *pHitsSrc
|
*pHitsDst += *pHitsSrc
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user