mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-16 00:41:24 +01:00
lib/storage: optimize getMetricIDsForRecentHours for per-tenant lookups
This commit is contained in:
parent
ca480915ca
commit
6a22727676
@ -1873,10 +1873,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
|
|||||||
return nil, errMissingMetricIDsForDate
|
return nil, errMissingMetricIDsForDate
|
||||||
}
|
}
|
||||||
atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1)
|
atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1)
|
||||||
metricIDs, ok, err := is.getMetricIDsForRecentHours(tr, maxMetrics, accountID, projectID)
|
metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics, accountID, projectID)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if ok {
|
if ok {
|
||||||
// Fast path: tr covers the current and / or the previous hour.
|
// Fast path: tr covers the current and / or the previous hour.
|
||||||
// Return the full list of metric ids for this time range.
|
// Return the full list of metric ids for this time range.
|
||||||
@ -1903,38 +1900,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
|
|||||||
return metricIDs, nil
|
return metricIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool, error) {
|
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool) {
|
||||||
metricIDs, ok := is.getMetricIDsForRecentHoursAll(tr, maxMetrics)
|
|
||||||
if !ok {
|
|
||||||
return nil, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter out metricIDs for non-matching (accountID, projectID).
|
|
||||||
// Sort metricIDs for faster lookups below.
|
|
||||||
sortedMetricIDs := metricIDs.AppendTo(nil)
|
|
||||||
ts := &is.ts
|
|
||||||
kb := &is.kb
|
|
||||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID)
|
|
||||||
prefixLen := len(kb.B)
|
|
||||||
kb.B = encoding.MarshalUint64(kb.B, 0)
|
|
||||||
prefix := kb.B[:prefixLen]
|
|
||||||
for _, metricID := range sortedMetricIDs {
|
|
||||||
kb.B = encoding.MarshalUint64(prefix, metricID)
|
|
||||||
ts.Seek(kb.B)
|
|
||||||
if !ts.NextItem() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !bytes.HasPrefix(ts.Item, kb.B) {
|
|
||||||
metricIDs.Del(metricID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := ts.Error(); err != nil {
|
|
||||||
return nil, false, fmt.Errorf("cannot filter out metricIDs by (accountID=%d, projectID=%d): %s", accountID, projectID, err)
|
|
||||||
}
|
|
||||||
return metricIDs, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForRecentHoursAll(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
|
|
||||||
// Return all the metricIDs for all the (AccountID, ProjectID) entries.
|
// Return all the metricIDs for all the (AccountID, ProjectID) entries.
|
||||||
// The caller is responsible for proper filtering later.
|
// The caller is responsible for proper filtering later.
|
||||||
minHour := uint64(tr.MinTimestamp) / msecPerHour
|
minHour := uint64(tr.MinTimestamp) / msecPerHour
|
||||||
@ -1944,28 +1910,44 @@ func (is *indexSearch) getMetricIDsForRecentHoursAll(tr TimeRange, maxMetrics in
|
|||||||
// The tr fits the current hour.
|
// The tr fits the current hour.
|
||||||
// Return a copy of hmCurr.m, because the caller may modify
|
// Return a copy of hmCurr.m, because the caller may modify
|
||||||
// the returned map.
|
// the returned map.
|
||||||
if hmCurr.m.Len() > maxMetrics {
|
k := accountProjectKey{
|
||||||
|
AccountID: accountID,
|
||||||
|
ProjectID: projectID,
|
||||||
|
}
|
||||||
|
m := hmCurr.byTenant[k]
|
||||||
|
if m.Len() > maxMetrics {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
return hmCurr.m.Clone(), true
|
return m.Clone(), true
|
||||||
}
|
}
|
||||||
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
|
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||||
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
|
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
|
||||||
// The tr fits the previous hour.
|
// The tr fits the previous hour.
|
||||||
// Return a copy of hmPrev.m, because the caller may modify
|
// Return a copy of hmPrev.m, because the caller may modify
|
||||||
// the returned map.
|
// the returned map.
|
||||||
if hmPrev.m.Len() > maxMetrics {
|
k := accountProjectKey{
|
||||||
|
AccountID: accountID,
|
||||||
|
ProjectID: projectID,
|
||||||
|
}
|
||||||
|
m := hmPrev.byTenant[k]
|
||||||
|
if m.Len() > maxMetrics {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
return hmPrev.m.Clone(), true
|
return m.Clone(), true
|
||||||
}
|
}
|
||||||
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
|
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
|
||||||
// The tr spans the previous and the current hours.
|
// The tr spans the previous and the current hours.
|
||||||
if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics {
|
k := accountProjectKey{
|
||||||
|
AccountID: accountID,
|
||||||
|
ProjectID: projectID,
|
||||||
|
}
|
||||||
|
mCurr := hmCurr.byTenant[k]
|
||||||
|
mPrev := hmPrev.byTenant[k]
|
||||||
|
if mCurr.Len()+mPrev.Len() > maxMetrics {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
metricIDs := hmCurr.m.Clone()
|
metricIDs := mCurr.Clone()
|
||||||
for _, metricID := range hmPrev.m.AppendTo(nil) {
|
for _, metricID := range mPrev.AppendTo(nil) {
|
||||||
metricIDs.Add(metricID)
|
metricIDs.Add(metricID)
|
||||||
}
|
}
|
||||||
return metricIDs, true
|
return metricIDs, true
|
||||||
|
@ -678,10 +678,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
type accountProjectKey struct {
|
|
||||||
AccountID uint32
|
|
||||||
ProjectID uint32
|
|
||||||
}
|
|
||||||
allKeys := make(map[accountProjectKey]map[string]bool)
|
allKeys := make(map[accountProjectKey]map[string]bool)
|
||||||
timeseriesCounters := make(map[accountProjectKey]map[uint64]bool)
|
timeseriesCounters := make(map[accountProjectKey]map[uint64]bool)
|
||||||
var tsidCopy TSID
|
var tsidCopy TSID
|
||||||
|
@ -69,7 +69,7 @@ type Storage struct {
|
|||||||
|
|
||||||
// Pending MetricID values to be added to currHourMetricIDs.
|
// Pending MetricID values to be added to currHourMetricIDs.
|
||||||
pendingHourMetricIDsLock sync.Mutex
|
pendingHourMetricIDsLock sync.Mutex
|
||||||
pendingHourMetricIDs *uint64set.Set
|
pendingHourMetricIDs []pendingHourMetricIDEntry
|
||||||
|
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
|
|
||||||
@ -77,6 +77,17 @@ type Storage struct {
|
|||||||
retentionWatcherWG sync.WaitGroup
|
retentionWatcherWG sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type pendingHourMetricIDEntry struct {
|
||||||
|
AccountID uint32
|
||||||
|
ProjectID uint32
|
||||||
|
MetricID uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type accountProjectKey struct {
|
||||||
|
AccountID uint32
|
||||||
|
ProjectID uint32
|
||||||
|
}
|
||||||
|
|
||||||
// OpenStorage opens storage on the given path with the given number of retention months.
|
// OpenStorage opens storage on the given path with the given number of retention months.
|
||||||
func OpenStorage(path string, retentionMonths int) (*Storage, error) {
|
func OpenStorage(path string, retentionMonths int) (*Storage, error) {
|
||||||
if retentionMonths > maxRetentionMonths {
|
if retentionMonths > maxRetentionMonths {
|
||||||
@ -125,7 +136,6 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
|
|||||||
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
|
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
|
||||||
s.currHourMetricIDs.Store(hmCurr)
|
s.currHourMetricIDs.Store(hmCurr)
|
||||||
s.prevHourMetricIDs.Store(hmPrev)
|
s.prevHourMetricIDs.Store(hmPrev)
|
||||||
s.pendingHourMetricIDs = &uint64set.Set{}
|
|
||||||
|
|
||||||
// Load indexdb
|
// Load indexdb
|
||||||
idbPath := path + "/indexdb"
|
idbPath := path + "/indexdb"
|
||||||
@ -497,6 +507,8 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
|||||||
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
|
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
|
||||||
return &hourMetricIDs{}
|
return &hourMetricIDs{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unmarshal header
|
||||||
isFull := encoding.UnmarshalUint64(src)
|
isFull := encoding.UnmarshalUint64(src)
|
||||||
src = src[8:]
|
src = src[8:]
|
||||||
hourLoaded := encoding.UnmarshalUint64(src)
|
hourLoaded := encoding.UnmarshalUint64(src)
|
||||||
@ -505,10 +517,12 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
|||||||
logger.Infof("discarding %s, since it is outdated", name)
|
logger.Infof("discarding %s, since it is outdated", name)
|
||||||
return &hourMetricIDs{}
|
return &hourMetricIDs{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unmarshal hm.m
|
||||||
hmLen := encoding.UnmarshalUint64(src)
|
hmLen := encoding.UnmarshalUint64(src)
|
||||||
src = src[8:]
|
src = src[8:]
|
||||||
if uint64(len(src)) != 8*hmLen {
|
if uint64(len(src)) < 8*hmLen {
|
||||||
logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
|
logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
|
||||||
return &hourMetricIDs{}
|
return &hourMetricIDs{}
|
||||||
}
|
}
|
||||||
m := &uint64set.Set{}
|
m := &uint64set.Set{}
|
||||||
@ -517,9 +531,47 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
|||||||
src = src[8:]
|
src = src[8:]
|
||||||
m.Add(metricID)
|
m.Add(metricID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unmarshal hm.byTenant
|
||||||
|
if len(src) < 8 {
|
||||||
|
logger.Errorf("discarding %s, since it has broken hm.byTenant header; got %d bytes; want %d bytes", path, len(src), 8)
|
||||||
|
return &hourMetricIDs{}
|
||||||
|
}
|
||||||
|
byTenantLen := encoding.UnmarshalUint64(src)
|
||||||
|
src = src[8:]
|
||||||
|
byTenant := make(map[accountProjectKey]*uint64set.Set, byTenantLen)
|
||||||
|
for i := uint64(0); i < byTenantLen; i++ {
|
||||||
|
if len(src) < 16 {
|
||||||
|
logger.Errorf("discarding %s, since it has broken accountID:projectID prefix; got %d bytes; want %d bytes", path, len(src), 16)
|
||||||
|
return &hourMetricIDs{}
|
||||||
|
}
|
||||||
|
accountID := encoding.UnmarshalUint32(src)
|
||||||
|
src = src[4:]
|
||||||
|
projectID := encoding.UnmarshalUint32(src)
|
||||||
|
src = src[4:]
|
||||||
|
mLen := encoding.UnmarshalUint64(src)
|
||||||
|
src = src[8:]
|
||||||
|
if uint64(len(src)) < 8*mLen {
|
||||||
|
logger.Errorf("discarding %s, since it has borken accountID:projectID entry; got %d bytes; want %d bytes", path, len(src), 8*mLen)
|
||||||
|
return &hourMetricIDs{}
|
||||||
|
}
|
||||||
|
m := &uint64set.Set{}
|
||||||
|
for j := uint64(0); j < mLen; j++ {
|
||||||
|
metricID := encoding.UnmarshalUint64(src)
|
||||||
|
src = src[8:]
|
||||||
|
m.Add(metricID)
|
||||||
|
}
|
||||||
|
k := accountProjectKey{
|
||||||
|
AccountID: accountID,
|
||||||
|
ProjectID: projectID,
|
||||||
|
}
|
||||||
|
byTenant[k] = m
|
||||||
|
}
|
||||||
|
|
||||||
logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
|
logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
|
||||||
return &hourMetricIDs{
|
return &hourMetricIDs{
|
||||||
m: m,
|
m: m,
|
||||||
|
byTenant: byTenant,
|
||||||
hour: hourLoaded,
|
hour: hourLoaded,
|
||||||
isFull: isFull != 0,
|
isFull: isFull != 0,
|
||||||
}
|
}
|
||||||
@ -534,12 +586,29 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
|
|||||||
if hm.isFull {
|
if hm.isFull {
|
||||||
isFull = 1
|
isFull = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Marshal header
|
||||||
dst = encoding.MarshalUint64(dst, isFull)
|
dst = encoding.MarshalUint64(dst, isFull)
|
||||||
dst = encoding.MarshalUint64(dst, hm.hour)
|
dst = encoding.MarshalUint64(dst, hm.hour)
|
||||||
|
|
||||||
|
// Marshal hm.m
|
||||||
dst = encoding.MarshalUint64(dst, uint64(hm.m.Len()))
|
dst = encoding.MarshalUint64(dst, uint64(hm.m.Len()))
|
||||||
for _, metricID := range hm.m.AppendTo(nil) {
|
for _, metricID := range hm.m.AppendTo(nil) {
|
||||||
dst = encoding.MarshalUint64(dst, metricID)
|
dst = encoding.MarshalUint64(dst, metricID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Marshal hm.byTenant
|
||||||
|
var metricIDs []uint64
|
||||||
|
dst = encoding.MarshalUint64(dst, uint64(len(hm.byTenant)))
|
||||||
|
for k, e := range hm.byTenant {
|
||||||
|
dst = encoding.MarshalUint32(dst, k.AccountID)
|
||||||
|
dst = encoding.MarshalUint32(dst, k.ProjectID)
|
||||||
|
dst = encoding.MarshalUint64(dst, uint64(e.Len()))
|
||||||
|
metricIDs = e.AppendTo(metricIDs[:0])
|
||||||
|
for _, metricID := range metricIDs {
|
||||||
|
dst = encoding.MarshalUint64(dst, metricID)
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := ioutil.WriteFile(path, dst, 0644); err != nil {
|
if err := ioutil.WriteFile(path, dst, 0644); err != nil {
|
||||||
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
|
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
|
||||||
}
|
}
|
||||||
@ -898,7 +967,12 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.pendingHourMetricIDsLock.Lock()
|
s.pendingHourMetricIDsLock.Lock()
|
||||||
s.pendingHourMetricIDs.Add(metricID)
|
e := pendingHourMetricIDEntry{
|
||||||
|
AccountID: r.TSID.AccountID,
|
||||||
|
ProjectID: r.TSID.ProjectID,
|
||||||
|
MetricID: metricID,
|
||||||
|
}
|
||||||
|
s.pendingHourMetricIDs = append(s.pendingHourMetricIDs, e)
|
||||||
s.pendingHourMetricIDsLock.Unlock()
|
s.pendingHourMetricIDsLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -924,7 +998,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
|
|||||||
func (s *Storage) updateCurrHourMetricIDs() {
|
func (s *Storage) updateCurrHourMetricIDs() {
|
||||||
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||||
s.pendingHourMetricIDsLock.Lock()
|
s.pendingHourMetricIDsLock.Lock()
|
||||||
newMetricIDsLen := s.pendingHourMetricIDs.Len()
|
newMetricIDsLen := len(s.pendingHourMetricIDs)
|
||||||
s.pendingHourMetricIDsLock.Unlock()
|
s.pendingHourMetricIDsLock.Unlock()
|
||||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||||
if newMetricIDsLen == 0 && hm.hour == hour {
|
if newMetricIDsLen == 0 && hm.hour == hour {
|
||||||
@ -934,23 +1008,42 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
|||||||
|
|
||||||
// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
|
// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
|
||||||
var m *uint64set.Set
|
var m *uint64set.Set
|
||||||
|
var byTenant map[accountProjectKey]*uint64set.Set
|
||||||
isFull := hm.isFull
|
isFull := hm.isFull
|
||||||
if hm.hour == hour {
|
if hm.hour == hour {
|
||||||
m = hm.m.Clone()
|
m = hm.m.Clone()
|
||||||
|
byTenant = make(map[accountProjectKey]*uint64set.Set, len(hm.byTenant))
|
||||||
|
for k, e := range hm.byTenant {
|
||||||
|
byTenant[k] = e.Clone()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
m = &uint64set.Set{}
|
m = &uint64set.Set{}
|
||||||
|
byTenant = make(map[accountProjectKey]*uint64set.Set)
|
||||||
isFull = true
|
isFull = true
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pendingHourMetricIDsLock.Lock()
|
s.pendingHourMetricIDsLock.Lock()
|
||||||
newMetricIDs := s.pendingHourMetricIDs.AppendTo(nil)
|
a := append([]pendingHourMetricIDEntry{}, s.pendingHourMetricIDs...)
|
||||||
s.pendingHourMetricIDs = &uint64set.Set{}
|
s.pendingHourMetricIDs = s.pendingHourMetricIDs[:0]
|
||||||
s.pendingHourMetricIDsLock.Unlock()
|
s.pendingHourMetricIDsLock.Unlock()
|
||||||
for _, metricID := range newMetricIDs {
|
|
||||||
m.Add(metricID)
|
for _, x := range a {
|
||||||
|
m.Add(x.MetricID)
|
||||||
|
k := accountProjectKey{
|
||||||
|
AccountID: x.AccountID,
|
||||||
|
ProjectID: x.ProjectID,
|
||||||
|
}
|
||||||
|
e := byTenant[k]
|
||||||
|
if e == nil {
|
||||||
|
e = &uint64set.Set{}
|
||||||
|
byTenant[k] = e
|
||||||
|
}
|
||||||
|
e.Add(x.MetricID)
|
||||||
}
|
}
|
||||||
|
|
||||||
hmNew := &hourMetricIDs{
|
hmNew := &hourMetricIDs{
|
||||||
m: m,
|
m: m,
|
||||||
|
byTenant: byTenant,
|
||||||
hour: hour,
|
hour: hour,
|
||||||
isFull: isFull,
|
isFull: isFull,
|
||||||
}
|
}
|
||||||
@ -962,6 +1055,7 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
|||||||
|
|
||||||
type hourMetricIDs struct {
|
type hourMetricIDs struct {
|
||||||
m *uint64set.Set
|
m *uint64set.Set
|
||||||
|
byTenant map[accountProjectKey]*uint64set.Set
|
||||||
hour uint64
|
hour uint64
|
||||||
isFull bool
|
isFull bool
|
||||||
}
|
}
|
||||||
|
@ -18,10 +18,9 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
var s Storage
|
var s Storage
|
||||||
s.currHourMetricIDs.Store(&hourMetricIDs{})
|
s.currHourMetricIDs.Store(&hourMetricIDs{})
|
||||||
s.prevHourMetricIDs.Store(&hourMetricIDs{})
|
s.prevHourMetricIDs.Store(&hourMetricIDs{})
|
||||||
s.pendingHourMetricIDs = &uint64set.Set{}
|
|
||||||
return &s
|
return &s
|
||||||
}
|
}
|
||||||
t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) {
|
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
||||||
s := newStorage()
|
s := newStorage()
|
||||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||||
hmOrig := &hourMetricIDs{
|
hmOrig := &hourMetricIDs{
|
||||||
@ -52,11 +51,11 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.pendingHourMetricIDs.Len() != 0 {
|
if len(s.pendingHourMetricIDs) != 0 {
|
||||||
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
|
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) {
|
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
||||||
s := newStorage()
|
s := newStorage()
|
||||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||||
hmOrig := &hourMetricIDs{
|
hmOrig := &hourMetricIDs{
|
||||||
@ -89,18 +88,34 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
||||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
||||||
}
|
}
|
||||||
|
if len(s.pendingHourMetricIDs) != 0 {
|
||||||
if s.pendingHourMetricIDs.Len() != 0 {
|
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
|
||||||
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
||||||
s := newStorage()
|
s := newStorage()
|
||||||
pendingHourMetricIDs := &uint64set.Set{}
|
s.pendingHourMetricIDs = []pendingHourMetricIDEntry{
|
||||||
pendingHourMetricIDs.Add(343)
|
{AccountID: 123, ProjectID: 431, MetricID: 343},
|
||||||
pendingHourMetricIDs.Add(32424)
|
{AccountID: 123, ProjectID: 431, MetricID: 32424},
|
||||||
pendingHourMetricIDs.Add(8293432)
|
{AccountID: 1, ProjectID: 2, MetricID: 8293432},
|
||||||
s.pendingHourMetricIDs = pendingHourMetricIDs
|
}
|
||||||
|
mExpected := &uint64set.Set{}
|
||||||
|
for _, e := range s.pendingHourMetricIDs {
|
||||||
|
mExpected.Add(e.MetricID)
|
||||||
|
}
|
||||||
|
byTenantExpected := make(map[accountProjectKey]*uint64set.Set)
|
||||||
|
for _, e := range s.pendingHourMetricIDs {
|
||||||
|
k := accountProjectKey{
|
||||||
|
AccountID: e.AccountID,
|
||||||
|
ProjectID: e.ProjectID,
|
||||||
|
}
|
||||||
|
x := byTenantExpected[k]
|
||||||
|
if x == nil {
|
||||||
|
x = &uint64set.Set{}
|
||||||
|
byTenantExpected[k] = x
|
||||||
|
}
|
||||||
|
x.Add(e.MetricID)
|
||||||
|
}
|
||||||
|
|
||||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||||
hmOrig := &hourMetricIDs{
|
hmOrig := &hourMetricIDs{
|
||||||
@ -119,8 +134,11 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(hmCurr.m, pendingHourMetricIDs) {
|
if !reflect.DeepEqual(hmCurr.m, mExpected) {
|
||||||
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs)
|
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, mExpected)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) {
|
||||||
|
t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected)
|
||||||
}
|
}
|
||||||
if !hmCurr.isFull {
|
if !hmCurr.isFull {
|
||||||
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
|
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
|
||||||
@ -130,18 +148,34 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
||||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
||||||
}
|
}
|
||||||
|
if len(s.pendingHourMetricIDs) != 0 {
|
||||||
if s.pendingHourMetricIDs.Len() != 0 {
|
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
|
||||||
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
||||||
s := newStorage()
|
s := newStorage()
|
||||||
pendingHourMetricIDs := &uint64set.Set{}
|
s.pendingHourMetricIDs = []pendingHourMetricIDEntry{
|
||||||
pendingHourMetricIDs.Add(343)
|
{AccountID: 123, ProjectID: 431, MetricID: 343},
|
||||||
pendingHourMetricIDs.Add(32424)
|
{AccountID: 123, ProjectID: 431, MetricID: 32424},
|
||||||
pendingHourMetricIDs.Add(8293432)
|
{AccountID: 1, ProjectID: 2, MetricID: 8293432},
|
||||||
s.pendingHourMetricIDs = pendingHourMetricIDs
|
}
|
||||||
|
mExpected := &uint64set.Set{}
|
||||||
|
for _, e := range s.pendingHourMetricIDs {
|
||||||
|
mExpected.Add(e.MetricID)
|
||||||
|
}
|
||||||
|
byTenantExpected := make(map[accountProjectKey]*uint64set.Set)
|
||||||
|
for _, e := range s.pendingHourMetricIDs {
|
||||||
|
k := accountProjectKey{
|
||||||
|
AccountID: e.AccountID,
|
||||||
|
ProjectID: e.ProjectID,
|
||||||
|
}
|
||||||
|
x := byTenantExpected[k]
|
||||||
|
if x == nil {
|
||||||
|
x = &uint64set.Set{}
|
||||||
|
byTenantExpected[k] = x
|
||||||
|
}
|
||||||
|
x.Add(e.MetricID)
|
||||||
|
}
|
||||||
|
|
||||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||||
hmOrig := &hourMetricIDs{
|
hmOrig := &hourMetricIDs{
|
||||||
@ -162,7 +196,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
// Do not run other checks, since they may fail.
|
// Do not run other checks, since they may fail.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m := pendingHourMetricIDs.Clone()
|
m := mExpected.Clone()
|
||||||
origMetricIDs := hmOrig.m.AppendTo(nil)
|
origMetricIDs := hmOrig.m.AppendTo(nil)
|
||||||
for _, metricID := range origMetricIDs {
|
for _, metricID := range origMetricIDs {
|
||||||
m.Add(metricID)
|
m.Add(metricID)
|
||||||
@ -170,6 +204,9 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
if !reflect.DeepEqual(hmCurr.m, m) {
|
if !reflect.DeepEqual(hmCurr.m, m) {
|
||||||
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
|
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
|
||||||
}
|
}
|
||||||
|
if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) {
|
||||||
|
t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected)
|
||||||
|
}
|
||||||
if hmCurr.isFull {
|
if hmCurr.isFull {
|
||||||
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false)
|
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false)
|
||||||
}
|
}
|
||||||
@ -179,9 +216,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||||||
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
||||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
||||||
}
|
}
|
||||||
|
if len(s.pendingHourMetricIDs) != 0 {
|
||||||
if s.pendingHourMetricIDs.Len() != 0 {
|
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", len(s.pendingHourMetricIDs), 0)
|
||||||
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user