mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-19 23:16:26 +01:00
cb23685681
vmselect uses a cache folder in the file system for two purposes: 1. Storing rollup cache results on shutdown; 2. Storing temporary search results from vmstorage during query execution. The cache folder may be deleted accidentally by the user, or by the OS during cleanup routines. This would cause vmselect to: 1. panic on a /metrics call, because `MustGetFreeSpace` will fail; 2. return a query error to the user, as it won't be able to store temporary search results. The changes in this commit are the following: 1. Make `MustGetFreeSpace` try re-creating the cache folder if it is missing; 2. Make vmselect try re-creating the cache folder if it can't persist tmp search results. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5985 Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
234 lines
6.0 KiB
Go
234 lines
6.0 KiB
Go
package netstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// InitTmpBlocksDir initializes directory to store temporary search results.
|
|
//
|
|
// It stores data in system-defined temporary directory if tmpDirPath is empty.
|
|
func InitTmpBlocksDir(tmpDirPath string) {
|
|
if len(tmpDirPath) == 0 {
|
|
tmpDirPath = os.TempDir()
|
|
}
|
|
tmpBlocksDir = filepath.Join(tmpDirPath, "searchResults")
|
|
fs.MustRemoveAll(tmpBlocksDir)
|
|
fs.MustMkdirIfNotExist(tmpBlocksDir)
|
|
}
|
|
|
|
// tmpBlocksDir is the path to the directory for temporary search results.
//
// It is assigned by InitTmpBlocksDir and is read when creating temporary files
// and when reporting the free space of the directory.
var tmpBlocksDir string
|
|
|
|
func maxInmemoryTmpBlocksFile() int {
|
|
mem := memory.Allowed()
|
|
maxLen := mem / 1024
|
|
if maxLen < 64*1024 {
|
|
return 64 * 1024
|
|
}
|
|
if maxLen > 4*1024*1024 {
|
|
return 4 * 1024 * 1024
|
|
}
|
|
return maxLen
|
|
}
|
|
|
|
var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
|
|
return float64(maxInmemoryTmpBlocksFile())
|
|
})
|
|
|
|
type tmpBlocksFile struct {
|
|
buf []byte
|
|
|
|
f *os.File
|
|
r *fs.ReaderAt
|
|
|
|
offset uint64
|
|
}
|
|
|
|
func getTmpBlocksFile() *tmpBlocksFile {
|
|
v := tmpBlocksFilePool.Get()
|
|
if v == nil {
|
|
return &tmpBlocksFile{
|
|
buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
|
|
}
|
|
}
|
|
return v.(*tmpBlocksFile)
|
|
}
|
|
|
|
func putTmpBlocksFile(tbf *tmpBlocksFile) {
|
|
tbf.MustClose()
|
|
tbf.buf = tbf.buf[:0]
|
|
tbf.f = nil
|
|
tbf.r = nil
|
|
tbf.offset = 0
|
|
tmpBlocksFilePool.Put(tbf)
|
|
}
|
|
|
|
// tmpBlocksFilePool holds *tmpBlocksFile objects for reuse.
//
// Objects are obtained via getTmpBlocksFile and returned via putTmpBlocksFile.
var tmpBlocksFilePool sync.Pool
|
|
|
|
// tmpBlockAddr is the location of a single chunk of data inside tmpBlocksFile.
type tmpBlockAddr struct {
	// offset is the position of the first byte of the data.
	offset uint64

	// size is the length of the data in bytes.
	size int
}

// String returns a human-readable representation of a.
func (a tmpBlockAddr) String() string {
	const format = "offset %d, size %d"
	return fmt.Sprintf(format, a.offset, a.size)
}
|
|
|
|
var (
|
|
tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
|
|
_ = metrics.NewGauge(`vm_tmp_blocks_files_directory_free_bytes`, func() float64 {
|
|
return float64(fs.MustGetFreeSpace(tmpBlocksDir))
|
|
})
|
|
)
|
|
|
|
// WriteBlockRefData writes br to tbf.
|
|
//
|
|
// It returns errors since the operation may fail on space shortage
|
|
// and this must be handled.
|
|
func (tbf *tmpBlocksFile) WriteBlockRefData(b []byte) (tmpBlockAddr, error) {
|
|
var addr tmpBlockAddr
|
|
addr.offset = tbf.offset
|
|
addr.size = len(b)
|
|
tbf.offset += uint64(addr.size)
|
|
if len(tbf.buf)+len(b) <= cap(tbf.buf) {
|
|
// Fast path - the data fits tbf.buf
|
|
tbf.buf = append(tbf.buf, b...)
|
|
return addr, nil
|
|
}
|
|
|
|
// Slow path: flush the data from tbf.buf to file.
|
|
if tbf.f == nil {
|
|
f, err := createTemp(tmpBlocksDir)
|
|
if err != nil {
|
|
return addr, err
|
|
}
|
|
tbf.f = f
|
|
tmpBlocksFilesCreated.Inc()
|
|
}
|
|
_, err := tbf.f.Write(tbf.buf)
|
|
tbf.buf = append(tbf.buf[:0], b...)
|
|
if err != nil {
|
|
return addr, fmt.Errorf("cannot write block to %q: %w", tbf.f.Name(), err)
|
|
}
|
|
return addr, nil
|
|
}
|
|
|
|
// createTemp creates new temporary file in the path dir.
|
|
// If path doesn't exist, it will try creating it.
|
|
func createTemp(path string) (*os.File, error) {
|
|
f, err := os.CreateTemp(path, "")
|
|
if err == nil {
|
|
return f, nil
|
|
}
|
|
if os.IsNotExist(err) || strings.Contains(err.Error(), "no such file or directory") {
|
|
// try re-creating the path and trying again
|
|
fs.MustMkdirIfNotExist(path)
|
|
return os.CreateTemp(path, "")
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// Len() returnt tbf size in bytes.
|
|
func (tbf *tmpBlocksFile) Len() uint64 {
|
|
return tbf.offset
|
|
}
|
|
|
|
func (tbf *tmpBlocksFile) Finalize() error {
|
|
if tbf.f == nil {
|
|
return nil
|
|
}
|
|
fname := tbf.f.Name()
|
|
if _, err := tbf.f.Write(tbf.buf); err != nil {
|
|
return fmt.Errorf("cannot write the remaining %d bytes to %q: %w", len(tbf.buf), fname, err)
|
|
}
|
|
tbf.buf = tbf.buf[:0]
|
|
r := fs.NewReaderAt(tbf.f)
|
|
|
|
// Hint the OS that the file is read almost sequentiallly.
|
|
// This should reduce the number of disk seeks, which is important
|
|
// for HDDs.
|
|
r.MustFadviseSequentialRead(true)
|
|
|
|
// Collect local stats in order to improve performance on systems with big number of CPU cores.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
|
|
r.SetUseLocalStats()
|
|
|
|
tbf.r = r
|
|
tbf.f = nil
|
|
return nil
|
|
}
|
|
|
|
func (tbf *tmpBlocksFile) MustReadBlockRefAt(partRef storage.PartRef, addr tmpBlockAddr) storage.BlockRef {
|
|
var buf []byte
|
|
if tbf.r == nil {
|
|
buf = tbf.buf[addr.offset : addr.offset+uint64(addr.size)]
|
|
} else {
|
|
bb := tmpBufPool.Get()
|
|
defer tmpBufPool.Put(bb)
|
|
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, addr.size)
|
|
tbf.r.MustReadAt(bb.B, int64(addr.offset))
|
|
buf = bb.B
|
|
}
|
|
var br storage.BlockRef
|
|
if err := br.Init(partRef, buf); err != nil {
|
|
logger.Panicf("FATAL: cannot initialize BlockRef: %s", err)
|
|
}
|
|
return br
|
|
}
|
|
|
|
// tmpBufPool is a pool of byte buffers used by MustReadBlockRefAt
// for reading block data from the temporary file.
var tmpBufPool bytesutil.ByteBufferPool
|
|
|
|
func (tbf *tmpBlocksFile) MustClose() {
|
|
if tbf.f != nil {
|
|
// tbf.f could be non-nil if Finalize wasn't called.
|
|
// In this case tbf.r must be nil.
|
|
if tbf.r != nil {
|
|
logger.Panicf("BUG: tbf.r must be nil when tbf.f!=nil")
|
|
}
|
|
|
|
// Try removing the file before closing it in order to prevent from flushing the in-memory data
|
|
// from page cache to the disk and save disk write IO. This may fail on non-posix systems such as Windows.
|
|
// Gracefully handle this case by attempting to remove the file after closing it.
|
|
fname := tbf.f.Name()
|
|
errRemove := os.Remove(fname)
|
|
if err := tbf.f.Close(); err != nil {
|
|
logger.Panicf("FATAL: cannot close %q: %s", fname, err)
|
|
}
|
|
if errRemove != nil {
|
|
if err := os.Remove(fname); err != nil {
|
|
logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
|
|
}
|
|
}
|
|
tbf.f = nil
|
|
return
|
|
}
|
|
|
|
if tbf.r == nil {
|
|
// Nothing to do
|
|
return
|
|
}
|
|
|
|
// Try removing the file before closing it in order to prevent from flushing the in-memory data
|
|
// from page cache to the disk and save disk write IO. This may fail on non-posix systems such as Windows.
|
|
// Gracefully handle this case by attempting to remove the file after closing it.
|
|
fname := tbf.r.Path()
|
|
errRemove := os.Remove(fname)
|
|
tbf.r.MustClose()
|
|
if errRemove != nil {
|
|
if err := os.Remove(fname); err != nil {
|
|
logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
|
|
}
|
|
}
|
|
tbf.r = nil
|
|
}
|