VictoriaMetrics/lib/storage/storage_test.go
Roman Khavronenko cf1a8bce6b
lib/index: reduce read/write load after indexDB rotation (#2177)
* lib/index: reduce read/write load after indexDB rotation

IndexDB in VM is responsible for storing TSID - ID's used for identifying
time series. The index is stored on disk and used by both ingestion and read path.

IndexDB is stored separately to data parts and is global for all stored data.
It can't be deleted partially as VM deletes data parts. Instead, indexDB is
rotated once in `retention` interval.

The rotation procedure means that `current` indexDB becomes `previous`,
and new freshly created indexDB struct becomes `current`. So in any time,
VM holds indexDB for current and previous retention periods.
When time series is ingested or queried, VM checks if its TSID is present
in `current` indexDB. If it is missing, it checks the `previous` indexDB.
If TSID was found, it gets copied to the `current` indexDB. In this way
`current` indexDB stores only series which were active during the retention
period.

To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both
write and read path consult `tsidCache` and on miss the relad lookup happens.

When rotation happens, VM resets the `tsidCache`. This is needed for ingestion
path to trigger `current` indexDB re-population. Since index re-population
requires additional resources, every index rotation event may cause some extra
load on CPU and disk. While it may be unnoticeable for most of the cases,
for systems with very high number of unique series each rotation may lead
to performance degradation for some period of time.

This PR makes an attempt to smooth out resource usage after the rotation.
The changes are following:
1. `tsidCache` is no longer reset after the rotation;
2. Instead, each entry in `tsidCache` gains a notion of indexDB to which
they belong;
3. On ingestion path after the rotation we check if requested TSID was
found in `tsidCache`. Then we have 3 branches:
3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID.
3.2 Slow path. It wasn't found, so we generate it from scratch,
add to `current` indexDB, add it to `tsidCache`.
3.3 Smooth path. It was found but does not belong to the `current` indexDB.
In this case, we add it to the `current` indexDB with some probability.
The probability is based on time passed since the last rotation with some threshold.
The more time has passed since rotation the higher is chance to re-populate `current` indexDB.
The default re-population interval in this PR is set to `1h`, during which entries from
`previous` index supposed to slowly re-populate `current` index.

The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs
were moved from `previous` indexDB to the `current` indexDB. This metric supposed to
grow only during the first `1h` after the last rotation.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* wip

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-12 00:30:08 +02:00

1117 lines
31 KiB
Go

package storage
import (
"fmt"
"math/rand"
"os"
"reflect"
"sort"
"strings"
"testing"
"testing/quick"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
func TestReplaceAlternateRegexpsWithGraphiteWildcards(t *testing.T) {
f := func(q, resultExpected string) {
t.Helper()
result := replaceAlternateRegexpsWithGraphiteWildcards([]byte(q))
if string(result) != resultExpected {
t.Fatalf("unexpected result for %s\ngot\n%s\nwant\n%s", q, result, resultExpected)
}
}
f("", "")
f("foo", "foo")
f("foo(bar", "foo(bar")
f("foo.(bar|baz", "foo.(bar|baz")
f("foo.(bar).x", "foo.{bar}.x")
f("foo.(bar|baz).*.{x,y}", "foo.{bar,baz}.*.{x,y}")
f("foo.(bar|baz).*.{x,y}(z|aa)", "foo.{bar,baz}.*.{x,y}{z,aa}")
f("foo(.*)", "foo*")
}
func TestGetRegexpForGraphiteNodeQuery(t *testing.T) {
f := func(q, expectedRegexp string) {
t.Helper()
re, err := getRegexpForGraphiteQuery(q)
if err != nil {
t.Fatalf("unexpected error for query=%q: %s", q, err)
}
reStr := re.String()
if reStr != expectedRegexp {
t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp)
}
}
f(``, `^$`)
f(`*`, `^[^.]*$`)
f(`foo.`, `^foo\.$`)
f(`foo.bar`, `^foo\.bar$`)
f(`{foo,b*ar,b[a-z]}`, `^(?:foo|b[^.]*ar|b[a-z])$`)
f(`[-a-zx.]`, `^[-a-zx.]$`)
f(`**`, `^[^.]*[^.]*$`)
f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`)
f(`foo{bar`, `^foo\{bar$`)
f(`foo{ba,r`, `^foo\{ba,r$`)
f(`foo[bar`, `^foo\[bar$`)
f(`foo{bar}`, `^foobar$`)
f(`foo{bar,,b{{a,b*},z},[x-y]*z}a`, `^foo(?:bar||b(?:(?:a|b[^.]*)|z)|[x-y][^.]*z)a$`)
}
func TestDateMetricIDCacheSerial(t *testing.T) {
c := newDateMetricIDCache()
if err := testDateMetricIDCache(c, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
func TestDateMetricIDCacheConcurrent(t *testing.T) {
c := newDateMetricIDCache()
ch := make(chan error, 5)
for i := 0; i < 5; i++ {
go func() {
ch <- testDateMetricIDCache(c, true)
}()
}
for i := 0; i < 5; i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second * 5):
t.Fatalf("timeout")
}
}
}
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
type dmk struct {
date uint64
metricID uint64
}
m := make(map[dmk]bool)
for i := 0; i < 1e5; i++ {
date := uint64(i) % 3
metricID := uint64(i) % 1237
if !concurrent && c.Has(date, metricID) {
if !m[dmk{date, metricID}] {
return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID)
}
continue
}
c.Set(date, metricID)
m[dmk{date, metricID}] = true
if !concurrent && !c.Has(date, metricID) {
return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID)
}
if i%11234 == 0 {
c.mu.Lock()
c.syncLocked()
c.mu.Unlock()
}
if i%34323 == 0 {
c.Reset()
m = make(map[dmk]bool)
}
}
// Verify fast path after sync.
for i := 0; i < 1e5; i++ {
date := uint64(i) % 3
metricID := uint64(i) % 123
c.Set(date, metricID)
}
c.mu.Lock()
c.syncLocked()
c.mu.Unlock()
for i := 0; i < 1e5; i++ {
date := uint64(i) % 3
metricID := uint64(i) % 123
if !concurrent && !c.Has(date, metricID) {
return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID)
}
}
// Verify c.Reset
if n := c.EntriesCount(); !concurrent && n < 123 {
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
}
c.Reset()
if n := c.EntriesCount(); !concurrent && n > 0 {
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
}
return nil
}
func TestUpdateCurrHourMetricIDs(t *testing.T) {
newStorage := func() *Storage {
var s Storage
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{})
s.pendingHourEntries = &uint64set.Set{}
return &s
}
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: 123,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
}
if hmCurr.m.Len() != 0 {
t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
}
if !hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
}
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
// Do not run other checks, since they may fail.
return
}
if !reflect.DeepEqual(hmCurr, hmOrig) {
t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
}
if hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false)
}
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
hmEmpty := &hourMetricIDs{}
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: 123,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
}
if !hmCurr.m.Equal(pendingHourEntries) {
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
}
if !hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
}
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
// Do not run other checks, since they may fail.
return
}
m := pendingHourEntries.Clone()
hmOrig.m.ForEach(func(part []uint64) bool {
for _, metricID := range part {
m.Add(metricID)
}
return true
})
if !hmCurr.m.Equal(m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
}
if hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false)
}
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
hmEmpty := &hourMetricIDs{}
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
}
func TestMetricRowMarshalUnmarshal(t *testing.T) {
var buf []byte
typ := reflect.TypeOf(&MetricRow{})
rnd := rand.New(rand.NewSource(1))
for i := 0; i < 1000; i++ {
v, ok := quick.Value(typ, rnd)
if !ok {
t.Fatalf("cannot create random MetricRow via quick.Value")
}
mr1 := v.Interface().(*MetricRow)
if mr1 == nil {
continue
}
buf = mr1.Marshal(buf[:0])
var mr2 MetricRow
tail, err := mr2.UnmarshalX(buf)
if err != nil {
t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err)
}
if len(tail) > 0 {
t.Fatalf("non-empty tail returned after MetricRow.Unmarshal for mr1=%s", mr1)
}
if mr1.MetricNameRaw == nil {
mr1.MetricNameRaw = []byte{}
}
if mr2.MetricNameRaw == nil {
mr2.MetricNameRaw = []byte{}
}
if !reflect.DeepEqual(mr1, &mr2) {
t.Fatalf("mr1 should match mr2; got\nmr1=%s\nmr2=%s", mr1, &mr2)
}
}
}
func TestNextRetentionDuration(t *testing.T) {
for retentionMonths := float64(0.1); retentionMonths < 120; retentionMonths += 0.3 {
d := nextRetentionDuration(int64(retentionMonths * msecsPerMonth))
if d <= 0 {
currTime := time.Now().UTC()
nextTime := time.Now().UTC().Add(d)
t.Fatalf("unexpected retention duration for retentionMonths=%f; got %s; must be %s + %f months", retentionMonths, nextTime, currTime, retentionMonths)
}
}
}
func TestStorageOpenClose(t *testing.T) {
path := "TestStorageOpenClose"
for i := 0; i < 10; i++ {
s, err := OpenStorage(path, -1, 1e5, 1e6)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
s.MustClose()
}
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestStorageOpenMultipleTimes(t *testing.T) {
path := "TestStorageOpenMultipleTimes"
s1, err := OpenStorage(path, -1, 0, 0)
if err != nil {
t.Fatalf("cannot open storage the first time: %s", err)
}
for i := 0; i < 10; i++ {
s2, err := OpenStorage(path, -1, 0, 0)
if err == nil {
s2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened storage")
}
}
s1.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestStorageRandTimestamps(t *testing.T) {
path := "TestStorageRandTimestamps"
retentionMsecs := int64(60 * msecsPerMonth)
s, err := OpenStorage(path, retentionMsecs, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
t.Run("serial", func(t *testing.T) {
for i := 0; i < 3; i++ {
if err := testStorageRandTimestamps(s); err != nil {
t.Fatal(err)
}
s.MustClose()
s, err = OpenStorage(path, retentionMsecs, 0, 0)
}
})
t.Run("concurrent", func(t *testing.T) {
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
var err error
for i := 0; i < 2; i++ {
err = testStorageRandTimestamps(s)
}
ch <- err
}()
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatal(err)
}
case <-time.After(time.Second * 10):
t.Fatal("timeout")
}
}
})
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageRandTimestamps(s *Storage) error {
const rowsPerAdd = 1e3
const addsCount = 2
typ := reflect.TypeOf(int64(0))
rnd := rand.New(rand.NewSource(1))
for i := 0; i < addsCount; i++ {
var mrs []MetricRow
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
}
for j := 0; j < rowsPerAdd; j++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rand.Intn(100)))
metricNameRaw := mn.marshalRaw(nil)
timestamp := int64(rnd.NormFloat64() * 1e12)
if j%2 == 0 {
ts, ok := quick.Value(typ, rnd)
if !ok {
return fmt.Errorf("cannot create random timestamp via quick.Value")
}
timestamp = ts.Interface().(int64)
}
value := rnd.NormFloat64() * 1e12
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
errStr := err.Error()
if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
}
}
// Verify the storage contains rows.
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount == 0 {
return fmt.Errorf("expecting at least one row in the table")
}
return nil
}
func TestStorageDeleteMetrics(t *testing.T) {
path := "TestStorageDeleteMetrics"
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
// Verify no tag keys exist
tks, err := s.SearchTagKeys(1e5, noDeadline)
if err != nil {
t.Fatalf("error in SearchTagKeys at the start: %s", err)
}
if len(tks) != 0 {
t.Fatalf("found non-empty tag keys at the start: %q", tks)
}
t.Run("serial", func(t *testing.T) {
for i := 0; i < 3; i++ {
if err = testStorageDeleteMetrics(s, 0); err != nil {
t.Fatalf("unexpected error on iteration %d: %s", i, err)
}
// Re-open the storage in order to check how deleted metricIDs
// are persisted.
s.MustClose()
s, err = OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
}
}
})
t.Run("concurrent", func(t *testing.T) {
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func(workerNum int) {
var err error
for j := 0; j < 2; j++ {
err = testStorageDeleteMetrics(s, workerNum)
if err != nil {
break
}
}
ch <- err
}(i)
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(30 * time.Second):
t.Fatalf("timeout")
}
}
})
// Verify no more tag keys exist
tks, err = s.SearchTagKeys(1e5, noDeadline)
if err != nil {
t.Fatalf("error in SearchTagKeys after the test: %s", err)
}
if len(tks) != 0 {
t.Fatalf("found non-empty tag keys after the test: %q", tks)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageDeleteMetrics(s *Storage, workerNum int) error {
const rowsPerMetric = 100
const metricsCount = 30
workerTag := []byte(fmt.Sprintf("workerTag_%d", workerNum))
tksAll := make(map[string]bool)
tksAll[""] = true // __name__
for i := 0; i < metricsCount; i++ {
var mrs []MetricRow
var mn MetricName
job := fmt.Sprintf("job_%d_%d", i, workerNum)
instance := fmt.Sprintf("instance_%d_%d", i, workerNum)
mn.Tags = []Tag{
{[]byte("job"), []byte(job)},
{[]byte("instance"), []byte(instance)},
{workerTag, []byte("foobar")},
}
for i := range mn.Tags {
tksAll[string(mn.Tags[i].Key)] = true
}
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", i, workerNum))
metricNameRaw := mn.marshalRaw(nil)
for j := 0; j < rowsPerMetric; j++ {
timestamp := rand.Int63n(1e10)
value := rand.NormFloat64() * 1e6
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
}
s.DebugFlush()
// Verify tag values exist
tvs, err := s.SearchTagValues(workerTag, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchTagValues before metrics removal: %w", err)
}
if len(tvs) == 0 {
return fmt.Errorf("unexpected empty number of tag values for workerTag")
}
// Verify tag keys exist
tks, err := s.SearchTagKeys(1e5, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchTagKeys before metrics removal: %w", err)
}
if err := checkTagKeys(tks, tksAll); err != nil {
return fmt.Errorf("unexpected tag keys before metrics removal: %w", err)
}
var sr Search
tr := TimeRange{
MinTimestamp: 0,
MaxTimestamp: 2e10,
}
metricBlocksCount := func(tfs *TagFilters) int {
// Verify the number of blocks
n := 0
sr.Init(s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
for sr.NextMetricBlock() {
n++
}
sr.MustClose()
return n
}
for i := 0; i < metricsCount; i++ {
tfs := NewTagFilters()
if err := tfs.Add(nil, []byte("metric_.+"), false, true); err != nil {
return fmt.Errorf("cannot add regexp tag filter: %w", err)
}
job := fmt.Sprintf("job_%d_%d", i, workerNum)
if err := tfs.Add([]byte("job"), []byte(job), false, false); err != nil {
return fmt.Errorf("cannot add job tag filter: %w", err)
}
if n := metricBlocksCount(tfs); n == 0 {
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
}
deletedCount, err := s.DeleteMetrics([]*TagFilters{tfs})
if err != nil {
return fmt.Errorf("cannot delete metrics: %w", err)
}
if deletedCount == 0 {
return fmt.Errorf("expecting non-zero number of deleted metrics on iteration %d", i)
}
if n := metricBlocksCount(tfs); n != 0 {
return fmt.Errorf("expecting zero metric blocks after DeleteMetrics call for tfs=%s; got %d blocks", tfs, n)
}
// Try deleting empty tfss
deletedCount, err = s.DeleteMetrics(nil)
if err != nil {
return fmt.Errorf("cannot delete empty tfss: %w", err)
}
if deletedCount != 0 {
return fmt.Errorf("expecting zero deleted metrics for empty tfss; got %d", deletedCount)
}
}
// Make sure no more metrics left for the given workerNum
tfs := NewTagFilters()
if err := tfs.Add(nil, []byte(fmt.Sprintf("metric_.+_%d", workerNum)), false, true); err != nil {
return fmt.Errorf("cannot add regexp tag filter for worker metrics: %w", err)
}
if n := metricBlocksCount(tfs); n != 0 {
return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
}
tvs, err = s.SearchTagValues(workerTag, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchTagValues after all the metrics are removed: %w", err)
}
if len(tvs) != 0 {
return fmt.Errorf("found non-empty tag values for %q after metrics removal: %q", workerTag, tvs)
}
return nil
}
func checkTagKeys(tks []string, tksExpected map[string]bool) error {
if len(tks) < len(tksExpected) {
return fmt.Errorf("unexpected number of tag keys found; got %d; want at least %d; tks=%q, tksExpected=%v", len(tks), len(tksExpected), tks, tksExpected)
}
hasItem := func(k string, tks []string) bool {
for _, kk := range tks {
if k == kk {
return true
}
}
return false
}
for k := range tksExpected {
if !hasItem(k, tks) {
return fmt.Errorf("cannot find %q in tag keys %q", k, tks)
}
}
return nil
}
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
path := "TestStorageRegisterMetricNamesSerial"
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
if err := testStorageRegisterMetricNames(s); err != nil {
t.Fatalf("unexpected error: %s", err)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
path := "TestStorageRegisterMetricNamesConcurrent"
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
ch <- testStorageRegisterMetricNames(s)
}()
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout")
}
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageRegisterMetricNames(s *Storage) error {
const metricsPerAdd = 1e3
const addsCount = 10
addIDsMap := make(map[string]struct{})
for i := 0; i < addsCount; i++ {
var mrs []MetricRow
var mn MetricName
addID := fmt.Sprintf("%d", i)
addIDsMap[addID] = struct{}{}
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
{[]byte("add_id"), []byte(addID)},
}
now := timestampFromTime(time.Now())
for j := 0; j < metricsPerAdd; j++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", j))
metricNameRaw := mn.marshalRaw(nil)
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: now,
}
mrs = append(mrs, mr)
}
if err := s.RegisterMetricNames(mrs); err != nil {
return fmt.Errorf("unexpected error in AddMetrics: %w", err)
}
}
var addIDsExpected []string
for k := range addIDsMap {
addIDsExpected = append(addIDsExpected, k)
}
sort.Strings(addIDsExpected)
// Verify the storage contains the added metric names.
s.DebugFlush()
// Verify that SearchTagKeys returns correct result.
tksExpected := []string{
"",
"add_id",
"instance",
"job",
}
tks, err := s.SearchTagKeys(100, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchTagKeys: %w", err)
}
sort.Strings(tks)
if !reflect.DeepEqual(tks, tksExpected) {
return fmt.Errorf("unexpected tag keys returned from SearchTagKeys;\ngot\n%q\nwant\n%q", tks, tksExpected)
}
// Verify that SearchTagKeysOnTimeRange returns correct result.
now := timestampFromTime(time.Now())
start := now - msecPerDay
end := now + 60*1000
tr := TimeRange{
MinTimestamp: start,
MaxTimestamp: end,
}
tks, err = s.SearchTagKeysOnTimeRange(tr, 100, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchTagKeysOnTimeRange: %w", err)
}
sort.Strings(tks)
if !reflect.DeepEqual(tks, tksExpected) {
return fmt.Errorf("unexpected tag keys returned from SearchTagKeysOnTimeRange;\ngot\n%q\nwant\n%q", tks, tksExpected)
}
// Verify that SearchTagValues returns correct result.
addIDs, err := s.SearchTagValues([]byte("add_id"), addsCount+100, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchTagValues: %w", err)
}
sort.Strings(addIDs)
if !reflect.DeepEqual(addIDs, addIDsExpected) {
return fmt.Errorf("unexpected tag values returned from SearchTagValues;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
}
// Verify that SearchTagValuesOnTimeRange returns correct result.
addIDs, err = s.SearchTagValuesOnTimeRange([]byte("add_id"), tr, addsCount+100, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchTagValuesOnTimeRange: %w", err)
}
sort.Strings(addIDs)
if !reflect.DeepEqual(addIDs, addIDsExpected) {
return fmt.Errorf("unexpected tag values returned from SearchTagValuesOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
}
// Verify that SearchMetricNames returns correct result.
tfs := NewTagFilters()
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
}
mns, err := s.SearchMetricNames([]*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchMetricNames: %w", err)
}
if len(mns) < metricsPerAdd {
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(mns), int(metricsPerAdd))
}
for i, mn := range mns {
addID := mn.GetTagValue("add_id")
if string(addID) != "0" {
return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")
}
job := mn.GetTagValue("job")
if string(job) != "webservice" {
return fmt.Errorf("unexpected job for metricName #%d; got %q; want %q", i, job, "webservice")
}
}
return nil
}
func TestStorageAddRowsSerial(t *testing.T) {
path := "TestStorageAddRowsSerial"
s, err := OpenStorage(path, 0, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
if err := testStorageAddRows(s); err != nil {
t.Fatalf("unexpected error: %s", err)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestStorageAddRowsConcurrent(t *testing.T) {
path := "TestStorageAddRowsConcurrent"
s, err := OpenStorage(path, 0, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
ch <- testStorageAddRows(s)
}()
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout")
}
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testGenerateMetricRows(rows uint64, timestampMin, timestampMax int64) []MetricRow {
var mrs []MetricRow
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
}
for i := 0; i < int(rows); i++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i))
metricNameRaw := mn.marshalRaw(nil)
timestamp := rand.Int63n(timestampMax-timestampMin) + timestampMin
value := rand.NormFloat64() * 1e6
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
return mrs
}
func testStorageAddRows(s *Storage) error {
const rowsPerAdd = 1e3
const addsCount = 10
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
}
// Verify the storage contains rows.
minRowsExpected := uint64(rowsPerAdd * addsCount)
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, m.TableMetrics.SmallRowsCount)
}
// Try creating a snapshot from the storage.
snapshotName, err := s.CreateSnapshot()
if err != nil {
return fmt.Errorf("cannot create snapshot from the storage: %w", err)
}
// Verify the snapshot is visible
snapshots, err := s.ListSnapshots()
if err != nil {
return fmt.Errorf("cannot list snapshots: %w", err)
}
if !containsString(snapshots, snapshotName) {
return fmt.Errorf("cannot find snapshot %q in %q", snapshotName, snapshots)
}
// Try opening the storage from snapshot.
snapshotPath := s.path + "/snapshots/" + snapshotName
s1, err := OpenStorage(snapshotPath, 0, 0, 0)
if err != nil {
return fmt.Errorf("cannot open storage from snapshot: %w", err)
}
// Verify the snapshot contains rows
var m1 Metrics
s1.UpdateMetrics(&m1)
if m1.TableMetrics.SmallRowsCount < minRowsExpected {
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, m1.TableMetrics.SmallRowsCount)
}
// Verify that force merge for the snapshot leaves only a single part per partition.
if err := s1.ForceMergePartitions(""); err != nil {
return fmt.Errorf("error when force merging partitions: %w", err)
}
ptws := s1.tb.GetPartitions(nil)
for _, ptw := range ptws {
pws := ptw.pt.GetParts(nil)
numParts := len(pws)
ptw.pt.PutParts(pws)
if numParts != 1 {
s1.tb.PutPartitions(ptws)
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts)
}
}
s1.tb.PutPartitions(ptws)
s1.MustClose()
// Delete the snapshot and make sure it is no longer visible.
if err := s.DeleteSnapshot(snapshotName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
}
snapshots, err = s.ListSnapshots()
if err != nil {
return fmt.Errorf("cannot list snapshots: %w", err)
}
if containsString(snapshots, snapshotName) {
return fmt.Errorf("snapshot %q must be deleted, but is still visible in %q", snapshotName, snapshots)
}
return nil
}
func TestStorageRotateIndexDB(t *testing.T) {
path := "TestStorageRotateIndexDB"
s, err := OpenStorage(path, 0, 0, 0)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
// Start indexDB rotater in a separate goroutine
stopCh := make(chan struct{})
rotateDoneCh := make(chan struct{})
go func() {
for {
select {
case <-stopCh:
close(rotateDoneCh)
return
default:
time.Sleep(time.Millisecond)
s.mustRotateIndexDB()
}
}
}()
// Run concurrent workers that insert / select data from the storage.
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func(workerNum int) {
ch <- testStorageAddMetrics(s, workerNum)
}(i)
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout")
}
}
close(stopCh)
<-rotateDoneCh
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageAddMetrics(s *Storage, workerNum int) error {
const rowsCount = 1e3
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte(fmt.Sprintf("webservice_%d", workerNum))},
{[]byte("instance"), []byte("1.2.3.4")},
}
for i := 0; i < rowsCount; i++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", workerNum, rand.Intn(10)))
metricNameRaw := mn.marshalRaw(nil)
timestamp := rand.Int63n(1e10)
value := rand.NormFloat64() * 1e6
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
}
// Verify the storage contains rows.
minRowsExpected := uint64(rowsCount)
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, m.TableMetrics.SmallRowsCount)
}
return nil
}
func containsString(a []string, s string) bool {
for i := range a {
if a[i] == s {
return true
}
}
return false
}