diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 978f099200..55713dd8d0 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,6 +28,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent disabling state updates tracking per rule via setting values < 1. The minimum number of update states to track is now set to 1. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly update `debug` and `update_entries_limit` rule's params on config's hot-reload. * BUGFIX: properly initialize the `vm_concurrent_insert_current` metric before exposing it. Previously this metric could be left uninitialized in some cases, e.g. its value was zero. This could lead to false alerts for the query `avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3761). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): immediately cancel in-flight scrape requests during configuration reload when using [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously `vmagent` could wait for long time until all the in-flight requests are completed before reloading the configuration. This could significantly slow down configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747). ## [v1.87.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.0) diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 835c08e050..185c693ad0 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -43,6 +43,7 @@ type client struct { // It may be useful for scraping targets with millions of metrics per target. sc *http.Client + ctx context.Context scrapeURL string scrapeTimeoutSecondsStr string hostPort string @@ -77,7 +78,7 @@ func concatTwoStrings(x, y string) string { return s } -func newClient(sw *ScrapeWork) *client { +func newClient(sw *ScrapeWork, ctx context.Context) *client { var u fasthttp.URI u.Update(sw.ScrapeURL) hostPort := string(u.Host()) @@ -165,6 +166,7 @@ func newClient(sw *ScrapeWork) *client { } return &client{ hc: hc, + ctx: ctx, sc: sc, scrapeURL: sw.ScrapeURL, scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()), @@ -182,7 +184,7 @@ func newClient(sw *ScrapeWork) *client { func (c *client) GetStreamReader() (*streamReader, error) { deadline := time.Now().Add(c.sc.Timeout) - ctx, cancel := context.WithDeadline(context.Background(), deadline) + ctx, cancel := context.WithDeadline(c.ctx, deadline) req, err := http.NewRequestWithContext(ctx, "GET", c.scrapeURL, nil) if err != nil { cancel() diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 7f72c21a89..0686e47c25 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -2,6 +2,7 @@ package promscrape import ( "bytes" + "context" "flag" "fmt" "io" @@ -341,7 +342,7 @@ func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarsha func (sg *scraperGroup) stop() { sg.mLock.Lock() for _, sc := range sg.m { - close(sc.stopCh) + sc.cancel() } sg.m = nil sg.mLock.Unlock() @@ -383,7 +384,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) { var stoppedChs []<-chan struct{} for key, sc := range sg.m { if _, ok := swsMap[key]; !ok { - close(sc.stopCh) + sc.cancel() stoppedChs = append(stoppedChs, sc.stoppedCh) delete(sg.m, key) deletionsCount++ @@ -406,7 +407,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) { sg.wg.Done() close(sc.stoppedCh) }() - sc.sw.run(sc.stopCh, sg.globalStopCh) + sc.sw.run(sc.ctx.Done(), sg.globalStopCh) tsmGlobal.Unregister(&sc.sw) sg.activeScrapers.Dec() sg.scrapersStopped.Inc() @@ -425,19 +426,21 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) { type scraper struct { sw scrapeWork - // stopCh is unblocked when the given scraper must be stopped. - stopCh chan struct{} + ctx context.Context + cancel context.CancelFunc // stoppedCh is unblocked when the given scraper is stopped. stoppedCh chan struct{} } func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper { + ctx, cancel := context.WithCancel(context.Background()) sc := &scraper{ - stopCh: make(chan struct{}), + ctx: ctx, + cancel: cancel, stoppedCh: make(chan struct{}), } - c := newClient(sw) + c := newClient(sw, ctx) sc.sw.Config = sw sc.sw.ScrapeGroup = group sc.sw.ReadData = c.ReadData