lib/promscrape: stop scrapers for the removed targets before starting scrapers for the added targets

This should prevent from possible time series overlap when old target is substituted by new target (for example, during Kubernetes deployments).

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1530
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1509
This commit is contained in:
Aliaksandr Valialkin 2021-08-17 00:52:42 +03:00
parent fe8c462044
commit db34c40aec
2 changed files with 36 additions and 13 deletions

View File

@ -8,6 +8,8 @@ sort: 15
* FEATURE: vmalert: add `-remoteWrite.disablePathAppend` command-line flag, which can be used when custom `-remoteWrite.url` must be specified. For example, `./vmalert -disablePathAppend -remoteWrite.url='http://foo.bar/a/b/c?d=e'` would write data to `http://foo.bar/a/b/c?d=e` instead of `http://foo.bar/a/b/c?d=e/api/v1/write`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1536). * FEATURE: vmalert: add `-remoteWrite.disablePathAppend` command-line flag, which can be used when custom `-remoteWrite.url` must be specified. For example, `./vmalert -disablePathAppend -remoteWrite.url='http://foo.bar/a/b/c?d=e'` would write data to `http://foo.bar/a/b/c?d=e` instead of `http://foo.bar/a/b/c?d=e/api/v1/write`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1536).
* BUGFIX: vmagent: stop scrapers for deleted targets before starting scrapers for added targets. This should prevent from possible time series overlap when old targets are substituted by new targets (for example, during new deployment in Kubernetes). The overlap could lead to incorrect query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1509).
## [v1.64.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.64.0) ## [v1.64.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.64.0)

View File

@ -301,6 +301,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
additionsCount := 0 additionsCount := 0
deletionsCount := 0 deletionsCount := 0
swsMap := make(map[string][]prompbmarshal.Label, len(sws)) swsMap := make(map[string][]prompbmarshal.Label, len(sws))
var swsToStart []*ScrapeWork
for _, sw := range sws { for _, sw := range sws {
key := sw.key() key := sw.key()
originalLabels := swsMap[key] originalLabels := swsMap[key]
@ -320,33 +321,47 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
// The scraper for the given key already exists. // The scraper for the given key already exists.
continue continue
} }
swsToStart = append(swsToStart, sw)
}
// Start a scraper for the missing key. // Stop deleted scrapers before starting new scrapers in order to prevent
// series overlap when old scrape target is substituted by new scrape target.
var stoppedChs []<-chan struct{}
for key, sc := range sg.m {
if _, ok := swsMap[key]; !ok {
close(sc.stopCh)
stoppedChs = append(stoppedChs, sc.stoppedCh)
delete(sg.m, key)
deletionsCount++
}
}
// Wait until all the deleted scrapers are stopped before starting new scrapers.
for _, ch := range stoppedChs {
<-ch
}
// Start new scrapers only after the deleted scrapers are stopped.
for _, sw := range swsToStart {
sc := newScraper(sw, sg.name, sg.pushData) sc := newScraper(sw, sg.name, sg.pushData)
sg.activeScrapers.Inc() sg.activeScrapers.Inc()
sg.scrapersStarted.Inc() sg.scrapersStarted.Inc()
sg.wg.Add(1) sg.wg.Add(1)
tsmGlobal.Register(sw) tsmGlobal.Register(sw)
go func(sw *ScrapeWork) { go func(sw *ScrapeWork) {
defer sg.wg.Done() defer func() {
sg.wg.Done()
close(sc.stoppedCh)
}()
sc.sw.run(sc.stopCh) sc.sw.run(sc.stopCh)
tsmGlobal.Unregister(sw) tsmGlobal.Unregister(sw)
sg.activeScrapers.Dec() sg.activeScrapers.Dec()
sg.scrapersStopped.Inc() sg.scrapersStopped.Inc()
}(sw) }(sw)
key := sw.key()
sg.m[key] = sc sg.m[key] = sc
additionsCount++ additionsCount++
} }
// Stop deleted scrapers, which are missing in sws.
for key, sc := range sg.m {
if _, ok := swsMap[key]; !ok {
close(sc.stopCh)
delete(sg.m, key)
deletionsCount++
}
}
if additionsCount > 0 || deletionsCount > 0 { if additionsCount > 0 || deletionsCount > 0 {
sg.changesCount.Add(additionsCount + deletionsCount) sg.changesCount.Add(additionsCount + deletionsCount)
logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m)) logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
@ -354,13 +369,19 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
} }
type scraper struct { type scraper struct {
sw scrapeWork sw scrapeWork
// stopCh is unblocked when the given scraper must be stopped.
stopCh chan struct{} stopCh chan struct{}
// stoppedCh is unblocked when the given scraper is stopped.
stoppedCh chan struct{}
} }
func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper { func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper {
sc := &scraper{ sc := &scraper{
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
} }
c := newClient(sw) c := newClient(sw)
sc.sw.Config = sw sc.sw.Config = sw