diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go index b68fc6de44..ccf68259eb 100644 --- a/app/vmselect/netstorage/tmp_blocks_file.go +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -1,7 +1,6 @@ package netstorage import ( - "bufio" "fmt" "io/ioutil" "os" @@ -10,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" ) @@ -30,13 +30,23 @@ func InitTmpBlocksDir(tmpDirPath string) { var tmpBlocksDir string -const maxInmemoryTmpBlocksFile = 512 * 1024 +func maxInmemoryTmpBlocksFile() int { + mem := memory.Allowed() + maxLen := mem / 1024 + if maxLen < 64*1024 { + return 64 * 1024 + } + return maxLen +} + +var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 { + return float64(maxInmemoryTmpBlocksFile()) +}) type tmpBlocksFile struct { buf []byte - f *os.File - bw *bufio.Writer + f *os.File offset uint64 } @@ -44,7 +54,9 @@ type tmpBlocksFile struct { func getTmpBlocksFile() *tmpBlocksFile { v := tmpBlocksFilePool.Get() if v == nil { - return &tmpBlocksFile{} + return &tmpBlocksFile{ + buf: make([]byte, 0, maxInmemoryTmpBlocksFile()), + } } return v.(*tmpBlocksFile) } @@ -53,7 +65,6 @@ func putTmpBlocksFile(tbf *tmpBlocksFile) { tbf.MustClose() tbf.buf = tbf.buf[:0] tbf.f = nil - tbf.bw = nil tbf.offset = 0 tmpBlocksFilePool.Put(tbf) } @@ -69,22 +80,6 @@ func (addr tmpBlockAddr) String() string { return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size) } -func getBufioWriter(f *os.File) *bufio.Writer { - v := bufioWriterPool.Get() - if v == nil { - return bufio.NewWriterSize(f, maxInmemoryTmpBlocksFile*2) - } - bw := v.(*bufio.Writer) - bw.Reset(f) - return bw -} - -func putBufioWriter(bw *bufio.Writer) { - bufioWriterPool.Put(bw) -} - -var bufioWriterPool sync.Pool - var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`) // WriteBlock writes b to tbf. @@ -92,28 +87,31 @@ var tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_tota // It returns errors since the operation may fail on space shortage // and this must be handled. func (tbf *tmpBlocksFile) WriteBlock(b *storage.Block) (tmpBlockAddr, error) { + bb := tmpBufPool.Get() + defer tmpBufPool.Put(bb) + bb.B = storage.MarshalBlock(bb.B[:0], b) + var addr tmpBlockAddr addr.offset = tbf.offset - - tbfBufLen := len(tbf.buf) - tbf.buf = storage.MarshalBlock(tbf.buf, b) - addr.size = len(tbf.buf) - tbfBufLen + addr.size = len(bb.B) tbf.offset += uint64(addr.size) - if tbf.offset <= maxInmemoryTmpBlocksFile { + if len(tbf.buf)+len(bb.B) <= cap(tbf.buf) { + // Fast path - the data fits tbf.buf + tbf.buf = append(tbf.buf, bb.B...) return addr, nil } + // Slow path: flush the data from tbf.buf to file. if tbf.f == nil { f, err := ioutil.TempFile(tmpBlocksDir, "") if err != nil { return addr, err } tbf.f = f - tbf.bw = getBufioWriter(f) tmpBlocksFilesCreated.Inc() } - _, err := tbf.bw.Write(tbf.buf) - tbf.buf = tbf.buf[:0] + _, err := tbf.f.Write(tbf.buf) + tbf.buf = append(tbf.buf[:0], bb.B...) if err != nil { return addr, fmt.Errorf("cannot write block to %q: %s", tbf.f.Name(), err) } @@ -124,15 +122,15 @@ func (tbf *tmpBlocksFile) Finalize() error { if tbf.f == nil { return nil } - - err := tbf.bw.Flush() - putBufioWriter(tbf.bw) - tbf.bw = nil + if _, err := tbf.f.Write(tbf.buf); err != nil { + return fmt.Errorf("cannot flush the remaining %d bytes to tmpBlocksFile: %s", len(tbf.buf), err) + } + tbf.buf = tbf.buf[:0] if _, err := tbf.f.Seek(0, 0); err != nil { logger.Panicf("FATAL: cannot seek to the start of file: %s", err) } mustFadviseRandomRead(tbf.f) - return err + return nil } func (tbf *tmpBlocksFile) MustReadBlockAt(dst *storage.Block, addr tmpBlockAddr) { @@ -167,10 +165,6 @@ func (tbf *tmpBlocksFile) MustClose() { if tbf.f == nil { return } - if tbf.bw != nil { - putBufioWriter(tbf.bw) - tbf.bw = nil - } fname := tbf.f.Name() // Remove the file at first, then close it. diff --git a/app/vmselect/netstorage/tmp_blocks_file_test.go b/app/vmselect/netstorage/tmp_blocks_file_test.go index f31f1952c6..586ce80f8b 100644 --- a/app/vmselect/netstorage/tmp_blocks_file_test.go +++ b/app/vmselect/netstorage/tmp_blocks_file_test.go @@ -30,7 +30,7 @@ func TestTmpBlocksFileSerial(t *testing.T) { } func TestTmpBlocksFileConcurrent(t *testing.T) { - concurrency := 4 + concurrency := 3 ch := make(chan error, concurrency) for i := 0; i < concurrency; i++ { go func() { @@ -69,7 +69,7 @@ func testTmpBlocksFile() error { _, _, _ = b.MarshalData(0, 0) return &b } - for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile / 2, 2 * maxInmemoryTmpBlocksFile} { + for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} { err := func() error { tbf := getTmpBlocksFile() defer putTmpBlocksFile(tbf) @@ -94,7 +94,7 @@ func testTmpBlocksFile() error { } // Read blocks in parallel and verify them - concurrency := 3 + concurrency := 2 workCh := make(chan int) doneCh := make(chan error) for i := 0; i < concurrency; i++ {