lib/storage: do not pass retentionMsecs and isReadOnly args explicitly - access them via Storage arg

This makes code easier to read.

This is a follow-up after d2d30581a0
This commit is contained in:
Aliaksandr Valialkin 2022-10-24 01:30:50 +03:00
parent d51f9b9284
commit 2fc82b846e
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
10 changed files with 37 additions and 57 deletions

View File

@ -2195,6 +2195,7 @@ func newTestStorage() *Storage {
metricNameCache: workingsetcache.New(1234),
tsidCache: workingsetcache.New(1234),
dateMetricIDCache: newDateMetricIDCache(),
retentionMsecs: maxRetentionMsecs,
}
s.setDeletedMetricIDs(&uint64set.Set{})
return s

View File

@ -120,14 +120,6 @@ type partition struct {
// The parent storage.
s *Storage
// data retention in milliseconds.
// Used for deleting data outside the retention during background merge.
retentionMsecs int64
// Whether the storage is in read-only mode.
// Background merge is stopped in read-only mode.
isReadOnly *uint32
// Name is the name of the partition in the form YYYY_MM.
name string
@ -201,7 +193,7 @@ func (pw *partWrapper) decRef() {
// createPartition creates new partition for the given timestamp and the given paths
// to small and big partitions.
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, s *Storage) (*partition, error) {
name := timestampToPartitionName(timestamp)
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@ -214,7 +206,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
}
pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
pt.tr.fromPartitionTimestamp(timestamp)
pt.startMergeWorkers()
pt.startRawRowsFlusher()
@ -240,7 +232,7 @@ func (pt *partition) Drop() {
}
// openPartition opens the existing partition from the given paths.
func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*partition, error) {
func openPartition(smallPartsPath, bigPartsPath string, s *Storage) (*partition, error) {
smallPartsPath = filepath.Clean(smallPartsPath)
bigPartsPath = filepath.Clean(bigPartsPath)
@ -264,7 +256,7 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMse
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
}
pt := newPartition(name, smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
pt := newPartition(name, smallPartsPath, bigPartsPath, s)
pt.smallParts = smallParts
pt.bigParts = bigParts
if err := pt.tr.fromPartitionName(name); err != nil {
@ -278,15 +270,13 @@ func openPartition(smallPartsPath, bigPartsPath string, s *Storage, retentionMse
return pt, nil
}
func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) *partition {
func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition {
p := &partition{
name: name,
smallPartsPath: smallPartsPath,
bigPartsPath: bigPartsPath,
s: s,
retentionMsecs: retentionMsecs,
isReadOnly: isReadOnly,
s: s,
mergeIdx: uint64(time.Now().UnixNano()),
stopCh: make(chan struct{}),
@ -1030,7 +1020,7 @@ func getMaxOutBytes(path string, workersCount int) uint64 {
}
func (pt *partition) canBackgroundMerge() bool {
return atomic.LoadUint32(pt.isReadOnly) == 0
return atomic.LoadUint32(&pt.s.isReadOnly) == 0
}
var errReadOnlyMode = fmt.Errorf("storage is in readonly mode")
@ -1217,7 +1207,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
atomic.AddUint64(&pt.smallMergesCount, 1)
atomic.AddUint64(&pt.activeSmallMerges, 1)
}
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted)
if isBigPart {
atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
@ -1387,7 +1377,7 @@ func (pt *partition) stalePartsRemover() {
func (pt *partition) removeStaleParts() {
m := make(map[*partWrapper]bool)
startTime := time.Now()
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
retentionDeadline := timestampFromTime(startTime) - pt.s.retentionMsecs
pt.partsLock.Lock()
for _, pw := range pt.bigParts {
@ -1418,7 +1408,7 @@ func (pt *partition) removeStaleParts() {
// consistent snapshots with table.CreateSnapshot().
pt.snapshotLock.RLock()
for pw := range m {
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.retentionMsecs/1000)
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, pt.s.retentionMsecs/1000)
fs.MustRemoveDirAtomic(pw.p.path)
}
// There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath,

View File

@ -166,9 +166,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
// Create partition from rowss and test search on it.
strg := newTestStorage()
retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
var isReadOnly uint32
pt, err := createPartition(ptt, "./small-table", "./big-table", strg, retentionMsecs, &isReadOnly)
strg.retentionMsecs = timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
pt, err := createPartition(ptt, "./small-table", "./big-table", strg)
if err != nil {
t.Fatalf("cannot create partition: %s", err)
}
@ -192,7 +191,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt.MustClose()
// Open the created partition and test search on it.
pt, err = openPartition(smallPartsPath, bigPartsPath, strg, retentionMsecs, &isReadOnly)
pt, err = openPartition(smallPartsPath, bigPartsPath, strg)
if err != nil {
t.Fatalf("cannot open partition: %s", err)
}

View File

@ -268,7 +268,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
// Load data
tablePath := path + "/data"
tb, err := openTable(tablePath, s, retentionMsecs, &s.isReadOnly)
tb, err := openTable(tablePath, s)
if err != nil {
s.idb().MustClose()
return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)

View File

@ -20,9 +20,7 @@ type table struct {
smallPartitionsPath string
bigPartitionsPath string
s *Storage
retentionMsecs int64
isReadOnly *uint32
s *Storage
ptws []*partitionWrapper
ptwsLock sync.Mutex
@ -78,12 +76,10 @@ func (ptw *partitionWrapper) scheduleToDrop() {
atomic.AddUint64(&ptw.mustDrop, 1)
}
// openTable opens a table on the given path with the given retentionMsecs.
// openTable opens a table on the given path.
//
// The table is created if it doesn't exist.
//
// Data older than the retentionMsecs may be dropped at any time.
func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32) (*table, error) {
func openTable(path string, s *Storage) (*table, error) {
path = filepath.Clean(path)
// Create a directory for the table if it doesn't exist yet.
@ -121,7 +117,7 @@ func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32
fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
// Open partitions.
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s, retentionMsecs, isReadOnly)
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, s)
if err != nil {
return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
}
@ -131,8 +127,6 @@ func openTable(path string, s *Storage, retentionMsecs int64, isReadOnly *uint32
smallPartitionsPath: smallPartitionsPath,
bigPartitionsPath: bigPartitionsPath,
s: s,
retentionMsecs: retentionMsecs,
isReadOnly: isReadOnly,
flockF: flockF,
@ -365,7 +359,7 @@ func (tb *table) AddRows(rows []rawRow) error {
continue
}
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s, tb.retentionMsecs, tb.isReadOnly)
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s)
if err != nil {
// Return only the first error, since it has no sense in returning all errors.
tb.ptwsLock.Unlock()
@ -381,7 +375,7 @@ func (tb *table) AddRows(rows []rawRow) error {
func (tb *table) getMinMaxTimestamps() (int64, int64) {
now := int64(fasttime.UnixTimestamp() * 1000)
minTimestamp := now - tb.retentionMsecs
minTimestamp := now - tb.s.retentionMsecs
maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
if minTimestamp < 0 {
// Negative timestamps aren't supported by the storage.
@ -411,7 +405,7 @@ func (tb *table) retentionWatcher() {
case <-ticker.C:
}
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMsecs
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.s.retentionMsecs
var ptwsDrop []*partitionWrapper
tb.ptwsLock.Lock()
dst := tb.ptws[:0]
@ -503,7 +497,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
}
}
func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, retentionMsecs int64, isReadOnly *uint32) ([]*partition, error) {
func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) ([]*partition, error) {
// Certain partition directories in either `big` or `small` dir may be missing
// after restoring from backup. So populate partition names from both dirs.
ptNames := make(map[string]bool)
@ -517,7 +511,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage, r
for ptName := range ptNames {
smallPartsPath := smallPartitionsPath + "/" + ptName
bigPartsPath := bigPartitionsPath + "/" + ptName
pt, err := openPartition(smallPartsPath, bigPartsPath, s, retentionMsecs, isReadOnly)
pt, err := openPartition(smallPartsPath, bigPartsPath, s)
if err != nil {
mustClosePartitions(pts)
return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)

View File

@ -66,7 +66,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
// Adjust tr.MinTimestamp, so it doesn't obtain data older
// than the tb retention.
now := int64(fasttime.UnixTimestamp() * 1000)
minTimestamp := now - tb.retentionMsecs
minTimestamp := now - tb.s.retentionMsecs
if tr.MinTimestamp < minTimestamp {
tr.MinTimestamp = minTimestamp
}

View File

@ -182,8 +182,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
// Create a table from rowss and test search on it.
strg := newTestStorage()
var isReadOnly uint32
tb, err := openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
tb, err := openTable("./test-table", strg)
if err != nil {
t.Fatalf("cannot create table: %s", err)
}
@ -204,7 +203,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
tb.MustClose()
// Open the created table and test search on it.
tb, err = openTable("./test-table", strg, maxRetentionMsecs, &isReadOnly)
tb, err = openTable("./test-table", strg)
if err != nil {
t.Fatalf("cannot open table: %s", err)
}

View File

@ -45,8 +45,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
createdBenchTables[path] = true
}
strg := newTestStorage()
var isReadOnly uint32
tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
tb, err := openTable(path, strg)
if err != nil {
b.Fatalf("cnanot open table %q: %s", path, err)
}
@ -70,8 +69,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
b.Helper()
strg := newTestStorage()
var isReadOnly uint32
tb, err := openTable(path, strg, maxRetentionMsecs, &isReadOnly)
tb, err := openTable(path, strg)
if err != nil {
b.Fatalf("cannot open table %q: %s", path, err)
}

View File

@ -18,8 +18,8 @@ func TestTableOpenClose(t *testing.T) {
// Create a new table
strg := newTestStorage()
var isReadOnly uint32
tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
strg.retentionMsecs = retentionMsecs
tb, err := openTable(path, strg)
if err != nil {
t.Fatalf("cannot create new table: %s", err)
}
@ -29,7 +29,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times.
for i := 0; i < 10; i++ {
tb, err := openTable(path, strg, retentionMsecs, &isReadOnly)
tb, err := openTable(path, strg)
if err != nil {
t.Fatalf("cannot open created table: %s", err)
}
@ -46,15 +46,15 @@ func TestTableOpenMultipleTimes(t *testing.T) {
}()
strg := newTestStorage()
var isReadOnly uint32
tb1, err := openTable(path, strg, retentionMsecs, &isReadOnly)
strg.retentionMsecs = retentionMsecs
tb1, err := openTable(path, strg)
if err != nil {
t.Fatalf("cannot open table the first time: %s", err)
}
defer tb1.MustClose()
for i := 0; i < 10; i++ {
tb2, err := openTable(path, strg, retentionMsecs, &isReadOnly)
tb2, err := openTable(path, strg)
if err == nil {
tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table")

View File

@ -47,8 +47,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
tablePath := "./benchmarkTableAddRows"
strg := newTestStorage()
for i := 0; i < b.N; i++ {
var isReadOnly uint32
tb, err := openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
tb, err := openTable(tablePath, strg)
if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err)
}
@ -96,7 +95,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
tb.MustClose()
// Open the table from files and verify the rows count on it
tb, err = openTable(tablePath, strg, maxRetentionMsecs, &isReadOnly)
tb, err = openTable(tablePath, strg)
if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err)
}