diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3ce5714a0a..428b3f76e0 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,7 +11,9 @@ sort: 15 * FEATURE: vmagent: add ability to set `series_limit` option for a particular scrape target via `__series_limit__` label. This allows setting the limit on the number of time series on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) for details. * FEATURE: vmagent: add ability to set `stream_parse` option for a particular scrape target via `__stream_parse__` label. This allows managing the stream parsing mode on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) for details. * FEATURE: add new relabeling actions: `keep_metrics` and `drop_metrics`. This simplifies metrics filtering by metric names. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. -* FAETURE: allow splitting long `regex` in relabeling filters into an array of shorter regexps, which can be put into multiple lines for better readability and maintainability. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. +* FEATURE: allow splitting long `regex` in relabeling filters into an array of shorter regexps, which can be put into multiple lines for better readability and maintainability. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. +* FEATURE: vmagent: reduce CPU usage when calculating the number of newly added series per scrape (this number is sent to remote storage in `scrape_series_added` metric). +* FEATURE: vmagent: reduce CPU usage when applying `series_limit` to scrape targets with constant set of metrics. See more information about `series_limit` [here](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). * BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601). * BUGFIX: vmselect: reset connection timeouts after each request to `vmstorage`. This should prevent from `cannot read data in 0.000 seconds: unexpected EOF` warning in logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1562). Thanks to @mxlxm . diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 93581b773d..364bf487e2 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -6,6 +6,7 @@ import ( "math" "math/bits" "strconv" + "strings" "sync" "time" @@ -178,9 +179,10 @@ type scrapeWork struct { tmpRow parser.Row - // the seriesMap, seriesAdded and labelsHashBuf are used for fast calculation of `scrape_series_added` metric. - seriesMap map[uint64]struct{} - seriesAdded int + // This flag is set to true if series_limit is exceeded. + seriesLimitExceeded bool + + // labelsHashBuf is used for calculating the hash on series labels labelsHashBuf []byte // Optional limiter on the number of unique series per scrape target. @@ -195,7 +197,6 @@ type scrapeWork struct { prevLabelsLen int // lastScrape holds the last response from scrape target. - // It is used for generating Prometheus stale markers. 
lastScrape []byte } @@ -307,6 +308,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error up := 1 wc := writeRequestCtxPool.Get(sw.prevLabelsLen) bodyString := bytesutil.ToUnsafeString(body.B) + lastScrape := bytesutil.ToUnsafeString(sw.lastScrape) + areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString) if err != nil { up = 0 scrapesFailed.Inc() @@ -327,8 +330,21 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error err = fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } - sw.updateSeriesAdded(wc) - seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling) + if up == 0 { + bodyString = "" + } + seriesAdded := 0 + if !areIdenticalSeries { + // The returned value for seriesAdded may be bigger than the real number of added series + // if some series were removed during relabeling. + // This is a trade-off between performance and accuracy. + seriesAdded = sw.getSeriesAdded(bodyString) + } + if sw.seriesLimitExceeded || !areIdenticalSeries { + if sw.applySeriesLimit(wc) { + sw.seriesLimitExceeded = true + } + } sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) @@ -340,12 +356,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error writeRequestCtxPool.Put(wc) // body must be released only after wc is released, since wc refers to body. sw.prevBodyLen = len(body.B) + if !areIdenticalSeries { + sw.sendStaleSeries(bodyString, scrapeTimestamp, false) + } + sw.lastScrape = append(sw.lastScrape[:0], bodyString...) leveledbytebufferpool.Put(body) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) - if up == 0 { - bodyString = "" - } - sw.sendStaleSeries(bodyString, scrapeTimestamp, false) return err } @@ -383,7 +399,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } - sw.updateSeriesAdded(wc) sw.pushData(&wc.writeRequest) wc.resetNoRows() return nil @@ -404,12 +419,13 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { } scrapesFailed.Inc() } - seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling) sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) - sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) + // scrape_series_added isn't calculated in streaming mode, + // since it may need unlimited amounts of memory when scraping targets with millions of exposed metrics. 
+ sw.addAutoTimeseries(wc, "scrape_series_added", 0, scrapeTimestamp) sw.pushData(&wc.writeRequest) sw.prevLabelsLen = len(wc.labels) wc.reset() @@ -486,11 +502,16 @@ func (wc *writeRequestCtx) resetNoRows() { var writeRequestCtxPool leveledWriteRequestCtxPool -func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) { - if sw.seriesMap == nil { - sw.seriesMap = make(map[uint64]struct{}, len(wc.writeRequest.Timeseries)) +func (sw *scrapeWork) getSeriesAdded(currScrape string) int { + if currScrape == "" { + return 0 } - m := sw.seriesMap + lastScrape := bytesutil.ToUnsafeString(sw.lastScrape) + bodyString := parser.GetRowsDiff(currScrape, lastScrape) + return strings.Count(bodyString, "\n") +} + +func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) bool { seriesLimit := *seriesLimitPerTarget if sw.Config.SeriesLimit > 0 { seriesLimit = sw.Config.SeriesLimit @@ -499,24 +520,26 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) { sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour) } hsl := sw.seriesLimiter + if hsl == nil { + return false + } dstSeries := wc.writeRequest.Timeseries[:0] job := sw.Config.Job() + limitExceeded := false for _, ts := range wc.writeRequest.Timeseries { h := sw.getLabelsHash(ts.Labels) - if hsl != nil && !hsl.Add(h) { + if !hsl.Add(h) { // The limit on the number of hourly unique series per scrape target has been exceeded. // Drop the metric. metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_series_limit_rows_dropped_total{scrape_job_original=%q,scrape_job=%q,scrape_target=%q}`, sw.Config.jobNameOriginal, job, sw.Config.ScrapeURL)).Inc() + limitExceeded = true continue } dstSeries = append(dstSeries, ts) - if _, ok := m[h]; !ok { - m[h] = struct{}{} - sw.seriesAdded++ - } } wc.writeRequest.Timeseries = dstSeries + return limitExceeded } func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAutoSeries bool) { @@ -524,18 +547,11 @@ func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAut return } lastScrape := bytesutil.ToUnsafeString(sw.lastScrape) - if parser.AreIdenticalSeriesFast(lastScrape, currScrape) { - // Fast path: the current scrape contains the same set of series as the previous scrape. - return - } - // Slow path: the current scrape contains different set of series than the previous scrape. - // Detect missing series in the current scrape and send stale markers for them. bodyString := lastScrape if currScrape != "" { - bodyString = parser.GetDiffWithStaleRows(lastScrape, currScrape) + bodyString = parser.GetRowsDiff(lastScrape, currScrape) } - wc := writeRequestCtxPool.Get(sw.prevLabelsLen) - defer writeRequestCtxPool.Put(wc) + wc := &writeRequestCtx{} if bodyString != "" { wc.rows.Unmarshal(bodyString) srcRows := wc.rows.Rows @@ -562,17 +578,6 @@ func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAut } } sw.pushData(&wc.writeRequest) - sw.lastScrape = append(sw.lastScrape[:0], currScrape...) -} - -func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int { - seriesAdded := sw.seriesAdded - sw.seriesAdded = 0 - if len(sw.seriesMap) > 4*lastScrapeSize { - // Reset seriesMap, since it occupies more than 4x metrics collected during the last scrape. 
- sw.seriesMap = make(map[uint64]struct{}, lastScrapeSize) - } - return seriesAdded } func (sw *scrapeWork) getLabelsHash(labels []prompbmarshal.Label) uint64 { diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index b19093c7dd..8b9dbac00e 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -325,7 +325,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{job="xx",instance="foo.com"} 4 123 scrape_duration_seconds{job="xx",instance="foo.com"} 0 123 scrape_samples_post_metric_relabeling{job="xx",instance="foo.com"} 1 123 - scrape_series_added{job="xx",instance="foo.com"} 1 123 + scrape_series_added{job="xx",instance="foo.com"} 4 123 `) f(` foo{bar="baz"} 34.44 diff --git a/lib/protoparser/prometheus/parser.go b/lib/protoparser/prometheus/parser.go index dd250e1aa4..fec8a972a9 100644 --- a/lib/protoparser/prometheus/parser.go +++ b/lib/protoparser/prometheus/parser.go @@ -366,10 +366,10 @@ func prevBackslashesCount(s string) int { return n } -// GetDiffWithStaleRows returns rows from s1, which are missing in s2. +// GetRowsDiff returns rows from s1, which are missing in s2. // // The returned rows have default value 0 and have no timestamps. -func GetDiffWithStaleRows(s1, s2 string) string { +func GetRowsDiff(s1, s2 string) string { var r1, r2 Rows r1.Unmarshal(s1) r2.Unmarshal(s2) @@ -386,11 +386,8 @@ func GetDiffWithStaleRows(s1, s2 string) string { r := &rows1[i] key := marshalMetricNameWithTags(r) if !m[key] { - logger.Infof("missing %s", key) diff = append(diff, key...) diff = append(diff, " 0\n"...) - } else { - logger.Infof("found %s", key) } } return string(diff) diff --git a/lib/protoparser/prometheus/parser_test.go b/lib/protoparser/prometheus/parser_test.go index 27812c9c49..d2a2baaed1 100644 --- a/lib/protoparser/prometheus/parser_test.go +++ b/lib/protoparser/prometheus/parser_test.go @@ -6,12 +6,12 @@ import ( "testing" ) -func TestGetDiffWithStaleRows(t *testing.T) { +func TestGetRowsDiff(t *testing.T) { f := func(s1, s2, resultExpected string) { t.Helper() - result := GetDiffWithStaleRows(s1, s2) + result := GetRowsDiff(s1, s2) if result != resultExpected { - t.Fatalf("unexpected result for GetDiffWithStaleRows(%q, %q); got %q; want %q", s1, s2, result, resultExpected) + t.Fatalf("unexpected result for GetRowsDiff(%q, %q); got %q; want %q", s1, s2, result, resultExpected) } } f("", "", "")
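
Note for readers of this diff: a minimal standalone sketch of the series-added calculation introduced above. It folds the AreIdenticalSeriesFast fast path (which the patch performs in scrapeInternal before calling sw.getSeriesAdded) into one helper. The helper name, the sample scrape bodies and the main function are illustrative only; GetRowsDiff and AreIdenticalSeriesFast are the functions from lib/protoparser/prometheus as they exist after this change.

package main

import (
	"fmt"
	"strings"

	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)

// getSeriesAdded sketches the approach used in scrapework.go: diff the current
// scrape body against the previous one and count the rows that are new.
// The result may overcount if metric relabeling later drops some rows;
// the patch accepts this as a performance/accuracy trade-off.
func getSeriesAdded(lastScrape, currScrape string) int {
	if currScrape == "" {
		return 0
	}
	if parser.AreIdenticalSeriesFast(lastScrape, currScrape) {
		// Fast path: the current scrape exposes the same set of series as the previous one.
		return 0
	}
	// GetRowsDiff returns the rows from currScrape that are missing in lastScrape,
	// one per line, so counting newlines gives the number of newly added series.
	diff := parser.GetRowsDiff(currScrape, lastScrape)
	return strings.Count(diff, "\n")
}

func main() {
	last := "foo{bar=\"baz\"} 1\nup 1\n"
	curr := "foo{bar=\"baz\"} 2\nup 1\nnew_metric 42\n"
	fmt.Println(getSeriesAdded(last, curr)) // prints 1: only new_metric is new
}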