lib/storage: do not pollute inverted index with data for samples outside the retention period

This commit is contained in:
Aliaksandr Valialkin 2019-07-11 17:04:56 +03:00
parent 0522efb2d6
commit 4ca66344ee
2 changed files with 22 additions and 11 deletions

View File

@ -756,6 +756,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
}
rows = rows[:rowsLen+len(mrs)]
j := 0
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
for i := range mrs {
mr := &mrs[i]
if math.IsNaN(mr.Value) {
@ -763,6 +764,10 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
// doesn't know how to work with them.
continue
}
if mr.Timestamp < minTimestamp || mr.Timestamp > maxTimestamp {
// Skip rows with timestamps outside the retention.
continue
}
r := &rows[rowsLen+j]
j++
r.Timestamp = mr.Timestamp

View File

@ -310,22 +310,14 @@ func (tb *table) AddRows(rows []rawRow) error {
// The slowest path - there are rows that don't fit any existing partition.
// Create new partitions for these rows.
// Do this under tb.ptwsLock.
now := timestampFromTime(time.Now())
minTimestamp := now - tb.retentionMilliseconds
maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
minTimestamp, maxTimestamp := tb.getMinMaxTimestamps()
tb.ptwsLock.Lock()
var errors []error
for i := range missingRows {
r := &missingRows[i]
if r.Timestamp < minTimestamp {
// Silently skip row with too small timestamp, since it should be deleted anyway.
continue
}
if r.Timestamp > maxTimestamp {
err := fmt.Errorf("cannot add row %+v with too big timestamp to table %q; the timestamp cannot be bigger than %d (+2 days from now)",
r, tb.path, maxTimestamp)
errors = append(errors, err)
if r.Timestamp < minTimestamp || r.Timestamp > maxTimestamp {
// Silently skip row outside retention, since it should be deleted anyway.
continue
}
@ -359,6 +351,20 @@ func (tb *table) AddRows(rows []rawRow) error {
return nil
}
// getMinMaxTimestamps returns the lower and upper timestamp bounds
// computed from the current time.
//
// The lower bound is the current timestamp minus tb.retentionMilliseconds,
// clamped to zero. The upper bound is the current timestamp plus two days,
// clamped to the maximum int64 value if the sum turns out negative.
func (tb *table) getMinMaxTimestamps() (int64, int64) {
	// Rows may be at most 2 days ahead of the current time; the slack
	// accounts for timezone differences between writers.
	const maxFutureOffset = 2 * 24 * 3600 * 1000

	current := timestampFromTime(time.Now())

	lower := current - tb.retentionMilliseconds
	if lower < 0 {
		// The storage doesn't support negative timestamps.
		lower = 0
	}

	upper := current + maxFutureOffset
	if upper < 0 {
		// A negative sum (e.g. after wrap-around) is replaced
		// with the biggest representable int64 value.
		upper = (1 << 63) - 1
	}

	return lower, upper
}
func (tb *table) startRetentionWatcher() {
tb.retentionWatcherWG.Add(1)
go func() {