mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-05 01:01:09 +01:00
app/vmctl/testdata: fix tests broken after updating Prometheus dependencies in the commit 7c40b95224
This is a follow-up for 765ce1b181
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7700
This commit is contained in:
parent
6d0420b454
commit
f8cb2cf1a0
@ -1,348 +1,348 @@
|
||||
package main
|
||||
|
||||
// import (
|
||||
// "context"
|
||||
// "net/http"
|
||||
// "testing"
|
||||
// "time"
|
||||
//
|
||||
// "github.com/prometheus/prometheus/prompb"
|
||||
//
|
||||
// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
||||
// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
||||
// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
|
||||
// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
|
||||
// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
|
||||
// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
// )
|
||||
//
|
||||
// func TestRemoteRead(t *testing.T) {
|
||||
// barpool.Disable(true)
|
||||
// defer func() {
|
||||
// barpool.Disable(false)
|
||||
// }()
|
||||
// defer func() { isSilent = false }()
|
||||
//
|
||||
// var testCases = []struct {
|
||||
// name string
|
||||
// remoteReadConfig remoteread.Config
|
||||
// vmCfg vm.Config
|
||||
// start string
|
||||
// end string
|
||||
// numOfSamples int64
|
||||
// numOfSeries int64
|
||||
// rrp remoteReadProcessor
|
||||
// chunk string
|
||||
// remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
||||
// expectedSeries []vm.TimeSeries
|
||||
// }{
|
||||
// {
|
||||
// name: "step minute on minute time range",
|
||||
// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
||||
// vmCfg: vm.Config{Addr: "", Concurrency: 1},
|
||||
// start: "2022-11-26T11:23:05+02:00",
|
||||
// end: "2022-11-26T11:24:05+02:00",
|
||||
// numOfSamples: 2,
|
||||
// numOfSeries: 3,
|
||||
// chunk: stepper.StepMinute,
|
||||
// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
// expectedSeries: []vm.TimeSeries{
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
// Timestamps: []int64{1669454585000, 1669454615000},
|
||||
// Values: []float64{0, 0},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
// Timestamps: []int64{1669454585000, 1669454615000},
|
||||
// Values: []float64{100, 100},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
// Timestamps: []int64{1669454585000, 1669454615000},
|
||||
// Values: []float64{200, 200},
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// {
|
||||
// name: "step month on month time range",
|
||||
// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
||||
// vmCfg: vm.Config{Addr: "", Concurrency: 1,
|
||||
// Transport: http.DefaultTransport.(*http.Transport)},
|
||||
// start: "2022-09-26T11:23:05+02:00",
|
||||
// end: "2022-11-26T11:24:05+02:00",
|
||||
// numOfSamples: 2,
|
||||
// numOfSeries: 3,
|
||||
// chunk: stepper.StepMonth,
|
||||
// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
// expectedSeries: []vm.TimeSeries{
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
// Timestamps: []int64{1664184185000},
|
||||
// Values: []float64{0},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
// Timestamps: []int64{1664184185000},
|
||||
// Values: []float64{100},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
// Timestamps: []int64{1664184185000},
|
||||
// Values: []float64{200},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
// Timestamps: []int64{1666819415000},
|
||||
// Values: []float64{0},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
// Timestamps: []int64{1666819415000},
|
||||
// Values: []float64{100},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
// Timestamps: []int64{1666819415000},
|
||||
// Values: []float64{200}},
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
//
|
||||
// for _, tt := range testCases {
|
||||
// t.Run(tt.name, func(t *testing.T) {
|
||||
// ctx := context.Background()
|
||||
// remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
|
||||
// defer remoteReadServer.Close()
|
||||
// remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
||||
// defer remoteWriteServer.Close()
|
||||
//
|
||||
// tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
||||
//
|
||||
// rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error create remote read client: %s", err)
|
||||
// }
|
||||
//
|
||||
// start, err := time.Parse(time.RFC3339, tt.start)
|
||||
// if err != nil {
|
||||
// t.Fatalf("Error parse start time: %s", err)
|
||||
// }
|
||||
//
|
||||
// end, err := time.Parse(time.RFC3339, tt.end)
|
||||
// if err != nil {
|
||||
// t.Fatalf("Error parse end time: %s", err)
|
||||
// }
|
||||
//
|
||||
// rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
||||
//
|
||||
// remoteReadServer.SetRemoteReadSeries(rrs)
|
||||
// remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
||||
//
|
||||
// tt.vmCfg.Addr = remoteWriteServer.URL()
|
||||
//
|
||||
// b, err := backoff.New(10, 1.8, time.Second*2)
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to create backoff: %s", err)
|
||||
// }
|
||||
// tt.vmCfg.Backoff = b
|
||||
//
|
||||
// importer, err := vm.NewImporter(ctx, tt.vmCfg)
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to create VM importer: %s", err)
|
||||
// }
|
||||
// defer importer.Close()
|
||||
//
|
||||
// rmp := remoteReadProcessor{
|
||||
// src: rr,
|
||||
// dst: importer,
|
||||
// filter: remoteReadFilter{
|
||||
// timeStart: &start,
|
||||
// timeEnd: &end,
|
||||
// chunk: tt.chunk,
|
||||
// },
|
||||
// cc: 1,
|
||||
// isVerbose: false,
|
||||
// }
|
||||
//
|
||||
// err = rmp.run(ctx)
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to run remote read processor: %s", err)
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func TestSteamRemoteRead(t *testing.T) {
|
||||
// barpool.Disable(true)
|
||||
// defer func() {
|
||||
// barpool.Disable(false)
|
||||
// }()
|
||||
// defer func() { isSilent = false }()
|
||||
//
|
||||
// var testCases = []struct {
|
||||
// name string
|
||||
// remoteReadConfig remoteread.Config
|
||||
// vmCfg vm.Config
|
||||
// start string
|
||||
// end string
|
||||
// numOfSamples int64
|
||||
// numOfSeries int64
|
||||
// rrp remoteReadProcessor
|
||||
// chunk string
|
||||
// remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
||||
// expectedSeries []vm.TimeSeries
|
||||
// }{
|
||||
// {
|
||||
// name: "step minute on minute time range",
|
||||
// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
||||
// vmCfg: vm.Config{Addr: "", Concurrency: 1},
|
||||
// start: "2022-11-26T11:23:05+02:00",
|
||||
// end: "2022-11-26T11:24:05+02:00",
|
||||
// numOfSamples: 2,
|
||||
// numOfSeries: 3,
|
||||
// chunk: stepper.StepMinute,
|
||||
// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
// expectedSeries: []vm.TimeSeries{
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
// Timestamps: []int64{1669454585000, 1669454615000},
|
||||
// Values: []float64{0, 0},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
// Timestamps: []int64{1669454585000, 1669454615000},
|
||||
// Values: []float64{100, 100},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
// Timestamps: []int64{1669454585000, 1669454615000},
|
||||
// Values: []float64{200, 200},
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// {
|
||||
// name: "step month on month time range",
|
||||
// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
||||
// vmCfg: vm.Config{Addr: "", Concurrency: 1},
|
||||
// start: "2022-09-26T11:23:05+02:00",
|
||||
// end: "2022-11-26T11:24:05+02:00",
|
||||
// numOfSamples: 2,
|
||||
// numOfSeries: 3,
|
||||
// chunk: stepper.StepMonth,
|
||||
// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
// expectedSeries: []vm.TimeSeries{
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
// Timestamps: []int64{1664184185000},
|
||||
// Values: []float64{0},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
// Timestamps: []int64{1664184185000},
|
||||
// Values: []float64{100},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
// Timestamps: []int64{1664184185000},
|
||||
// Values: []float64{200},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
// Timestamps: []int64{1666819415000},
|
||||
// Values: []float64{0},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
// Timestamps: []int64{1666819415000},
|
||||
// Values: []float64{100},
|
||||
// },
|
||||
// {
|
||||
// Name: "vm_metric_1",
|
||||
// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
// Timestamps: []int64{1666819415000},
|
||||
// Values: []float64{200}},
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
//
|
||||
// for _, tt := range testCases {
|
||||
// t.Run(tt.name, func(t *testing.T) {
|
||||
// ctx := context.Background()
|
||||
// remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
|
||||
// defer remoteReadServer.Close()
|
||||
// remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
||||
// defer remoteWriteServer.Close()
|
||||
//
|
||||
// tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
||||
//
|
||||
// rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error create remote read client: %s", err)
|
||||
// }
|
||||
//
|
||||
// start, err := time.Parse(time.RFC3339, tt.start)
|
||||
// if err != nil {
|
||||
// t.Fatalf("Error parse start time: %s", err)
|
||||
// }
|
||||
//
|
||||
// end, err := time.Parse(time.RFC3339, tt.end)
|
||||
// if err != nil {
|
||||
// t.Fatalf("Error parse end time: %s", err)
|
||||
// }
|
||||
//
|
||||
// rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
||||
//
|
||||
// remoteReadServer.InitMockStorage(rrs)
|
||||
// remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
||||
//
|
||||
// tt.vmCfg.Addr = remoteWriteServer.URL()
|
||||
//
|
||||
// b, err := backoff.New(10, 1.8, time.Second*2)
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to create backoff: %s", err)
|
||||
// }
|
||||
//
|
||||
// tt.vmCfg.Backoff = b
|
||||
// importer, err := vm.NewImporter(ctx, tt.vmCfg)
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to create VM importer: %s", err)
|
||||
// }
|
||||
// defer importer.Close()
|
||||
//
|
||||
// rmp := remoteReadProcessor{
|
||||
// src: rr,
|
||||
// dst: importer,
|
||||
// filter: remoteReadFilter{
|
||||
// timeStart: &start,
|
||||
// timeEnd: &end,
|
||||
// chunk: tt.chunk,
|
||||
// },
|
||||
// cc: 1,
|
||||
// isVerbose: false,
|
||||
// }
|
||||
//
|
||||
// err = rmp.run(ctx)
|
||||
// if err != nil {
|
||||
// t.Fatalf("failed to run remote read processor: %s", err)
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
)
|
||||
|
||||
func TestRemoteRead(t *testing.T) {
|
||||
barpool.Disable(true)
|
||||
defer func() {
|
||||
barpool.Disable(false)
|
||||
}()
|
||||
defer func() { isSilent = false }()
|
||||
|
||||
var testCases = []struct {
|
||||
name string
|
||||
remoteReadConfig remoteread.Config
|
||||
vmCfg vm.Config
|
||||
start string
|
||||
end string
|
||||
numOfSamples int64
|
||||
numOfSeries int64
|
||||
rrp remoteReadProcessor
|
||||
chunk string
|
||||
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
||||
expectedSeries []vm.TimeSeries
|
||||
}{
|
||||
{
|
||||
name: "step minute on minute time range",
|
||||
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
||||
vmCfg: vm.Config{Addr: "", Concurrency: 1},
|
||||
start: "2022-11-26T11:23:05+02:00",
|
||||
end: "2022-11-26T11:24:05+02:00",
|
||||
numOfSamples: 2,
|
||||
numOfSeries: 3,
|
||||
chunk: stepper.StepMinute,
|
||||
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
expectedSeries: []vm.TimeSeries{
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
Timestamps: []int64{1669454585000, 1669454615000},
|
||||
Values: []float64{0, 0},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
Timestamps: []int64{1669454585000, 1669454615000},
|
||||
Values: []float64{100, 100},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
Timestamps: []int64{1669454585000, 1669454615000},
|
||||
Values: []float64{200, 200},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "step month on month time range",
|
||||
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
||||
vmCfg: vm.Config{Addr: "", Concurrency: 1,
|
||||
Transport: http.DefaultTransport.(*http.Transport)},
|
||||
start: "2022-09-26T11:23:05+02:00",
|
||||
end: "2022-11-26T11:24:05+02:00",
|
||||
numOfSamples: 2,
|
||||
numOfSeries: 3,
|
||||
chunk: stepper.StepMonth,
|
||||
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
expectedSeries: []vm.TimeSeries{
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
Timestamps: []int64{1664184185000},
|
||||
Values: []float64{0},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
Timestamps: []int64{1664184185000},
|
||||
Values: []float64{100},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
Timestamps: []int64{1664184185000},
|
||||
Values: []float64{200},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
Timestamps: []int64{1666819415000},
|
||||
Values: []float64{0},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
Timestamps: []int64{1666819415000},
|
||||
Values: []float64{100},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
Timestamps: []int64{1666819415000},
|
||||
Values: []float64{200}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
|
||||
defer remoteReadServer.Close()
|
||||
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
||||
defer remoteWriteServer.Close()
|
||||
|
||||
tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
||||
|
||||
rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("error create remote read client: %s", err)
|
||||
}
|
||||
|
||||
start, err := time.Parse(time.RFC3339, tt.start)
|
||||
if err != nil {
|
||||
t.Fatalf("Error parse start time: %s", err)
|
||||
}
|
||||
|
||||
end, err := time.Parse(time.RFC3339, tt.end)
|
||||
if err != nil {
|
||||
t.Fatalf("Error parse end time: %s", err)
|
||||
}
|
||||
|
||||
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
||||
|
||||
remoteReadServer.SetRemoteReadSeries(rrs)
|
||||
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
||||
|
||||
tt.vmCfg.Addr = remoteWriteServer.URL()
|
||||
|
||||
b, err := backoff.New(10, 1.8, time.Second*2)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create backoff: %s", err)
|
||||
}
|
||||
tt.vmCfg.Backoff = b
|
||||
|
||||
importer, err := vm.NewImporter(ctx, tt.vmCfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create VM importer: %s", err)
|
||||
}
|
||||
defer importer.Close()
|
||||
|
||||
rmp := remoteReadProcessor{
|
||||
src: rr,
|
||||
dst: importer,
|
||||
filter: remoteReadFilter{
|
||||
timeStart: &start,
|
||||
timeEnd: &end,
|
||||
chunk: tt.chunk,
|
||||
},
|
||||
cc: 1,
|
||||
isVerbose: false,
|
||||
}
|
||||
|
||||
err = rmp.run(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to run remote read processor: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSteamRemoteRead(t *testing.T) {
|
||||
barpool.Disable(true)
|
||||
defer func() {
|
||||
barpool.Disable(false)
|
||||
}()
|
||||
defer func() { isSilent = false }()
|
||||
|
||||
var testCases = []struct {
|
||||
name string
|
||||
remoteReadConfig remoteread.Config
|
||||
vmCfg vm.Config
|
||||
start string
|
||||
end string
|
||||
numOfSamples int64
|
||||
numOfSeries int64
|
||||
rrp remoteReadProcessor
|
||||
chunk string
|
||||
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
||||
expectedSeries []vm.TimeSeries
|
||||
}{
|
||||
{
|
||||
name: "step minute on minute time range",
|
||||
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
||||
vmCfg: vm.Config{Addr: "", Concurrency: 1},
|
||||
start: "2022-11-26T11:23:05+02:00",
|
||||
end: "2022-11-26T11:24:05+02:00",
|
||||
numOfSamples: 2,
|
||||
numOfSeries: 3,
|
||||
chunk: stepper.StepMinute,
|
||||
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
expectedSeries: []vm.TimeSeries{
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
Timestamps: []int64{1669454585000, 1669454615000},
|
||||
Values: []float64{0, 0},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
Timestamps: []int64{1669454585000, 1669454615000},
|
||||
Values: []float64{100, 100},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
Timestamps: []int64{1669454585000, 1669454615000},
|
||||
Values: []float64{200, 200},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "step month on month time range",
|
||||
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
||||
vmCfg: vm.Config{Addr: "", Concurrency: 1},
|
||||
start: "2022-09-26T11:23:05+02:00",
|
||||
end: "2022-11-26T11:24:05+02:00",
|
||||
numOfSamples: 2,
|
||||
numOfSeries: 3,
|
||||
chunk: stepper.StepMonth,
|
||||
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||
expectedSeries: []vm.TimeSeries{
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
Timestamps: []int64{1664184185000},
|
||||
Values: []float64{0},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
Timestamps: []int64{1664184185000},
|
||||
Values: []float64{100},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
Timestamps: []int64{1664184185000},
|
||||
Values: []float64{200},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||
Timestamps: []int64{1666819415000},
|
||||
Values: []float64{0},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||
Timestamps: []int64{1666819415000},
|
||||
Values: []float64{100},
|
||||
},
|
||||
{
|
||||
Name: "vm_metric_1",
|
||||
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||
Timestamps: []int64{1666819415000},
|
||||
Values: []float64{200}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
|
||||
defer remoteReadServer.Close()
|
||||
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
||||
defer remoteWriteServer.Close()
|
||||
|
||||
tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
||||
|
||||
rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("error create remote read client: %s", err)
|
||||
}
|
||||
|
||||
start, err := time.Parse(time.RFC3339, tt.start)
|
||||
if err != nil {
|
||||
t.Fatalf("Error parse start time: %s", err)
|
||||
}
|
||||
|
||||
end, err := time.Parse(time.RFC3339, tt.end)
|
||||
if err != nil {
|
||||
t.Fatalf("Error parse end time: %s", err)
|
||||
}
|
||||
|
||||
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
||||
|
||||
remoteReadServer.InitMockStorage(rrs)
|
||||
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
||||
|
||||
tt.vmCfg.Addr = remoteWriteServer.URL()
|
||||
|
||||
b, err := backoff.New(10, 1.8, time.Second*2)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create backoff: %s", err)
|
||||
}
|
||||
|
||||
tt.vmCfg.Backoff = b
|
||||
importer, err := vm.NewImporter(ctx, tt.vmCfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create VM importer: %s", err)
|
||||
}
|
||||
defer importer.Close()
|
||||
|
||||
rmp := remoteReadProcessor{
|
||||
src: rr,
|
||||
dst: importer,
|
||||
filter: remoteReadFilter{
|
||||
timeStart: &start,
|
||||
timeEnd: &end,
|
||||
chunk: tt.chunk,
|
||||
},
|
||||
cc: 1,
|
||||
isVerbose: false,
|
||||
}
|
||||
|
||||
err = rmp.run(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to run remote read processor: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import (
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
)
|
||||
|
||||
|
@ -1,368 +1,469 @@
|
||||
package remote_read_integration
|
||||
|
||||
// import (
|
||||
// "context"
|
||||
// "fmt"
|
||||
// "io"
|
||||
// "net/http"
|
||||
// "net/http/httptest"
|
||||
// "strconv"
|
||||
// "strings"
|
||||
// "testing"
|
||||
//
|
||||
// "github.com/gogo/protobuf/proto"
|
||||
// "github.com/golang/snappy"
|
||||
// "github.com/prometheus/prometheus/model/labels"
|
||||
// "github.com/prometheus/prometheus/prompb"
|
||||
// "github.com/prometheus/prometheus/storage/remote"
|
||||
// "github.com/prometheus/prometheus/tsdb/chunks"
|
||||
// )
|
||||
//
|
||||
// const (
|
||||
// maxBytesInFrame = 1024 * 1024
|
||||
// )
|
||||
//
|
||||
// type RemoteReadServer struct {
|
||||
// server *httptest.Server
|
||||
// series []*prompb.TimeSeries
|
||||
// storage *MockStorage
|
||||
// }
|
||||
//
|
||||
// // NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the
|
||||
// // passed series based on the request to the read endpoint. It returns a server which should be closed after
|
||||
// // being used.
|
||||
// func NewRemoteReadServer(t *testing.T) *RemoteReadServer {
|
||||
// rrs := &RemoteReadServer{
|
||||
// series: make([]*prompb.TimeSeries, 0),
|
||||
// }
|
||||
// rrs.server = httptest.NewServer(rrs.getReadHandler(t))
|
||||
// return rrs
|
||||
// }
|
||||
//
|
||||
// // Close closes the server.
|
||||
// func (rrs *RemoteReadServer) Close() {
|
||||
// rrs.server.Close()
|
||||
// }
|
||||
//
|
||||
// func (rrs *RemoteReadServer) URL() string {
|
||||
// return rrs.server.URL
|
||||
// }
|
||||
//
|
||||
// func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) {
|
||||
// rrs.series = append(rrs.series, series...)
|
||||
// }
|
||||
//
|
||||
// func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler {
|
||||
// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// if !validateReadHeaders(t, r) {
|
||||
// t.Fatalf("invalid read headers")
|
||||
// }
|
||||
//
|
||||
// compressed, err := io.ReadAll(r.Body)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error read body: %s", err)
|
||||
// }
|
||||
//
|
||||
// reqBuf, err := snappy.Decode(nil, compressed)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error decode compressed data:%s", err)
|
||||
// }
|
||||
//
|
||||
// var req prompb.ReadRequest
|
||||
// if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
// t.Fatalf("error unmarshal read request: %s", err)
|
||||
// }
|
||||
//
|
||||
// resp := &prompb.ReadResponse{
|
||||
// Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||
// }
|
||||
//
|
||||
// for i, r := range req.Queries {
|
||||
// startTs := r.StartTimestampMs
|
||||
// endTs := r.EndTimestampMs
|
||||
// ts := make([]*prompb.TimeSeries, len(rrs.series))
|
||||
// for i, s := range rrs.series {
|
||||
// var samples []prompb.Sample
|
||||
// for _, sample := range s.Samples {
|
||||
// if sample.Timestamp >= startTs && sample.Timestamp < endTs {
|
||||
// samples = append(samples, sample)
|
||||
// }
|
||||
// }
|
||||
// var series prompb.TimeSeries
|
||||
// if len(samples) > 0 {
|
||||
// series.Labels = s.Labels
|
||||
// series.Samples = samples
|
||||
// }
|
||||
// ts[i] = &series
|
||||
// }
|
||||
//
|
||||
// resp.Results[i] = &prompb.QueryResult{Timeseries: ts}
|
||||
// data, err := proto.Marshal(resp)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error marshal response: %s", err)
|
||||
// }
|
||||
//
|
||||
// compressed = snappy.Encode(nil, data)
|
||||
//
|
||||
// w.Header().Set("Content-Type", "application/x-protobuf")
|
||||
// w.Header().Set("Content-Encoding", "snappy")
|
||||
// w.WriteHeader(http.StatusOK)
|
||||
//
|
||||
// if _, err := w.Write(compressed); err != nil {
|
||||
// t.Fatalf("snappy encode error: %s", err)
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
//
|
||||
// func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer {
|
||||
// rrs := &RemoteReadServer{
|
||||
// series: make([]*prompb.TimeSeries, 0),
|
||||
// }
|
||||
// rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t))
|
||||
// return rrs
|
||||
// }
|
||||
//
|
||||
// func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) {
|
||||
// rrs.storage = NewMockStorage(series)
|
||||
// }
|
||||
//
|
||||
// func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler {
|
||||
// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// if !validateStreamReadHeaders(t, r) {
|
||||
// t.Fatalf("invalid read headers")
|
||||
// }
|
||||
//
|
||||
// f, ok := w.(http.Flusher)
|
||||
// if !ok {
|
||||
// t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface")
|
||||
// }
|
||||
//
|
||||
// stream := remote.NewChunkedWriter(w, f)
|
||||
//
|
||||
// data, err := io.ReadAll(r.Body)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error read body: %s", err)
|
||||
// }
|
||||
//
|
||||
// decodedData, err := snappy.Decode(nil, data)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error decode compressed data:%s", err)
|
||||
// }
|
||||
//
|
||||
// var req prompb.ReadRequest
|
||||
// if err := proto.Unmarshal(decodedData, &req); err != nil {
|
||||
// t.Fatalf("error unmarshal read request: %s", err)
|
||||
// }
|
||||
//
|
||||
// var chks []prompb.Chunk
|
||||
// ctx := context.Background()
|
||||
// for idx, r := range req.Queries {
|
||||
// startTs := r.StartTimestampMs
|
||||
// endTs := r.EndTimestampMs
|
||||
//
|
||||
// var matchers []*labels.Matcher
|
||||
// cb := func() (int64, error) { return 0, nil }
|
||||
//
|
||||
// c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb)
|
||||
//
|
||||
// q, err := c.ChunkQuerier(startTs, endTs)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error init chunk querier: %s", err)
|
||||
// }
|
||||
//
|
||||
// ss := q.Select(ctx, false, nil, matchers...)
|
||||
// var iter chunks.Iterator
|
||||
// for ss.Next() {
|
||||
// series := ss.At()
|
||||
// iter = series.Iterator(iter)
|
||||
// labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil)
|
||||
//
|
||||
// frameBytesLeft := maxBytesInFrame
|
||||
// for _, lb := range labels {
|
||||
// frameBytesLeft -= lb.Size()
|
||||
// }
|
||||
//
|
||||
// isNext := iter.Next()
|
||||
//
|
||||
// for isNext {
|
||||
// chunk := iter.At()
|
||||
//
|
||||
// if chunk.Chunk == nil {
|
||||
// t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref)
|
||||
// }
|
||||
//
|
||||
// chks = append(chks, prompb.Chunk{
|
||||
// MinTimeMs: chunk.MinTime,
|
||||
// MaxTimeMs: chunk.MaxTime,
|
||||
// Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()),
|
||||
// Data: chunk.Chunk.Bytes(),
|
||||
// })
|
||||
//
|
||||
// frameBytesLeft -= chks[len(chks)-1].Size()
|
||||
//
|
||||
// // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
|
||||
// isNext = iter.Next()
|
||||
// if frameBytesLeft > 0 && isNext {
|
||||
// continue
|
||||
// }
|
||||
//
|
||||
// resp := &prompb.ChunkedReadResponse{
|
||||
// ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
// {Labels: labels, Chunks: chks},
|
||||
// },
|
||||
// QueryIndex: int64(idx),
|
||||
// }
|
||||
//
|
||||
// b, err := proto.Marshal(resp)
|
||||
// if err != nil {
|
||||
// t.Fatalf("error marshal response: %s", err)
|
||||
// }
|
||||
//
|
||||
// if _, err := stream.Write(b); err != nil {
|
||||
// t.Fatalf("error write to stream: %s", err)
|
||||
// }
|
||||
// chks = chks[:0]
|
||||
// rrs.storage.Reset()
|
||||
// }
|
||||
// if err := iter.Err(); err != nil {
|
||||
// t.Fatalf("error iterate over chunk series: %s", err)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
//
|
||||
// func validateReadHeaders(t *testing.T, r *http.Request) bool {
|
||||
// if r.Method != http.MethodPost {
|
||||
// t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
|
||||
// }
|
||||
// if r.Header.Get("Content-Encoding") != "snappy" {
|
||||
// t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
|
||||
// }
|
||||
// if r.Header.Get("Content-Type") != "application/x-protobuf" {
|
||||
// t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf")
|
||||
// }
|
||||
//
|
||||
// remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
|
||||
// if remoteReadVersion == "" {
|
||||
// t.Fatalf("got empty prometheus remote read header")
|
||||
// }
|
||||
// if !strings.HasPrefix(remoteReadVersion, "0.1.") {
|
||||
// t.Fatalf("wrong remote version defined")
|
||||
// }
|
||||
//
|
||||
// return true
|
||||
// }
|
||||
//
|
||||
// func validateStreamReadHeaders(t *testing.T, r *http.Request) bool {
|
||||
// if r.Method != http.MethodPost {
|
||||
// t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
|
||||
// }
|
||||
// if r.Header.Get("Content-Encoding") != "snappy" {
|
||||
// t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
|
||||
// }
|
||||
// if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" {
|
||||
// t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
|
||||
// }
|
||||
//
|
||||
// remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
|
||||
// if remoteReadVersion == "" {
|
||||
// t.Fatalf("got empty prometheus remote read header")
|
||||
// }
|
||||
// if !strings.HasPrefix(remoteReadVersion, "0.1.") {
|
||||
// t.Fatalf("wrong remote version defined")
|
||||
// }
|
||||
// return true
|
||||
// }
|
||||
//
|
||||
// func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries {
|
||||
// var ts []*prompb.TimeSeries
|
||||
// j := 0
|
||||
// for i := 0; i < int(numOfSeries); i++ {
|
||||
// if i%3 == 0 {
|
||||
// j++
|
||||
// }
|
||||
//
|
||||
// timeSeries := prompb.TimeSeries{
|
||||
// Labels: []prompb.Label{
|
||||
// {Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)},
|
||||
// {Name: "job", Value: strconv.Itoa(i)},
|
||||
// },
|
||||
// }
|
||||
//
|
||||
// ts = append(ts, &timeSeries)
|
||||
// }
|
||||
//
|
||||
// for i := range ts {
|
||||
// ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples)
|
||||
// }
|
||||
//
|
||||
// return ts
|
||||
// }
|
||||
//
|
||||
// func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample {
|
||||
// samples := make([]prompb.Sample, 0)
|
||||
// delta := (endTime - startTime) / numOfSamples
|
||||
//
|
||||
// t := startTime
|
||||
// for t != endTime {
|
||||
// v := 100 * int64(idx)
|
||||
// samples = append(samples, prompb.Sample{
|
||||
// Timestamp: t * 1000,
|
||||
// Value: float64(v),
|
||||
// })
|
||||
// t = t + delta
|
||||
// }
|
||||
//
|
||||
// return samples
|
||||
// }
|
||||
//
|
||||
// type MockStorage struct {
|
||||
// query *prompb.Query
|
||||
// store []*prompb.TimeSeries
|
||||
// }
|
||||
//
|
||||
// func NewMockStorage(series []*prompb.TimeSeries) *MockStorage {
|
||||
// return &MockStorage{store: series}
|
||||
// }
|
||||
//
|
||||
// func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||
// if ms.query != nil {
|
||||
// return nil, fmt.Errorf("expected only one call to remote client got: %v", query)
|
||||
// }
|
||||
// ms.query = query
|
||||
//
|
||||
// q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))}
|
||||
// for _, s := range ms.store {
|
||||
// var samples []prompb.Sample
|
||||
// for _, sample := range s.Samples {
|
||||
// if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs {
|
||||
// samples = append(samples, sample)
|
||||
// }
|
||||
// }
|
||||
// var series prompb.TimeSeries
|
||||
// if len(samples) > 0 {
|
||||
// series.Labels = s.Labels
|
||||
// series.Samples = samples
|
||||
// }
|
||||
//
|
||||
// q.Timeseries = append(q.Timeseries, &series)
|
||||
// }
|
||||
// return q, nil
|
||||
// }
|
||||
//
|
||||
// func (ms *MockStorage) Reset() {
|
||||
// ms.query = nil
|
||||
// }
|
||||
//
|
||||
// func labelsToLabelsProto(labels labels.Labels) []prompb.Label {
|
||||
// result := make([]prompb.Label, 0, len(labels))
|
||||
// for _, l := range labels {
|
||||
// result = append(result, prompb.Label{
|
||||
// Name: l.Name,
|
||||
// Value: l.Value,
|
||||
// })
|
||||
// }
|
||||
// return result
|
||||
// }
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
const (
|
||||
maxBytesInFrame = 1024 * 1024
|
||||
)
|
||||
|
||||
type RemoteReadServer struct {
|
||||
server *httptest.Server
|
||||
series []*prompb.TimeSeries
|
||||
storage *MockStorage
|
||||
}
|
||||
|
||||
// NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the
|
||||
// passed series based on the request to the read endpoint. It returns a server which should be closed after
|
||||
// being used.
|
||||
func NewRemoteReadServer(t *testing.T) *RemoteReadServer {
|
||||
rrs := &RemoteReadServer{
|
||||
series: make([]*prompb.TimeSeries, 0),
|
||||
}
|
||||
rrs.server = httptest.NewServer(rrs.getReadHandler(t))
|
||||
return rrs
|
||||
}
|
||||
|
||||
// Close closes the server.
|
||||
func (rrs *RemoteReadServer) Close() {
|
||||
rrs.server.Close()
|
||||
}
|
||||
|
||||
func (rrs *RemoteReadServer) URL() string {
|
||||
return rrs.server.URL
|
||||
}
|
||||
|
||||
func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) {
|
||||
rrs.series = append(rrs.series, series...)
|
||||
}
|
||||
|
||||
func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !validateReadHeaders(t, r) {
|
||||
t.Fatalf("invalid read headers")
|
||||
}
|
||||
|
||||
compressed, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("error read body: %s", err)
|
||||
}
|
||||
|
||||
reqBuf, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
t.Fatalf("error decode compressed data:%s", err)
|
||||
}
|
||||
|
||||
var req prompb.ReadRequest
|
||||
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||
t.Fatalf("error unmarshal read request: %s", err)
|
||||
}
|
||||
|
||||
resp := &prompb.ReadResponse{
|
||||
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||
}
|
||||
|
||||
for i, r := range req.Queries {
|
||||
startTs := r.StartTimestampMs
|
||||
endTs := r.EndTimestampMs
|
||||
ts := make([]*prompb.TimeSeries, len(rrs.series))
|
||||
for i, s := range rrs.series {
|
||||
var samples []prompb.Sample
|
||||
for _, sample := range s.Samples {
|
||||
if sample.Timestamp >= startTs && sample.Timestamp < endTs {
|
||||
samples = append(samples, sample)
|
||||
}
|
||||
}
|
||||
var series prompb.TimeSeries
|
||||
if len(samples) > 0 {
|
||||
series.Labels = s.Labels
|
||||
series.Samples = samples
|
||||
}
|
||||
ts[i] = &series
|
||||
}
|
||||
|
||||
resp.Results[i] = &prompb.QueryResult{Timeseries: ts}
|
||||
data, err := proto.Marshal(resp)
|
||||
if err != nil {
|
||||
t.Fatalf("error marshal response: %s", err)
|
||||
}
|
||||
|
||||
compressed = snappy.Encode(nil, data)
|
||||
|
||||
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||
w.Header().Set("Content-Encoding", "snappy")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
if _, err := w.Write(compressed); err != nil {
|
||||
t.Fatalf("snappy encode error: %s", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer {
|
||||
rrs := &RemoteReadServer{
|
||||
series: make([]*prompb.TimeSeries, 0),
|
||||
}
|
||||
rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t))
|
||||
return rrs
|
||||
}
|
||||
|
||||
func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) {
|
||||
rrs.storage = NewMockStorage(series)
|
||||
}
|
||||
|
||||
func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !validateStreamReadHeaders(t, r) {
|
||||
t.Fatalf("invalid read headers")
|
||||
}
|
||||
|
||||
f, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface")
|
||||
}
|
||||
|
||||
stream := remote.NewChunkedWriter(w, f)
|
||||
|
||||
data, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("error read body: %s", err)
|
||||
}
|
||||
|
||||
decodedData, err := snappy.Decode(nil, data)
|
||||
if err != nil {
|
||||
t.Fatalf("error decode compressed data:%s", err)
|
||||
}
|
||||
|
||||
var req prompb.ReadRequest
|
||||
if err := proto.Unmarshal(decodedData, &req); err != nil {
|
||||
t.Fatalf("error unmarshal read request: %s", err)
|
||||
}
|
||||
|
||||
var chks []prompb.Chunk
|
||||
ctx := context.Background()
|
||||
for idx, r := range req.Queries {
|
||||
startTs := r.StartTimestampMs
|
||||
endTs := r.EndTimestampMs
|
||||
|
||||
var matchers []*labels.Matcher
|
||||
cb := func() (int64, error) { return 0, nil }
|
||||
|
||||
c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb)
|
||||
|
||||
q, err := c.ChunkQuerier(startTs, endTs)
|
||||
if err != nil {
|
||||
t.Fatalf("error init chunk querier: %s", err)
|
||||
}
|
||||
|
||||
ss := q.Select(ctx, false, nil, matchers...)
|
||||
var iter chunks.Iterator
|
||||
for ss.Next() {
|
||||
series := ss.At()
|
||||
iter = series.Iterator(iter)
|
||||
labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil)
|
||||
|
||||
frameBytesLeft := maxBytesInFrame
|
||||
for _, lb := range labels {
|
||||
frameBytesLeft -= lb.Size()
|
||||
}
|
||||
|
||||
isNext := iter.Next()
|
||||
|
||||
for isNext {
|
||||
chunk := iter.At()
|
||||
|
||||
if chunk.Chunk == nil {
|
||||
t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref)
|
||||
}
|
||||
|
||||
chks = append(chks, prompb.Chunk{
|
||||
MinTimeMs: chunk.MinTime,
|
||||
MaxTimeMs: chunk.MaxTime,
|
||||
Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()),
|
||||
Data: chunk.Chunk.Bytes(),
|
||||
})
|
||||
|
||||
frameBytesLeft -= chks[len(chks)-1].Size()
|
||||
|
||||
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
|
||||
isNext = iter.Next()
|
||||
if frameBytesLeft > 0 && isNext {
|
||||
continue
|
||||
}
|
||||
|
||||
resp := &prompb.ChunkedReadResponse{
|
||||
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||
{Labels: labels, Chunks: chks},
|
||||
},
|
||||
QueryIndex: int64(idx),
|
||||
}
|
||||
|
||||
b, err := proto.Marshal(resp)
|
||||
if err != nil {
|
||||
t.Fatalf("error marshal response: %s", err)
|
||||
}
|
||||
|
||||
if _, err := stream.Write(b); err != nil {
|
||||
t.Fatalf("error write to stream: %s", err)
|
||||
}
|
||||
chks = chks[:0]
|
||||
rrs.storage.Reset()
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
t.Fatalf("error iterate over chunk series: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func validateReadHeaders(t *testing.T, r *http.Request) bool {
|
||||
if r.Method != http.MethodPost {
|
||||
t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
|
||||
}
|
||||
if r.Header.Get("Content-Encoding") != "snappy" {
|
||||
t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
|
||||
}
|
||||
if r.Header.Get("Content-Type") != "application/x-protobuf" {
|
||||
t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf")
|
||||
}
|
||||
|
||||
remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
|
||||
if remoteReadVersion == "" {
|
||||
t.Fatalf("got empty prometheus remote read header")
|
||||
}
|
||||
if !strings.HasPrefix(remoteReadVersion, "0.1.") {
|
||||
t.Fatalf("wrong remote version defined")
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func validateStreamReadHeaders(t *testing.T, r *http.Request) bool {
|
||||
if r.Method != http.MethodPost {
|
||||
t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
|
||||
}
|
||||
if r.Header.Get("Content-Encoding") != "snappy" {
|
||||
t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
|
||||
}
|
||||
if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" {
|
||||
t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
|
||||
}
|
||||
|
||||
remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
|
||||
if remoteReadVersion == "" {
|
||||
t.Fatalf("got empty prometheus remote read header")
|
||||
}
|
||||
if !strings.HasPrefix(remoteReadVersion, "0.1.") {
|
||||
t.Fatalf("wrong remote version defined")
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries {
|
||||
var ts []*prompb.TimeSeries
|
||||
j := 0
|
||||
for i := 0; i < int(numOfSeries); i++ {
|
||||
if i%3 == 0 {
|
||||
j++
|
||||
}
|
||||
|
||||
timeSeries := prompb.TimeSeries{
|
||||
Labels: []prompb.Label{
|
||||
{Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)},
|
||||
{Name: "job", Value: strconv.Itoa(i)},
|
||||
},
|
||||
}
|
||||
|
||||
ts = append(ts, &timeSeries)
|
||||
}
|
||||
|
||||
for i := range ts {
|
||||
ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples)
|
||||
}
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample {
|
||||
samples := make([]prompb.Sample, 0)
|
||||
delta := (endTime - startTime) / numOfSamples
|
||||
|
||||
t := startTime
|
||||
for t != endTime {
|
||||
v := 100 * int64(idx)
|
||||
samples = append(samples, prompb.Sample{
|
||||
Timestamp: t * 1000,
|
||||
Value: float64(v),
|
||||
})
|
||||
t = t + delta
|
||||
}
|
||||
|
||||
return samples
|
||||
}
|
||||
|
||||
type MockStorage struct {
|
||||
query *prompb.Query
|
||||
store []*prompb.TimeSeries
|
||||
}
|
||||
|
||||
func NewMockStorage(series []*prompb.TimeSeries) *MockStorage {
|
||||
return &MockStorage{store: series}
|
||||
}
|
||||
|
||||
func (ms *MockStorage) Read(_ context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error) {
|
||||
if sortSeries {
|
||||
return nil, fmt.Errorf("unexpected sortSeries=true")
|
||||
}
|
||||
if ms.query != nil {
|
||||
return nil, fmt.Errorf("expected only one call to remote client got: %v", query)
|
||||
}
|
||||
ms.query = query
|
||||
|
||||
tss := make([]*prompb.TimeSeries, 0, len(ms.store))
|
||||
for _, s := range ms.store {
|
||||
var samples []prompb.Sample
|
||||
for _, sample := range s.Samples {
|
||||
if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs {
|
||||
samples = append(samples, sample)
|
||||
}
|
||||
}
|
||||
var series prompb.TimeSeries
|
||||
if len(samples) > 0 {
|
||||
series.Labels = s.Labels
|
||||
series.Samples = samples
|
||||
}
|
||||
|
||||
tss = append(tss, &series)
|
||||
}
|
||||
return &mockSeriesSet{
|
||||
tss: tss,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ms *MockStorage) Reset() {
|
||||
ms.query = nil
|
||||
}
|
||||
|
||||
type mockSeriesSet struct {
|
||||
tss []*prompb.TimeSeries
|
||||
next int
|
||||
}
|
||||
|
||||
func (ss *mockSeriesSet) Next() bool {
|
||||
if ss.next >= len(ss.tss) {
|
||||
return false
|
||||
}
|
||||
ss.next++
|
||||
return true
|
||||
}
|
||||
|
||||
func (ss *mockSeriesSet) At() storage.Series {
|
||||
return &mockSeries{
|
||||
s: ss.tss[ss.next-1],
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *mockSeriesSet) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *mockSeriesSet) Warnings() annotations.Annotations {
|
||||
return nil
|
||||
}
|
||||
|
||||
type mockSeries struct {
|
||||
s *prompb.TimeSeries
|
||||
}
|
||||
|
||||
func (s *mockSeries) Labels() labels.Labels {
|
||||
a := make(labels.Labels, len(s.s.Labels))
|
||||
for i, label := range s.s.Labels {
|
||||
a[i] = labels.Label{
|
||||
Name: label.Name,
|
||||
Value: label.Value,
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func (s *mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
|
||||
return &mockSamplesIterator{
|
||||
samples: s.s.Samples,
|
||||
}
|
||||
}
|
||||
|
||||
type mockSamplesIterator struct {
|
||||
samples []prompb.Sample
|
||||
next int
|
||||
}
|
||||
|
||||
func (si *mockSamplesIterator) Next() chunkenc.ValueType {
|
||||
if si.next >= len(si.samples) {
|
||||
return chunkenc.ValNone
|
||||
}
|
||||
si.next++
|
||||
return chunkenc.ValFloat
|
||||
}
|
||||
|
||||
func (si *mockSamplesIterator) Seek(t int64) chunkenc.ValueType {
|
||||
for i := range si.samples {
|
||||
if si.samples[i].Timestamp >= t {
|
||||
si.next = i + 1
|
||||
return chunkenc.ValFloat
|
||||
}
|
||||
}
|
||||
return chunkenc.ValNone
|
||||
}
|
||||
|
||||
func (si *mockSamplesIterator) At() (int64, float64) {
|
||||
s := si.samples[si.next-1]
|
||||
return s.Timestamp, s.Value
|
||||
}
|
||||
|
||||
func (si *mockSamplesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) {
|
||||
panic("BUG: musn't be called")
|
||||
}
|
||||
|
||||
func (si *mockSamplesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||
panic("BUG: mustn't be called")
|
||||
}
|
||||
|
||||
func (si *mockSamplesIterator) AtT() int64 {
|
||||
return si.samples[si.next-1].Timestamp
|
||||
}
|
||||
|
||||
func (si *mockSamplesIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func labelsToLabelsProto(labels labels.Labels) []prompb.Label {
|
||||
result := make([]prompb.Label, 0, len(labels))
|
||||
for _, l := range labels {
|
||||
result = append(result, prompb.Label{
|
||||
Name: l.Name,
|
||||
Value: l.Value,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user