VictoriaMetrics/app/vmselect/netstorage/tmp_blocks_file_test.go
Aliaksandr Valialkin c1d3705be0 app/vmselect/netstorage: marshal block outside tmpBlocksFile.WriteBlock
This allows re-using the destination buffer for marshaling in the outer loop.
2019-09-28 21:07:13 +03:00

154 lines
4.0 KiB
Go

package netstorage
import (
"fmt"
"math/rand"
"os"
"reflect"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
// TestMain is the entry point for the tests in this package.
//
// It seeds the global math/rand generator with the current time, passes
// a dedicated directory name to InitTmpBlocksDir, runs the tests, removes
// that directory and exits with the status code returned by m.Run.
func TestMain(m *testing.M) {
	const tmpDir = "TestTmpBlocks"

	rand.Seed(time.Now().UnixNano())
	InitTmpBlocksDir(tmpDir)

	exitCode := m.Run()

	// Clean up the directory before exiting; a failed cleanup is treated as fatal.
	err := os.RemoveAll(tmpDir)
	if err != nil {
		logger.Panicf("cannot remove %q: %s", tmpDir, err)
	}
	os.Exit(exitCode)
}
// TestTmpBlocksFileSerial runs testTmpBlocksFile once on the test goroutine
// and fails the test if it reports an error.
func TestTmpBlocksFileSerial(t *testing.T) {
	err := testTmpBlocksFile()
	if err == nil {
		return
	}
	t.Fatalf("unexpected error: %s", err)
}
// TestTmpBlocksFileConcurrent runs testTmpBlocksFile from several goroutines
// at once and fails if any of them reports an error or does not report
// a result within 30 seconds.
func TestTmpBlocksFileConcurrent(t *testing.T) {
	const workers = 3

	// The channel holds one slot per worker, so a worker can always deliver
	// its result even after the test has already failed.
	resultCh := make(chan error, workers)
	for n := workers; n > 0; n-- {
		go func() {
			resultCh <- testTmpBlocksFile()
		}()
	}

	for received := 0; received < workers; received++ {
		select {
		case <-time.After(30 * time.Second):
			t.Fatalf("timeout")
		case err := <-resultCh:
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		}
	}
}
// testTmpBlocksFile writes randomly generated blocks into a tmpBlocksFile and
// verifies that every block can be read back unchanged from concurrently
// running goroutines.
//
// The check is repeated for several target sizes, including sizes below and
// above maxInmemoryTmpBlocksFile(). It returns the first detected error or nil
// if all the checks pass.
func testTmpBlocksFile() error {
	// createBlock builds a block with a random number of rows, strictly
	// increasing timestamps and random values, and calls MarshalData on it
	// before returning.
	createBlock := func() *storage.Block {
		rowsCount := rand.Intn(8000) + 1
		timestamps := make([]int64, 0, rowsCount)
		values := make([]int64, 0, rowsCount)
		ts := int64(rand.Intn(1023434))
		for i := 0; i < rowsCount; i++ {
			ts += int64(rand.Intn(1000) + 1)
			timestamps = append(timestamps, ts)
			values = append(values, int64(i*i+rand.Intn(20)))
		}
		tsid := &storage.TSID{
			MetricID: 234211,
		}
		scale := int16(rand.Intn(123))
		precisionBits := uint8(rand.Intn(63) + 1)
		var b storage.Block
		b.Init(tsid, timestamps, values, scale, precisionBits)
		_, _, _ = b.MarshalData(0, 0)
		return &b
	}
	for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
		err := func() error {
			tbf := getTmpBlocksFile()
			defer putTmpBlocksFile(tbf)

			// Write blocks until their summary size exceeds `size`.
			var addrs []tmpBlockAddr
			var blocks []*storage.Block
			bb := tmpBufPool.Get()
			defer tmpBufPool.Put(bb)
			for tbf.offset < uint64(size) {
				b := createBlock()
				bb.B = storage.MarshalBlock(bb.B[:0], b)
				addr, err := tbf.WriteBlockData(bb.B)
				if err != nil {
					return fmt.Errorf("cannot write block at offset %d: %s", tbf.offset, err)
				}
				// The returned address must end exactly at the current write offset.
				if addr.offset+uint64(addr.size) != tbf.offset {
					return fmt.Errorf("unexpected addr=%+v for offset %v", &addr, tbf.offset)
				}
				addrs = append(addrs, addr)
				blocks = append(blocks, b)
			}
			if err := tbf.Finalize(); err != nil {
				return fmt.Errorf("cannot finalize tbf: %s", err)
			}

			// Read blocks in parallel and verify them.
			concurrency := 2
			workCh := make(chan int)
			// doneCh has a slot per worker, so a worker can always report its
			// result and exit, even if the collector below has already
			// returned on timeout.
			doneCh := make(chan error, concurrency)

			// verifyBlocks compares blocks read from tbf with the original
			// blocks for every index received from workCh. It stops at the
			// first mismatch.
			verifyBlocks := func() error {
				var b1 storage.Block
				for idx := range workCh {
					addr := addrs[idx]
					b := blocks[idx]
					if err := b.UnmarshalData(); err != nil {
						return fmt.Errorf("cannot unmarshal data from the original block: %s", err)
					}
					b1.Reset()
					tbf.MustReadBlockAt(&b1, addr)
					if err := b1.UnmarshalData(); err != nil {
						return fmt.Errorf("cannot unmarshal data from tbf: %s", err)
					}
					if b1.RowsCount() != b.RowsCount() {
						return fmt.Errorf("unexpected number of rows in tbf block; got %d; want %d", b1.RowsCount(), b.RowsCount())
					}
					if !reflect.DeepEqual(b1.Timestamps(), b.Timestamps()) {
						return fmt.Errorf("unexpected timestamps; got\n%v\nwant\n%v", b1.Timestamps(), b.Timestamps())
					}
					if !reflect.DeepEqual(b1.Values(), b.Values()) {
						return fmt.Errorf("unexpected values; got\n%v\nwant\n%v", b1.Values(), b.Values())
					}
				}
				return nil
			}
			for i := 0; i < concurrency; i++ {
				go func() {
					err := verifyBlocks()
					// After a failure keep consuming the remaining indexes
					// until workCh is closed. Otherwise the producer below
					// would block forever on the unbuffered workCh once all
					// the workers have stopped verifying.
					for range workCh {
					}
					doneCh <- err
				}()
			}
			for i := range addrs {
				workCh <- i
			}
			close(workCh)

			// Wait for every worker before returning, so none of them is
			// still reading from tbf when it is released by the deferred
			// putTmpBlocksFile call. Only the first error is reported.
			var firstErr error
			for i := 0; i < concurrency; i++ {
				select {
				case err := <-doneCh:
					if firstErr == nil {
						firstErr = err
					}
				case <-time.After(time.Second):
					if firstErr != nil {
						return firstErr
					}
					return fmt.Errorf("timeout")
				}
			}
			return firstErr
		}()
		if err != nil {
			return err
		}
	}
	return nil
}