diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6bc7279801..bd1b5b8030 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -37,7 +37,7 @@ The sandbox cluster installation is running under the constant load generated by * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show all the dropped targets together with the reason why they are dropped at `http://vmagent:8429/service-discovery` page. Previously targets, which were dropped because of [target sharding](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets) weren't displayed on this page. This could complicate service discovery debugging. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389). * FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `keep_if_contains` and `drop_if_contains` relabeling actions. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) for details. -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): export `vm_promscrape_scrape_pool_targets` metric to track the number of targets that each scrape_job discovers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5311). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): export `vm_promscrape_scrape_pool_targets` [metric](https://docs.victoriametrics.com/vmagent.html#monitoring) to track the number of targets each scrape job discovers. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5311). * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): provide `/vmalert/api/v1/rule` and `/api/v1/rule` API endpoints to get the rule object in JSON format. See [these docs](https://docs.victoriametrics.com/vmalert.html#web) for details. * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [day_of_year()](https://docs.victoriametrics.com/MetricsQL.html#day_of_year) function, which returns the day of the year for each of the given unix timestamps. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5345) for details. Thanks to @luckyxiaoqiang for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5368/). * FEATURE: all VictoriaMetrics binaries: expose additional metrics at `/metrics` page, which may simplify debugging of VictoriaMetrics components (see [this feature request](https://github.com/VictoriaMetrics/metrics/issues/54)): diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index c454715284..11c31473a7 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -104,9 +104,11 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) { } timestamp := int64(123000) + tsmGlobal.Register(&sw) if err := sw.scrapeInternal(timestamp, timestamp); err == nil { t.Fatalf("expecting non-nil error") } + tsmGlobal.Unregister(&sw) if pushDataErr != nil { t.Fatalf("unexpected error: %s", pushDataErr) } @@ -152,11 +154,13 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { } timestamp := int64(123000) + tsmGlobal.Register(&sw) if err := sw.scrapeInternal(timestamp, timestamp); err != nil { if !strings.Contains(err.Error(), "sample_limit") { t.Fatalf("unexpected error: %s", err) } } + tsmGlobal.Unregister(&sw) if pushDataErr != nil { t.Fatalf("unexpected error: %s", pushDataErr) } diff --git a/lib/promscrape/targetstatus.go b/lib/promscrape/targetstatus.go index e2c406ad07..a4051bdccd 100644 --- a/lib/promscrape/targetstatus.go +++ b/lib/promscrape/targetstatus.go @@ -13,6 +13,7 @@ import ( "time" "unsafe" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/metrics" @@ -88,91 +89,108 @@ type targetStatusMap struct { mu sync.Mutex m map[*scrapeWork]*targetStatus jobNames []string + + // the current number of `up` targets in the given jobName + upByJob map[string]int + + // the current number of `down` targets in the given jobName + downByJob map[string]int } func newTargetStatusMap() *targetStatusMap { return &targetStatusMap{ - m: make(map[*scrapeWork]*targetStatus), + m: make(map[*scrapeWork]*targetStatus), + upByJob: make(map[string]int), + downByJob: make(map[string]int), } } -func (tsm *targetStatusMap) Reset() { - tsm.mu.Lock() - tsm.m = make(map[*scrapeWork]*targetStatus) - tsm.mu.Unlock() -} - func (tsm *targetStatusMap) registerJobNames(jobNames []string) { tsm.mu.Lock() - tsm.registerJobsMetrics(tsm.jobNames, jobNames) + tsm.registerJobsMetricsLocked(tsm.jobNames, jobNames) tsm.jobNames = append(tsm.jobNames[:0], jobNames...) tsm.mu.Unlock() } -// registerJobsMetrics registers metrics for new jobs and unregisterMetric metrics for removed jobs -func (tsm *targetStatusMap) registerJobsMetrics(prevJobNames, currentJobNames []string) { - prevName := make(map[string]struct{}, len(prevJobNames)) - currentName := make(map[string]struct{}, len(currentJobNames)) - for _, n := range currentJobNames { - currentName[n] = struct{}{} +// registerJobsMetricsLocked registers metrics for new jobs and unregisters metrics for removed jobs +// +// tsm.mu must be locked when calling this function. +func (tsm *targetStatusMap) registerJobsMetricsLocked(prevJobNames, currentJobNames []string) { + prevNames := make(map[string]struct{}, len(prevJobNames)) + currentNames := make(map[string]struct{}, len(currentJobNames)) + for _, jobName := range currentJobNames { + currentNames[jobName] = struct{}{} } - for _, n := range prevJobNames { - prevName[n] = struct{}{} - if _, ok := currentName[n]; !ok { - metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, n)) - metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, n)) + for _, jobName := range prevJobNames { + prevNames[jobName] = struct{}{} + if _, ok := currentNames[jobName]; !ok { + metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, jobName)) + metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, jobName)) } } - for _, n := range currentJobNames { - if _, ok := prevName[n]; !ok { - n := n - _ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, n), func() float64 { - jobStatus := tsm.getTargetsStatusByJob(&requestFilter{ - originalJobName: n, - }) - var up float64 - for _, status := range jobStatus.jobTargetsStatuses { - up = +float64(status.upCount) - } - return up - }) - _ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, n), func() float64 { - jobStatus := tsm.getTargetsStatusByJob(&requestFilter{ - originalJobName: n, - }) - var down float64 - for _, status := range jobStatus.jobTargetsStatuses { - down = +float64(status.targetsTotal - status.upCount) - } - return down - }) + for _, jobName := range currentJobNames { + if _, ok := prevNames[jobName]; ok { + continue } + jobNameLocal := jobName + _ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, jobName), func() float64 { + tsm.mu.Lock() + n := tsm.upByJob[jobNameLocal] + tsm.mu.Unlock() + return float64(n) + }) + _ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, jobName), func() float64 { + tsm.mu.Lock() + n := tsm.downByJob[jobNameLocal] + tsm.mu.Unlock() + return float64(n) + }) } } func (tsm *targetStatusMap) Register(sw *scrapeWork) { + jobName := sw.Config.jobNameOriginal + tsm.mu.Lock() tsm.m[sw] = &targetStatus{ sw: sw, } + tsm.downByJob[jobName]++ tsm.mu.Unlock() } func (tsm *targetStatusMap) Unregister(sw *scrapeWork) { + jobName := sw.Config.jobNameOriginal + tsm.mu.Lock() + ts, ok := tsm.m[sw] + if !ok { + logger.Panicf("BUG: missing Register() call for the target %q", jobName) + } + if ts.up { + tsm.upByJob[jobName]-- + } else { + tsm.downByJob[jobName]-- + } delete(tsm.m, sw) tsm.mu.Unlock() } func (tsm *targetStatusMap) Update(sw *scrapeWork, up bool, scrapeTime, scrapeDuration int64, samplesScraped int, err error) { + jobName := sw.Config.jobNameOriginal + tsm.mu.Lock() - ts := tsm.m[sw] - if ts == nil { - ts = &targetStatus{ - sw: sw, - } - tsm.m[sw] = ts + ts, ok := tsm.m[sw] + if !ok { + logger.Panicf("BUG: missing Register() call for the target %q", jobName) + } + if up && !ts.up { + tsm.upByJob[jobName]++ + tsm.downByJob[jobName]-- + } else if !up && ts.up { + tsm.upByJob[jobName]-- + tsm.downByJob[jobName]++ } ts.up = up ts.scrapeTime = scrapeTime