diff --git a/app/vmauth/main.go b/app/vmauth/main.go index ddc8a5dff..04f949de9 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -50,7 +50,7 @@ var ( `Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`) failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend") maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+ - "Bigger values may require more memory") + "Bigger values may require more memory. Negative or zero values disable request body caching and retries.") backendTLSInsecureSkipVerify = flag.Bool("backend.tlsInsecureSkipVerify", false, "Whether to skip TLS verification when connecting to backends over HTTPS. "+ "See https://docs.victoriametrics.com/vmauth/#backend-tls-setup") backendTLSCAFile = flag.String("backend.TLSCAFile", "", "Optional path to TLS root CA file, which is used for TLS verification when connecting to backends over HTTPS. "+ @@ -200,10 +200,13 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) { up, hc = ui.DefaultURL, ui.HeadersConf isDefault = true } - maxAttempts := up.getBackendsCount() - r.Body = &readTrackingBody{ - r: r.Body, + // caching makes sense only for positive non zero size + if maxRequestBodySizeToRetry.IntN() > 0 { + rtb := getReadTrackingBody(r.Body, int(r.ContentLength)) + defer putReadTrackingBody(rtb) + r.Body = rtb } + maxAttempts := up.getBackendsCount() for i := 0; i < maxAttempts; i++ { bu := up.getBackendURL() targetURL := bu.url @@ -503,9 +506,6 @@ type readTrackingBody struct { // bufComplete is set to true when buf contains complete request body read from r. bufComplete bool - // needReadBuf is set to true when Read() must be performed from buf instead of r. - needReadBuf bool - // offset is an offset at buf for the next data read if needReadBuf is set to true. offset int } @@ -513,50 +513,63 @@ type readTrackingBody struct { // Read implements io.Reader interface // tracks body reading requests func (rtb *readTrackingBody) Read(p []byte) (int, error) { - if rtb.needReadBuf { - if rtb.offset >= len(rtb.buf) { - return 0, io.EOF + if rtb.offset < len(rtb.buf) { + if rtb.cannotRetry { + return 0, fmt.Errorf("cannot retry reading data from buf") } - n := copy(p, rtb.buf[rtb.offset:]) - rtb.offset += n - return n, nil + nb := copy(p, rtb.buf[rtb.offset:]) + rtb.offset += nb + if rtb.bufComplete { + if rtb.offset == len(rtb.buf) { + return nb, io.EOF + } + return nb, nil + } + if nb < len(p) { + nr, err := rtb.readFromStream(p[nb:]) + return nb + nr, err + } + return nb, nil } + if rtb.bufComplete { + return 0, io.EOF + } + return rtb.readFromStream(p) +} +func (rtb *readTrackingBody) readFromStream(p []byte) (int, error) { if rtb.r == nil { return 0, fmt.Errorf("cannot read data after closing the reader") } - n, err := rtb.r.Read(p) if rtb.cannotRetry { return n, err } - if len(rtb.buf)+n > maxRequestBodySizeToRetry.IntN() { + if rtb.offset+n > maxRequestBodySizeToRetry.IntN() { + rtb.cannotRetry = true + } + if n > 0 { + rtb.offset += n + rtb.buf = append(rtb.buf, p[:n]...) + } + if err != nil { + if err == io.EOF { + rtb.bufComplete = true + return n, err + } rtb.cannotRetry = true return n, err } - rtb.buf = append(rtb.buf, p[:n]...) - if err == io.EOF { - rtb.bufComplete = true - } - return n, err + return n, nil } func (rtb *readTrackingBody) canRetry() bool { - if rtb.cannotRetry { - return false - } - if len(rtb.buf) > 0 && !rtb.needReadBuf { - return false - } - return true + return !rtb.cannotRetry } // Close implements io.Closer interface. func (rtb *readTrackingBody) Close() error { rtb.offset = 0 - if rtb.bufComplete { - rtb.needReadBuf = true - } // Close rtb.r only if the request body is completely read or if it is too big. // http.Roundtrip performs body.Close call even without any Read calls, @@ -572,3 +585,38 @@ func (rtb *readTrackingBody) Close() error { return nil } + +var readTrackingBodyPool sync.Pool + +func getReadTrackingBody(origin io.ReadCloser, b int) *readTrackingBody { + bufSize := 1024 + if b > 0 && b < maxRequestBodySizeToRetry.IntN() { + bufSize = b + } + v := readTrackingBodyPool.Get() + if v == nil { + v = &readTrackingBody{ + buf: make([]byte, 0, bufSize), + } + } + rtb := v.(*readTrackingBody) + rtb.r = origin + if bufSize > cap(rtb.buf) { + rtb.buf = make([]byte, 0, bufSize) + } + + return rtb +} + +func putReadTrackingBody(rtb *readTrackingBody) { + if rtb.r != nil { + _ = rtb.r.Close() + } + rtb.r = nil + rtb.buf = rtb.buf[:0] + rtb.offset = 0 + rtb.cannotRetry = false + rtb.bufComplete = false + + readTrackingBodyPool.Put(rtb) +} diff --git a/app/vmauth/main_test.go b/app/vmauth/main_test.go index 3e7b6d6d3..a7b34d67f 100644 --- a/app/vmauth/main_test.go +++ b/app/vmauth/main_test.go @@ -55,9 +55,6 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) { if n != 1 { t.Fatalf("unexpected number of bytes read; got %d; want 1", n) } - if rtb.canRetry() { - t.Fatalf("canRetry() must return false") - } data, err := io.ReadAll(rtb) if err != nil { t.Fatalf("unexpected error when reading all the data: %s", err) @@ -85,6 +82,128 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) { f(newTestString(2 * maxRequestBodySizeToRetry.IntN())) } +// request body not over maxRequestBodySizeToRetry +// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected. +// 2. retry request: because buf caches some data, first read buf and then read stream when retrying +// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later. +func TestRetryReadSuccessAfterPartialRead(t *testing.T) { + f := func(s string) { + rtb := &readTrackingBody{ + r: io.NopCloser(bytes.NewBufferString(s)), + buf: make([]byte, 0, len(s)), + } + + var data []byte + var err error + halfSize := len(s) / 2 + if halfSize == 0 { + halfSize = 100 + } + buf := make([]byte, halfSize) + var n int + + // read part of the data + n, err = rtb.Read(buf[:]) + data = append(data, buf[:n]...) + if err != nil && err != io.EOF { + t.Fatalf("unexpected error: %s", err) + } + + // request failed when output stream is closed (eg: server connection reset) + // would close the reader + if err := rtb.Close(); err != nil { + t.Fatalf("unexpected error when closing readTrackingBody: %s", err) + } + if !rtb.canRetry() { + t.Fatalf("canRetry() must return true") + } + + // retry read (read buf + remaining data) + data = data[:0] + err = nil + for err == nil { + n, err = rtb.Read(buf[:]) + data = append(data, buf[:n]...) + } + if err != io.EOF { + t.Fatalf("unexpected error: %s", err) + } + if string(data) != s { + t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s) + } + // cannotRetry return false + // because the request data is not over maxRequestBodySizeToRetry limit + if !rtb.canRetry() { + t.Fatalf("canRetry() must return true") + } + } + + f("") + f("foo") + f("foobar") + f(newTestString(maxRequestBodySizeToRetry.IntN())) +} + +// request body over maxRequestBodySizeToRetry +// 1. When writing data downstream, buf only caches part of the data because the downstream connection is disconnected. +// 2. retry request: because buf caches some data, first read buf and then read stream when retrying +// 3. retry request: the data has been read to buf in the second step. if the request fails, retry to read all buf later. +func TestRetryReadSuccessAfterPartialReadAndCannotRetryAgain(t *testing.T) { + f := func(s string) { + rtb := &readTrackingBody{ + r: io.NopCloser(bytes.NewBufferString(s)), + buf: make([]byte, 0, len(s)), + } + + var data []byte + var err error + halfSize := len(s) / 2 + if halfSize == 0 { + halfSize = 100 + } + buf := make([]byte, halfSize) + var n int + + // read part of the data + n, err = rtb.Read(buf[:]) + data = append(data, buf[:n]...) + if err != nil && err != io.EOF { + t.Fatalf("unexpected error: %s", err) + } + + // request failed when output stream is closed (eg: server connection reset) + if err := rtb.Close(); err != nil { + t.Fatalf("unexpected error when closing readTrackingBody: %s", err) + } + if !rtb.canRetry() { + t.Fatalf("canRetry() must return true") + } + + // retry read (read buf + remaining data) + data = data[:0] + err = nil + for err == nil { + n, err = rtb.Read(buf[:]) + data = append(data, buf[:n]...) + } + if err != io.EOF { + t.Fatalf("unexpected error: %s", err) + } + if string(data) != s { + t.Fatalf("unexpected data read; got\n%s\nwant\n%s", data, s) + } + + // cannotRetry returns true + // because the request data is over maxRequestBodySizeToRetry limit + if rtb.canRetry() { + t.Fatalf("canRetry() must return false") + } + } + + f(newTestString(maxRequestBodySizeToRetry.IntN() + 1)) + f(newTestString(2 * maxRequestBodySizeToRetry.IntN())) +} + func newTestString(sLen int) string { return string(make([]byte, sLen)) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a78841eaa..a07959c3f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -32,17 +32,20 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). **Update note 1: support for snap packages was removed due to lack of interest from community. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6543) for details. Please read about supported package types [here](https://docs.victoriametrics.com/#install).** * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with a target host before sending to a downstream. See this [issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453) +* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details. * FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details. +* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): added `yandexcloud_sd` AWS API IMDSv2 support. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): expose metrics related to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): - * `vm_streamaggr_matched_samples_total` - shows the number of samples matched by the aggregation rule; - * `vm_streamaggr_flushed_samples_total` - shows the number of samples produced by the aggregation rule; - * `vm_streamaggr_samples_lag_seconds` - shows the max lag between samples timestamps within one batch received by the aggregation; + * `vm_streamaggr_matched_samples_total` - shows the number of samples matched by the aggregation rule; + * `vm_streamaggr_flushed_samples_total` - shows the number of samples produced by the aggregation rule; + * `vm_streamaggr_samples_lag_seconds` - shows the max lag between samples timestamps within one batch received by the aggregation; * `vm_streamaggr_stale_samples_total` - shows the number of time series that became [stale](https://docs.victoriametrics.com/stream-aggregation/#staleness) during aggregation; * metrics related to stream aggregation got additional labels `match` (matching param), `group` (`by` or `without` param), `url` (address of `remoteWrite.url` where aggregation is applied), `position` (the position of the aggregation rule in config file). * These and other metrics were reflected on the [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json) in `stream aggregation` section. * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): do not retry RPC calls to vmstorage nodes if [complexity limits](https://docs.victoriametrics.com/#resource-usage-limits) were exceeded. +* BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana). * BUGFIX: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): fix incorrect link to vmui from [VictoriaMetrics plugin in Grafana](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#grafana). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix the dateMetricIDCache consistency issue that leads to duplicate per-day index entries when new time series are inserted concurrently. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6534) for details. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530). diff --git a/docs/vmauth.md b/docs/vmauth.md index 16defc06e..d1ae36cb4 100644 --- a/docs/vmauth.md +++ b/docs/vmauth.md @@ -1260,7 +1260,7 @@ See the docs at https://docs.victoriametrics.com/vmauth/ . -maxIdleConnsPerBackend int The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100) -maxRequestBodySizeToRetry size - The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory + The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory. Negative or zero values disable request body caching and retries. Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384) -memory.allowedBytes size Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage