diff --git a/README.md b/README.md index b231fb955..cd4a6c348 100644 --- a/README.md +++ b/README.md @@ -1023,6 +1023,9 @@ VictoriaMetrics also exposes currently running queries with their execution time Another option is to increase `-memory.allowedPercent` command-line flag value. Be careful with this option, since too big value for `-memory.allowedPercent` may result in high I/O usage. +* VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion, + then data querying may slow down significantly. + * VictoriaMetrics requires free disk space for [merging data files to bigger ones](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). It may slow down when there is no enough free space left. So make sure `-storageDataPath` directory has at least 20% of free space comparing to disk size. The remaining amount of free space diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 0400f1a58..5a078951f 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -432,6 +432,9 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_search_delays_total`, func() float64 { return float64(m().SearchDelays) }) + metrics.NewGauge(`vm_big_merges_delays_total`, func() float64 { + return float64(tm().BigMergesDelays) + }) metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { return float64(m().SlowRowInserts) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index b231fb955..cd4a6c348 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1023,6 +1023,9 @@ VictoriaMetrics also exposes currently running queries with their execution time Another option is to increase `-memory.allowedPercent` command-line flag value. Be careful with this option, since too big value for `-memory.allowedPercent` may result in high I/O usage. +* VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion, + then data querying may slow down significantly. + * VictoriaMetrics requires free disk space for [merging data files to bigger ones](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). It may slow down when there is no enough free space left. So make sure `-storageDataPath` directory has at least 20% of free space comparing to disk size. The remaining amount of free space diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index afad1ad53..4b3f36fc6 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" ) // PrepareBlockCallback can transform the passed items allocated at the given data. @@ -27,9 +28,10 @@ type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte) // The function immediately returns when stopCh is closed. // // It also atomically adds the number of items merged to itemsMerged. -func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{}, itemsMerged *uint64) error { +func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{}, + pl *pacelimiter.PaceLimiter, itemsMerged *uint64) error { bsm := bsmPool.Get().(*blockStreamMerger) - if err := bsm.Init(bsrs, prepareBlock); err != nil { + if err := bsm.Init(bsrs, prepareBlock, pl); err != nil { return fmt.Errorf("cannot initialize blockStreamMerger: %w", err) } err := bsm.Merge(bsw, ph, stopCh, itemsMerged) @@ -61,6 +63,9 @@ type blockStreamMerger struct { phFirstItemCaught bool + // optional pace limiter for merge process. + pl *pacelimiter.PaceLimiter + // This are auxiliary buffers used in flushIB // for consistency checks after prepareBlock call. firstItem []byte @@ -77,11 +82,13 @@ func (bsm *blockStreamMerger) reset() { bsm.ib.Reset() bsm.phFirstItemCaught = false + bsm.pl = nil } -func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback) error { +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, pl *pacelimiter.PaceLimiter) error { bsm.reset() bsm.prepareBlock = prepareBlock + bsm.pl = pl for _, bsr := range bsrs { if bsr.Next() { bsm.bsrHeap = append(bsm.bsrHeap, bsr) @@ -104,6 +111,9 @@ var errForciblyStopped = fmt.Errorf("forcibly stopped") func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error { again: + if bsm.pl != nil { + bsm.pl.WaitIfNeeded() + } if len(bsm.bsrHeap) == 0 { // Write the last (maybe incomplete) inmemoryBlock to bsw. bsm.flushIB(bsw, ph, itemsMerged) diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index a4d7cc058..971fcc298 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -7,6 +7,8 @@ import ( "sort" "testing" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) func TestMergeBlockStreams(t *testing.T) { @@ -30,14 +32,14 @@ func TestMultilevelMerge(t *testing.T) { var dstIP1 inmemoryPart var bsw1 blockStreamWriter bsw1.InitFromInmemoryPart(&dstIP1) - if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 1: %s", err) } var dstIP2 inmemoryPart var bsw2 blockStreamWriter bsw2.InitFromInmemoryPart(&dstIP2) - if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { t.Fatalf("cannot merge first level part 2: %s", err) } @@ -54,7 +56,7 @@ func TestMultilevelMerge(t *testing.T) { newTestBlockStreamReader(&dstIP2), } bsw.InitFromInmemoryPart(&dstIP) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { t.Fatalf("cannot merge second level: %s", err) } if itemsMerged != uint64(len(items)) { @@ -76,7 +78,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var itemsMerged uint64 close(ch) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, nil, &itemsMerged); err != errForciblyStopped { t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped) } if itemsMerged != 0 { @@ -120,7 +122,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error { var dstIP inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&dstIP) - if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, storagepacelimiter.BigMerges, &itemsMerged); err != nil { return fmt.Errorf("cannot merge block streams: %w", err) } if itemsMerged != uint64(len(items)) { diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index 37b87f13d..9a3d7f9d9 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -150,7 +150,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) { var ip inmemoryPart var bsw blockStreamWriter bsw.InitFromInmemoryPart(&ip) - if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { + if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, nil, &itemsMerged); err != nil { return nil, nil, fmt.Errorf("cannot merge blocks: %w", err) } if itemsMerged != uint64(len(items)) { diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 6dc189caa..4f5d25374 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -17,6 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" ) @@ -566,7 +567,11 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) { } // The added part exceeds maxParts count. Assist with merging other parts. + // + // Prioritize assisted merges over searches. + storagepacelimiter.Search.Inc() err := tb.mergeExistingParts(false) + storagepacelimiter.Search.Dec() if err == nil { atomic.AddUint64(&tb.assistedMerges, 1) continue @@ -630,7 +635,12 @@ func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrappe // Merge parts. // The merge shouldn't be interrupted by stopCh, // since it may be final after stopCh is closed. - if err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged); err != nil { + // + // Prioritize merging of inmemory blocks over merging file parts. + storagepacelimiter.BigMerges.Inc() + err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, nil, &tb.itemsMerged) + storagepacelimiter.BigMerges.Dec() + if err != nil { logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err) } putBlockStreamWriter(bsw) @@ -791,7 +801,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP // Merge parts into a temporary location. var ph partHeader - err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged) + err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, storagepacelimiter.BigMerges, &tb.itemsMerged) putBlockStreamWriter(bsw) if err != nil { if err == errForciblyStopped { diff --git a/lib/pacelimiter/pacelimiter.go b/lib/pacelimiter/pacelimiter.go new file mode 100644 index 000000000..1d5714041 --- /dev/null +++ b/lib/pacelimiter/pacelimiter.go @@ -0,0 +1,62 @@ +package pacelimiter + +import ( + "sync" +) + +// PaceLimiter throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls. +// +// It is expected that Inc is called before performing high-priority work, +// while Dec is called when the work is done. +// WaitIfNeeded must be called inside the work which must be throttled (i.e. lower-priority work). +// It may be called in the loop before performing a part of low-priority work. +type PaceLimiter struct { + mu sync.Mutex + cond *sync.Cond + delaysTotal uint64 + n int +} + +// New returns pace limiter that throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls. +func New() *PaceLimiter { + var pl PaceLimiter + pl.cond = sync.NewCond(&pl.mu) + return &pl +} + +// Inc increments pl. +func (pl *PaceLimiter) Inc() { + pl.mu.Lock() + pl.n++ + pl.mu.Unlock() +} + +// Dec decrements pl. +func (pl *PaceLimiter) Dec() { + pl.mu.Lock() + pl.n-- + if pl.n == 0 { + // Wake up all the goroutines blocked in WaitIfNeeded, + // since the number of Dec calls equals the number of Inc calls. + pl.cond.Broadcast() + } + pl.mu.Unlock() +} + +// WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls. +func (pl *PaceLimiter) WaitIfNeeded() { + pl.mu.Lock() + for pl.n > 0 { + pl.delaysTotal++ + pl.cond.Wait() + } + pl.mu.Unlock() +} + +// DelaysTotal returns the number of delays inside WaitIfNeeded. +func (pl *PaceLimiter) DelaysTotal() uint64 { + pl.mu.Lock() + n := pl.delaysTotal + pl.mu.Unlock() + return n +} diff --git a/lib/pacelimiter/pacelimiter_test.go b/lib/pacelimiter/pacelimiter_test.go new file mode 100644 index 000000000..2b136e3b9 --- /dev/null +++ b/lib/pacelimiter/pacelimiter_test.go @@ -0,0 +1,106 @@ +package pacelimiter + +import ( + "runtime" + "sync" + "testing" + "time" +) + +func TestPacelimiter(t *testing.T) { + t.Run("nonblocking", func(t *testing.T) { + pl := New() + ch := make(chan struct{}, 10) + for i := 0; i < cap(ch); i++ { + go func() { + for j := 0; j < 10; j++ { + pl.WaitIfNeeded() + runtime.Gosched() + } + ch <- struct{}{} + }() + } + + // Check that all the goroutines are finished. + timeoutCh := time.After(5 * time.Second) + for i := 0; i < cap(ch); i++ { + select { + case <-ch: + case <-timeoutCh: + t.Fatalf("timeout") + } + } + if n := pl.DelaysTotal(); n > 0 { + t.Fatalf("unexpected non-zero number of delays: %d", n) + } + }) + t.Run("blocking", func(t *testing.T) { + pl := New() + pl.Inc() + ch := make(chan struct{}, 10) + var wg sync.WaitGroup + for i := 0; i < cap(ch); i++ { + wg.Add(1) + go func() { + wg.Done() + for j := 0; j < 10; j++ { + pl.WaitIfNeeded() + } + ch <- struct{}{} + }() + } + + // Check that all the goroutines created above are started and blocked in WaitIfNeeded + wg.Wait() + select { + case <-ch: + t.Fatalf("the pl must be blocked") + default: + } + + // Unblock goroutines and check that they are unblocked. + pl.Dec() + timeoutCh := time.After(5 * time.Second) + for i := 0; i < cap(ch); i++ { + select { + case <-ch: + case <-timeoutCh: + t.Fatalf("timeout") + } + } + if n := pl.DelaysTotal(); n == 0 { + t.Fatalf("expecting non-zero number of delays") + } + // Verify that the pl is unblocked now. + pl.WaitIfNeeded() + }) + t.Run("concurrent_inc_dec", func(t *testing.T) { + pl := New() + ch := make(chan struct{}, 10) + for i := 0; i < cap(ch); i++ { + go func() { + for j := 0; j < 10; j++ { + pl.Inc() + runtime.Gosched() + pl.Dec() + } + ch <- struct{}{} + }() + } + + // Verify that all the goroutines are finished + timeoutCh := time.After(5 * time.Second) + for i := 0; i < cap(ch); i++ { + select { + case <-ch: + case <-timeoutCh: + t.Fatalf("timeout") + } + } + // Verify that the pl is unblocked. + pl.WaitIfNeeded() + if n := pl.DelaysTotal(); n > 0 { + t.Fatalf("expecting zer number of delays; got %d", n) + } + }) +} diff --git a/lib/storage/block_stream_merger.go b/lib/storage/block_stream_merger.go index e41301e70..436d12363 100644 --- a/lib/storage/block_stream_merger.go +++ b/lib/storage/block_stream_merger.go @@ -4,6 +4,8 @@ import ( "container/heap" "fmt" "io" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" ) // blockStreamMerger is used for merging block streams. @@ -16,6 +18,9 @@ type blockStreamMerger struct { // Whether the call to NextBlock must be no-op. nextBlockNoop bool + // Optional pace limiter for limiting the pace for NextBlock calls. + pl *pacelimiter.PaceLimiter + // The last error err error } @@ -27,11 +32,14 @@ func (bsm *blockStreamMerger) reset() { } bsm.bsrHeap = bsm.bsrHeap[:0] bsm.nextBlockNoop = false + bsm.pl = nil bsm.err = nil } // Init initializes bsm with the given bsrs. -func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) { +// +// pl is an optional pace limiter, which allows limiting the pace for NextBlock calls. +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, pl *pacelimiter.PaceLimiter) { bsm.reset() for _, bsr := range bsrs { if bsr.NextBlock() { @@ -52,6 +60,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) { heap.Init(&bsm.bsrHeap) bsm.Block = &bsm.bsrHeap[0].Block bsm.nextBlockNoop = true + bsm.pl = pl } // NextBlock stores the next block in bsm.Block. @@ -66,6 +75,9 @@ func (bsm *blockStreamMerger) NextBlock() bool { bsm.nextBlockNoop = false return true } + if bsm.pl != nil { + bsm.pl.WaitIfNeeded() + } bsm.err = bsm.nextBlock() switch bsm.err { diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 1420e5692..7c4a43508 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -15,12 +16,12 @@ import ( // // rowsMerged is atomically updated with the number of merged rows during the merge. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, - deletedMetricIDs *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { + pl *pacelimiter.PaceLimiter, dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) - bsm.Init(bsrs) - err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, deletedMetricIDs, rowsMerged, rowsDeleted) + bsm.Init(bsrs, pl) + err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted) bsm.reset() bsmPool.Put(bsm) bsw.MustClose() @@ -42,7 +43,7 @@ var bsmPool = &sync.Pool{ var errForciblyStopped = fmt.Errorf("forcibly stopped") func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, - deletedMetricIDs *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { + dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { // Search for the first block to merge var pendingBlock *Block for bsm.NextBlock() { @@ -51,7 +52,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc return errForciblyStopped default: } - if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) { + if dmis.Has(bsm.Block.bh.TSID.MetricID) { // Skip blocks for deleted metrics. *rowsDeleted += uint64(bsm.Block.bh.RowsCount) continue @@ -73,7 +74,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc return errForciblyStopped default: } - if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) { + if dmis.Has(bsm.Block.bh.TSID.MetricID) { // Skip blocks for deleted metrics. *rowsDeleted += uint64(bsm.Block.bh.RowsCount) continue diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index 8b4e5380d..b76bb09d4 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -3,6 +3,8 @@ package storage import ( "math/rand" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) { @@ -364,7 +366,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped { t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped) } if rowsMerged != 0 { @@ -384,7 +386,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc bsw.InitFromInmemoryPart(&mp) var rowsMerged, rowsDeleted uint64 - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, storagepacelimiter.BigMerges, nil, &rowsMerged, &rowsDeleted); err != nil { t.Fatalf("unexpected error in mergeBlockStreams: %s", err) } diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index 73a03ea29..fe426b60c 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } mpOut.Reset() bsw.InitFromInmemoryPart(&mpOut) - if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, nil, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 8919e0dbe..08ae49ef2 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -328,6 +329,7 @@ type partitionMetrics struct { SmallPartsRefCount uint64 SmallAssistedMerges uint64 + BigMergesDelays uint64 } // UpdateMetrics updates m with metrics from pt. @@ -386,6 +388,8 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted) m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges) + + m.BigMergesDelays = storagepacelimiter.BigMerges.DelaysTotal() } // AddRows adds the given rows to the partition pt. @@ -574,7 +578,11 @@ func (pt *partition) addRowsPart(rows []rawRow) { } // The added part exceeds available limit. Help merging parts. + // + // Prioritize assisted merges over searches. + storagepacelimiter.Search.Inc() err = pt.mergeSmallParts(false) + storagepacelimiter.Search.Dec() if err == nil { atomic.AddUint64(&pt.smallAssistedMerges, 1) return @@ -952,13 +960,7 @@ func (pt *partition) mergeBigParts(isFinal bool) error { if len(pws) == 0 { return errNothingToMerge } - - atomic.AddUint64(&pt.bigMergesCount, 1) - atomic.AddUint64(&pt.activeBigMerges, 1) - err := pt.mergeParts(pws, pt.stopCh) - atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) - - return err + return pt.mergeParts(pws, pt.stopCh) } func (pt *partition) mergeSmallParts(isFinal bool) error { @@ -984,13 +986,7 @@ func (pt *partition) mergeSmallParts(isFinal bool) error { if len(pws) == 0 { return errNothingToMerge } - - atomic.AddUint64(&pt.smallMergesCount, 1) - atomic.AddUint64(&pt.activeSmallMerges, 1) - err := pt.mergeParts(pws, pt.stopCh) - atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0)) - - return err + return pt.mergeParts(pws, pt.stopCh) } var errNothingToMerge = fmt.Errorf("nothing to merge") @@ -1058,15 +1054,30 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro } // Merge parts. + dmis := pt.getDeletedMetricIDs() var ph partHeader rowsMerged := &pt.smallRowsMerged rowsDeleted := &pt.smallRowsDeleted + pl := storagepacelimiter.BigMerges if isBigPart { rowsMerged = &pt.bigRowsMerged rowsDeleted = &pt.bigRowsDeleted + atomic.AddUint64(&pt.bigMergesCount, 1) + atomic.AddUint64(&pt.activeBigMerges, 1) + } else { + pl = nil + atomic.AddUint64(&pt.smallMergesCount, 1) + atomic.AddUint64(&pt.activeSmallMerges, 1) + // Prioritize small merges over big merges. + storagepacelimiter.BigMerges.Inc() + } + err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pl, dmis, rowsMerged, rowsDeleted) + if isBigPart { + atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) + } else { + atomic.AddUint64(&pt.activeSmallMerges, ^uint64(0)) + storagepacelimiter.BigMerges.Dec() } - dmis := pt.getDeletedMetricIDs() - err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted) putBlockStreamWriter(bsw) if err != nil { if err == errForciblyStopped { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 19d9856a2..4dd82f68c 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" @@ -387,7 +388,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) - m.SearchDelays += atomic.LoadUint64(&searchDelays) + m.SearchDelays = storagepacelimiter.Search.DelaysTotal() m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) @@ -797,25 +798,12 @@ func nextRetentionDuration(retentionMonths int) time.Duration { return deadline.Sub(t) } -var ( - searchTSIDsCondLock sync.Mutex - searchTSIDsCond = sync.NewCond(&searchTSIDsCondLock) - - searchDelays uint64 -) - // searchTSIDs returns sorted TSIDs for the given tfss and the given tr. func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { - // Make sure that there are enough resources for processing the ingested data via Storage.AddRows - // before starting the query. + // Make sure that there are enough resources for processing data ingestion before starting the query. // This should prevent from data ingestion starvation when provessing heavy queries. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 . - searchTSIDsCondLock.Lock() - for len(addRowsConcurrencyCh) >= cap(addRowsConcurrencyCh) { - atomic.AddUint64(&searchDelays, 1) - searchTSIDsCond.Wait() - } - searchTSIDsCondLock.Unlock() + storagepacelimiter.Search.WaitIfNeeded() // Do not cache tfss -> tsids here, since the caching is performed // on idb level. @@ -1040,11 +1028,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { rr.rows, err = s.add(rr.rows, mrs, precisionBits) putRawRows(rr) - // Notify blocked goroutines at Storage.searchTSIDs that they may proceed with their work. - searchTSIDsCondLock.Lock() <-addRowsConcurrencyCh - searchTSIDsCond.Signal() - searchTSIDsCondLock.Unlock() return err } diff --git a/lib/storagepacelimiter/storagepacelimiter.go b/lib/storagepacelimiter/storagepacelimiter.go new file mode 100644 index 000000000..1ca5860a1 --- /dev/null +++ b/lib/storagepacelimiter/storagepacelimiter.go @@ -0,0 +1,11 @@ +package storagepacelimiter + +import ( + "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" +) + +// Search limits the pace of search calls when there is at least a single in-flight assisted merge. +var Search = pacelimiter.New() + +// BigMerges limits the pace for big merges when there is at least a single in-flight small merge. +var BigMerges = pacelimiter.New()