mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-04 13:52:05 +01:00
73f0a805e2
Previously the interval between item addition and its conversion to a searchable in-memory part could vary significantly because of the too coarse per-second precision. Switch from fasttime.UnixTimestamp() to time.Now().UnixMilli() for millisecond precision. It is OK to use time.Now() for tracking the time when buffered items must be converted to searchable in-memory parts, since the time.Now() calls aren't located in hot paths. Increase the flush interval for converting buffered samples to searchable in-memory parts from one second to two seconds. This should reduce the number of blocks that need to be processed during high-frequency alerting queries. This, in turn, should reduce CPU usage. While at it, hardcode the maximum size of a rawRows shard to 8Mb, since this size gives the optimal data ingestion performance according to load tests. This reduces memory usage and CPU usage on systems with big amounts of RAM under high data ingestion rate.
141 lines
3.7 KiB
Go
141 lines
3.7 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
)
|
|
|
|
func TestMain(m *testing.M) {
|
|
isDebug = true
|
|
n := m.Run()
|
|
if err := os.RemoveAll("benchmarkTableSearch"); err != nil {
|
|
panic(fmt.Errorf("cannot remove benchmark tables: %w", err))
|
|
}
|
|
os.Exit(n)
|
|
}
|
|
|
|
func BenchmarkTableSearch(b *testing.B) {
|
|
for _, rowsCount := range []int{1e5, 1e6, 1e7, 1e8} {
|
|
b.Run(fmt.Sprintf("rowsCount_%d", rowsCount), func(b *testing.B) {
|
|
for _, tsidsCount := range []int{1e3, 1e4} {
|
|
b.Run(fmt.Sprintf("tsidsCount_%d", tsidsCount), func(b *testing.B) {
|
|
for _, tsidsSearch := range []int{1, 1e1, 1e2, 1e3, 1e4} {
|
|
b.Run(fmt.Sprintf("tsidsSearch_%d", tsidsSearch), func(b *testing.B) {
|
|
benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch)
|
|
})
|
|
}
|
|
})
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) (*table, *Storage) {
|
|
b.Helper()
|
|
|
|
path := filepath.Join("benchmarkTableSearch", fmt.Sprintf("rows%d_tsids%d", rowsCount, tsidsCount))
|
|
if !createdBenchTables[path] {
|
|
createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
|
|
createdBenchTables[path] = true
|
|
}
|
|
strg := newTestStorage()
|
|
tb := mustOpenTable(path, strg)
|
|
|
|
// Verify rows count in the table opened from files.
|
|
insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert)
|
|
rowsCountExpected := insertsCount * uint64(rowsPerInsert)
|
|
var m TableMetrics
|
|
tb.UpdateMetrics(&m)
|
|
if rowsCount := m.TotalRowsCount(); rowsCount != rowsCountExpected {
|
|
b.Fatalf("unexpected rows count in the table %q; got %d; want %d", path, rowsCount, rowsCountExpected)
|
|
}
|
|
|
|
return tb, strg
|
|
}
|
|
|
|
// createdBenchTables holds the paths of benchmark tables that have already
// been created by openBenchTable, so that each table is created only on
// first use.
var createdBenchTables = map[string]bool{}
|
|
|
|
func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
|
|
b.Helper()
|
|
|
|
strg := newTestStorage()
|
|
tb := mustOpenTable(path, strg)
|
|
|
|
insertsCount := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert)
|
|
timestamp := uint64(startTimestamp)
|
|
|
|
var wg sync.WaitGroup
|
|
for k := 0; k < cgroup.AvailableCPUs(); k++ {
|
|
wg.Add(1)
|
|
go func(n int) {
|
|
rng := rand.New(rand.NewSource(int64(n)))
|
|
rows := make([]rawRow, rowsPerInsert)
|
|
value := float64(100)
|
|
for int(atomic.AddUint64(&insertsCount, ^uint64(0))) >= 0 {
|
|
for j := 0; j < rowsPerInsert; j++ {
|
|
ts := atomic.AddUint64(×tamp, uint64(10+rng.Int63n(2)))
|
|
value += float64(int(rng.NormFloat64() * 5))
|
|
|
|
r := &rows[j]
|
|
r.PrecisionBits = defaultPrecisionBits
|
|
r.TSID.MetricID = uint64(rng.Intn(tsidsCount) + 1)
|
|
r.Timestamp = int64(ts)
|
|
r.Value = value
|
|
}
|
|
tb.MustAddRows(rows)
|
|
}
|
|
wg.Done()
|
|
}(k)
|
|
}
|
|
wg.Wait()
|
|
|
|
tb.MustClose()
|
|
stopTestStorage(strg)
|
|
}
|
|
|
|
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
|
|
startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
|
|
rowsPerInsert := maxRawRowsPerShard
|
|
|
|
tb, strg := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
|
|
tr := TimeRange{
|
|
MinTimestamp: startTimestamp,
|
|
MaxTimestamp: (1 << 63) - 1,
|
|
}
|
|
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
rowsPerBench := int64(float64(rowsCount) * float64(tsidsSearch) / float64(tsidsCount))
|
|
if rowsPerBench > int64(rowsCount) {
|
|
rowsPerBench = int64(rowsCount)
|
|
}
|
|
b.SetBytes(rowsPerBench)
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
var ts tableSearch
|
|
tsids := make([]TSID, tsidsSearch)
|
|
var tmpBlock Block
|
|
for pb.Next() {
|
|
for i := range tsids {
|
|
tsids[i].MetricID = 1 + uint64(i)
|
|
}
|
|
ts.Init(tb, tsids, tr)
|
|
for ts.NextBlock() {
|
|
ts.BlockRef.MustReadBlock(&tmpBlock)
|
|
}
|
|
ts.MustClose()
|
|
}
|
|
})
|
|
b.StopTimer()
|
|
|
|
tb.MustClose()
|
|
stopTestStorage(strg)
|
|
}
|