diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 2400e95eb..5ba759a45 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -423,6 +423,13 @@ func registerStorageMetrics() { return float64(idbm().RecentHourInvertedIndexSearchHits) }) + metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 { + return float64(idbm().DateRangeSearchCalls) + }) + metrics.NewGauge(`vm_date_range_hits_total`, func() float64 { + return float64(idbm().DateRangeSearchHits) + }) + metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 { return float64(m().TSIDCacheSize) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 711de0e30..fda7e7719 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -42,6 +42,9 @@ const ( // Prefix for Date->MetricID entries. nsPrefixDateToMetricID = 5 + + // Prefix for (Date,Tag)->MetricID entries. + nsPrefixDateTagToMetricIDs = 6 ) func shouldCacheBlock(item []byte) bool { @@ -50,8 +53,8 @@ func shouldCacheBlock(item []byte) bool { } // Do not cache items starting from switch item[0] { - case nsPrefixTagToMetricIDs: - // Do not cache blocks with tag->metricIDs items, since: + case nsPrefixTagToMetricIDs, nsPrefixDateTagToMetricIDs: + // Do not cache blocks with tag->metricIDs and (date,tag)->metricIDs items, since: // - these blocks are scanned sequentially, so the overhead // on their unmarshaling is amortized by the sequential scan. // - these blocks can occupy high amounts of RAM in cache @@ -98,8 +101,17 @@ type indexDB struct { // The number of hits for recent hour searches over inverted index. recentHourInvertedIndexSearchHits uint64 + // The number of calls for date range searches. + dateRangeSearchCalls uint64 + + // The number of hits for date range searches. + dateRangeSearchHits uint64 + mustDrop uint64 + // Start date fully covered by per-day inverted index. 
+ startDateForPerDayInvertedIndex uint64 + name string tb *mergeset.Table @@ -184,6 +196,14 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca } db.setDeletedMetricIDs(dmis) + is = db.getIndexSearch() + date, err := is.getStartDateForPerDayInvertedIndex() + db.putIndexSearch(is) + if err != nil { + return nil, fmt.Errorf("cannot obtain start date for per-day inverted index: %s", err) + } + db.startDateForPerDayInvertedIndex = date + return db, nil } @@ -214,6 +234,9 @@ type IndexDBMetrics struct { RecentHourInvertedIndexSearchCalls uint64 RecentHourInvertedIndexSearchHits uint64 + DateRangeSearchCalls uint64 + DateRangeSearchHits uint64 + IndexBlocksWithMetricIDsProcessed uint64 IndexBlocksWithMetricIDsIncorrectOrder uint64 @@ -255,6 +278,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls) m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits) + m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) + m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) + m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed) m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder) @@ -734,7 +760,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er if !bytes.HasPrefix(item, prefix) { break } - if err := mp.Init(item); err != nil { + if err := mp.Init(item, nsPrefixTagToMetricIDs); err != nil { return err } if mp.IsDeletedTag(dmis) { @@ -802,7 +828,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m if !bytes.HasPrefix(item, prefix) { break } - if err := mp.Init(item); err != nil { + if err := mp.Init(item, nsPrefixTagToMetricIDs); err != nil { return err } if mp.IsDeletedTag(dmis) { @@ -960,6 +986,35 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) { db.deletedMetricIDsUpdateLock.Unlock() } +func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) { + minDate := uint64(timestampFromTime(time.Now())) / msecPerDay + kb := &is.kb + ts := &is.ts + kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs) + prefix := kb.B + ts.Seek(kb.B) + for ts.NextItem() { + item := ts.Item + if !bytes.HasPrefix(item, prefix) { + break + } + suffix := item[len(prefix):] + + // Suffix must contain encoded 64-bit date. + if len(suffix) < 8 { + return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix)) + } + minDate = encoding.UnmarshalUint64(suffix) + break + } + if err := ts.Error(); err != nil { + return 0, err + } + // The minDate can contain incomplete inverted index, so increment it. + minDate++ + return minDate, nil +} + func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { dmis := &uint64set.Set{} ts := &is.ts @@ -1614,7 +1669,16 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf // Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour. return nil } + ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) + if err != nil { + return err + } + if ok { + // Fast path: found metricIDs by date range. + return nil + } + // Slow path - try searching over the whole inverted index. 
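Note: getStartDateForPerDayInvertedIndex above recovers the start date by seeking to the lexicographically first (date,tag)->metricIDs key: the 8 bytes after the namespace byte hold the big-endian date, and one day is skipped because the earliest indexed day may be covered only partially. A minimal stdlib-only sketch of that key arithmetic follows, before the slow path continues below; marshalDatePrefix and startDateFromFirstKey are illustrative names, not functions from the codebase.

package main

import (
	"encoding/binary"
	"fmt"
)

// Illustrative constant matching nsPrefixDateTagToMetricIDs added above.
const nsPrefixDateTagToMetricIDs = 6

// marshalDatePrefix builds the common prefix of a per-day row: the namespace
// byte followed by the big-endian date, so rows sort by date byte-wise.
func marshalDatePrefix(dst []byte, date uint64) []byte {
	dst = append(dst, nsPrefixDateTagToMetricIDs)
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], date)
	return append(dst, b[:]...)
}

// startDateFromFirstKey mimics getStartDateForPerDayInvertedIndex: take the
// date of the first per-day key and skip one day, since the first indexed day
// may be incomplete.
func startDateFromFirstKey(firstKey []byte) (uint64, error) {
	if len(firstKey) < 1+8 || firstKey[0] != nsPrefixDateTagToMetricIDs {
		return 0, fmt.Errorf("unexpected per-day key %q", firstKey)
	}
	return binary.BigEndian.Uint64(firstKey[1:]) + 1, nil
}

func main() {
	key := marshalDatePrefix(nil, 18100) // hypothetical day number (days since the Unix epoch)
	key = append(key, "job=vm"...)       // stand-in for the marshaled tag payload
	startDate, err := startDateFromFirstKey(key)
	fmt.Println(startDate, err) // 18101 <nil>
}

Big-endian encoding matters here: it makes byte-wise key order coincide with numeric date order, so the first key found by Seek carries the smallest date.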
 	minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
 	if err != nil {
 		return err
 	}
@@ -1922,7 +1986,8 @@ var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
 func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
 	atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1)
-	if metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics); ok {
+	metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics)
+	if ok {
 		// Fast path: tr covers the current and / or the previous hour.
 		// Return the full list of metric ids for this time range.
 		atomic.AddUint64(&is.db.recentHourMetricIDsSearchHits, 1)
@@ -1937,17 +2002,160 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
 		// Too much dates must be covered. Give up.
 		return nil, errMissingMetricIDsForDate
 	}
-	metricIDs := &uint64set.Set{}
+
+	// Search for metricIDs for each day in parallel.
+	metricIDs = &uint64set.Set{}
+	var wg sync.WaitGroup
+	var errGlobal error
+	var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below.
 	for minDate <= maxDate {
-		if err := is.getMetricIDsForDate(minDate, metricIDs, maxMetrics); err != nil {
-			return nil, err
-		}
+		date := minDate
+		isLocal := is.db.getIndexSearch()
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			defer is.db.putIndexSearch(isLocal)
+			var result uint64set.Set
+			err := isLocal.getMetricIDsForDate(date, &result, maxMetrics)
+			mu.Lock()
+			if metricIDs.Len() < maxMetrics {
+				metricIDs.Union(&result)
+			}
+			if err != nil {
+				errGlobal = err
+			}
+			mu.Unlock()
+		}()
 		minDate++
 	}
+	wg.Wait()
+	if errGlobal != nil {
+		return nil, errGlobal
+	}
 	atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1)
 	return metricIDs, nil
 }
 
+func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) (bool, error) {
+	atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
+	minDate := uint64(tr.MinTimestamp) / msecPerDay
+	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
+	if minDate < is.db.startDateForPerDayInvertedIndex || maxDate < minDate {
+		// Per-day inverted index doesn't cover the selected date range.
+		return false, nil
+	}
+	if maxDate-minDate > maxDaysForDateMetricIDs {
+		// Too many dates must be covered. Give up, since it may be slow.
+		return false, nil
+	}
+
+	// Search for metricIDs for each day in parallel. 
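Note: the loop that follows, like the one added to getMetricIDsForTimeRange above, uses a single fan-out shape: one goroutine per day, a mutex guarding the merged set together with the shared error, and a length check that stops merging once maxMetrics is reached. A stdlib-only sketch of the pattern, with a plain map standing in for uint64set.Set and a callback standing in for the per-day lookup:

package main

import (
	"fmt"
	"sync"
)

// searchDateRange fans out one goroutine per day in [minDate, maxDate] and
// merges the per-day results under a mutex.
func searchDateRange(minDate, maxDate uint64, maxMetrics int,
	perDay func(date uint64) (map[uint64]struct{}, error)) (map[uint64]struct{}, error) {

	metricIDs := make(map[uint64]struct{})
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex // protects metricIDs and errGlobal
		errGlobal error
	)
	for date := minDate; date <= maxDate; date++ {
		wg.Add(1)
		go func(date uint64) {
			defer wg.Done()
			result, err := perDay(date)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errGlobal = err
				return
			}
			// Stop merging once the result set is already big enough.
			if len(metricIDs) < maxMetrics {
				for id := range result {
					metricIDs[id] = struct{}{}
				}
			}
		}(date)
	}
	wg.Wait()
	if errGlobal != nil {
		return nil, errGlobal
	}
	return metricIDs, nil
}

func main() {
	ids, err := searchDateRange(100, 102, 1000, func(date uint64) (map[uint64]struct{}, error) {
		return map[uint64]struct{}{date*10 + 1: {}}, nil
	})
	fmt.Println(len(ids), err) // 3 <nil>
}

Copying the loop variable into a per-iteration value before spawning the goroutine is what makes each goroutine see its own day; the diff does the same with date := minDate.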
+ var wg sync.WaitGroup + var errGlobal error + okGlobal := true + var mu sync.Mutex // protects metricIDs + *Global vars from concurrent access below + for minDate <= maxDate { + date := minDate + isLocal := is.db.getIndexSearch() + wg.Add(1) + go func() { + defer wg.Done() + defer is.db.putIndexSearch(isLocal) + var result uint64set.Set + ok, err := isLocal.tryUpdatingMetricIDsForDate(date, &result, tfs, maxMetrics) + mu.Lock() + if metricIDs.Len() < maxMetrics { + metricIDs.Union(&result) + } + if !ok { + okGlobal = ok + } + if err != nil { + errGlobal = fmt.Errorf("cannot search for metricIDs on date %d: %s", date, err) + } + mu.Unlock() + }() + minDate++ + } + wg.Wait() + if errGlobal != nil { + return false, errGlobal + } + atomic.AddUint64(&is.db.dateRangeSearchHits, 1) + return okGlobal, nil +} + +func (is *indexSearch) tryUpdatingMetricIDsForDate(date uint64, metricIDs *uint64set.Set, tfs *TagFilters, maxMetrics int) (bool, error) { + var tfFirst *tagFilter + for i := range tfs.tfs { + tf := &tfs.tfs[i] + if tf.isNegative { + continue + } + tfFirst = tf + break + } + + var result *uint64set.Set + maxDateMetrics := maxMetrics * 20 + if tfFirst == nil { + result = &uint64set.Set{} + if err := is.updateMetricIDsForDateAll(result, date, maxDateMetrics); err != nil { + if err == errMissingMetricIDsForDate { + // Zero data points were written on the given date. + // It is OK, since (date, metricID) entries must exist for the given date + // according to startDateForPerDayInvertedIndex. + return true, nil + } + return false, fmt.Errorf("cannot obtain all the metricIDs for date %d: %s", date, err) + } + } else { + m, err := is.getMetricIDsForDateTagFilter(tfFirst, date, tfs.commonPrefix, maxDateMetrics) + if err != nil { + if err == errFallbackToMetricNameMatch { + // The per-date search is too expensive. Probably it is better to perform global search + // using metric name match. + return false, nil + } + return false, err + } + result = m + } + if result.Len() >= maxDateMetrics { + return false, fmt.Errorf("more than %d time series found; narrow down the query or increase `-search.maxUniqueTimeseries`", maxDateMetrics) + } + + for i := range tfs.tfs { + tf := &tfs.tfs[i] + if tf == tfFirst { + continue + } + m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics) + if err != nil { + if err == errFallbackToMetricNameMatch { + // The per-date search is too expensive. Probably it is better to perform global search + // using metric name match. + return false, nil + } + return false, err + } + if m.Len() >= maxDateMetrics { + return false, fmt.Errorf("more than %d time series found for tag filter %s; narrow down the query or increase `-search.maxUniqueTimeseries`", + maxDateMetrics, tf) + } + if tf.isNegative { + result.Subtract(m) + } else { + result.Intersect(m) + } + if result.Len() == 0 { + return true, nil + } + } + metricIDs.Union(result) + return true, nil +} + func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) { minHour := uint64(tr.MinTimestamp) / msecPerHour maxHour := uint64(tr.MaxTimestamp) / msecPerHour @@ -2023,15 +2231,47 @@ func (db *indexDB) storeDateMetricID(date, metricID uint64) error { return nil } - // Slow path: create (date, metricID) entry. + // Slow path: create (date, metricID) entries. 
 	items := getIndexItems()
-	items.B = marshalCommonPrefix(items.B[:0], nsPrefixDateToMetricID)
+	defer putIndexItems(items)
+
+	items.B = marshalCommonPrefix(items.B, nsPrefixDateToMetricID)
 	items.B = encoding.MarshalUint64(items.B, date)
 	items.B = encoding.MarshalUint64(items.B, metricID)
 	items.Next()
-	err = db.tb.AddItems(items.Items)
-	putIndexItems(items)
-	return err
+
+	// Create per-day inverted index entries for metricID.
+	kb := kbPool.Get()
+	defer kbPool.Put(kb)
+	mn := GetMetricName()
+	defer PutMetricName(mn)
+	kb.B, err = db.searchMetricName(kb.B[:0], metricID)
+	if err != nil {
+		return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
+	}
+	if err = mn.Unmarshal(kb.B); err != nil {
+		return fmt.Errorf("cannot unmarshal metricName %q obtained by metricID %d: %s", kb.B, metricID, err)
+	}
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
+	kb.B = encoding.MarshalUint64(kb.B, date)
+
+	items.B = append(items.B, kb.B...)
+	items.B = marshalTagValue(items.B, nil)
+	items.B = marshalTagValue(items.B, mn.MetricGroup)
+	items.B = encoding.MarshalUint64(items.B, metricID)
+	items.Next()
+	for i := range mn.Tags {
+		tag := &mn.Tags[i]
+		items.B = append(items.B, kb.B...)
+		items.B = tag.Marshal(items.B)
+		items.B = encoding.MarshalUint64(items.B, metricID)
+		items.Next()
+	}
+
+	if err = db.tb.AddItems(items.Items); err != nil {
+		return fmt.Errorf("cannot add per-day entries for metricID %d: %s", metricID, err)
+	}
+	return nil
 }
 
 func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
@@ -2052,6 +2292,23 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
 	return true, nil
 }
 
+func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
+	// Augment the tag filter prefix for per-date search instead of global search.
+	if !bytes.HasPrefix(tf.prefix, commonPrefix) {
+		logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
+	}
+	kb := kbPool.Get()
+	defer kbPool.Put(kb)
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
+	kb.B = encoding.MarshalUint64(kb.B, date)
+	kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
+
+	tfNew := *tf
+	tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
+	tfNew.prefix = kb.B
+	return is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
+}
+
 func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int) error {
 	ts := &is.ts
 	kb := &is.kb
@@ -2105,15 +2362,28 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
 	return true, nil
 }
 
-func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {
-	ts := &is.ts
-	kb := &is.kb
-	mp := &is.mp
+func (is *indexSearch) updateMetricIDsForDateAll(metricIDs *uint64set.Set, date uint64, maxMetrics int) error {
+	// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
+	kb := kbPool.Get()
+	defer kbPool.Put(kb)
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
+	kb.B = encoding.MarshalUint64(kb.B, date)
+	kb.B = marshalTagValue(kb.B, nil)
+	return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics)
+}
 
-	// Extract all the mtricIDs from (__name__=value) metricIDs. 
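Note: storeDateMetricID above now emits two kinds of rows per (date, metricID) pair: the existing Date->MetricID row consulted by hasDateMetricID, plus one (Date,Tag)->MetricIDs row per tag, with the MetricGroup filed under an empty tag key. A stdlib-only sketch of the resulting key shapes; the escaping performed by the real marshalTagValue is replaced by a stand-in separator, so the exact bytes are illustrative only.

package main

import (
	"encoding/binary"
	"fmt"
)

// Namespace bytes as defined in index_db.go in this diff.
const (
	nsPrefixDateToMetricID     = 5
	nsPrefixDateTagToMetricIDs = 6
)

func appendUint64(dst []byte, v uint64) []byte {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], v) // big-endian keeps rows sorted by date
	return append(dst, b[:]...)
}

// perDayRows builds the index rows written for a single (date, metricID) pair:
// one Date->MetricID row plus one (Date,Tag)->MetricIDs row per tag.
func perDayRows(date, metricID uint64, metricGroup string, tags [][2]string) [][]byte {
	var rows [][]byte

	// (date, metricID) row, checked later by hasDateMetricID.
	rows = append(rows, appendUint64(appendUint64([]byte{nsPrefixDateToMetricID}, date), metricID))

	// (date, tag) -> metricID rows.
	addTag := func(key, value string) {
		row := appendUint64([]byte{nsPrefixDateTagToMetricIDs}, date)
		row = append(row, key...)
		row = append(row, 0x01) // stand-in separator instead of marshalTagValue escaping
		row = append(row, value...)
		row = append(row, 0x01)
		rows = append(rows, appendUint64(row, metricID))
	}
	addTag("", metricGroup) // empty tag key encodes __name__ / MetricGroup
	for _, kv := range tags {
		addTag(kv[0], kv[1])
	}
	return rows
}

func main() {
	for _, row := range perDayRows(18100, 42, "http_requests_total", [][2]string{{"job", "vm"}}) {
		fmt.Printf("%q\n", row)
	}
}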
+func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error { + kb := kbPool.Get() + defer kbPool.Put(kb) + // Extract all the metricIDs from (__name__=value)->metricIDs entries. kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs) kb.B = marshalTagValue(kb.B, nil) - prefix := kb.B + return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics) +} + +func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error { + ts := &is.ts + mp := &is.mp ts.Seek(prefix) for ts.NextItem() { item := ts.Item @@ -2248,6 +2518,13 @@ func unmarshalCommonPrefix(src []byte) ([]byte, byte, error) { const commonPrefixLen = 1 type tagToMetricIDsRowParser struct { + // NSPrefix contains the first byte parsed from the row after Init call. + // This is either nsPrefixTagToMetricIDs or nsPrefixDateTagToMetricIDs. + NSPrefix byte + + // Date contains parsed date for nsPrefixDateTagToMetricIDs rows after Init call + Date uint64 + // MetricIDs contains parsed MetricIDs after ParseMetricIDs call MetricIDs []uint64 @@ -2259,6 +2536,8 @@ type tagToMetricIDsRowParser struct { } func (mp *tagToMetricIDsRowParser) Reset() { + mp.NSPrefix = 0 + mp.Date = 0 mp.MetricIDs = mp.MetricIDs[:0] mp.Tag.Reset() mp.tail = nil @@ -2267,14 +2546,23 @@ func (mp *tagToMetricIDsRowParser) Reset() { // Init initializes mp from b, which should contain encoded tag->metricIDs row. // // b cannot be re-used until Reset call. -func (mp *tagToMetricIDsRowParser) Init(b []byte) error { - tail, prefix, err := unmarshalCommonPrefix(b) +func (mp *tagToMetricIDsRowParser) Init(b []byte, nsPrefixExpected byte) error { + tail, nsPrefix, err := unmarshalCommonPrefix(b) if err != nil { return fmt.Errorf("invalid tag->metricIDs row %q: %s", b, err) } - if prefix != nsPrefixTagToMetricIDs { - return fmt.Errorf("invalid prefix for tag->metricIDs row %q; got %d; want %d", b, prefix, nsPrefixTagToMetricIDs) + if nsPrefix != nsPrefixExpected { + return fmt.Errorf("invalid prefix for tag->metricIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixExpected) } + if nsPrefix == nsPrefixDateTagToMetricIDs { + // unmarshal date. + if len(tail) < 8 { + return fmt.Errorf("cannot unmarshal date from (date, tag)->metricIDs row %q from %d bytes; want at least 8 bytes", b, len(tail)) + } + mp.Date = encoding.UnmarshalUint64(tail) + tail = tail[8:] + } + mp.NSPrefix = nsPrefix tail, err = mp.Tag.Unmarshal(tail) if err != nil { return fmt.Errorf("cannot unmarshal tag from tag->metricIDs row %q: %s", b, err) @@ -2282,6 +2570,16 @@ func (mp *tagToMetricIDsRowParser) Init(b []byte) error { return mp.InitOnlyTail(b, tail) } +// MarshalPrefix marshals row prefix without tail to dst. +func (mp *tagToMetricIDsRowParser) MarshalPrefix(dst []byte) []byte { + dst = marshalCommonPrefix(dst, mp.NSPrefix) + if mp.NSPrefix == nsPrefixDateTagToMetricIDs { + dst = encoding.MarshalUint64(dst, mp.Date) + } + dst = mp.Tag.Marshal(dst) + return dst +} + // InitOnlyTail initializes mp.tail from tail. // // b must contain tag->metricIDs row. @@ -2301,7 +2599,10 @@ func (mp *tagToMetricIDsRowParser) InitOnlyTail(b, tail []byte) error { // // Prefix contains (tag) func (mp *tagToMetricIDsRowParser) EqualPrefix(x *tagToMetricIDsRowParser) bool { - return mp.Tag.Equal(&x.Tag) + if !mp.Tag.Equal(&x.Tag) { + return false + } + return mp.Date == x.Date && mp.NSPrefix == x.NSPrefix } // FirstAndLastMetricIDs returns the first and the last metricIDs in the mp.tail. 
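Note: tagToMetricIDsRowParser above now takes the expected namespace prefix as an argument and, for per-day rows, pulls an 8-byte date out of the header before the tag. A stdlib-only sketch of that header parsing; rowHeader and parseHeader are simplified stand-ins for the parser, and the nsPrefixTagToMetricIDs value is an assumption (this diff only pins nsPrefixDateToMetricID = 5 and nsPrefixDateTagToMetricIDs = 6).

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	nsPrefixTagToMetricIDs     = 1 // assumed value for the sketch
	nsPrefixDateTagToMetricIDs = 6 // as added in index_db.go
)

// rowHeader is a simplified stand-in for the NSPrefix/Date fields added to
// tagToMetricIDsRowParser.
type rowHeader struct {
	NSPrefix byte
	Date     uint64 // set only for nsPrefixDateTagToMetricIDs rows
}

// parseHeader mirrors the Init logic above: verify the namespace byte, then
// unmarshal the date only when the row belongs to the per-day index.
func parseHeader(b []byte, nsPrefixExpected byte) (rowHeader, []byte, error) {
	var h rowHeader
	if len(b) < 1 || b[0] != nsPrefixExpected {
		return h, nil, fmt.Errorf("invalid prefix for row %q; want %d", b, nsPrefixExpected)
	}
	h.NSPrefix = b[0]
	tail := b[1:]
	if h.NSPrefix == nsPrefixDateTagToMetricIDs {
		if len(tail) < 8 {
			return h, nil, fmt.Errorf("cannot unmarshal date from row %q", b)
		}
		h.Date = binary.BigEndian.Uint64(tail)
		tail = tail[8:]
	}
	return h, tail, nil // tail now starts with the marshaled tag
}

func main() {
	row := make([]byte, 9)
	row[0] = nsPrefixDateTagToMetricIDs
	binary.BigEndian.PutUint64(row[1:], 18100)
	row = append(row, "job=vm"...) // stand-in tag payload
	h, tail, err := parseHeader(row, nsPrefixDateTagToMetricIDs)
	fmt.Println(h.NSPrefix, h.Date, string(tail), err) // 6 18100 job=vm <nil>
}

Since EqualPrefix now compares NSPrefix and Date as well as the tag, the merger flushes pending metricIDs whenever the day or the namespace changes; this is why mergeTagToMetricIDsRows below runs the merge once per namespace prefix.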
@@ -2364,22 +2665,28 @@ func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool { } func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { - // Perform quick checks whether items contain tag->metricIDs rows + data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixTagToMetricIDs) + data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixDateTagToMetricIDs) + return data, items +} + +func mergeTagToMetricIDsRowsInternal(data []byte, items [][]byte, nsPrefix byte) ([]byte, [][]byte) { + // Perform quick checks whether items contain rows starting from nsPrefix // based on the fact that items are sorted. if len(items) <= 2 { // The first and the last row must remain unchanged. return data, items } firstItem := items[0] - if len(firstItem) > 0 && firstItem[0] > nsPrefixTagToMetricIDs { + if len(firstItem) > 0 && firstItem[0] > nsPrefix { return data, items } lastItem := items[len(items)-1] - if len(lastItem) > 0 && lastItem[0] < nsPrefixTagToMetricIDs { + if len(lastItem) > 0 && lastItem[0] < nsPrefix { return data, items } - // items contain at least one tag->metricIDs row. Merge rows with common tag. + // items contain at least one row starting from nsPrefix. Merge rows with common tag. tmm := getTagToMetricIDsRowsMerger() tmm.dataCopy = append(tmm.dataCopy[:0], data...) tmm.itemsCopy = append(tmm.itemsCopy[:0], items...) @@ -2388,29 +2695,25 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { dstData := data[:0] dstItems := items[:0] for i, item := range items { - if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs || i == 0 || i == len(items)-1 { - // Write rows other than tag->metricIDs as-is. + if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 { + // Write rows not starting with nsPrefix as-is. // Additionally write the first and the last row as-is in order to preserve // sort order for adjancent blocks. - if len(tmm.pendingMetricIDs) > 0 { - dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) - } + dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) dstData = append(dstData, item...) dstItems = append(dstItems, dstData[len(dstData)-len(item):]) continue } - if err := mp.Init(item); err != nil { - logger.Panicf("FATAL: cannot parse tag->metricIDs row during merge: %s", err) + if err := mp.Init(item, nsPrefix); err != nil { + logger.Panicf("FATAL: cannot parse row starting with nsPrefix %d during merge: %s", nsPrefix, err) } if mp.MetricIDsLen() >= maxMetricIDsPerRow { - if len(tmm.pendingMetricIDs) > 0 { - dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) - } + dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) dstData = append(dstData, item...) dstItems = append(dstItems, dstData[len(dstData)-len(item):]) continue } - if len(tmm.pendingMetricIDs) > 0 && !mp.EqualPrefix(mpPrev) { + if !mp.EqualPrefix(mpPrev) { dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) } mp.ParseMetricIDs() @@ -2510,7 +2813,8 @@ func (tmm *tagToMetricIDsRowsMerger) Reset() { func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) { if len(tmm.pendingMetricIDs) == 0 { - logger.Panicf("BUG: pendingMetricIDs must be non-empty") + // Nothing to flush + return dstData, dstItems } // Use sort.Sort instead of sort.Slice in order to reduce memory allocations. 
sort.Sort(&tmm.pendingMetricIDs) @@ -2518,8 +2822,7 @@ func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstIt // Marshal pendingMetricIDs dstDataLen := len(dstData) - dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs) - dstData = mp.Tag.Marshal(dstData) + dstData = mp.MarshalPrefix(dstData) for _, metricID := range tmm.pendingMetricIDs { dstData = encoding.MarshalUint64(dstData, metricID) } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 076500069..3d8e66460 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -52,8 +52,11 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { t.Fatalf("unexpected items;\ngot\n%X\nwant\n%X", resultItems, expectedItems) } } - x := func(key, value string, metricIDs []uint64) string { - dst := marshalCommonPrefix(nil, nsPrefixTagToMetricIDs) + xy := func(nsPrefix byte, key, value string, metricIDs []uint64) string { + dst := marshalCommonPrefix(nil, nsPrefix) + if nsPrefix == nsPrefixDateTagToMetricIDs { + dst = encoding.MarshalUint64(dst, 1234567901233) + } t := &Tag{ Key: []byte(key), Value: []byte(value), @@ -64,6 +67,12 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { } return string(dst) } + x := func(key, value string, metricIDs []uint64) string { + return xy(nsPrefixTagToMetricIDs, key, value, metricIDs) + } + y := func(key, value string, metricIDs []uint64) string { + return xy(nsPrefixDateTagToMetricIDs, key, value, metricIDs) + } f(nil, nil) f([]string{}, nil) @@ -80,6 +89,19 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { x("", "", []uint64{0}), x("", "", []uint64{0}), }) + f([]string{ + x("", "", []uint64{0}), + x("", "", []uint64{0}), + x("", "", []uint64{0}), + y("", "", []uint64{0}), + y("", "", []uint64{0}), + y("", "", []uint64{0}), + }, []string{ + x("", "", []uint64{0}), + x("", "", []uint64{0}), + y("", "", []uint64{0}), + y("", "", []uint64{0}), + }) f([]string{ x("", "", []uint64{0}), x("", "", []uint64{0}), @@ -102,6 +124,17 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { x("", "", []uint64{0}), x("", "", []uint64{0}), }) + f([]string{ + "\x00asdf", + y("", "", []uint64{0}), + y("", "", []uint64{0}), + y("", "", []uint64{0}), + y("", "", []uint64{0}), + }, []string{ + "\x00asdf", + y("", "", []uint64{0}), + y("", "", []uint64{0}), + }) f([]string{ "\x00asdf", x("", "", []uint64{0}), @@ -114,6 +147,19 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { x("", "", []uint64{0}), "xyz", }) + f([]string{ + "\x00asdf", + x("", "", []uint64{0}), + x("", "", []uint64{0}), + y("", "", []uint64{0}), + y("", "", []uint64{0}), + "xyz", + }, []string{ + "\x00asdf", + x("", "", []uint64{0}), + y("", "", []uint64{0}), + "xyz", + }) f([]string{ "\x00asdf", x("", "", []uint64{1}), @@ -210,10 +256,13 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { "\x00aa", x("foo", "bar", metricIDs), x("foo", "bar", metricIDs), + y("foo", "bar", metricIDs), + y("foo", "bar", metricIDs), "x", }, []string{ "\x00aa", x("foo", "bar", metricIDs), + y("foo", "bar", metricIDs), "x", }) @@ -271,10 +320,12 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { "\x00aa", x("foo", "bar", metricIDs), x("foo", "bar", metricIDs), + y("foo", "bar", metricIDs), "x", }, []string{ "\x00aa", x("foo", "bar", []uint64{123}), + y("foo", "bar", []uint64{123}), "x", }) @@ -301,11 +352,13 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { x("foo", "bar", metricIDs), x("foo", "bar", []uint64{123, 123, 125}), x("foo", "bar", []uint64{123, 124}), + y("foo", "bar", []uint64{123, 124}), }, 
[]string{ "\x00aa", x("foo", "bar", metricIDs), x("foo", "bar", []uint64{123, 123, 125}), x("foo", "bar", []uint64{123, 124}), + y("foo", "bar", []uint64{123, 124}), }) f([]string{ x("foo", "bar", metricIDs), diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index 014ba635d..a437b74d2 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -71,7 +71,22 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) { } func TestSearch(t *testing.T) { - path := "TestSearch" + t.Run("global_inverted_index", func(t *testing.T) { + testSearchGeneric(t, false, false) + }) + t.Run("perday_inverted_index", func(t *testing.T) { + testSearchGeneric(t, false, true) + }) + t.Run("recent_hour_global_inverted_index", func(t *testing.T) { + testSearchGeneric(t, true, false) + }) + t.Run("recent_hour_perday_inverted_index", func(t *testing.T) { + testSearchGeneric(t, true, true) + }) +} + +func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) { + path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex) st, err := OpenStorage(path, 0) if err != nil { t.Fatalf("cannot open storage %q: %s", path, err) @@ -125,6 +140,17 @@ func TestSearch(t *testing.T) { if err != nil { t.Fatalf("cannot re-open storage %q: %s", path, err) } + if forcePerDayInvertedIndex { + idb := st.idb() + idb.startDateForPerDayInvertedIndex = 0 + idb.doExtDB(func(extDB *indexDB) { + extDB.startDateForPerDayInvertedIndex = 0 + }) + } + if forceRecentHourInvertedIndex { + hm := st.currHourMetricIDs.Load().(*hourMetricIDs) + hm.isFull = true + } // Run search. tr := TimeRange{ @@ -133,7 +159,7 @@ func TestSearch(t *testing.T) { } t.Run("serial", func(t *testing.T) { - if err := testSearch(st, tr, mrs, accountsCount); err != nil { + if err := testSearchInternal(st, tr, mrs, accountsCount); err != nil { t.Fatalf("unexpected error: %s", err) } }) @@ -142,7 +168,7 @@ func TestSearch(t *testing.T) { ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { go func() { - ch <- testSearch(st, tr, mrs, accountsCount) + ch <- testSearchInternal(st, tr, mrs, accountsCount) }() } for i := 0; i < cap(ch); i++ { @@ -158,7 +184,7 @@ func TestSearch(t *testing.T) { }) } -func testSearch(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) error { +func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) error { var s Search for i := 0; i < 10; i++ { // Prepare TagFilters for search. diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 19a7af9b2..3c32231a3 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -883,7 +883,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra if err := s.tb.AddRows(rows); err != nil { lastError = fmt.Errorf("cannot add rows to table: %s", err) } - if err := s.updateDateMetricIDCache(rows, lastError); err != nil { + if err := s.updatePerDateData(rows, lastError); err != nil { lastError = err } if lastError != nil { @@ -892,7 +892,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra return rows, nil } -func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error { +func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { var date uint64 var hour uint64 var prevTimestamp int64 @@ -910,6 +910,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error // The r belongs to the current hour. Check for the current hour cache. 
 			if hm.m.Has(metricID) {
 				// Fast path: the metricID is in the current hour cache.
+				// This means the metricID has already been added to the per-day inverted index.
 				continue
 			}
 			s.pendingHourEntriesLock.Lock()
@@ -920,17 +921,20 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
 
 		// Slower path: check global cache for (date, metricID) entry.
 		if s.dateMetricIDCache.Has(date, metricID) {
+			// The metricID has already been added to the per-day inverted index.
 			continue
 		}
 
-		// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
+		// Slow path: store the (date, metricID) entry in the indexDB.
 		// It is OK if the (date, metricID) entry is added multiple times to db
 		// by concurrent goroutines.
-		s.dateMetricIDCache.Set(date, metricID)
 		if err := idb.storeDateMetricID(date, metricID); err != nil {
 			lastError = err
 			continue
 		}
+
+		// The metric must be added to the cache only after it has been successfully added to indexDB.
+		s.dateMetricIDCache.Set(date, metricID)
 	}
 	return lastError
 }
@@ -1192,6 +1196,11 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *workingsetca
 		return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %s", prevPath, err)
 	}
 
+	// Adjust startDateForPerDayInvertedIndex for the previous index.
+	if prev.startDateForPerDayInvertedIndex > curr.startDateForPerDayInvertedIndex {
+		prev.startDateForPerDayInvertedIndex = curr.startDateForPerDayInvertedIndex
+	}
+
 	return curr, prev, nil
 }
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 962d63766..3c130d21c 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -385,7 +385,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
 	t.Run("serial", func(t *testing.T) {
 		for i := 0; i < 3; i++ {
 			if err = testStorageDeleteMetrics(s, 0); err != nil {
-				t.Fatalf("unexpected error: %s", err)
+				t.Fatalf("unexpected error on iteration %d: %s", i, err)
 			}
 
 			// Re-open the storage in order to check how deleted metricIDs
@@ -393,7 +393,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
 			s.MustClose()
 			s, err = OpenStorage(path, 0)
 			if err != nil {
-				t.Fatalf("cannot open storage after closing: %s", err)
+				t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
 			}
 		}
 	})
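Note: the reordering in updatePerDateData above makes the cache a record of successful writes only: the (date, metricID) entry is cached strictly after storeDateMetricID succeeds, so a failed indexDB write gets retried by a later row instead of being masked by a stale cache entry. A stdlib-only sketch of the pattern; dateMetricIDCache here is a hypothetical map-based stand-in for the workingsetcache-backed original, and the store step is passed in as a callback.

package main

import (
	"fmt"
	"sync"
)

// dateMetricIDCache is a simplified stand-in for the real cache.
type dateMetricIDCache struct {
	mu sync.Mutex
	m  map[[2]uint64]struct{}
}

func (c *dateMetricIDCache) Has(date, metricID uint64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.m[[2]uint64{date, metricID}]
	return ok
}

func (c *dateMetricIDCache) Set(date, metricID uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[[2]uint64{date, metricID}] = struct{}{}
}

// ensureDateMetricID mirrors the slow path above: skip on cache hit, attempt
// the indexDB write, and mark the cache only after the write succeeded.
func ensureDateMetricID(c *dateMetricIDCache, date, metricID uint64, store func(date, metricID uint64) error) error {
	if c.Has(date, metricID) {
		return nil // already written to the per-day index
	}
	if err := store(date, metricID); err != nil {
		// Do not cache on failure; a later row retries the store.
		return err
	}
	// Cache only after the indexDB write succeeded.
	c.Set(date, metricID)
	return nil
}

func main() {
	c := &dateMetricIDCache{m: make(map[[2]uint64]struct{})}
	err := ensureDateMetricID(c, 18100, 42, func(date, metricID uint64) error { return nil })
	fmt.Println(err, c.Has(18100, 42)) // <nil> true
}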