app/vmauth: pool readTrackingBody structs in order to reduce pressure on Go GC

- use pool for readTrackingBody structs in order to reduce pressure on Go GC
- allow re-reading partially read request body
- add missing tests for various cases of readTrackingBody usage

This is a follow-up for ad6af95183 and 4d66e042e3.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6445
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6446
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533
This commit is contained in:
Aliaksandr Valialkin 2024-07-17 11:06:16 +02:00
parent 277aad18d8
commit 7ee5797493
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
4 changed files with 195 additions and 55 deletions

View File

@ -50,7 +50,7 @@ var (
`Such requests are always counted at vmauth_http_request_errors_total{reason="invalid_auth_token"} metric, which is exposed at /metrics page`)
failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend")
maxRequestBodySizeToRetry = flagutil.NewBytes("maxRequestBodySizeToRetry", 16*1024, "The maximum request body size, which can be cached and re-tried at other backends. "+
"Bigger values may require more memory")
"Bigger values may require more memory. Zero or negative value disables caching of request body. This may be useful when proxying data ingestion requests")
backendTLSInsecureSkipVerify = flag.Bool("backend.tlsInsecureSkipVerify", false, "Whether to skip TLS verification when connecting to backends over HTTPS. "+
"See https://docs.victoriametrics.com/vmauth/#backend-tls-setup")
backendTLSCAFile = flag.String("backend.TLSCAFile", "", "Optional path to TLS root CA file, which is used for TLS verification when connecting to backends over HTTPS. "+
@ -200,10 +200,12 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo) {
up, hc = ui.DefaultURL, ui.HeadersConf
isDefault = true
}
rtb := getReadTrackingBody(r.Body, maxRequestBodySizeToRetry.IntN())
defer putReadTrackingBody(rtb)
r.Body = rtb
maxAttempts := up.getBackendsCount()
r.Body = &readTrackingBody{
r: r.Body,
}
for i := 0; i < maxAttempts; i++ {
bu := up.getBackendURL()
targetURL := bu.url
@ -269,7 +271,7 @@ again:
ui.backendErrors.Inc()
return true
}
// one time retry trivial network errors, such as proxy idle timeout misconfiguration or socket close by OS
// Retry request on trivial network errors, such as proxy idle timeout misconfiguration or socket close by OS
if (netutil.IsTrivialNetworkError(err) || errors.Is(err, io.EOF)) && trivialRetries < 1 {
trivialRetries++
goto again
@ -487,41 +489,74 @@ func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err err
httpserver.Errorf(w, r, "%s", err)
}
// readTrackingBody must be obtained via getReadTrackingBody()
type readTrackingBody struct {
// maxBodySize is the maximum body size to cache in buf.
//
// Bigger bodies cannot be retried.
maxBodySize int
// r contains reader for initial data reading
r io.ReadCloser
// buf is a buffer for data read from r. Buf size is limited by maxRequestBodySizeToRetry.
// If more than maxRequestBodySizeToRetry is read from r, then cannotRetry is set to true.
// buf is a buffer for data read from r. Buf size is limited by maxBodySize.
// If more than maxBodySize is read from r, then cannotRetry is set to true.
buf []byte
// cannotRetry is set to true when more than maxRequestBodySizeToRetry are read from r.
// readBuf points to the cached data at buf, which must be read in the next call to Read().
readBuf []byte
// cannotRetry is set to true when more than maxBodySize bytes are read from r.
// In this case the read data cannot fit buf, so it cannot be re-read from buf.
cannotRetry bool
// bufComplete is set to true when buf contains complete request body read from r.
bufComplete bool
// needReadBuf is set to true when Read() must be performed from buf instead of r.
needReadBuf bool
// offset is an offset at buf for the next data read if needReadBuf is set to true.
offset int
}
// Read implements io.Reader interface
// tracks body reading requests
func (rtb *readTrackingBody) reset() {
rtb.maxBodySize = 0
rtb.r = nil
rtb.buf = rtb.buf[:0]
rtb.readBuf = nil
rtb.cannotRetry = false
rtb.bufComplete = false
}
func getReadTrackingBody(r io.ReadCloser, maxBodySize int) *readTrackingBody {
v := readTrackingBodyPool.Get()
if v == nil {
v = &readTrackingBody{}
}
rtb := v.(*readTrackingBody)
if maxBodySize < 0 {
maxBodySize = 0
}
rtb.maxBodySize = maxBodySize
rtb.r = r
return rtb
}
func putReadTrackingBody(rtb *readTrackingBody) {
rtb.reset()
readTrackingBodyPool.Put(rtb)
}
var readTrackingBodyPool sync.Pool
// Read implements io.Reader interface.
func (rtb *readTrackingBody) Read(p []byte) (int, error) {
if rtb.needReadBuf {
if rtb.offset >= len(rtb.buf) {
return 0, io.EOF
}
n := copy(p, rtb.buf[rtb.offset:])
rtb.offset += n
if len(rtb.readBuf) > 0 {
n := copy(p, rtb.readBuf)
rtb.readBuf = rtb.readBuf[n:]
return n, nil
}
if rtb.r == nil {
if rtb.bufComplete {
return 0, io.EOF
}
return 0, fmt.Errorf("cannot read data after closing the reader")
}
@ -529,7 +564,8 @@ func (rtb *readTrackingBody) Read(p []byte) (int, error) {
if rtb.cannotRetry {
return n, err
}
if len(rtb.buf)+n > maxRequestBodySizeToRetry.IntN() {
if len(rtb.buf)+n > rtb.maxBodySize {
rtb.cannotRetry = true
return n, err
}
@ -544,17 +580,18 @@ func (rtb *readTrackingBody) canRetry() bool {
if rtb.cannotRetry {
return false
}
if len(rtb.buf) > 0 && !rtb.needReadBuf {
return false
if rtb.bufComplete {
return true
}
return true
return rtb.r != nil
}
// Close implements io.Closer interface.
func (rtb *readTrackingBody) Close() error {
rtb.offset = 0
if rtb.bufComplete {
rtb.needReadBuf = true
if !rtb.cannotRetry {
rtb.readBuf = rtb.buf
} else {
rtb.readBuf = nil
}
// Close rtb.r only if the request body is completely read or if it is too big.

View File

@ -6,14 +6,15 @@ import (
"testing"
)
func TestReadTrackingBodyRetrySuccess(t *testing.T) {
f := func(s string) {
func TestReadTrackingBody_RetrySuccess(t *testing.T) {
f := func(s string, maxBodySize int) {
t.Helper()
rtb := &readTrackingBody{
r: io.NopCloser(bytes.NewBufferString(s)),
}
rtb := getReadTrackingBody(io.NopCloser(bytes.NewBufferString(s)), maxBodySize)
defer putReadTrackingBody(rtb)
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
t.Fatalf("canRetry() must return true before reading anything")
}
for i := 0; i < 5; i++ {
data, err := io.ReadAll(rtb)
@ -32,44 +33,99 @@ func TestReadTrackingBodyRetrySuccess(t *testing.T) {
}
}
f("")
f("foo")
f("foobar")
f(newTestString(maxRequestBodySizeToRetry.IntN()))
f("", 0)
f("", -1)
f("", 100)
f("foo", 100)
f("foobar", 100)
f(newTestString(1000), 1000)
}
func TestReadTrackingBodyRetryFailure(t *testing.T) {
f := func(s string) {
func TestReadTrackingBody_RetrySuccessPartialRead(t *testing.T) {
f := func(s string, maxBodySize int) {
t.Helper()
rtb := &readTrackingBody{
r: io.NopCloser(bytes.NewBufferString(s)),
// Check the case with partial read
rtb := getReadTrackingBody(io.NopCloser(bytes.NewBufferString(s)), maxBodySize)
defer putReadTrackingBody(rtb)
for i := 0; i < len(s); i++ {
buf := make([]byte, i)
n, err := io.ReadFull(rtb, buf)
if err != nil {
t.Fatalf("unexpected error when reading %d bytes: %s", i, err)
}
if n != i {
t.Fatalf("unexpected number of bytes read; got %d; want %d", n, i)
}
if string(buf) != s[:i] {
t.Fatalf("unexpected data read with the length %d\ngot\n%s\nwant\n%s", i, buf, s[:i])
}
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing reader after reading %d bytes", i)
}
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true after closing the reader after reading %d bytes", i)
}
}
data, err := io.ReadAll(rtb)
if err != nil {
t.Fatalf("unexpected error when reading all the data: %s", err)
}
if string(data) != s {
t.Fatalf("unexpected data read\ngot\n%s\nwant\n%s", data, s)
}
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
}
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true")
t.Fatalf("canRetry() must return true after closing the reader after reading all the input")
}
}
f("", 0)
f("", -1)
f("", 100)
f("foo", 100)
f("foobar", 100)
f(newTestString(1000), 1000)
}
func TestReadTrackingBody_RetryFailureTooBigBody(t *testing.T) {
f := func(s string, maxBodySize int) {
t.Helper()
rtb := getReadTrackingBody(io.NopCloser(bytes.NewBufferString(s)), maxBodySize)
defer putReadTrackingBody(rtb)
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true before reading anything")
}
buf := make([]byte, 1)
n, err := rtb.Read(buf)
n, err := io.ReadFull(rtb, buf)
if err != nil {
t.Fatalf("unexpected error when reading a single byte: %s", err)
}
if n != 1 {
t.Fatalf("unexpected number of bytes read; got %d; want 1", n)
}
if rtb.canRetry() {
t.Fatalf("canRetry() must return false")
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true after reading one byte")
}
data, err := io.ReadAll(rtb)
if err != nil {
t.Fatalf("unexpected error when reading all the data: %s", err)
}
if string(buf)+string(data) != s {
t.Fatalf("unexpected data read\ngot\n%s\nwant\n%s", string(buf)+string(data), s)
dataRead := string(buf) + string(data)
if dataRead != s {
t.Fatalf("unexpected data read\ngot\n%s\nwant\n%s", dataRead, s)
}
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
}
if rtb.canRetry() {
t.Fatalf("canRetry() must return false")
t.Fatalf("canRetry() must return false after closing the reader")
}
data, err = io.ReadAll(rtb)
@ -81,10 +137,55 @@ func TestReadTrackingBodyRetryFailure(t *testing.T) {
}
}
f(newTestString(maxRequestBodySizeToRetry.IntN() + 1))
f(newTestString(2 * maxRequestBodySizeToRetry.IntN()))
const maxBodySize = 1000
f(newTestString(maxBodySize+1), maxBodySize)
f(newTestString(2*maxBodySize), maxBodySize)
}
func TestReadTrackingBody_RetryFailureZeroOrNegativeMaxBodySize(t *testing.T) {
f := func(s string, maxBodySize int) {
t.Helper()
rtb := getReadTrackingBody(io.NopCloser(bytes.NewBufferString(s)), maxBodySize)
defer putReadTrackingBody(rtb)
if !rtb.canRetry() {
t.Fatalf("canRetry() must return true before reading anything")
}
data, err := io.ReadAll(rtb)
if err != nil {
t.Fatalf("unexpected error when reading all the data: %s", err)
}
if string(data) != s {
t.Fatalf("unexpected data read\ngot\n%s\nwant\n%s", data, s)
}
if err := rtb.Close(); err != nil {
t.Fatalf("unexpected error when closing readTrackingBody: %s", err)
}
if rtb.canRetry() {
t.Fatalf("canRetry() must return false after closing the reader")
}
data, err = io.ReadAll(rtb)
if err == nil {
t.Fatalf("expecting non-nil error")
}
if len(data) != 0 {
t.Fatalf("unexpected non-empty data read: %q", data)
}
}
f("foobar", 0)
f(newTestString(1000), 0)
f("foobar", -1)
f(newTestString(1000), -1)
}
func newTestString(sLen int) string {
return string(make([]byte, sLen))
data := make([]byte, sLen)
for i := range data {
data[i] = byte(i)
}
return string(data)
}

View File

@ -37,7 +37,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* SECURITY: upgrade Go builder from Go1.22.4 to Go1.22.5. See the list of issues addressed in [Go1.22.5](https://github.com/golang/go/issues?q=milestone%3AGo1.22.5+label%3ACherryPickApproved).
* SECURITY: upgrade base docker image (Alpine) from 3.20.0 to 3.20.1. See [alpine 3.20.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.1-released.html).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with backend host before sending the request to the configured backend. See [these docs](https://docs.victoriametrics.com/vmauth/#modifying-http-headers) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453)
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow overriding `Host` header with backend host before sending the request to the configured backend. See [these docs](https://docs.victoriametrics.com/vmauth/#modifying-http-headers) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6453).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduce CPU usage when proxying data ingestion requests.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow disabling request body caching with `-maxRequestBodySizeToRetry=0`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6445). Thanks to @shichanglin5 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533).
* FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): [`yandexcloud_sd_configs`](https://docs.victoriametrics.com/sd_configs/#yandexcloud_sd_configs): add support for obtaining IAM token in [GCE format](https://yandex.cloud/en-ru/docs/compute/operations/vm-connect/auth-inside-vm#auth-inside-vm) additionally to the [deprecated Amazon EC2 IMDSv1 format](https://yandex.cloud/en/docs/security/standard/authentication#aws-token). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5513).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add `-graphite.sanitizeMetricName` cmd-line flag for sanitizing metrics ingested via [Graphite protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6077).

View File

@ -1260,7 +1260,7 @@ See the docs at https://docs.victoriametrics.com/vmauth/ .
-maxIdleConnsPerBackend int
The maximum number of idle connections vmauth can open per each backend host. See also -maxConcurrentRequests (default 100)
-maxRequestBodySizeToRetry size
The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory
The maximum request body size, which can be cached and re-tried at other backends. Bigger values may require more memory. Zero or negative value disables caching of request body. This may be useful when proxying data ingestion requests
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)
-memory.allowedBytes size
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage