Revert "app/vmauth: reader pool to reduce gc & mem alloc (#6533)"

This reverts commit 4d66e042e3.

Reasons for revert:

- The commit makes unrelated invalid changes to docs/CHANGELOG.md
- The changes at app/vmauth/main.go are too complex. It is better splitting them into two parts:
  - pooling readTrackingBody struct for reducing pressure on GC
  - avoiding to use readTrackingBody when -maxRequestBodySizeToRetry command-line flag is set to 0

Let's make this in the follow-up commits!

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6445
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533
This commit is contained in:
Aliaksandr Valialkin 2024-07-16 18:59:16 +02:00
parent 9cf187c83b
commit ad6af95183
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
4 changed files with 34 additions and 204 deletions

View File

@ -50,7 +50,7 @@ var (
`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend")
maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+
"Bigger values may require more memory. Negative or zero values disable request body caching and retries.")
"Bigger values may require more memory")
backendTLSInsecureSkipVerify = flag.Bool("backend.tlsInsecureSkipVerify", false, "Whether to skip TLS verification when connecting to backends over HTTPS. "+
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
backendTLSCAFile = flag.String("backend.TLSCAFile", "", "Optional path to TLS root CA file, which is used for TLS verification when connecting to backends over HTTPS. "+
@ -200,13 +200,10 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
up, hc = ui.DefaultURL, ui.HeadersConf
isDefault = true
}
// caching makes sense only for positive non zero size
if maxRequestBodySizeToRetry.IntN() > 0 {
rtb := getReadTrackingBody(r.Body, int(r.ContentLength))
defer putReadTrackingBody(rtb)
r.Body = rtb
}
maxAttempts := up.getBackendsCount()
r.Body = &readTrackingBody{
r: r.Body,
}
for i := 0; i < maxAttempts; i++ {
bu := up.getBackendURL()
targetURL := bu.url
@ -505,6 +502,9 @@ type readTrackingBody struct {
// bufComplete is set to true when buf contains complete request body read from r.
bufComplete bool
// needReadBuf is set to true when Read() must be performed from buf instead of r.
needReadBuf bool
// offset is an offset at buf for the next data read if needReadBuf is set to true.
offset int
}
@ -512,63 +512,50 @@ type readTrackingBody struct {
// Read implements io.Reader interface
// tracks body reading requests
func (rtb *readTrackingBody) Read(p []byte) (int, error) {
if rtb.offset < len(rtb.buf) {
if rtb.cannotRetry {
return 0, fmt.Errorf("cannot retry reading data from buf")
if rtb.needReadBuf {
if rtb.offset >= len(rtb.buf) {
return 0, io.EOF
}
nb := copy(p, rtb.buf[rtb.offset:])
rtb.offset += nb
if rtb.bufComplete {
if rtb.offset == len(rtb.buf) {
return nb, io.EOF
}
return nb, nil
}
if nb < len(p) {
nr, err := rtb.readFromStream(p[nb:])
return nb + nr, err
}
return nb, nil
n := copy(p, rtb.buf[rtb.offset:])
rtb.offset += n
return n, nil
}
if rtb.bufComplete {
return 0, io.EOF
}
return rtb.readFromStream(p)
}
func (rtb *readTrackingBody) readFromStream(p []byte) (int, error) {
if rtb.r == nil {
return 0, fmt.Errorf("cannot read data after closing the reader")
}
n, err := rtb.r.Read(p)
if rtb.cannotRetry {
return n, err
}
if rtb.offset+n > maxRequestBodySizeToRetry.IntN() {
rtb.cannotRetry = true
}
if n > 0 {
rtb.offset += n
rtb.buf = append(rtb.buf, p[:n]...)
}
if err != nil {
if err == io.EOF {
rtb.bufComplete = true
return n, err
}
if len(rtb.buf)+n > maxRequestBodySizeToRetry.IntN() {
rtb.cannotRetry = true
return n, err
}
return n, nil
rtb.buf = append(rtb.buf, p[:n]...)
if err == io.EOF {
rtb.bufComplete = true
}
return n, err
}
func (rtb *readTrackingBody) canRetry() bool {
return !rtb.cannotRetry
if rtb.cannotRetry {
return false
}
if len(rtb.buf) > 0 && !rtb.needReadBuf {
return false
}
return true
}
// Close implements io.Closer interface.
func (rtb *readTrackingBody) Close() error {
rtb.offset = 0
if rtb.bufComplete {
rtb.needReadBuf = true
}
// Close rtb.r only if the request body is completely read or if it is too big.
// http.Roundtrip performs body.Close call even without any Read calls,
@ -584,38 +571,3 @@ func (rtb *readTrackingBody) Close() error {
return nil
}
var readTrackingBodyPool sync.Pool
func getReadTrackingBody(origin io.ReadCloser, b int) *readTrackingBody {
bufSize := 1024
if b > 0 && b < maxRequestBodySizeToRetry.IntN() {
bufSize = b
}
v := readTrackingBodyPool.Get()
if v == nil {
v = &readTrackingBody{
buf: make([]byte, 0, bufSize),
}
}
rtb := v.(*readTrackingBody)
rtb.r = origin
if bufSize > cap(rtb.buf) {
rtb.buf = make([]byte, 0, bufSize)
}
return rtb
}
func putReadTrackingBody(rtb *readTrackingBody) {
if rtb.r != nil {
_ = rtb.r.Close()
}
rtb.r = nil
rtb.buf = rtb.buf[:0]
rtb.offset = 0
rtb.cannotRetry = false
rtb.bufComplete = false
readTrackingBodyPool.Put(rtb)
}

View File

@ -55,6 +55,9 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) {
if n != 1 {
t.Fatalf("unexpected number of bytes read; got %d; want 1", n)
}
if rtb.canRetry() {
t.Fatalf("canRetry() must return false")
}
data, err := io.ReadAll(rtb)
if err != nil {
t.Fatalf("unexpected error when reading all the data: %s", err)
@ -82,128 +85,6 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) {
f(newTestString(2 * maxRequestBodySizeToRetry.IntN()))
}
// request body not over maxRequestBodySizeToRetry
// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected.
// 2. retry request: because buf caches some data, first read buf and then read stream when retrying
// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later.
func TestRetryReadSuccessAfterPartialRead(t *testing.T) {
f := func(s string) {
rtb := &readTrackingBody{
r: io.NopCloser(bytes.NewBufferString(s)),
buf: make([]byte, 0, len(s)),
}
var data []byte
var err error
halfSize := len(s) / 2
if halfSize == 0 {
halfSize = 100
}
buf := make([]byte, halfSize)
var n int
// read part of the data
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
if err != nil && err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
// request failed when output stream is closed (eg: server connection reset)
// would close the reader
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
}
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
}
// retry read (read buf + remaining data)
data = data[:0]
err = nil
for err == nil {
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
}
if err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
if string(data) != s {
t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s)
}
// cannotRetry return false
// because the request data is not over maxRequestBodySizeToRetry limit
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
}
}
f("")
f("foo")
f("foobar")
f(newTestString(maxRequestBodySizeToRetry.IntN()))
}
// request body over maxRequestBodySizeToRetry
// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected.
// 2. retry request: because buf caches some data, first read buf and then read stream when retrying
// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later.
func TestRetryReadSuccessAfterPartialReadAndCannotRetryAgain(t *testing.T) {
f := func(s string) {
rtb := &readTrackingBody{
r: io.NopCloser(bytes.NewBufferString(s)),
buf: make([]byte, 0, len(s)),
}
var data []byte
var err error
halfSize := len(s) / 2
if halfSize == 0 {
halfSize = 100
}
buf := make([]byte, halfSize)
var n int
// read part of the data
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
if err != nil && err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
// request failed when output stream is closed (eg: server connection reset)
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
}
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
}
// retry read (read buf + remaining data)
data = data[:0]
err = nil
for err == nil {
n, err = rtb.Read(buf[:])
data = append(data, buf[:n]...)
}
if err != io.EOF {
t.Fatalf("unexpected error: %s", err)
}
if string(data) != s {
t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s)
}
// cannotRetry returns true
// because the request data is over maxRequestBodySizeToRetry limit
if rtb.canRetry() {
t.Fatalf("canRetry() must return false")
}
}
f(newTestString(maxRequestBodySizeToRetry.IntN() + 1))
f(newTestString(2 * maxRequestBodySizeToRetry.IntN()))
}
func newTestString(sLen int) string {
return string(make([]byte, sLen))
}

View File

@ -38,9 +38,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* SECURITY: upgrade base docker image (Alpine) from 3.20.0 to 3.20.1. See [alpine 3.20.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.1-released.html).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with backend host before sending the request to the configured backend. See [these docs](https://docs.victoriametrics.com/vmauth/#modifying-http-headers) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453)
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details.
* FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): [`yandexcloud_sd_configs`](https://docs.victoriametrics.com/sd_configs/#yandexcloud_sd_configs): add support for obtaining IAM token in [GCE format](https://yandex.cloud/en-ru/docs/compute/operations/vm-connect/auth-inside-vm#auth-inside-vm) additionally to the [deprecated Amazon EC2 IMDSv1 format](https://yandex.cloud/en/docs/security/standard/authentication#aws-token). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5513).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add `-graphite.sanitizeMetricName` cmd-line flag for sanitizing metrics ingested via [Graphite protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6077).
* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose the following metrics at `/metrics` page of [vmagent](https://docs.victoriametrics.com/vmagent/) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/):
@ -62,7 +60,6 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmgateway](https://docs.victoriametrics.com/vmgateway/): properly apply read and write based rate limits. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6148) for details.
* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix the dateMetricIDCache consistency issue that leads to duplicate per-day index entries when new time series are inserted concurrently. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6534) for details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix incorrect redirection in WebUI of vmalert. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6603) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6620).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).

View File

@ -1260,7 +1260,7 @@ See the docs at https://docs.victoriametrics.com/vmauth/ .
-maxIdleConnsPerBackend int
The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100)
-maxRequestBodySizeToRetry size
The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory. Negative or zero values disable request body caching and retries.
The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)
-memory.allowedBytes size
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage