lib/storage: make sure non-nil args are passed to openIndexDB

This commit is contained in:
Aliaksandr Valialkin 2019-06-25 20:09:57 +03:00
parent eb2283a029
commit 683bf2a11f
3 changed files with 59 additions and 17 deletions

View File

@ -102,6 +102,19 @@ type indexDB struct {
// openIndexDB opens index db from the given path with the given caches.
func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
if metricIDCache == nil {
logger.Panicf("BUG: metricIDCache must be non-nil")
}
if metricNameCache == nil {
logger.Panicf("BUG: metricNameCache must be non-nil")
}
if currHourMetricIDs == nil {
logger.Panicf("BUG: currHourMetricIDs must be non-nil")
}
if prevHourMetricIDs == nil {
logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
}
tb, err := mergeset.OpenTable(path)
if err != nil {
return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err)
@ -1251,11 +1264,8 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters
var errTooManyMetrics = errors.New("all the tag filters match too many metrics")
func (is *indexSearch) adjustMaxMetricsAdaptive(maxMetrics int) int {
if is.db.prevHourMetricIDs == nil {
return maxMetrics
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if hmPrev == nil || !hmPrev.isFull {
if !hmPrev.isFull {
return maxMetrics
}
hourMetrics := len(hmPrev.m)
@ -1678,9 +1688,6 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (map[uint64]struct{}, bool) {
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
if is.db.currHourMetricIDs == nil {
return nil, false
}
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
// The tr fits the current hour.
@ -1691,9 +1698,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int)
}
return getMetricIDsCopy(hmCurr.m), true
}
if is.db.prevHourMetricIDs == nil {
return nil, false
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour.

View File

@ -7,6 +7,7 @@ import (
"math/rand"
"os"
"reflect"
"sync/atomic"
"testing"
"time"
@ -60,8 +61,14 @@ func TestIndexDBOpenClose(t *testing.T) {
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
for i := 0; i < 5; i++ {
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, nil, nil)
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, &hmCurr, &hmPrev)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -82,8 +89,14 @@ func TestIndexDB(t *testing.T) {
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
dbName := "test-index-db-serial"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -113,7 +126,7 @@ func TestIndexDB(t *testing.T) {
// Re-open the db and verify it works as expected.
db.MustClose()
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil)
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -133,8 +146,14 @@ func TestIndexDB(t *testing.T) {
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
dbName := "test-index-db-concurrent"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strconv"
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/fastcache"
@ -16,8 +17,14 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
const dbName = "bench-index-db-add-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -76,8 +83,14 @@ func BenchmarkIndexDBSearchTSIDs(b *testing.B) {
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
const dbName = "bench-index-db-search-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -145,8 +158,14 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
metricNameCache := fastcache.New(1234)
defer metricIDCache.Reset()
defer metricNameCache.Reset()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
const dbName = "bench-index-db-get-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}