diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index f778b4a9e..d760e61fe 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -39,7 +39,6 @@ type blockStreamWriter struct { func (bsw *blockStreamWriter) reset() { bsw.compressLevel = 0 - bsw.path = "" bsw.metaindexWriter = nil bsw.indexWriter = nil @@ -124,7 +123,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.reset() bsw.compressLevel = compressLevel - bsw.path = path bsw.metaindexWriter = metaindexFile bsw.indexWriter = indexFile @@ -151,12 +149,6 @@ func (bsw *blockStreamWriter) MustClose() { bsw.itemsWriter.MustClose() bsw.lensWriter.MustClose() - // Sync bsw.path contents to make sure it doesn't disappear - // after system crash or power loss. - if bsw.path != "" { - fs.MustSyncPath(bsw.path) - } - bsw.reset() } diff --git a/lib/mergeset/part_header.go b/lib/mergeset/part_header.go index 288bd05cf..1f46907ec 100644 --- a/lib/mergeset/part_header.go +++ b/lib/mergeset/part_header.go @@ -125,7 +125,10 @@ func (ph *partHeader) WriteMetadata(partPath string) error { logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err) } metadataPath := filepath.Join(partPath, metadataFilename) - if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil { + // There is no need in calling fs.WriteFileAtomically() here, + // since the file is created only once during part creatinng + // and the part directory is synced aftewards. + if err := fs.WriteFileAndSync(metadataPath, metadata); err != nil { return fmt.Errorf("cannot create %q: %w", metadataPath, err) } return nil diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 6fb107835..fdbe09849 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1143,6 +1143,9 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal if mpNew != nil { // Update partHeader for destination inmemory part after the merge. mpNew.ph = *ph + } else { + // Make sure the created part directory listing is synced. + fs.MustSyncPath(dstPartPath) } // Atomically swap the source parts with the newly created part. diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index 1c5745c76..e55811948 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -56,7 +56,6 @@ func (bsw *blockStreamWriter) assertWriteClosers() { // Init initializes bsw with the given writers. func (bsw *blockStreamWriter) reset() { bsw.compressLevel = 0 - bsw.path = "" bsw.timestampsWriter = nil bsw.valuesWriter = nil @@ -142,7 +141,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.reset() bsw.compressLevel = compressLevel - bsw.path = path bsw.timestampsWriter = timestampsFile bsw.valuesWriter = valuesFile @@ -171,12 +169,6 @@ func (bsw *blockStreamWriter) MustClose() { bsw.indexWriter.MustClose() bsw.metaindexWriter.MustClose() - // Sync bsw.path contents to make sure it doesn't disappear - // after system crash or power loss. - if bsw.path != "" { - fs.MustSyncPath(bsw.path) - } - bsw.reset() } diff --git a/lib/storage/part_header.go b/lib/storage/part_header.go index df1962ff7..40b17237a 100644 --- a/lib/storage/part_header.go +++ b/lib/storage/part_header.go @@ -171,7 +171,10 @@ func (ph *partHeader) WriteMetadata(partPath string) error { logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err) } metadataPath := filepath.Join(partPath, metadataFilename) - if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil { + // There is no need in calling fs.WriteFileAtomically() here, + // since the file is created only once during part creatinng + // and the part directory is synced aftewards. + if err := fs.WriteFileAndSync(metadataPath, metadata); err != nil { return fmt.Errorf("cannot create %q: %w", metadataPath, err) } return nil diff --git a/lib/storage/partition.go b/lib/storage/partition.go index af61a5956..1f99c901f 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1324,6 +1324,9 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi if mpNew != nil { // Update partHeader for destination inmemory part after the merge. mpNew.ph = *ph + } else { + // Make sure the created part directory listing is synced. + fs.MustSyncPath(dstPartPath) } // Atomically swap the source parts with the newly created part.