mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
Fix issue-3309 - currHourMetricIDs shouldn't contain metrics from prev hour (#3320)
* fix issue-3309 currHourMetricIDs shouldn't contain metrics from prev hour * Update storage.go
This commit is contained in:
parent
63d4cf661b
commit
790768f20b
@ -98,7 +98,7 @@ type Storage struct {
|
||||
|
||||
// Pending MetricID values to be added to currHourMetricIDs.
|
||||
pendingHourEntriesLock sync.Mutex
|
||||
pendingHourEntries *uint64set.Set
|
||||
pendingHourEntries pendingHourEntries
|
||||
|
||||
// Pending MetricIDs to be added to nextDayMetricIDs.
|
||||
pendingNextDayMetricIDsLock sync.Mutex
|
||||
@ -213,7 +213,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
||||
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
|
||||
s.currHourMetricIDs.Store(hmCurr)
|
||||
s.prevHourMetricIDs.Store(hmPrev)
|
||||
s.pendingHourEntries = &uint64set.Set{}
|
||||
s.pendingHourEntries = pendingHourEntries{{}, {}}
|
||||
|
||||
date := fasttime.UnixDate()
|
||||
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date)
|
||||
@ -700,10 +700,12 @@ func (s *Storage) currHourMetricIDsUpdater() {
|
||||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
s.updateCurrHourMetricIDs()
|
||||
hour := fasttime.UnixHour()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.updateCurrHourMetricIDs()
|
||||
hour := fasttime.UnixHour()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -768,7 +770,7 @@ func (s *Storage) mustRotateIndexDB() {
|
||||
// So queries for the last 24 hours stop returning samples added at step 3.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
|
||||
s.pendingHourEntriesLock.Lock()
|
||||
s.pendingHourEntries = &uint64set.Set{}
|
||||
s.pendingHourEntries = pendingHourEntries{{}, {}}
|
||||
s.pendingHourEntriesLock.Unlock()
|
||||
s.currHourMetricIDs.Store(&hourMetricIDs{})
|
||||
s.prevHourMetricIDs.Store(&hourMetricIDs{})
|
||||
@ -2027,7 +2029,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
||||
}
|
||||
if len(pendingHourEntries) > 0 {
|
||||
s.pendingHourEntriesLock.Lock()
|
||||
s.pendingHourEntries.AddMulti(pendingHourEntries)
|
||||
s.pendingHourEntries.At(hm.hour).AddMulti(pendingHourEntries)
|
||||
s.pendingHourEntriesLock.Unlock()
|
||||
}
|
||||
if len(pendingDateMetricIDs) == 0 {
|
||||
@ -2321,15 +2323,24 @@ func (s *Storage) updateNextDayMetricIDs() {
|
||||
s.nextDayMetricIDs.Store(eNew)
|
||||
}
|
||||
|
||||
func (s *Storage) updateCurrHourMetricIDs() {
|
||||
type pendingHourEntries [2]*uint64set.Set
|
||||
|
||||
func (p pendingHourEntries) At(hour uint64) *uint64set.Set {
|
||||
return p[hour%2]
|
||||
}
|
||||
|
||||
func (p pendingHourEntries) Len() int {
|
||||
return p[0].Len() + p[1].Len()
|
||||
}
|
||||
|
||||
func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
|
||||
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
s.pendingHourEntriesLock.Lock()
|
||||
newMetricIDs := s.pendingHourEntries
|
||||
s.pendingHourEntries = &uint64set.Set{}
|
||||
s.pendingHourEntries = pendingHourEntries{{}, {}}
|
||||
s.pendingHourEntriesLock.Unlock()
|
||||
|
||||
hour := fasttime.UnixHour()
|
||||
if newMetricIDs.Len() == 0 && hm.hour == hour {
|
||||
if newMetricIDs.At(hour).Len() == 0 && hm.hour == hour {
|
||||
// Fast path: nothing to update.
|
||||
return
|
||||
}
|
||||
@ -2343,7 +2354,7 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
||||
m = &uint64set.Set{}
|
||||
isFull = true
|
||||
}
|
||||
m.Union(newMetricIDs)
|
||||
m.Union(newMetricIDs.At(hour))
|
||||
hmNew := &hourMetricIDs{
|
||||
m: m,
|
||||
hour: hour,
|
||||
@ -2351,6 +2362,11 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
||||
}
|
||||
s.currHourMetricIDs.Store(hmNew)
|
||||
if hm.hour != hour {
|
||||
if hm.m != nil {
|
||||
hm.m.Union(newMetricIDs.At(hm.hour))
|
||||
} else {
|
||||
hm.m = newMetricIDs.At(hm.hour)
|
||||
}
|
||||
s.prevHourMetricIDs.Store(hm)
|
||||
}
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
var s Storage
|
||||
s.currHourMetricIDs.Store(&hourMetricIDs{})
|
||||
s.prevHourMetricIDs.Store(&hourMetricIDs{})
|
||||
s.pendingHourEntries = &uint64set.Set{}
|
||||
s.pendingHourEntries = pendingHourEntries{{}, {}}
|
||||
return &s
|
||||
}
|
||||
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
||||
@ -158,19 +158,15 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: 123,
|
||||
hour: hour - 1,
|
||||
}
|
||||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
if hmCurr.hour != hour {
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
if hmCurr.m.Len() != 0 {
|
||||
t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
|
||||
@ -198,16 +194,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
if hmCurr.hour != hour {
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
// Do not run other checks, since they may fail.
|
||||
return
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
if !reflect.DeepEqual(hmCurr, hmOrig) {
|
||||
t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
|
||||
@ -228,28 +218,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
})
|
||||
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
|
||||
pendingHourEntries := &uint64set.Set{}
|
||||
pendingHourEntries.Add(343)
|
||||
pendingHourEntries.Add(32424)
|
||||
pendingHourEntries.Add(8293432)
|
||||
s.pendingHourEntries = pendingHourEntries
|
||||
s.pendingHourEntries.At(hour).Union(pendingHourEntries)
|
||||
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: 123,
|
||||
hour: hour - 1,
|
||||
}
|
||||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
if hmCurr.hour != hour {
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
if !hmCurr.m.Equal(pendingHourEntries) {
|
||||
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
|
||||
@ -269,13 +256,15 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
})
|
||||
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
|
||||
pendingHourEntries := &uint64set.Set{}
|
||||
pendingHourEntries.Add(343)
|
||||
pendingHourEntries.Add(32424)
|
||||
pendingHourEntries.Add(8293432)
|
||||
s.pendingHourEntries = pendingHourEntries
|
||||
s.pendingHourEntries.At(hour).Union(pendingHourEntries)
|
||||
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: hour,
|
||||
@ -283,16 +272,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
if hmCurr.hour != hour {
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
// Do not run other checks, since they may fail.
|
||||
return
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
m := pendingHourEntries.Clone()
|
||||
hmOrig.m.ForEach(func(part []uint64) bool {
|
||||
@ -314,6 +297,51 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
||||
}
|
||||
|
||||
if s.pendingHourEntries.Len() != 0 {
|
||||
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
||||
}
|
||||
})
|
||||
t.Run("nonempty_pending_metric_ids_from_previous_hour_stale_hour", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
|
||||
pendingPreviousHourEntries := &uint64set.Set{}
|
||||
pendingPreviousHourEntries.Add(343)
|
||||
pendingPreviousHourEntries.Add(32424)
|
||||
pendingPreviousHourEntries.Add(8293432)
|
||||
s.pendingHourEntries.At(hour - 1).Union(pendingPreviousHourEntries)
|
||||
|
||||
pendingCurrentHourEntries := &uint64set.Set{}
|
||||
pendingCurrentHourEntries.Add(12)
|
||||
pendingCurrentHourEntries.Add(34)
|
||||
s.pendingHourEntries.At(hour).Union(pendingCurrentHourEntries)
|
||||
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: hour - 1,
|
||||
}
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
if !hmCurr.m.Equal(pendingCurrentHourEntries) {
|
||||
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingCurrentHourEntries)
|
||||
}
|
||||
if !hmCurr.isFull {
|
||||
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
|
||||
}
|
||||
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if !hmPrev.m.Equal(pendingPreviousHourEntries) {
|
||||
t.Fatalf("unexpected hmPrev.m; got %v; want %v", hmPrev.m, pendingPreviousHourEntries)
|
||||
}
|
||||
if hmPrev.isFull {
|
||||
t.Fatalf("unexpected hmPrev.isFull; got %v; want %v", hmPrev.isFull, false)
|
||||
}
|
||||
|
||||
if s.pendingHourEntries.Len() != 0 {
|
||||
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user