app/vmauth: reader pool to reduce gc & mem alloc (#6533)

follow up https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6446

issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6445

---------

Signed-off-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: f41gh7 <nik@victoriametrics.com>
This commit is contained in:
LHHDZ 2024-07-02 20:32:32 +08:00 committed by GitHub
parent dd97dd6373
commit 4d66e042e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 207 additions and 37 deletions

View File

@ -50,7 +50,7 @@ var (
`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend")
maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+
"Bigger values may require more memory")
"Bigger values may require more memory. Negative or zero values disable request body caching and retries.")
backendTLSInsecureSkipVerify = flag.Bool("backend.tlsInsecureSkipVerify", false, "Whether to skip TLS verification when connecting to backends over HTTPS. "+
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
backendTLSCAFile = flag.String("backend.TLSCAFile", "", "Optional path to TLS root CA file, which is used for TLS verification when connecting to backends over HTTPS. "+
@ -200,10 +200,13 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
up, hc = ui.DefaultURL, ui.HeadersConf
isDefault = true
}
maxAttempts := up.getBackendsCount()
r.Body = &readTrackingBody{
r: r.Body,
// caching makes sense only for positive non zero size
if maxRequestBodySizeToRetry.IntN() > 0 {
rtb := getReadTrackingBody(r.Body, int(r.ContentLength))
defer putReadTrackingBody(rtb)
r.Body = rtb
}
maxAttempts := up.getBackendsCount()
for i := 0; i < maxAttempts; i++ {
bu := up.getBackendURL()
targetURL := bu.url
@ -503,9 +506,6 @@ type readTrackingBody struct {
// bufComplete is set to true when buf contains complete request body read from r.
bufComplete bool
// needReadBuf is set to true when Read() must be performed from buf instead of r.
needReadBuf bool
// offset is an offset at buf for the next data read if needReadBuf is set to true.
offset int
}
@ -513,50 +513,63 @@ type readTrackingBody struct {
// Read implements io.Reader interface
// tracks body reading requests
func (rtb *readTrackingBody) Read(p []byte) (int, error) {
if rtb.needReadBuf {
if rtb.offset >= len(rtb.buf) {
return 0, io.EOF
if rtb.offset < len(rtb.buf) {
if rtb.cannotRetry {
return 0, fmt.Errorf("cannot retry reading data from buf")
}
n := copy(p, rtb.buf[rtb.offset:])
rtb.offset += n
return n, nil
nb := copy(p, rtb.buf[rtb.offset:])
rtb.offset += nb
if rtb.bufComplete {
if rtb.offset == len(rtb.buf) {
return nb, io.EOF
}
return nb, nil
}
if nb < len(p) {
nr, err := rtb.readFromStream(p[nb:])
return nb + nr, err
}
return nb, nil
}
if rtb.bufComplete {
return 0, io.EOF
}
return rtb.readFromStream(p)
}
func (rtb *readTrackingBody) readFromStream(p []byte) (int, error) {
if rtb.r == nil {
return 0, fmt.Errorf("cannot read data after closing the reader")
}
n, err := rtb.r.Read(p)
if rtb.cannotRetry {
return n, err
}
if len(rtb.buf)+n > maxRequestBodySizeToRetry.IntN() {
if rtb.offset+n > maxRequestBodySizeToRetry.IntN() {
rtb.cannotRetry = true
}
if n > 0 {
rtb.offset += n
rtb.buf = append(rtb.buf, p[:n]...)
}
if err != nil {
if err == io.EOF {
rtb.bufComplete = true
return n, err
}
rtb.cannotRetry = true
return n, err
}
rtb.buf = append(rtb.buf, p[:n]...)
if err == io.EOF {
rtb.bufComplete = true
}
return n, err
return n, nil
}
func (rtb *readTrackingBody) canRetry() bool {
if rtb.cannotRetry {
return false
}
if len(rtb.buf) > 0 && !rtb.needReadBuf {
return false
}
return true
return !rtb.cannotRetry
}
// Close implements io.Closer interface.
func (rtb *readTrackingBody) Close() error {
rtb.offset = 0
if rtb.bufComplete {
rtb.needReadBuf = true
}
// Close rtb.r only if the request body is completely read or if it is too big.
// http.Roundtrip performs body.Close call even without any Read calls,
@ -572,3 +585,38 @@ func (rtb *readTrackingBody) Close() error {
return nil
}
var readTrackingBodyPool sync.Pool
func getReadTrackingBody(origin io.ReadCloser, b int) *readTrackingBody {
bufSize := 1024
if b > 0 && b < maxRequestBodySizeToRetry.IntN() {
bufSize = b
}
v := readTrackingBodyPool.Get()
if v == nil {
v = &readTrackingBody{
buf: make([]byte, 0, bufSize),
}
}
rtb := v.(*readTrackingBody)
rtb.r = origin
if bufSize > cap(rtb.buf) {
rtb.buf = make([]byte, 0, bufSize)
}
return rtb
}
func putReadTrackingBody(rtb *readTrackingBody) {
if rtb.r != nil {
_ = rtb.r.Close()
}
rtb.r = nil
rtb.buf = rtb.buf[:0]
rtb.offset = 0
rtb.cannotRetry = false
rtb.bufComplete = false
readTrackingBodyPool.Put(rtb)
}

View File

@ -55,9 +55,6 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) {
if n != 1 {
t.Fatalf("unexpected number of bytes read; got %d; want 1", n)
}
if rtb.canRetry() {
t.Fatalf("canRetry() must return false")
}
data, err := io.ReadAll(rtb)
if err != nil {
t.Fatalf("unexpected error when reading all the data: %s", err)
@ -85,6 +82,128 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) {
f(newTestString(2 * maxRequestBodySizeToRetry.IntN()))
}
// request body not over maxRequestBodySizeToRetry
// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected.
// 2. retry request: because buf caches some data, first read buf and then read stream when retrying
// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later.
func TestRetryReadSuccessAfterPartialRead(t *testing.T) {
f := func(s string) {
rtb := &readTrackingBody{
r: io.NopCloser(bytes.NewBufferString(s)),
buf: make([]byte, 0, len(s)),
}
var data []byte
var err error
halfSize := len(s) / 2
if halfSize == 0 {
halfSize = 100
}
buf := make([]byte, halfSize)
var n int
// read part of the data
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
if err != nil && err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
// request failed when output stream is closed (eg: server connection reset)
// would close the reader
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
}
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
}
// retry read (read buf + remaining data)
data = data[:0]
err = nil
for err == nil {
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
}
if err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
if string(data) != s {
t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s)
}
// cannotRetry return false
// because the request data is not over maxRequestBodySizeToRetry limit
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
}
}
f("")
f("foo")
f("foobar")
f(newTestString(maxRequestBodySizeToRetry.IntN()))
}
// request body over maxRequestBodySizeToRetry
// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected.
// 2. retry request: because buf caches some data, first read buf and then read stream when retrying
// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later.
func TestRetryReadSuccessAfterPartialReadAndCannotRetryAgain(t *testing.T) {
f := func(s string) {
rtb := &readTrackingBody{
r: io.NopCloser(bytes.NewBufferString(s)),
buf: make([]byte, 0, len(s)),
}
var data []byte
var err error
halfSize := len(s) / 2
if halfSize == 0 {
halfSize = 100
}
buf := make([]byte, halfSize)
var n int
// read part of the data
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
if err != nil && err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
// request failed when output stream is closed (eg: server connection reset)
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
}
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
}
// retry read (read buf + remaining data)
data = data[:0]
err = nil
for err == nil {
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
}
if err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
if string(data) != s {
t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s)
}
// cannotRetry returns true
// because the request data is over maxRequestBodySizeToRetry limit
if rtb.canRetry() {
t.Fatalf("canRetry() must return false")
}
}
f(newTestString(maxRequestBodySizeToRetry.IntN() + 1))
f(newTestString(2 * maxRequestBodySizeToRetry.IntN()))
}
func newTestString(sLen int) string {
return string(make([]byte, sLen))
}

View File

@ -32,17 +32,20 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
**Update note 1: support for snap packages was removed due to lack of interest from community. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6543) for details. Please read about supported package types [here](https://docs.victoriametrics.com/#install).**
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with a target host before sending to a downstream. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453)
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details.
* FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): added `yandexcloud_sd` AWS API IMDSv2 support.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): expose metrics related to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/):
* `vm_streamaggr_matched_samples_total` - shows the number of samples matched by the aggregation rule;
* `vm_streamaggr_flushed_samples_total` - shows the number of samples produced by the aggregation rule;
* `vm_streamaggr_samples_lag_seconds` - shows the max lag between samples timestamps within one batch received by the aggregation;
* `vm_streamaggr_matched_samples_total` - shows the number of samples matched by the aggregation rule;
* `vm_streamaggr_flushed_samples_total` - shows the number of samples produced by the aggregation rule;
* `vm_streamaggr_samples_lag_seconds` - shows the max lag between samples timestamps within one batch received by the aggregation;
* `vm_streamaggr_stale_samples_total` - shows the number of time series that became [stale](https://docs.victoriametrics.com/stream-aggregation/#staleness) during aggregation;
* metrics related to stream aggregation got additional labels `match` (matching param), `group` (`by` or `without` param), `url` (address of `remoteWrite.url` where aggregation is applied), `position` (the position of the aggregation rule in config file).
* These and other metrics were reflected on the [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json) in `stream aggregation` section.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): do not retry RPC calls to vmstorage nodes if [complexity limits](https://docs.victoriametrics.com/#resource-usage-limits) were exceeded.
* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix the dateMetricIDCache consistency issue that leads to duplicate per-day index entries when new time series are inserted concurrently. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6534) for details.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).

View File

@ -1260,7 +1260,7 @@ See the docs at https://docs.victoriametrics.com/vmauth/ .
-maxIdleConnsPerBackend int
The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100)
-maxRequestBodySizeToRetry size
The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory
The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory. Negative or zero values disable request body caching and retries.
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)
-memory.allowedBytes size
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage