lib/storage: populate partition names from both small and big directories

Certain partition directories may be missing after restoring from backups
if they had no data. Re-create such directories on start.
This commit is contained in:
Aliaksandr Valialkin 2019-11-06 19:48:01 +02:00
parent 1c777e0245
commit 89c03a5464

View File

@ -436,27 +436,17 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
}
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) {
smallD, err := os.Open(smallPartitionsPath)
if err != nil {
return nil, fmt.Errorf("cannot open directory with small partitions %q: %s", smallPartitionsPath, err)
// Certain partition directories in either `big` or `small` dir may be missing
// after restoring from backup. So populate partition names from both dirs.
ptNames := make(map[string]bool)
if err := populatePartitionNames(smallPartitionsPath, ptNames); err != nil {
return nil, err
}
defer fs.MustClose(smallD)
fis, err := smallD.Readdir(-1)
if err != nil {
return nil, fmt.Errorf("cannot read directory with small partitions %q: %s", smallPartitionsPath, err)
if err := populatePartitionNames(bigPartitionsPath, ptNames); err != nil {
return nil, err
}
var pts []*partition
for _, fi := range fis {
if !fs.IsDirOrSymlink(fi) {
// Skip non-directories
continue
}
ptName := fi.Name()
if ptName == "snapshots" {
// Skipe directory with snapshots
continue
}
for ptName := range ptNames {
smallPartsPath := smallPartitionsPath + "/" + ptName
bigPartsPath := bigPartitionsPath + "/" + ptName
pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs)
@ -469,6 +459,32 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
return pts, nil
}
func populatePartitionNames(partitionsPath string, ptNames map[string]bool) error {
d, err := os.Open(partitionsPath)
if err != nil {
return fmt.Errorf("cannot open directory with partitions %q: %s", partitionsPath, err)
}
defer fs.MustClose(d)
fis, err := d.Readdir(-1)
if err != nil {
return fmt.Errorf("cannot read directory with partitions %q: %s", partitionsPath, err)
}
for _, fi := range fis {
if !fs.IsDirOrSymlink(fi) {
// Skip non-directories
continue
}
ptName := fi.Name()
if ptName == "snapshots" {
// Skip directory with snapshots
continue
}
ptNames[ptName] = true
}
return nil
}
func mustClosePartitions(pts []*partition) {
for _, pt := range pts {
pt.MustClose()