mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
lib/storage: add /internal/force_merge
handler for running forced compactions on historical per-month partitions
This may be useful for freeing up storage space after time series deletion. See https://victoriametrics.github.io/#force-merge for more details. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686
This commit is contained in:
parent
8beb0da6ad
commit
1f33dd717f
20
README.md
20
README.md
@ -115,6 +115,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||||||
* [Setting up service](#setting-up-service)
|
* [Setting up service](#setting-up-service)
|
||||||
* [How to work with snapshots](#how-to-work-with-snapshots)
|
* [How to work with snapshots](#how-to-work-with-snapshots)
|
||||||
* [How to delete time series](#how-to-delete-time-series)
|
* [How to delete time series](#how-to-delete-time-series)
|
||||||
|
* [Forced merge](#forced-merge)
|
||||||
* [How to export time series](#how-to-export-time-series)
|
* [How to export time series](#how-to-export-time-series)
|
||||||
* [How to import time series data](#how-to-import-time-series-data)
|
* [How to import time series data](#how-to-import-time-series-data)
|
||||||
* [Relabeling](#relabeling)
|
* [Relabeling](#relabeling)
|
||||||
@ -712,6 +713,8 @@ Send a request to `http://<victoriametrics-addr>:8428/api/v1/admin/tsdb/delete_s
|
|||||||
where `<timeseries_selector_for_delete>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
|
where `<timeseries_selector_for_delete>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
|
||||||
for metrics to delete. After that all the time series matching the given selector are deleted. Storage space for
|
for metrics to delete. After that all the time series matching the given selector are deleted. Storage space for
|
||||||
the deleted time series isn't freed instantly - it is freed during subsequent [background merges of data files](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
|
the deleted time series isn't freed instantly - it is freed during subsequent [background merges of data files](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
|
||||||
|
Note that background merges may never occur for data from previous months, so storage space won't be freed for historical data.
|
||||||
|
In this case [forced merge](#forced-merge) may help freeing up storage space.
|
||||||
|
|
||||||
It is recommended verifying which metrics will be deleted with the call to `http://<victoria-metrics-addr>:8428/api/v1/series?match[]=<timeseries_selector_for_delete>`
|
It is recommended verifying which metrics will be deleted with the call to `http://<victoria-metrics-addr>:8428/api/v1/series?match[]=<timeseries_selector_for_delete>`
|
||||||
before actually deleting the metrics. By default this query will only scan active series in the past 5 minutes, so you may need to
|
before actually deleting the metrics. By default this query will only scan active series in the past 5 minutes, so you may need to
|
||||||
@ -731,9 +734,26 @@ It isn't recommended using delete API for the following cases, since it brings n
|
|||||||
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
||||||
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
||||||
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
||||||
|
[Forced merge](#forced-merge) may be used for freeing up disk space occupied by old data.
|
||||||
|
|
||||||
It is better using `-retentionPeriod` command-line flag for efficient pruning of old data.
|
It is better using `-retentionPeriod` command-line flag for efficient pruning of old data.
|
||||||
|
|
||||||
|
|
||||||
|
### Forced merge
|
||||||
|
|
||||||
|
VictoriaMetrics performs [data compations in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282)
|
||||||
|
in order to keep good performance characteristics when accepting new data. These compactions (merges) are performed independently on per-month partitions.
|
||||||
|
This means that compactions are stopped for per-month partitions if no new data is ingested into these partitions.
|
||||||
|
Sometimes it is necessary to trigger compactions for old partitions. For instance, in order to free up disk space occupied by [deleted time series](#how-to-delete-time-series).
|
||||||
|
In this case forced compaction may be initiated on the specified per-month partition by sending request to `/internal/force_merge?partition_prefix=YYYY_MM`,
|
||||||
|
where `YYYY_MM` is per-month partition name. For example, `http://victoriametrics:8428/internal/force_merge?partition_prefix=2020_08` would initiate forced
|
||||||
|
merge for August 2020 partition. The call to `/internal/force_merge` returns immediately, while the corresponding forced merges continues running in background.
|
||||||
|
|
||||||
|
Forced merges may require additional CPU, disk IO and storage space resources. It is unnecessary to run forced merge under normal conditions,
|
||||||
|
since VictoriaMetrics automatically performs [optimal merges in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282)
|
||||||
|
when new data is ingested into it.
|
||||||
|
|
||||||
|
|
||||||
### How to export time series
|
### How to export time series
|
||||||
|
|
||||||
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
|
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
|
||||||
|
@ -180,6 +180,21 @@ func Stop() {
|
|||||||
// RequestHandler is a storage request handler.
|
// RequestHandler is a storage request handler.
|
||||||
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
path := r.URL.Path
|
path := r.URL.Path
|
||||||
|
if path == "/internal/force_merge" {
|
||||||
|
// Run force merge in background
|
||||||
|
partitionNamePrefix := r.FormValue("partition_prefix")
|
||||||
|
go func() {
|
||||||
|
activeForceMerges.Inc()
|
||||||
|
defer activeForceMerges.Dec()
|
||||||
|
logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix)
|
||||||
|
startTime := time.Now()
|
||||||
|
if err := Storage.ForceMergePartitions(partitionNamePrefix); err != nil {
|
||||||
|
logger.Errorf("error in forced merge for partition_prefix=%q: %s", partitionNamePrefix, err)
|
||||||
|
}
|
||||||
|
logger.Infof("forced merge for partition_prefix=%q has been successfully finished in %.3f seconds", partitionNamePrefix, time.Since(startTime).Seconds())
|
||||||
|
}()
|
||||||
|
return true
|
||||||
|
}
|
||||||
prometheusCompatibleResponse := false
|
prometheusCompatibleResponse := false
|
||||||
if path == "/api/v1/admin/tsdb/snapshot" {
|
if path == "/api/v1/admin/tsdb/snapshot" {
|
||||||
// Handle Prometheus API - https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot .
|
// Handle Prometheus API - https://prometheus.io/docs/prometheus/latest/querying/api/#snapshot .
|
||||||
@ -260,6 +275,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var activeForceMerges = metrics.NewCounter("vm_active_force_merges")
|
||||||
|
|
||||||
func registerStorageMetrics() {
|
func registerStorageMetrics() {
|
||||||
mCache := &storage.Metrics{}
|
mCache := &storage.Metrics{}
|
||||||
var mCacheLock sync.Mutex
|
var mCacheLock sync.Mutex
|
||||||
|
@ -206,6 +206,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
|
|||||||
be used on a regular basis, since it carries non-zero overhead.
|
be used on a regular basis, since it carries non-zero overhead.
|
||||||
|
|
||||||
* `vmstorage` nodes provide the following HTTP endpoints on `8482` port:
|
* `vmstorage` nodes provide the following HTTP endpoints on `8482` port:
|
||||||
|
- `/internal/force_merge` - initiate [forced compactions](https://victoriametrics.github.io/#force-merge) on the given `vmstorage` node.
|
||||||
- `/snapshot/create` - create [instant snapshot](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282),
|
- `/snapshot/create` - create [instant snapshot](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282),
|
||||||
which can be used for backups in background. Snapshots are created in `<storageDataPath>/snapshots` folder, where `<storageDataPath>` is the corresponding
|
which can be used for backups in background. Snapshots are created in `<storageDataPath>/snapshots` folder, where `<storageDataPath>` is the corresponding
|
||||||
command-line flag value.
|
command-line flag value.
|
||||||
|
@ -115,6 +115,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
|
|||||||
* [Setting up service](#setting-up-service)
|
* [Setting up service](#setting-up-service)
|
||||||
* [How to work with snapshots](#how-to-work-with-snapshots)
|
* [How to work with snapshots](#how-to-work-with-snapshots)
|
||||||
* [How to delete time series](#how-to-delete-time-series)
|
* [How to delete time series](#how-to-delete-time-series)
|
||||||
|
* [Forced merge](#forced-merge)
|
||||||
* [How to export time series](#how-to-export-time-series)
|
* [How to export time series](#how-to-export-time-series)
|
||||||
* [How to import time series data](#how-to-import-time-series-data)
|
* [How to import time series data](#how-to-import-time-series-data)
|
||||||
* [Relabeling](#relabeling)
|
* [Relabeling](#relabeling)
|
||||||
@ -712,6 +713,8 @@ Send a request to `http://<victoriametrics-addr>:8428/api/v1/admin/tsdb/delete_s
|
|||||||
where `<timeseries_selector_for_delete>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
|
where `<timeseries_selector_for_delete>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
|
||||||
for metrics to delete. After that all the time series matching the given selector are deleted. Storage space for
|
for metrics to delete. After that all the time series matching the given selector are deleted. Storage space for
|
||||||
the deleted time series isn't freed instantly - it is freed during subsequent [background merges of data files](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
|
the deleted time series isn't freed instantly - it is freed during subsequent [background merges of data files](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
|
||||||
|
Note that background merges may never occur for data from previous months, so storage space won't be freed for historical data.
|
||||||
|
In this case [forced merge](#forced-merge) may help freeing up storage space.
|
||||||
|
|
||||||
It is recommended verifying which metrics will be deleted with the call to `http://<victoria-metrics-addr>:8428/api/v1/series?match[]=<timeseries_selector_for_delete>`
|
It is recommended verifying which metrics will be deleted with the call to `http://<victoria-metrics-addr>:8428/api/v1/series?match[]=<timeseries_selector_for_delete>`
|
||||||
before actually deleting the metrics. By default this query will only scan active series in the past 5 minutes, so you may need to
|
before actually deleting the metrics. By default this query will only scan active series in the past 5 minutes, so you may need to
|
||||||
@ -731,9 +734,26 @@ It isn't recommended using delete API for the following cases, since it brings n
|
|||||||
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
|
||||||
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
* Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
|
||||||
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
|
||||||
|
[Forced merge](#forced-merge) may be used for freeing up disk space occupied by old data.
|
||||||
|
|
||||||
It is better using `-retentionPeriod` command-line flag for efficient pruning of old data.
|
It is better using `-retentionPeriod` command-line flag for efficient pruning of old data.
|
||||||
|
|
||||||
|
|
||||||
|
### Forced merge
|
||||||
|
|
||||||
|
VictoriaMetrics performs [data compations in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282)
|
||||||
|
in order to keep good performance characteristics when accepting new data. These compactions (merges) are performed independently on per-month partitions.
|
||||||
|
This means that compactions are stopped for per-month partitions if no new data is ingested into these partitions.
|
||||||
|
Sometimes it is necessary to trigger compactions for old partitions. For instance, in order to free up disk space occupied by [deleted time series](#how-to-delete-time-series).
|
||||||
|
In this case forced compaction may be initiated on the specified per-month partition by sending request to `/internal/force_merge?partition_prefix=YYYY_MM`,
|
||||||
|
where `YYYY_MM` is per-month partition name. For example, `http://victoriametrics:8428/internal/force_merge?partition_prefix=2020_08` would initiate forced
|
||||||
|
merge for August 2020 partition. The call to `/internal/force_merge` returns immediately, while the corresponding forced merges continues running in background.
|
||||||
|
|
||||||
|
Forced merges may require additional CPU, disk IO and storage space resources. It is unnecessary to run forced merge under normal conditions,
|
||||||
|
since VictoriaMetrics automatically performs [optimal merges in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282)
|
||||||
|
when new data is ingested into it.
|
||||||
|
|
||||||
|
|
||||||
### How to export time series
|
### How to export time series
|
||||||
|
|
||||||
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
|
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
|
||||||
|
@ -668,7 +668,7 @@ func (pt *partition) MustClose() {
|
|||||||
}
|
}
|
||||||
pt.partsLock.Unlock()
|
pt.partsLock.Unlock()
|
||||||
|
|
||||||
if err := pt.mergePartsOptimal(pws); err != nil {
|
if err := pt.mergePartsOptimal(pws, nil); err != nil {
|
||||||
logger.Panicf("FATAL: cannot flush %d inmemory parts to files on %q: %s", len(pws), pt.smallPartsPath, err)
|
logger.Panicf("FATAL: cannot flush %d inmemory parts to files on %q: %s", len(pws), pt.smallPartsPath, err)
|
||||||
}
|
}
|
||||||
logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), pt.smallPartsPath)
|
logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), pt.smallPartsPath)
|
||||||
@ -794,13 +794,13 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p
|
|||||||
}
|
}
|
||||||
pt.partsLock.Unlock()
|
pt.partsLock.Unlock()
|
||||||
|
|
||||||
if err := pt.mergePartsOptimal(dstPws); err != nil {
|
if err := pt.mergePartsOptimal(dstPws, nil); err != nil {
|
||||||
return dstPws, fmt.Errorf("cannot merge %d inmemory parts: %w", len(dstPws), err)
|
return dstPws, fmt.Errorf("cannot merge %d inmemory parts: %w", len(dstPws), err)
|
||||||
}
|
}
|
||||||
return dstPws, nil
|
return dstPws, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
|
func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
// Remove isInMerge flag from pws.
|
// Remove isInMerge flag from pws.
|
||||||
pt.partsLock.Lock()
|
pt.partsLock.Lock()
|
||||||
@ -812,7 +812,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
|
|||||||
pt.partsLock.Unlock()
|
pt.partsLock.Unlock()
|
||||||
}()
|
}()
|
||||||
for len(pws) > defaultPartsToMerge {
|
for len(pws) > defaultPartsToMerge {
|
||||||
if err := pt.mergeParts(pws[:defaultPartsToMerge], nil); err != nil {
|
if err := pt.mergeParts(pws[:defaultPartsToMerge], stopCh); err != nil {
|
||||||
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
|
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
|
||||||
}
|
}
|
||||||
pws = pws[defaultPartsToMerge:]
|
pws = pws[defaultPartsToMerge:]
|
||||||
@ -820,12 +820,53 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
|
|||||||
if len(pws) == 0 {
|
if len(pws) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := pt.mergeParts(pws, nil); err != nil {
|
if err := pt.mergeParts(pws, stopCh); err != nil {
|
||||||
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
|
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ForceMergeAllParts runs merge for all the parts in pt - small and big.
|
||||||
|
func (pt *partition) ForceMergeAllParts() error {
|
||||||
|
var pws []*partWrapper
|
||||||
|
pt.partsLock.Lock()
|
||||||
|
if !hasActiveMerges(pt.smallParts) && !hasActiveMerges(pt.bigParts) {
|
||||||
|
pws = appendAllPartsToMerge(pws, pt.smallParts)
|
||||||
|
pws = appendAllPartsToMerge(pws, pt.bigParts)
|
||||||
|
}
|
||||||
|
pt.partsLock.Unlock()
|
||||||
|
|
||||||
|
if len(pws) == 0 {
|
||||||
|
// Nothing to merge.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// If len(pws) == 1, then the merge must run anyway, so deleted time series could be removed from the part.
|
||||||
|
if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil {
|
||||||
|
return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendAllPartsToMerge(dst, src []*partWrapper) []*partWrapper {
|
||||||
|
for _, pw := range src {
|
||||||
|
if pw.isInMerge {
|
||||||
|
logger.Panicf("BUG: part %q is already in merge", pw.p.path)
|
||||||
|
}
|
||||||
|
pw.isInMerge = true
|
||||||
|
dst = append(dst, pw)
|
||||||
|
}
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasActiveMerges(pws []*partWrapper) bool {
|
||||||
|
for _, pw := range pws {
|
||||||
|
if pw.isInMerge {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
|
bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
|
||||||
smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
|
smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
|
||||||
|
@ -1044,6 +1044,13 @@ func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) {
|
|||||||
return tail, nil
|
return tail, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ForceMergePartitions force-merges partitions in s with names starting from the given partitionNamePrefix.
|
||||||
|
//
|
||||||
|
// Partitions are merged sequentially in order to reduce load on the system.
|
||||||
|
func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
|
||||||
|
return s.tb.ForceMergePartitions(partitionNamePrefix)
|
||||||
|
}
|
||||||
|
|
||||||
// AddRows adds the given mrs to s.
|
// AddRows adds the given mrs to s.
|
||||||
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
|
||||||
if len(mrs) == 0 {
|
if len(mrs) == 0 {
|
||||||
|
@ -773,6 +773,21 @@ func testStorageAddRows(s *Storage) error {
|
|||||||
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, m1.TableMetrics.SmallRowsCount)
|
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, m1.TableMetrics.SmallRowsCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify that force merge for the snapshot leaves only a single part per partition.
|
||||||
|
if err := s1.ForceMergePartitions(""); err != nil {
|
||||||
|
return fmt.Errorf("error when force merging partitions: %w", err)
|
||||||
|
}
|
||||||
|
ptws := s1.tb.GetPartitions(nil)
|
||||||
|
defer s1.tb.PutPartitions(ptws)
|
||||||
|
for _, ptw := range ptws {
|
||||||
|
pws := ptw.pt.GetParts(nil)
|
||||||
|
numParts := len(pws)
|
||||||
|
ptw.pt.PutParts(pws)
|
||||||
|
if numParts != 1 {
|
||||||
|
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
s1.MustClose()
|
s1.MustClose()
|
||||||
|
|
||||||
// Delete the snapshot and make sure it is no longer visible.
|
// Delete the snapshot and make sure it is no longer visible.
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -242,6 +243,26 @@ func (tb *table) UpdateMetrics(m *TableMetrics) {
|
|||||||
tb.ptwsLock.Unlock()
|
tb.ptwsLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ForceMergePartitions force-merges partitions in tb with names starting from the given partitionNamePrefix.
|
||||||
|
//
|
||||||
|
// Partitions are merged sequentially in order to reduce load on the system.
|
||||||
|
func (tb *table) ForceMergePartitions(partitionNamePrefix string) error {
|
||||||
|
ptws := tb.GetPartitions(nil)
|
||||||
|
defer tb.PutPartitions(ptws)
|
||||||
|
for _, ptw := range ptws {
|
||||||
|
if !strings.HasPrefix(ptw.pt.name, partitionNamePrefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.Infof("starting forced merge for partition %q", ptw.pt.name)
|
||||||
|
startTime := time.Now()
|
||||||
|
if err := ptw.pt.ForceMergeAllParts(); err != nil {
|
||||||
|
return fmt.Errorf("cannot complete forced merge for partition %q: %w", ptw.pt.name, err)
|
||||||
|
}
|
||||||
|
logger.Infof("forced merge for partition %q has been finished in %.3f seconds", ptw.pt.name, time.Since(startTime).Seconds())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// AddRows adds the given rows to the table tb.
|
// AddRows adds the given rows to the table tb.
|
||||||
func (tb *table) AddRows(rows []rawRow) error {
|
func (tb *table) AddRows(rows []rawRow) error {
|
||||||
if len(rows) == 0 {
|
if len(rows) == 0 {
|
||||||
|
Loading…
Reference in New Issue
Block a user