diff --git a/README.md b/README.md index 38ba08b65..aa883ee19 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [Setting up service](#setting-up-service) * [How to work with snapshots](#how-to-work-with-snapshots) * [How to delete time series](#how-to-delete-time-series) +* [Forced merge](#forced-merge) * [How to export time series](#how-to-export-time-series) * [How to import time series data](#how-to-import-time-series-data) * [Relabeling](#relabeling) @@ -712,6 +713,8 @@ Send a request to `http://:8428/api/v1/admin/tsdb/delete_s where `` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for metrics to delete. After that all the time series matching the given selector are deleted. Storage space for the deleted time series isn't freed instantly - it is freed during subsequent [background merges of data files](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). +Note that background merges may never occur for data from previous months, so storage space won't be freed for historical data. +In this case [forced merge](#forced-merge) may help freeing up storage space. It is recommended verifying which metrics will be deleted with the call to `http://:8428/api/v1/series?match[]=` before actually deleting the metrics. By default this query will only scan active series in the past 5 minutes, so you may need to @@ -731,9 +734,26 @@ It isn't recommended using delete API for the following cases, since it brings n See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details. * Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted time series occupy disk space until the next merge operation, which can never occur when deleting too old data. + [Forced merge](#forced-merge) may be used for freeing up disk space occupied by old data. It is better using `-retentionPeriod` command-line flag for efficient pruning of old data. + +### Forced merge + +VictoriaMetrics performs [data compations in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) +in order to keep good performance characteristics when accepting new data. These compactions (merges) are performed independently on per-month partitions. +This means that compactions are stopped for per-month partitions if no new data is ingested into these partitions. +Sometimes it is necessary to trigger compactions for old partitions. For instance, in order to free up disk space occupied by [deleted time series](#how-to-delete-time-series). +In this case forced compaction may be initiated on the specified per-month partition by sending request to `/internal/force_merge?partition_prefix=YYYY_MM`, +where `YYYY_MM` is per-month partition name. For example, `http://victoriametrics:8428/internal/force_merge?partition_prefix=2020_08` would initiate forced +merge for August 2020 partition. The call to `/internal/force_merge` returns immediately, while the corresponding forced merges continues running in background. + +Forced merges may require additional CPU, disk IO and storage space resources. It is unnecessary to run forced merge under normal conditions, +since VictoriaMetrics automatically performs [optimal merges in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) +when new data is ingested into it. + + ### How to export time series Send a request to `http://:8428/api/v1/export?match[]=`, diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index abd1ed9ec..c1395b452 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -180,6 +180,21 @@ func Stop() { // RequestHandler is a storage request handler. func RequestHandler(w http.ResponseWriter, r *http.Request) bool { path := r.URL.Path + if path == "/internal/force_merge" { + // Run force merge in background + partitionNamePrefix := r.FormValue("partition_prefix") + go func() { + activeForceMerges.Inc() + defer activeForceMerges.Dec() + logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix) + startTime := time.Now() + if err := Storage.ForceMergePartitions(partitionNamePrefix); err != nil { + logger.Errorf("error in forced merge for partition_prefix=%q: %s", partitionNamePrefix, err) + } + logger.Infof("forced merge for partition_prefix=%q has been successfully finished in %.3f seconds", partitionNamePrefix, time.Since(startTime).Seconds()) + }() + return true + } prometheusCompatibleResponse := false if path == "/api/v1/admin/tsdb/snapshot" { // Handle Prometheus API - https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot . @@ -260,6 +275,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } } +var activeForceMerges = metrics.NewCounter("vm_active_force_merges") + func registerStorageMetrics() { mCache := &storage.Metrics{} var mCacheLock sync.Mutex diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 507231b42..0434d8f1c 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -206,6 +206,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr be used on a regular basis, since it carries non-zero overhead. * `vmstorage` nodes provide the following HTTP endpoints on `8482` port: + - `/internal/force_merge` - initiate [forced compactions](https://victoriametrics.github.io/#force-merge) on the given `vmstorage` node. - `/snapshot/create` - create [instant snapshot](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282), which can be used for backups in background. Snapshots are created in `/snapshots` folder, where `` is the corresponding command-line flag value. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 38ba08b65..aa883ee19 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -115,6 +115,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet * [Setting up service](#setting-up-service) * [How to work with snapshots](#how-to-work-with-snapshots) * [How to delete time series](#how-to-delete-time-series) +* [Forced merge](#forced-merge) * [How to export time series](#how-to-export-time-series) * [How to import time series data](#how-to-import-time-series-data) * [Relabeling](#relabeling) @@ -712,6 +713,8 @@ Send a request to `http://:8428/api/v1/admin/tsdb/delete_s where `` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors) for metrics to delete. After that all the time series matching the given selector are deleted. Storage space for the deleted time series isn't freed instantly - it is freed during subsequent [background merges of data files](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). +Note that background merges may never occur for data from previous months, so storage space won't be freed for historical data. +In this case [forced merge](#forced-merge) may help freeing up storage space. It is recommended verifying which metrics will be deleted with the call to `http://:8428/api/v1/series?match[]=` before actually deleting the metrics. By default this query will only scan active series in the past 5 minutes, so you may need to @@ -731,9 +734,26 @@ It isn't recommended using delete API for the following cases, since it brings n See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details. * Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted time series occupy disk space until the next merge operation, which can never occur when deleting too old data. + [Forced merge](#forced-merge) may be used for freeing up disk space occupied by old data. It is better using `-retentionPeriod` command-line flag for efficient pruning of old data. + +### Forced merge + +VictoriaMetrics performs [data compations in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) +in order to keep good performance characteristics when accepting new data. These compactions (merges) are performed independently on per-month partitions. +This means that compactions are stopped for per-month partitions if no new data is ingested into these partitions. +Sometimes it is necessary to trigger compactions for old partitions. For instance, in order to free up disk space occupied by [deleted time series](#how-to-delete-time-series). +In this case forced compaction may be initiated on the specified per-month partition by sending request to `/internal/force_merge?partition_prefix=YYYY_MM`, +where `YYYY_MM` is per-month partition name. For example, `http://victoriametrics:8428/internal/force_merge?partition_prefix=2020_08` would initiate forced +merge for August 2020 partition. The call to `/internal/force_merge` returns immediately, while the corresponding forced merges continues running in background. + +Forced merges may require additional CPU, disk IO and storage space resources. It is unnecessary to run forced merge under normal conditions, +since VictoriaMetrics automatically performs [optimal merges in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282) +when new data is ingested into it. + + ### How to export time series Send a request to `http://:8428/api/v1/export?match[]=`, diff --git a/lib/storage/partition.go b/lib/storage/partition.go index b4046cfba..2ec85077d 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -668,7 +668,7 @@ func (pt *partition) MustClose() { } pt.partsLock.Unlock() - if err := pt.mergePartsOptimal(pws); err != nil { + if err := pt.mergePartsOptimal(pws, nil); err != nil { logger.Panicf("FATAL: cannot flush %d inmemory parts to files on %q: %s", len(pws), pt.smallPartsPath, err) } logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), pt.smallPartsPath) @@ -794,13 +794,13 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p } pt.partsLock.Unlock() - if err := pt.mergePartsOptimal(dstPws); err != nil { + if err := pt.mergePartsOptimal(dstPws, nil); err != nil { return dstPws, fmt.Errorf("cannot merge %d inmemory parts: %w", len(dstPws), err) } return dstPws, nil } -func (pt *partition) mergePartsOptimal(pws []*partWrapper) error { +func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error { defer func() { // Remove isInMerge flag from pws. pt.partsLock.Lock() @@ -812,7 +812,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error { pt.partsLock.Unlock() }() for len(pws) > defaultPartsToMerge { - if err := pt.mergeParts(pws[:defaultPartsToMerge], nil); err != nil { + if err := pt.mergeParts(pws[:defaultPartsToMerge], stopCh); err != nil { return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err) } pws = pws[defaultPartsToMerge:] @@ -820,12 +820,53 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error { if len(pws) == 0 { return nil } - if err := pt.mergeParts(pws, nil); err != nil { + if err := pt.mergeParts(pws, stopCh); err != nil { return fmt.Errorf("cannot merge %d parts: %w", len(pws), err) } return nil } +// ForceMergeAllParts runs merge for all the parts in pt - small and big. +func (pt *partition) ForceMergeAllParts() error { + var pws []*partWrapper + pt.partsLock.Lock() + if !hasActiveMerges(pt.smallParts) && !hasActiveMerges(pt.bigParts) { + pws = appendAllPartsToMerge(pws, pt.smallParts) + pws = appendAllPartsToMerge(pws, pt.bigParts) + } + pt.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge. + return nil + } + // If len(pws) == 1, then the merge must run anyway, so deleted time series could be removed from the part. + if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil { + return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err) + } + return nil +} + +func appendAllPartsToMerge(dst, src []*partWrapper) []*partWrapper { + for _, pw := range src { + if pw.isInMerge { + logger.Panicf("BUG: part %q is already in merge", pw.p.path) + } + pw.isInMerge = true + dst = append(dst, pw) + } + return dst +} + +func hasActiveMerges(pws []*partWrapper) bool { + for _, pw := range pws { + if pw.isInMerge { + return true + } + } + return false +} + var ( bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 diff --git a/lib/storage/storage.go b/lib/storage/storage.go index d038351d3..292a32215 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1044,6 +1044,13 @@ func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) { return tail, nil } +// ForceMergePartitions force-merges partitions in s with names starting from the given partitionNamePrefix. +// +// Partitions are merged sequentially in order to reduce load on the system. +func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error { + return s.tb.ForceMergePartitions(partitionNamePrefix) +} + // AddRows adds the given mrs to s. func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { if len(mrs) == 0 { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 3d4eb4cab..7e41c0fc8 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -773,6 +773,21 @@ func testStorageAddRows(s *Storage) error { return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, m1.TableMetrics.SmallRowsCount) } + // Verify that force merge for the snapshot leaves only a single part per partition. + if err := s1.ForceMergePartitions(""); err != nil { + return fmt.Errorf("error when force merging partitions: %w", err) + } + ptws := s1.tb.GetPartitions(nil) + defer s1.tb.PutPartitions(ptws) + for _, ptw := range ptws { + pws := ptw.pt.GetParts(nil) + numParts := len(pws) + ptw.pt.PutParts(pws) + if numParts != 1 { + return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts) + } + } + s1.MustClose() // Delete the snapshot and make sure it is no longer visible. diff --git a/lib/storage/table.go b/lib/storage/table.go index f3d6dfb05..ed2780361 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "sync/atomic" "time" @@ -242,6 +243,26 @@ func (tb *table) UpdateMetrics(m *TableMetrics) { tb.ptwsLock.Unlock() } +// ForceMergePartitions force-merges partitions in tb with names starting from the given partitionNamePrefix. +// +// Partitions are merged sequentially in order to reduce load on the system. +func (tb *table) ForceMergePartitions(partitionNamePrefix string) error { + ptws := tb.GetPartitions(nil) + defer tb.PutPartitions(ptws) + for _, ptw := range ptws { + if !strings.HasPrefix(ptw.pt.name, partitionNamePrefix) { + continue + } + logger.Infof("starting forced merge for partition %q", ptw.pt.name) + startTime := time.Now() + if err := ptw.pt.ForceMergeAllParts(); err != nil { + return fmt.Errorf("cannot complete forced merge for partition %q: %w", ptw.pt.name, err) + } + logger.Infof("forced merge for partition %q has been finished in %.3f seconds", ptw.pt.name, time.Since(startTime).Seconds()) + } + return nil +} + // AddRows adds the given rows to the table tb. func (tb *table) AddRows(rows []rawRow) error { if len(rows) == 0 {