mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-25 03:40:10 +01:00
8bab50dc29
app/vmctl: vm-native - split migration on per-metric basis `vm-native` mode now splits the migration process on per-metric basis. This allows to migrate metrics one-by-one according to the specified filter. This change allows to retry export/import requests for a specific metric and provides a better understanding of the migration progress. --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
120 lines
2.7 KiB
Go
120 lines
2.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
|
|
)
|
|
|
|
// To run this test manually:
// 1. start two VictoriaMetrics instances and set -httpListenAddr for both of them, or at least for the second one
// 2. set the srcAddr and dstAddr constants to the addresses of your VictoriaMetrics instances
// 3. set the matchFilter constant to a selector matching the data you want to import
// 4. set the timeStartFilter constant (and timeEndFilter, which is used by the chunking case)
// 5. run the test cases one by one
|
|
|
|
// Settings shared by the manual vm-native migration test in this file.
// Adjust them to match the locally running instances and the data to migrate.
const (
	// Series selector passed as native.Filter.Match.
	matchFilter = `{job="avalanche"}`

	// Time boundaries passed as native.Filter.TimeStart and native.Filter.TimeEnd.
	timeStartFilter = "2020-01-01T20:07:00Z"
	timeEndFilter   = "2020-08-01T20:07:00Z"

	// Addresses used for the source and destination native.Client.
	srcAddr = "http://127.0.0.1:8428"
	dstAddr = "http://127.0.0.1:8528"
)
|
|
|
|
// This test simulates close process if user abort it
|
|
func Test_vmNativeProcessor_run(t *testing.T) {
|
|
t.Skip()
|
|
type fields struct {
|
|
filter native.Filter
|
|
rateLimit int64
|
|
dst *native.Client
|
|
src *native.Client
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
fields fields
|
|
closer func(cancelFunc context.CancelFunc)
|
|
wantErr bool
|
|
}{
|
|
{
|
|
name: "simulate syscall.SIGINT",
|
|
fields: fields{
|
|
filter: native.Filter{
|
|
Match: matchFilter,
|
|
TimeStart: timeStartFilter,
|
|
},
|
|
rateLimit: 0,
|
|
dst: &native.Client{
|
|
Addr: dstAddr,
|
|
},
|
|
src: &native.Client{
|
|
Addr: srcAddr,
|
|
},
|
|
},
|
|
closer: func(cancelFunc context.CancelFunc) {
|
|
time.Sleep(time.Second * 5)
|
|
cancelFunc()
|
|
},
|
|
wantErr: true,
|
|
},
|
|
{
|
|
name: "simulate correct work",
|
|
fields: fields{
|
|
filter: native.Filter{
|
|
Match: matchFilter,
|
|
TimeStart: timeStartFilter,
|
|
},
|
|
rateLimit: 0,
|
|
dst: &native.Client{
|
|
Addr: dstAddr,
|
|
},
|
|
src: &native.Client{
|
|
Addr: srcAddr,
|
|
},
|
|
},
|
|
closer: func(cancelFunc context.CancelFunc) {},
|
|
wantErr: false,
|
|
},
|
|
{
|
|
name: "simulate correct work with chunking",
|
|
fields: fields{
|
|
filter: native.Filter{
|
|
Match: matchFilter,
|
|
TimeStart: timeStartFilter,
|
|
TimeEnd: timeEndFilter,
|
|
Chunk: stepper.StepMonth,
|
|
},
|
|
rateLimit: 0,
|
|
dst: &native.Client{
|
|
Addr: dstAddr,
|
|
},
|
|
src: &native.Client{
|
|
Addr: srcAddr,
|
|
},
|
|
},
|
|
closer: func(cancelFunc context.CancelFunc) {},
|
|
wantErr: false,
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
ctx, cancelFn := context.WithCancel(context.Background())
|
|
p := &vmNativeProcessor{
|
|
filter: tt.fields.filter,
|
|
rateLimit: tt.fields.rateLimit,
|
|
dst: tt.fields.dst,
|
|
src: tt.fields.src,
|
|
}
|
|
|
|
tt.closer(cancelFn)
|
|
|
|
if err := p.run(ctx, true); (err != nil) != tt.wantErr {
|
|
t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
|
|
}
|
|
})
|
|
}
|
|
}
|