2019-05-22 23:16:55 +02:00
package netstorage
import (
"fmt"
"os"
2023-04-14 07:11:56 +02:00
"path/filepath"
2019-05-22 23:16:55 +02:00
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2024-08-26 14:37:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-09-03 11:27:21 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
2024-08-26 14:37:45 +02:00
// tmpBufSize is the -search.inmemoryBufSizeBytes command-line flag.
//
// The default value 0 means the size isn't set explicitly;
// maxInmemoryTmpBlocksFile derives it from the allowed memory in this case.
var tmpBufSize = flagutil.NewBytes(
	"search.inmemoryBufSizeBytes",
	0,
	"Size for in-memory data blocks used during processing search requests. "+
		"By default, the size is automatically calculated based on available memory. "+
		"Adjust this flag value if you observe that vm_tmp_blocks_max_inmemory_file_size_bytes metric constantly shows much higher values than vm_tmp_blocks_inmemory_file_size_bytes. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6851",
)
2019-05-22 23:16:55 +02:00
// InitTmpBlocksDir initializes directory to store temporary search results.
//
// It stores data in system-defined temporary directory if tmpDirPath is empty.
func InitTmpBlocksDir ( tmpDirPath string ) {
if len ( tmpDirPath ) == 0 {
tmpDirPath = os . TempDir ( )
}
2023-04-14 07:11:56 +02:00
tmpBlocksDir = filepath . Join ( tmpDirPath , "searchResults" )
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( tmpBlocksDir )
2023-04-14 07:11:56 +02:00
fs . MustMkdirIfNotExist ( tmpBlocksDir )
2019-05-22 23:16:55 +02:00
}
// tmpBlocksDir is the directory where temporary files with search results are created.
//
// It is set by InitTmpBlocksDir.
var tmpBlocksDir string
2019-09-03 11:27:21 +02:00
func maxInmemoryTmpBlocksFile ( ) int {
2024-08-26 14:37:45 +02:00
if tmpBufSize . IntN ( ) > 0 {
return tmpBufSize . IntN ( )
}
2019-09-03 11:27:21 +02:00
mem := memory . Allowed ( )
maxLen := mem / 1024
if maxLen < 64 * 1024 {
return 64 * 1024
}
2020-01-17 15:27:16 +01:00
if maxLen > 4 * 1024 * 1024 {
2020-01-17 16:46:20 +01:00
return 4 * 1024 * 1024
2020-01-17 15:27:16 +01:00
}
2019-09-03 11:27:21 +02:00
return maxLen
}
2024-08-26 14:37:45 +02:00
var (
	// vm_tmp_blocks_max_inmemory_file_size_bytes exposes the in-memory buffer capacity
	// used for newly created tmpBlocksFile entries.
	_ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
		return float64(maxInmemoryTmpBlocksFile())
	})

	// tmpBufSizeSummary tracks the sizes of tmpBlocksFile entries
	// at the time they are returned to the pool.
	tmpBufSizeSummary = metrics.NewSummary(`vm_tmp_blocks_inmemory_file_size_bytes`)
)
2019-05-22 23:16:55 +02:00
type tmpBlocksFile struct {
buf [ ] byte
2019-09-03 11:27:21 +02:00
f * os . File
2020-01-30 14:03:24 +01:00
r * fs . ReaderAt
2019-05-22 23:16:55 +02:00
offset uint64
}
func getTmpBlocksFile ( ) * tmpBlocksFile {
v := tmpBlocksFilePool . Get ( )
if v == nil {
2019-09-03 11:27:21 +02:00
return & tmpBlocksFile {
buf : make ( [ ] byte , 0 , maxInmemoryTmpBlocksFile ( ) ) ,
}
2019-05-22 23:16:55 +02:00
}
return v . ( * tmpBlocksFile )
}
func putTmpBlocksFile ( tbf * tmpBlocksFile ) {
tbf . MustClose ( )
2024-08-26 14:37:45 +02:00
bufLen := tbf . Len ( )
tmpBufSizeSummary . Update ( float64 ( bufLen ) )
2019-05-22 23:16:55 +02:00
tbf . buf = tbf . buf [ : 0 ]
tbf . f = nil
2020-01-30 14:03:24 +01:00
tbf . r = nil
2019-05-22 23:16:55 +02:00
tbf . offset = 0
tmpBlocksFilePool . Put ( tbf )
}
// tmpBlocksFilePool holds *tmpBlocksFile entries for reuse
// by getTmpBlocksFile and putTmpBlocksFile.
var tmpBlocksFilePool sync . Pool
// tmpBlockAddr is the location of a block written via tmpBlocksFile.WriteBlockData.
type tmpBlockAddr struct {
	// offset is the position of the first byte of the block.
	offset uint64

	// size is the block size in bytes.
	size int

	// tbfIdx is the tmpBlocksFile index passed to WriteBlockData.
	tbfIdx uint
}

// String returns human-readable representation of addr.
func (addr tmpBlockAddr) String() string {
	return fmt.Sprintf("offset %d, size %d, tbfIdx %d", addr.offset, addr.size, addr.tbfIdx)
}
2020-06-04 12:05:50 +02:00
var (
tmpBlocksFilesCreated = metrics . NewCounter ( ` vm_tmp_blocks_files_created_total ` )
2020-06-04 12:13:00 +02:00
_ = metrics . NewGauge ( ` vm_tmp_blocks_files_directory_free_bytes ` , func ( ) float64 {
2020-06-04 12:05:50 +02:00
return float64 ( fs . MustGetFreeSpace ( tmpBlocksDir ) )
} )
)
2019-05-22 23:16:55 +02:00
2019-09-28 19:38:24 +02:00
// WriteBlockData writes b to tbf.
2019-05-22 23:16:55 +02:00
//
// It returns errors since the operation may fail on space shortage
// and this must be handled.
2022-10-01 21:05:43 +02:00
func ( tbf * tmpBlocksFile ) WriteBlockData ( b [ ] byte , tbfIdx uint ) ( tmpBlockAddr , error ) {
2019-05-22 23:16:55 +02:00
var addr tmpBlockAddr
2022-08-11 22:22:53 +02:00
addr . tbfIdx = tbfIdx
2019-05-22 23:16:55 +02:00
addr . offset = tbf . offset
2019-09-28 19:38:24 +02:00
addr . size = len ( b )
2019-05-22 23:16:55 +02:00
tbf . offset += uint64 ( addr . size )
2019-09-28 19:38:24 +02:00
if len ( tbf . buf ) + len ( b ) <= cap ( tbf . buf ) {
2019-09-03 11:27:21 +02:00
// Fast path - the data fits tbf.buf
2019-09-28 19:38:24 +02:00
tbf . buf = append ( tbf . buf , b ... )
2019-05-22 23:16:55 +02:00
return addr , nil
}
2019-09-03 11:27:21 +02:00
// Slow path: flush the data from tbf.buf to file.
2019-05-22 23:16:55 +02:00
if tbf . f == nil {
2024-03-30 06:29:24 +01:00
f , err := os . CreateTemp ( tmpBlocksDir , "" )
2019-05-22 23:16:55 +02:00
if err != nil {
return addr , err
}
tbf . f = f
tmpBlocksFilesCreated . Inc ( )
}
2019-09-03 11:27:21 +02:00
_ , err := tbf . f . Write ( tbf . buf )
2019-09-28 19:38:24 +02:00
tbf . buf = append ( tbf . buf [ : 0 ] , b ... )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return addr , fmt . Errorf ( "cannot write block to %q: %w" , tbf . f . Name ( ) , err )
2019-05-22 23:16:55 +02:00
}
return addr , nil
}
2022-06-01 01:31:40 +02:00
// Len returns the total size in bytes of the data written to tbf.
func (tbf *tmpBlocksFile) Len() uint64 {
	return tbf.offset
}
2019-05-22 23:16:55 +02:00
func ( tbf * tmpBlocksFile ) Finalize ( ) error {
if tbf . f == nil {
return nil
}
2020-01-30 14:03:24 +01:00
fname := tbf . f . Name ( )
2019-09-03 11:27:21 +02:00
if _ , err := tbf . f . Write ( tbf . buf ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write the remaining %d bytes to %q: %w" , len ( tbf . buf ) , fname , err )
2019-09-03 11:27:21 +02:00
}
tbf . buf = tbf . buf [ : 0 ]
2024-02-01 18:09:03 +01:00
r := fs . NewReaderAt ( tbf . f )
2024-08-06 14:54:49 +02:00
// Hint the OS that the file is read almost sequentially.
2019-09-29 23:11:01 +02:00
// This should reduce the number of disk seeks, which is important
// for HDDs.
2020-01-30 14:03:24 +01:00
r . MustFadviseSequentialRead ( true )
2024-02-01 18:09:03 +01:00
2023-03-26 00:33:09 +01:00
// Collect local stats in order to improve performance on systems with big number of CPU cores.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
r . SetUseLocalStats ( )
2024-02-01 18:09:03 +01:00
2020-01-30 14:03:24 +01:00
tbf . r = r
2024-02-01 18:09:03 +01:00
tbf . f = nil
2019-09-03 11:27:21 +02:00
return nil
2019-05-22 23:16:55 +02:00
}
func ( tbf * tmpBlocksFile ) MustReadBlockAt ( dst * storage . Block , addr tmpBlockAddr ) {
var buf [ ] byte
2024-02-01 18:09:03 +01:00
if tbf . r == nil {
2019-05-22 23:16:55 +02:00
buf = tbf . buf [ addr . offset : addr . offset + uint64 ( addr . size ) ]
} else {
bb := tmpBufPool . Get ( )
defer tmpBufPool . Put ( bb )
2022-01-31 23:18:39 +01:00
bb . B = bytesutil . ResizeNoCopyMayOverallocate ( bb . B , addr . size )
2020-01-30 14:03:24 +01:00
tbf . r . MustReadAt ( bb . B , int64 ( addr . offset ) )
2019-05-22 23:16:55 +02:00
buf = bb . B
}
tail , err := storage . UnmarshalBlock ( dst , buf )
if err != nil {
logger . Panicf ( "FATAL: cannot unmarshal data at %s: %s" , addr , err )
}
if len ( tail ) > 0 {
logger . Panicf ( "FATAL: unexpected non-empty tail left after unmarshaling data at %s; len(tail)=%d" , addr , len ( tail ) )
}
}
// tmpBufPool holds byte buffers used by MustReadBlockAt for reading blocks from files.
var tmpBufPool bytesutil . ByteBufferPool
func ( tbf * tmpBlocksFile ) MustClose ( ) {
2024-02-01 18:09:03 +01:00
if tbf . f != nil {
// tbf.f could be non-nil if Finalize wasn't called.
// In this case tbf.r must be nil.
if tbf . r != nil {
logger . Panicf ( "BUG: tbf.r must be nil when tbf.f!=nil" )
}
// Try removing the file before closing it in order to prevent from flushing the in-memory data
// from page cache to the disk and save disk write IO. This may fail on non-posix systems such as Windows.
// Gracefully handle this case by attempting to remove the file after closing it.
fname := tbf . f . Name ( )
errRemove := os . Remove ( fname )
if err := tbf . f . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close %q: %s" , fname , err )
}
if errRemove != nil {
if err := os . Remove ( fname ) ; err != nil {
logger . Panicf ( "FATAL: cannot remove %q: %s" , fname , err )
}
}
tbf . f = nil
2019-05-22 23:16:55 +02:00
return
}
2024-02-01 18:09:03 +01:00
if tbf . r == nil {
// Nothing to do
return
2019-05-22 23:16:55 +02:00
}
2024-02-01 18:09:03 +01:00
// Try removing the file before closing it in order to prevent from flushing the in-memory data
// from page cache to the disk and save disk write IO. This may fail on non-posix systems such as Windows.
// Gracefully handle this case by attempting to remove the file after closing it.
fname := tbf . r . Path ( )
errRemove := os . Remove ( fname )
tbf . r . MustClose ( )
if errRemove != nil {
if err := os . Remove ( fname ) ; err != nil {
logger . Panicf ( "FATAL: cannot remove %q: %s" , fname , err )
}
2023-03-28 03:10:15 +02:00
}
2024-02-01 18:09:03 +01:00
tbf . r = nil
2019-05-22 23:16:55 +02:00
}