lib/mergeset: drop the crufty code responsible for direct upgrade from releases prior v1.28.0

Upgrade to v1.84.0, wait until the "finished round 2 of background conversion" message
appears in the log and then upgrade to newer release.
This commit is contained in:
Aliaksandr Valialkin 2022-12-03 21:17:27 -08:00
parent 8e9822bc7f
commit 0b8e7deabd
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 7 additions and 79 deletions

View File

@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip ## tip
**Update note 1:** this release drops support for direct upgrade from VictoriaMetrics versions prior [v1.28.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.28.0). Please upgrade to `v1.84.0`, wait until `finished round 2 of background conversion` line is emitted to log by single-node VictoriaMetrics or by `vmstorage`, and then upgrade to newer releases.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [service discovery](https://docs.victoriametrics.com/sd_configs.html) performance when discovering big number of targets (10K and more). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [service discovery](https://docs.victoriametrics.com/sd_configs.html) performance when discovering big number of targets (10K and more).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `exported_` prefix to metric names exported by scrape targets if these metric names clash with [automatically generated metrics](https://docs.victoriametrics.com/vmagent.html#automatically-generated-metrics) such as `up`, `scrape_samples_scraped`, etc. This prevents from corruption of automatically generated metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `exported_` prefix to metric names exported by scrape targets if these metric names clash with [automatically generated metrics](https://docs.victoriametrics.com/vmagent.html#automatically-generated-metrics) such as `up`, `scrape_samples_scraped`, etc. This prevents from corruption of automatically generated metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402).

View File

@ -112,8 +112,6 @@ type Table struct {
rawItemsFlusherWG sync.WaitGroup rawItemsFlusherWG sync.WaitGroup
convertersWG sync.WaitGroup
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines. // Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
rawItemsPendingFlushesWG syncwg.WaitGroup rawItemsPendingFlushesWG syncwg.WaitGroup
} }
@ -306,12 +304,6 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d", logger.Infof("table %q has been opened in %.3f seconds; partsCount: %d; blocksCount: %d, itemsCount: %d; sizeBytes: %d",
path, time.Since(startTime).Seconds(), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes) path, time.Since(startTime).Seconds(), m.PartsCount, m.BlocksCount, m.ItemsCount, m.SizeBytes)
tb.convertersWG.Add(1)
go func() {
tb.convertToV1280()
tb.convertersWG.Done()
}()
if flushCallback != nil { if flushCallback != nil {
tb.flushCallbackWorkerWG.Add(1) tb.flushCallbackWorkerWG.Add(1)
go func() { go func() {
@ -345,11 +337,6 @@ func (tb *Table) MustClose() {
tb.rawItemsFlusherWG.Wait() tb.rawItemsFlusherWG.Wait()
logger.Infof("raw items flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path) logger.Infof("raw items flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path)
logger.Infof("waiting for converters to stop on %q...", tb.path)
startTime = time.Now()
tb.convertersWG.Wait()
logger.Infof("converters stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), tb.path)
logger.Infof("waiting for part mergers to stop on %q...", tb.path) logger.Infof("waiting for part mergers to stop on %q...", tb.path)
startTime = time.Now() startTime = time.Now()
tb.partMergersWG.Wait() tb.partMergersWG.Wait()
@ -376,7 +363,7 @@ func (tb *Table) MustClose() {
} }
tb.partsLock.Unlock() tb.partsLock.Unlock()
if err := tb.mergePartsOptimal(pws, nil); err != nil { if err := tb.mergePartsOptimal(pws); err != nil {
logger.Panicf("FATAL: cannot flush inmemory parts to files in %q: %s", tb.path, err) logger.Panicf("FATAL: cannot flush inmemory parts to files in %q: %s", tb.path, err)
} }
logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), tb.path) logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), tb.path)
@ -533,63 +520,11 @@ func (tb *Table) rawItemsFlusher() {
} }
} }
const convertToV1280FileName = "converted-to-v1.28.0" func (tb *Table) mergePartsOptimal(pws []*partWrapper) error {
func (tb *Table) convertToV1280() {
// Convert tag->metricID rows into tag->metricIDs rows when upgrading to v1.28.0+.
flagFilePath := tb.path + "/" + convertToV1280FileName
if fs.IsPathExist(flagFilePath) {
// The conversion has been already performed.
return
}
getAllPartsForMerge := func() []*partWrapper {
var pws []*partWrapper
tb.partsLock.Lock()
for _, pw := range tb.parts {
if pw.isInMerge {
continue
}
pw.isInMerge = true
pws = append(pws, pw)
}
tb.partsLock.Unlock()
return pws
}
pws := getAllPartsForMerge()
if len(pws) > 0 {
logger.Infof("started round 1 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws))
startTime := time.Now()
if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil {
logger.Errorf("failed round 1 of background conversion of %q to v1.28.0 format: %s", tb.path, err)
return
}
logger.Infof("finished round 1 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())
// The second round is needed in order to merge small blocks
// with tag->metricIDs rows left after the first round.
pws = getAllPartsForMerge()
logger.Infof("started round 2 of background conversion of %q to v1.28.0 format; merge %d parts", tb.path, len(pws))
startTime = time.Now()
if len(pws) > 0 {
if err := tb.mergePartsOptimal(pws, tb.stopCh); err != nil {
logger.Errorf("failed round 2 of background conversion of %q to v1.28.0 format: %s", tb.path, err)
return
}
}
logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())
}
if err := fs.WriteFileAtomically(flagFilePath, []byte("ok"), false); err != nil {
logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err)
}
}
func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
for len(pws) > defaultPartsToMerge { for len(pws) > defaultPartsToMerge {
pwsChunk := pws[:defaultPartsToMerge] pwsChunk := pws[:defaultPartsToMerge]
pws = pws[defaultPartsToMerge:] pws = pws[defaultPartsToMerge:]
if err := tb.mergeParts(pwsChunk, stopCh, false); err != nil { if err := tb.mergeParts(pwsChunk, nil, false); err != nil {
tb.releasePartsToMerge(pws) tb.releasePartsToMerge(pws)
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err) return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
} }
@ -597,7 +532,7 @@ func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) e
if len(pws) == 0 { if len(pws) == 0 {
return nil return nil
} }
if err := tb.mergeParts(pws, stopCh, false); err != nil { if err := tb.mergeParts(pws, nil, false); err != nil {
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err) return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
} }
return nil return nil
@ -1188,16 +1123,7 @@ func (tb *Table) CreateSnapshotAt(dstDir string) error {
for _, fi := range fis { for _, fi := range fis {
fn := fi.Name() fn := fi.Name()
if !fs.IsDirOrSymlink(fi) { if !fs.IsDirOrSymlink(fi) {
switch fn { // Skip non-directories.
case convertToV1280FileName:
srcPath := srcDir + "/" + fn
dstPath := dstDir + "/" + fn
if err := os.Link(srcPath, dstPath); err != nil {
return fmt.Errorf("cannot hard link from %q to %q: %w", srcPath, dstPath, err)
}
default:
// Skip other non-directories.
}
continue continue
} }
if isSpecialDir(fn) { if isSpecialDir(fn) {