app/vlstorage: add support for forced merge via /internal/force_merge HTTP endpoint
Some checks are pending
build / Build (push) Waiting to run
CodeQL Go / Analyze (push) Waiting to run
main / lint (push) Waiting to run
main / test (test-full) (push) Blocked by required conditions
main / test (test-full-386) (push) Blocked by required conditions
main / test (test-pure) (push) Blocked by required conditions
publish-docs / Build (push) Waiting to run

This commit is contained in:
Aliaksandr Valialkin 2024-10-13 22:20:31 +02:00
parent b4b79a4961
commit 3c73dbbacc
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
7 changed files with 125 additions and 0 deletions

View File

@ -92,6 +92,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if vlselect.RequestHandler(w, r) {
return true
}
if vlstorage.RequestHandler(w, r) {
return true
}
return false
}

View File

@ -37,6 +37,8 @@ var (
"see https://docs.victoriametrics.com/victorialogs/data-ingestion/ ; see also -logNewStreams")
minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which "+
"the storage stops accepting new data")
forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*")
)
// Init initializes vlstorage.
@ -87,6 +89,28 @@ func Stop() {
strg = nil
}
// RequestHandler is a storage request handler.
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
path := r.URL.Path
if path == "/internal/force_merge" {
if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) {
return true
}
// Run force merge in background
partitionNamePrefix := r.FormValue("partition_prefix")
go func() {
activeForceMerges.Inc()
defer activeForceMerges.Dec()
logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix)
startTime := time.Now()
strg.MustForceMerge(partitionNamePrefix)
logger.Infof("forced merge for partition_prefix=%q has been successfully finished in %.3f seconds", partitionNamePrefix, time.Since(startTime).Seconds())
}()
return true
}
return false
}
var strg *logstorage.Storage
var storageMetrics *metrics.Set
@ -205,3 +229,5 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_big_timestamp"}`, ss.RowsDroppedTooBigTimestamp)
metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_small_timestamp"}`, ss.RowsDroppedTooSmallTimestamp)
}
var activeForceMerges = metrics.NewCounter("vl_active_force_merges")

View File

@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add support for forced merge. See [these docs](https://docs.victoriametrics.com/victorialogs/#forced-merge).
* BUGFIX: avoid possible panic when logs for a new day are ingested during execution of concurrent queries.
* BUGFIX: avoid panic at `lib/logstorage.(*blockResultColumn).forEachDictValue()` when [stats with additional filters](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters). The panic has been introduced in [v0.33.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.33.0-victorialogs) in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a350be48b68330ee1a487e1fb09b002d3be45163).

View File

@ -131,6 +131,19 @@ For example, the following command starts VictoriaLogs, which stores the data at
VictoriaLogs automatically creates the `-storageDataPath` directory on the first run if it is missing.
## Forced merge
VictoriaLogs performs data compactions in background in order to keep good performance characteristics when accepting new data.
These compactions (merges) are performed independently on per-day partitions.
This means that compactions are stopped for per-day partitions if no new data is ingested into these partitions.
Sometimes it is necessary to trigger compactions for old partitions. In this case forced compaction may be initiated on the specified per-month partition
by sending request to `/internal/force_merge?partition_prefix=YYYYMMDD`,
where `YYYYMMDD` is per-day partition name. For example, `http://victoria-logs:9428/internal/force_merge?partition_prefix=20240921` would initiate forced
merge for September 21, 2024 partition. The call to `/internal/force_merge` returns immediately, while the corresponding forced merge continues running in background.
Forced merges may require additional CPU, disk IO and storage space resources. It is unnecessary to run forced merge under normal conditions,
since VictoriaLogs automatically performs optimal merges in background when new data is ingested into it.
## High Availability
### High Availability (HA) Setup with VictoriaLogs Single-Node Instances

View File

@ -459,6 +459,7 @@ func assertIsInMerge(pws []*partWrapper) {
// mustMergeParts merges pws to a single resulting part.
//
// if isFinal is set, then the resulting part is guaranteed to be saved to disk.
// if isFinal is set, then the merge process cannot be interrupted.
// The pws may remain unmerged after returning from the function if there is no enough disk space.
//
// All the parts inside pws must have isInMerge field set to true.
@ -1226,3 +1227,48 @@ func getBlocksCount(pws []*partWrapper) uint64 {
}
return n
}
func (ddb *datadb) mustForceMergeAllParts() {
// Flush inmemory parts to files before forced merge
ddb.mustFlushInmemoryPartsToFiles(true)
var pws []*partWrapper
// Collect all the file parts for forced merge
ddb.partsLock.Lock()
pws = appendAllPartsForMergeLocked(pws, ddb.smallParts)
pws = appendAllPartsForMergeLocked(pws, ddb.bigParts)
ddb.partsLock.Unlock()
// If len(pws) == 1, then the merge must run anyway.
// This allows applying the configured retention, removing the deleted data, etc.
// Merge pws optimally
wg := getWaitGroup()
for len(pws) > 0 {
pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws)
wg.Add(1)
bigPartsConcurrencyCh <- struct{}{}
go func(pwsChunk []*partWrapper) {
defer func() {
<-bigPartsConcurrencyCh
wg.Done()
}()
ddb.mustMergeParts(pwsChunk, false)
}(pwsToMerge)
pws = pwsRemaining
}
wg.Wait()
putWaitGroup(wg)
}
func appendAllPartsForMergeLocked(dst, src []*partWrapper) []*partWrapper {
for _, pw := range src {
if !pw.isInMerge {
pw.isInMerge = true
dst = append(dst, pw)
}
}
return dst
}

View File

@ -195,3 +195,8 @@ func (pt *partition) updateStats(ps *PartitionStats) {
pt.ddb.updateStats(&ps.DatadbStats)
pt.idb.updateStats(&ps.IndexdbStats)
}
// mustForceMerge runs forced merge for all the parts in pt.
func (pt *partition) mustForceMerge() {
pt.ddb.mustForceMergeAllParts()
}

View File

@ -4,6 +4,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@ -113,6 +114,8 @@ type Storage struct {
// partitions is a list of partitions for the Storage.
//
// It must be accessed under partitionsLock.
//
// partitions are sorted by time.
partitions []*partitionWrapper
// ptwHot is the "hot" partition, were the last rows were ingested.
@ -472,6 +475,33 @@ func (s *Storage) MustClose() {
s.path = ""
}
// MustForceMerge force-merges parts in s partitions with names starting from the given partitionNamePrefix.
//
// Partitions are merged sequentially in order to reduce load on the system.
func (s *Storage) MustForceMerge(partitionNamePrefix string) {
var ptws []*partitionWrapper
s.partitionsLock.Lock()
for _, ptw := range s.partitions {
if strings.HasPrefix(ptw.pt.name, partitionNamePrefix) {
ptw.incRef()
ptws = append(ptws, ptw)
}
}
s.partitionsLock.Unlock()
s.wg.Add(1)
defer s.wg.Done()
for _, ptw := range ptws {
logger.Infof("started force merge for partition %s", ptw.pt.name)
startTime := time.Now()
ptw.pt.mustForceMerge()
ptw.decRef()
logger.Infof("finished force merge for partition %s in %.3fs", ptw.pt.name, time.Since(startTime).Seconds())
}
}
// MustAddRows adds lr to s.
//
// It is recommended checking whether the s is in read-only mode by calling IsReadOnly()