diff --git a/app/vmctl/testdata/servers_integration_test/remote_write_server.go b/app/vmctl/testdata/servers_integration_test/remote_write_server.go index 9671ddb94..e6525af1d 100644 --- a/app/vmctl/testdata/servers_integration_test/remote_write_server.go +++ b/app/vmctl/testdata/servers_integration_test/remote_write_server.go @@ -2,39 +2,70 @@ package remote_read_integration import ( "bufio" + "encoding/json" + "fmt" + "log" "net/http" "net/http/httptest" "reflect" + "sort" + "strconv" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" ) +// LabelValues represents series from api/v1/series response +type LabelValues map[string]string + +// Response represents response from api/v1/series +type Response struct { + Status string `json:"status"` + Series []LabelValues `json:"data"` +} + +// RemoteWriteServer represents fake remote write server with database type RemoteWriteServer struct { - server *httptest.Server - series []vm.TimeSeries + server *httptest.Server + series []vm.TimeSeries + expectedSeries []vm.TimeSeries } // NewRemoteWriteServer prepares test remote write server func NewRemoteWriteServer(t *testing.T) *RemoteWriteServer { rws := &RemoteWriteServer{series: make([]vm.TimeSeries, 0)} mux := http.NewServeMux() + mux.Handle("/api/v1/import", rws.getWriteHandler(t)) mux.Handle("/health", rws.handlePing()) + mux.Handle("/api/v1/series", rws.seriesHandler()) + mux.Handle("/api/v1/export/native", rws.exportNativeHandler()) + mux.Handle("/api/v1/import/native", rws.importNativeHandler(t)) rws.server = httptest.NewServer(mux) return rws } -// Close closes the server. +// Close closes the server func (rws *RemoteWriteServer) Close() { rws.server.Close() } -func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) { +// Series saves generated series for fake database +func (rws *RemoteWriteServer) Series(series []vm.TimeSeries) { rws.series = append(rws.series, series...) } +// ExpectedSeries saves expected results to check in the handler +func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) { + rws.expectedSeries = append(rws.expectedSeries, series...) +} + +// URL returns server url func (rws *RemoteWriteServer) URL() string { return rws.server.URL } @@ -68,13 +99,14 @@ func (rws *RemoteWriteServer) getWriteHandler(t *testing.T) http.Handler { rows.Reset() } - if !reflect.DeepEqual(tss, rws.series) { + if !reflect.DeepEqual(tss, rws.expectedSeries) { w.WriteHeader(http.StatusInternalServerError) - t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.series, tss) + t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.expectedSeries, tss) return } w.WriteHeader(http.StatusNoContent) + return }) } @@ -84,3 +116,146 @@ func (rws *RemoteWriteServer) handlePing() http.Handler { _, _ = w.Write([]byte("OK")) }) } + +func (rws *RemoteWriteServer) seriesHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var labelValues []LabelValues + for _, ser := range rws.series { + metricNames := make(LabelValues) + if ser.Name != "" { + metricNames["__name__"] = ser.Name + } + for _, p := range ser.LabelPairs { + metricNames[p.Name] = p.Value + } + labelValues = append(labelValues, metricNames) + } + + resp := Response{ + Status: "success", + Series: labelValues, + } + + err := json.NewEncoder(w).Encode(resp) + if err != nil { + log.Printf("error send series: %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + }) +} + +func (rws *RemoteWriteServer) exportNativeHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + now := time.Now() + err := prometheus.ExportNativeHandler(now, w, r) + if err != nil { + log.Printf("error export series via native protocol: %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) + return + }) +} + +func (rws *RemoteWriteServer) importNativeHandler(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + common.StartUnmarshalWorkers() + defer common.StopUnmarshalWorkers() + + var gotTimeSeries []vm.TimeSeries + + err := stream.Parse(r.Body, false, func(block *stream.Block) error { + mn := &block.MetricName + var timeseries vm.TimeSeries + timeseries.Name = string(mn.MetricGroup) + timeseries.Timestamps = append(timeseries.Timestamps, block.Timestamps...) + timeseries.Values = append(timeseries.Values, block.Values...) + + for i := range mn.Tags { + tag := &mn.Tags[i] + timeseries.LabelPairs = append(timeseries.LabelPairs, vm.LabelPair{ + Name: string(tag.Key), + Value: string(tag.Value), + }) + } + + gotTimeSeries = append(gotTimeSeries, timeseries) + + return nil + }) + if err != nil { + log.Printf("error parse stream blocks: %s", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + // got timeseries should be sorted + // because they are processed independently + sort.SliceStable(gotTimeSeries, func(i, j int) bool { + iv, jv := gotTimeSeries[i], gotTimeSeries[j] + switch { + case iv.Values[0] != jv.Values[0]: + return iv.Values[0] < jv.Values[0] + case iv.Timestamps[0] != jv.Timestamps[0]: + return iv.Timestamps[0] < jv.Timestamps[0] + default: + return iv.Name < jv.Name + } + }) + + if !reflect.DeepEqual(gotTimeSeries, rws.expectedSeries) { + w.WriteHeader(http.StatusInternalServerError) + t.Fatalf("datasets not equal, expected: %#v;\n got: %#v", rws.expectedSeries, gotTimeSeries) + } + + w.WriteHeader(http.StatusNoContent) + return + }) +} + +// GenerateVNSeries generates test timeseries +func GenerateVNSeries(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries { + var ts []vm.TimeSeries + j := 0 + for i := 0; i < int(numOfSeries); i++ { + if i%3 == 0 { + j++ + } + + timeSeries := vm.TimeSeries{ + Name: fmt.Sprintf("vm_metric_%d", j), + LabelPairs: []vm.LabelPair{ + {Name: "job", Value: strconv.Itoa(i)}, + }, + } + + ts = append(ts, timeSeries) + } + + for i := range ts { + t, v := generateTimeStampsAndValues(i, start, end, numOfSamples) + ts[i].Timestamps = t + ts[i].Values = v + } + + return ts +} + +func generateTimeStampsAndValues(idx int, startTime, endTime, numOfSamples int64) ([]int64, []float64) { + delta := (endTime - startTime) / numOfSamples + + var timestamps []int64 + var values []float64 + t := startTime + for t != endTime { + v := 100 * int64(idx) + timestamps = append(timestamps, t*1000) + values = append(values, float64(v)) + t = t + delta + } + + return timestamps, values +} diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go index 79329d8f5..82038e352 100644 --- a/app/vmctl/vm_native_test.go +++ b/app/vmctl/vm_native_test.go @@ -2,118 +2,295 @@ package main import ( "context" + "flag" + "fmt" + "log" + "os" "testing" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" + remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) -// If you want to run this test: -// 1. run two instances of victoriametrics and define -httpListenAddr for both or just for second instance -// 2. define srcAddr and dstAddr const with your victoriametrics addresses -// 3. define matchFilter const with your importing data -// 4. define timeStartFilter -// 5. run each test one by one - const ( - matchFilter = `{job="avalanche"}` - timeStartFilter = "2020-01-01T20:07:00Z" - timeEndFilter = "2020-08-01T20:07:00Z" - srcAddr = "http://127.0.0.1:8428" - dstAddr = "http://127.0.0.1:8528" + storagePath = "TestStorage" + retentionPeriod = "100y" ) -// This test simulates close process if user abort it func Test_vmNativeProcessor_run(t *testing.T) { - t.Skip() + + processFlags() + vmstorage.Init(promql.ResetRollupResultCacheIfNeeded) + defer func() { + vmstorage.Stop() + if err := os.RemoveAll(storagePath); err != nil { + log.Fatalf("cannot remove %q: %s", storagePath, err) + } + }() + type fields struct { - filter native.Filter - rateLimit int64 - dst *native.Client - src *native.Client + filter native.Filter + dst *native.Client + src *native.Client + backoff *backoff.Backoff + s *stats + rateLimit int64 + interCluster bool + cc int + matchName string + matchValue string } + type args struct { + ctx context.Context + silent bool + } + tests := []struct { - name string - fields fields - closer func(cancelFunc context.CancelFunc) - wantErr bool + name string + fields fields + args args + vmSeries func(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries + expectedSeries []vm.TimeSeries + start string + end string + numOfSamples int64 + numOfSeries int64 + chunk string + wantErr bool }{ { - name: "simulate syscall.SIGINT", + name: "step minute on minute time range", + start: "2022-11-25T11:23:05+02:00", + end: "2022-11-27T11:24:05+02:00", + numOfSamples: 2, + numOfSeries: 3, + chunk: stepper.StepMinute, fields: fields{ - filter: native.Filter{ - Match: matchFilter, - TimeStart: timeStartFilter, + filter: native.Filter{}, + backoff: backoff.New(), + rateLimit: 0, + interCluster: false, + cc: 1, + matchName: "__name__", + matchValue: ".*", + }, + args: args{ + ctx: context.Background(), + silent: true, + }, + vmSeries: remote_read_integration.GenerateVNSeries, + expectedSeries: []vm.TimeSeries{ + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1669368185000, 1669454615000}, + Values: []float64{0, 0}, }, - rateLimit: 0, - dst: &native.Client{ - Addr: dstAddr, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1669368185000, 1669454615000}, + Values: []float64{100, 100}, }, - src: &native.Client{ - Addr: srcAddr, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1669368185000, 1669454615000}, + Values: []float64{200, 200}, }, }, - closer: func(cancelFunc context.CancelFunc) { - time.Sleep(time.Second * 5) - cancelFunc() - }, - wantErr: true, - }, - { - name: "simulate correct work", - fields: fields{ - filter: native.Filter{ - Match: matchFilter, - TimeStart: timeStartFilter, - }, - rateLimit: 0, - dst: &native.Client{ - Addr: dstAddr, - }, - src: &native.Client{ - Addr: srcAddr, - }, - }, - closer: func(cancelFunc context.CancelFunc) {}, wantErr: false, }, { - name: "simulate correct work with chunking", + name: "step month on month time range", + start: "2022-09-26T11:23:05+02:00", + end: "2022-11-26T11:24:05+02:00", + numOfSamples: 2, + numOfSeries: 3, + chunk: stepper.StepMonth, fields: fields{ - filter: native.Filter{ - Match: matchFilter, - TimeStart: timeStartFilter, - TimeEnd: timeEndFilter, - Chunk: stepper.StepMonth, + filter: native.Filter{}, + backoff: backoff.New(), + rateLimit: 0, + interCluster: false, + cc: 1, + matchName: "__name__", + matchValue: ".*", + }, + args: args{ + ctx: context.Background(), + silent: true, + }, + vmSeries: remote_read_integration.GenerateVNSeries, + expectedSeries: []vm.TimeSeries{ + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{0}, }, - rateLimit: 0, - dst: &native.Client{ - Addr: dstAddr, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{0}, }, - src: &native.Client{ - Addr: srcAddr, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{200}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{200}, }, }, - closer: func(cancelFunc context.CancelFunc) {}, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx, cancelFn := context.WithCancel(context.Background()) - p := &vmNativeProcessor{ - filter: tt.fields.filter, - rateLimit: tt.fields.rateLimit, - dst: tt.fields.dst, - src: tt.fields.src, + src := remote_read_integration.NewRemoteWriteServer(t) + dst := remote_read_integration.NewRemoteWriteServer(t) + + defer func() { + src.Close() + dst.Close() + }() + + start, err := time.Parse(time.RFC3339, tt.start) + if err != nil { + t.Fatalf("Error parse start time: %s", err) } - tt.closer(cancelFn) + end, err := time.Parse(time.RFC3339, tt.end) + if err != nil { + t.Fatalf("Error parse end time: %s", err) + } - if err := p.run(ctx, true); (err != nil) != tt.wantErr { + tt.fields.filter.Match = fmt.Sprintf("%s=%q", tt.fields.matchName, tt.fields.matchValue) + tt.fields.filter.TimeStart = tt.start + tt.fields.filter.TimeEnd = tt.end + + rws := tt.vmSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) + + src.Series(rws) + dst.ExpectedSeries(tt.expectedSeries) + + if err := fillStorage(rws); err != nil { + t.Fatalf("error add series to storage: %s", err) + } + + tt.fields.src = &native.Client{ + AuthCfg: nil, + Addr: src.URL(), + ExtraLabels: []string{}, + DisableHTTPKeepAlive: false, + } + tt.fields.dst = &native.Client{ + AuthCfg: nil, + Addr: dst.URL(), + ExtraLabels: []string{}, + DisableHTTPKeepAlive: false, + } + + p := &vmNativeProcessor{ + filter: tt.fields.filter, + dst: tt.fields.dst, + src: tt.fields.src, + backoff: tt.fields.backoff, + s: tt.fields.s, + rateLimit: tt.fields.rateLimit, + interCluster: tt.fields.interCluster, + cc: tt.fields.cc, + } + + if err := p.run(tt.args.ctx, tt.args.silent); (err != nil) != tt.wantErr { t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr) } + deleted, err := deleteSeries(tt.fields.matchName, tt.fields.matchValue) + if err != nil { + t.Fatalf("error delete series: %s", err) + } + if int64(deleted) != tt.numOfSeries { + t.Fatalf("expected deleted series %d; got deleted series %d", tt.numOfSeries, deleted) + } }) } } + +func processFlags() { + flag.Parse() + for _, fv := range []struct { + flag string + value string + }{ + {flag: "storageDataPath", value: storagePath}, + {flag: "retentionPeriod", value: retentionPeriod}, + } { + // panics if flag doesn't exist + if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil { + log.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err) + } + } +} + +func fillStorage(series []vm.TimeSeries) error { + var mrs []storage.MetricRow + for _, series := range series { + var labels []prompb.Label + for _, lp := range series.LabelPairs { + labels = append(labels, prompb.Label{Name: []byte(lp.Name), Value: []byte(lp.Value)}) + } + if series.Name != "" { + labels = append(labels, prompb.Label{Name: []byte("__name__"), Value: []byte(series.Name)}) + } + mr := storage.MetricRow{} + mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels) + + timestamps := series.Timestamps + values := series.Values + for i, value := range values { + mr.Timestamp = timestamps[i] + mr.Value = value + mrs = append(mrs, mr) + } + } + + if err := vmstorage.AddRows(mrs); err != nil { + return fmt.Errorf("unexpected error in AddRows: %s", err) + } + vmstorage.Storage.DebugFlush() + return nil +} + +func deleteSeries(name, value string) (int, error) { + tfs := storage.NewTagFilters() + if err := tfs.Add([]byte(name), []byte(value), false, true); err != nil { + return 0, fmt.Errorf("unexpected error in TagFilters.Add: %w", err) + } + return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs}) +}