From 4b688fffee3d9d7dde64f631082bc59f07b8da9b Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sun, 25 Aug 2019 14:39:39 +0300
Subject: [PATCH] lib/storage: calculate the maximum number of rows per small
 part from `-memory.allowedPercent`

This should improve query speed over recent data on machines with big
amounts of RAM
---
 lib/memory/memory.go     | 44 +++++++++++++++++++++++++++++++++++++++++-----------------
 lib/storage/part.go      |  2 +-
 lib/storage/partition.go | 23 ++++++++++++-----------
 3 files changed, 42 insertions(+), 27 deletions(-)

diff --git a/lib/memory/memory.go b/lib/memory/memory.go
index 9ee001a4d..53325b1b0 100644
--- a/lib/memory/memory.go
+++ b/lib/memory/memory.go
@@ -10,27 +10,41 @@ import (
 
 var allowedMemPercent = flag.Float64("memory.allowedPercent", 60, "Allowed percent of system memory VictoriaMetrics caches may occupy")
 
-var allowedMemory int
+var (
+	allowedMemory   int
+	remainingMemory int
+)
 
 var once sync.Once
 
+func initOnce() {
+	if !flag.Parsed() {
+		// Do not use logger.Panicf here, since logger may be uninitialized yet.
+		panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call"))
+	}
+	if *allowedMemPercent < 10 || *allowedMemPercent > 200 {
+		logger.Panicf("FATAL: -memory.allowedPercent must be in the range [10...200]; got %f", *allowedMemPercent)
+	}
+	percent := *allowedMemPercent / 100
+
+	mem := sysTotalMemory()
+	allowedMemory = int(float64(mem) * percent)
+	remainingMemory = mem - allowedMemory
+	logger.Infof("limiting caches to %d bytes, leaving %d bytes to the OS according to -memory.allowedPercent=%g", allowedMemory, remainingMemory, *allowedMemPercent)
+}
+
 // Allowed returns the amount of system memory allowed to use by the app.
 //
 // The function must be called only after flag.Parse is called.
 func Allowed() int {
-	once.Do(func() {
-		if !flag.Parsed() {
-			// Do not use logger.Panicf here, since logger may be uninitialized yet.
-			panic(fmt.Errorf("BUG: memory.Allowed must be called only after flag.Parse call"))
-		}
-		if *allowedMemPercent < 10 || *allowedMemPercent > 200 {
-			logger.Panicf("FATAL: -memory.allowedPercent must be in the range [10...200]; got %f", *allowedMemPercent)
-		}
-		percent := *allowedMemPercent / 100
-
-		mem := sysTotalMemory()
-		allowedMemory = int(float64(mem) * percent)
-		logger.Infof("limiting caches to %d bytes of RAM according to -memory.allowedPercent=%g", allowedMemory, *allowedMemPercent)
-	})
+	once.Do(initOnce)
 	return allowedMemory
 }
+
+// Remaining returns the amount of memory remaining to the OS.
+//
+// This function must be called only after flag.Parse is called.
+func Remaining() int {
+	once.Do(initOnce)
+	return remainingMemory
+}
diff --git a/lib/storage/part.go b/lib/storage/part.go
index 9cf7ec758..315aa196c 100644
--- a/lib/storage/part.go
+++ b/lib/storage/part.go
@@ -144,7 +144,7 @@ func (p *part) MustClose() {
 	p.valuesFile.MustClose()
 	p.indexFile.MustClose()
 
-	isBig := p.ph.RowsCount > maxRowsPerSmallPart
+	isBig := p.ph.RowsCount > maxRowsPerSmallPart()
 	p.ibCache.Reset(isBig)
 }
 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 49235333f..c511c681a 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -22,15 +22,16 @@ import (
 	"golang.org/x/sys/unix"
 )
 
-// The maximum number of rows in a small part.
-//
-// This number limits the maximum size of small parts storage.
-// Production simultation shows that the required size of the storage
-// may be estimated as:
-//
-//     maxRowsPerSmallPart * 2 * defaultPartsToMerge * mergeWorkers
-//
-const maxRowsPerSmallPart = 300e6
+func maxRowsPerSmallPart() uint64 {
+	// Small parts are cached in the OS page cache,
+	// so limit the number of rows for small part
+	// by the remaining free RAM.
+	mem := memory.Remaining()
+	if mem <= 0 {
+		return 100e6
+	}
+	return uint64(mem) / defaultPartsToMerge
+}
 
 // The maximum number of rows per big part.
 //
@@ -885,7 +886,7 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
 
 func (pt *partition) mergeSmallParts(isFinal bool) error {
 	maxRows := maxRowsByPath(pt.smallPartsPath)
-	if maxRows > maxRowsPerSmallPart {
+	if maxRows > maxRowsPerSmallPart() {
 		// The output part may go to big part,
 		// so make sure it as enough space.
 		maxBigPartRows := maxRowsByPath(pt.bigPartsPath)
@@ -955,7 +956,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	for _, pw := range pws {
 		outRowsCount += pw.p.ph.RowsCount
 	}
-	isBigPart := outRowsCount > maxRowsPerSmallPart
+	isBigPart := outRowsCount > maxRowsPerSmallPart()
 	nocache := isBigPart
 
 	// Prepare BlockStreamWriter for destination part.
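
For reference, a minimal standalone sketch (not part of the patch) of the sizing arithmetic it introduces: the caches get -memory.allowedPercent of RAM, the rest is counted as "remaining" for the OS page cache, and the small-part row limit is that remainder divided by the number of parts merged at once. The concrete inputs here are assumptions picked for the example: a 64 GiB host and partsToMerge = 15; the patch itself reads them from sysTotalMemory() and the defaultPartsToMerge constant in lib/storage.

// Standalone sketch of the sizing arithmetic introduced by this patch.
// The inputs below (64 GiB of RAM, partsToMerge = 15) are assumptions for
// the example; the patch uses sysTotalMemory() and defaultPartsToMerge.
package main

import "fmt"

func main() {
	totalMem := uint64(64) << 30 // assumed host RAM: 64 GiB
	allowedPercent := 60.0       // -memory.allowedPercent default

	// memory.Allowed(): the share of RAM the caches may occupy.
	allowed := uint64(float64(totalMem) * allowedPercent / 100)
	// memory.Remaining(): what is left to the OS page cache.
	remaining := totalMem - allowed

	// maxRowsPerSmallPart(): remaining memory divided by the number of
	// parts merged at once (assumed to be 15 here).
	const partsToMerge = 15
	maxRowsPerSmallPart := remaining / partsToMerge

	fmt.Printf("allowed=%d remaining=%d maxRowsPerSmallPart=%d\n",
		allowed, remaining, maxRowsPerSmallPart)
}

With these assumed numbers the limit comes out around 1.8e9 rows, well above the previous hard-coded 300e6 cap, which is how the patch lets small parts grow with available RAM before merges are promoted to big parts.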