From ad6af95183a8133c9def59a15320de6b0cd7cfd9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 16 Jul 2024 18:59:16 +0200 Subject: [PATCH] Revert "app/vmauth: reader pool to reduce gc & mem alloc (#6533)" This reverts commit 4d66e042e3d22c6babdfd45b3e32e9073fa59d35. Reasons for revert: - The commit makes unrelated invalid changes to docs/CHANGELOG.md - The changes at app/vmauth/main.go are too complex. It is better splitting them into two parts: - pooling readTrackingBody struct for reducing pressure on GC - avoiding to use readTrackingBody when -maxRequestBodySizeToRetry command-line flag is set to 0 Let's make this in the follow-up commits! Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6445 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533 --- app/vmauth/main.go | 108 ++++++++++------------------------ app/vmauth/main_test.go | 125 +--------------------------------------- docs/CHANGELOG.md | 3 - docs/vmauth.md | 2 +- 4 files changed, 34 insertions(+), 204 deletions(-) diff --git a/app/vmauth/main.go b/app/vmauth/main.go index 91af81cd1..e5b9f5ce1 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -50,7 +50,7 @@ var ( `Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`) failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend") maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+ - "Bigger values may require more memory. Negative or zero values disable request body caching and retries.") + "Bigger values may require more memory") backendTLSInsecureSkipVerify = flag.Bool("backend.tlsInsecureSkipVerify", false, "Whether to skip TLS verification when connecting to backends over HTTPS. "+ "See https://docs.victoriametrics.com/vmauth/#backend-tls-setup") backendTLSCAFile = flag.String("backend.TLSCAFile", "", "Optional path to TLS root CA file, which is used for TLS verification when connecting to backends over HTTPS. "+ @@ -200,13 +200,10 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) { up, hc = ui.DefaultURL, ui.HeadersConf isDefault = true } - // caching makes sense only for positive non zero size - if maxRequestBodySizeToRetry.IntN() > 0 { - rtb := getReadTrackingBody(r.Body, int(r.ContentLength)) - defer putReadTrackingBody(rtb) - r.Body = rtb - } maxAttempts := up.getBackendsCount() + r.Body = &readTrackingBody{ + r: r.Body, + } for i := 0; i < maxAttempts; i++ { bu := up.getBackendURL() targetURL := bu.url @@ -505,6 +502,9 @@ type readTrackingBody struct { // bufComplete is set to true when buf contains complete request body read from r. bufComplete bool + // needReadBuf is set to true when Read() must be performed from buf instead of r. + needReadBuf bool + // offset is an offset at buf for the next data read if needReadBuf is set to true. offset int } @@ -512,63 +512,50 @@ type readTrackingBody struct { // Read implements io.Reader interface // tracks body reading requests func (rtb *readTrackingBody) Read(p []byte) (int, error) { - if rtb.offset < len(rtb.buf) { - if rtb.cannotRetry { - return 0, fmt.Errorf("cannot retry reading data from buf") + if rtb.needReadBuf { + if rtb.offset >= len(rtb.buf) { + return 0, io.EOF } - nb := copy(p, rtb.buf[rtb.offset:]) - rtb.offset += nb - if rtb.bufComplete { - if rtb.offset == len(rtb.buf) { - return nb, io.EOF - } - return nb, nil - } - if nb < len(p) { - nr, err := rtb.readFromStream(p[nb:]) - return nb + nr, err - } - return nb, nil + n := copy(p, rtb.buf[rtb.offset:]) + rtb.offset += n + return n, nil } - if rtb.bufComplete { - return 0, io.EOF - } - return rtb.readFromStream(p) -} -func (rtb *readTrackingBody) readFromStream(p []byte) (int, error) { if rtb.r == nil { return 0, fmt.Errorf("cannot read data after closing the reader") } + n, err := rtb.r.Read(p) if rtb.cannotRetry { return n, err } - if rtb.offset+n > maxRequestBodySizeToRetry.IntN() { - rtb.cannotRetry = true - } - if n > 0 { - rtb.offset += n - rtb.buf = append(rtb.buf, p[:n]...) - } - if err != nil { - if err == io.EOF { - rtb.bufComplete = true - return n, err - } + if len(rtb.buf)+n > maxRequestBodySizeToRetry.IntN() { rtb.cannotRetry = true return n, err } - return n, nil + rtb.buf = append(rtb.buf, p[:n]...) + if err == io.EOF { + rtb.bufComplete = true + } + return n, err } func (rtb *readTrackingBody) canRetry() bool { - return !rtb.cannotRetry + if rtb.cannotRetry { + return false + } + if len(rtb.buf) > 0 && !rtb.needReadBuf { + return false + } + return true } // Close implements io.Closer interface. func (rtb *readTrackingBody) Close() error { rtb.offset = 0 + if rtb.bufComplete { + rtb.needReadBuf = true + } // Close rtb.r only if the request body is completely read or if it is too big. // http.Roundtrip performs body.Close call even without any Read calls, @@ -584,38 +571,3 @@ func (rtb *readTrackingBody) Close() error { return nil } - -var readTrackingBodyPool sync.Pool - -func getReadTrackingBody(origin io.ReadCloser, b int) *readTrackingBody { - bufSize := 1024 - if b > 0 && b < maxRequestBodySizeToRetry.IntN() { - bufSize = b - } - v := readTrackingBodyPool.Get() - if v == nil { - v = &readTrackingBody{ - buf: make([]byte, 0, bufSize), - } - } - rtb := v.(*readTrackingBody) - rtb.r = origin - if bufSize > cap(rtb.buf) { - rtb.buf = make([]byte, 0, bufSize) - } - - return rtb -} - -func putReadTrackingBody(rtb *readTrackingBody) { - if rtb.r != nil { - _ = rtb.r.Close() - } - rtb.r = nil - rtb.buf = rtb.buf[:0] - rtb.offset = 0 - rtb.cannotRetry = false - rtb.bufComplete = false - - readTrackingBodyPool.Put(rtb) -} diff --git a/app/vmauth/main_test.go b/app/vmauth/main_test.go index a7b34d67f..3e7b6d6d3 100644 --- a/app/vmauth/main_test.go +++ b/app/vmauth/main_test.go @@ -55,6 +55,9 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) { if n != 1 { t.Fatalf("unexpected number of bytes read; got %d; want 1", n) } + if rtb.canRetry() { + t.Fatalf("canRetry() must return false") + } data, err := io.ReadAll(rtb) if err != nil { t.Fatalf("unexpected error when reading all the data: %s", err) @@ -82,128 +85,6 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) { f(newTestString(2 * maxRequestBodySizeToRetry.IntN())) } -// request body not over maxRequestBodySizeToRetry -// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected. -// 2. retry request: because buf caches some data, first read buf and then read stream when retrying -// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later. -func TestRetryReadSuccessAfterPartialRead(t *testing.T) { - f := func(s string) { - rtb := &readTrackingBody{ - r: io.NopCloser(bytes.NewBufferString(s)), - buf: make([]byte, 0, len(s)), - } - - var data []byte - var err error - halfSize := len(s) / 2 - if halfSize == 0 { - halfSize = 100 - } - buf := make([]byte, halfSize) - var n int - - // read part of the data - n, err = rtb.Read(buf[:]) - data = append(data, buf[:n]...) - if err != nil && err != io.EOF { - t.Fatalf("unexpected error: %s", err) - } - - // request failed when output stream is closed (eg: server connection reset) - // would close the reader - if err := rtb.Close(); err != nil { - t.Fatalf("unexpected error when closing readTrackingBody: %s", err) - } - if !rtb.canRetry() { - t.Fatalf("canRetry() must return true") - } - - // retry read (read buf + remaining data) - data = data[:0] - err = nil - for err == nil { - n, err = rtb.Read(buf[:]) - data = append(data, buf[:n]...) - } - if err != io.EOF { - t.Fatalf("unexpected error: %s", err) - } - if string(data) != s { - t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s) - } - // cannotRetry return false - // because the request data is not over maxRequestBodySizeToRetry limit - if !rtb.canRetry() { - t.Fatalf("canRetry() must return true") - } - } - - f("") - f("foo") - f("foobar") - f(newTestString(maxRequestBodySizeToRetry.IntN())) -} - -// request body over maxRequestBodySizeToRetry -// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected. -// 2. retry request: because buf caches some data, first read buf and then read stream when retrying -// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later. -func TestRetryReadSuccessAfterPartialReadAndCannotRetryAgain(t *testing.T) { - f := func(s string) { - rtb := &readTrackingBody{ - r: io.NopCloser(bytes.NewBufferString(s)), - buf: make([]byte, 0, len(s)), - } - - var data []byte - var err error - halfSize := len(s) / 2 - if halfSize == 0 { - halfSize = 100 - } - buf := make([]byte, halfSize) - var n int - - // read part of the data - n, err = rtb.Read(buf[:]) - data = append(data, buf[:n]...) - if err != nil && err != io.EOF { - t.Fatalf("unexpected error: %s", err) - } - - // request failed when output stream is closed (eg: server connection reset) - if err := rtb.Close(); err != nil { - t.Fatalf("unexpected error when closing readTrackingBody: %s", err) - } - if !rtb.canRetry() { - t.Fatalf("canRetry() must return true") - } - - // retry read (read buf + remaining data) - data = data[:0] - err = nil - for err == nil { - n, err = rtb.Read(buf[:]) - data = append(data, buf[:n]...) - } - if err != io.EOF { - t.Fatalf("unexpected error: %s", err) - } - if string(data) != s { - t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s) - } - - // cannotRetry returns true - // because the request data is over maxRequestBodySizeToRetry limit - if rtb.canRetry() { - t.Fatalf("canRetry() must return false") - } - } - - f(newTestString(maxRequestBodySizeToRetry.IntN() + 1)) - f(newTestString(2 * maxRequestBodySizeToRetry.IntN())) -} - func newTestString(sLen int) string { return string(make([]byte, sLen)) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3a25c3b62..e518f9939 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -38,9 +38,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * SECURITY: upgrade base docker image (Alpine) from 3.20.0 to 3.20.1. See [alpine 3.20.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.1-released.html). * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with backend host before sending the request to the configured backend. See [these docs](https://docs.victoriametrics.com/vmauth/#modifying-http-headers) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453) -* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details. * FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details. -* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): [`yandexcloud_sd_configs`](https://docs.victoriametrics.com/sd_configs/#yandexcloud_sd_configs): add support for obtaining IAM token in [GCE format](https://yandex.cloud/en-ru/docs/compute/operations/vm-connect/auth-inside-vm#auth-inside-vm) additionally to the [deprecated Amazon EC2 IMDSv1 format](https://yandex.cloud/en/docs/security/standard/authentication#aws-token). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5513). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add `-graphite.sanitizeMetricName` cmd-line flag for sanitizing metrics ingested via [Graphite protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6077). * FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose the following metrics at `/metrics` page of [vmagent](https://docs.victoriametrics.com/vmagent/) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/): @@ -62,7 +60,6 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [vmgateway](https://docs.victoriametrics.com/vmgateway/): properly apply read and write based rate limits. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6148) for details. * BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana). -* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix the dateMetricIDCache consistency issue that leads to duplicate per-day index entries when new time series are inserted concurrently. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6534) for details. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): fix incorrect redirection in WebUI of vmalert. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6603) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6620). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530). diff --git a/docs/vmauth.md b/docs/vmauth.md index ee36f6209..ffb002754 100644 --- a/docs/vmauth.md +++ b/docs/vmauth.md @@ -1260,7 +1260,7 @@ See the docs at https://docs.victoriametrics.com/vmauth/ . -maxIdleConnsPerBackend int The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100) -maxRequestBodySizeToRetry size - The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory. Negative or zero values disable request body caching and retries. + The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384) -memory.allowedBytes size Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage