lib/promscrape: follow-up for 393876e52a

- Document the change in docs/CHANGELOG.md
- Reduce memory usage when sending stale markers even more by parsing the response in stream parsing mode
- Update the TestSendStaleSeries

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675
This commit is contained in:
Aliaksandr Valialkin 2023-01-23 21:52:57 -08:00
parent 8e2a8a6ae2
commit 71a170d404
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
3 changed files with 69 additions and 68 deletions

View File

@ -15,8 +15,10 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues.
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): propagate all the timeout-related errors from `vmstorage` to `vmselect` when `vmstorage`. Previously some timeout errors weren't returned from `vmselect` to `vmstorage`. Instead, `vmstorage` could log the error and close the connection to `vmselect`, so `vmselect` was logging cryptic errors such as `cannot execute funcName="..." on vmstorage "...": EOF`.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680)
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680).
## [v1.86.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.2)

View File

@ -1,6 +1,7 @@
package promscrape
import (
"bytes"
"flag"
"fmt"
"io"
@ -767,11 +768,6 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
// maxStaleSeriesAtOnce defines the max number of stale series
// to process and send at once. It prevents from excessive memory usage
// when big number of metrics become stale at the same time.
const maxStaleSeriesAtOnce = 1e3
func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) {
// This function is CPU-bound, while it may allocate big amounts of memory.
// That's why it is a good idea to limit the number of concurrent calls to this function
@ -794,37 +790,44 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
writeRequestCtxPool.Put(wc)
}()
if bodyString != "" {
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
// Send stale markers in streaming mode in order to reduce memory usage
// when stale markers for targets exposing big number of metrics must be generated.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675
var mu sync.Mutex
br := bytes.NewBufferString(bodyString)
err := parser.ParseStream(br, timestamp, false, func(rows []parser.Row) error {
mu.Lock()
defer mu.Unlock()
for i := range rows {
sw.addRowToTimeseries(wc, &rows[i], timestamp, true)
}
srcRows := wc.rows.Rows
for from := 0; from < len(srcRows); from += maxStaleSeriesAtOnce {
to := from + maxStaleSeriesAtOnce
if to > len(srcRows) {
to = len(srcRows)
}
for i := range srcRows[from:to] {
sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true)
}
// add auto series at the last iteration
if addAutoSeries && to == len(srcRows) {
am := &autoMetrics{}
sw.addAutoMetrics(am, wc, timestamp)
}
// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
if sw.seriesLimitExceeded {
sw.applySeriesLimit(wc)
}
series := wc.writeRequest.Timeseries
if len(series) == 0 {
continue
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
setStaleMarkersForRows(wc.writeRequest.Timeseries)
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
wc.resetNoRows()
return nil
}, sw.logError)
if err != nil {
sw.logError(fmt.Errorf("cannot send stale markers: %s", err).Error())
}
// Substitute all the values with Prometheus stale markers.
}
if addAutoSeries {
am := &autoMetrics{}
sw.addAutoMetrics(am, wc, timestamp)
}
setStaleMarkersForRows(wc.writeRequest.Timeseries)
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
}
func setStaleMarkersForRows(series []prompbmarshal.TimeSeries) {
for _, tss := range series {
samples := tss.Samples
for i := range samples {
@ -832,9 +835,6 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
}
staleSamplesCreated.Add(len(samples))
}
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
wc.reset()
}
}
var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)

View File

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)
@ -687,19 +688,24 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) {
}
func TestSendStaleSeries(t *testing.T) {
f := func(lastScrape, currScrape string, staleMarksExpected int) {
t.Helper()
var sw scrapeWork
sw.Config = &ScrapeWork{
NoStaleMarkers: false,
}
common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers()
var timeseriesExpectedN int
var staleMarks int
sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
t.Helper()
if len(wr.Timeseries) != timeseriesExpectedN {
t.Fatalf("expected to get %d stale series; got %d", timeseriesExpectedN, len(wr.Timeseries))
staleMarks += len(wr.Timeseries)
}
sw.sendStaleSeries(lastScrape, currScrape, 0, false)
if staleMarks != staleMarksExpected {
t.Fatalf("unexpected number of stale marks; got %d; want %d", staleMarks, staleMarksExpected)
}
}
generateScrape := func(n int) string {
w := strings.Builder{}
for i := 0; i < n; i++ {
@ -708,20 +714,13 @@ func TestSendStaleSeries(t *testing.T) {
return w.String()
}
timeseriesExpectedN = 0
sw.sendStaleSeries("", "", 0, false)
timeseriesExpectedN = 0
sw.sendStaleSeries(generateScrape(10), generateScrape(10), 0, false)
timeseriesExpectedN = 10
sw.sendStaleSeries(generateScrape(10), "", 0, false)
timeseriesExpectedN = 5
sw.sendStaleSeries(generateScrape(10), generateScrape(5), 0, false)
timeseriesExpectedN = maxStaleSeriesAtOnce
sw.sendStaleSeries(generateScrape(maxStaleSeriesAtOnce*2), "", 0, false)
f("", "", 0)
f(generateScrape(10), generateScrape(10), 0)
f(generateScrape(10), "", 10)
f("", generateScrape(10), 0)
f(generateScrape(10), generateScrape(3), 7)
f(generateScrape(3), generateScrape(10), 0)
f(generateScrape(20000), generateScrape(10), 19990)
}
func parsePromRow(data string) *parser.Row {