mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 23:39:48 +01:00
189 lines
4.4 KiB
Go
189 lines
4.4 KiB
Go
|
package netstorage
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"fmt"
|
||
|
"io/ioutil"
|
||
|
"os"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||
|
"github.com/VictoriaMetrics/metrics"
|
||
|
)
|
||
|
|
||
|
// InitTmpBlocksDir initializes directory to store temporary search results.
|
||
|
//
|
||
|
// It stores data in system-defined temporary directory if tmpDirPath is empty.
|
||
|
func InitTmpBlocksDir(tmpDirPath string) {
|
||
|
if len(tmpDirPath) == 0 {
|
||
|
tmpDirPath = os.TempDir()
|
||
|
}
|
||
|
tmpBlocksDir = tmpDirPath + "/searchResults"
|
||
|
if err := os.RemoveAll(tmpBlocksDir); err != nil {
|
||
|
logger.Panicf("FATAL: cannot remove %q: %s", tmpBlocksDir, err)
|
||
|
}
|
||
|
if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
|
||
|
logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var tmpBlocksDir string
|
||
|
|
||
|
const maxInmemoryTmpBlocksFile = 512 * 1024
|
||
|
|
||
|
type tmpBlocksFile struct {
|
||
|
buf []byte
|
||
|
|
||
|
f *os.File
|
||
|
bw *bufio.Writer
|
||
|
|
||
|
offset uint64
|
||
|
}
|
||
|
|
||
|
func getTmpBlocksFile() *tmpBlocksFile {
|
||
|
v := tmpBlocksFilePool.Get()
|
||
|
if v == nil {
|
||
|
return &tmpBlocksFile{}
|
||
|
}
|
||
|
return v.(*tmpBlocksFile)
|
||
|
}
|
||
|
|
||
|
func putTmpBlocksFile(tbf *tmpBlocksFile) {
|
||
|
tbf.MustClose()
|
||
|
tbf.buf = tbf.buf[:0]
|
||
|
tbf.f = nil
|
||
|
tbf.bw = nil
|
||
|
tbf.offset = 0
|
||
|
tmpBlocksFilePool.Put(tbf)
|
||
|
}
|
||
|
|
||
|
var tmpBlocksFilePool sync.Pool
|
||
|
|
||
|
type tmpBlockAddr struct {
|
||
|
offset uint64
|
||
|
size int
|
||
|
}
|
||
|
|
||
|
func (addr tmpBlockAddr) String() string {
|
||
|
return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
|
||
|
}
|
||
|
|
||
|
func getBufioWriter(f *os.File) *bufio.Writer {
|
||
|
v := bufioWriterPool.Get()
|
||
|
if v == nil {
|
||
|
return bufio.NewWriterSize(f, maxInmemoryTmpBlocksFile*2)
|
||
|
}
|
||
|
bw := v.(*bufio.Writer)
|
||
|
bw.Reset(f)
|
||
|
return bw
|
||
|
}
|
||
|
|
||
|
func putBufioWriter(bw *bufio.Writer) {
|
||
|
bufioWriterPool.Put(bw)
|
||
|
}
|
||
|
|
||
|
var bufioWriterPool sync.Pool
|
||
|
|
||
|
var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
|
||
|
|
||
|
// WriteBlock writes b to tbf.
|
||
|
//
|
||
|
// It returns errors since the operation may fail on space shortage
|
||
|
// and this must be handled.
|
||
|
func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
|
||
|
var addr tmpBlockAddr
|
||
|
addr.offset = tbf.offset
|
||
|
|
||
|
tbfBufLen := len(tbf.buf)
|
||
|
tbf.buf = storage.MarshalBlock(tbf.buf, b)
|
||
|
addr.size = len(tbf.buf) - tbfBufLen
|
||
|
tbf.offset += uint64(addr.size)
|
||
|
if tbf.offset <= maxInmemoryTmpBlocksFile {
|
||
|
return addr, nil
|
||
|
}
|
||
|
|
||
|
if tbf.f == nil {
|
||
|
f, err := ioutil.TempFile(tmpBlocksDir, "")
|
||
|
if err != nil {
|
||
|
return addr, err
|
||
|
}
|
||
|
tbf.f = f
|
||
|
tbf.bw = getBufioWriter(f)
|
||
|
tmpBlocksFilesCreated.Inc()
|
||
|
}
|
||
|
_, err := tbf.bw.Write(tbf.buf)
|
||
|
tbf.buf = tbf.buf[:0]
|
||
|
if err != nil {
|
||
|
return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
|
||
|
}
|
||
|
return addr, nil
|
||
|
}
|
||
|
|
||
|
func (tbf *tmpBlocksFile) Finalize() error {
|
||
|
if tbf.f == nil {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
err := tbf.bw.Flush()
|
||
|
putBufioWriter(tbf.bw)
|
||
|
tbf.bw = nil
|
||
|
if _, err := tbf.f.Seek(0, 0); err != nil {
|
||
|
logger.Panicf("FATAL: cannot seek to the start of file: %s", err)
|
||
|
}
|
||
|
mustFadviseRandomRead(tbf.f)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) {
|
||
|
var buf []byte
|
||
|
if tbf.f == nil {
|
||
|
buf = tbf.buf[addr.offset : addr.offset+uint64(addr.size)]
|
||
|
} else {
|
||
|
bb := tmpBufPool.Get()
|
||
|
defer tmpBufPool.Put(bb)
|
||
|
bb.B = bytesutil.Resize(bb.B, addr.size)
|
||
|
n, err := tbf.f.ReadAt(bb.B, int64(addr.offset))
|
||
|
if err != nil {
|
||
|
logger.Panicf("FATAL: cannot read from %q at %s: %s", tbf.f.Name(), addr, err)
|
||
|
}
|
||
|
if n != len(bb.B) {
|
||
|
logger.Panicf("FATAL: too short number of bytes read at %s; got %d; want %d", addr, n, len(bb.B))
|
||
|
}
|
||
|
buf = bb.B
|
||
|
}
|
||
|
tail, err := storage.UnmarshalBlock(dst, buf)
|
||
|
if err != nil {
|
||
|
logger.Panicf("FATAL: cannot unmarshal data at %s: %s", addr, err)
|
||
|
}
|
||
|
if len(tail) > 0 {
|
||
|
logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling data at %s; len(tail)=%d", addr, len(tail))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var tmpBufPool bytesutil.ByteBufferPool
|
||
|
|
||
|
func (tbf *tmpBlocksFile) MustClose() {
|
||
|
if tbf.f == nil {
|
||
|
return
|
||
|
}
|
||
|
if tbf.bw != nil {
|
||
|
putBufioWriter(tbf.bw)
|
||
|
tbf.bw = nil
|
||
|
}
|
||
|
fname := tbf.f.Name()
|
||
|
|
||
|
// Remove the file at first, then close it.
|
||
|
// This way the OS shouldn't try to flush file contents to storage
|
||
|
// on close.
|
||
|
if err := os.Remove(fname); err != nil {
|
||
|
logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
|
||
|
}
|
||
|
if err := tbf.f.Close(); err != nil {
|
||
|
logger.Panicf("FATAL: cannot close %q: %s", fname, err)
|
||
|
}
|
||
|
tbf.f = nil
|
||
|
}
|