VictoriaMetrics/app/vmalert/remotewrite/remotewrite_test.go
Roman Khavronenko 9afd19d375 app/vmalert: add retries to remotewrite (#605)
* app/vmalert: add retries to remotewrite

Remotewrite pkg now does limited number of retries if write request failed.
This suppose to make vmalert state persisting more reliable.

New metrics were added to remotewrite in order to track rows/bytes sent/dropped.

defaultFlushInterval was increased from 1s to 5s for sanity reasons.

* fix

* wip

* wip

* wip

* fix bits alignment bug for 32-bit systems

* fix mistakenly dropped field
2020-07-05 18:47:38 +03:00

103 lines
2.2 KiB
Go

package remotewrite
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestClient_Push(t *testing.T) {
testSrv := newRWServer()
cfg := Config{
Addr: testSrv.URL,
MaxBatchSize: 100,
}
client, err := NewClient(context.Background(), cfg)
if err != nil {
t.Fatalf("failed to create client: %s", err)
}
const rowsN = 1e4
var sent int
for i := 0; i < rowsN; i++ {
s := prompbmarshal.TimeSeries{
Samples: []prompbmarshal.Sample{{
Value: rand.Float64(),
Timestamp: time.Now().Unix(),
}},
}
err := client.Push(s)
if err == nil {
sent++
}
}
if sent == 0 {
t.Fatalf("0 series sent")
}
if err := client.Close(); err != nil {
t.Fatalf("failed to close client: %s", err)
}
got := testSrv.accepted()
if got != sent {
t.Fatalf("expected to have %d series; got %d", sent, got)
}
}
func newRWServer() *rwServer {
rw := &rwServer{}
rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler))
return rw
}
type rwServer struct {
// WARN: ordering of fields is important for alignment!
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
acceptedRows uint64
*httptest.Server
}
func (rw *rwServer) accepted() int {
return int(atomic.LoadUint64(&rw.acceptedRows))
}
func (rw *rwServer) err(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
}
func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
rw.err(w, fmt.Errorf("bad method %q", r.Method))
return
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
rw.err(w, fmt.Errorf("body read err: %s", err))
return
}
defer func() { _ = r.Body.Close() }()
b, err := snappy.Decode(nil, data)
if err != nil {
rw.err(w, fmt.Errorf("decode err: %s", err))
return
}
wr := &prompb.WriteRequest{}
if err := wr.Unmarshal(b); err != nil {
rw.err(w, fmt.Errorf("unmarhsal err: %s", err))
return
}
atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries)))
w.WriteHeader(http.StatusNoContent)
}