VictoriaMetrics/app/vmalert/remotewrite/remotewrite_test.go

120 lines
2.7 KiB
Go

package remotewrite
import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestClient_Push(t *testing.T) {
testSrv := newRWServer()
cfg := Config{
Addr: testSrv.URL,
MaxBatchSize: 100,
}
client, err := NewClient(context.Background(), cfg)
if err != nil {
t.Fatalf("failed to create client: %s", err)
}
r := rand.New(rand.NewSource(1))
const rowsN = 1e4
var sent int
for i := 0; i < rowsN; i++ {
s := prompbmarshal.TimeSeries{
Samples: []prompbmarshal.Sample{{
Value: r.Float64(),
Timestamp: time.Now().Unix(),
}},
}
err := client.Push(s)
if err == nil {
sent++
}
}
if sent == 0 {
t.Fatalf("0 series sent")
}
if err := client.Close(); err != nil {
t.Fatalf("failed to close client: %s", err)
}
got := testSrv.accepted()
if got != sent {
t.Fatalf("expected to have %d series; got %d", sent, got)
}
}
func newRWServer() *rwServer {
rw := &rwServer{}
rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler))
return rw
}
type rwServer struct {
// WARN: ordering of fields is important for alignment!
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
acceptedRows uint64
*httptest.Server
}
func (rw *rwServer) accepted() int {
return int(atomic.LoadUint64(&rw.acceptedRows))
}
func (rw *rwServer) err(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
}
func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
rw.err(w, fmt.Errorf("bad method %q", r.Method))
return
}
h := r.Header.Get("Content-Encoding")
if h != "snappy" {
rw.err(w, fmt.Errorf("header read error: Content-Encoding is not snappy (%q)", h))
}
h = r.Header.Get("Content-Type")
if h != "application/x-protobuf" {
rw.err(w, fmt.Errorf("header read error: Content-Type is not x-protobuf (%q)", h))
}
h = r.Header.Get("X-Prometheus-Remote-Write-Version")
if h != "0.1.0" {
rw.err(w, fmt.Errorf("header read error: X-Prometheus-Remote-Write-Version is not 0.1.0 (%q)", h))
}
data, err := io.ReadAll(r.Body)
if err != nil {
rw.err(w, fmt.Errorf("body read err: %w", err))
return
}
defer func() { _ = r.Body.Close() }()
b, err := snappy.Decode(nil, data)
if err != nil {
rw.err(w, fmt.Errorf("decode err: %w", err))
return
}
wr := &prompb.WriteRequest{}
if err := wr.Unmarshal(b); err != nil {
rw.err(w, fmt.Errorf("unmarhsal err: %w", err))
return
}
atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries)))
w.WriteHeader(http.StatusNoContent)
}