diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 9ac0de4633..4b9c6d4a8d 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -387,8 +387,7 @@ func mustLoadRollupResultCacheKeyPrefix(path string) { func mustSaveRollupResultCacheKeyPrefix(path string) { path = path + ".key.prefix" data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix) - fs.MustRemoveAll(path) - if err := fs.WriteFileAtomically(path, data); err != nil { + if err := fs.WriteFileAtomically(path, data, true); err != nil { logger.Fatalf("cannot store rollupResult cache key prefix to %q: %s", path, err) } } diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 925f160122..14ae2d6823 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -29,11 +29,14 @@ func MustSyncPath(path string) { // // WriteFileAtomically returns only after the file is fully written and synced // to the underlying storage. -func WriteFileAtomically(path string, data []byte) error { +// +// If the file at path already exists, then the file is overwritten atomically if canOverwrite is true. +// Otherwise error is returned. +func WriteFileAtomically(path string, data []byte, canOverwrite bool) error { // Check for the existing file. It is expected that // the WriteFileAtomically function cannot be called concurrently // with the same `path`. - if IsPathExist(path) { + if IsPathExist(path) && !canOverwrite { return fmt.Errorf("cannot create file %q, since it already exists", path) } diff --git a/lib/mergeset/part_header.go b/lib/mergeset/part_header.go index 785f54ba80..43a5ea43b3 100644 --- a/lib/mergeset/part_header.go +++ b/lib/mergeset/part_header.go @@ -163,7 +163,7 @@ func (ph *partHeader) WriteMetadata(partPath string) error { return fmt.Errorf("cannot marshal metadata: %w", err) } metadataPath := partPath + "/metadata.json" - if err := fs.WriteFileAtomically(metadataPath, metadata); err != nil { + if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil { return fmt.Errorf("cannot create %q: %w", metadataPath, err) } return nil diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 25d535d9c4..d4d5f3620c 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -580,7 +580,7 @@ func (tb *Table) convertToV1280() { logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds()) } - if err := fs.WriteFileAtomically(flagFilePath, []byte("ok")); err != nil { + if err := fs.WriteFileAtomically(flagFilePath, []byte("ok"), false); err != nil { logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err) } } @@ -975,7 +975,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP dstPartPath := ph.Path(tb.path, mergeIdx) fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath) txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx) - if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil { + if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil { return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err) } diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 31a17f171a..dea3dc216b 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -211,7 +211,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB // Create initial chunk file. filepath := q.chunkFilePath(0) - if err := fs.WriteFileAtomically(filepath, nil); err != nil { + if err := fs.WriteFileAtomically(filepath, nil, false); err != nil { return nil, fmt.Errorf("cannot create %q: %w", filepath, err) } } diff --git a/lib/storage/part_header.go b/lib/storage/part_header.go index 1e21e28235..8896dacd82 100644 --- a/lib/storage/part_header.go +++ b/lib/storage/part_header.go @@ -151,7 +151,7 @@ func (ph *partHeader) writeMinDedupInterval(partPath string) error { filePath := partPath + "/min_dedup_interval" dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond data := dedupInterval.String() - if err := fs.WriteFileAtomically(filePath, []byte(data)); err != nil { + if err := fs.WriteFileAtomically(filePath, []byte(data), false); err != nil { return fmt.Errorf("cannot create %q: %w", filePath, err) } return nil diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 8d6a065e20..797c57bdbd 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1246,7 +1246,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro } fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath) txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx) - if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil { + if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil { return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index bfaa34fa2a..efeb2ae76e 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1076,10 +1076,7 @@ func mustGetMinTimestampForCompositeIndex(metadataDir string, isEmptyDB bool) in } minTimestamp = date * msecPerDay dateBuf := encoding.MarshalInt64(nil, minTimestamp) - if err := os.RemoveAll(path); err != nil { - logger.Fatalf("cannot remove a file with minTimestampForCompositeIndex: %s", err) - } - if err := fs.WriteFileAtomically(path, dateBuf); err != nil { + if err := fs.WriteFileAtomically(path, dateBuf, true); err != nil { logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err) } return minTimestamp