lib/promscrape: follow-up for 97373b7786

Replace the O(N^2) algorithm for exposing the `vm_promscrape_scrape_pool_targets` metric
with an O(N) algorithm, where N is the number of scrape jobs. The previous algorithm could significantly
slow down /metrics exposition when -promscrape.config contains thousands of scrape jobs.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5311
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5335
Author: Aliaksandr Valialkin
Date: 2023-12-06 17:31:25 +02:00
Parent: 509339bf63
Commit: 8b6bce61e4
3 changed files with 71 additions and 49 deletions
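For readers skimming the diff: the sketch below condenses the idea behind the O(N) scheme into a small standalone Go program. The type and function names (`poolStats`, `registerTarget`, `updateTarget`, `upTargets`) are illustrative only, not the real lib/promscrape API; the actual implementation lives in the targetstatus.go hunk further down. Instead of every `vm_promscrape_scrape_pool_targets` gauge callback rescanning all targets on each /metrics request, per-job up/down counters are updated incrementally when a target is registered, unregistered or changes state, so each gauge read becomes an O(1) map lookup.

// Simplified, hypothetical illustration of the counter-based approach.
package main

import (
	"fmt"
	"sync"
)

type poolStats struct {
	mu        sync.Mutex
	upByJob   map[string]int // number of `up` targets per scrape job
	downByJob map[string]int // number of `down` targets per scrape job
}

func newPoolStats() *poolStats {
	return &poolStats{
		upByJob:   make(map[string]int),
		downByJob: make(map[string]int),
	}
}

// registerTarget accounts for a newly discovered target; it starts as `down`
// until the first successful scrape.
func (ps *poolStats) registerTarget(jobName string) {
	ps.mu.Lock()
	ps.downByJob[jobName]++
	ps.mu.Unlock()
}

// updateTarget moves a target between the `up` and `down` counters when its
// scrape status changes, so gauge reads never have to walk all targets.
func (ps *poolStats) updateTarget(jobName string, wasUp, isUp bool) {
	ps.mu.Lock()
	if isUp && !wasUp {
		ps.upByJob[jobName]++
		ps.downByJob[jobName]--
	} else if !isUp && wasUp {
		ps.upByJob[jobName]--
		ps.downByJob[jobName]++
	}
	ps.mu.Unlock()
}

// upTargets is what a per-job gauge callback would return: an O(1) lookup
// instead of an O(number of targets) scan per exposed gauge.
func (ps *poolStats) upTargets(jobName string) float64 {
	ps.mu.Lock()
	n := ps.upByJob[jobName]
	ps.mu.Unlock()
	return float64(n)
}

func main() {
	ps := newPoolStats()
	ps.registerTarget("node-exporter")
	ps.updateTarget("node-exporter", false, true)
	fmt.Println(ps.upTargets("node-exporter")) // 1
}

The net effect is to move the bookkeeping cost from the read path (every /metrics exposition) to the write path (target state transitions), which happen at most once per scrape per target.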


@@ -37,7 +37,7 @@ The sandbox cluster installation is running under the constant load generated by
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show all the dropped targets together with the reason why they are dropped at `http://vmagent:8429/service-discovery` page. Previously targets, which were dropped because of [target sharding](https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets) weren't displayed on this page. This could complicate service discovery debugging. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389).
 * FEATURE: reduce the default value for `-import.maxLineLen` command-line flag from 100MB to 10MB in order to prevent excessive memory usage during data import via [/api/v1/import](https://docs.victoriametrics.com/#how-to-import-data-in-json-line-format).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `keep_if_contains` and `drop_if_contains` relabeling actions. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) for details.
-* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): export `vm_promscrape_scrape_pool_targets` metric to track the number of targets that each scrape_job discovers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5311).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): export `vm_promscrape_scrape_pool_targets` [metric](https://docs.victoriametrics.com/vmagent.html#monitoring) to track the number of targets each scrape job discovers. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5311).
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): provide `/vmalert/api/v1/rule` and `/api/v1/rule` API endpoints to get the rule object in JSON format. See [these docs](https://docs.victoriametrics.com/vmalert.html#web) for details.
 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add [day_of_year()](https://docs.victoriametrics.com/MetricsQL.html#day_of_year) function, which returns the day of the year for each of the given unix timestamps. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5345) for details. Thanks to @luckyxiaoqiang for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5368/).
 * FEATURE: all VictoriaMetrics binaries: expose additional metrics at `/metrics` page, which may simplify debugging of VictoriaMetrics components (see [this feature request](https://github.com/VictoriaMetrics/metrics/issues/54)):


@@ -104,9 +104,11 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
 	}
 	timestamp := int64(123000)
+	tsmGlobal.Register(&sw)
 	if err := sw.scrapeInternal(timestamp, timestamp); err == nil {
 		t.Fatalf("expecting non-nil error")
 	}
+	tsmGlobal.Unregister(&sw)
 	if pushDataErr != nil {
 		t.Fatalf("unexpected error: %s", pushDataErr)
 	}
@@ -152,11 +154,13 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
 	}
 	timestamp := int64(123000)
+	tsmGlobal.Register(&sw)
 	if err := sw.scrapeInternal(timestamp, timestamp); err != nil {
 		if !strings.Contains(err.Error(), "sample_limit") {
 			t.Fatalf("unexpected error: %s", err)
 		}
 	}
+	tsmGlobal.Unregister(&sw)
 	if pushDataErr != nil {
 		t.Fatalf("unexpected error: %s", pushDataErr)
 	}


@@ -13,6 +13,7 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 	"github.com/VictoriaMetrics/metrics"
@@ -88,91 +89,108 @@ type targetStatusMap struct {
 	mu       sync.Mutex
 	m        map[*scrapeWork]*targetStatus
 	jobNames []string
+
+	// the current number of `up` targets in the given jobName
+	upByJob map[string]int
+
+	// the current number of `down` targets in the given jobName
+	downByJob map[string]int
 }
 
 func newTargetStatusMap() *targetStatusMap {
 	return &targetStatusMap{
-		m: make(map[*scrapeWork]*targetStatus),
+		m:         make(map[*scrapeWork]*targetStatus),
+		upByJob:   make(map[string]int),
+		downByJob: make(map[string]int),
 	}
 }
 
-func (tsm *targetStatusMap) Reset() {
-	tsm.mu.Lock()
-	tsm.m = make(map[*scrapeWork]*targetStatus)
-	tsm.mu.Unlock()
-}
-
 func (tsm *targetStatusMap) registerJobNames(jobNames []string) {
 	tsm.mu.Lock()
-	tsm.registerJobsMetrics(tsm.jobNames, jobNames)
+	tsm.registerJobsMetricsLocked(tsm.jobNames, jobNames)
 	tsm.jobNames = append(tsm.jobNames[:0], jobNames...)
 	tsm.mu.Unlock()
 }
 
-// registerJobsMetrics registers metrics for new jobs and unregisterMetric metrics for removed jobs
-func (tsm *targetStatusMap) registerJobsMetrics(prevJobNames, currentJobNames []string) {
-	prevName := make(map[string]struct{}, len(prevJobNames))
-	currentName := make(map[string]struct{}, len(currentJobNames))
-	for _, n := range currentJobNames {
-		currentName[n] = struct{}{}
+// registerJobsMetricsLocked registers metrics for new jobs and unregisters metrics for removed jobs
+//
+// tsm.mu must be locked when calling this function.
+func (tsm *targetStatusMap) registerJobsMetricsLocked(prevJobNames, currentJobNames []string) {
+	prevNames := make(map[string]struct{}, len(prevJobNames))
+	currentNames := make(map[string]struct{}, len(currentJobNames))
+	for _, jobName := range currentJobNames {
+		currentNames[jobName] = struct{}{}
 	}
-	for _, n := range prevJobNames {
-		prevName[n] = struct{}{}
-		if _, ok := currentName[n]; !ok {
-			metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, n))
-			metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, n))
+	for _, jobName := range prevJobNames {
+		prevNames[jobName] = struct{}{}
+		if _, ok := currentNames[jobName]; !ok {
+			metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, jobName))
+			metrics.UnregisterMetric(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, jobName))
 		}
 	}
-	for _, n := range currentJobNames {
-		if _, ok := prevName[n]; !ok {
-			n := n
-			_ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, n), func() float64 {
-				jobStatus := tsm.getTargetsStatusByJob(&requestFilter{
-					originalJobName: n,
-				})
-				var up float64
-				for _, status := range jobStatus.jobTargetsStatuses {
-					up = +float64(status.upCount)
-				}
-				return up
-			})
-			_ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, n), func() float64 {
-				jobStatus := tsm.getTargetsStatusByJob(&requestFilter{
-					originalJobName: n,
-				})
-				var down float64
-				for _, status := range jobStatus.jobTargetsStatuses {
-					down = +float64(status.targetsTotal - status.upCount)
-				}
-				return down
-			})
+	for _, jobName := range currentJobNames {
+		if _, ok := prevNames[jobName]; ok {
+			continue
 		}
+		jobNameLocal := jobName
+		_ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="up"}`, jobName), func() float64 {
+			tsm.mu.Lock()
+			n := tsm.upByJob[jobNameLocal]
+			tsm.mu.Unlock()
+			return float64(n)
+		})
+		_ = metrics.NewGauge(fmt.Sprintf(`vm_promscrape_scrape_pool_targets{scrape_job=%q, status="down"}`, jobName), func() float64 {
+			tsm.mu.Lock()
+			n := tsm.downByJob[jobNameLocal]
+			tsm.mu.Unlock()
+			return float64(n)
+		})
 	}
 }
 
 func (tsm *targetStatusMap) Register(sw *scrapeWork) {
+	jobName := sw.Config.jobNameOriginal
+
 	tsm.mu.Lock()
 	tsm.m[sw] = &targetStatus{
 		sw: sw,
 	}
+	tsm.downByJob[jobName]++
 	tsm.mu.Unlock()
 }
 
 func (tsm *targetStatusMap) Unregister(sw *scrapeWork) {
+	jobName := sw.Config.jobNameOriginal
+
 	tsm.mu.Lock()
+	ts, ok := tsm.m[sw]
+	if !ok {
+		logger.Panicf("BUG: missing Register() call for the target %q", jobName)
+	}
+	if ts.up {
+		tsm.upByJob[jobName]--
+	} else {
+		tsm.downByJob[jobName]--
+	}
 	delete(tsm.m, sw)
 	tsm.mu.Unlock()
 }
 
 func (tsm *targetStatusMap) Update(sw *scrapeWork, up bool, scrapeTime, scrapeDuration int64, samplesScraped int, err error) {
+	jobName := sw.Config.jobNameOriginal
+
 	tsm.mu.Lock()
-	ts := tsm.m[sw]
-	if ts == nil {
-		ts = &targetStatus{
-			sw: sw,
-		}
-		tsm.m[sw] = ts
+	ts, ok := tsm.m[sw]
+	if !ok {
+		logger.Panicf("BUG: missing Register() call for the target %q", jobName)
+	}
+	if up && !ts.up {
+		tsm.upByJob[jobName]++
+		tsm.downByJob[jobName]--
+	} else if !up && ts.up {
+		tsm.upByJob[jobName]--
+		tsm.downByJob[jobName]++
 	}
 	ts.up = up
 	ts.scrapeTime = scrapeTime