lib/storage: remove inmemory index for recent hour, since it uses too much memory

Production workload shows that the index requires ~4Kb of RAM per active time series.
This is too much for high number of active time series, so let's delete this index.

Now the queries should fall back to the index for the current day instead of the index
for the recent hour. The query performance for the current day index should be good enough
given the 100M rows/sec scan speed per CPU core.
This commit is contained in:
Aliaksandr Valialkin 2019-11-13 17:58:05 +02:00
parent 90bde025f0
commit 494ad0fdb3
8 changed files with 18 additions and 531 deletions

View File

@ -26,8 +26,6 @@ var (
vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
disableRecentHourIndex = flag.Bool("disableRecentHourIndex", false, "Whether to disable inmemory inverted index for recent hour. "+
"This may be useful in order to reduce memory usage when working with high number of time series")
bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0")
smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0")
) )
@ -37,9 +35,6 @@ func main() {
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
if *disableRecentHourIndex {
storage.DisableRecentHourIndex()
}
storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
@ -357,25 +352,6 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(idbm().ItemsCount) return float64(idbm().ItemsCount)
}) })
metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 {
return float64(m().RecentHourInvertedIndexSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_size_bytes`, func() float64 {
return float64(m().RecentHourInvertedIndexSizeBytes)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 {
return float64(m().RecentHourInvertedIndexUniqueTagPairsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 {
return float64(m().RecentHourInvertedIndexPendingMetricIDsSize)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchCalls)
})
metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 {
return float64(idbm().RecentHourInvertedIndexSearchHits)
})
metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 { metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 {
return float64(idbm().DateRangeSearchCalls) return float64(idbm().DateRangeSearchCalls)
}) })
@ -436,6 +412,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 { metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheSizeBytes) return float64(m().MetricNameCacheSizeBytes)
}) })
metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 {
return float64(m().DateMetricIDCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheSizeBytes) return float64(idbm().TagCacheSizeBytes)
}) })

View File

@ -95,12 +95,6 @@ type indexDB struct {
// The number of successful searches for metric ids by days. // The number of successful searches for metric ids by days.
dateMetricIDsSearchHits uint64 dateMetricIDsSearchHits uint64
// The number of calls for recent hour searches over inverted index.
recentHourInvertedIndexSearchCalls uint64
// The number of hits for recent hour searches over inverted index.
recentHourInvertedIndexSearchHits uint64
// The number of calls for date range searches. // The number of calls for date range searches.
dateRangeSearchCalls uint64 dateRangeSearchCalls uint64
@ -231,9 +225,6 @@ type IndexDBMetrics struct {
DateMetricIDsSearchCalls uint64 DateMetricIDsSearchCalls uint64
DateMetricIDsSearchHits uint64 DateMetricIDsSearchHits uint64
RecentHourInvertedIndexSearchCalls uint64
RecentHourInvertedIndexSearchHits uint64
DateRangeSearchCalls uint64 DateRangeSearchCalls uint64
DateRangeSearchHits uint64 DateRangeSearchHits uint64
@ -275,9 +266,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls)
m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits)
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
@ -1693,10 +1681,6 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
return bytes.Compare(a.prefix, b.prefix) < 0 return bytes.Compare(a.prefix, b.prefix) < 0
}) })
if is.tryUpdatingMetricIDsForRecentHour(metricIDs, tfs, tr) {
// Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour.
return nil
}
ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
if err != nil { if err != nil {
return err return err
@ -2230,43 +2214,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
return nil, false return nil, false
} }
func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool {
if disableRecentHourIndex {
return false
}
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1)
k := accountProjectKey{
AccountID: tfs.accountID,
ProjectID: tfs.projectID,
}
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
// The tr fits the current hour.
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs)
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return true
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs)
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return true
}
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours.
hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs)
hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs)
atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
return true
}
return false
}
func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error { func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error {
is := db.getIndexSearch() is := db.getIndexSearch()
ok, err := is.hasDateMetricID(date, metricID, accountID, projectID) ok, err := is.hasDateMetricID(date, metricID, accountID, projectID)

View File

@ -1500,17 +1500,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
ProjectID: projectID, ProjectID: projectID,
} }
prevMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: prevMetricIDs.m} prevMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: prevMetricIDs.m}
prevMetricIDs.iidx = newInmemoryInvertedIndex()
prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.byTenant)
if len(prevMetricIDs.iidx.pendingEntries) > 0 {
t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the previous hour", len(prevMetricIDs.iidx.pendingEntries))
}
currMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: currMetricIDs.m} currMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: currMetricIDs.m}
currMetricIDs.iidx = newInmemoryInvertedIndex()
currMetricIDs.iidx.MustUpdate(db, currMetricIDs.byTenant)
if len(currMetricIDs.iidx.pendingEntries) > 0 {
t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the current hour", len(currMetricIDs.iidx.pendingEntries))
}
} }
} }

View File

@ -1,313 +0,0 @@
package storage
import (
"bytes"
"fmt"
"io"
"sync"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
type inmemoryInvertedIndex struct {
mu sync.RWMutex
m map[string]*uint64set.Set
pendingEntries []pendingHourMetricIDEntry
}
func (iidx *inmemoryInvertedIndex) Marshal(dst []byte) []byte {
iidx.mu.RLock()
defer iidx.mu.RUnlock()
// Marshal iidx.m
var metricIDs []uint64
dst = encoding.MarshalUint64(dst, uint64(len(iidx.m)))
for k, v := range iidx.m {
dst = encoding.MarshalBytes(dst, []byte(k))
metricIDs = v.AppendTo(metricIDs[:0])
dst = marshalMetricIDs(dst, metricIDs)
}
// Marshal iidx.pendingEntries
dst = encoding.MarshalUint64(dst, uint64(len(iidx.pendingEntries)))
for _, e := range iidx.pendingEntries {
dst = encoding.MarshalUint32(dst, e.AccountID)
dst = encoding.MarshalUint32(dst, e.ProjectID)
dst = encoding.MarshalUint64(dst, e.MetricID)
}
return dst
}
func (iidx *inmemoryInvertedIndex) Unmarshal(src []byte) ([]byte, error) {
iidx.mu.Lock()
defer iidx.mu.Unlock()
// Unmarshal iidx.m
if len(src) < 8 {
return src, fmt.Errorf("cannot read len(iidx.m) from %d bytes; want at least 8 bytes", len(src))
}
mLen := int(encoding.UnmarshalUint64(src))
src = src[8:]
m := make(map[string]*uint64set.Set, mLen)
var metricIDs []uint64
for i := 0; i < mLen; i++ {
tail, k, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal key #%d for iidx.m: %s", i, err)
}
src = tail
tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal value #%d for iidx.m: %s", i, err)
}
src = tail
var v uint64set.Set
for _, metricID := range metricIDs {
v.Add(metricID)
}
m[string(k)] = &v
}
iidx.m = m
// Unmarshal iidx.pendingEntries
if len(src) < 8 {
return src, fmt.Errorf("cannot unmarshal pendingEntriesLen from %d bytes; want at least %d bytes", len(src), 8)
}
pendingEntriesLen := int(encoding.UnmarshalUint64(src))
src = src[8:]
if len(src) < pendingEntriesLen*16 {
return src, fmt.Errorf("cannot unmarshal %d pending entries from %d bytes; want at least %d bytes", pendingEntriesLen, len(src), pendingEntriesLen*16)
}
for i := 0; i < pendingEntriesLen; i++ {
var e pendingHourMetricIDEntry
e.AccountID = encoding.UnmarshalUint32(src)
src = src[4:]
e.ProjectID = encoding.UnmarshalUint32(src)
src = src[4:]
e.MetricID = encoding.UnmarshalUint64(src)
src = src[8:]
iidx.pendingEntries = append(iidx.pendingEntries, e)
}
return src, nil
}
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
for _, metricID := range metricIDs {
dst = encoding.MarshalUint64(dst, metricID)
}
return dst
}
func unmarshalMetricIDs(dst []uint64, src []byte) ([]byte, []uint64, error) {
if len(src) < 8 {
return src, dst, fmt.Errorf("cannot unmarshal metricIDs len from %d bytes; want at least 8 bytes", len(src))
}
metricIDsLen := int(encoding.UnmarshalUint64(src))
src = src[8:]
if len(src) < 8*metricIDsLen {
return src, dst, fmt.Errorf("not enough bytes for unmarshaling %d metricIDs; want %d bytes; got %d bytes", metricIDsLen, 8*metricIDsLen, len(src))
}
for i := 0; i < metricIDsLen; i++ {
metricID := encoding.UnmarshalUint64(src)
src = src[8:]
dst = append(dst, metricID)
}
return src, dst, nil
}
func (iidx *inmemoryInvertedIndex) SizeBytes() uint64 {
n := uint64(0)
iidx.mu.RLock()
for k, v := range iidx.m {
n += uint64(len(k))
n += v.SizeBytes()
}
n += uint64(len(iidx.pendingEntries)) * uint64(unsafe.Sizeof(pendingHourMetricIDEntry{}))
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.m)
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetEntriesCount() int {
if iidx == nil {
return 0
}
n := 0
iidx.mu.RLock()
for _, v := range iidx.m {
n += v.Len()
}
iidx.mu.RUnlock()
return n
}
func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int {
if iidx == nil {
return 0
}
iidx.mu.RLock()
n := len(iidx.pendingEntries)
iidx.mu.RUnlock()
return n
}
func newInmemoryInvertedIndex() *inmemoryInvertedIndex {
return &inmemoryInvertedIndex{
m: make(map[string]*uint64set.Set),
}
}
func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, byTenant map[accountProjectKey]*uint64set.Set) {
var entries []pendingHourMetricIDEntry
var metricIDs []uint64
for k, v := range byTenant {
var e pendingHourMetricIDEntry
e.AccountID = k.AccountID
e.ProjectID = k.ProjectID
metricIDs = v.AppendTo(metricIDs[:0])
for _, metricID := range metricIDs {
e.MetricID = metricID
entries = append(entries, e)
}
}
iidx.mu.Lock()
iidx.pendingEntries = append(iidx.pendingEntries, entries...)
if err := iidx.addPendingEntriesLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, e pendingHourMetricIDEntry) {
iidx.mu.Lock()
iidx.pendingEntries = append(iidx.pendingEntries, e)
if err := iidx.addPendingEntriesLocked(idb); err != nil {
logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err)
}
iidx.mu.Unlock()
}
func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) {
if iidx == nil {
return
}
var result *uint64set.Set
var tfFirst *tagFilter
for i := range tfs.tfs {
if tfs.tfs[i].isNegative {
continue
}
tfFirst = &tfs.tfs[i]
break
}
iidx.mu.RLock()
defer iidx.mu.RUnlock()
if tfFirst == nil {
result = allMetricIDs.Clone()
} else {
result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix)
result.Intersect(allMetricIDs) // This line is required for filtering metrics by (accountID, projectID)
}
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf == tfFirst {
continue
}
m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix)
if tf.isNegative {
result.Subtract(m)
} else {
result.Intersect(m)
}
if result.Len() == 0 {
return
}
}
metricIDs.Union(result)
}
func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set {
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix)
}
prefix := tf.prefix[len(commonPrefix):]
var m uint64set.Set
kb := kbPool.Get()
defer kbPool.Put(kb)
for k, v := range iidx.m {
if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) {
continue
}
kb.B = append(kb.B[:0], k[len(prefix):]...)
ok, err := tf.matchSuffix(kb.B)
if err != nil {
logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err)
}
if !ok {
continue
}
m.Union(v)
}
return &m
}
func (iidx *inmemoryInvertedIndex) addPendingEntriesLocked(idb *indexDB) error {
entries := iidx.pendingEntries
iidx.pendingEntries = iidx.pendingEntries[:0]
kb := kbPool.Get()
defer kbPool.Put(kb)
mn := GetMetricName()
defer PutMetricName(mn)
for _, e := range entries {
var err error
metricID := e.MetricID
kb.B, err = idb.searchMetricName(kb.B[:0], metricID, e.AccountID, e.ProjectID)
if err != nil {
if err == io.EOF {
iidx.pendingEntries = append(iidx.pendingEntries, e)
continue
}
return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
}
if err = mn.Unmarshal(kb.B); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err)
}
kb.B = marshalTagValue(kb.B[:0], nil)
kb.B = marshalTagValue(kb.B, mn.MetricGroup)
iidx.addMetricIDLocked(kb.B, metricID)
for i := range mn.Tags {
kb.B = mn.Tags[i].Marshal(kb.B[:0])
iidx.addMetricIDLocked(kb.B, metricID)
}
}
return nil
}
func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) {
v := iidx.m[string(key)]
if v == nil {
v = &uint64set.Set{}
iidx.m[string(key)] = v
}
v.Add(metricID)
}

View File

@ -1,47 +0,0 @@
package storage
import (
"fmt"
"reflect"
"testing"
)
func TestInmemoryInvertedIndexMarshalUnmarshal(t *testing.T) {
iidx := newInmemoryInvertedIndex()
const keysCount = 100
const metricIDsCount = 10000
for i := 0; i < metricIDsCount; i++ {
k := fmt.Sprintf("key %d", i%keysCount)
iidx.addMetricIDLocked([]byte(k), uint64(i))
}
for i := 0; i < 10; i++ {
var e pendingHourMetricIDEntry
e.AccountID = uint32(i)
e.ProjectID = uint32(i + 324)
e.MetricID = uint64(i * 43)
iidx.pendingEntries = append(iidx.pendingEntries, e)
}
data := iidx.Marshal(nil)
iidx2 := newInmemoryInvertedIndex()
tail, err := iidx2.Unmarshal(data)
if err != nil {
t.Fatalf("cannot unmarshal iidx: %s", err)
}
if len(tail) != 0 {
t.Fatalf("unexpected tail left after iidx unmarshaling: %d bytes", len(tail))
}
if len(iidx.m) != len(iidx2.m) {
t.Fatalf("unexpected len(iidx2.m); got %d; want %d", len(iidx2.m), len(iidx.m))
}
if !reflect.DeepEqual(iidx.pendingEntries, iidx2.pendingEntries) {
t.Fatalf("unexpected pendingMetricIDs; got\n%v;\nwant\n%v", iidx2.pendingEntries, iidx.pendingEntries)
}
for k, v := range iidx.m {
v2 := iidx2.m[k]
if !v.Equal(v2) {
t.Fatalf("unexpected set for key %q", k)
}
}
}

View File

@ -78,21 +78,15 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
func TestSearch(t *testing.T) { func TestSearch(t *testing.T) {
t.Run("global_inverted_index", func(t *testing.T) { t.Run("global_inverted_index", func(t *testing.T) {
testSearchGeneric(t, false, false) testSearchGeneric(t, false)
}) })
t.Run("perday_inverted_index", func(t *testing.T) { t.Run("perday_inverted_index", func(t *testing.T) {
testSearchGeneric(t, false, true) testSearchGeneric(t, true)
})
t.Run("recent_hour_global_inverted_index", func(t *testing.T) {
testSearchGeneric(t, true, false)
})
t.Run("recent_hour_perday_inverted_index", func(t *testing.T) {
testSearchGeneric(t, true, true)
}) })
} }
func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) { func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) {
path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex) path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex)
st, err := OpenStorage(path, 0) st, err := OpenStorage(path, 0)
if err != nil { if err != nil {
t.Fatalf("cannot open storage %q: %s", path, err) t.Fatalf("cannot open storage %q: %s", path, err)
@ -154,10 +148,6 @@ func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayIn
extDB.startDateForPerDayInvertedIndex = 0 extDB.startDateForPerDayInvertedIndex = 0
}) })
} }
if forceRecentHourInvertedIndex {
hm := st.currHourMetricIDs.Load().(*hourMetricIDs)
hm.isFull = true
}
// Run search. // Run search.
tr := TimeRange{ tr := TimeRange{

View File

@ -27,17 +27,6 @@ import (
const maxRetentionMonths = 12 * 100 const maxRetentionMonths = 12 * 100
var disableRecentHourIndex = false
// DisableRecentHourIndex disables in-memory inverted index for recent hour.
//
// This may be useful in order to save RAM for high cardinality data.
//
// This function must be called before OpenStorage.
func DisableRecentHourIndex() {
disableRecentHourIndex = true
}
// Storage represents TSDB storage. // Storage represents TSDB storage.
type Storage struct { type Storage struct {
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. // Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
@ -327,16 +316,12 @@ type Metrics struct {
MetricNameCacheCollisions uint64 MetricNameCacheCollisions uint64
DateMetricIDCacheSize uint64 DateMetricIDCacheSize uint64
DateMetricIDCacheSizeBytes uint64
DateMetricIDCacheSyncsCount uint64 DateMetricIDCacheSyncsCount uint64
DateMetricIDCacheResetsCount uint64 DateMetricIDCacheResetsCount uint64
HourMetricIDCacheSize uint64 HourMetricIDCacheSize uint64
RecentHourInvertedIndexSize uint64
RecentHourInvertedIndexSizeBytes uint64
RecentHourInvertedIndexUniqueTagPairsSize uint64
RecentHourInvertedIndexPendingMetricIDsSize uint64
IndexDBMetrics IndexDBMetrics IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics TableMetrics TableMetrics
} }
@ -382,6 +367,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.MetricNameCacheCollisions += cs.Collisions m.MetricNameCacheCollisions += cs.Collisions
m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount()) m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
m.DateMetricIDCacheSizeBytes += uint64(s.dateMetricIDCache.SizeBytes())
m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount) m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount)
m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount) m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount)
@ -393,18 +379,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
} }
m.HourMetricIDCacheSize += uint64(hourMetricIDsLen) m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount())
m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount())
m.RecentHourInvertedIndexSizeBytes += hmPrev.iidx.SizeBytes()
m.RecentHourInvertedIndexSizeBytes += hmCurr.iidx.SizeBytes()
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen())
m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen())
s.idb().UpdateMetrics(&m.IndexDBMetrics) s.idb().UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics) s.tb.UpdateMetrics(&m.TableMetrics)
} }
@ -519,7 +493,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if !fs.IsPathExist(path) { if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path) logger.Infof("nothing to load from %q", path)
return &hourMetricIDs{ return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour, hour: hour,
} }
} }
@ -531,7 +504,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if len(src) < 24 { if len(src) < 24 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
return &hourMetricIDs{ return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour, hour: hour,
} }
} }
@ -544,7 +516,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if hourLoaded != hour { if hourLoaded != hour {
logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour) logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour)
return &hourMetricIDs{ return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour, hour: hour,
} }
} }
@ -555,7 +526,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
if uint64(len(src)) < 8*hmLen { if uint64(len(src)) < 8*hmLen {
logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen) logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{ return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour, hour: hour,
} }
} }
@ -602,30 +572,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
byTenant[k] = m byTenant[k] = m
} }
// Unmarshal hm.iidx
iidx := newInmemoryInvertedIndex()
if !disableRecentHourIndex {
tail, err := iidx.Unmarshal(src)
if err != nil {
logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err)
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
if len(tail) > 0 {
logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail))
return &hourMetricIDs{
iidx: newInmemoryInvertedIndex(),
hour: hour,
}
}
}
logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
return &hourMetricIDs{ return &hourMetricIDs{
m: m, m: m,
iidx: iidx,
byTenant: byTenant, byTenant: byTenant,
hour: hourLoaded, hour: hourLoaded,
isFull: isFull != 0, isFull: isFull != 0,
@ -665,11 +614,6 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
} }
} }
if !disableRecentHourIndex {
// Marshal hm.iidx
dst = hm.iidx.Marshal(dst)
}
if err := ioutil.WriteFile(path, dst, 0644); err != nil { if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
} }
@ -1030,9 +974,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
} }
s.pendingHourEntries = append(s.pendingHourEntries, e) s.pendingHourEntries = append(s.pendingHourEntries, e)
s.pendingHourEntriesLock.Unlock() s.pendingHourEntriesLock.Unlock()
if !disableRecentHourIndex {
hm.iidx.AddMetricID(idb, e)
}
} }
// Slower path: check global cache for (date, metricID) entry. // Slower path: check global cache for (date, metricID) entry.
@ -1098,6 +1039,15 @@ func (dmc *dateMetricIDCache) EntriesCount() int {
return n return n
} }
func (dmc *dateMetricIDCache) SizeBytes() uint64 {
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
n := uint64(0)
for _, e := range byDate.m {
n += e.v.SizeBytes()
}
return n
}
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
byDate := dmc.byDate.Load().(*byDateMetricIDMap) byDate := dmc.byDate.Load().(*byDateMetricIDMap)
v := byDate.get(date) v := byDate.get(date)
@ -1210,19 +1160,16 @@ func (s *Storage) updateCurrHourMetricIDs() {
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries. // Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
var m *uint64set.Set var m *uint64set.Set
var iidx *inmemoryInvertedIndex
var byTenant map[accountProjectKey]*uint64set.Set var byTenant map[accountProjectKey]*uint64set.Set
isFull := hm.isFull isFull := hm.isFull
if hm.hour == hour { if hm.hour == hour {
m = hm.m.Clone() m = hm.m.Clone()
iidx = hm.iidx
byTenant = make(map[accountProjectKey]*uint64set.Set, len(hm.byTenant)) byTenant = make(map[accountProjectKey]*uint64set.Set, len(hm.byTenant))
for k, e := range hm.byTenant { for k, e := range hm.byTenant {
byTenant[k] = e.Clone() byTenant[k] = e.Clone()
} }
} else { } else {
m = &uint64set.Set{} m = &uint64set.Set{}
iidx = newInmemoryInvertedIndex()
byTenant = make(map[accountProjectKey]*uint64set.Set) byTenant = make(map[accountProjectKey]*uint64set.Set)
isFull = true isFull = true
} }
@ -1243,7 +1190,6 @@ func (s *Storage) updateCurrHourMetricIDs() {
hmNew := &hourMetricIDs{ hmNew := &hourMetricIDs{
m: m, m: m,
iidx: iidx,
byTenant: byTenant, byTenant: byTenant,
hour: hour, hour: hour,
isFull: isFull, isFull: isFull,
@ -1256,7 +1202,6 @@ func (s *Storage) updateCurrHourMetricIDs() {
type hourMetricIDs struct { type hourMetricIDs struct {
m *uint64set.Set m *uint64set.Set
iidx *inmemoryInvertedIndex
byTenant map[accountProjectKey]*uint64set.Set byTenant map[accountProjectKey]*uint64set.Set
hour uint64 hour uint64
isFull bool isFull bool

View File

@ -107,7 +107,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: &uint64set.Set{}, m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123, hour: 123,
} }
hmOrig.m.Add(12) hmOrig.m.Add(12)
@ -143,7 +142,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: &uint64set.Set{}, m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour, hour: hour,
} }
hmOrig.m.Add(12) hmOrig.m.Add(12)
@ -204,7 +202,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: &uint64set.Set{}, m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: 123, hour: 123,
} }
hmOrig.m.Add(12) hmOrig.m.Add(12)
@ -265,7 +262,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hour := uint64(timestampFromTime(time.Now())) / msecPerHour hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{ hmOrig := &hourMetricIDs{
m: &uint64set.Set{}, m: &uint64set.Set{},
iidx: newInmemoryInvertedIndex(),
hour: hour, hour: hour,
} }
hmOrig.m.Add(12) hmOrig.m.Add(12)