VictoriaMetrics/lib/filestream/filestream.go
rtm0 602bedf362
testing: allow disabling fsync to make tests run faster (#6871)
### Describe Your Changes

fsync() ensures that data is written to disk. In production this is
needed for data durability. However, during development, when the
unit tests are run, this level of durability is not needed, so
fsync() can be disabled, which makes test runs about two times faster.

Disabling is done by setting the `DISABLE_FSYNC_FOR_TESTING`
environment variable. The valid values for this variable are the same as
those accepted by `strconv.ParseBool` (see `go doc strconv.ParseBool`):

```
1, t, T, TRUE, true, True, 0, f, F, FALSE, false, False.
```

Any other value is treated as `false`.
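
For illustration, here is a minimal, self-contained sketch of how such a boolean environment flag can be read with `strconv.ParseBool`, assuming `envutil.GetenvBool` behaves roughly this way (the `getenvBool` helper below is hypothetical, not the actual envutil implementation):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// getenvBool reads the environment variable with the given name and parses it
// as a boolean. Any value not accepted by strconv.ParseBool is treated as false.
func getenvBool(name string) bool {
	b, err := strconv.ParseBool(os.Getenv(name))
	if err != nil {
		return false
	}
	return b
}

func main() {
	// Prints true only if DISABLE_FSYNC_FOR_TESTING is set to 1, t, T, TRUE, true or True.
	fmt.Println(getenvBool("DISABLE_FSYNC_FOR_TESTING"))
}
```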

The variable is set for all test build targets. Compare the running times:

Build Target       | DISABLE_FSYNC_FOR_TESTING=0 | DISABLE_FSYNC_FOR_TESTING=1
------------------ | --------------------------- | ---------------------------
make test          | 1m5s                        | 0m22s
make test-race     | 3m1s                        | 1m42s
make test-pure     | 1m7s                        | 0m20s
make test-full     | 1m21s                       | 0m32s
make test-full-386 | 1m42s                       | 0m36s

When running tests for a given package, fsync can be disabled as
follows:

```shell
DISABLE_FSYNC_FOR_TESTING=1 go test ./lib/storage
```

Disabling fsync() is intended for testing purposes only, and the name of
the variable reflects that.

What could also have been done but hasn't been:

- lib/filestream/filestream.go: `Writer.MustFlush()` also uses `f.Sync()`,
but it has been left untouched because `Writer.MustFlush()` is not
used anywhere in the VM codebase (see the sketch after this list). A side
question: what is the general policy for unused code?
- lib/filestream/filestream.go: `Writer.Write()` calls `adviseDontNeed()`,
which calls `unix.Fdatasync()`. Disabling it could potentially improve
running time, but running the tests with this code disabled has shown
otherwise.
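
For completeness, a simplified sketch of what gating `Writer.MustFlush()` behind the same flag could look like (hypothetical, not part of this change; it assumes the `Writer` type, `logger`, and the `disableFSyncForTesting` variable from this package, and omits the write-duration accounting present in the real method):

```go
// Hypothetical variant of MustFlush: skip the fsync when fsync is disabled for testing.
func (w *Writer) MustFlush(isSync bool) {
	if err := w.bw.Flush(); err != nil {
		logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
	}
	if isSync && !disableFSyncForTesting {
		if err := w.f.Sync(); err != nil {
			logger.Panicf("FATAL: cannot fsync data to the underlying storage for file %q: %s", w.f.Name(), err)
		}
	}
}
```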

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to the [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>

(cherry picked from commit 334cd92a6c)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2024-08-30 11:18:21 +02:00


package filestream

import (
	"bufio"
	"flag"
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
	"github.com/VictoriaMetrics/metrics"
)

var disableFadvise = flag.Bool("filestream.disableFadvise", false, "Whether to disable fadvise() syscall when reading large data files. "+
	"The fadvise() syscall prevents from eviction of recently accessed data from OS page cache during background merges and backups. "+
	"In some rare cases it is better to disable the syscall if it uses too much CPU")

var disableFSyncForTesting = envutil.GetenvBool("DISABLE_FSYNC_FOR_TESTING")

const dontNeedBlockSize = 16 * 1024 * 1024

// ReadCloser is a standard interface for filestream Reader.
type ReadCloser interface {
	Path() string
	Read(p []byte) (int, error)
	MustClose()
}

// WriteCloser is a standard interface for filestream Writer.
type WriteCloser interface {
	Path() string
	Write(p []byte) (int, error)
	MustClose()
}

// getBufferSize returns the read/write buffer size, derived from the allowed memory
// and clamped to the [4KiB, 512KiB] range.
func getBufferSize() int {
	bufferSizeOnce.Do(func() {
		n := memory.Allowed() / 1024 / 8
		if n < 4*1024 {
			n = 4 * 1024
		}
		if n > 512*1024 {
			n = 512 * 1024
		}
		bufferSize = n
	})
	return bufferSize
}

var (
	bufferSize     int
	bufferSizeOnce sync.Once
)

// Reader implements buffered file reader.
type Reader struct {
	f  *os.File
	br *bufio.Reader
	st streamTracker
}

// Path returns the path to r
func (r *Reader) Path() string {
	return r.f.Name()
}

// OpenReaderAt opens the file at the given path in nocache mode at the given offset.
//
// If nocache is set, then the reader doesn't pollute OS page cache.
func OpenReaderAt(path string, offset int64, nocache bool) (*Reader, error) {
	r := MustOpen(path, nocache)
	n, err := r.f.Seek(offset, io.SeekStart)
	if err != nil {
		r.MustClose()
		return nil, fmt.Errorf("cannot seek to offset=%d for %q: %w", offset, path, err)
	}
	if n != offset {
		r.MustClose()
		return nil, fmt.Errorf("invalid seek offset for %q; got %d; want %d", path, n, offset)
	}
	return r, nil
}

// MustOpen opens the file from the given path in nocache mode.
//
// If nocache is set, then the reader doesn't pollute OS page cache.
func MustOpen(path string, nocache bool) *Reader {
	f, err := os.Open(path)
	if err != nil {
		logger.Panicf("FATAL: cannot open file: %s", err)
	}
	r := &Reader{
		f:  f,
		br: getBufioReader(f),
	}
	if *disableFadvise {
		// Unconditionally disable fadvise() syscall
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5120 for details on why this is needed
		nocache = false
	}
	if nocache {
		r.st.fd = f.Fd()
	}
	readersCount.Inc()
	return r
}

// MustClose closes the underlying file passed to MustOpen.
func (r *Reader) MustClose() {
	if err := r.st.close(); err != nil {
		logger.Panicf("FATAL: cannot close streamTracker for file %q: %s", r.f.Name(), err)
	}
	if err := r.f.Close(); err != nil {
		logger.Panicf("FATAL: cannot close file %q: %s", r.f.Name(), err)
	}
	r.f = nil
	putBufioReader(r.br)
	r.br = nil
	readersCount.Dec()
}

var (
	readDuration      = metrics.NewFloatCounter(`vm_filestream_read_duration_seconds_total`)
	readCallsBuffered = metrics.NewCounter(`vm_filestream_buffered_read_calls_total`)
	readCallsReal     = metrics.NewCounter(`vm_filestream_real_read_calls_total`)
	readBytesBuffered = metrics.NewCounter(`vm_filestream_buffered_read_bytes_total`)
	readBytesReal     = metrics.NewCounter(`vm_filestream_real_read_bytes_total`)
	readersCount      = metrics.NewCounter(`vm_filestream_readers`)
)

// Read reads file contents to p.
func (r *Reader) Read(p []byte) (int, error) {
	readCallsBuffered.Inc()
	n, err := r.br.Read(p)
	readBytesBuffered.Add(n)
	if err != nil {
		return n, err
	}
	if err := r.st.adviseDontNeed(n, false); err != nil {
		return n, fmt.Errorf("advise error for %q: %w", r.f.Name(), err)
	}
	return n, nil
}

// statReader wraps os.File and tracks metrics for real (unbuffered) read calls.
type statReader struct {
	*os.File
}

func (sr *statReader) Read(p []byte) (int, error) {
	startTime := time.Now()
	readCallsReal.Inc()
	n, err := sr.File.Read(p)
	d := time.Since(startTime).Seconds()
	readDuration.Add(d)
	readBytesReal.Add(n)
	return n, err
}

// getBufioReader returns a pooled bufio.Reader wrapping f.
func getBufioReader(f *os.File) *bufio.Reader {
	sr := &statReader{f}
	v := brPool.Get()
	if v == nil {
		return bufio.NewReaderSize(sr, getBufferSize())
	}
	br := v.(*bufio.Reader)
	br.Reset(sr)
	return br
}

// putBufioReader returns br to the pool for reuse.
func putBufioReader(br *bufio.Reader) {
	brPool.Put(br)
}

var brPool sync.Pool

// Writer implements buffered file writer.
type Writer struct {
	f  *os.File
	bw *bufio.Writer
	st streamTracker
}

// Path returns the path to w
func (w *Writer) Path() string {
	return w.f.Name()
}

// OpenWriterAt opens the file at path in nocache mode for writing at the given offset.
//
// The file at path is created if it is missing.
//
// If nocache is set, the writer doesn't pollute OS page cache.
func OpenWriterAt(path string, offset int64, nocache bool) (*Writer, error) {
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
	if err != nil {
		return nil, err
	}
	n, err := f.Seek(offset, io.SeekStart)
	if err != nil {
		_ = f.Close()
		return nil, fmt.Errorf("cannot seek to offset=%d in %q: %w", offset, path, err)
	}
	if n != offset {
		_ = f.Close()
		return nil, fmt.Errorf("invalid seek offset for %q; got %d; want %d", path, n, offset)
	}
	return newWriter(f, nocache), nil
}

// MustCreate creates the file for the given path in nocache mode.
//
// If nocache is set, the writer doesn't pollute OS page cache.
func MustCreate(path string, nocache bool) *Writer {
	f, err := os.Create(path)
	if err != nil {
		logger.Panicf("FATAL: cannot create file %q: %s", path, err)
	}
	return newWriter(f, nocache)
}

func newWriter(f *os.File, nocache bool) *Writer {
	w := &Writer{
		f:  f,
		bw: getBufioWriter(f),
	}
	if nocache {
		w.st.fd = f.Fd()
	}
	writersCount.Inc()
	return w
}

// MustClose syncs the underlying file to storage and then closes it.
func (w *Writer) MustClose() {
	if err := w.bw.Flush(); err != nil {
		logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
	}
	putBufioWriter(w.bw)
	w.bw = nil
	if !disableFSyncForTesting {
		if err := w.f.Sync(); err != nil {
			logger.Panicf("FATAL: cannot sync file %q: %s", w.f.Name(), err)
		}
	}
	if err := w.st.close(); err != nil {
		logger.Panicf("FATAL: cannot close streamTracker for file %q: %s", w.f.Name(), err)
	}
	if err := w.f.Close(); err != nil {
		logger.Panicf("FATAL: cannot close file %q: %s", w.f.Name(), err)
	}
	w.f = nil
	writersCount.Dec()
}

var (
	writeDuration        = metrics.NewFloatCounter(`vm_filestream_write_duration_seconds_total`)
	writeCallsBuffered   = metrics.NewCounter(`vm_filestream_buffered_write_calls_total`)
	writeCallsReal       = metrics.NewCounter(`vm_filestream_real_write_calls_total`)
	writtenBytesBuffered = metrics.NewCounter(`vm_filestream_buffered_written_bytes_total`)
	writtenBytesReal     = metrics.NewCounter(`vm_filestream_real_written_bytes_total`)
	writersCount         = metrics.NewCounter(`vm_filestream_writers`)
)

// Write writes p to the underlying file.
func (w *Writer) Write(p []byte) (int, error) {
	writeCallsBuffered.Inc()
	n, err := w.bw.Write(p)
	writtenBytesBuffered.Add(n)
	if err != nil {
		return n, err
	}
	if err := w.st.adviseDontNeed(n, true); err != nil {
		return n, fmt.Errorf("advise error for %q: %w", w.f.Name(), err)
	}
	return n, nil
}

// MustFlush flushes all the buffered data to file.
//
// if isSync is true, then the flushed data is fsynced to the underlying storage.
func (w *Writer) MustFlush(isSync bool) {
	startTime := time.Now()
	defer func() {
		d := time.Since(startTime).Seconds()
		writeDuration.Add(d)
	}()
	if err := w.bw.Flush(); err != nil {
		logger.Panicf("FATAL: cannot flush buffered data to file %q: %s", w.f.Name(), err)
	}
	if isSync {
		if err := w.f.Sync(); err != nil {
			logger.Panicf("FATAL: cannot fsync data to the underlying storage for file %q: %s", w.f.Name(), err)
		}
	}
}

// statWriter wraps os.File and tracks metrics for real (unbuffered) write calls.
type statWriter struct {
	*os.File
}

func (sw *statWriter) Write(p []byte) (int, error) {
	startTime := time.Now()
	writeCallsReal.Inc()
	n, err := sw.File.Write(p)
	d := time.Since(startTime).Seconds()
	writeDuration.Add(d)
	writtenBytesReal.Add(n)
	return n, err
}

// getBufioWriter returns a pooled bufio.Writer wrapping f.
func getBufioWriter(f *os.File) *bufio.Writer {
	sw := &statWriter{f}
	v := bwPool.Get()
	if v == nil {
		return bufio.NewWriterSize(sw, getBufferSize())
	}
	bw := v.(*bufio.Writer)
	bw.Reset(sw)
	return bw
}

// putBufioWriter returns bw to the pool for reuse.
func putBufioWriter(bw *bufio.Writer) {
	bwPool.Put(bw)
}

var bwPool sync.Pool

// streamTracker tracks the file range that should be dropped from the OS page cache.
// Its adviseDontNeed and close methods are defined outside this file.
type streamTracker struct {
	fd     uintptr
	offset uint64
	length uint64
}