diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index e694d5d2c8..c2f78bb7ff 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -2195,6 +2195,7 @@ func newTestStorage() *Storage { metricNameCache: workingsetcache.New(1234), tsidCache: workingsetcache.New(1234), dateMetricIDCache: newDateMetricIDCache(), + retentionMsecs: maxRetentionMsecs, } s.setDeletedMetricIDs(&uint64set.Set{}) return s diff --git a/lib/storage/partition.go b/lib/storage/partition.go index d0c5d99116..d98752924d 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -120,14 +120,6 @@ type partition struct { // The parent storage. s *Storage - // data retention in milliseconds. - // Used for deleting data outside the retention during background merge. - retentionMsecs int64 - - // Whether the storage is in read-only mode. - // Background merge is stopped in read-only mode. - isReadOnly *uint32 - // Name is the name of the partition in the form YYYY_MM. name string @@ -201,7 +193,7 @@ func (pw *partWrapper) decRef() { // createPartition creates new partition for the given timestamp and the given paths // to small and big partitions. -func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) { +func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) (*partition, error) { name := timestampToPartitionName(timestamp) smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name @@ -214,7 +206,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err) } - pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly) + pt := newPartition(name, smallPartsPath, bigPartsPath, s) pt.tr.fromPartitionTimestamp(timestamp) pt.startMergeWorkers() pt.startRawRowsFlusher() @@ -240,7 +232,7 @@ func (pt *partition) Drop() { } // openPartition opens the existing partition from the given paths. -func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) { +func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, error) { smallPartsPath = filepath.Clean(smallPartsPath) bigPartsPath = filepath.Clean(bigPartsPath) @@ -264,7 +256,7 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMse return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err) } - pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly) + pt := newPartition(name, smallPartsPath, bigPartsPath, s) pt.smallParts = smallParts pt.bigParts = bigParts if err := pt.tr.fromPartitionName(name); err != nil { @@ -278,15 +270,13 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMse return pt, nil } -func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) *partition { +func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition { p := &partition{ name: name, smallPartsPath: smallPartsPath, bigPartsPath: bigPartsPath, - s: s, - retentionMsecs: retentionMsecs, - isReadOnly: isReadOnly, + s: s, mergeIdx: uint64(time.Now().UnixNano()), stopCh: make(chan struct{}), @@ -1030,7 +1020,7 @@ func getMaxOutBytes(path string, workersCount int) uint64 { } func (pt *partition) canBackgroundMerge() bool { - return atomic.LoadUint32(pt.isReadOnly) == 0 + return atomic.LoadUint32(&pt.s.isReadOnly) == 0 } var errReadOnlyMode = fmt.Errorf("storage is in readonly mode") @@ -1217,7 +1207,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) } - retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs + retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted) if isBigPart { atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) @@ -1387,7 +1377,7 @@ func (pt *partition) stalePartsRemover() { func (pt *partition) removeStaleParts() { m := make(map[*partWrapper]bool) startTime := time.Now() - retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs + retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs pt.partsLock.Lock() for _, pw := range pt.bigParts { @@ -1418,7 +1408,7 @@ func (pt *partition) removeStaleParts() { // consistent snapshots with table.CreateSnapshot(). pt.snapshotLock.RLock() for pw := range m { - logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000) + logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.s.retentionMsecs/1000) fs.MustRemoveDirAtomic(pw.p.path) } // There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath, diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 7844a3bef0..9649770dfd 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -166,9 +166,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma // Create partition from rowss and test search on it. strg := newTestStorage() - retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 - var isReadOnly uint32 - pt, err := createPartition(ptt, "./small-table", "./big-table", strg, retentionMsecs, &isReadOnly) + strg.retentionMsecs = timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 + pt, err := createPartition(ptt, "./small-table", "./big-table", strg) if err != nil { t.Fatalf("cannot create partition: %s", err) } @@ -192,7 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. - pt, err = openPartition(smallPartsPath, bigPartsPath, strg, retentionMsecs, &isReadOnly) + pt, err = openPartition(smallPartsPath, bigPartsPath, strg) if err != nil { t.Fatalf("cannot open partition: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 8c72f3512b..bfaa34fa2a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -268,7 +268,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer // Load data tablePath := path + "/data" - tb, err := openTable(tablePath, s, retentionMsecs, &s.isReadOnly) + tb, err := openTable(tablePath, s) if err != nil { s.idb().MustClose() return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err) diff --git a/lib/storage/table.go b/lib/storage/table.go index e24b15a956..5b7ffbce71 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -20,9 +20,7 @@ type table struct { smallPartitionsPath string bigPartitionsPath string - s *Storage - retentionMsecs int64 - isReadOnly *uint32 + s *Storage ptws []*partitionWrapper ptwsLock sync.Mutex @@ -78,12 +76,10 @@ func (ptw *partitionWrapper) scheduleToDrop() { atomic.AddUint64(&ptw.mustDrop, 1) } -// openTable opens a table on the given path with the given retentionMsecs. +// openTable opens a table on the given path. // // The table is created if it doesn't exist. -// -// Data older than the retentionMsecs may be dropped at any time. -func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*table, error) { +func openTable(path string, s *Storage) (*table, error) { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -121,7 +117,7 @@ func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32 fs.MustRemoveTemporaryDirs(bigSnapshotsPath) // Open partitions. - pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s, retentionMsecs, isReadOnly) + pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s) if err != nil { return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err) } @@ -131,8 +127,6 @@ func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32 smallPartitionsPath: smallPartitionsPath, bigPartitionsPath: bigPartitionsPath, s: s, - retentionMsecs: retentionMsecs, - isReadOnly: isReadOnly, flockF: flockF, @@ -365,7 +359,7 @@ func (tb *table) AddRows(rows []rawRow) error { continue } - pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s, tb.retentionMsecs, tb.isReadOnly) + pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s) if err != nil { // Return only the first error, since it has no sense in returning all errors. tb.ptwsLock.Unlock() @@ -381,7 +375,7 @@ func (tb *table) AddRows(rows []rawRow) error { func (tb *table) getMinMaxTimestamps() (int64, int64) { now := int64(fasttime.UnixTimestamp() * 1000) - minTimestamp := now - tb.retentionMsecs + minTimestamp := now - tb.s.retentionMsecs maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :) if minTimestamp < 0 { // Negative timestamps aren't supported by the storage. @@ -411,7 +405,7 @@ func (tb *table) retentionWatcher() { case <-ticker.C: } - minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMsecs + minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.s.retentionMsecs var ptwsDrop []*partitionWrapper tb.ptwsLock.Lock() dst := tb.ptws[:0] @@ -503,7 +497,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) { } } -func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) { +func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ([]*partition, error) { // Certain partition directories in either `big` or `small` dir may be missing // after restoring from backup. So populate partition names from both dirs. ptNames := make(map[string]bool) @@ -517,7 +511,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, r for ptName := range ptNames { smallPartsPath := smallPartitionsPath + "/" + ptName bigPartsPath := bigPartitionsPath + "/" + ptName - pt, err := openPartition(smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly) + pt, err := openPartition(smallPartsPath, bigPartsPath, s) if err != nil { mustClosePartitions(pts) return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err) diff --git a/lib/storage/table_search.go b/lib/storage/table_search.go index f344da06ab..bd03d67ca5 100644 --- a/lib/storage/table_search.go +++ b/lib/storage/table_search.go @@ -66,7 +66,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) { // Adjust tr.MinTimestamp, so it doesn't obtain data older // than the tb retention. now := int64(fasttime.UnixTimestamp() * 1000) - minTimestamp := now - tb.retentionMsecs + minTimestamp := now - tb.s.retentionMsecs if tr.MinTimestamp < minTimestamp { tr.MinTimestamp = minTimestamp } diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index 5676bfcd39..c9b1119dcf 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -182,8 +182,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount // Create a table from rowss and test search on it. strg := newTestStorage() - var isReadOnly uint32 - tb, err := openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly) + tb, err := openTable("./test-table", strg) if err != nil { t.Fatalf("cannot create table: %s", err) } @@ -204,7 +203,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount tb.MustClose() // Open the created table and test search on it. - tb, err = openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly) + tb, err = openTable("./test-table", strg) if err != nil { t.Fatalf("cannot open table: %s", err) } diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index 1308d8ac5b..cf046c5137 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -45,8 +45,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount createdBenchTables[path] = true } strg := newTestStorage() - var isReadOnly uint32 - tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly) + tb, err := openTable(path, strg) if err != nil { b.Fatalf("cnanot open table %q: %s", path, err) } @@ -70,8 +69,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn b.Helper() strg := newTestStorage() - var isReadOnly uint32 - tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly) + tb, err := openTable(path, strg) if err != nil { b.Fatalf("cannot open table %q: %s", path, err) } diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go index 7268d45d7d..08bf2ba754 100644 --- a/lib/storage/table_test.go +++ b/lib/storage/table_test.go @@ -18,8 +18,8 @@ func TestTableOpenClose(t *testing.T) { // Create a new table strg := newTestStorage() - var isReadOnly uint32 - tb, err := openTable(path, strg, retentionMsecs, &isReadOnly) + strg.retentionMsecs = retentionMsecs + tb, err := openTable(path, strg) if err != nil { t.Fatalf("cannot create new table: %s", err) } @@ -29,7 +29,7 @@ func TestTableOpenClose(t *testing.T) { // Re-open created table multiple times. for i := 0; i < 10; i++ { - tb, err := openTable(path, strg, retentionMsecs, &isReadOnly) + tb, err := openTable(path, strg) if err != nil { t.Fatalf("cannot open created table: %s", err) } @@ -46,15 +46,15 @@ func TestTableOpenMultipleTimes(t *testing.T) { }() strg := newTestStorage() - var isReadOnly uint32 - tb1, err := openTable(path, strg, retentionMsecs, &isReadOnly) + strg.retentionMsecs = retentionMsecs + tb1, err := openTable(path, strg) if err != nil { t.Fatalf("cannot open table the first time: %s", err) } defer tb1.MustClose() for i := 0; i < 10; i++ { - tb2, err := openTable(path, strg, retentionMsecs, &isReadOnly) + tb2, err := openTable(path, strg) if err == nil { tb2.MustClose() t.Fatalf("expecting non-nil error when opening already opened table") diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index 01bae910af..0f7ae00d1c 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -47,8 +47,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tablePath := "./benchmarkTableAddRows" strg := newTestStorage() for i := 0; i < b.N; i++ { - var isReadOnly uint32 - tb, err := openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly) + tb, err := openTable(tablePath, strg) if err != nil { b.Fatalf("cannot open table %q: %s", tablePath, err) } @@ -96,7 +95,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tb.MustClose() // Open the table from files and verify the rows count on it - tb, err = openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly) + tb, err = openTable(tablePath, strg) if err != nil { b.Fatalf("cannot open table %q: %s", tablePath, err) }