mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-23 11:21:26 +01:00
f03e81c693
- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup don't prevent from processing the corresponding scrape targets after the file becomes correct, without the need to restart vmagent. Previously scrape targets with invalid TLS CA file or TLS client certificate files were permanently dropped after the first attempt to initialize them, and they didn't appear until the next vmagent reload or the next change in other places of the loaded scrape configs. - Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent. Previously the old TLS CA was used until vmagent restart. - Properly handle errors during http request creation for the second attempt to send data to remote system at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing, since the returned request is nil on error. - Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent. Previously it could miss details on the source of the request. - Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header of every http request issued by vmagent during service discovery or target scraping. Re-use the HTTP client instead until the corresponding scrape config changes. - Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached, e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server when auth header cannot be obtained because of temporary error. - Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config. Cache the loaded certificate and the error for one second. This should significantly reduce CPU load when scraping big number of targets with the same tls_config. 
- Allow loading TLS certificates from HTTP and HTTPS urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`. - Improve test coverage at lib/promauth. - Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later. Previously vmagent was exiting in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
329 lines
9.2 KiB
Go
329 lines
9.2 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/snappy"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Fallback values used by NewClient for Config fields that are left unset,
// plus the deadline for the final flush performed on shutdown.
const (
	// defaultConcurrency is the number of flushing goroutines per client.
	defaultConcurrency = 4

	// defaultMaxBatchSize is the number of buffered time series
	// that triggers a flush.
	defaultMaxBatchSize = 1e3

	// defaultMaxQueueSize is the capacity of the input queue.
	defaultMaxQueueSize = 1e5

	// defaultFlushInterval is the period between time-based flushes.
	defaultFlushInterval = time.Second * 5

	// defaultWriteTimeout bounds the last flush issued while shutting down.
	defaultWriteTimeout = time.Second * 30
)
|
|
|
|
// Command-line flags controlling how data is delivered to -remoteWrite.url.
var (
	// disablePathAppend turns off appending of '/api/v1/write' to the request path.
	disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")

	// sendTimeout is the timeout of the HTTP client used for sending data.
	sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")

	// retryMinInterval is the initial back-off delay between retry attempts.
	// The help text refers to -remoteWrite.retryMaxTime, which is the flag
	// that actually bounds the retries.
	retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxTime")

	// retryMaxTime is the total time budget for retrying a failed request.
	retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
)
|
|
|
|
// Client is an asynchronous HTTP client for writing
|
|
// timeseries via remote write protocol.
|
|
type Client struct {
|
|
addr string
|
|
c *http.Client
|
|
authCfg *promauth.Config
|
|
input chan prompbmarshal.TimeSeries
|
|
flushInterval time.Duration
|
|
maxBatchSize int
|
|
maxQueueSize int
|
|
|
|
wg sync.WaitGroup
|
|
doneCh chan struct{}
|
|
}
|
|
|
|
// Config is config for remote write client.
|
|
type Config struct {
|
|
// Addr of remote storage
|
|
Addr string
|
|
AuthCfg *promauth.Config
|
|
|
|
// Concurrency defines number of readers that
|
|
// concurrently read from the queue and flush data
|
|
Concurrency int
|
|
// MaxBatchSize defines max number of timeseries
|
|
// to be flushed at once
|
|
MaxBatchSize int
|
|
// MaxQueueSize defines max length of input queue
|
|
// populated by Push method.
|
|
// Push will be rejected once queue is full.
|
|
MaxQueueSize int
|
|
// FlushInterval defines time interval for flushing batches
|
|
FlushInterval time.Duration
|
|
// Transport will be used by the underlying http.Client
|
|
Transport *http.Transport
|
|
}
|
|
|
|
// NewClient returns asynchronous client for
|
|
// writing timeseries via remotewrite protocol.
|
|
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
|
if cfg.Addr == "" {
|
|
return nil, fmt.Errorf("config.Addr can't be empty")
|
|
}
|
|
if cfg.MaxBatchSize == 0 {
|
|
cfg.MaxBatchSize = defaultMaxBatchSize
|
|
}
|
|
if cfg.MaxQueueSize == 0 {
|
|
cfg.MaxQueueSize = defaultMaxQueueSize
|
|
}
|
|
if cfg.FlushInterval == 0 {
|
|
cfg.FlushInterval = defaultFlushInterval
|
|
}
|
|
if cfg.Transport == nil {
|
|
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
|
|
}
|
|
cc := defaultConcurrency
|
|
if cfg.Concurrency > 0 {
|
|
cc = cfg.Concurrency
|
|
}
|
|
c := &Client{
|
|
c: &http.Client{
|
|
Timeout: *sendTimeout,
|
|
Transport: cfg.Transport,
|
|
},
|
|
addr: strings.TrimSuffix(cfg.Addr, "/"),
|
|
authCfg: cfg.AuthCfg,
|
|
flushInterval: cfg.FlushInterval,
|
|
maxBatchSize: cfg.MaxBatchSize,
|
|
maxQueueSize: cfg.MaxQueueSize,
|
|
doneCh: make(chan struct{}),
|
|
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
|
}
|
|
|
|
for i := 0; i < cc; i++ {
|
|
c.run(ctx)
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// Push adds timeseries into queue for writing into remote storage.
|
|
// Push returns and error if client is stopped or if queue is full.
|
|
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
|
|
select {
|
|
case <-c.doneCh:
|
|
return fmt.Errorf("client is closed")
|
|
case c.input <- s:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
|
|
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
|
|
c.maxQueueSize)
|
|
}
|
|
}
|
|
|
|
// Close stops the client and waits for all goroutines
|
|
// to exit.
|
|
func (c *Client) Close() error {
|
|
if c.doneCh == nil {
|
|
return fmt.Errorf("client is already closed")
|
|
}
|
|
close(c.input)
|
|
close(c.doneCh)
|
|
c.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) run(ctx context.Context) {
|
|
ticker := time.NewTicker(c.flushInterval)
|
|
wr := &prompbmarshal.WriteRequest{}
|
|
shutdown := func() {
|
|
for ts := range c.input {
|
|
wr.Timeseries = append(wr.Timeseries, ts)
|
|
}
|
|
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
|
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
|
|
c.flush(lastCtx, wr)
|
|
cancel()
|
|
}
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-c.doneCh:
|
|
shutdown()
|
|
return
|
|
case <-ctx.Done():
|
|
shutdown()
|
|
return
|
|
case <-ticker.C:
|
|
c.flush(ctx, wr)
|
|
case ts, ok := <-c.input:
|
|
if !ok {
|
|
continue
|
|
}
|
|
wr.Timeseries = append(wr.Timeseries, ts)
|
|
if len(wr.Timeseries) >= c.maxBatchSize {
|
|
c.flush(ctx, wr)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
var (
|
|
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
|
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
|
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
|
|
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
|
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
|
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
|
|
|
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
|
|
return float64(*concurrency)
|
|
})
|
|
)
|
|
|
|
// flush is a blocking function that marshals WriteRequest and sends
|
|
// it to remote-write endpoint. Flush performs limited amount of retries
|
|
// if request fails.
|
|
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
|
if len(wr.Timeseries) < 1 {
|
|
return
|
|
}
|
|
defer prompbmarshal.ResetWriteRequest(wr)
|
|
defer bufferFlushDuration.UpdateDuration(time.Now())
|
|
|
|
data, err := wr.Marshal()
|
|
if err != nil {
|
|
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
|
return
|
|
}
|
|
|
|
b := snappy.Encode(nil, data)
|
|
|
|
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
|
|
if retryInterval > maxRetryInterval {
|
|
retryInterval = maxRetryInterval
|
|
}
|
|
timeStart := time.Now()
|
|
defer func() {
|
|
sendDuration.Add(time.Since(timeStart).Seconds())
|
|
}()
|
|
L:
|
|
for attempts := 0; ; attempts++ {
|
|
err := c.send(ctx, b)
|
|
if err == nil {
|
|
sentRows.Add(len(wr.Timeseries))
|
|
sentBytes.Add(len(b))
|
|
return
|
|
}
|
|
|
|
_, isNotRetriable := err.(*nonRetriableError)
|
|
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
|
|
|
|
if isNotRetriable {
|
|
// exit fast if error isn't retriable
|
|
break
|
|
}
|
|
|
|
// check if request has been cancelled before backoff
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
|
|
break L
|
|
default:
|
|
}
|
|
|
|
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
|
|
if timeLeftForRetries < 0 {
|
|
// the max retry time has passed, so we give up
|
|
break
|
|
}
|
|
|
|
if retryInterval > timeLeftForRetries {
|
|
retryInterval = timeLeftForRetries
|
|
}
|
|
// sleeping to prevent remote db hammering
|
|
time.Sleep(retryInterval)
|
|
retryInterval *= 2
|
|
|
|
}
|
|
|
|
droppedRows.Add(len(wr.Timeseries))
|
|
droppedBytes.Add(len(b))
|
|
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
|
|
len(wr.Timeseries))
|
|
}
|
|
|
|
func (c *Client) send(ctx context.Context, data []byte) error {
|
|
r := bytes.NewReader(data)
|
|
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
|
}
|
|
|
|
// RFC standard compliant headers
|
|
req.Header.Set("Content-Encoding", "snappy")
|
|
req.Header.Set("Content-Type", "application/x-protobuf")
|
|
|
|
// Prometheus compliant headers
|
|
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
|
|
|
if c.authCfg != nil {
|
|
err = c.authCfg.SetHeaders(req, true)
|
|
if err != nil {
|
|
return &nonRetriableError{
|
|
err: err,
|
|
}
|
|
}
|
|
}
|
|
if !*disablePathAppend {
|
|
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
|
}
|
|
resp, err := c.c.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
|
req.URL.Redacted(), err, len(data), r.Size())
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
body, _ := io.ReadAll(resp.Body)
|
|
|
|
// according to https://prometheus.io/docs/concepts/remote_write_spec/
|
|
// Prometheus remote Write compatible receivers MUST
|
|
switch resp.StatusCode / 100 {
|
|
case 2:
|
|
// respond with a HTTP 2xx status code when the write is successful.
|
|
return nil
|
|
case 4:
|
|
if resp.StatusCode != http.StatusTooManyRequests {
|
|
// MUST NOT retry write requests on HTTP 4xx responses other than 429
|
|
return &nonRetriableError{
|
|
err: fmt.Errorf("unexpected response code %d for %s. Response body %q", resp.StatusCode, req.URL.Redacted(), body),
|
|
}
|
|
}
|
|
fallthrough
|
|
default:
|
|
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
|
resp.StatusCode, req.URL.Redacted(), body)
|
|
}
|
|
}
|
|
|
|
// nonRetriableError wraps an error for which the failed request
// must not be retried.
type nonRetriableError struct {
	err error
}

// Error returns the message of the wrapped error.
func (e *nonRetriableError) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error, so that errors.Is and errors.As
// can inspect the original cause.
func (e *nonRetriableError) Unwrap() error {
	return e.err
}
|