app/vmselect/netstorage: adaptively adjust the maximum inmemory file size for storing temporary blocks

The maximum inmemory file size now depends on `-memory.allowedPercent`.
This should improve performance and reduce the number of filesystem calls
on machines with big amounts of RAM when performing heavy queries
over big number of samples and time series.
This commit is contained in:
Aliaksandr Valialkin 2019-09-03 12:27:21 +03:00
parent e1d76ec1f3
commit 1575a560f0
2 changed files with 36 additions and 42 deletions

View File

@ -1,7 +1,6 @@
package netstorage
import (
"bufio"
"fmt"
"io/ioutil"
"os"
@ -10,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
@ -30,13 +30,23 @@ func InitTmpBlocksDir(tmpDirPath string) {
var tmpBlocksDir string
const maxInmemoryTmpBlocksFile = 512 * 1024
func maxInmemoryTmpBlocksFile() int {
mem := memory.Allowed()
maxLen := mem / 1024
if maxLen < 64*1024 {
return 64 * 1024
}
return maxLen
}
var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
return float64(maxInmemoryTmpBlocksFile())
})
type tmpBlocksFile struct {
buf []byte
f *os.File
bw *bufio.Writer
f *os.File
offset uint64
}
@ -44,7 +54,9 @@ type tmpBlocksFile struct {
func getTmpBlocksFile() *tmpBlocksFile {
v := tmpBlocksFilePool.Get()
if v == nil {
return &tmpBlocksFile{}
return &tmpBlocksFile{
buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
}
}
return v.(*tmpBlocksFile)
}
@ -53,7 +65,6 @@ func putTmpBlocksFile(tbf *tmpBlocksFile) {
tbf.MustClose()
tbf.buf = tbf.buf[:0]
tbf.f = nil
tbf.bw = nil
tbf.offset = 0
tmpBlocksFilePool.Put(tbf)
}
@ -69,22 +80,6 @@ func (addr tmpBlockAddr) String() string {
return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
}
func getBufioWriter(f *os.File) *bufio.Writer {
v := bufioWriterPool.Get()
if v == nil {
return bufio.NewWriterSize(f, maxInmemoryTmpBlocksFile*2)
}
bw := v.(*bufio.Writer)
bw.Reset(f)
return bw
}
func putBufioWriter(bw *bufio.Writer) {
bufioWriterPool.Put(bw)
}
var bufioWriterPool sync.Pool
var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
// WriteBlock writes b to tbf.
@ -92,28 +87,31 @@ var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_tota
// It returns errors since the operation may fail on space shortage
// and this must be handled.
func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
bb.B = storage.MarshalBlock(bb.B[:0], b)
var addr tmpBlockAddr
addr.offset = tbf.offset
tbfBufLen := len(tbf.buf)
tbf.buf = storage.MarshalBlock(tbf.buf, b)
addr.size = len(tbf.buf) - tbfBufLen
addr.size = len(bb.B)
tbf.offset += uint64(addr.size)
if tbf.offset <= maxInmemoryTmpBlocksFile {
if len(tbf.buf)+len(bb.B) <= cap(tbf.buf) {
// Fast path - the data fits tbf.buf
tbf.buf = append(tbf.buf, bb.B...)
return addr, nil
}
// Slow path: flush the data from tbf.buf to file.
if tbf.f == nil {
f, err := ioutil.TempFile(tmpBlocksDir, "")
if err != nil {
return addr, err
}
tbf.f = f
tbf.bw = getBufioWriter(f)
tmpBlocksFilesCreated.Inc()
}
_, err := tbf.bw.Write(tbf.buf)
tbf.buf = tbf.buf[:0]
_, err := tbf.f.Write(tbf.buf)
tbf.buf = append(tbf.buf[:0], bb.B...)
if err != nil {
return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
}
@ -124,15 +122,15 @@ func (tbf *tmpBlocksFile) Finalize() error {
if tbf.f == nil {
return nil
}
err := tbf.bw.Flush()
putBufioWriter(tbf.bw)
tbf.bw = nil
if _, err := tbf.f.Write(tbf.buf); err != nil {
return fmt.Errorf("cannot flush the remaining %d bytes to tmpBlocksFile: %s", len(tbf.buf), err)
}
tbf.buf = tbf.buf[:0]
if _, err := tbf.f.Seek(0, 0); err != nil {
logger.Panicf("FATAL: cannot seek to the start of file: %s", err)
}
mustFadviseRandomRead(tbf.f)
return err
return nil
}
func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) {
@ -167,10 +165,6 @@ func (tbf *tmpBlocksFile) MustClose() {
if tbf.f == nil {
return
}
if tbf.bw != nil {
putBufioWriter(tbf.bw)
tbf.bw = nil
}
fname := tbf.f.Name()
// Remove the file at first, then close it.

View File

@ -30,7 +30,7 @@ func TestTmpBlocksFileSerial(t *testing.T) {
}
func TestTmpBlocksFileConcurrent(t *testing.T) {
concurrency := 4
concurrency := 3
ch := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
@ -69,7 +69,7 @@ func testTmpBlocksFile() error {
_, _, _ = b.MarshalData(0, 0)
return &b
}
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile / 2, 2 * maxInmemoryTmpBlocksFile} {
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
err := func() error {
tbf := getTmpBlocksFile()
defer putTmpBlocksFile(tbf)
@ -94,7 +94,7 @@ func testTmpBlocksFile() error {
}
// Read blocks in parallel and verify them
concurrency := 3
concurrency := 2
workCh := make(chan int)
doneCh := make(chan error)
for i := 0; i < concurrency; i++ {