app/vmctl: integration test for native protocol (#3947)
* app/vmctl: integration test for native protocol
* app/vmctl: implemented two integration tests
* app/vmctl: cleanup
* app/vmctl: split storage init and filling data logic
* app/vmctl: cleanup
* app/vmctl: remove storage from server, used initialization process
* app/vmctl: prepare for parallel run, code cleanup
* app/vmctl: code cleanup
* app/vmctl: remove unused field
This commit is contained in:
parent 6f6333831e
commit 4d68f5b1fc
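
In outline, the new test drives a full native-protocol round trip: it seeds an embedded vmstorage behind a fake source server, points vmNativeProcessor at it, and lets a fake destination server assert on what arrives. A condensed sketch of that flow; the identifiers come from the patch below, while fixtures and error handling are trimmed, so treat it as orientation rather than a drop-in test file:

	// Condensed sketch of the new test flow; exampleNativeRoundTrip is a
	// hypothetical name, everything it calls is introduced by this commit.
	func exampleNativeRoundTrip(t *testing.T) {
		// Fake source and destination endpoints backed by RemoteWriteServer.
		src := remote_read_integration.NewRemoteWriteServer(t)
		dst := remote_read_integration.NewRemoteWriteServer(t)
		defer src.Close()
		defer dst.Close()

		// Deterministic fixtures: served by the source's /api/v1/series
		// handler, demanded by the destination's /api/v1/import/native handler.
		series := remote_read_integration.GenerateVNSeries(1669368185, 1669541045, 3, 2)
		src.Series(series)
		dst.ExpectedSeries(series)

		// /api/v1/export/native on the source streams from the embedded
		// vmstorage, so the fixtures are written there as well.
		if err := fillStorage(series); err != nil {
			t.Fatalf("error add series to storage: %s", err)
		}

		// Run the migration; the destination fails the test if the imported
		// series differ from ExpectedSeries.
		p := &vmNativeProcessor{
			filter:  native.Filter{Match: `__name__=".*"`},
			src:     &native.Client{Addr: src.URL()},
			dst:     &native.Client{Addr: dst.URL()},
			backoff: backoff.New(),
			cc:      1,
		}
		if err := p.run(context.Background(), true); err != nil {
			t.Errorf("run() error = %v", err)
		}
	}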
@@ -2,39 +2,70 @@ package remote_read_integration
 
 import (
 	"bufio"
+	"encoding/json"
+	"fmt"
+	"log"
 	"net/http"
 	"net/http/httptest"
 	"reflect"
+	"sort"
+	"strconv"
 	"testing"
+	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream"
 	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
 )
 
+// LabelValues represents series from api/v1/series response
+type LabelValues map[string]string
+
+// Response represents response from api/v1/series
+type Response struct {
+	Status string        `json:"status"`
+	Series []LabelValues `json:"data"`
+}
+
 // RemoteWriteServer represents fake remote write server with database
 type RemoteWriteServer struct {
-	server *httptest.Server
-	series []vm.TimeSeries
+	server         *httptest.Server
+	series         []vm.TimeSeries
+	expectedSeries []vm.TimeSeries
 }
 
 // NewRemoteWriteServer prepares test remote write server
 func NewRemoteWriteServer(t *testing.T) *RemoteWriteServer {
 	rws := &RemoteWriteServer{series: make([]vm.TimeSeries, 0)}
 	mux := http.NewServeMux()
+
 	mux.Handle("/api/v1/import", rws.getWriteHandler(t))
 	mux.Handle("/health", rws.handlePing())
+	mux.Handle("/api/v1/series", rws.seriesHandler())
+	mux.Handle("/api/v1/export/native", rws.exportNativeHandler())
+	mux.Handle("/api/v1/import/native", rws.importNativeHandler(t))
 	rws.server = httptest.NewServer(mux)
 	return rws
 }
 
-// Close closes the server.
+// Close closes the server
 func (rws *RemoteWriteServer) Close() {
 	rws.server.Close()
 }
 
-func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) {
+// Series saves generated series for fake database
+func (rws *RemoteWriteServer) Series(series []vm.TimeSeries) {
 	rws.series = append(rws.series, series...)
 }
 
+// ExpectedSeries saves expected results to check in the handler
+func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) {
+	rws.expectedSeries = append(rws.expectedSeries, series...)
+}
+
 // URL returns server url
 func (rws *RemoteWriteServer) URL() string {
 	return rws.server.URL
 }
@@ -68,13 +99,14 @@ func (rws *RemoteWriteServer) getWriteHandler(t *testing.T) http.Handler {
 			rows.Reset()
 		}
 
-		if !reflect.DeepEqual(tss, rws.series) {
+		if !reflect.DeepEqual(tss, rws.expectedSeries) {
 			w.WriteHeader(http.StatusInternalServerError)
-			t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.series, tss)
+			t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.expectedSeries, tss)
+			return
 		}
 
 		w.WriteHeader(http.StatusNoContent)
 		return
 	})
 }
@@ -84,3 +116,146 @@ func (rws *RemoteWriteServer) handlePing() http.Handler {
 		_, _ = w.Write([]byte("OK"))
 	})
 }
+
+func (rws *RemoteWriteServer) seriesHandler() http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		var labelValues []LabelValues
+		for _, ser := range rws.series {
+			metricNames := make(LabelValues)
+			if ser.Name != "" {
+				metricNames["__name__"] = ser.Name
+			}
+			for _, p := range ser.LabelPairs {
+				metricNames[p.Name] = p.Value
+			}
+			labelValues = append(labelValues, metricNames)
+		}
+
+		resp := Response{
+			Status: "success",
+			Series: labelValues,
+		}
+
+		err := json.NewEncoder(w).Encode(resp)
+		if err != nil {
+			log.Printf("error send series: %s", err)
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+	})
+}
+
+func (rws *RemoteWriteServer) exportNativeHandler() http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		now := time.Now()
+		err := prometheus.ExportNativeHandler(now, w, r)
+		if err != nil {
+			log.Printf("error export series via native protocol: %s", err)
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+
+		w.WriteHeader(http.StatusNoContent)
+		return
+	})
+}
+
+func (rws *RemoteWriteServer) importNativeHandler(t *testing.T) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		common.StartUnmarshalWorkers()
+		defer common.StopUnmarshalWorkers()
+
+		var gotTimeSeries []vm.TimeSeries
+
+		err := stream.Parse(r.Body, false, func(block *stream.Block) error {
+			mn := &block.MetricName
+			var timeseries vm.TimeSeries
+			timeseries.Name = string(mn.MetricGroup)
+			timeseries.Timestamps = append(timeseries.Timestamps, block.Timestamps...)
+			timeseries.Values = append(timeseries.Values, block.Values...)
+
+			for i := range mn.Tags {
+				tag := &mn.Tags[i]
+				timeseries.LabelPairs = append(timeseries.LabelPairs, vm.LabelPair{
+					Name:  string(tag.Key),
+					Value: string(tag.Value),
+				})
+			}
+
+			gotTimeSeries = append(gotTimeSeries, timeseries)
+
+			return nil
+		})
+		if err != nil {
+			log.Printf("error parse stream blocks: %s", err)
+			w.WriteHeader(http.StatusInternalServerError)
+			return
+		}
+
+		// got timeseries should be sorted
+		// because they are processed independently
+		sort.SliceStable(gotTimeSeries, func(i, j int) bool {
+			iv, jv := gotTimeSeries[i], gotTimeSeries[j]
+			switch {
+			case iv.Values[0] != jv.Values[0]:
+				return iv.Values[0] < jv.Values[0]
+			case iv.Timestamps[0] != jv.Timestamps[0]:
+				return iv.Timestamps[0] < jv.Timestamps[0]
+			default:
+				return iv.Name < jv.Name
+			}
+		})
+
+		if !reflect.DeepEqual(gotTimeSeries, rws.expectedSeries) {
+			w.WriteHeader(http.StatusInternalServerError)
+			t.Fatalf("datasets not equal, expected: %#v;\n got: %#v", rws.expectedSeries, gotTimeSeries)
+		}
+
+		w.WriteHeader(http.StatusNoContent)
+		return
+	})
+}
+
+// GenerateVNSeries generates test timeseries
+func GenerateVNSeries(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries {
+	var ts []vm.TimeSeries
+	j := 0
+	for i := 0; i < int(numOfSeries); i++ {
+		if i%3 == 0 {
+			j++
+		}
+
+		timeSeries := vm.TimeSeries{
+			Name: fmt.Sprintf("vm_metric_%d", j),
+			LabelPairs: []vm.LabelPair{
+				{Name: "job", Value: strconv.Itoa(i)},
+			},
+		}
+
+		ts = append(ts, timeSeries)
+	}
+
+	for i := range ts {
+		t, v := generateTimeStampsAndValues(i, start, end, numOfSamples)
+		ts[i].Timestamps = t
+		ts[i].Values = v
+	}
+
+	return ts
+}
+
+func generateTimeStampsAndValues(idx int, startTime, endTime, numOfSamples int64) ([]int64, []float64) {
+	delta := (endTime - startTime) / numOfSamples
+
+	var timestamps []int64
+	var values []float64
+	t := startTime
+	for t != endTime {
+		v := 100 * int64(idx)
+		timestamps = append(timestamps, t*1000)
+		values = append(values, float64(v))
+		t = t + delta
+	}
+
+	return timestamps, values
+}
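
Why the fixture timestamps in the next file look the way they do: generateTimeStampsAndValues emits numOfSamples points starting at start with step (end-start)/numOfSamples, converting seconds to milliseconds. A standalone arithmetic check for the "step minute on minute time range" case (plain Go, no repo imports required):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// Same range as the "step minute on minute time range" test case below.
		start, _ := time.Parse(time.RFC3339, "2022-11-25T11:23:05+02:00")
		end, _ := time.Parse(time.RFC3339, "2022-11-27T11:24:05+02:00")

		// Mirrors generateTimeStampsAndValues above for numOfSamples = 2:
		// two samples, half the range apart, in milliseconds.
		delta := (end.Unix() - start.Unix()) / 2
		for ts := start.Unix(); ts != end.Unix(); ts += delta {
			fmt.Println(ts * 1000)
		}
		// Prints 1669368185000 and 1669454615000, exactly the Timestamps
		// used in the expectedSeries fixtures of the test below.
	}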
@@ -2,118 +2,295 @@ package main
 
 import (
 	"context"
+	"flag"
+	"fmt"
+	"log"
+	"os"
 	"testing"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
+	remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
+	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 )
 
-// If you want to run this test:
-// 1. run two instances of victoriametrics and define -httpListenAddr for both or just for second instance
-// 2. define srcAddr and dstAddr const with your victoriametrics addresses
-// 3. define matchFilter const with your importing data
-// 4. define timeStartFilter
-// 5. run each test one by one
-
 const (
-	matchFilter     = `{job="avalanche"}`
-	timeStartFilter = "2020-01-01T20:07:00Z"
-	timeEndFilter   = "2020-08-01T20:07:00Z"
-	srcAddr         = "http://127.0.0.1:8428"
-	dstAddr         = "http://127.0.0.1:8528"
+	storagePath     = "TestStorage"
+	retentionPeriod = "100y"
 )
 
-// This test simulates close process if user abort it
 func Test_vmNativeProcessor_run(t *testing.T) {
-	t.Skip()
+	processFlags()
+
+	vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
+	defer func() {
+		vmstorage.Stop()
+		if err := os.RemoveAll(storagePath); err != nil {
+			log.Fatalf("cannot remove %q: %s", storagePath, err)
+		}
+	}()
 
 	type fields struct {
-		filter    native.Filter
-		rateLimit int64
-		dst       *native.Client
-		src       *native.Client
+		filter       native.Filter
+		dst          *native.Client
+		src          *native.Client
+		backoff      *backoff.Backoff
+		s            *stats
+		rateLimit    int64
+		interCluster bool
+		cc           int
+		matchName    string
+		matchValue   string
 	}
+	type args struct {
+		ctx    context.Context
+		silent bool
+	}
 
 	tests := []struct {
-		name    string
-		fields  fields
-		closer  func(cancelFunc context.CancelFunc)
-		wantErr bool
+		name           string
+		fields         fields
+		args           args
+		vmSeries       func(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries
+		expectedSeries []vm.TimeSeries
+		start          string
+		end            string
+		numOfSamples   int64
+		numOfSeries    int64
+		chunk          string
+		wantErr        bool
 	}{
 		{
-			name: "simulate syscall.SIGINT",
-			fields: fields{
-				filter: native.Filter{
-					Match:     matchFilter,
-					TimeStart: timeStartFilter,
-				},
-				rateLimit: 0,
-				dst: &native.Client{
-					Addr: dstAddr,
-				},
-				src: &native.Client{
-					Addr: srcAddr,
-				},
-			},
-			closer: func(cancelFunc context.CancelFunc) {
-				time.Sleep(time.Second * 5)
-				cancelFunc()
-			},
-			wantErr: true,
-		},
-		{
-			name: "simulate correct work",
-			fields: fields{
-				filter: native.Filter{
-					Match:     matchFilter,
-					TimeStart: timeStartFilter,
-				},
-				rateLimit: 0,
-				dst: &native.Client{
-					Addr: dstAddr,
-				},
-				src: &native.Client{
-					Addr: srcAddr,
-				},
-			},
-			closer:  func(cancelFunc context.CancelFunc) {},
-			wantErr: false,
-		},
-		{
-			name: "simulate correct work with chunking",
-			fields: fields{
-				filter: native.Filter{
-					Match:     matchFilter,
-					TimeStart: timeStartFilter,
-					TimeEnd:   timeEndFilter,
-					Chunk:     stepper.StepMonth,
-				},
-				rateLimit: 0,
-				dst: &native.Client{
-					Addr: dstAddr,
-				},
-				src: &native.Client{
-					Addr: srcAddr,
-				},
-			},
-			closer:  func(cancelFunc context.CancelFunc) {},
+			name:         "step minute on minute time range",
+			start:        "2022-11-25T11:23:05+02:00",
+			end:          "2022-11-27T11:24:05+02:00",
+			numOfSamples: 2,
+			numOfSeries:  3,
+			chunk:        stepper.StepMinute,
+			fields: fields{
+				filter:       native.Filter{},
+				backoff:      backoff.New(),
+				rateLimit:    0,
+				interCluster: false,
+				cc:           1,
+				matchName:    "__name__",
+				matchValue:   ".*",
+			},
+			args: args{
+				ctx:    context.Background(),
+				silent: true,
+			},
+			vmSeries: remote_read_integration.GenerateVNSeries,
+			expectedSeries: []vm.TimeSeries{
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+					Timestamps: []int64{1669368185000, 1669454615000},
+					Values:     []float64{0, 0},
+				},
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+					Timestamps: []int64{1669368185000, 1669454615000},
+					Values:     []float64{100, 100},
+				},
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+					Timestamps: []int64{1669368185000, 1669454615000},
+					Values:     []float64{200, 200},
+				},
+			},
 			wantErr: false,
 		},
+		{
+			name:         "step month on month time range",
+			start:        "2022-09-26T11:23:05+02:00",
+			end:          "2022-11-26T11:24:05+02:00",
+			numOfSamples: 2,
+			numOfSeries:  3,
+			chunk:        stepper.StepMonth,
+			fields: fields{
+				filter:       native.Filter{},
+				backoff:      backoff.New(),
+				rateLimit:    0,
+				interCluster: false,
+				cc:           1,
+				matchName:    "__name__",
+				matchValue:   ".*",
+			},
+			args: args{
+				ctx:    context.Background(),
+				silent: true,
+			},
+			vmSeries: remote_read_integration.GenerateVNSeries,
+			expectedSeries: []vm.TimeSeries{
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+					Timestamps: []int64{1664184185000},
+					Values:     []float64{0},
+				},
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+					Timestamps: []int64{1666819415000},
+					Values:     []float64{0},
+				},
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+					Timestamps: []int64{1664184185000},
+					Values:     []float64{100},
+				},
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+					Timestamps: []int64{1666819415000},
+					Values:     []float64{100},
+				},
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+					Timestamps: []int64{1664184185000},
+					Values:     []float64{200},
+				},
+				{
+					Name:       "vm_metric_1",
+					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+					Timestamps: []int64{1666819415000},
+					Values:     []float64{200},
+				},
+			},
+			wantErr: false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			ctx, cancelFn := context.WithCancel(context.Background())
-			p := &vmNativeProcessor{
-				filter:    tt.fields.filter,
-				rateLimit: tt.fields.rateLimit,
-				dst:       tt.fields.dst,
-				src:       tt.fields.src,
+			src := remote_read_integration.NewRemoteWriteServer(t)
+			dst := remote_read_integration.NewRemoteWriteServer(t)
+
+			defer func() {
+				src.Close()
+				dst.Close()
+			}()
+
+			start, err := time.Parse(time.RFC3339, tt.start)
+			if err != nil {
+				t.Fatalf("Error parse start time: %s", err)
 			}
 
-			tt.closer(cancelFn)
+			end, err := time.Parse(time.RFC3339, tt.end)
+			if err != nil {
+				t.Fatalf("Error parse end time: %s", err)
+			}
 
-			if err := p.run(ctx, true); (err != nil) != tt.wantErr {
+			tt.fields.filter.Match = fmt.Sprintf("%s=%q", tt.fields.matchName, tt.fields.matchValue)
+			tt.fields.filter.TimeStart = tt.start
+			tt.fields.filter.TimeEnd = tt.end
+
+			rws := tt.vmSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
+
+			src.Series(rws)
+			dst.ExpectedSeries(tt.expectedSeries)
+
+			if err := fillStorage(rws); err != nil {
+				t.Fatalf("error add series to storage: %s", err)
+			}
+
+			tt.fields.src = &native.Client{
+				AuthCfg:              nil,
+				Addr:                 src.URL(),
+				ExtraLabels:          []string{},
+				DisableHTTPKeepAlive: false,
+			}
+			tt.fields.dst = &native.Client{
+				AuthCfg:              nil,
+				Addr:                 dst.URL(),
+				ExtraLabels:          []string{},
+				DisableHTTPKeepAlive: false,
+			}
+
+			p := &vmNativeProcessor{
+				filter:       tt.fields.filter,
+				dst:          tt.fields.dst,
+				src:          tt.fields.src,
+				backoff:      tt.fields.backoff,
+				s:            tt.fields.s,
+				rateLimit:    tt.fields.rateLimit,
+				interCluster: tt.fields.interCluster,
+				cc:           tt.fields.cc,
+			}
+
+			if err := p.run(tt.args.ctx, tt.args.silent); (err != nil) != tt.wantErr {
 				t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
 			}
+
+			deleted, err := deleteSeries(tt.fields.matchName, tt.fields.matchValue)
+			if err != nil {
+				t.Fatalf("error delete series: %s", err)
+			}
+			if int64(deleted) != tt.numOfSeries {
+				t.Fatalf("expected deleted series %d; got deleted series %d", tt.numOfSeries, deleted)
+			}
 		})
 	}
 }
+
+func processFlags() {
+	flag.Parse()
+	for _, fv := range []struct {
+		flag  string
+		value string
+	}{
+		{flag: "storageDataPath", value: storagePath},
+		{flag: "retentionPeriod", value: retentionPeriod},
+	} {
+		// panics if flag doesn't exist
+		if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil {
+			log.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
+		}
+	}
+}
+
+func fillStorage(series []vm.TimeSeries) error {
+	var mrs []storage.MetricRow
+	for _, series := range series {
+		var labels []prompb.Label
+		for _, lp := range series.LabelPairs {
+			labels = append(labels, prompb.Label{Name: []byte(lp.Name), Value: []byte(lp.Value)})
+		}
+		if series.Name != "" {
+			labels = append(labels, prompb.Label{Name: []byte("__name__"), Value: []byte(series.Name)})
+		}
+		mr := storage.MetricRow{}
+		mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
+
+		timestamps := series.Timestamps
+		values := series.Values
+		for i, value := range values {
+			mr.Timestamp = timestamps[i]
+			mr.Value = value
+			mrs = append(mrs, mr)
+		}
+	}
+
+	if err := vmstorage.AddRows(mrs); err != nil {
+		return fmt.Errorf("unexpected error in AddRows: %s", err)
+	}
+	vmstorage.Storage.DebugFlush()
+	return nil
+}
+
+func deleteSeries(name, value string) (int, error) {
+	tfs := storage.NewTagFilters()
+	if err := tfs.Add([]byte(name), []byte(value), false, true); err != nil {
+		return 0, fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
+	}
+	return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs})
+}
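
The helpers above replace the old requirement to run two external VictoriaMetrics instances by hand: processFlags points the embedded storage at a throwaway directory, fillStorage writes fixtures into it directly, and deleteSeries cleans up between cases. Condensed into one helper for illustration (withTestStorage is a hypothetical name, not part of the patch):

	// Illustrative only: the embedded-storage lifecycle that
	// Test_vmNativeProcessor_run wires inline.
	func withTestStorage(run func()) {
		processFlags() // force -storageDataPath=TestStorage, -retentionPeriod=100y
		vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
		defer func() {
			vmstorage.Stop()
			if err := os.RemoveAll(storagePath); err != nil {
				log.Fatalf("cannot remove %q: %s", storagePath, err)
			}
		}()
		run()
	}

With everything in-process, the suite should run under a plain go test invocation of the vmctl package, with no manual setup, which is what makes the "prepare for parallel run" step in the commit message feasible.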