app/vmalert: add retries to remotewrite (#605)

* app/vmalert: add retries to remotewrite

Remotewrite pkg now does limited number of retries if write request failed.
This suppose to make vmalert state persisting more reliable.

New metrics were added to remotewrite in order to track rows/bytes sent/dropped.

defaultFlushInterval was increased from 1s to 5s for sanity reasons.

* fix

* wip

* wip

* wip

* fix bits alignment bug for 32-bit systems

* fix mistakenly dropped field
This commit is contained in:
Roman Khavronenko 2020-07-05 16:46:52 +01:00 committed by Aliaksandr Valialkin
parent 82871fb7a5
commit 9afd19d375
3 changed files with 157 additions and 18 deletions

View File

@ -144,7 +144,6 @@ var (
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
)
@ -255,7 +254,6 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter
}
if len(tss) > 0 && e.rw != nil {
remoteWriteSent.Add(len(tss))
for _, ts := range tss {
if err := e.rw.Push(ts); err != nil {
remoteWriteErrors.Inc()

View File

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy"
)
@ -61,7 +62,7 @@ const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 1e5
defaultFlushInterval = time.Second
defaultFlushInterval = 5 * time.Second
defaultWriteTimeout = 30 * time.Second
)
@ -85,6 +86,9 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.WriteTimeout == 0 {
cfg.WriteTimeout = defaultWriteTimeout
}
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
c := &Client{
c: &http.Client{
Timeout: cfg.WriteTimeout,
@ -138,14 +142,11 @@ func (c *Client) Close() error {
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := prompbmarshal.WriteRequest{}
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
}
if len(wr.Timeseries) < 1 {
return
}
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
c.flush(lastCtx, wr)
cancel()
@ -164,44 +165,82 @@ func (c *Client) run(ctx context.Context) {
return
case <-ticker.C:
c.flush(ctx, wr)
wr = prompbmarshal.WriteRequest{}
case ts := <-c.input:
case ts, ok := <-c.input:
if !ok {
continue
}
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
wr = prompbmarshal.WriteRequest{}
}
}
}
}()
}
func (c *Client) flush(ctx context.Context, wr prompbmarshal.WriteRequest) {
var (
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote write endpoint. Flush performs limited amount of retries
// if request fails.
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
return
}
defer prompbmarshal.ResetWriteRequest(wr)
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
return
}
req, err := http.NewRequest("POST", c.addr, bytes.NewReader(snappy.Encode(nil, data)))
if err != nil {
logger.Errorf("failed to create new HTTP request: %s", err)
const attempts = 5
b := snappy.Encode(nil, data)
for i := 0; i < attempts; i++ {
err := c.send(ctx, b)
if err == nil {
sentRows.Add(len(wr.Timeseries))
sentBytes.Add(len(b))
return
}
logger.Errorf("attempt %d to send request failed: %s", i+1, err)
// sleeping to avoid remote db hammering
time.Sleep(time.Second)
continue
}
droppedRows.Add(len(wr.Timeseries))
droppedBytes.Add(len(b))
logger.Errorf("all %d attempts to send request failed - dropping %d timeseries",
attempts, len(wr.Timeseries))
}
func (c *Client) send(ctx context.Context, data []byte) error {
r := bytes.NewReader(data)
req, err := http.NewRequest("POST", c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %s", err)
}
if c.baPass != "" {
req.SetBasicAuth(c.baUser, c.baPass)
}
resp, err := c.c.Do(req.WithContext(ctx))
if err != nil {
logger.Errorf("error getting response from %s:%s", req.URL, err)
return
return fmt.Errorf("error while sending request to %s: %s; Data len %d(%d)",
req.URL, err, len(data), r.Size())
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusNoContent {
body, _ := ioutil.ReadAll(resp.Body)
logger.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
return
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL, body)
}
return nil
}

View File

@ -0,0 +1,102 @@
package remotewrite
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestClient_Push(t *testing.T) {
testSrv := newRWServer()
cfg := Config{
Addr: testSrv.URL,
MaxBatchSize: 100,
}
client, err := NewClient(context.Background(), cfg)
if err != nil {
t.Fatalf("failed to create client: %s", err)
}
const rowsN = 1e4
var sent int
for i := 0; i < rowsN; i++ {
s := prompbmarshal.TimeSeries{
Samples: []prompbmarshal.Sample{{
Value: rand.Float64(),
Timestamp: time.Now().Unix(),
}},
}
err := client.Push(s)
if err == nil {
sent++
}
}
if sent == 0 {
t.Fatalf("0 series sent")
}
if err := client.Close(); err != nil {
t.Fatalf("failed to close client: %s", err)
}
got := testSrv.accepted()
if got != sent {
t.Fatalf("expected to have %d series; got %d", sent, got)
}
}
func newRWServer() *rwServer {
rw := &rwServer{}
rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler))
return rw
}
type rwServer struct {
// WARN: ordering of fields is important for alignment!
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
acceptedRows uint64
*httptest.Server
}
func (rw *rwServer) accepted() int {
return int(atomic.LoadUint64(&rw.acceptedRows))
}
func (rw *rwServer) err(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
}
func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
rw.err(w, fmt.Errorf("bad method %q", r.Method))
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
rw.err(w, fmt.Errorf("body read err: %s", err))
return
}
defer func() { _ = r.Body.Close() }()
b, err := snappy.Decode(nil, data)
if err != nil {
rw.err(w, fmt.Errorf("decode err: %s", err))
return
}
wr := &prompb.WriteRequest{}
if err := wr.Unmarshal(b); err != nil {
rw.err(w, fmt.Errorf("unmarhsal err: %s", err))
return
}
atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries)))
w.WriteHeader(http.StatusNoContent)
}