app/vmctl: drop integration tests from cluster branch, since they expect single-node VictoriaMetrics

This is a follow-up for 235477628e
This commit is contained in:
Aliaksandr Valialkin 2023-03-17 16:43:44 -07:00
parent 31005f4980
commit aeeab74388
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
4 changed files with 0 additions and 1244 deletions

View File

@ -1,319 +0,0 @@
package main
import (
"context"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/prometheus/prometheus/prompb"
)
// TestRemoteRead is an integration test for the non-streaming remote read
// migration path: it starts a fake Prometheus remote read server pre-loaded
// with generated series and a fake VictoriaMetrics remote write server
// pre-loaded with the expected result, then runs a remoteReadProcessor
// between them. The write server itself fails the test if the imported
// series differ from expectedSeries.
func TestRemoteRead(t *testing.T) {
var testCases = []struct {
name string
remoteReadConfig remoteread.Config
vmCfg vm.Config
start string // RFC3339 start of the migration window
end string // RFC3339 end of the migration window
numOfSamples int64 // samples generated per series
numOfSeries int64 // number of generated series
rrp remoteReadProcessor // NOTE(review): never populated by any case — looks removable
chunk string // stepper chunk size used to split the time range
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
expectedSeries []vm.TimeSeries
}{
{
name: "step minute on minute time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
start: "2022-11-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
numOfSeries: 3,
chunk: stepper.StepMinute,
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
// Expected timestamps are in milliseconds; presumably they correspond
// to the start/end window above as produced by GenerateRemoteReadSeries.
expectedSeries: []vm.TimeSeries{
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1669454585000, 1669454615000},
Values: []float64{0, 0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1669454585000, 1669454615000},
Values: []float64{100, 100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1669454585000, 1669454615000},
Values: []float64{200, 200},
},
},
},
{
name: "step month on month time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
start: "2022-09-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
numOfSeries: 3,
chunk: stepper.StepMonth,
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
expectedSeries: []vm.TimeSeries{
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1664184185000},
Values: []float64{0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1664184185000},
Values: []float64{100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1664184185000},
Values: []float64{200},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1666819415000},
Values: []float64{0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1666819415000},
Values: []float64{100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1666819415000},
Values: []float64{200}},
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
// Source: fake Prometheus serving the generated series.
remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
defer remoteReadServer.Close()
// Destination: fake VictoriaMetrics asserting the received series.
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
defer remoteWriteServer.Close()
tt.remoteReadConfig.Addr = remoteReadServer.URL()
rr, err := remoteread.NewClient(tt.remoteReadConfig)
if err != nil {
t.Fatalf("error create remote read client: %s", err)
}
start, err := time.Parse(time.RFC3339, tt.start)
if err != nil {
t.Fatalf("Error parse start time: %s", err)
}
end, err := time.Parse(time.RFC3339, tt.end)
if err != nil {
t.Fatalf("Error parse end time: %s", err)
}
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
remoteReadServer.SetRemoteReadSeries(rrs)
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
tt.vmCfg.Addr = remoteWriteServer.URL()
importer, err := vm.NewImporter(ctx, tt.vmCfg)
if err != nil {
t.Fatalf("failed to create VM importer: %s", err)
}
defer importer.Close()
rmp := remoteReadProcessor{
src: rr,
dst: importer,
filter: remoteReadFilter{
timeStart: &start,
timeEnd: &end,
chunk: tt.chunk,
},
cc: 1,
}
// The actual assertions happen inside the remote write server's
// import handler when the migrated series arrive.
err = rmp.run(ctx, true, false)
if err != nil {
t.Fatalf("failed to run remote read processor: %s", err)
}
})
}
}
// TestSteamRemoteRead mirrors TestRemoteRead but exercises the streamed
// (chunked) remote read protocol: the remote read config sets UseStream and
// the fake server is created via NewRemoteReadStreamServer/InitMockStorage.
// NOTE(review): "Steam" looks like a typo for "Stream"; renaming would be
// test-discovery-safe (only the Test prefix matters) — confirm before doing so.
func TestSteamRemoteRead(t *testing.T) {
var testCases = []struct {
name string
remoteReadConfig remoteread.Config
vmCfg vm.Config
start string // RFC3339 start of the migration window
end string // RFC3339 end of the migration window
numOfSamples int64 // samples generated per series
numOfSeries int64 // number of generated series
rrp remoteReadProcessor // NOTE(review): never populated by any case — looks removable
chunk string // stepper chunk size used to split the time range
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
expectedSeries []vm.TimeSeries
}{
{
name: "step minute on minute time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
start: "2022-11-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
numOfSeries: 3,
chunk: stepper.StepMinute,
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
expectedSeries: []vm.TimeSeries{
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1669454585000, 1669454615000},
Values: []float64{0, 0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1669454585000, 1669454615000},
Values: []float64{100, 100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1669454585000, 1669454615000},
Values: []float64{200, 200},
},
},
},
{
name: "step month on month time range",
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
start: "2022-09-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
numOfSeries: 3,
chunk: stepper.StepMonth,
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
expectedSeries: []vm.TimeSeries{
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1664184185000},
Values: []float64{0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1664184185000},
Values: []float64{100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1664184185000},
Values: []float64{200},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1666819415000},
Values: []float64{0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1666819415000},
Values: []float64{100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1666819415000},
Values: []float64{200}},
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
// Source: fake Prometheus serving the generated series via the
// streamed (chunked) remote read protocol.
remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
defer remoteReadServer.Close()
// Destination: fake VictoriaMetrics asserting the received series.
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
defer remoteWriteServer.Close()
tt.remoteReadConfig.Addr = remoteReadServer.URL()
rr, err := remoteread.NewClient(tt.remoteReadConfig)
if err != nil {
t.Fatalf("error create remote read client: %s", err)
}
start, err := time.Parse(time.RFC3339, tt.start)
if err != nil {
t.Fatalf("Error parse start time: %s", err)
}
end, err := time.Parse(time.RFC3339, tt.end)
if err != nil {
t.Fatalf("Error parse end time: %s", err)
}
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
// The stream server reads from a MockStorage rather than a plain slice.
remoteReadServer.InitMockStorage(rrs)
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
tt.vmCfg.Addr = remoteWriteServer.URL()
importer, err := vm.NewImporter(ctx, tt.vmCfg)
if err != nil {
t.Fatalf("failed to create VM importer: %s", err)
}
defer importer.Close()
rmp := remoteReadProcessor{
src: rr,
dst: importer,
filter: remoteReadFilter{
timeStart: &start,
timeEnd: &end,
chunk: tt.chunk,
},
cc: 1,
}
// The actual assertions happen inside the remote write server's
// import handler when the migrated series arrive.
err = rmp.run(ctx, true, false)
if err != nil {
t.Fatalf("failed to run remote read processor: %s", err)
}
})
}
}

View File

@ -1,368 +0,0 @@
package remote_read_integration
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunks"
)
const (
// maxBytesInFrame caps the payload size of a single streamed
// ChunkedReadResponse frame (1 MiB).
maxBytesInFrame = 1024 * 1024
)
// RemoteReadServer is a fake Prometheus remote read endpoint used by the
// vmctl integration tests.
type RemoteReadServer struct {
server *httptest.Server // underlying test HTTP server
series []*prompb.TimeSeries // canned series served by the plain read handler
storage *MockStorage // backing storage for the streamed read handler
}
// NewRemoteReadServer creates a remote read server. It exposes a single
// endpoint and responds with the passed series based on the request to the
// read endpoint. It returns a server which should be closed after being used.
func NewRemoteReadServer(t *testing.T) *RemoteReadServer {
	srv := &RemoteReadServer{series: []*prompb.TimeSeries{}}
	srv.server = httptest.NewServer(srv.getReadHandler(t))
	return srv
}
// Close shuts down the underlying test HTTP server.
func (rrs *RemoteReadServer) Close() {
rrs.server.Close()
}
// URL returns the base URL of the test server.
func (rrs *RemoteReadServer) URL() string {
return rrs.server.URL
}
// SetRemoteReadSeries appends the given series to the set served by the
// plain (non-streaming) read handler.
func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) {
rrs.series = append(rrs.series, series...)
}
// getReadHandler returns an http.Handler implementing the plain Prometheus
// remote read protocol: it validates the request headers, decodes the
// snappy-compressed protobuf ReadRequest, filters the configured series by
// each query's [start, end) time range and replies with a single
// snappy-compressed ReadResponse.
func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !validateReadHeaders(t, r) {
			t.Fatalf("invalid read headers")
		}
		compressed, err := io.ReadAll(r.Body)
		if err != nil {
			t.Fatalf("error read body: %s", err)
		}
		reqBuf, err := snappy.Decode(nil, compressed)
		if err != nil {
			t.Fatalf("error decode compressed data:%s", err)
		}
		var req prompb.ReadRequest
		if err := proto.Unmarshal(reqBuf, &req); err != nil {
			t.Fatalf("error unmarshal read request: %s", err)
		}
		resp := &prompb.ReadResponse{
			Results: make([]*prompb.QueryResult, len(req.Queries)),
		}
		for i, q := range req.Queries {
			startTs := q.StartTimestampMs
			endTs := q.EndTimestampMs
			ts := make([]*prompb.TimeSeries, len(rrs.series))
			// Note: the inner index was previously also named i, shadowing
			// the query index; renamed to j for clarity.
			for j, s := range rrs.series {
				var samples []prompb.Sample
				for _, sample := range s.Samples {
					if sample.Timestamp >= startTs && sample.Timestamp < endTs {
						samples = append(samples, sample)
					}
				}
				// Series without samples in range stay empty but keep their
				// slot so indexes stay aligned with rrs.series.
				var series prompb.TimeSeries
				if len(samples) > 0 {
					series.Labels = s.Labels
					series.Samples = samples
				}
				ts[j] = &series
			}
			resp.Results[i] = &prompb.QueryResult{Timeseries: ts}
		}
		// Marshal and send the response once, after ALL queries are answered.
		// The original marshaled and wrote inside the query loop, which sends
		// duplicate headers/bodies (and partial results) for multi-query
		// requests and triggers "superfluous WriteHeader" calls.
		data, err := proto.Marshal(resp)
		if err != nil {
			t.Fatalf("error marshal response: %s", err)
		}
		compressed = snappy.Encode(nil, data)
		w.Header().Set("Content-Type", "application/x-protobuf")
		w.Header().Set("Content-Encoding", "snappy")
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write(compressed); err != nil {
			t.Fatalf("snappy encode error: %s", err)
		}
	})
}
// NewRemoteReadStreamServer creates a remote read server that speaks the
// streamed (chunked) remote read protocol. Callers must Close it and install
// data via InitMockStorage before issuing requests.
func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer {
	srv := &RemoteReadServer{series: []*prompb.TimeSeries{}}
	srv.server = httptest.NewServer(srv.getStreamReadHandler(t))
	return srv
}
// InitMockStorage installs the given series as the backing MockStorage used
// by the streamed read handler.
func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) {
rrs.storage = NewMockStorage(series)
}
// getStreamReadHandler returns a handler implementing the streamed (chunked)
// Prometheus remote read protocol. It decodes the snappy-compressed
// ReadRequest, queries the MockStorage through Prometheus'
// SampleAndChunkQueryableClient and writes ChunkedReadResponse frames of
// roughly maxBytesInFrame bytes each to a chunked writer.
func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !validateStreamReadHeaders(t, r) {
t.Fatalf("invalid read headers")
}
// Streaming requires flushing each frame to the client as it is written.
f, ok := w.(http.Flusher)
if !ok {
t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface")
}
stream := remote.NewChunkedWriter(w, f)
data, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("error read body: %s", err)
}
decodedData, err := snappy.Decode(nil, data)
if err != nil {
t.Fatalf("error decode compressed data:%s", err)
}
var req prompb.ReadRequest
if err := proto.Unmarshal(decodedData, &req); err != nil {
t.Fatalf("error unmarshal read request: %s", err)
}
var chks []prompb.Chunk
ctx := context.Background()
for idx, r := range req.Queries {
startTs := r.StartTimestampMs
endTs := r.EndTimestampMs
var matchers []*labels.Matcher
// Callback reporting the client's last stored timestamp; always 0 here
// (presumably meaning "read everything") — confirm against the
// remote.NewSampleAndChunkQueryableClient contract.
cb := func() (int64, error) { return 0, nil }
c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb)
q, err := c.ChunkQuerier(ctx, startTs, endTs)
if err != nil {
t.Fatalf("error init chunk querier: %s", err)
}
ss := q.Select(false, nil, matchers...)
var iter chunks.Iterator
for ss.Next() {
series := ss.At()
iter = series.Iterator(iter)
labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil)
// Budget the frame: label bytes count against maxBytesInFrame too.
frameBytesLeft := maxBytesInFrame
for _, lb := range labels {
frameBytesLeft -= lb.Size()
}
isNext := iter.Next()
for isNext {
chunk := iter.At()
if chunk.Chunk == nil {
t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref)
}
chks = append(chks, prompb.Chunk{
MinTimeMs: chunk.MinTime,
MaxTimeMs: chunk.MaxTime,
Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()),
Data: chunk.Chunk.Bytes(),
})
frameBytesLeft -= chks[len(chks)-1].Size()
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
isNext = iter.Next()
if frameBytesLeft > 0 && isNext {
continue
}
// Frame full (or series exhausted): flush accumulated chunks.
resp := &prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{
{Labels: labels, Chunks: chks},
},
QueryIndex: int64(idx),
}
b, err := proto.Marshal(resp)
if err != nil {
t.Fatalf("error marshal response: %s", err)
}
if _, err := stream.Write(b); err != nil {
t.Fatalf("error write to stream: %s", err)
}
chks = chks[:0]
// NOTE(review): resetting the storage after every flushed frame looks
// suspicious for responses spanning multiple frames — confirm intent.
rrs.storage.Reset()
}
if err := iter.Err(); err != nil {
t.Fatalf("error iterate over chunk series: %s", err)
}
}
}
})
}
func validateReadHeaders(t *testing.T, r *http.Request) bool {
if r.Method != http.MethodPost {
t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
}
if r.Header.Get("Content-Encoding") != "snappy" {
t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
}
if r.Header.Get("Content-Type") != "application/x-protobuf" {
t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf")
}
remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
if remoteReadVersion == "" {
t.Fatalf("got empty prometheus remote read header")
}
if !strings.HasPrefix(remoteReadVersion, "0.1.") {
t.Fatalf("wrong remote version defined")
}
return true
}
func validateStreamReadHeaders(t *testing.T, r *http.Request) bool {
if r.Method != http.MethodPost {
t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
}
if r.Header.Get("Content-Encoding") != "snappy" {
t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
}
if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" {
t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
}
remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
if remoteReadVersion == "" {
t.Fatalf("got empty prometheus remote read header")
}
if !strings.HasPrefix(remoteReadVersion, "0.1.") {
t.Fatalf("wrong remote version defined")
}
return true
}
// GenerateRemoteReadSeries builds numOfSeries Prometheus time series, each
// carrying numOfSamples samples spread over [start, end) (timestamps in
// seconds). Series i is named "vm_metric_<g>", where the group counter g
// advances on every third series, and is labeled with job=<i>.
func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries {
	var ts []*prompb.TimeSeries
	group := 0
	for i := 0; i < int(numOfSeries); i++ {
		if i%3 == 0 {
			group++
		}
		ts = append(ts, &prompb.TimeSeries{
			Labels: []prompb.Label{
				{Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", group)},
				{Name: "job", Value: strconv.Itoa(i)},
			},
			Samples: generateRemoteReadSamples(i, start, end, numOfSamples),
		})
	}
	return ts
}
// generateRemoteReadSamples produces evenly spaced samples on
// [startTime, endTime) for the series with index idx. Timestamps are
// converted from seconds to milliseconds and every sample carries the value
// 100*idx. Degenerate inputs (numOfSamples <= 0 or a step of zero) yield an
// empty slice.
func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample {
	samples := make([]prompb.Sample, 0)
	if numOfSamples <= 0 {
		// Guard the division below; the original divided by zero here.
		return samples
	}
	delta := (endTime - startTime) / numOfSamples
	if delta <= 0 {
		// A zero/negative step would never advance t; bail out instead of looping forever.
		return samples
	}
	v := float64(100 * int64(idx))
	// Use t < endTime instead of the original t != endTime: when delta does
	// not divide the range exactly, t stepped over endTime and the loop never
	// terminated.
	for t := startTime; t < endTime; t += delta {
		samples = append(samples, prompb.Sample{
			Timestamp: t * 1000,
			Value:     v,
		})
	}
	return samples
}
// MockStorage is a minimal in-memory remote read backend holding canned
// series; it permits a single Read call between Resets.
type MockStorage struct {
query *prompb.Query // query seen by the last Read call; nil if none yet
store []*prompb.TimeSeries // canned series returned by Read
}
// NewMockStorage returns a MockStorage serving the given series.
func NewMockStorage(series []*prompb.TimeSeries) *MockStorage {
return &MockStorage{store: series}
}
func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
if ms.query != nil {
return nil, fmt.Errorf("expected only one call to remote client got: %v", query)
}
ms.query = query
q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))}
for _, s := range ms.store {
var samples []prompb.Sample
for _, sample := range s.Samples {
if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs {
samples = append(samples, sample)
}
}
var series prompb.TimeSeries
if len(samples) > 0 {
series.Labels = s.Labels
series.Samples = samples
}
q.Timeseries = append(q.Timeseries, &series)
}
return q, nil
}
// Reset clears the recorded query so that Read may be called again.
func (ms *MockStorage) Reset() {
ms.query = nil
}
// labelsToLabelsProto converts a labels.Labels set into the equivalent
// prompb.Label slice, preserving order.
func labelsToLabelsProto(lbls labels.Labels) []prompb.Label {
	out := make([]prompb.Label, len(lbls))
	for i, l := range lbls {
		out[i] = prompb.Label{Name: l.Name, Value: l.Value}
	}
	return out
}

View File

@ -1,261 +0,0 @@
package remote_read_integration
import (
"bufio"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httptest"
"reflect"
"sort"
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
)
// LabelValues represents a single series' label set (label name -> value)
// as returned by the /api/v1/series response.
type LabelValues map[string]string
// Response represents the /api/v1/series response envelope.
type Response struct {
Status string `json:"status"`
Series []LabelValues `json:"data"`
}
// RemoteWriteServer represents a fake remote write server with an in-memory
// "database" of series, used by the vmctl integration tests.
type RemoteWriteServer struct {
server *httptest.Server // underlying test HTTP server
series []vm.TimeSeries // fake database contents, registered via Series
expectedSeries []vm.TimeSeries // series the import handlers assert against
}
// NewRemoteWriteServer prepares a test remote write server exposing the
// VictoriaMetrics import/export/series/health endpoints. Callers should
// Close it after use.
func NewRemoteWriteServer(t *testing.T) *RemoteWriteServer {
	rws := &RemoteWriteServer{series: make([]vm.TimeSeries, 0)}
	handlers := map[string]http.Handler{
		"/api/v1/import":        rws.getWriteHandler(t),
		"/health":               rws.handlePing(),
		"/api/v1/series":        rws.seriesHandler(),
		"/api/v1/export/native": rws.exportNativeHandler(),
		"/api/v1/import/native": rws.importNativeHandler(t),
	}
	mux := http.NewServeMux()
	for pattern, h := range handlers {
		mux.Handle(pattern, h)
	}
	rws.server = httptest.NewServer(mux)
	return rws
}
// Close shuts down the underlying test HTTP server.
func (rws *RemoteWriteServer) Close() {
rws.server.Close()
}
// Series appends the given series to the fake database served by the
// series/export handlers.
func (rws *RemoteWriteServer) Series(series []vm.TimeSeries) {
rws.series = append(rws.series, series...)
}
// ExpectedSeries appends the series the import handlers will assert
// received data against.
func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) {
rws.expectedSeries = append(rws.expectedSeries, series...)
}
// URL returns the base URL of the test server.
func (rws *RemoteWriteServer) URL() string {
return rws.server.URL
}
// getWriteHandler returns a handler for /api/v1/import that parses the
// vmimport line protocol from the request body line by line, converts each
// row into a vm.TimeSeries and compares the full data set against the series
// registered via ExpectedSeries. Any mismatch fails the test.
func (rws *RemoteWriteServer) getWriteHandler(t *testing.T) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var tss []vm.TimeSeries
		scanner := bufio.NewScanner(r.Body)
		var rows parser.Rows
		for scanner.Scan() {
			rows.Unmarshal(scanner.Text())
			for _, row := range rows.Rows {
				var labelPairs []vm.LabelPair
				var ts vm.TimeSeries
				nameValue := ""
				for _, tag := range row.Tags {
					// The metric name travels as the reserved __name__ tag;
					// it is stored separately from ordinary label pairs.
					if string(tag.Key) == "__name__" {
						nameValue = string(tag.Value)
						continue
					}
					labelPairs = append(labelPairs, vm.LabelPair{Name: string(tag.Key), Value: string(tag.Value)})
				}
				ts.Values = append(ts.Values, row.Values...)
				ts.Timestamps = append(ts.Timestamps, row.Timestamps...)
				ts.Name = nameValue
				ts.LabelPairs = labelPairs
				tss = append(tss, ts)
			}
			rows.Reset()
		}
		// The original never checked scanner.Err(), silently truncating the
		// received data set (and producing a confusing mismatch) on a body
		// read failure.
		if err := scanner.Err(); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			t.Fatalf("error reading request body: %s", err)
			return
		}
		if !reflect.DeepEqual(tss, rws.expectedSeries) {
			w.WriteHeader(http.StatusInternalServerError)
			t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.expectedSeries, tss)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
}
// handlePing returns a /health handler that always reports the server as up.
func (rws *RemoteWriteServer) handlePing() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("OK"))
	})
}
// seriesHandler returns a handler for /api/v1/series that reports the label
// sets of every series stored via Series, in the Prometheus API response
// format ({"status":"success","data":[...]}).
func (rws *RemoteWriteServer) seriesHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var labelValues []LabelValues
		for _, ser := range rws.series {
			metricNames := make(LabelValues)
			if ser.Name != "" {
				metricNames["__name__"] = ser.Name
			}
			for _, p := range ser.LabelPairs {
				metricNames[p.Name] = p.Value
			}
			labelValues = append(labelValues, metricNames)
		}
		resp := Response{
			Status: "success",
			Series: labelValues,
		}
		// Marshal before touching the ResponseWriter: the original streamed
		// via json.NewEncoder(w) and then tried to set a 500 status on error,
		// but WriteHeader is a no-op once the encoder has started writing.
		body, err := json.Marshal(resp)
		if err != nil {
			log.Printf("error send series: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		// json.Encoder terminates the payload with a newline; keep that.
		body = append(body, '\n')
		if _, err := w.Write(body); err != nil {
			log.Printf("error send series: %s", err)
		}
	})
}
// exportNativeHandler returns a handler for /api/v1/export/native that
// streams the requested series in VictoriaMetrics native format via
// prometheus.ExportNativeHandler.
func (rws *RemoteWriteServer) exportNativeHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		now := time.Now()
		if err := prometheus.ExportNativeHandler(now, w, r); err != nil {
			log.Printf("error export series via native protocol: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		// No explicit status on success: ExportNativeHandler has already
		// written the response, so the original's WriteHeader(StatusNoContent)
		// was a superfluous call.
	})
}
// importNativeHandler returns a handler for /api/v1/import/native that
// parses the native-protocol request body into vm.TimeSeries, sorts the
// result deterministically and asserts it equals the series registered via
// ExpectedSeries. A mismatch fails the test.
func (rws *RemoteWriteServer) importNativeHandler(t *testing.T) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// stream.Parse requires the unmarshal worker pool to be running.
common.StartUnmarshalWorkers()
defer common.StopUnmarshalWorkers()
var gotTimeSeries []vm.TimeSeries
err := stream.Parse(r.Body, false, func(block *stream.Block) error {
mn := &block.MetricName
var timeseries vm.TimeSeries
timeseries.Name = string(mn.MetricGroup)
timeseries.Timestamps = append(timeseries.Timestamps, block.Timestamps...)
timeseries.Values = append(timeseries.Values, block.Values...)
for i := range mn.Tags {
tag := &mn.Tags[i]
timeseries.LabelPairs = append(timeseries.LabelPairs, vm.LabelPair{
Name: string(tag.Key),
Value: string(tag.Value),
})
}
gotTimeSeries = append(gotTimeSeries, timeseries)
return nil
})
if err != nil {
log.Printf("error parse stream blocks: %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
// got timeseries should be sorted
// because they are processed independently
sort.SliceStable(gotTimeSeries, func(i, j int) bool {
iv, jv := gotTimeSeries[i], gotTimeSeries[j]
switch {
case iv.Values[0] != jv.Values[0]:
return iv.Values[0] < jv.Values[0]
case iv.Timestamps[0] != jv.Timestamps[0]:
return iv.Timestamps[0] < jv.Timestamps[0]
default:
return iv.Name < jv.Name
}
})
if !reflect.DeepEqual(gotTimeSeries, rws.expectedSeries) {
w.WriteHeader(http.StatusInternalServerError)
t.Fatalf("datasets not equal, expected: %#v;\n got: %#v", rws.expectedSeries, gotTimeSeries)
}
w.WriteHeader(http.StatusNoContent)
return
})
}
// GenerateVNSeries generates numOfSeries test time series, each carrying
// numOfSamples samples spread over [start, end) (timestamps in seconds).
// Series i is named "vm_metric_<g>", where the group counter g advances on
// every third series, and is labeled with job=<i>.
func GenerateVNSeries(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries {
	var ts []vm.TimeSeries
	group := 0
	for i := 0; i < int(numOfSeries); i++ {
		if i%3 == 0 {
			group++
		}
		timestamps, values := generateTimeStampsAndValues(i, start, end, numOfSamples)
		ts = append(ts, vm.TimeSeries{
			Name: fmt.Sprintf("vm_metric_%d", group),
			LabelPairs: []vm.LabelPair{
				{Name: "job", Value: strconv.Itoa(i)},
			},
			Timestamps: timestamps,
			Values:     values,
		})
	}
	return ts
}
// generateTimeStampsAndValues produces parallel slices of evenly spaced
// timestamps (converted from seconds to milliseconds) on [startTime, endTime)
// and constant values 100*idx for the series with index idx. Degenerate
// inputs (numOfSamples <= 0 or a step of zero) yield empty slices.
func generateTimeStampsAndValues(idx int, startTime, endTime, numOfSamples int64) ([]int64, []float64) {
	var timestamps []int64
	var values []float64
	if numOfSamples <= 0 {
		// Guard the division below; the original divided by zero here.
		return timestamps, values
	}
	delta := (endTime - startTime) / numOfSamples
	if delta <= 0 {
		// A zero/negative step would never advance t; bail out instead of looping forever.
		return timestamps, values
	}
	v := float64(100 * int64(idx))
	// Use t < endTime instead of the original t != endTime: when delta does
	// not divide the range exactly, t stepped over endTime and the loop never
	// terminated.
	for t := startTime; t < endTime; t += delta {
		timestamps = append(timestamps, t*1000)
		values = append(values, v)
	}
	return timestamps, values
}

View File

@ -1,296 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
const (
// storagePath is the on-disk location of the temporary vmstorage data
// created by Test_vmNativeProcessor_run; it is removed when the test ends.
storagePath = "TestStorage"
// retentionPeriod keeps samples practically forever so generated data is
// never rotated away during the test.
retentionPeriod = "100y"
)
// Test_vmNativeProcessor_run is an integration test for vmNativeProcessor:
// it boots a real vmstorage instance (flags are overridden in processFlags),
// fills it with generated series, migrates them between fake source and
// destination remote write servers, and finally deletes the series,
// asserting the deleted count matches what was inserted.
func Test_vmNativeProcessor_run(t *testing.T) {
processFlags()
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
defer func() {
vmstorage.Stop()
if err := os.RemoveAll(storagePath); err != nil {
log.Fatalf("cannot remove %q: %s", storagePath, err)
}
}()
type fields struct {
filter native.Filter
dst *native.Client
src *native.Client
backoff *backoff.Backoff
s *stats
rateLimit int64
interCluster bool
cc int // concurrency for the native migration
matchName string // label used to select series to migrate/delete
matchValue string // regexp the label must match
}
type args struct {
ctx context.Context
silent bool
}
tests := []struct {
name string
fields fields
args args
vmSeries func(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries
expectedSeries []vm.TimeSeries
start string // RFC3339 start of the migration window
end string // RFC3339 end of the migration window
numOfSamples int64
numOfSeries int64
chunk string
wantErr bool
}{
{
name: "step minute on minute time range",
start: "2022-11-25T11:23:05+02:00",
end: "2022-11-27T11:24:05+02:00",
numOfSamples: 2,
numOfSeries: 3,
chunk: stepper.StepMinute,
fields: fields{
filter: native.Filter{},
backoff: backoff.New(),
rateLimit: 0,
interCluster: false,
cc: 1,
matchName: "__name__",
matchValue: ".*",
},
args: args{
ctx: context.Background(),
silent: true,
},
vmSeries: remote_read_integration.GenerateVNSeries,
// Expected timestamps are in milliseconds; presumably derived from the
// start/end window above by GenerateVNSeries.
expectedSeries: []vm.TimeSeries{
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1669368185000, 1669454615000},
Values: []float64{0, 0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1669368185000, 1669454615000},
Values: []float64{100, 100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1669368185000, 1669454615000},
Values: []float64{200, 200},
},
},
wantErr: false,
},
{
name: "step month on month time range",
start: "2022-09-26T11:23:05+02:00",
end: "2022-11-26T11:24:05+02:00",
numOfSamples: 2,
numOfSeries: 3,
chunk: stepper.StepMonth,
fields: fields{
filter: native.Filter{},
backoff: backoff.New(),
rateLimit: 0,
interCluster: false,
cc: 1,
matchName: "__name__",
matchValue: ".*",
},
args: args{
ctx: context.Background(),
silent: true,
},
vmSeries: remote_read_integration.GenerateVNSeries,
expectedSeries: []vm.TimeSeries{
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1664184185000},
Values: []float64{0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
Timestamps: []int64{1666819415000},
Values: []float64{0},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1664184185000},
Values: []float64{100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
Timestamps: []int64{1666819415000},
Values: []float64{100},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1664184185000},
Values: []float64{200},
},
{
Name: "vm_metric_1",
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
Timestamps: []int64{1666819415000},
Values: []float64{200},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// src serves the generated data, dst asserts what arrives.
src := remote_read_integration.NewRemoteWriteServer(t)
dst := remote_read_integration.NewRemoteWriteServer(t)
defer func() {
src.Close()
dst.Close()
}()
start, err := time.Parse(time.RFC3339, tt.start)
if err != nil {
t.Fatalf("Error parse start time: %s", err)
}
end, err := time.Parse(time.RFC3339, tt.end)
if err != nil {
t.Fatalf("Error parse end time: %s", err)
}
tt.fields.filter.Match = fmt.Sprintf("%s=%q", tt.fields.matchName, tt.fields.matchValue)
tt.fields.filter.TimeStart = tt.start
tt.fields.filter.TimeEnd = tt.end
rws := tt.vmSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
src.Series(rws)
dst.ExpectedSeries(tt.expectedSeries)
// The generated series also go into the local vmstorage instance so
// the native export endpoint has data to serve.
if err := fillStorage(rws); err != nil {
t.Fatalf("error add series to storage: %s", err)
}
tt.fields.src = &native.Client{
AuthCfg: nil,
Addr: src.URL(),
ExtraLabels: []string{},
DisableHTTPKeepAlive: false,
}
tt.fields.dst = &native.Client{
AuthCfg: nil,
Addr: dst.URL(),
ExtraLabels: []string{},
DisableHTTPKeepAlive: false,
}
p := &vmNativeProcessor{
filter: tt.fields.filter,
dst: tt.fields.dst,
src: tt.fields.src,
backoff: tt.fields.backoff,
s: tt.fields.s,
rateLimit: tt.fields.rateLimit,
interCluster: tt.fields.interCluster,
cc: tt.fields.cc,
}
if err := p.run(tt.args.ctx, tt.args.silent); (err != nil) != tt.wantErr {
t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
}
// Cleanup doubles as an assertion: deleting by the same matcher must
// remove exactly the series inserted for this case.
deleted, err := deleteSeries(tt.fields.matchName, tt.fields.matchValue)
if err != nil {
t.Fatalf("error delete series: %s", err)
}
if int64(deleted) != tt.numOfSeries {
t.Fatalf("expected deleted series %d; got deleted series %d", tt.numOfSeries, deleted)
}
})
}
}
// processFlags parses the command-line flags and then overrides the storage
// related ones so the test's vmstorage instance writes under storagePath
// with a practically unlimited retention period.
func processFlags() {
	flag.Parse()
	overrides := map[string]string{
		"storageDataPath": storagePath,
		"retentionPeriod": retentionPeriod,
	}
	for name, value := range overrides {
		// flag.Lookup returns nil (and Set below panics) if the flag was
		// never registered — intentional fail-fast behavior.
		if err := flag.Lookup(name).Value.Set(value); err != nil {
			log.Fatalf("unable to set %q with value %q, err: %v", name, value, err)
		}
	}
}
// fillStorage converts the given vm.TimeSeries into storage.MetricRows and
// inserts them into the local vmstorage instance, flushing afterwards so the
// data is immediately visible to queries.
func fillStorage(series []vm.TimeSeries) error {
var mrs []storage.MetricRow
for _, series := range series { // NOTE(review): loop variable shadows the parameter
var labels []prompb.Label
for _, lp := range series.LabelPairs {
labels = append(labels, prompb.Label{Name: []byte(lp.Name), Value: []byte(lp.Value)})
}
// The metric name is encoded as the reserved __name__ label.
if series.Name != "" {
labels = append(labels, prompb.Label{Name: []byte("__name__"), Value: []byte(series.Name)})
}
mr := storage.MetricRow{}
// MarshalMetricNameRaw reuses mr.MetricNameRaw's backing buffer, so all
// rows appended below for this series share it — appears safe since the
// buffer isn't mutated afterwards, but confirm before refactoring.
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
timestamps := series.Timestamps
values := series.Values
for i, value := range values {
mr.Timestamp = timestamps[i]
mr.Value = value
mrs = append(mrs, mr)
}
}
if err := vmstorage.AddRows(mrs); err != nil {
return fmt.Errorf("unexpected error in AddRows: %s", err)
}
// Flush so subsequent exports/queries observe the inserted rows.
vmstorage.Storage.DebugFlush()
return nil
}
// deleteSeries removes from the local vmstorage all series whose label
// `name` matches the regexp `value`, returning the number of deleted series.
func deleteSeries(name, value string) (int, error) {
tfs := storage.NewTagFilters()
// Presumably (isNegative=false, isRegexp=true): keep series where the
// label matches the regexp — confirm against lib/storage's TagFilters.Add.
if err := tfs.Add([]byte(name), []byte(value), false, true); err != nil {
return 0, fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
}
return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs})
}