VictoriaMetrics/lib/mergeset/table_test.go
Aliaksandr Valialkin cf4701db65
lib/fs: add MustReadDir() function
Use fs.MustReadDir() instead of os.ReadDir() across the code in order to reduce the code verbosity.
The fs.MustReadDir() logs the error with the directory name and the call stack on error
before exit. This information should be enough for debugging the cause of the error.
2023-04-14 22:11:40 -07:00

252 lines
6.4 KiB
Go

package mergeset
import (
"bytes"
"fmt"
"math/rand"
"os"
"sync"
"sync/atomic"
"testing"
)
func TestTableOpenClose(t *testing.T) {
const path = "TestTableOpenClose"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
// Create a new table
var isReadOnly uint32
tb := MustOpenTable(path, nil, nil, &isReadOnly)
// Close it
tb.MustClose()
// Re-open created table multiple times.
for i := 0; i < 4; i++ {
tb := MustOpenTable(path, nil, nil, &isReadOnly)
tb.MustClose()
}
}
func TestTableAddItemsSerial(t *testing.T) {
r := rand.New(rand.NewSource(1))
const path = "TestTableAddItemsSerial"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
var isReadOnly uint32
tb := MustOpenTable(path, flushCallback, nil, &isReadOnly)
const itemsCount = 10e3
testAddItemsSerial(r, tb, itemsCount)
// Verify items count after pending items flush.
tb.DebugFlush()
if atomic.LoadUint64(&flushes) == 0 {
t.Fatalf("unexpected zero flushes")
}
var m TableMetrics
tb.UpdateMetrics(&m)
if n := m.TotalItemsCount(); n != itemsCount {
t.Fatalf("unexpected itemsCount; got %d; want %v", n, itemsCount)
}
tb.MustClose()
// Re-open the table and make sure itemsCount remains the same.
testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts.
tb = MustOpenTable(path, nil, nil, &isReadOnly)
const moreItemsCount = itemsCount * 3
testAddItemsSerial(r, tb, moreItemsCount)
tb.MustClose()
// Re-open the table and verify itemsCount again.
testReopenTable(t, path, itemsCount+moreItemsCount)
}
func testAddItemsSerial(r *rand.Rand, tb *Table, itemsCount int) {
for i := 0; i < itemsCount; i++ {
item := getRandomBytes(r)
if len(item) > maxInmemoryBlockSize {
item = item[:maxInmemoryBlockSize]
}
tb.AddItems([][]byte{item})
}
}
func TestTableCreateSnapshotAt(t *testing.T) {
const path = "TestTableCreateSnapshotAt"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
var isReadOnly uint32
tb := MustOpenTable(path, nil, nil, &isReadOnly)
defer tb.MustClose()
// Write a lot of items into the table, so background merges would start.
const itemsCount = 3e5
for i := 0; i < itemsCount; i++ {
item := []byte(fmt.Sprintf("item %d", i))
tb.AddItems([][]byte{item})
}
tb.DebugFlush()
// Create multiple snapshots.
snapshot1 := path + "-test-snapshot1"
if err := tb.CreateSnapshotAt(snapshot1, 0); err != nil {
t.Fatalf("cannot create snapshot1: %s", err)
}
snapshot2 := path + "-test-snapshot2"
if err := tb.CreateSnapshotAt(snapshot2, 0); err != nil {
t.Fatalf("cannot create snapshot2: %s", err)
}
defer func() {
_ = os.RemoveAll(snapshot1)
_ = os.RemoveAll(snapshot2)
}()
// Verify snapshots contain all the data.
tb1 := MustOpenTable(snapshot1, nil, nil, &isReadOnly)
defer tb1.MustClose()
tb2 := MustOpenTable(snapshot2, nil, nil, &isReadOnly)
defer tb2.MustClose()
var ts, ts1, ts2 TableSearch
ts.Init(tb)
ts1.Init(tb1)
defer ts1.MustClose()
ts2.Init(tb2)
defer ts2.MustClose()
for i := 0; i < itemsCount; i++ {
key := []byte(fmt.Sprintf("item %d", i))
if err := ts.FirstItemWithPrefix(key); err != nil {
t.Fatalf("cannot find item[%d]=%q in the original table: %s", i, key, err)
}
if !bytes.Equal(key, ts.Item) {
t.Fatalf("unexpected item found for key=%q in the original table; got %q", key, ts.Item)
}
if err := ts1.FirstItemWithPrefix(key); err != nil {
t.Fatalf("cannot find item[%d]=%q in snapshot1: %s", i, key, err)
}
if !bytes.Equal(key, ts1.Item) {
t.Fatalf("unexpected item found for key=%q in snapshot1; got %q", key, ts1.Item)
}
if err := ts2.FirstItemWithPrefix(key); err != nil {
t.Fatalf("cannot find item[%d]=%q in snapshot2: %s", i, key, err)
}
if !bytes.Equal(key, ts2.Item) {
t.Fatalf("unexpected item found for key=%q in snapshot2; got %q", key, ts2.Item)
}
}
}
func TestTableAddItemsConcurrent(t *testing.T) {
const path = "TestTableAddItemsConcurrent"
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
defer func() {
_ = os.RemoveAll(path)
}()
var flushes uint64
flushCallback := func() {
atomic.AddUint64(&flushes, 1)
}
prepareBlock := func(data []byte, items []Item) ([]byte, []Item) {
return data, items
}
var isReadOnly uint32
tb := MustOpenTable(path, flushCallback, prepareBlock, &isReadOnly)
const itemsCount = 10e3
testAddItemsConcurrent(tb, itemsCount)
// Verify items count after pending items flush.
tb.DebugFlush()
if atomic.LoadUint64(&flushes) == 0 {
t.Fatalf("unexpected zero flushes")
}
var m TableMetrics
tb.UpdateMetrics(&m)
if n := m.TotalItemsCount(); n != itemsCount {
t.Fatalf("unexpected itemsCount; got %d; want %v", n, itemsCount)
}
tb.MustClose()
// Re-open the table and make sure itemsCount remains the same.
testReopenTable(t, path, itemsCount)
// Add more items in order to verify merge between inmemory parts and file-based parts.
tb = MustOpenTable(path, nil, nil, &isReadOnly)
const moreItemsCount = itemsCount * 3
testAddItemsConcurrent(tb, moreItemsCount)
tb.MustClose()
// Re-open the table and verify itemsCount again.
testReopenTable(t, path, itemsCount+moreItemsCount)
}
func testAddItemsConcurrent(tb *Table, itemsCount int) {
const goroutinesCount = 6
workCh := make(chan int, itemsCount)
var wg sync.WaitGroup
for i := 0; i < goroutinesCount; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
r := rand.New(rand.NewSource(int64(n)))
for range workCh {
item := getRandomBytes(r)
if len(item) > maxInmemoryBlockSize {
item = item[:maxInmemoryBlockSize]
}
tb.AddItems([][]byte{item})
}
}(i)
}
for i := 0; i < itemsCount; i++ {
workCh <- i
}
close(workCh)
wg.Wait()
}
func testReopenTable(t *testing.T, path string, itemsCount int) {
t.Helper()
for i := 0; i < 10; i++ {
var isReadOnly uint32
tb := MustOpenTable(path, nil, nil, &isReadOnly)
var m TableMetrics
tb.UpdateMetrics(&m)
if n := m.TotalItemsCount(); n != uint64(itemsCount) {
t.Fatalf("unexpected itemsCount after re-opening; got %d; want %v", n, itemsCount)
}
tb.MustClose()
}
}