diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 3c34f54c31..4b66fc94e3 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1873,10 +1873,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac return nil, errMissingMetricIDsForDate } atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1) - metricIDs, ok, err := is.getMetricIDsForRecentHours(tr, maxMetrics, accountID, projectID) - if err != nil { - return nil, err - } + metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics, accountID, projectID) if ok { // Fast path: tr covers the current and / or the previous hour. // Return the full list of metric ids for this time range. @@ -1903,38 +1900,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac return metricIDs, nil } -func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool, error) { - metricIDs, ok := is.getMetricIDsForRecentHoursAll(tr, maxMetrics) - if !ok { - return nil, false, nil - } - - // Filter out metricIDs for non-matching (accountID, projectID). - // Sort metricIDs for faster lookups below. - sortedMetricIDs := metricIDs.AppendTo(nil) - ts := &is.ts - kb := &is.kb - kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID) - prefixLen := len(kb.B) - kb.B = encoding.MarshalUint64(kb.B, 0) - prefix := kb.B[:prefixLen] - for _, metricID := range sortedMetricIDs { - kb.B = encoding.MarshalUint64(prefix, metricID) - ts.Seek(kb.B) - if !ts.NextItem() { - break - } - if !bytes.HasPrefix(ts.Item, kb.B) { - metricIDs.Del(metricID) - } - } - if err := ts.Error(); err != nil { - return nil, false, fmt.Errorf("cannot filter out metricIDs by (accountID=%d, projectID=%d): %s", accountID, projectID, err) - } - return metricIDs, true, nil -} - -func (is *indexSearch) getMetricIDsForRecentHoursAll(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) { +func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool) { // Return all the metricIDs for all the (AccountID, ProjectID) entries. // The caller is responsible for proper filtering later. minHour := uint64(tr.MinTimestamp) / msecPerHour @@ -1944,28 +1910,44 @@ func (is *indexSearch) getMetricIDsForRecentHoursAll(tr TimeRange, maxMetrics in // The tr fits the current hour. // Return a copy of hmCurr.m, because the caller may modify // the returned map. - if hmCurr.m.Len() > maxMetrics { + k := accountProjectKey{ + AccountID: accountID, + ProjectID: projectID, + } + m := hmCurr.byTenant[k] + if m.Len() > maxMetrics { return nil, false } - return hmCurr.m.Clone(), true + return m.Clone(), true } hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { // The tr fits the previous hour. // Return a copy of hmPrev.m, because the caller may modify // the returned map. - if hmPrev.m.Len() > maxMetrics { + k := accountProjectKey{ + AccountID: accountID, + ProjectID: projectID, + } + m := hmPrev.byTenant[k] + if m.Len() > maxMetrics { return nil, false } - return hmPrev.m.Clone(), true + return m.Clone(), true } if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { // The tr spans the previous and the current hours. - if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics { + k := accountProjectKey{ + AccountID: accountID, + ProjectID: projectID, + } + mCurr := hmCurr.byTenant[k] + mPrev := hmPrev.byTenant[k] + if mCurr.Len()+mPrev.Len() > maxMetrics { return nil, false } - metricIDs := hmCurr.m.Clone() - for _, metricID := range hmPrev.m.AppendTo(nil) { + metricIDs := mCurr.Clone() + for _, metricID := range mPrev.AppendTo(nil) { metricIDs.Add(metricID) } return metricIDs, true diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index f04d72d42e..0bd00180bf 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -678,10 +678,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC return false } - type accountProjectKey struct { - AccountID uint32 - ProjectID uint32 - } allKeys := make(map[accountProjectKey]map[string]bool) timeseriesCounters := make(map[accountProjectKey]map[uint64]bool) var tsidCopy TSID diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 3f9b3616ea..8a2dfad186 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -69,7 +69,7 @@ type Storage struct { // Pending MetricID values to be added to currHourMetricIDs. pendingHourMetricIDsLock sync.Mutex - pendingHourMetricIDs *uint64set.Set + pendingHourMetricIDs []pendingHourMetricIDEntry stop chan struct{} @@ -77,6 +77,17 @@ type Storage struct { retentionWatcherWG sync.WaitGroup } +type pendingHourMetricIDEntry struct { + AccountID uint32 + ProjectID uint32 + MetricID uint64 +} + +type accountProjectKey struct { + AccountID uint32 + ProjectID uint32 +} + // OpenStorage opens storage on the given path with the given number of retention months. func OpenStorage(path string, retentionMonths int) (*Storage, error) { if retentionMonths > maxRetentionMonths { @@ -125,7 +136,6 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) - s.pendingHourMetricIDs = &uint64set.Set{} // Load indexdb idbPath := path + "/indexdb" @@ -497,6 +507,8 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) return &hourMetricIDs{} } + + // Unmarshal header isFull := encoding.UnmarshalUint64(src) src = src[8:] hourLoaded := encoding.UnmarshalUint64(src) @@ -505,10 +517,12 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs logger.Infof("discarding %s, since it is outdated", name) return &hourMetricIDs{} } + + // Unmarshal hm.m hmLen := encoding.UnmarshalUint64(src) src = src[8:] - if uint64(len(src)) != 8*hmLen { - logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen) + if uint64(len(src)) < 8*hmLen { + logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want %d bytes", path, len(src), 8*hmLen) return &hourMetricIDs{} } m := &uint64set.Set{} @@ -517,11 +531,49 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs src = src[8:] m.Add(metricID) } + + // Unmarshal hm.byTenant + if len(src) < 8 { + logger.Errorf("discarding %s, since it has broken hm.byTenant header; got %d bytes; want %d bytes", path, len(src), 8) + return &hourMetricIDs{} + } + byTenantLen := encoding.UnmarshalUint64(src) + src = src[8:] + byTenant := make(map[accountProjectKey]*uint64set.Set, byTenantLen) + for i := uint64(0); i < byTenantLen; i++ { + if len(src) < 16 { + logger.Errorf("discarding %s, since it has broken accountID:projectID prefix; got %d bytes; want %d bytes", path, len(src), 16) + return &hourMetricIDs{} + } + accountID := encoding.UnmarshalUint32(src) + src = src[4:] + projectID := encoding.UnmarshalUint32(src) + src = src[4:] + mLen := encoding.UnmarshalUint64(src) + src = src[8:] + if uint64(len(src)) < 8*mLen { + logger.Errorf("discarding %s, since it has borken accountID:projectID entry; got %d bytes; want %d bytes", path, len(src), 8*mLen) + return &hourMetricIDs{} + } + m := &uint64set.Set{} + for j := uint64(0); j < mLen; j++ { + metricID := encoding.UnmarshalUint64(src) + src = src[8:] + m.Add(metricID) + } + k := accountProjectKey{ + AccountID: accountID, + ProjectID: projectID, + } + byTenant[k] = m + } + logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) return &hourMetricIDs{ - m: m, - hour: hourLoaded, - isFull: isFull != 0, + m: m, + byTenant: byTenant, + hour: hourLoaded, + isFull: isFull != 0, } } @@ -534,12 +586,29 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { if hm.isFull { isFull = 1 } + + // Marshal header dst = encoding.MarshalUint64(dst, isFull) dst = encoding.MarshalUint64(dst, hm.hour) + + // Marshal hm.m dst = encoding.MarshalUint64(dst, uint64(hm.m.Len())) for _, metricID := range hm.m.AppendTo(nil) { dst = encoding.MarshalUint64(dst, metricID) } + + // Marshal hm.byTenant + var metricIDs []uint64 + dst = encoding.MarshalUint64(dst, uint64(len(hm.byTenant))) + for k, e := range hm.byTenant { + dst = encoding.MarshalUint32(dst, k.AccountID) + dst = encoding.MarshalUint32(dst, k.ProjectID) + dst = encoding.MarshalUint64(dst, uint64(e.Len())) + metricIDs = e.AppendTo(metricIDs[:0]) + for _, metricID := range metricIDs { + dst = encoding.MarshalUint64(dst, metricID) + } + } if err := ioutil.WriteFile(path, dst, 0644); err != nil { logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) } @@ -898,7 +967,12 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error continue } s.pendingHourMetricIDsLock.Lock() - s.pendingHourMetricIDs.Add(metricID) + e := pendingHourMetricIDEntry{ + AccountID: r.TSID.AccountID, + ProjectID: r.TSID.ProjectID, + MetricID: metricID, + } + s.pendingHourMetricIDs = append(s.pendingHourMetricIDs, e) s.pendingHourMetricIDsLock.Unlock() } @@ -924,7 +998,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error func (s *Storage) updateCurrHourMetricIDs() { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) s.pendingHourMetricIDsLock.Lock() - newMetricIDsLen := s.pendingHourMetricIDs.Len() + newMetricIDsLen := len(s.pendingHourMetricIDs) s.pendingHourMetricIDsLock.Unlock() hour := uint64(timestampFromTime(time.Now())) / msecPerHour if newMetricIDsLen == 0 && hm.hour == hour { @@ -934,25 +1008,44 @@ func (s *Storage) updateCurrHourMetricIDs() { // Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs. var m *uint64set.Set + var byTenant map[accountProjectKey]*uint64set.Set isFull := hm.isFull if hm.hour == hour { m = hm.m.Clone() + byTenant = make(map[accountProjectKey]*uint64set.Set, len(hm.byTenant)) + for k, e := range hm.byTenant { + byTenant[k] = e.Clone() + } } else { m = &uint64set.Set{} + byTenant = make(map[accountProjectKey]*uint64set.Set) isFull = true } + s.pendingHourMetricIDsLock.Lock() - newMetricIDs := s.pendingHourMetricIDs.AppendTo(nil) - s.pendingHourMetricIDs = &uint64set.Set{} + a := append([]pendingHourMetricIDEntry{}, s.pendingHourMetricIDs...) + s.pendingHourMetricIDs = s.pendingHourMetricIDs[:0] s.pendingHourMetricIDsLock.Unlock() - for _, metricID := range newMetricIDs { - m.Add(metricID) + + for _, x := range a { + m.Add(x.MetricID) + k := accountProjectKey{ + AccountID: x.AccountID, + ProjectID: x.ProjectID, + } + e := byTenant[k] + if e == nil { + e = &uint64set.Set{} + byTenant[k] = e + } + e.Add(x.MetricID) } hmNew := &hourMetricIDs{ - m: m, - hour: hour, - isFull: isFull, + m: m, + byTenant: byTenant, + hour: hour, + isFull: isFull, } s.currHourMetricIDs.Store(hmNew) if hm.hour != hour { @@ -961,9 +1054,10 @@ func (s *Storage) updateCurrHourMetricIDs() { } type hourMetricIDs struct { - m *uint64set.Set - hour uint64 - isFull bool + m *uint64set.Set + byTenant map[accountProjectKey]*uint64set.Set + hour uint64 + isFull bool } func (s *Storage) getTSIDFromCache(dst *TSID, metricName []byte) bool { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 9cc9b0af69..4b438c28d1 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -18,10 +18,9 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { var s Storage s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) - s.pendingHourMetricIDs = &uint64set.Set{} return &s } - t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) { + t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -52,11 +51,11 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) } }) - t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) { + t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -89,18 +88,34 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !reflect.DeepEqual(hmPrev, hmEmpty) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) } - - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) } }) t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() - pendingHourMetricIDs := &uint64set.Set{} - pendingHourMetricIDs.Add(343) - pendingHourMetricIDs.Add(32424) - pendingHourMetricIDs.Add(8293432) - s.pendingHourMetricIDs = pendingHourMetricIDs + s.pendingHourMetricIDs = []pendingHourMetricIDEntry{ + {AccountID: 123, ProjectID: 431, MetricID: 343}, + {AccountID: 123, ProjectID: 431, MetricID: 32424}, + {AccountID: 1, ProjectID: 2, MetricID: 8293432}, + } + mExpected := &uint64set.Set{} + for _, e := range s.pendingHourMetricIDs { + mExpected.Add(e.MetricID) + } + byTenantExpected := make(map[accountProjectKey]*uint64set.Set) + for _, e := range s.pendingHourMetricIDs { + k := accountProjectKey{ + AccountID: e.AccountID, + ProjectID: e.ProjectID, + } + x := byTenantExpected[k] + if x == nil { + x = &uint64set.Set{} + byTenantExpected[k] = x + } + x.Add(e.MetricID) + } hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -119,8 +134,11 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } } - if !reflect.DeepEqual(hmCurr.m, pendingHourMetricIDs) { - t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs) + if !reflect.DeepEqual(hmCurr.m, mExpected) { + t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, mExpected) + } + if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) { + t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) } if !hmCurr.isFull { t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) @@ -130,18 +148,34 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !reflect.DeepEqual(hmPrev, hmOrig) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } - - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0) } }) t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() - pendingHourMetricIDs := &uint64set.Set{} - pendingHourMetricIDs.Add(343) - pendingHourMetricIDs.Add(32424) - pendingHourMetricIDs.Add(8293432) - s.pendingHourMetricIDs = pendingHourMetricIDs + s.pendingHourMetricIDs = []pendingHourMetricIDEntry{ + {AccountID: 123, ProjectID: 431, MetricID: 343}, + {AccountID: 123, ProjectID: 431, MetricID: 32424}, + {AccountID: 1, ProjectID: 2, MetricID: 8293432}, + } + mExpected := &uint64set.Set{} + for _, e := range s.pendingHourMetricIDs { + mExpected.Add(e.MetricID) + } + byTenantExpected := make(map[accountProjectKey]*uint64set.Set) + for _, e := range s.pendingHourMetricIDs { + k := accountProjectKey{ + AccountID: e.AccountID, + ProjectID: e.ProjectID, + } + x := byTenantExpected[k] + if x == nil { + x = &uint64set.Set{} + byTenantExpected[k] = x + } + x.Add(e.MetricID) + } hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ @@ -162,7 +196,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { // Do not run other checks, since they may fail. return } - m := pendingHourMetricIDs.Clone() + m := mExpected.Clone() origMetricIDs := hmOrig.m.AppendTo(nil) for _, metricID := range origMetricIDs { m.Add(metricID) @@ -170,6 +204,9 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !reflect.DeepEqual(hmCurr.m, m) { t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) } + if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) { + t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) + } if hmCurr.isFull { t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false) } @@ -179,9 +216,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !reflect.DeepEqual(hmPrev, hmEmpty) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) } - - if s.pendingHourMetricIDs.Len() != 0 { - t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0) + if len(s.pendingHourMetricIDs) != 0 { + t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", len(s.pendingHourMetricIDs), 0) } }) }