diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0d076fb5f8..d8547c7aaf 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,8 @@ sort: 15 * FEATURE: vmalert: add `-remoteWrite.disablePathAppend` command-line flag, which can be used when custom `-remoteWrite.url` must be specified. For example, `./vmalert -disablePathAppend -remoteWrite.url='http://foo.bar/a/b/c?d=e'` would write data to `http://foo.bar/a/b/c?d=e` instead of `http://foo.bar/a/b/c?d=e/api/v1/write`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1536). +* BUGFIX: vmagent: stop scrapers for deleted targets before starting scrapers for added targets. This should prevent possible time series overlap when old targets are substituted by new targets (for example, during new deployment in Kubernetes). The overlap could lead to incorrect query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1509). + ## [v1.64.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.64.0) diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 8509636d6c..20e9502ae5 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -301,6 +301,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) { additionsCount := 0 deletionsCount := 0 swsMap := make(map[string][]prompbmarshal.Label, len(sws)) + var swsToStart []*ScrapeWork for _, sw := range sws { key := sw.key() originalLabels := swsMap[key] @@ -320,33 +321,47 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) { // The scraper for the given key already exists. continue } + swsToStart = append(swsToStart, sw) + } - // Start a scraper for the missing key. + // Stop deleted scrapers before starting new scrapers in order to prevent + // series overlap when an old scrape target is substituted by a new scrape target. 
+ var stoppedChs []<-chan struct{} + for key, sc := range sg.m { + if _, ok := swsMap[key]; !ok { + close(sc.stopCh) + stoppedChs = append(stoppedChs, sc.stoppedCh) + delete(sg.m, key) + deletionsCount++ + } + } + // Wait until all the deleted scrapers are stopped before starting new scrapers. + for _, ch := range stoppedChs { + <-ch + } + + // Start new scrapers only after the deleted scrapers are stopped. + for _, sw := range swsToStart { sc := newScraper(sw, sg.name, sg.pushData) sg.activeScrapers.Inc() sg.scrapersStarted.Inc() sg.wg.Add(1) tsmGlobal.Register(sw) go func(sw *ScrapeWork) { - defer sg.wg.Done() + defer func() { + sg.wg.Done() + close(sc.stoppedCh) + }() sc.sw.run(sc.stopCh) tsmGlobal.Unregister(sw) sg.activeScrapers.Dec() sg.scrapersStopped.Inc() }(sw) + key := sw.key() sg.m[key] = sc additionsCount++ } - // Stop deleted scrapers, which are missing in sws. - for key, sc := range sg.m { - if _, ok := swsMap[key]; !ok { - close(sc.stopCh) - delete(sg.m, key) - deletionsCount++ - } - } - if additionsCount > 0 || deletionsCount > 0 { sg.changesCount.Add(additionsCount + deletionsCount) logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m)) @@ -354,13 +369,19 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) { } type scraper struct { - sw scrapeWork + sw scrapeWork + + // stopCh is unblocked when the given scraper must be stopped. stopCh chan struct{} + + // stoppedCh is unblocked when the given scraper is stopped. + stoppedCh chan struct{} } func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper { sc := &scraper{ - stopCh: make(chan struct{}), + stopCh: make(chan struct{}), + stoppedCh: make(chan struct{}), } c := newClient(sw) sc.sw.Config = sw