2020-04-27 23:19:27 +02:00
package remotewrite
import (
"bytes"
"context"
2022-05-13 15:19:32 +02:00
"flag"
2020-04-27 23:19:27 +02:00
"fmt"
"io/ioutil"
"net/http"
2021-09-16 13:00:16 +02:00
"path"
2020-04-27 23:19:27 +02:00
"strings"
"sync"
"time"
2021-09-14 13:32:06 +02:00
"github.com/golang/snappy"
2020-04-27 23:19:27 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2021-09-14 13:32:06 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2020-04-27 23:19:27 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2020-07-05 17:46:52 +02:00
"github.com/VictoriaMetrics/metrics"
2020-04-27 23:19:27 +02:00
)
2022-05-13 15:19:32 +02:00
var (
disablePathAppend = flag . Bool ( "remoteWrite.disablePathAppend" , false , "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url." )
)
2020-04-27 23:19:27 +02:00
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
type Client struct {
2022-05-13 15:19:32 +02:00
addr string
c * http . Client
authCfg * promauth . Config
input chan prompbmarshal . TimeSeries
flushInterval time . Duration
maxBatchSize int
maxQueueSize int
2020-04-27 23:19:27 +02:00
wg sync . WaitGroup
doneCh chan struct { }
}
2020-04-28 10:19:37 +02:00
// Config is config for remote write.
2020-04-27 23:19:27 +02:00
type Config struct {
// Addr of remote storage
2021-09-14 13:32:06 +02:00
Addr string
AuthCfg * promauth . Config
2020-04-27 23:19:27 +02:00
2020-06-01 12:46:37 +02:00
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
2020-04-27 23:19:27 +02:00
// MaxBatchSize defines max number of timeseries
// to be flushed at once
MaxBatchSize int
// MaxQueueSize defines max length of input queue
2020-06-01 12:46:37 +02:00
// populated by Push method.
// Push will be rejected once queue is full.
2020-04-27 23:19:27 +02:00
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time . Duration
// WriteTimeout defines timeout for HTTP write request
// to remote storage
WriteTimeout time . Duration
2020-06-23 21:45:45 +02:00
// Transport will be used by the underlying http.Client
Transport * http . Transport
2021-08-16 13:20:57 +02:00
// DisablePathAppend can be used to not automatically append '/api/v1/write' to the remote write url
DisablePathAppend bool
2020-04-27 23:19:27 +02:00
}
const (
2020-06-01 12:46:37 +02:00
defaultConcurrency = 4
2020-04-27 23:19:27 +02:00
defaultMaxBatchSize = 1e3
2020-06-01 12:46:37 +02:00
defaultMaxQueueSize = 1e5
2020-07-05 17:46:52 +02:00
defaultFlushInterval = 5 * time . Second
2020-04-27 23:19:27 +02:00
defaultWriteTimeout = 30 * time . Second
)
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
func NewClient ( ctx context . Context , cfg Config ) ( * Client , error ) {
if cfg . Addr == "" {
return nil , fmt . Errorf ( "config.Addr can't be empty" )
}
if cfg . MaxBatchSize == 0 {
cfg . MaxBatchSize = defaultMaxBatchSize
}
if cfg . MaxQueueSize == 0 {
cfg . MaxQueueSize = defaultMaxQueueSize
}
if cfg . FlushInterval == 0 {
cfg . FlushInterval = defaultFlushInterval
}
if cfg . WriteTimeout == 0 {
cfg . WriteTimeout = defaultWriteTimeout
}
2020-07-05 17:46:52 +02:00
if cfg . Transport == nil {
cfg . Transport = http . DefaultTransport . ( * http . Transport ) . Clone ( )
}
2021-09-16 13:00:16 +02:00
cc := defaultConcurrency
if cfg . Concurrency > 0 {
cc = cfg . Concurrency
}
2020-04-27 23:19:27 +02:00
c := & Client {
c : & http . Client {
2020-06-23 21:45:45 +02:00
Timeout : cfg . WriteTimeout ,
Transport : cfg . Transport ,
2020-04-27 23:19:27 +02:00
} ,
2022-05-13 15:19:32 +02:00
addr : strings . TrimSuffix ( cfg . Addr , "/" ) ,
authCfg : cfg . AuthCfg ,
flushInterval : cfg . FlushInterval ,
maxBatchSize : cfg . MaxBatchSize ,
maxQueueSize : cfg . MaxQueueSize ,
doneCh : make ( chan struct { } ) ,
input : make ( chan prompbmarshal . TimeSeries , cfg . MaxQueueSize ) ,
2020-04-27 23:19:27 +02:00
}
2021-09-16 13:00:16 +02:00
2020-06-01 12:46:37 +02:00
for i := 0 ; i < cc ; i ++ {
c . run ( ctx )
}
2020-04-27 23:19:27 +02:00
return c , nil
}
// Push adds timeseries into queue for writing into remote storage.
// Push returns and error if client is stopped or if queue is full.
func ( c * Client ) Push ( s prompbmarshal . TimeSeries ) error {
select {
case <- c . doneCh :
return fmt . Errorf ( "client is closed" )
case c . input <- s :
return nil
default :
2020-05-13 20:32:21 +02:00
return fmt . Errorf ( "failed to push timeseries - queue is full (%d entries). " +
"Queue size is controlled by -remoteWrite.maxQueueSize flag" ,
2020-04-27 23:19:27 +02:00
c . maxQueueSize )
}
}
// Close stops the client and waits for all goroutines
// to exit.
func ( c * Client ) Close ( ) error {
if c . doneCh == nil {
return fmt . Errorf ( "client is already closed" )
}
close ( c . input )
close ( c . doneCh )
c . wg . Wait ( )
return nil
}
func ( c * Client ) run ( ctx context . Context ) {
ticker := time . NewTicker ( c . flushInterval )
2020-07-05 17:46:52 +02:00
wr := & prompbmarshal . WriteRequest { }
2020-04-27 23:19:27 +02:00
shutdown := func ( ) {
for ts := range c . input {
wr . Timeseries = append ( wr . Timeseries , ts )
}
2020-06-01 12:46:37 +02:00
lastCtx , cancel := context . WithTimeout ( context . Background ( ) , defaultWriteTimeout )
2020-04-27 23:19:27 +02:00
c . flush ( lastCtx , wr )
cancel ( )
}
c . wg . Add ( 1 )
go func ( ) {
defer c . wg . Done ( )
defer ticker . Stop ( )
for {
select {
case <- c . doneCh :
shutdown ( )
return
case <- ctx . Done ( ) :
shutdown ( )
return
case <- ticker . C :
c . flush ( ctx , wr )
2020-07-05 17:46:52 +02:00
case ts , ok := <- c . input :
if ! ok {
continue
}
2020-04-27 23:19:27 +02:00
wr . Timeseries = append ( wr . Timeseries , ts )
if len ( wr . Timeseries ) >= c . maxBatchSize {
c . flush ( ctx , wr )
}
}
}
} ( )
}
2020-07-05 17:46:52 +02:00
var (
2021-09-16 13:00:16 +02:00
sentRows = metrics . NewCounter ( ` vmalert_remotewrite_sent_rows_total ` )
sentBytes = metrics . NewCounter ( ` vmalert_remotewrite_sent_bytes_total ` )
droppedRows = metrics . NewCounter ( ` vmalert_remotewrite_dropped_rows_total ` )
droppedBytes = metrics . NewCounter ( ` vmalert_remotewrite_dropped_bytes_total ` )
bufferFlushDuration = metrics . NewHistogram ( ` vmalert_remotewrite_flush_duration_seconds ` )
2020-07-05 17:46:52 +02:00
)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote write endpoint. Flush performs limited amount of retries
// if request fails.
func ( c * Client ) flush ( ctx context . Context , wr * prompbmarshal . WriteRequest ) {
2020-04-27 23:19:27 +02:00
if len ( wr . Timeseries ) < 1 {
return
}
2020-07-05 17:46:52 +02:00
defer prompbmarshal . ResetWriteRequest ( wr )
2021-09-16 13:00:16 +02:00
defer bufferFlushDuration . UpdateDuration ( time . Now ( ) )
2020-07-05 17:46:52 +02:00
2020-04-27 23:19:27 +02:00
data , err := wr . Marshal ( )
if err != nil {
logger . Errorf ( "failed to marshal WriteRequest: %s" , err )
return
}
2020-07-05 17:46:52 +02:00
const attempts = 5
b := snappy . Encode ( nil , data )
for i := 0 ; i < attempts ; i ++ {
err := c . send ( ctx , b )
if err == nil {
sentRows . Add ( len ( wr . Timeseries ) )
sentBytes . Add ( len ( b ) )
return
}
logger . Errorf ( "attempt %d to send request failed: %s" , i + 1 , err )
// sleeping to avoid remote db hammering
time . Sleep ( time . Second )
continue
}
droppedRows . Add ( len ( wr . Timeseries ) )
droppedBytes . Add ( len ( b ) )
2022-03-29 15:09:07 +02:00
logger . Errorf ( "all %d attempts to send request failed - dropping %d time series" ,
2020-07-05 17:46:52 +02:00
attempts , len ( wr . Timeseries ) )
}
func ( c * Client ) send ( ctx context . Context , data [ ] byte ) error {
r := bytes . NewReader ( data )
req , err := http . NewRequest ( "POST" , c . addr , r )
2020-04-27 23:19:27 +02:00
if err != nil {
2020-07-15 12:54:45 +02:00
return fmt . Errorf ( "failed to create new HTTP request: %w" , err )
2020-04-27 23:19:27 +02:00
}
2022-06-07 14:33:21 +02:00
2022-06-13 08:59:03 +02:00
// RFC standard compliant headers
2022-06-07 14:33:21 +02:00
req . Header . Set ( "Content-Encoding" , "snappy" )
2022-06-13 08:59:03 +02:00
req . Header . Set ( "Content-Type" , "application/x-protobuf" )
// Prometheus compliant headers
req . Header . Set ( "X-Prometheus-Remote-Write-Version" , "0.1.0" )
2022-06-07 14:33:21 +02:00
2021-09-14 13:32:06 +02:00
if c . authCfg != nil {
if auth := c . authCfg . GetAuthHeader ( ) ; auth != "" {
req . Header . Set ( "Authorization" , auth )
}
2020-04-27 23:19:27 +02:00
}
2022-05-13 15:19:32 +02:00
if ! * disablePathAppend {
req . URL . Path = path . Join ( req . URL . Path , "/api/v1/write" )
2021-08-16 13:20:57 +02:00
}
2020-04-27 23:19:27 +02:00
resp , err := c . c . Do ( req . WithContext ( ctx ) )
if err != nil {
2020-07-15 12:54:45 +02:00
return fmt . Errorf ( "error while sending request to %s: %w; Data len %d(%d)" ,
2021-10-18 09:20:26 +02:00
req . URL . Redacted ( ) , err , len ( data ) , r . Size ( ) )
2020-04-27 23:19:27 +02:00
}
defer func ( ) { _ = resp . Body . Close ( ) } ( )
2021-08-20 10:58:32 +02:00
if resp . StatusCode != http . StatusNoContent && resp . StatusCode != http . StatusOK {
2020-04-27 23:19:27 +02:00
body , _ := ioutil . ReadAll ( resp . Body )
2020-07-05 17:46:52 +02:00
return fmt . Errorf ( "unexpected response code %d for %s. Response body %q" ,
2021-10-18 09:20:26 +02:00
resp . StatusCode , req . URL . Redacted ( ) , body )
2020-04-27 23:19:27 +02:00
}
2020-07-05 17:46:52 +02:00
return nil
2020-04-27 23:19:27 +02:00
}