mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-20 23:46:23 +01:00
f03e81c693
- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup don't prevent from processing the corresponding scrape targets after the file becomes correct, without the need to restart vmagent. Previously scrape targets with invalid TLS CA file or TLS client certificate files were permanently dropped after the first attempt to initialize them, and they didn't appear until the next vmagent reload or the next change in other places of the loaded scrape configs. - Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent. Previously the old TLS CA was used until vmagent restart. - Properly handle errors during http request creation for the second attempt to send data to remote system at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing, since the returned request is nil on error. - Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent. Previously it could miss details on the source of the request. - Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header of every http request issued by vmagent during service discovery or target scraping. Re-use the HTTP client instead until the corresponding scrape config changes. - Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached, e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server when auth header cannot be obtained because of temporary error. - Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config. Cache the loaded certificate and the error for one second. This should significantly reduce CPU load when scraping big number of targets with the same tls_config. - Allow loading TLS certificates from HTTP and HTTPs urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`. - Improve test coverage at lib/promauth - Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later. Previously vmagent was exitting in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
444 lines
17 KiB
Go
444 lines
17 KiB
Go
package promscrape
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
|
|
"github.com/VictoriaMetrics/fasthttp"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
var (
|
|
maxScrapeSize = flagutil.NewBytes("promscrape.maxScrapeSize", 16*1024*1024, "The maximum size of scrape response in bytes to process from Prometheus targets. "+
|
|
"Bigger responses are rejected")
|
|
maxResponseHeadersSize = flagutil.NewBytes("promscrape.maxResponseHeadersSize", 4096, "The maximum size of http response headers from Prometheus scrape targets")
|
|
disableCompression = flag.Bool("promscrape.disableCompression", false, "Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. "+
|
|
"This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. "+
|
|
"It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
|
|
disableKeepAlive = flag.Bool("promscrape.disableKeepAlive", false, "Whether to disable HTTP keep-alive connections when scraping all the targets. "+
|
|
"This may be useful when targets has no support for HTTP keep-alive connection. "+
|
|
"It is possible to set 'disable_keepalive: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control. "+
|
|
"Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets")
|
|
streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
|
|
"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
|
|
"It is possible to set 'stream_parse: true' individually per each 'scrape_config' section in '-promscrape.config' for fine-grained control")
|
|
)
|
|
|
|
type client struct {
|
|
// hc is the default client optimized for common case of scraping targets with moderate number of metrics.
|
|
hc *fasthttp.HostClient
|
|
|
|
// sc (aka `stream client`) is used instead of hc if ScrapeWork.StreamParse is set.
|
|
// It may be useful for scraping targets with millions of metrics per target.
|
|
sc *http.Client
|
|
|
|
ctx context.Context
|
|
scrapeURL string
|
|
scrapeTimeoutSecondsStr string
|
|
hostPort string
|
|
requestURI string
|
|
setHeaders func(req *http.Request) error
|
|
setProxyHeaders func(req *http.Request) error
|
|
setFasthttpHeaders func(req *fasthttp.Request) error
|
|
setFasthttpProxyHeaders func(req *fasthttp.Request) error
|
|
denyRedirects bool
|
|
disableCompression bool
|
|
disableKeepAlive bool
|
|
}
|
|
|
|
func addMissingPort(addr string, isTLS bool) string {
|
|
if strings.Contains(addr, ":") {
|
|
return addr
|
|
}
|
|
if isTLS {
|
|
return concatTwoStrings(addr, ":443")
|
|
}
|
|
return concatTwoStrings(addr, ":80")
|
|
}
|
|
|
|
func concatTwoStrings(x, y string) string {
|
|
bb := bbPool.Get()
|
|
b := bb.B[:0]
|
|
b = append(b, x...)
|
|
b = append(b, y...)
|
|
s := bytesutil.InternBytes(b)
|
|
bb.B = b
|
|
bbPool.Put(bb)
|
|
return s
|
|
}
|
|
|
|
const scrapeUserAgent = "vm_promscrape"
|
|
|
|
func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
|
|
var u fasthttp.URI
|
|
u.Update(sw.ScrapeURL)
|
|
hostPort := string(u.Host())
|
|
dialAddr := hostPort
|
|
requestURI := string(u.RequestURI())
|
|
isTLS := string(u.Scheme()) == "https"
|
|
var tlsCfg *tls.Config
|
|
if isTLS {
|
|
var err error
|
|
tlsCfg, err = sw.AuthConfig.NewTLSConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot initialize tls config: %w", err)
|
|
}
|
|
}
|
|
setProxyHeaders := func(req *http.Request) error { return nil }
|
|
setFasthttpProxyHeaders := func(req *fasthttp.Request) error { return nil }
|
|
proxyURL := sw.ProxyURL
|
|
if !isTLS && proxyURL.IsHTTPOrHTTPS() {
|
|
// Send full sw.ScrapeURL in requests to a proxy host for non-TLS scrape targets
|
|
// like net/http package from Go does.
|
|
// See https://en.wikipedia.org/wiki/Proxy_server#Web_proxy_servers
|
|
pu := proxyURL.GetURL()
|
|
dialAddr = pu.Host
|
|
requestURI = sw.ScrapeURL
|
|
isTLS = pu.Scheme == "https"
|
|
if isTLS {
|
|
var err error
|
|
tlsCfg, err = sw.ProxyAuthConfig.NewTLSConfig()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot initialize proxy tls config: %w", err)
|
|
}
|
|
}
|
|
proxyURLOrig := proxyURL
|
|
setProxyHeaders = func(req *http.Request) error {
|
|
return proxyURLOrig.SetHeaders(sw.ProxyAuthConfig, req)
|
|
}
|
|
setFasthttpProxyHeaders = func(req *fasthttp.Request) error {
|
|
return proxyURLOrig.SetFasthttpHeaders(sw.ProxyAuthConfig, req)
|
|
}
|
|
proxyURL = &proxy.URL{}
|
|
}
|
|
hostPort = addMissingPort(hostPort, isTLS)
|
|
dialAddr = addMissingPort(dialAddr, isTLS)
|
|
dialFunc, err := newStatDialFunc(proxyURL, sw.ProxyAuthConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot create dial func: %w", err)
|
|
}
|
|
hc := &fasthttp.HostClient{
|
|
Addr: dialAddr,
|
|
// Name used in User-Agent request header
|
|
Name: scrapeUserAgent,
|
|
Dial: dialFunc,
|
|
IsTLS: isTLS,
|
|
TLSConfig: tlsCfg,
|
|
MaxIdleConnDuration: 2 * sw.ScrapeInterval,
|
|
ReadTimeout: sw.ScrapeTimeout,
|
|
WriteTimeout: 10 * time.Second,
|
|
MaxResponseBodySize: maxScrapeSize.IntN(),
|
|
MaxIdempotentRequestAttempts: 1,
|
|
ReadBufferSize: maxResponseHeadersSize.IntN(),
|
|
}
|
|
var sc *http.Client
|
|
var proxyURLFunc func(*http.Request) (*url.URL, error)
|
|
if pu := sw.ProxyURL.GetURL(); pu != nil {
|
|
proxyURLFunc = http.ProxyURL(pu)
|
|
}
|
|
sc = &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: tlsCfg,
|
|
Proxy: proxyURLFunc,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
IdleConnTimeout: 2 * sw.ScrapeInterval,
|
|
DisableCompression: *disableCompression || sw.DisableCompression,
|
|
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
|
|
DialContext: statStdDial,
|
|
MaxIdleConnsPerHost: 100,
|
|
MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N),
|
|
},
|
|
Timeout: sw.ScrapeTimeout,
|
|
}
|
|
if sw.DenyRedirects {
|
|
sc.CheckRedirect = func(req *http.Request, via []*http.Request) error {
|
|
return http.ErrUseLastResponse
|
|
}
|
|
}
|
|
|
|
c := &client{
|
|
hc: hc,
|
|
ctx: ctx,
|
|
sc: sc,
|
|
scrapeURL: sw.ScrapeURL,
|
|
scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
|
|
hostPort: hostPort,
|
|
requestURI: requestURI,
|
|
setHeaders: func(req *http.Request) error {
|
|
return sw.AuthConfig.SetHeaders(req, true)
|
|
},
|
|
setProxyHeaders: setProxyHeaders,
|
|
setFasthttpHeaders: func(req *fasthttp.Request) error {
|
|
return sw.AuthConfig.SetFasthttpHeaders(req, true)
|
|
},
|
|
setFasthttpProxyHeaders: setFasthttpProxyHeaders,
|
|
denyRedirects: sw.DenyRedirects,
|
|
disableCompression: sw.DisableCompression,
|
|
disableKeepAlive: sw.DisableKeepAlive,
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
func (c *client) GetStreamReader() (*streamReader, error) {
|
|
deadline := time.Now().Add(c.sc.Timeout)
|
|
ctx, cancel := context.WithDeadline(c.ctx, deadline)
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.scrapeURL, nil)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err)
|
|
}
|
|
// The following `Accept` header has been copied from Prometheus sources.
|
|
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
|
|
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
|
|
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
|
|
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
|
|
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
|
|
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
|
|
req.Header.Set("User-Agent", scrapeUserAgent)
|
|
if err := c.setHeaders(req); err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
|
|
}
|
|
if err := c.setProxyHeaders(req); err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
|
|
}
|
|
scrapeRequests.Inc()
|
|
resp, err := c.sc.Do(req)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("cannot scrape %q: %w", c.scrapeURL, err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc()
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
cancel()
|
|
return nil, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
|
|
c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
|
|
}
|
|
scrapesOK.Inc()
|
|
sr := &streamReader{
|
|
r: resp.Body,
|
|
cancel: cancel,
|
|
scrapeURL: c.scrapeURL,
|
|
maxBodySize: int64(c.hc.MaxResponseBodySize),
|
|
}
|
|
return sr, nil
|
|
}
|
|
|
|
// checks fasthttp status code for redirect as standard http/client does.
|
|
func isStatusRedirect(statusCode int) bool {
|
|
switch statusCode {
|
|
case 301, 302, 303, 307, 308:
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *client) ReadData(dst []byte) ([]byte, error) {
|
|
deadline := time.Now().Add(c.hc.ReadTimeout)
|
|
req := fasthttp.AcquireRequest()
|
|
req.SetRequestURI(c.requestURI)
|
|
req.Header.SetHost(c.hostPort)
|
|
// The following `Accept` header has been copied from Prometheus sources.
|
|
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
|
|
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
|
|
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
|
|
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
|
|
// Set X-Prometheus-Scrape-Timeout-Seconds like Prometheus does, since it is used by some exporters such as PushProx.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1179#issuecomment-813117162
|
|
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", c.scrapeTimeoutSecondsStr)
|
|
if err := c.setFasthttpHeaders(req); err != nil {
|
|
return nil, fmt.Errorf("failed to set request headers for %q: %w", c.scrapeURL, err)
|
|
}
|
|
if err := c.setFasthttpProxyHeaders(req); err != nil {
|
|
return nil, fmt.Errorf("failed to set proxy request headers for %q: %w", c.scrapeURL, err)
|
|
}
|
|
if !*disableCompression && !c.disableCompression {
|
|
req.Header.Set("Accept-Encoding", "gzip")
|
|
}
|
|
if *disableKeepAlive || c.disableKeepAlive {
|
|
req.SetConnectionClose()
|
|
}
|
|
resp := fasthttp.AcquireResponse()
|
|
swapResponseBodies := len(dst) == 0
|
|
if swapResponseBodies {
|
|
// An optimization: write response directly to dst.
|
|
// This should reduce memory usage when scraping big targets.
|
|
dst = resp.SwapBody(dst)
|
|
}
|
|
|
|
ctx, cancel := context.WithDeadline(c.ctx, deadline)
|
|
defer cancel()
|
|
|
|
err := doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
|
statusCode := resp.StatusCode()
|
|
redirectsCount := 0
|
|
for err == nil && isStatusRedirect(statusCode) {
|
|
if redirectsCount > 5 {
|
|
err = fmt.Errorf("too many redirects")
|
|
break
|
|
}
|
|
if c.denyRedirects {
|
|
err = fmt.Errorf("cannot follow redirects if `follow_redirects: false` is set")
|
|
break
|
|
}
|
|
// It is expected that the redirect is made on the same host.
|
|
// Otherwise it won't work.
|
|
location := resp.Header.Peek("Location")
|
|
if len(location) == 0 {
|
|
err = fmt.Errorf("missing Location header")
|
|
break
|
|
}
|
|
req.URI().UpdateBytes(location)
|
|
err = doRequestWithPossibleRetry(ctx, c.hc, req, resp)
|
|
statusCode = resp.StatusCode()
|
|
redirectsCount++
|
|
}
|
|
if swapResponseBodies {
|
|
dst = resp.SwapBody(dst)
|
|
}
|
|
fasthttp.ReleaseRequest(req)
|
|
if err != nil {
|
|
fasthttp.ReleaseResponse(resp)
|
|
if err == fasthttp.ErrTimeout {
|
|
scrapesTimedout.Inc()
|
|
return dst, fmt.Errorf("error when scraping %q with timeout %s: %w", c.scrapeURL, c.hc.ReadTimeout, err)
|
|
}
|
|
if err == fasthttp.ErrBodyTooLarge {
|
|
maxScrapeSizeExceeded.Inc()
|
|
return dst, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d; "+
|
|
"either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, maxScrapeSize.N)
|
|
}
|
|
return dst, fmt.Errorf("error when scraping %q: %w", c.scrapeURL, err)
|
|
}
|
|
if ce := resp.Header.Peek("Content-Encoding"); string(ce) == "gzip" {
|
|
var err error
|
|
if swapResponseBodies {
|
|
zb := gunzipBufPool.Get()
|
|
zb.B, err = fasthttp.AppendGunzipBytes(zb.B[:0], dst)
|
|
dst = append(dst[:0], zb.B...)
|
|
gunzipBufPool.Put(zb)
|
|
} else {
|
|
dst, err = fasthttp.AppendGunzipBytes(dst, resp.Body())
|
|
}
|
|
if err != nil {
|
|
fasthttp.ReleaseResponse(resp)
|
|
scrapesGunzipFailed.Inc()
|
|
return dst, fmt.Errorf("cannot ungzip response from %q: %w", c.scrapeURL, err)
|
|
}
|
|
scrapesGunzipped.Inc()
|
|
} else if !swapResponseBodies {
|
|
dst = append(dst, resp.Body()...)
|
|
}
|
|
fasthttp.ReleaseResponse(resp)
|
|
if len(dst) > c.hc.MaxResponseBodySize {
|
|
maxScrapeSizeExceeded.Inc()
|
|
return dst, fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d (the actual response size is %d bytes); "+
|
|
"either reduce the response size for the target or increase -promscrape.maxScrapeSize", c.scrapeURL, maxScrapeSize.N, len(dst))
|
|
}
|
|
if statusCode != fasthttp.StatusOK {
|
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, statusCode)).Inc()
|
|
return dst, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
|
|
c.scrapeURL, statusCode, fasthttp.StatusOK, dst)
|
|
}
|
|
scrapesOK.Inc()
|
|
return dst, nil
|
|
}
|
|
|
|
var gunzipBufPool bytesutil.ByteBufferPool
|
|
|
|
var (
|
|
maxScrapeSizeExceeded = metrics.NewCounter(`vm_promscrape_max_scrape_size_exceeded_errors_total`)
|
|
scrapesTimedout = metrics.NewCounter(`vm_promscrape_scrapes_timed_out_total`)
|
|
scrapesOK = metrics.NewCounter(`vm_promscrape_scrapes_total{status_code="200"}`)
|
|
scrapesGunzipped = metrics.NewCounter(`vm_promscrape_scrapes_gunziped_total`)
|
|
scrapesGunzipFailed = metrics.NewCounter(`vm_promscrape_scrapes_gunzip_failed_total`)
|
|
scrapeRequests = metrics.NewCounter(`vm_promscrape_scrape_requests_total`)
|
|
scrapeRetries = metrics.NewCounter(`vm_promscrape_scrape_retries_total`)
|
|
)
|
|
|
|
func doRequestWithPossibleRetry(ctx context.Context, hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) error {
|
|
scrapeRequests.Inc()
|
|
|
|
var reqErr error
|
|
// Return true if the request execution is completed and retry is not required
|
|
attempt := func() bool {
|
|
// Use DoCtx instead of Do in order to support context cancellation
|
|
reqErr = hc.DoCtx(ctx, req, resp)
|
|
if reqErr == nil {
|
|
statusCode := resp.StatusCode()
|
|
if statusCode != fasthttp.StatusTooManyRequests {
|
|
return true
|
|
}
|
|
} else if reqErr != fasthttp.ErrConnectionClosed && !strings.Contains(reqErr.Error(), "broken pipe") {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
if attempt() {
|
|
return reqErr
|
|
}
|
|
|
|
// The first attempt was unsuccessful. Use exponential backoff for further attempts.
|
|
// Perform the second attempt immediately after the first attempt - this should help
|
|
// in cases when the remote side closes the keep-alive connection before the first attempt.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3293
|
|
sleepTime := time.Second
|
|
// It is expected that the deadline is already set to ctx, so the loop below
|
|
// should eventually finish if all the attempt() calls are unsuccessful.
|
|
for {
|
|
scrapeRetries.Inc()
|
|
if attempt() {
|
|
return reqErr
|
|
}
|
|
sleepTime += sleepTime
|
|
if !discoveryutils.SleepCtx(ctx, sleepTime) {
|
|
return reqErr
|
|
}
|
|
}
|
|
}
|
|
|
|
type streamReader struct {
|
|
r io.ReadCloser
|
|
cancel context.CancelFunc
|
|
bytesRead int64
|
|
scrapeURL string
|
|
maxBodySize int64
|
|
}
|
|
|
|
func (sr *streamReader) Read(p []byte) (int, error) {
|
|
n, err := sr.r.Read(p)
|
|
sr.bytesRead += int64(n)
|
|
if err == nil && sr.bytesRead > sr.maxBodySize {
|
|
maxScrapeSizeExceeded.Inc()
|
|
err = fmt.Errorf("the response from %q exceeds -promscrape.maxScrapeSize=%d; "+
|
|
"either reduce the response size for the target or increase -promscrape.maxScrapeSize", sr.scrapeURL, sr.maxBodySize)
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (sr *streamReader) MustClose() {
|
|
sr.cancel()
|
|
if err := sr.r.Close(); err != nil {
|
|
logger.Errorf("cannot close reader: %s", err)
|
|
}
|
|
}
|