mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
app/vmalert: add retries to remotewrite (#605)
* app/vmalert: add retries to remotewrite Remotewrite pkg now does limited number of retries if write request failed. This suppose to make vmalert state persisting more reliable. New metrics were added to remotewrite in order to track rows/bytes sent/dropped. defaultFlushInterval was increased from 1s to 5s for sanity reasons. * fix * wip * wip * wip * fix bits alignment bug for 32-bit systems * fix mistakenly dropped field
This commit is contained in:
parent
de137aef98
commit
703def4b2e
@ -144,7 +144,6 @@ var (
|
|||||||
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
||||||
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
||||||
|
|
||||||
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
|
|
||||||
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -255,7 +254,6 @@ func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, inter
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(tss) > 0 && e.rw != nil {
|
if len(tss) > 0 && e.rw != nil {
|
||||||
remoteWriteSent.Add(len(tss))
|
|
||||||
for _, ts := range tss {
|
for _, ts := range tss {
|
||||||
if err := e.rw.Push(ts); err != nil {
|
if err := e.rw.Push(ts); err != nil {
|
||||||
remoteWriteErrors.Inc()
|
remoteWriteErrors.Inc()
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -61,7 +62,7 @@ const (
|
|||||||
defaultConcurrency = 4
|
defaultConcurrency = 4
|
||||||
defaultMaxBatchSize = 1e3
|
defaultMaxBatchSize = 1e3
|
||||||
defaultMaxQueueSize = 1e5
|
defaultMaxQueueSize = 1e5
|
||||||
defaultFlushInterval = time.Second
|
defaultFlushInterval = 5 * time.Second
|
||||||
defaultWriteTimeout = 30 * time.Second
|
defaultWriteTimeout = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -85,6 +86,9 @@ func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
|||||||
if cfg.WriteTimeout == 0 {
|
if cfg.WriteTimeout == 0 {
|
||||||
cfg.WriteTimeout = defaultWriteTimeout
|
cfg.WriteTimeout = defaultWriteTimeout
|
||||||
}
|
}
|
||||||
|
if cfg.Transport == nil {
|
||||||
|
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
|
||||||
|
}
|
||||||
c := &Client{
|
c := &Client{
|
||||||
c: &http.Client{
|
c: &http.Client{
|
||||||
Timeout: cfg.WriteTimeout,
|
Timeout: cfg.WriteTimeout,
|
||||||
@ -138,14 +142,11 @@ func (c *Client) Close() error {
|
|||||||
|
|
||||||
func (c *Client) run(ctx context.Context) {
|
func (c *Client) run(ctx context.Context) {
|
||||||
ticker := time.NewTicker(c.flushInterval)
|
ticker := time.NewTicker(c.flushInterval)
|
||||||
wr := prompbmarshal.WriteRequest{}
|
wr := &prompbmarshal.WriteRequest{}
|
||||||
shutdown := func() {
|
shutdown := func() {
|
||||||
for ts := range c.input {
|
for ts := range c.input {
|
||||||
wr.Timeseries = append(wr.Timeseries, ts)
|
wr.Timeseries = append(wr.Timeseries, ts)
|
||||||
}
|
}
|
||||||
if len(wr.Timeseries) < 1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
||||||
c.flush(lastCtx, wr)
|
c.flush(lastCtx, wr)
|
||||||
cancel()
|
cancel()
|
||||||
@ -164,44 +165,82 @@ func (c *Client) run(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
c.flush(ctx, wr)
|
c.flush(ctx, wr)
|
||||||
wr = prompbmarshal.WriteRequest{}
|
case ts, ok := <-c.input:
|
||||||
case ts := <-c.input:
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
wr.Timeseries = append(wr.Timeseries, ts)
|
wr.Timeseries = append(wr.Timeseries, ts)
|
||||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||||
c.flush(ctx, wr)
|
c.flush(ctx, wr)
|
||||||
wr = prompbmarshal.WriteRequest{}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) flush(ctx context.Context, wr prompbmarshal.WriteRequest) {
|
var (
|
||||||
|
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
||||||
|
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
||||||
|
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
||||||
|
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
||||||
|
)
|
||||||
|
|
||||||
|
// flush is a blocking function that marshals WriteRequest and sends
|
||||||
|
// it to remote write endpoint. Flush performs limited amount of retries
|
||||||
|
// if request fails.
|
||||||
|
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||||
if len(wr.Timeseries) < 1 {
|
if len(wr.Timeseries) < 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
defer prompbmarshal.ResetWriteRequest(wr)
|
||||||
|
|
||||||
data, err := wr.Marshal()
|
data, err := wr.Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
req, err := http.NewRequest("POST", c.addr, bytes.NewReader(snappy.Encode(nil, data)))
|
|
||||||
|
const attempts = 5
|
||||||
|
b := snappy.Encode(nil, data)
|
||||||
|
for i := 0; i < attempts; i++ {
|
||||||
|
err := c.send(ctx, b)
|
||||||
|
if err == nil {
|
||||||
|
sentRows.Add(len(wr.Timeseries))
|
||||||
|
sentBytes.Add(len(b))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Errorf("attempt %d to send request failed: %s", i+1, err)
|
||||||
|
// sleeping to avoid remote db hammering
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
droppedRows.Add(len(wr.Timeseries))
|
||||||
|
droppedBytes.Add(len(b))
|
||||||
|
logger.Errorf("all %d attempts to send request failed - dropping %d timeseries",
|
||||||
|
attempts, len(wr.Timeseries))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) send(ctx context.Context, data []byte) error {
|
||||||
|
r := bytes.NewReader(data)
|
||||||
|
req, err := http.NewRequest("POST", c.addr, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("failed to create new HTTP request: %s", err)
|
return fmt.Errorf("failed to create new HTTP request: %s", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if c.baPass != "" {
|
if c.baPass != "" {
|
||||||
req.SetBasicAuth(c.baUser, c.baPass)
|
req.SetBasicAuth(c.baUser, c.baPass)
|
||||||
}
|
}
|
||||||
resp, err := c.c.Do(req.WithContext(ctx))
|
resp, err := c.c.Do(req.WithContext(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("error getting response from %s:%s", req.URL, err)
|
return fmt.Errorf("error while sending request to %s: %s; Data len %d(%d)",
|
||||||
return
|
req.URL, err, len(data), r.Size())
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
if resp.StatusCode != http.StatusNoContent {
|
if resp.StatusCode != http.StatusNoContent {
|
||||||
body, _ := ioutil.ReadAll(resp.Body)
|
body, _ := ioutil.ReadAll(resp.Body)
|
||||||
logger.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
|
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||||
return
|
resp.StatusCode, req.URL, body)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
102
app/vmalert/remotewrite/remotewrite_test.go
Normal file
102
app/vmalert/remotewrite/remotewrite_test.go
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
package remotewrite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/snappy"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestClient_Push(t *testing.T) {
|
||||||
|
testSrv := newRWServer()
|
||||||
|
cfg := Config{
|
||||||
|
Addr: testSrv.URL,
|
||||||
|
MaxBatchSize: 100,
|
||||||
|
}
|
||||||
|
client, err := NewClient(context.Background(), cfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create client: %s", err)
|
||||||
|
}
|
||||||
|
const rowsN = 1e4
|
||||||
|
var sent int
|
||||||
|
for i := 0; i < rowsN; i++ {
|
||||||
|
s := prompbmarshal.TimeSeries{
|
||||||
|
Samples: []prompbmarshal.Sample{{
|
||||||
|
Value: rand.Float64(),
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
err := client.Push(s)
|
||||||
|
if err == nil {
|
||||||
|
sent++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sent == 0 {
|
||||||
|
t.Fatalf("0 series sent")
|
||||||
|
}
|
||||||
|
if err := client.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close client: %s", err)
|
||||||
|
}
|
||||||
|
got := testSrv.accepted()
|
||||||
|
if got != sent {
|
||||||
|
t.Fatalf("expected to have %d series; got %d", sent, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRWServer() *rwServer {
|
||||||
|
rw := &rwServer{}
|
||||||
|
rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler))
|
||||||
|
return rw
|
||||||
|
}
|
||||||
|
|
||||||
|
type rwServer struct {
|
||||||
|
// WARN: ordering of fields is important for alignment!
|
||||||
|
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
||||||
|
acceptedRows uint64
|
||||||
|
*httptest.Server
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rw *rwServer) accepted() int {
|
||||||
|
return int(atomic.LoadUint64(&rw.acceptedRows))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rw *rwServer) err(w http.ResponseWriter, err error) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
w.Write([]byte(err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
rw.err(w, fmt.Errorf("bad method %q", r.Method))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
data, err := ioutil.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
rw.err(w, fmt.Errorf("body read err: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() { _ = r.Body.Close() }()
|
||||||
|
|
||||||
|
b, err := snappy.Decode(nil, data)
|
||||||
|
if err != nil {
|
||||||
|
rw.err(w, fmt.Errorf("decode err: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wr := &prompb.WriteRequest{}
|
||||||
|
if err := wr.Unmarshal(b); err != nil {
|
||||||
|
rw.err(w, fmt.Errorf("unmarhsal err: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries)))
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user