VictoriaMetrics/lib/mergeset/table_test.go
Zakhar Bessarab 26682e369e
lib/storage: enhancements for snapshots process (#3873)
* lib/{fs,mergeset,storage}: skip `.must-remove.` dirs when creating snapshot (#3858)

* lib/{mergeset,storage}: add timeout configuration for snapshots creation, remove incomplete snapshots from storage

* docs: fix formatting

* app/vmstorage: add metrics to track status of snapshots

* app/vmstorage: use `vm_http_requests_total` metric for snapshot endpoints metrics, rename new flag to make name more clear

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* app/vmstorage: update flag name in docs

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* app/vmstorage: reflect new metrics names change in docs

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-02-27 13:11:06 -08:00

304 lines
7.5 KiB
Go

package mergeset
import (
"bytes"
"fmt"
"math/rand"
"os"
"sync"
"sync/atomic"
"testing"
)
// TestTableOpenClose checks that a table can be created at a fresh path,
// closed, and then opened and closed again several times in a row.
func TestTableOpenClose(t *testing.T) {
	const (
		path        = "TestTableOpenClose"
		reopenCount = 4
	)

	// Start from a clean location and remove it again when the test ends.
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
	defer func() {
		_ = os.RemoveAll(path)
	}()

	var isReadOnly uint32

	// Create a new table and close it right away.
	created, err := OpenTable(path, nil, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot create new table: %s", err)
	}
	created.MustClose()

	// The table created above must be re-openable multiple times.
	for remaining := reopenCount; remaining > 0; remaining-- {
		reopened, err := OpenTable(path, nil, nil, &isReadOnly)
		if err != nil {
			t.Fatalf("cannot open created table: %s", err)
		}
		reopened.MustClose()
	}
}
// TestTableOpenMultipleTimes checks that OpenTable returns an error for a path
// whose table is already open.
func TestTableOpenMultipleTimes(t *testing.T) {
	const path = "TestTableOpenMultipleTimes"

	// Drop leftovers of an earlier aborted run, as the other tests in this file
	// do, so the test always starts from an empty location.
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
	defer func() {
		_ = os.RemoveAll(path)
	}()

	var isReadOnly uint32
	tb1, err := OpenTable(path, nil, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open table: %s", err)
	}
	defer tb1.MustClose()

	// Every further attempt to open the same path must fail while tb1 is open.
	for i := 0; i < 4; i++ {
		tb2, err := OpenTable(path, nil, nil, &isReadOnly)
		if err == nil {
			// Close the unexpectedly opened table before failing the test.
			tb2.MustClose()
			t.Fatalf("expecting non-nil error when opening already opened table")
		}
	}
}
// TestTableAddItemsSerial adds items to a table one at a time from the test
// goroutine and checks the total items count after a flush and after re-opening
// the table, then repeats the re-open check after adding a second batch.
func TestTableAddItemsSerial(t *testing.T) {
	const (
		path           = "TestTableAddItemsSerial"
		itemsCount     = 10e3
		moreItemsCount = itemsCount * 3
	)

	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
	defer func() {
		_ = os.RemoveAll(path)
	}()

	// A fixed seed is used for the generator that produces the items.
	rng := rand.New(rand.NewSource(1))

	// Count how many times the table invokes the flush callback.
	var flushCalls uint64
	onFlush := func() {
		atomic.AddUint64(&flushCalls, 1)
	}

	var isReadOnly uint32
	tb, err := OpenTable(path, onFlush, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open %q: %s", path, err)
	}
	testAddItemsSerial(rng, tb, itemsCount)

	// Verify items count after pending items flush.
	tb.DebugFlush()
	if calls := atomic.LoadUint64(&flushCalls); calls == 0 {
		t.Fatalf("unexpected zero flushes")
	}
	var metrics TableMetrics
	tb.UpdateMetrics(&metrics)
	got := metrics.TotalItemsCount()
	if got != itemsCount {
		t.Fatalf("unexpected itemsCount; got %d; want %v", got, itemsCount)
	}
	tb.MustClose()

	// Re-open the table and make sure itemsCount remains the same.
	testReopenTable(t, path, itemsCount)

	// Add more items in order to verify merge between inmemory parts and file-based parts.
	tb, err = OpenTable(path, nil, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open %q: %s", path, err)
	}
	testAddItemsSerial(rng, tb, moreItemsCount)
	tb.MustClose()

	// Re-open the table and verify itemsCount again.
	testReopenTable(t, path, itemsCount+moreItemsCount)
}
// testAddItemsSerial adds itemsCount items produced by getRandomBytes(r) to tb,
// one item per AddItems call.
func testAddItemsSerial(r *rand.Rand, tb *Table, itemsCount int) {
	for added := 0; added < itemsCount; added++ {
		data := getRandomBytes(r)
		// Cap the item length at maxInmemoryBlockSize before adding it.
		if len(data) > maxInmemoryBlockSize {
			data = data[:maxInmemoryBlockSize]
		}
		batch := [][]byte{data}
		tb.AddItems(batch)
	}
}
// TestTableCreateSnapshotAt fills a table with items, creates two snapshots of
// it and checks that every item can be found in the original table and in both
// snapshots.
func TestTableCreateSnapshotAt(t *testing.T) {
	const path = "TestTableCreateSnapshotAt"
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
	defer func() {
		_ = os.RemoveAll(path)
	}()

	var isReadOnly uint32
	tb, err := OpenTable(path, nil, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open %q: %s", path, err)
	}
	defer tb.MustClose()

	// Write a lot of items into the table, so background merges would start.
	const itemsCount = 3e5
	for i := 0; i < itemsCount; i++ {
		item := []byte(fmt.Sprintf("item %d", i))
		tb.AddItems([][]byte{item})
	}
	tb.DebugFlush()

	// Register the snapshot cleanup before creating anything: a t.Fatalf only
	// runs the defers registered so far, so a failure while creating snapshot2
	// would otherwise leave snapshot1 on disk.
	snapshot1 := path + "-test-snapshot1"
	snapshot2 := path + "-test-snapshot2"
	defer func() {
		_ = os.RemoveAll(snapshot1)
		_ = os.RemoveAll(snapshot2)
	}()

	// Create multiple snapshots.
	// NOTE(review): the second argument 0 presumably means "no deadline" —
	// confirm against CreateSnapshotAt.
	if err := tb.CreateSnapshotAt(snapshot1, 0); err != nil {
		t.Fatalf("cannot create snapshot1: %s", err)
	}
	if err := tb.CreateSnapshotAt(snapshot2, 0); err != nil {
		t.Fatalf("cannot create snapshot2: %s", err)
	}

	// Verify snapshots contain all the data.
	tb1, err := OpenTable(snapshot1, nil, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open %q: %s", snapshot1, err)
	}
	defer tb1.MustClose()
	tb2, err := OpenTable(snapshot2, nil, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open %q: %s", snapshot2, err)
	}
	defer tb2.MustClose()

	// Every search is closed via defer; since defers run in reverse order, the
	// searches are closed before the tables they were initialized with.
	var ts, ts1, ts2 TableSearch
	ts.Init(tb)
	defer ts.MustClose()
	ts1.Init(tb1)
	defer ts1.MustClose()
	ts2.Init(tb2)
	defer ts2.MustClose()

	// The same lookup is performed against each table; `where` is only used
	// in failure messages.
	searches := []struct {
		where string
		ts    *TableSearch
	}{
		{"the original table", &ts},
		{"snapshot1", &ts1},
		{"snapshot2", &ts2},
	}
	for i := 0; i < itemsCount; i++ {
		key := []byte(fmt.Sprintf("item %d", i))
		for _, s := range searches {
			if err := s.ts.FirstItemWithPrefix(key); err != nil {
				t.Fatalf("cannot find item[%d]=%q in %s: %s", i, key, s.where, err)
			}
			if !bytes.Equal(key, s.ts.Item) {
				t.Fatalf("unexpected item found for key=%q in %s; got %q", key, s.where, s.ts.Item)
			}
		}
	}
}
// TestTableAddItemsConcurrent adds items to a table from multiple goroutines
// and checks the total items count after a flush and after re-opening the
// table, then repeats the re-open check after adding a second batch.
func TestTableAddItemsConcurrent(t *testing.T) {
	const (
		path           = "TestTableAddItemsConcurrent"
		itemsCount     = 10e3
		moreItemsCount = itemsCount * 3
	)

	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
	defer func() {
		_ = os.RemoveAll(path)
	}()

	// Count how many times the table invokes the flush callback.
	var flushCalls uint64
	onFlush := func() {
		atomic.AddUint64(&flushCalls, 1)
	}
	// passThrough is a prepareBlock callback that returns its arguments unchanged.
	passThrough := func(data []byte, items []Item) ([]byte, []Item) {
		return data, items
	}

	var isReadOnly uint32
	tb, err := OpenTable(path, onFlush, passThrough, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open %q: %s", path, err)
	}
	testAddItemsConcurrent(tb, itemsCount)

	// Verify items count after pending items flush.
	tb.DebugFlush()
	if calls := atomic.LoadUint64(&flushCalls); calls == 0 {
		t.Fatalf("unexpected zero flushes")
	}
	var metrics TableMetrics
	tb.UpdateMetrics(&metrics)
	got := metrics.TotalItemsCount()
	if got != itemsCount {
		t.Fatalf("unexpected itemsCount; got %d; want %v", got, itemsCount)
	}
	tb.MustClose()

	// Re-open the table and make sure itemsCount remains the same.
	testReopenTable(t, path, itemsCount)

	// Add more items in order to verify merge between inmemory parts and file-based parts.
	tb, err = OpenTable(path, nil, nil, &isReadOnly)
	if err != nil {
		t.Fatalf("cannot open %q: %s", path, err)
	}
	testAddItemsConcurrent(tb, moreItemsCount)
	tb.MustClose()

	// Re-open the table and verify itemsCount again.
	testReopenTable(t, path, itemsCount+moreItemsCount)
}
// testAddItemsConcurrent adds itemsCount items produced by getRandomBytes to tb
// from goroutinesCount worker goroutines and returns once all of them finish.
func testAddItemsConcurrent(tb *Table, itemsCount int) {
	const goroutinesCount = 6

	// One token is sent per item to add. The buffer holds all the tokens, so
	// the sending loop below never blocks on the workers.
	workCh := make(chan int, itemsCount)

	var wg sync.WaitGroup
	// worker adds one item per received token until workCh is closed and drained.
	// Each worker owns a generator seeded with its own index.
	worker := func(seed int64) {
		defer wg.Done()
		rng := rand.New(rand.NewSource(seed))
		for range workCh {
			data := getRandomBytes(rng)
			// Cap the item length at maxInmemoryBlockSize before adding it.
			if len(data) > maxInmemoryBlockSize {
				data = data[:maxInmemoryBlockSize]
			}
			tb.AddItems([][]byte{data})
		}
	}

	wg.Add(goroutinesCount)
	for workerID := 0; workerID < goroutinesCount; workerID++ {
		go worker(int64(workerID))
	}

	for token := 0; token < itemsCount; token++ {
		workCh <- token
	}
	// Closing the channel ends the workers' range loops.
	close(workCh)
	wg.Wait()
}
// testReopenTable opens the table at path several times in a row and fails the
// test if an open fails or if the reported total items count differs from
// itemsCount. The table is closed again after every successful check.
func testReopenTable(t *testing.T, path string, itemsCount int) {
	t.Helper()

	const reopenCount = 10
	for attempt := 0; attempt < reopenCount; attempt++ {
		var isReadOnly uint32
		tb, err := OpenTable(path, nil, nil, &isReadOnly)
		if err != nil {
			t.Fatalf("cannot re-open %q: %s", path, err)
		}

		var metrics TableMetrics
		tb.UpdateMetrics(&metrics)
		got := metrics.TotalItemsCount()
		if got != uint64(itemsCount) {
			t.Fatalf("unexpected itemsCount after re-opening; got %d; want %v", got, itemsCount)
		}

		tb.MustClose()
	}
}