VictoriaMetrics/lib/storage/table_search_timing_test.go
Aliaksandr Valialkin 8669584e9f
lib/{storage,mergeset}: convert buffered items into searchable in-memory parts exactly once per the given flush interval
Previously the interval between item addition and its conversion to searchable in-memory part
could vary significantly because of too coarse per-second precision. Switch from fasttime.UnixTimestamp()
to time.Now().UnixMilli() for millisecond precision. It is OK to use time.Now() for tracking
the time when buffered items must be converted to searchable in-memory parts, since time.Now()
calls aren't located in hot paths.

Increase the flush interval for converting buffered samples to searchable in-memory parts
from one second to two seconds. This should reduce the number of blocks, which are needed
to be processed during high-frequency alerting queries. This, in turn, should reduce CPU usage.

While at it, hardcode the maximum size of rawRows shard to 8Mb, since this size gives the optimal
data ingestion performance according to load tests. This reduces memory usage and CPU usage on systems
with big amounts of RAM under high data ingestion rate.
2024-02-23 01:11:57 +02:00

141 lines
3.7 KiB
Go

package storage
import (
"fmt"
"math/rand"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)
// TestMain sets the package-level isDebug flag, runs every test and benchmark
// in the package, and then removes the "benchmarkTableSearch" directory, which
// holds the tables created by the table search benchmarks.
//
// The process exits with the code returned by m.Run.
func TestMain(m *testing.M) {
	isDebug = true

	exitCode := m.Run()

	// os.Exit does not run deferred calls, so the cleanup is done explicitly
	// before exiting.
	err := os.RemoveAll("benchmarkTableSearch")
	if err != nil {
		panic(fmt.Errorf("cannot remove benchmark tables: %w", err))
	}

	os.Exit(exitCode)
}
// BenchmarkTableSearch runs benchmarkTableSearch for every combination of
// the total rows count in the table, the number of distinct TSIDs stored
// in the table and the number of TSIDs to search for.
//
// Each combination is executed as a nested sub-benchmark named
// rowsCount_<N>/tsidsCount_<N>/tsidsSearch_<N>.
func BenchmarkTableSearch(b *testing.B) {
	rowsCounts := []int{1e5, 1e6, 1e7, 1e8}
	tsidsCounts := []int{1e3, 1e4}
	tsidsSearches := []int{1, 1e1, 1e2, 1e3, 1e4}

	for _, rows := range rowsCounts {
		rowsName := fmt.Sprintf("rowsCount_%d", rows)
		b.Run(rowsName, func(b *testing.B) {
			for _, tsids := range tsidsCounts {
				tsidsName := fmt.Sprintf("tsidsCount_%d", tsids)
				b.Run(tsidsName, func(b *testing.B) {
					for _, search := range tsidsSearches {
						searchName := fmt.Sprintf("tsidsSearch_%d", search)
						b.Run(searchName, func(b *testing.B) {
							benchmarkTableSearch(b, rows, tsids, search)
						})
					}
				})
			}
		})
	}
}
// openBenchTable opens the benchmark table for the given rowsCount and tsidsCount
// and returns it together with the test storage it has been opened with.
//
// The table is stored under the "benchmarkTableSearch" directory. It is created
// and filled via createBenchTable on the first call for the given path; subsequent
// calls re-open the already created files.
//
// The benchmark is aborted via b.Fatalf if the opened table reports an unexpected
// number of rows.
func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) (*table, *Storage) {
	b.Helper()

	tableName := fmt.Sprintf("rows%d_tsids%d", rowsCount, tsidsCount)
	path := filepath.Join("benchmarkTableSearch", tableName)
	if created := createdBenchTables[path]; !created {
		createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
		createdBenchTables[path] = true
	}

	strg := newTestStorage()
	tb := mustOpenTable(path, strg)

	// Verify rows count in the table opened from files.
	// createBenchTable always inserts whole batches of rowsPerInsert rows,
	// so the expected count is rowsCount rounded up to a multiple of rowsPerInsert.
	batches := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert)
	wantRows := batches * uint64(rowsPerInsert)

	var m TableMetrics
	tb.UpdateMetrics(&m)
	gotRows := m.TotalRowsCount()
	if gotRows != wantRows {
		b.Fatalf("unexpected rows count in the table %q; got %d; want %d", path, gotRows, wantRows)
	}

	return tb, strg
}
// createdBenchTables holds the paths of benchmark tables, which have been already
// created by openBenchTable, so they aren't re-created on subsequent calls.
var createdBenchTables = map[string]bool{}
// createBenchTable creates the table at the given path and fills it with
// pseudo-random rows.
//
// Rows are added in batches of rowsPerInsert rows by cgroup.AvailableCPUs()
// concurrent workers. The number of batches is rowsCount/rowsPerInsert rounded up,
// so the table may contain more than rowsCount rows. Row timestamps start after
// startTimestamp and grow by 10 or 11 per row. Metric IDs are picked
// from the range [1, tsidsCount].
//
// The table and its test storage are closed before returning.
func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
	b.Helper()

	strg := newTestStorage()
	tb := mustOpenTable(path, strg)

	// Both counters are shared among workers and are updated only via atomic ops.
	insertsLeft := uint64((rowsCount + rowsPerInsert - 1) / rowsPerInsert)
	lastTimestamp := uint64(startTimestamp)

	var wg sync.WaitGroup
	worker := func(seed int) {
		defer wg.Done()

		rng := rand.New(rand.NewSource(int64(seed)))
		batch := make([]rawRow, rowsPerInsert)
		v := float64(100)
		for {
			// Adding ^uint64(0) decrements the counter by one.
			// The worker stops as soon as the counter wraps around below zero.
			left := atomic.AddUint64(&insertsLeft, ^uint64(0))
			if int(left) < 0 {
				return
			}
			for j := range batch {
				// Keep the order of rng calls stable, so the generated data
				// is determined by the worker seed.
				step := uint64(10 + rng.Int63n(2))
				ts := atomic.AddUint64(&lastTimestamp, step)
				v += float64(int(rng.NormFloat64() * 5))

				row := &batch[j]
				row.PrecisionBits = defaultPrecisionBits
				row.TSID.MetricID = uint64(rng.Intn(tsidsCount) + 1)
				row.Timestamp = int64(ts)
				row.Value = v
			}
			tb.MustAddRows(batch)
		}
	}

	for k := 0; k < cgroup.AvailableCPUs(); k++ {
		wg.Add(1)
		go worker(k)
	}
	wg.Wait()

	tb.MustClose()
	stopTestStorage(strg)
}
// benchmarkTableSearch measures how fast blocks for tsidsSearch TSIDs can be found
// and read from a table containing rowsCount rows spread among tsidsCount TSIDs.
//
// The table is opened (and created if needed) via openBenchTable before the timer
// is reset, and it is closed after the timer is stopped, so only the search loop
// is measured.
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
	// Start one year (in milliseconds) before the current time.
	const yearMsecs = 365 * 24 * 3600 * 1000
	startTimestamp := timestampFromTime(time.Now()) - yearMsecs

	tb, strg := openBenchTable(b, startTimestamp, maxRawRowsPerShard, rowsCount, tsidsCount)

	// Search over everything starting from startTimestamp.
	tr := TimeRange{
		MinTimestamp: startTimestamp,
		MaxTimestamp: 1<<63 - 1,
	}

	b.ResetTimer()
	b.ReportAllocs()

	// Estimate the number of rows scanned per iteration as the share of rows,
	// which belongs to the searched TSIDs, capped by the total rows count.
	// It is passed to b.SetBytes, so the reported throughput is in rows, not bytes.
	scaledRows := float64(rowsCount) * float64(tsidsSearch)
	rowsPerBench := int64(scaledRows / float64(tsidsCount))
	if total := int64(rowsCount); rowsPerBench > total {
		rowsPerBench = total
	}
	b.SetBytes(rowsPerBench)

	b.RunParallel(func(pb *testing.PB) {
		var (
			search tableSearch
			block  Block
		)
		tsids := make([]TSID, tsidsSearch)
		for pb.Next() {
			// Search for metric IDs 1..tsidsSearch.
			for i := range tsids {
				tsids[i].MetricID = uint64(i) + 1
			}
			search.Init(tb, tsids, tr)
			for search.NextBlock() {
				search.BlockRef.MustReadBlock(&block)
			}
			search.MustClose()
		}
	})

	b.StopTimer()

	tb.MustClose()
	stopTestStorage(strg)
}