app/vmstorage: add metrics for determining whether background merges need additional disk space to complete

These metrics are:

* vm_small_merge_need_free_disk_space
* vm_big_merge_need_free_disk_space

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686
This commit is contained in:
Aliaksandr Valialkin 2020-09-29 21:47:40 +03:00
parent a4361a4c07
commit 097a4c10dd
3 changed files with 48 additions and 13 deletions

View File

@ -324,6 +324,14 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(idbm().AssistedMerges) return float64(idbm().AssistedMerges)
}) })
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686
metrics.NewGauge(`vm_small_merge_need_free_disk_space`, func() float64 {
return float64(tm().SmallMergeNeedFreeDiskSpace)
})
metrics.NewGauge(`vm_big_merge_need_free_disk_space`, func() float64 {
return float64(tm().BigMergeNeedFreeDiskSpace)
})
metrics.NewGauge(`vm_pending_rows{type="storage"}`, func() float64 { metrics.NewGauge(`vm_pending_rows{type="storage"}`, func() float64 {
return float64(tm().PendingRows) return float64(tm().PendingRows)
}) })

View File

@ -330,6 +330,9 @@ type partitionMetrics struct {
SmallPartsRefCount uint64 SmallPartsRefCount uint64
SmallAssistedMerges uint64 SmallAssistedMerges uint64
SmallMergeNeedFreeDiskSpace uint64
BigMergeNeedFreeDiskSpace uint64
} }
// UpdateMetrics updates m with metrics from pt. // UpdateMetrics updates m with metrics from pt.
@ -388,6 +391,9 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted) m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted)
m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges) m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
m.SmallMergeNeedFreeDiskSpace = atomic.LoadUint64(&smallMergeNeedFreeDiskSpace)
m.BigMergeNeedFreeDiskSpace = atomic.LoadUint64(&bigMergeNeedFreeDiskSpace)
} }
// AddRows adds the given rows to the partition pt. // AddRows adds the given rows to the partition pt.
@ -990,9 +996,10 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
maxRows := maxRowsByPath(pt.bigPartsPath) maxRows := maxRowsByPath(pt.bigPartsPath)
pt.partsLock.Lock() pt.partsLock.Lock()
pws := getPartsToMerge(pt.bigParts, maxRows, isFinal) pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal)
pt.partsLock.Unlock() pt.partsLock.Unlock()
atomicSetBool(&bigMergeNeedFreeDiskSpace, needFreeSpace)
return pt.mergeParts(pws, pt.stopCh) return pt.mergeParts(pws, pt.stopCh)
} }
@ -1008,14 +1015,28 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
} }
pt.partsLock.Lock() pt.partsLock.Lock()
pws := getPartsToMerge(pt.smallParts, maxRows, isFinal) pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxRows, isFinal)
pt.partsLock.Unlock() pt.partsLock.Unlock()
atomicSetBool(&smallMergeNeedFreeDiskSpace, needFreeSpace)
return pt.mergeParts(pws, pt.stopCh) return pt.mergeParts(pws, pt.stopCh)
} }
var errNothingToMerge = fmt.Errorf("nothing to merge") var errNothingToMerge = fmt.Errorf("nothing to merge")
// atomicSetBool atomically stores the boolean b into *p,
// encoding true as 1 and false as 0.
func atomicSetBool(p *uint64, b bool) {
	if b {
		atomic.StoreUint64(p, 1)
		return
	}
	atomic.StoreUint64(p, 0)
}
// Package-level flags backing the vm_small_merge_need_free_disk_space and
// vm_big_merge_need_free_disk_space gauges.
//
// Each flag holds 0 or 1 and is written via atomicSetBool and read via
// atomic.LoadUint64. Being package-level, they are not per-partition state:
// every partition's merge attempt overwrites the same value.
var (
// smallMergeNeedFreeDiskSpace is updated by mergeSmallParts with the
// needFreeSpace result of getPartsToMerge for small parts.
smallMergeNeedFreeDiskSpace uint64
// bigMergeNeedFreeDiskSpace is updated by mergeBigParts with the
// needFreeSpace result of getPartsToMerge for big parts.
bigMergeNeedFreeDiskSpace uint64
)
// mergeParts merges pws. // mergeParts merges pws.
// //
// Merging is immediately stopped if stopCh is closed. // Merging is immediately stopped if stopCh is closed.
@ -1242,7 +1263,8 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
// getPartsToMerge returns optimal parts to merge from pws. // getPartsToMerge returns optimal parts to merge from pws.
// //
// The returned rows will contain less than maxRows rows. // The returned rows will contain less than maxRows rows.
func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWrapper { // The function returns true if pws contains parts, which cannot be merged because of maxRows limit.
func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
pwsRemaining := make([]*partWrapper, 0, len(pws)) pwsRemaining := make([]*partWrapper, 0, len(pws))
for _, pw := range pws { for _, pw := range pws {
if !pw.isInMerge { if !pw.isInMerge {
@ -1251,13 +1273,14 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWr
} }
maxPartsToMerge := defaultPartsToMerge maxPartsToMerge := defaultPartsToMerge
var pms []*partWrapper var pms []*partWrapper
needFreeSpace := false
if isFinal { if isFinal {
for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge { for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows) pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
maxPartsToMerge-- maxPartsToMerge--
} }
} else { } else {
pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows) pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
} }
for _, pw := range pms { for _, pw := range pms {
if pw.isInMerge { if pw.isInMerge {
@ -1265,15 +1288,16 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWr
} }
pw.isInMerge = true pw.isInMerge = true
} }
return pms return pms, needFreeSpace
} }
// appendPartsToMerge finds optimal parts to merge from src, appends // appendPartsToMerge finds optimal parts to merge from src, appends
// them to dst and returns the result. // them to dst and returns the result.
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) []*partWrapper { // The function returns true if src contains parts, which cannot be merged because of maxRows limit.
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
if len(src) < 2 { if len(src) < 2 {
// There is no need in merging zero or one part :) // There is no need in merging zero or one part :)
return dst return dst, false
} }
if maxPartsToMerge < 2 { if maxPartsToMerge < 2 {
logger.Panicf("BUG: maxPartsToMerge cannot be smaller than 2; got %d", maxPartsToMerge) logger.Panicf("BUG: maxPartsToMerge cannot be smaller than 2; got %d", maxPartsToMerge)
@ -1281,10 +1305,12 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
// Filter out too big parts. // Filter out too big parts.
// This should reduce N for O(N^2) algorithm below. // This should reduce N for O(N^2) algorithm below.
needFreeSpace := false
maxInPartRows := maxRows / 2 maxInPartRows := maxRows / 2
tmp := make([]*partWrapper, 0, len(src)) tmp := make([]*partWrapper, 0, len(src))
for _, pw := range src { for _, pw := range src {
if pw.p.ph.RowsCount > maxInPartRows { if pw.p.ph.RowsCount > maxInPartRows {
needFreeSpace = true
continue continue
} }
tmp = append(tmp, pw) tmp = append(tmp, pw)
@ -1320,6 +1346,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
} }
if rowsSum > maxRows { if rowsSum > maxRows {
// There is no need in verifying remaining parts with higher number of rows // There is no need in verifying remaining parts with higher number of rows
needFreeSpace = true
break break
} }
m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount) m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount)
@ -1337,9 +1364,9 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
} }
if maxM < minM { if maxM < minM {
// There is no sense in merging parts with too small m. // There is no sense in merging parts with too small m.
return dst return dst, needFreeSpace
} }
return append(dst, pws...) return append(dst, pws...), needFreeSpace
} }
func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {

View File

@ -49,7 +49,7 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
iterationsCount := 0 iterationsCount := 0
rowsMerged := uint64(0) rowsMerged := uint64(0)
for { for {
pms := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows) pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
if len(pms) == 0 { if len(pms) == 0 {
break break
} }
@ -99,7 +99,7 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
pws := newTestPartWrappersForRowsCount(initialRowsCount) pws := newTestPartWrappersForRowsCount(initialRowsCount)
// Verify appending to nil. // Verify appending to nil.
pms := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9) pms, _ := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9)
rowsCount := newTestRowsCountFromPartWrappers(pms) rowsCount := newTestRowsCountFromPartWrappers(pms)
if !reflect.DeepEqual(rowsCount, expectedRowsCount) { if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d", t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
@ -118,7 +118,7 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
{}, {},
{}, {},
} }
pms = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9) pms, _ = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9)
if !reflect.DeepEqual(pms[:len(prefix)], prefix) { if !reflect.DeepEqual(pms[:len(prefix)], prefix) {
t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v", t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix) maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)