mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
458d412bb6
The maximum in-memory file size now depends on `-memory.allowedPercent`. This should improve performance and reduce the number of filesystem calls on machines with large amounts of RAM when performing heavy queries over a large number of samples and time series.
151 lines
3.9 KiB
Go
151 lines
3.9 KiB
Go
package netstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
)
|
|
|
|
func TestMain(m *testing.M) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
tmpDir := "TestTmpBlocks"
|
|
InitTmpBlocksDir(tmpDir)
|
|
statusCode := m.Run()
|
|
if err := os.RemoveAll(tmpDir); err != nil {
|
|
logger.Panicf("cannot remove %q: %s", tmpDir, err)
|
|
}
|
|
os.Exit(statusCode)
|
|
}
|
|
|
|
func TestTmpBlocksFileSerial(t *testing.T) {
|
|
if err := testTmpBlocksFile(); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
}
|
|
|
|
func TestTmpBlocksFileConcurrent(t *testing.T) {
|
|
concurrency := 3
|
|
ch := make(chan error, concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
go func() {
|
|
ch <- testTmpBlocksFile()
|
|
}()
|
|
}
|
|
for i := 0; i < concurrency; i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(30 * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testTmpBlocksFile() error {
|
|
createBlock := func() *storage.Block {
|
|
rowsCount := rand.Intn(8000) + 1
|
|
var timestamps, values []int64
|
|
ts := int64(rand.Intn(1023434))
|
|
for i := 0; i < rowsCount; i++ {
|
|
ts += int64(rand.Intn(1000) + 1)
|
|
timestamps = append(timestamps, ts)
|
|
values = append(values, int64(i*i+rand.Intn(20)))
|
|
}
|
|
tsid := &storage.TSID{
|
|
MetricID: 234211,
|
|
}
|
|
scale := int16(rand.Intn(123))
|
|
precisionBits := uint8(rand.Intn(63) + 1)
|
|
var b storage.Block
|
|
b.Init(tsid, timestamps, values, scale, precisionBits)
|
|
_, _, _ = b.MarshalData(0, 0)
|
|
return &b
|
|
}
|
|
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
|
|
err := func() error {
|
|
tbf := getTmpBlocksFile()
|
|
defer putTmpBlocksFile(tbf)
|
|
|
|
// Write blocks until their summary size exceeds `size`.
|
|
var addrs []tmpBlockAddr
|
|
var blocks []*storage.Block
|
|
for tbf.offset < uint64(size) {
|
|
b := createBlock()
|
|
addr, err := tbf.WriteBlock(b)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot write block at offset %d: %s", tbf.offset, err)
|
|
}
|
|
if addr.offset+uint64(addr.size) != tbf.offset {
|
|
return fmt.Errorf("unexpected addr=%+v for offset %v", &addr, tbf.offset)
|
|
}
|
|
addrs = append(addrs, addr)
|
|
blocks = append(blocks, b)
|
|
}
|
|
if err := tbf.Finalize(); err != nil {
|
|
return fmt.Errorf("cannot finalize tbf: %s", err)
|
|
}
|
|
|
|
// Read blocks in parallel and verify them
|
|
concurrency := 2
|
|
workCh := make(chan int)
|
|
doneCh := make(chan error)
|
|
for i := 0; i < concurrency; i++ {
|
|
go func() {
|
|
doneCh <- func() error {
|
|
var b1 storage.Block
|
|
for idx := range workCh {
|
|
addr := addrs[idx]
|
|
b := blocks[idx]
|
|
if err := b.UnmarshalData(); err != nil {
|
|
return fmt.Errorf("cannot unmarshal data from the original block: %s", err)
|
|
}
|
|
b1.Reset()
|
|
tbf.MustReadBlockAt(&b1, addr)
|
|
if err := b1.UnmarshalData(); err != nil {
|
|
return fmt.Errorf("cannot unmarshal data from tbf: %s", err)
|
|
}
|
|
if b1.RowsCount() != b.RowsCount() {
|
|
return fmt.Errorf("unexpected number of rows in tbf block; got %d; want %d", b1.RowsCount(), b.RowsCount())
|
|
}
|
|
if !reflect.DeepEqual(b1.Timestamps(), b.Timestamps()) {
|
|
return fmt.Errorf("unexpected timestamps; got\n%v\nwant\n%v", b1.Timestamps(), b.Timestamps())
|
|
}
|
|
if !reflect.DeepEqual(b1.Values(), b.Values()) {
|
|
return fmt.Errorf("unexpected values; got\n%v\nwant\n%v", b1.Values(), b.Values())
|
|
}
|
|
}
|
|
return nil
|
|
}()
|
|
}()
|
|
}
|
|
for i := range addrs {
|
|
workCh <- i
|
|
}
|
|
close(workCh)
|
|
for i := 0; i < concurrency; i++ {
|
|
select {
|
|
case err := <-doneCh:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case <-time.After(time.Second):
|
|
return fmt.Errorf("timeout")
|
|
}
|
|
}
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|