app/vmselect/netstorage: adaptively adjust the maximum inmemory file size for storing temporary blocks

The maximum inmemory file size now depends on `-memory.allowedPercent`.
This should improve performance and reduce the number of filesystem calls
on machines with large amounts of RAM when performing heavy queries
over large numbers of samples and time series.
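
For reference, the new limit is derived from `memory.Allowed()` (which honours `-memory.allowedPercent`):
1/1024th of the allowed memory, with a 64KiB floor, replacing the previous fixed 512KiB constant.
A minimal sketch of that sizing rule, with the allowed-memory value passed in explicitly for illustration
(the real code calls memory.Allowed() directly):

    package main

    import "fmt"

    // maxInmemoryTmpBlocksFile mirrors the sizing rule introduced by this commit:
    // 1/1024th of the allowed memory, but never less than 64KiB.
    // allowedBytes stands in for memory.Allowed(), which is computed from
    // -memory.allowedPercent of the system RAM.
    func maxInmemoryTmpBlocksFile(allowedBytes int) int {
    	maxLen := allowedBytes / 1024
    	if maxLen < 64*1024 {
    		return 64 * 1024
    	}
    	return maxLen
    }

    func main() {
    	// With 40GiB of allowed memory the in-memory cap becomes 40MiB
    	// instead of the previous fixed 512KiB.
    	fmt.Println(maxInmemoryTmpBlocksFile(40 << 30)) // 41943040
    }

With 512MiB of allowed memory the formula yields the old 512KiB value, so small deployments keep the previous behaviour.
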
Aliaksandr Valialkin 2019-09-03 12:27:21 +03:00
parent 0b0153ba3d
commit 458d412bb6
2 changed files with 36 additions and 42 deletions

app/vmselect/netstorage/tmp_blocks_file.go

@@ -1,7 +1,6 @@
 package netstorage
 
 import (
-	"bufio"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -10,6 +9,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
 )
@@ -30,13 +30,23 @@ func InitTmpBlocksDir(tmpDirPath string) {
 var tmpBlocksDir string
 
-const maxInmemoryTmpBlocksFile = 512 * 1024
+func maxInmemoryTmpBlocksFile() int {
+	mem := memory.Allowed()
+	maxLen := mem / 1024
+	if maxLen < 64*1024 {
+		return 64 * 1024
+	}
+	return maxLen
+}
+
+var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
+	return float64(maxInmemoryTmpBlocksFile())
+})
 
 type tmpBlocksFile struct {
 	buf []byte
 
 	f *os.File
-	bw *bufio.Writer
 
 	offset uint64
 }
@@ -44,7 +54,9 @@ type tmpBlocksFile struct {
 func getTmpBlocksFile() *tmpBlocksFile {
 	v := tmpBlocksFilePool.Get()
 	if v == nil {
-		return &tmpBlocksFile{}
+		return &tmpBlocksFile{
+			buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
+		}
 	}
 	return v.(*tmpBlocksFile)
 }
@@ -53,7 +65,6 @@ func putTmpBlocksFile(tbf *tmpBlocksFile) {
 	tbf.MustClose()
 	tbf.buf = tbf.buf[:0]
 	tbf.f = nil
-	tbf.bw = nil
 	tbf.offset = 0
 	tmpBlocksFilePool.Put(tbf)
 }
@@ -69,22 +80,6 @@ func (addr tmpBlockAddr) String() string {
 	return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
 }
 
-func getBufioWriter(f *os.File) *bufio.Writer {
-	v := bufioWriterPool.Get()
-	if v == nil {
-		return bufio.NewWriterSize(f, maxInmemoryTmpBlocksFile*2)
-	}
-	bw := v.(*bufio.Writer)
-	bw.Reset(f)
-	return bw
-}
-
-func putBufioWriter(bw *bufio.Writer) {
-	bufioWriterPool.Put(bw)
-}
-
-var bufioWriterPool sync.Pool
-
 var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
 
 // WriteBlock writes b to tbf.
@@ -92,28 +87,31 @@ var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_tota
 // It returns errors since the operation may fail on space shortage
 // and this must be handled.
 func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) {
+	bb := tmpBufPool.Get()
+	defer tmpBufPool.Put(bb)
+	bb.B = storage.MarshalBlock(bb.B[:0], b)
+
 	var addr tmpBlockAddr
 	addr.offset = tbf.offset
-	tbfBufLen := len(tbf.buf)
-	tbf.buf = storage.MarshalBlock(tbf.buf, b)
-	addr.size = len(tbf.buf) - tbfBufLen
+	addr.size = len(bb.B)
 	tbf.offset += uint64(addr.size)
-	if tbf.offset <= maxInmemoryTmpBlocksFile {
+	if len(tbf.buf)+len(bb.B) <= cap(tbf.buf) {
+		// Fast path - the data fits tbf.buf
+		tbf.buf = append(tbf.buf, bb.B...)
 		return addr, nil
 	}
+
+	// Slow path: flush the data from tbf.buf to file.
 	if tbf.f == nil {
 		f, err := ioutil.TempFile(tmpBlocksDir, "")
 		if err != nil {
 			return addr, err
 		}
 		tbf.f = f
-		tbf.bw = getBufioWriter(f)
 		tmpBlocksFilesCreated.Inc()
 	}
-	_, err := tbf.bw.Write(tbf.buf)
-	tbf.buf = tbf.buf[:0]
+	_, err := tbf.f.Write(tbf.buf)
+	tbf.buf = append(tbf.buf[:0], bb.B...)
 	if err != nil {
 		return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err)
 	}
@@ -124,15 +122,15 @@ func (tbf *tmpBlocksFile) Finalize() error {
 	if tbf.f == nil {
 		return nil
 	}
-	err := tbf.bw.Flush()
-	putBufioWriter(tbf.bw)
-	tbf.bw = nil
+	if _, err := tbf.f.Write(tbf.buf); err != nil {
+		return fmt.Errorf("cannot flush the remaining %d bytes to tmpBlocksFile: %s", len(tbf.buf), err)
+	}
+	tbf.buf = tbf.buf[:0]
 	if _, err := tbf.f.Seek(0, 0); err != nil {
 		logger.Panicf("FATAL: cannot seek to the start of file: %s", err)
 	}
 	mustFadviseRandomRead(tbf.f)
-	return err
+	return nil
 }
 
 func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) {
@@ -167,10 +165,6 @@ func (tbf *tmpBlocksFile) MustClose() {
 	if tbf.f == nil {
 		return
 	}
-	if tbf.bw != nil {
-		putBufioWriter(tbf.bw)
-		tbf.bw = nil
-	}
 	fname := tbf.f.Name()
 
 	// Remove the file at first, then close it.
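
The `bufio.Writer` is no longer needed because `tbf.buf`, preallocated to the full in-memory cap, now doubles as
the write buffer: each block is marshaled into a pooled scratch buffer first, and the accumulated buffer is spilled
to the temporary file in a single large write only when the next block would not fit. A simplified, self-contained
sketch of that buffer-then-spill pattern (illustrative names, not the actual tmpBlocksFile API):

    package main

    import (
    	"io/ioutil"
    	"os"
    )

    // spillBuffer keeps marshaled blocks in memory up to cap(buf) and spills
    // them to a temporary file only when the next block would overflow.
    type spillBuffer struct {
    	buf []byte   // pre-sized to the in-memory cap; doubles as the write buffer
    	f   *os.File // created lazily on the first spill
    }

    func newSpillBuffer(maxInmemoryBytes int) *spillBuffer {
    	return &spillBuffer{buf: make([]byte, 0, maxInmemoryBytes)}
    }

    func (sb *spillBuffer) write(block []byte) error {
    	if len(sb.buf)+len(block) <= cap(sb.buf) {
    		// Fast path: the block still fits in memory.
    		sb.buf = append(sb.buf, block...)
    		return nil
    	}
    	// Slow path: flush the accumulated buffer to disk in one big write,
    	// then restart the in-memory buffer with the current block.
    	if sb.f == nil {
    		f, err := ioutil.TempFile("", "spill")
    		if err != nil {
    			return err
    		}
    		sb.f = f
    	}
    	if _, err := sb.f.Write(sb.buf); err != nil {
    		return err
    	}
    	sb.buf = append(sb.buf[:0], block...)
    	return nil
    }

    func main() {
    	sb := newSpillBuffer(64 * 1024)
    	_ = sb.write([]byte("marshaled block"))
    }

This keeps writes to the temporary file rare and large, which is the effect the removed bufio.Writer was approximating with an extra copy.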

app/vmselect/netstorage/tmp_blocks_file_test.go

@@ -30,7 +30,7 @@ func TestTmpBlocksFileSerial(t *testing.T) {
 }
 
 func TestTmpBlocksFileConcurrent(t *testing.T) {
-	concurrency := 4
+	concurrency := 3
 	ch := make(chan error, concurrency)
 	for i := 0; i < concurrency; i++ {
 		go func() {
@@ -69,7 +69,7 @@ func testTmpBlocksFile() error {
 		_, _, _ = b.MarshalData(0, 0)
 		return &b
 	}
-	for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile / 2, 2 * maxInmemoryTmpBlocksFile} {
+	for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
 		err := func() error {
 			tbf := getTmpBlocksFile()
 			defer putTmpBlocksFile(tbf)
@@ -94,7 +94,7 @@
 	}
 
 	// Read blocks in parallel and verify them
-	concurrency := 3
+	concurrency := 2
 	workCh := make(chan int)
 	doneCh := make(chan error)
 	for i := 0; i < concurrency; i++ {
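
The test sizes now call maxInmemoryTmpBlocksFile(), and the concurrency levels are lowered, presumably because
every pooled tmpBlocksFile now carries a buffer preallocated to 1/1024th of the allowed memory, so fewer goroutines
keep the test's memory footprint reasonable while still exercising both the in-memory and spill-to-file paths.
A minimal sketch of the get/put pooling pattern that lets those large preallocated buffers be reused across
queries (illustrative names, not the actual API):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // blockBuf stands in for tmpBlocksFile: a reusable buffer that is
    // allocated once at full capacity and then recycled via sync.Pool.
    type blockBuf struct {
    	buf []byte
    }

    const bufCap = 64 * 1024 // stands in for maxInmemoryTmpBlocksFile()

    var blockBufPool sync.Pool

    func getBlockBuf() *blockBuf {
    	v := blockBufPool.Get()
    	if v == nil {
    		// First use: allocate the whole in-memory capacity up front,
    		// as getTmpBlocksFile does after this commit.
    		return &blockBuf{buf: make([]byte, 0, bufCap)}
    	}
    	return v.(*blockBuf)
    }

    func putBlockBuf(bb *blockBuf) {
    	bb.buf = bb.buf[:0] // keep the capacity, drop the contents
    	blockBufPool.Put(bb)
    }

    func main() {
    	bb := getBlockBuf()
    	bb.buf = append(bb.buf, "marshaled block"...)
    	fmt.Println(len(bb.buf), cap(bb.buf)) // 15 65536
    	putBlockBuf(bb)
    }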