2020-05-05 09:53:42 +02:00
package main
import (
2023-09-08 00:46:34 +02:00
"context"
"errors"
2020-05-05 09:53:42 +02:00
"flag"
2021-09-14 11:17:49 +02:00
"fmt"
2023-01-27 22:38:13 +01:00
"io"
"net"
2020-05-05 09:53:42 +02:00
"net/http"
2023-01-27 22:38:13 +01:00
"net/textproto"
2023-01-27 23:06:42 +01:00
"net/url"
2020-05-16 10:59:30 +02:00
"os"
2022-03-18 17:31:58 +01:00
"strings"
2021-11-09 18:18:27 +01:00
"sync"
2020-05-05 09:53:42 +02:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
2023-01-27 22:38:13 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-05-16 10:59:30 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
2020-12-03 20:40:30 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-05-05 09:53:42 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2023-01-27 22:38:13 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2020-05-05 09:53:42 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2022-07-21 18:58:22 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
2021-05-18 01:23:53 +02:00
"github.com/VictoriaMetrics/metrics"
2020-05-05 09:53:42 +02:00
)
var (
2023-01-27 08:08:35 +01:00
httpListenAddr = flag . String ( "httpListenAddr" , ":8427" , "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol" )
useProxyProtocol = flag . Bool ( "httpListenAddr.useProxyProtocol" , false , "Whether to use proxy protocol for connections accepted at -httpListenAddr . " +
2023-03-08 10:26:53 +01:00
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . " +
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing" )
2023-01-27 23:06:42 +01:00
maxIdleConnsPerBackend = flag . Int ( "maxIdleConnsPerBackend" , 100 , "The maximum number of idle connections vmauth can open per each backend host. " +
"See also -maxConcurrentRequests" )
responseTimeout = flag . Duration ( "responseTimeout" , 5 * time . Minute , "The timeout for receiving a response from backend" )
maxConcurrentRequests = flag . Int ( "maxConcurrentRequests" , 1000 , "The maximum number of concurrent requests vmauth can process. Other requests are rejected with " +
2023-02-11 06:57:49 +01:00
"'429 Too Many Requests' http status code. See also -maxConcurrentPerUserRequests and -maxIdleConnsPerBackend command-line options" )
maxConcurrentPerUserRequests = flag . Int ( "maxConcurrentPerUserRequests" , 300 , "The maximum number of concurrent requests vmauth can process per each configured user. " +
"Other requests are rejected with '429 Too Many Requests' http status code. See also -maxConcurrentRequests command-line option and max_concurrent_requests option " +
"in per-user config" )
2023-01-27 23:06:42 +01:00
reloadAuthKey = flag . String ( "reloadAuthKey" , "" , "Auth key for /-/reload http endpoint. It must be passed as authKey=..." )
logInvalidAuthTokens = flag . Bool ( "logInvalidAuthTokens" , false , "Whether to log requests with invalid auth tokens. " +
2021-10-19 14:29:07 +02:00
` Such requests are always counted at vmauth_http_request_errors_total { reason="invalid_auth_token"} metric, which is exposed at /metrics page ` )
2023-09-08 00:46:34 +02:00
failTimeout = flag . Duration ( "failTimeout" , 3 * time . Second , "Sets a delay period for load balancing to skip a malfunctioning backend" )
maxRequestBodySizeToRetry = flagutil . NewBytes ( "maxRequestBodySizeToRetry" , 16 * 1024 , "The maximum request body size, which can be cached and re-tried at other backends. " +
"Bigger values may require more memory" )
2020-05-05 09:53:42 +02:00
)
func main ( ) {
2020-05-16 10:59:30 +02:00
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag . CommandLine . SetOutput ( os . Stdout )
2020-06-05 09:39:46 +02:00
flag . Usage = usage
2020-05-16 10:59:30 +02:00
envflag . Parse ( )
2020-05-05 09:53:42 +02:00
buildinfo . Init ( )
logger . Init ( )
2022-07-22 12:35:58 +02:00
pushmetrics . Init ( )
2020-05-05 09:53:42 +02:00
logger . Infof ( "starting vmauth at %q..." , * httpListenAddr )
startTime := time . Now ( )
initAuthConfig ( )
2023-01-27 08:08:35 +01:00
go httpserver . Serve ( * httpListenAddr , * useProxyProtocol , requestHandler )
2020-05-05 09:53:42 +02:00
logger . Infof ( "started vmauth in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
sig := procutil . WaitForSigterm ( )
logger . Infof ( "received signal %s" , sig )
startTime = time . Now ( )
logger . Infof ( "gracefully shutting down webservice at %q" , * httpListenAddr )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
logger . Fatalf ( "cannot stop the webservice: %s" , err )
}
logger . Infof ( "successfully shut down the webservice in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
stopAuthConfig ( )
logger . Infof ( "successfully stopped vmauth in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
}
func requestHandler ( w http . ResponseWriter , r * http . Request ) bool {
2021-05-18 01:23:53 +02:00
switch r . URL . Path {
case "/-/reload" :
2023-01-11 00:51:55 +01:00
if ! httpserver . CheckAuthFlag ( w , r , * reloadAuthKey , "reloadAuthKey" ) {
2021-05-20 17:46:12 +02:00
return true
}
2021-05-18 01:23:53 +02:00
configReloadRequests . Inc ( )
procutil . SelfSIGHUP ( )
w . WriteHeader ( http . StatusOK )
return true
}
2021-04-02 21:14:53 +02:00
authToken := r . Header . Get ( "Authorization" )
if authToken == "" {
2023-04-24 14:57:13 +02:00
// Process requests for unauthorized users
ui := authConfig . Load ( ) . UnauthorizedUser
if ui != nil {
processUserRequest ( w , r , ui )
return true
}
2020-08-09 08:38:41 +02:00
w . Header ( ) . Set ( "WWW-Authenticate" , ` Basic realm="Restricted" ` )
2021-04-02 21:14:53 +02:00
http . Error ( w , "missing `Authorization` request header" , http . StatusUnauthorized )
2020-05-05 09:53:42 +02:00
return true
}
2022-03-18 17:31:58 +01:00
if strings . HasPrefix ( authToken , "Token " ) {
// Handle InfluxDB's proprietary token authentication scheme as a bearer token authentication
// See https://docs.influxdata.com/influxdb/v2.0/api/
authToken = strings . Replace ( authToken , "Token" , "Bearer" , 1 )
}
2023-01-27 23:06:42 +01:00
2023-04-20 19:08:27 +02:00
ac := * authUsers . Load ( )
2021-04-02 21:14:53 +02:00
ui := ac [ authToken ]
if ui == nil {
2021-09-14 11:17:49 +02:00
invalidAuthTokenRequests . Inc ( )
if * logInvalidAuthTokens {
2023-05-17 09:09:47 +02:00
err := fmt . Errorf ( "cannot find the provided auth token %q in config" , authToken )
2023-01-27 22:38:13 +01:00
err = & httpserver . ErrorWithStatusCode {
Err : err ,
StatusCode : http . StatusUnauthorized ,
}
httpserver . Errorf ( w , r , "%s" , err )
2021-09-14 11:17:49 +02:00
} else {
2023-05-17 09:09:47 +02:00
http . Error ( w , "Unauthorized" , http . StatusUnauthorized )
2021-09-14 11:17:49 +02:00
}
2020-05-05 09:53:42 +02:00
return true
}
2023-04-24 14:57:13 +02:00
processUserRequest ( w , r , ui )
return true
}
func processUserRequest ( w http . ResponseWriter , r * http . Request , ui * UserInfo ) {
2023-06-27 20:15:17 +02:00
startTime := time . Now ( )
defer ui . requestsDuration . UpdateDuration ( startTime )
2021-02-11 11:40:59 +01:00
ui . requests . Inc ( )
2023-01-27 22:38:13 +01:00
2023-01-27 23:06:42 +01:00
// Limit the concurrency of requests to backends
concurrencyLimitOnce . Do ( concurrencyLimitInit )
select {
case concurrencyLimitCh <- struct { } { } :
2023-02-10 05:03:01 +01:00
if err := ui . beginConcurrencyLimit ( ) ; err != nil {
handleConcurrencyLimitError ( w , r , err )
<- concurrencyLimitCh
2023-04-24 14:57:13 +02:00
return
2023-01-27 23:06:42 +01:00
}
2023-02-10 05:03:01 +01:00
default :
concurrentRequestsLimitReached . Inc ( )
err := fmt . Errorf ( "cannot serve more than -maxConcurrentRequests=%d concurrent requests" , cap ( concurrencyLimitCh ) )
handleConcurrencyLimitError ( w , r , err )
2023-04-24 14:57:13 +02:00
return
2023-01-27 23:06:42 +01:00
}
2023-02-10 06:05:13 +01:00
processRequest ( w , r , ui )
2023-02-10 05:03:01 +01:00
ui . endConcurrencyLimit ( )
2023-01-27 23:06:42 +01:00
<- concurrencyLimitCh
}
2023-02-10 06:05:13 +01:00
func processRequest ( w http . ResponseWriter , r * http . Request , ui * UserInfo ) {
u := normalizeURL ( r . URL )
2023-09-08 22:39:17 +02:00
up , hc , retryStatusCodes := ui . getURLPrefixAndHeaders ( u )
2023-04-26 11:04:35 +02:00
isDefault := false
if up == nil {
if ui . DefaultURL == nil {
2023-11-01 20:59:46 +01:00
// Authorization should be requested for http requests without credentials
// to a route that is not in the configuration for unauthorized user.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5236
if ui . BearerToken == "" && ui . Username == "" && len ( * authUsers . Load ( ) ) > 0 {
w . Header ( ) . Set ( "WWW-Authenticate" , ` Basic realm="Restricted" ` )
http . Error ( w , "missing `Authorization` request header" , http . StatusUnauthorized )
return
}
missingRouteRequests . Inc ( )
2023-04-26 11:04:35 +02:00
httpserver . Errorf ( w , r , "missing route for %q" , u . String ( ) )
return
}
2023-09-08 22:39:17 +02:00
up , hc , retryStatusCodes = ui . DefaultURL , ui . HeadersConf , ui . RetryStatusCodes
2023-04-26 11:04:35 +02:00
isDefault = true
2023-02-10 06:05:13 +01:00
}
maxAttempts := up . getBackendsCount ( )
2023-09-08 00:46:34 +02:00
if maxAttempts > 1 {
r . Body = & readTrackingBody {
r : r . Body ,
}
}
2023-02-10 06:05:13 +01:00
for i := 0 ; i < maxAttempts ; i ++ {
2023-02-11 09:27:40 +01:00
bu := up . getLeastLoadedBackendURL ( )
2023-04-26 11:04:35 +02:00
targetURL := bu . url
// Don't change path and add request_path query param for default route.
if isDefault {
query := targetURL . Query ( )
query . Set ( "request_path" , u . Path )
targetURL . RawQuery = query . Encode ( )
} else { // Update path for regular routes.
targetURL = mergeURLs ( targetURL , u )
}
2023-09-08 22:39:17 +02:00
ok := tryProcessingRequest ( w , r , targetURL , hc , retryStatusCodes )
2023-02-11 09:27:40 +01:00
bu . put ( )
if ok {
2023-02-10 06:05:13 +01:00
return
}
2023-02-11 09:27:40 +01:00
bu . setBroken ( )
2023-02-10 06:05:13 +01:00
}
2023-04-26 11:04:35 +02:00
err := & httpserver . ErrorWithStatusCode {
2023-02-10 06:05:13 +01:00
Err : fmt . Errorf ( "all the backends for the user %q are unavailable" , ui . name ( ) ) ,
StatusCode : http . StatusServiceUnavailable ,
}
httpserver . Errorf ( w , r , "%s" , err )
}
2023-09-08 22:39:17 +02:00
func tryProcessingRequest ( w http . ResponseWriter , r * http . Request , targetURL * url . URL , hc HeadersConf , retryStatusCodes [ ] int ) bool {
2023-01-27 22:38:13 +01:00
// This code has been copied from net/http/httputil/reverseproxy.go
req := sanitizeRequestHeaders ( r )
req . URL = targetURL
2023-09-08 22:39:17 +02:00
updateHeadersByConfig ( req . Header , hc . RequestHeaders )
2023-01-27 22:38:13 +01:00
transportOnce . Do ( transportInit )
res , err := transport . RoundTrip ( req )
2023-09-08 00:46:34 +02:00
rtb , rtbOK := req . Body . ( * readTrackingBody )
2023-01-27 22:38:13 +01:00
if err != nil {
2023-09-08 00:46:34 +02:00
if errors . Is ( err , context . Canceled ) || errors . Is ( err , context . DeadlineExceeded ) {
// Do not retry canceled or timed out requests
remoteAddr := httpserver . GetQuotedRemoteAddr ( r )
requestURI := httpserver . GetRequestURI ( r )
logger . Warnf ( "remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s" , remoteAddr , requestURI , targetURL , err )
return true
}
if ! rtbOK || ! rtb . canRetry ( ) {
// Request body cannot be re-sent to another backend. Return the error to the client then.
2023-02-10 06:05:13 +01:00
err = & httpserver . ErrorWithStatusCode {
Err : fmt . Errorf ( "cannot proxy the request to %q: %w" , targetURL , err ) ,
StatusCode : http . StatusServiceUnavailable ,
}
httpserver . Errorf ( w , r , "%s" , err )
return true
2023-01-27 22:38:13 +01:00
}
2023-05-17 09:37:03 +02:00
// Retry the request if its body wasn't read yet. This usually means that the backend isn't reachable.
remoteAddr := httpserver . GetQuotedRemoteAddr ( r )
// NOTE: do not use httpserver.GetRequestURI
2023-09-08 00:46:34 +02:00
// it explicitly reads request body, which may fail retries.
2023-09-08 22:39:17 +02:00
logger . Warnf ( "remoteAddr: %s; requestURI: %s; retrying the request to %s because of response error: %s" , remoteAddr , req . URL , targetURL , err )
2023-09-08 00:46:34 +02:00
return false
}
2023-09-08 22:39:17 +02:00
if ( rtbOK && rtb . canRetry ( ) ) && hasInt ( retryStatusCodes , res . StatusCode ) {
// Retry requests at other backends if it matches retryStatusCodes.
2023-09-08 00:46:34 +02:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4893
remoteAddr := httpserver . GetQuotedRemoteAddr ( r )
// NOTE: do not use httpserver.GetRequestURI
// it explicitly reads request body, which may fail retries.
2023-09-08 22:39:17 +02:00
logger . Warnf ( "remoteAddr: %s; requestURI: %s; retrying the request to %s because response status code=%d belongs to retry_status_codes=%d" ,
remoteAddr , req . URL , targetURL , res . StatusCode , retryStatusCodes )
2023-02-10 06:05:13 +01:00
return false
2023-01-27 22:38:13 +01:00
}
removeHopHeaders ( res . Header )
copyHeader ( w . Header ( ) , res . Header )
2023-09-08 22:39:17 +02:00
updateHeadersByConfig ( w . Header ( ) , hc . ResponseHeaders )
2023-01-27 22:38:13 +01:00
w . WriteHeader ( res . StatusCode )
copyBuf := copyBufPool . Get ( )
copyBuf . B = bytesutil . ResizeNoCopyNoOverallocate ( copyBuf . B , 16 * 1024 )
_ , err = io . CopyBuffer ( w , res . Body , copyBuf . B )
copyBufPool . Put ( copyBuf )
if err != nil && ! netutil . IsTrivialNetworkError ( err ) {
remoteAddr := httpserver . GetQuotedRemoteAddr ( r )
requestURI := httpserver . GetRequestURI ( r )
logger . Warnf ( "remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s" , remoteAddr , requestURI , targetURL , err )
2023-02-10 06:05:13 +01:00
return true
2021-10-22 18:08:06 +02:00
}
2023-02-10 06:05:13 +01:00
return true
2020-05-05 09:53:42 +02:00
}
2023-09-08 22:39:17 +02:00
// hasInt reports whether n is present among the elements of a.
func hasInt(a []int, n int) bool {
	found := false
	for i := 0; i < len(a) && !found; i++ {
		found = a[i] == n
	}
	return found
}
2023-01-27 22:38:13 +01:00
// copyBufPool is a pool of byte buffers, which tryProcessingRequest uses
// as copy buffers when streaming backend response bodies to clients.
var copyBufPool bytesutil . ByteBufferPool
// copyHeader appends all the values of every header from src to dst.
// Values already present in dst are kept.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		for i := range values {
			dst.Add(name, values[i])
		}
	}
}
2023-08-31 14:26:51 +02:00
// updateHeadersByConfig applies the header rules from config to headers in the given order.
//
// A rule with non-empty Value overwrites the header with the given Name.
// A rule with empty Value removes the header with the given Name.
func updateHeadersByConfig(headers http.Header, config []Header) {
	for i := range config {
		name, value := config[i].Name, config[i].Value
		if value != "" {
			headers.Set(name, value)
			continue
		}
		headers.Del(name)
	}
}
2023-01-27 22:38:13 +01:00
// sanitizeRequestHeaders returns a clone of r prepared for sending to a backend:
// hop-by-hop headers are removed and the client ip is appended to the X-Forwarded-For header.
//
// The X-Forwarded-For header is left as is if r.RemoteAddr cannot be split into host and port.
//
// This code has been copied from net/http/httputil/reverseproxy.go
func sanitizeRequestHeaders(r *http.Request) *http.Request {
	req := r.Clone(r.Context())
	removeHopHeaders(req.Header)

	clientIP, _, err := net.SplitHostPort(req.RemoteAddr)
	if err != nil {
		return req
	}
	// If we aren't the first proxy, then keep the prior X-Forwarded-For information
	// as a comma+space separated list and fold multiple headers into one.
	if prior := req.Header["X-Forwarded-For"]; len(prior) > 0 {
		clientIP = strings.Join(prior, ", ") + ", " + clientIP
	}
	req.Header.Set("X-Forwarded-For", clientIP)
	return req
}
// removeHopHeaders deletes hop-by-hop headers from h.
//
// Headers named in the values of the "Connection" header are deleted first
// (see RFC 7230, section 6.1), then the headers listed in hopHeaders are deleted.
func removeHopHeaders(h http.Header) {
	for _, connValue := range h["Connection"] {
		for _, token := range strings.Split(connValue, ",") {
			name := textproto.TrimString(token)
			if name == "" {
				continue
			}
			h.Del(name)
		}
	}

	// "Connection" itself is especially important to drop, because a persistent
	// connection to the backend is wanted regardless of what the client sent.
	for i := range hopHeaders {
		h.Del(hopHeaders[i])
	}
}
// hopHeaders lists hop-by-hop headers, which are removed by removeHopHeaders.
//
// As of RFC 7230, hop-by-hop headers are required to appear in the Connection header field.
// The headers below are the ones defined by the obsoleted RFC 2616 (section 13.5.1);
// they are kept for backward compatibility.
var hopHeaders = []string{
	"Connection",
	// Non-standard, but still sent by libcurl and rejected by e.g. google.
	"Proxy-Connection",
	"Keep-Alive",
	"Proxy-Authenticate",
	"Proxy-Authorization",
	// Canonicalized version of "TE".
	"Te",
	// Not "Trailers"; see https://www.rfc-editor.org/errata_search.php?eid=4522
	"Trailer",
	"Transfer-Encoding",
	"Upgrade",
}
2021-09-14 11:17:49 +02:00
var (
2021-10-19 14:29:07 +02:00
configReloadRequests = metrics . NewCounter ( ` vmauth_http_requests_total { path="/-/reload"} ` )
invalidAuthTokenRequests = metrics . NewCounter ( ` vmauth_http_request_errors_total { reason="invalid_auth_token"} ` )
missingRouteRequests = metrics . NewCounter ( ` vmauth_http_request_errors_total { reason="missing_route"} ` )
2021-09-14 11:17:49 +02:00
)
2021-05-18 01:23:53 +02:00
2021-11-09 18:18:27 +01:00
// transport is the http transport used for proxying requests to backends.
// It is nil until transportInit is executed.
var transport *http.Transport

// transportOnce makes sure transportInit is executed only once.
var transportOnce sync.Once
2023-01-27 22:38:13 +01:00
func transportInit ( ) {
tr := http . DefaultTransport . ( * http . Transport ) . Clone ( )
tr . ResponseHeaderTimeout = * responseTimeout
// Automatic compression must be disabled in order to fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/535
tr . DisableCompression = true
// Disable HTTP/2.0, since VictoriaMetrics components don't support HTTP/2.0 (because there is no sense in this).
tr . ForceAttemptHTTP2 = false
tr . MaxIdleConnsPerHost = * maxIdleConnsPerBackend
if tr . MaxIdleConns != 0 && tr . MaxIdleConns < tr . MaxIdleConnsPerHost {
tr . MaxIdleConns = tr . MaxIdleConnsPerHost
2021-11-09 18:18:27 +01:00
}
2023-01-27 22:38:13 +01:00
transport = tr
2020-05-05 09:53:42 +02:00
}
2020-06-05 09:39:46 +02:00
2023-01-27 23:06:42 +01:00
// concurrencyLimitCh limits the number of concurrently processed requests:
// a request occupies a slot in the channel while it is processed.
// It is nil until concurrencyLimitInit is executed.
var concurrencyLimitCh chan struct{}

// concurrencyLimitOnce makes sure concurrencyLimitInit is executed only once.
var concurrencyLimitOnce sync.Once
// concurrencyLimitInit creates concurrencyLimitCh with the capacity of -maxConcurrentRequests
// and registers gauges exposing the capacity and the current number of occupied slots.
func concurrencyLimitInit() {
	concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)

	capacity := func() float64 {
		return float64(*maxConcurrentRequests)
	}
	current := func() float64 {
		return float64(len(concurrencyLimitCh))
	}
	_ = metrics.NewGauge("vmauth_concurrent_requests_capacity", capacity)
	_ = metrics.NewGauge("vmauth_concurrent_requests_current", current)
}
2023-02-10 05:03:01 +01:00
// concurrentRequestsLimitReached counts requests rejected by processUserRequest,
// because all the slots in concurrencyLimitCh were occupied.
var concurrentRequestsLimitReached = metrics . NewCounter ( "vmauth_concurrent_requests_limit_reached_total" )
2023-01-27 23:06:42 +01:00
2020-06-05 09:39:46 +02:00
func usage ( ) {
const s = `
vmauth authenticates and authorizes incoming requests and proxies them to VictoriaMetrics .
2021-04-20 19:16:17 +02:00
See the docs at https : //docs.victoriametrics.com/vmauth.html .
2020-06-05 09:39:46 +02:00
`
2020-12-03 20:40:30 +01:00
flagutil . Usage ( s )
2020-06-05 09:39:46 +02:00
}
2023-02-10 05:03:01 +01:00
// handleConcurrencyLimitError responds to the client with err wrapped into
// '429 Too Many Requests' status code and asks it to retry in 10 seconds
// via the Retry-After response header.
func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err error) {
	w.Header().Add("Retry-After", "10")

	var limitErr error = &httpserver.ErrorWithStatusCode{
		Err:        err,
		StatusCode: http.StatusTooManyRequests,
	}
	httpserver.Errorf(w, r, "%s", limitErr)
}
2023-05-17 09:19:33 +02:00
// readTrackingBody wraps a request body and remembers the data read from it,
// so the body can be re-sent to another backend on retry.
//
// Fields:
//
//   - r is the reader for the initial data reading.
//   - buf holds the data read from r. Its size is limited by maxRequestBodySizeToRetry.
//   - cannotRetry is set when more than maxRequestBodySizeToRetry bytes are read from r.
//     In this case the read data doesn't fit buf, so it cannot be re-read from buf.
//   - bufComplete is set when buf contains the complete request body read from r.
//   - needReadBuf is set when Read() must be performed from buf instead of r.
//   - offset is the position at buf for the next read when needReadBuf is set.
type readTrackingBody struct {
	r   io.ReadCloser
	buf []byte

	cannotRetry, bufComplete, needReadBuf bool

	offset int
}
// Read implements io.Reader interface
// tracks body reading requests
func ( rtb * readTrackingBody ) Read ( p [ ] byte ) ( int , error ) {
2023-09-08 00:46:34 +02:00
if rtb . needReadBuf {
if rtb . offset >= len ( rtb . buf ) {
return 0 , io . EOF
}
n := copy ( p , rtb . buf [ rtb . offset : ] )
rtb . offset += n
return n , nil
2023-05-17 09:19:33 +02:00
}
2023-09-08 00:46:34 +02:00
if rtb . r == nil {
return 0 , fmt . Errorf ( "cannot read data after closing the reader" )
}
n , err := rtb . r . Read ( p )
if rtb . cannotRetry {
return n , err
}
if len ( rtb . buf ) + n > maxRequestBodySizeToRetry . IntN ( ) {
rtb . cannotRetry = true
return n , err
}
rtb . buf = append ( rtb . buf , p [ : n ] ... )
if err == io . EOF {
rtb . bufComplete = true
}
return n , err
}
func ( rtb * readTrackingBody ) canRetry ( ) bool {
if rtb . cannotRetry {
return false
}
if len ( rtb . buf ) > 0 && ! rtb . needReadBuf {
return false
}
return true
2023-05-17 09:19:33 +02:00
}
2023-05-17 09:37:03 +02:00
// Close implements io.Closer interface.
2023-05-17 09:19:33 +02:00
func ( rtb * readTrackingBody ) Close ( ) error {
2023-09-08 00:46:34 +02:00
rtb . offset = 0
if rtb . bufComplete {
rtb . needReadBuf = true
2023-05-17 09:19:33 +02:00
}
2023-09-08 00:46:34 +02:00
// Close rtb.r only if the request body is completely read or if it is too big.
// http.Roundtrip performs body.Close call even without any Read calls,
// so this hack allows us to reuse request body.
if rtb . bufComplete || rtb . cannotRetry {
if rtb . r == nil {
return nil
}
err := rtb . r . Close ( )
rtb . r = nil
return err
}
2023-05-17 09:19:33 +02:00
return nil
}