mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-02 01:00:07 +01:00
320 lines
9.5 KiB
Go
320 lines
9.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
|
"github.com/prometheus/prometheus/prompb"
|
|
)
|
|
|
|
func TestRemoteRead(t *testing.T) {
|
|
|
|
var testCases = []struct {
|
|
name string
|
|
remoteReadConfig remoteread.Config
|
|
vmCfg vm.Config
|
|
start string
|
|
end string
|
|
numOfSamples int64
|
|
numOfSeries int64
|
|
rrp remoteReadProcessor
|
|
chunk string
|
|
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
|
expectedSeries []vm.TimeSeries
|
|
}{
|
|
{
|
|
name: "step minute on minute time range",
|
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
|
start: "2022-11-26T11:23:05+02:00",
|
|
end: "2022-11-26T11:24:05+02:00",
|
|
numOfSamples: 2,
|
|
numOfSeries: 3,
|
|
chunk: stepper.StepMinute,
|
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
|
expectedSeries: []vm.TimeSeries{
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
|
Timestamps: []int64{1669454585000, 1669454615000},
|
|
Values: []float64{0, 0},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
|
Timestamps: []int64{1669454585000, 1669454615000},
|
|
Values: []float64{100, 100},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
|
Timestamps: []int64{1669454585000, 1669454615000},
|
|
Values: []float64{200, 200},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "step month on month time range",
|
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
|
start: "2022-09-26T11:23:05+02:00",
|
|
end: "2022-11-26T11:24:05+02:00",
|
|
numOfSamples: 2,
|
|
numOfSeries: 3,
|
|
chunk: stepper.StepMonth,
|
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
|
expectedSeries: []vm.TimeSeries{
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
|
Timestamps: []int64{1664184185000},
|
|
Values: []float64{0},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
|
Timestamps: []int64{1664184185000},
|
|
Values: []float64{100},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
|
Timestamps: []int64{1664184185000},
|
|
Values: []float64{200},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
|
Timestamps: []int64{1666819415000},
|
|
Values: []float64{0},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
|
Timestamps: []int64{1666819415000},
|
|
Values: []float64{100},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
|
Timestamps: []int64{1666819415000},
|
|
Values: []float64{200}},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range testCases {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
|
|
defer remoteReadServer.Close()
|
|
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
|
defer remoteWriteServer.Close()
|
|
|
|
tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
|
|
|
rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
|
if err != nil {
|
|
t.Fatalf("error create remote read client: %s", err)
|
|
}
|
|
|
|
start, err := time.Parse(time.RFC3339, tt.start)
|
|
if err != nil {
|
|
t.Fatalf("Error parse start time: %s", err)
|
|
}
|
|
|
|
end, err := time.Parse(time.RFC3339, tt.end)
|
|
if err != nil {
|
|
t.Fatalf("Error parse end time: %s", err)
|
|
}
|
|
|
|
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
|
|
|
remoteReadServer.SetRemoteReadSeries(rrs)
|
|
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
|
|
|
tt.vmCfg.Addr = remoteWriteServer.URL()
|
|
|
|
importer, err := vm.NewImporter(tt.vmCfg)
|
|
if err != nil {
|
|
t.Fatalf("failed to create VM importer: %s", err)
|
|
}
|
|
defer importer.Close()
|
|
|
|
rmp := remoteReadProcessor{
|
|
src: rr,
|
|
dst: importer,
|
|
filter: remoteReadFilter{
|
|
timeStart: &start,
|
|
timeEnd: &end,
|
|
chunk: tt.chunk,
|
|
},
|
|
cc: 1,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err = rmp.run(ctx, true, false)
|
|
if err != nil {
|
|
t.Fatalf("failed to run remote read processor: %s", err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestSteamRemoteRead(t *testing.T) {
|
|
|
|
var testCases = []struct {
|
|
name string
|
|
remoteReadConfig remoteread.Config
|
|
vmCfg vm.Config
|
|
start string
|
|
end string
|
|
numOfSamples int64
|
|
numOfSeries int64
|
|
rrp remoteReadProcessor
|
|
chunk string
|
|
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
|
expectedSeries []vm.TimeSeries
|
|
}{
|
|
{
|
|
name: "step minute on minute time range",
|
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
|
start: "2022-11-26T11:23:05+02:00",
|
|
end: "2022-11-26T11:24:05+02:00",
|
|
numOfSamples: 2,
|
|
numOfSeries: 3,
|
|
chunk: stepper.StepMinute,
|
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
|
expectedSeries: []vm.TimeSeries{
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
|
Timestamps: []int64{1669454585000, 1669454615000},
|
|
Values: []float64{0, 0},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
|
Timestamps: []int64{1669454585000, 1669454615000},
|
|
Values: []float64{100, 100},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
|
Timestamps: []int64{1669454585000, 1669454615000},
|
|
Values: []float64{200, 200},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "step month on month time range",
|
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
|
start: "2022-09-26T11:23:05+02:00",
|
|
end: "2022-11-26T11:24:05+02:00",
|
|
numOfSamples: 2,
|
|
numOfSeries: 3,
|
|
chunk: stepper.StepMonth,
|
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
|
expectedSeries: []vm.TimeSeries{
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
|
Timestamps: []int64{1664184185000},
|
|
Values: []float64{0},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
|
Timestamps: []int64{1664184185000},
|
|
Values: []float64{100},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
|
Timestamps: []int64{1664184185000},
|
|
Values: []float64{200},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
|
Timestamps: []int64{1666819415000},
|
|
Values: []float64{0},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
|
Timestamps: []int64{1666819415000},
|
|
Values: []float64{100},
|
|
},
|
|
{
|
|
Name: "vm_metric_1",
|
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
|
Timestamps: []int64{1666819415000},
|
|
Values: []float64{200}},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range testCases {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
|
|
defer remoteReadServer.Close()
|
|
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
|
defer remoteWriteServer.Close()
|
|
|
|
tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
|
|
|
rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
|
if err != nil {
|
|
t.Fatalf("error create remote read client: %s", err)
|
|
}
|
|
|
|
start, err := time.Parse(time.RFC3339, tt.start)
|
|
if err != nil {
|
|
t.Fatalf("Error parse start time: %s", err)
|
|
}
|
|
|
|
end, err := time.Parse(time.RFC3339, tt.end)
|
|
if err != nil {
|
|
t.Fatalf("Error parse end time: %s", err)
|
|
}
|
|
|
|
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
|
|
|
remoteReadServer.InitMockStorage(rrs)
|
|
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
|
|
|
tt.vmCfg.Addr = remoteWriteServer.URL()
|
|
|
|
importer, err := vm.NewImporter(tt.vmCfg)
|
|
if err != nil {
|
|
t.Fatalf("failed to create VM importer: %s", err)
|
|
}
|
|
defer importer.Close()
|
|
|
|
rmp := remoteReadProcessor{
|
|
src: rr,
|
|
dst: importer,
|
|
filter: remoteReadFilter{
|
|
timeStart: &start,
|
|
timeEnd: &end,
|
|
chunk: tt.chunk,
|
|
},
|
|
cc: 1,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
err = rmp.run(ctx, true, false)
|
|
if err != nil {
|
|
t.Fatalf("failed to run remote read processor: %s", err)
|
|
}
|
|
})
|
|
}
|
|
}
|