mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/vmctl: fix performance degradation, add flag to disable backoff policy (#4097)
* app/vmctl: change api for getting metric names * app/vmctl: fix tests * app/vmctl: add flag to enable backoff policy, fix test, performance improvements * app/vmctl: use one http client * app/vmctl: made linter happy * app/vmctl: updated documentation and CHANGELOG.md * app/vmctl: cleanup * app/vmctl: rename flag * app/vmctl: cleanup * app/vmctl: fix comments * app/vmctl: fix metrics parser problem, improve tests
This commit is contained in:
parent
809fbaeaac
commit
2a5b9ff782
@ -738,7 +738,7 @@ or higher.
|
||||
See `./vmctl vm-native --help` for details and full list of flags.
|
||||
|
||||
Migration in `vm-native` mode takes two steps:
|
||||
1. Explore the list of the metrics to migrate via `/api/v1/series` API;
|
||||
1. Explore the list of the metrics to migrate via `api/v1/label/__name__/values` API;
|
||||
2. Migrate explored metrics one-by-one.
|
||||
|
||||
```
|
||||
@ -765,6 +765,57 @@ Requests to make: 9 / 9 [██████████████████
|
||||
requests retries: 0;
|
||||
2023/03/02 09:22:06 Total time: 3.633127625s
|
||||
```
|
||||
`vmctl` uses retries with backoff policy by default.
|
||||
|
||||
The benefits of this retry backoff policy include:
|
||||
1. Improved success rates:
|
||||
With each retry attempt, the migration process has a higher chance of success.
|
||||
By increasing the delay between retries, the system can avoid overwhelming the service with too many requests at once.
|
||||
|
||||
2. Reduced load on the system:
|
||||
By increasing the delay between retries, the system can reduce the load on the service by limiting the number of
|
||||
requests made in a short amount of time.
|
||||
3. Can help to migrate a big amount of data
|
||||
|
||||
However, there are also some potential penalties associated with using a backoff retry policy, including:
|
||||
1. Increased migration process latency:
|
||||
`vmctl` needs to make an additional call to the `api/v1/label/__name__/values` API with the defined `--vm-native-filter-match` flag,
|
||||
and then process all the metric names with additional filters.
|
||||
|
||||
If retries with backoff policy are not needed, the `--vm-native-disable-retries` command-line flag can be used.
|
||||
When this flag is set to `true`, `vmctl` skips the additional call to the `api/v1/label/__name__/values` API and starts
|
||||
the migration process by making calls to `/api/v1/export` and `/api/v1/import`. If any error occurs, `vmctl` immediately
|
||||
stops the migration process.
|
||||
|
||||
```
|
||||
./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \
|
||||
--vm-native-dst-addr=http://127.0.0.1:8428 \
|
||||
--vm-native-filter-match='{__name__!=""}' \
|
||||
--vm-native-filter-time-start='2023-04-08T11:30:30Z' \
|
||||
--vm-native-disable-retries=true
|
||||
|
||||
VictoriaMetrics Native import mode
|
||||
|
||||
2023/04/11 10:17:14 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter
|
||||
filter: match[]={__name__!=""}
|
||||
start: 2023-04-08T11:30:30Z
|
||||
. Continue? [Y/n]
|
||||
2023/04/11 10:17:15 Requests to make: 1
|
||||
2023/04/11 10:17:15 number of workers decreased to 1, because vmctl calculated requests to make 1
|
||||
Total: 0 ↙ Speed: ? p/s Continue import process with filter
|
||||
filter: match[]={__name__!=""}
|
||||
start: 2023-04-08T11:30:30Z
|
||||
end: 2023-04-11T07:17:14Z:
|
||||
Total: 1.64 GiB ↖ Speed: 11.20 MiB p/s
|
||||
2023/04/11 10:19:45 Import finished!
|
||||
2023/04/11 10:19:45 VictoriaMetrics importer stats:
|
||||
time spent while importing: 2m30.813841541s;
|
||||
total bytes: 1.8 GB;
|
||||
bytes/s: 11.7 MB;
|
||||
requests: 1;
|
||||
requests retries: 0;
|
||||
2023/04/11 10:19:45 Total time: 2m30.814721125s
|
||||
```
|
||||
|
||||
Importing tips:
|
||||
|
||||
|
@ -326,6 +326,7 @@ const (
|
||||
vmNativeStepInterval = "vm-native-step-interval"
|
||||
|
||||
vmNativeDisableHTTPKeepAlive = "vm-native-disable-http-keep-alive"
|
||||
vmNativeDisableRetries = "vm-native-disable-retries"
|
||||
|
||||
vmNativeSrcAddr = "vm-native-src-addr"
|
||||
vmNativeSrcUser = "vm-native-src-user"
|
||||
@ -443,6 +444,11 @@ var (
|
||||
Usage: "Number of workers concurrently performing import requests to VM",
|
||||
Value: 2,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: vmNativeDisableRetries,
|
||||
Usage: "Defines whether to disable retries with backoff policy for migration process",
|
||||
Value: false,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
@ -201,6 +202,8 @@ func main() {
|
||||
return fmt.Errorf("flag %q can't be empty", vmNativeFilterMatch)
|
||||
}
|
||||
|
||||
disableKeepAlive := c.Bool(vmNativeDisableHTTPKeepAlive)
|
||||
|
||||
var srcExtraLabels []string
|
||||
srcAddr := strings.Trim(c.String(vmNativeSrcAddr), "/")
|
||||
srcAuthConfig, err := auth.Generate(
|
||||
@ -210,6 +213,7 @@ func main() {
|
||||
if err != nil {
|
||||
return fmt.Errorf("error initilize auth config for source: %s", srcAddr)
|
||||
}
|
||||
srcHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}}
|
||||
|
||||
dstAddr := strings.Trim(c.String(vmNativeDstAddr), "/")
|
||||
dstExtraLabels := c.StringSlice(vmExtraLabel)
|
||||
@ -220,6 +224,7 @@ func main() {
|
||||
if err != nil {
|
||||
return fmt.Errorf("error initilize auth config for destination: %s", dstAddr)
|
||||
}
|
||||
dstHTTPClient := &http.Client{Transport: &http.Transport{DisableKeepAlives: disableKeepAlive}}
|
||||
|
||||
p := vmNativeProcessor{
|
||||
rateLimit: c.Int64(vmRateLimit),
|
||||
@ -231,19 +236,20 @@ func main() {
|
||||
Chunk: c.String(vmNativeStepInterval),
|
||||
},
|
||||
src: &native.Client{
|
||||
AuthCfg: srcAuthConfig,
|
||||
Addr: srcAddr,
|
||||
ExtraLabels: srcExtraLabels,
|
||||
DisableHTTPKeepAlive: c.Bool(vmNativeDisableHTTPKeepAlive),
|
||||
AuthCfg: srcAuthConfig,
|
||||
Addr: srcAddr,
|
||||
ExtraLabels: srcExtraLabels,
|
||||
HTTPClient: srcHTTPClient,
|
||||
},
|
||||
dst: &native.Client{
|
||||
AuthCfg: dstAuthConfig,
|
||||
Addr: dstAddr,
|
||||
ExtraLabels: dstExtraLabels,
|
||||
DisableHTTPKeepAlive: c.Bool(vmNativeDisableHTTPKeepAlive),
|
||||
AuthCfg: dstAuthConfig,
|
||||
Addr: dstAddr,
|
||||
ExtraLabels: dstExtraLabels,
|
||||
HTTPClient: dstHTTPClient,
|
||||
},
|
||||
backoff: backoff.New(),
|
||||
cc: c.Int(vmConcurrency),
|
||||
backoff: backoff.New(),
|
||||
cc: c.Int(vmConcurrency),
|
||||
disableRetries: c.Bool(vmNativeDisableRetries),
|
||||
}
|
||||
return p.run(ctx, isNonInteractive(c))
|
||||
},
|
||||
|
@ -11,34 +11,33 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
nativeTenantsAddr = "admin/tenants"
|
||||
nativeSeriesAddr = "api/v1/series"
|
||||
nameLabel = "__name__"
|
||||
nativeTenantsAddr = "admin/tenants"
|
||||
nativeMetricNamesAddr = "api/v1/label/__name__/values"
|
||||
)
|
||||
|
||||
// Client is an HTTP client for exporting and importing
|
||||
// time series via native protocol.
|
||||
type Client struct {
|
||||
AuthCfg *auth.Config
|
||||
Addr string
|
||||
ExtraLabels []string
|
||||
DisableHTTPKeepAlive bool
|
||||
AuthCfg *auth.Config
|
||||
Addr string
|
||||
ExtraLabels []string
|
||||
HTTPClient *http.Client
|
||||
}
|
||||
|
||||
// LabelValues represents series from api/v1/series response
|
||||
type LabelValues map[string]string
|
||||
|
||||
// Response represents response from api/v1/series
|
||||
// Response represents response from api/v1/label/__name__/values
|
||||
type Response struct {
|
||||
Status string `json:"status"`
|
||||
Series []LabelValues `json:"data"`
|
||||
Status string `json:"status"`
|
||||
MetricNames []string `json:"data"`
|
||||
}
|
||||
|
||||
// Explore finds series by provided filter from api/v1/series
|
||||
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) (map[string]struct{}, error) {
|
||||
url := fmt.Sprintf("%s/%s", c.Addr, nativeSeriesAddr)
|
||||
// Explore finds metric names by provided filter from api/v1/label/__name__/values
|
||||
func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) ([]string, error) {
|
||||
url := fmt.Sprintf("%s/%s", c.Addr, nativeMetricNamesAddr)
|
||||
if tenantID != "" {
|
||||
url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeSeriesAddr)
|
||||
url = fmt.Sprintf("%s/select/%s/prometheus/%s", c.Addr, tenantID, nativeMetricNamesAddr)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
@ -68,21 +67,7 @@ func (c *Client) Explore(ctx context.Context, f Filter, tenantID string) (map[st
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
return nil, fmt.Errorf("cannot close series response body: %s", err)
|
||||
}
|
||||
names := make(map[string]struct{})
|
||||
for _, series := range response.Series {
|
||||
// TODO: consider tweaking /api/v1/series API to return metric names only
|
||||
// this could make explore response much lighter.
|
||||
for key, value := range series {
|
||||
if key != nameLabel {
|
||||
continue
|
||||
}
|
||||
if _, ok := names[value]; ok {
|
||||
continue
|
||||
}
|
||||
names[value] = struct{}{}
|
||||
}
|
||||
}
|
||||
return names, nil
|
||||
return response.MetricNames, nil
|
||||
}
|
||||
|
||||
// ImportPipe uses pipe reader in request to process data
|
||||
@ -169,8 +154,8 @@ func (c *Client) do(req *http.Request, expSC int) (*http.Response, error) {
|
||||
if c.AuthCfg != nil {
|
||||
c.AuthCfg.SetHeaders(req, true)
|
||||
}
|
||||
var httpClient = &http.Client{Transport: &http.Transport{DisableKeepAlives: c.DisableHTTPKeepAlive}}
|
||||
resp, err := httpClient.Do(req)
|
||||
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error when performing request: %w", err)
|
||||
}
|
||||
|
@ -29,6 +29,11 @@ type Response struct {
|
||||
Series []LabelValues `json:"data"`
|
||||
}
|
||||
|
||||
// MetricNamesResponse is the JSON payload returned by the fake
// /api/v1/label/__name__/values handler: a status string plus the list of
// metric names carried under the "data" key.
type MetricNamesResponse struct {
|
||||
Status string `json:"status"`
|
||||
Data []string `json:"data"`
|
||||
}
|
||||
|
||||
// RemoteWriteServer represents fake remote write server with database
|
||||
type RemoteWriteServer struct {
|
||||
server *httptest.Server
|
||||
@ -44,6 +49,7 @@ func NewRemoteWriteServer(t *testing.T) *RemoteWriteServer {
|
||||
mux.Handle("/api/v1/import", rws.getWriteHandler(t))
|
||||
mux.Handle("/health", rws.handlePing())
|
||||
mux.Handle("/api/v1/series", rws.seriesHandler())
|
||||
mux.Handle("/api/v1/label/__name__/values", rws.valuesHandler())
|
||||
mux.Handle("/api/v1/export/native", rws.exportNativeHandler())
|
||||
mux.Handle("/api/v1/import/native", rws.importNativeHandler(t))
|
||||
rws.server = httptest.NewServer(mux)
|
||||
@ -145,6 +151,36 @@ func (rws *RemoteWriteServer) seriesHandler() http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
func (rws *RemoteWriteServer) valuesHandler() http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
labelNames := make(map[string]struct{})
|
||||
for _, ser := range rws.series {
|
||||
if ser.Name != "" {
|
||||
labelNames[ser.Name] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
metricNames := make([]string, 0, len(labelNames))
|
||||
for k := range labelNames {
|
||||
metricNames = append(metricNames, k)
|
||||
}
|
||||
resp := MetricNamesResponse{
|
||||
Status: "success",
|
||||
Data: metricNames,
|
||||
}
|
||||
|
||||
err := json.NewEncoder(w).Encode(resp)
|
||||
if err != nil {
|
||||
log.Printf("error send series: %s", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
func (rws *RemoteWriteServer) exportNativeHandler() http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
now := time.Now()
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -13,8 +14,8 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/cheggaaa/pb/v3"
|
||||
)
|
||||
|
||||
@ -25,16 +26,18 @@ type vmNativeProcessor struct {
|
||||
src *native.Client
|
||||
backoff *backoff.Backoff
|
||||
|
||||
s *stats
|
||||
rateLimit int64
|
||||
interCluster bool
|
||||
cc int
|
||||
s *stats
|
||||
rateLimit int64
|
||||
interCluster bool
|
||||
cc int
|
||||
disableRetries bool
|
||||
}
|
||||
|
||||
const (
|
||||
nativeExportAddr = "api/v1/export/native"
|
||||
nativeImportAddr = "api/v1/import/native"
|
||||
nativeBarTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
|
||||
nativeExportAddr = "api/v1/export/native"
|
||||
nativeImportAddr = "api/v1/import/native"
|
||||
nativeWithBackoffTpl = `{{ blue "%s:" }} {{ counters . }} {{ bar . "[" "█" (cycle . "█") "▒" "]" }} {{ percent . }}`
|
||||
nativeSingleProcessTpl = `Total: {{counters . }} {{ cycle . "↖" "↗" "↘" "↙" }} Speed: {{speed . }} {{string . "suffix"}}`
|
||||
)
|
||||
|
||||
func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
|
||||
@ -94,9 +97,9 @@ func (p *vmNativeProcessor) run(ctx context.Context, silent bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string) error {
|
||||
func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
|
||||
|
||||
retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL) }
|
||||
retryableFunc := func() error { return p.runSingle(ctx, f, srcURL, dstURL, bar) }
|
||||
attempts, err := p.backoff.Retry(ctx, retryableFunc)
|
||||
p.s.Lock()
|
||||
p.s.retries += attempts
|
||||
@ -108,13 +111,18 @@ func (p *vmNativeProcessor) do(ctx context.Context, f native.Filter, srcURL, dst
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string) error {
|
||||
func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcURL, dstURL string, bar *pb.ProgressBar) error {
|
||||
|
||||
exportReader, err := p.src.ExportPipe(ctx, srcURL, f)
|
||||
reader, err := p.src.ExportPipe(ctx, srcURL, f)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to init export pipe: %w", err)
|
||||
}
|
||||
|
||||
if p.disableRetries && bar != nil {
|
||||
fmt.Printf("Continue import process with filter %s:\n", f.String())
|
||||
reader = bar.NewProxyReader(reader)
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
@ -131,7 +139,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
|
||||
w = limiter.NewWriteLimiter(pw, rl)
|
||||
}
|
||||
|
||||
written, err := io.Copy(w, exportReader)
|
||||
written, err := io.Copy(w, reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write into %q: %s", p.dst.Addr, err)
|
||||
}
|
||||
@ -176,17 +184,22 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
|
||||
fmt.Println("") // extra line for better output formatting
|
||||
log.Printf(initMessage, initParams...)
|
||||
|
||||
log.Printf("Exploring metrics...")
|
||||
metrics, err := p.src.Explore(ctx, p.filter, tenantID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err)
|
||||
var foundSeriesMsg string
|
||||
|
||||
metrics := []string{p.filter.Match}
|
||||
if !p.disableRetries {
|
||||
log.Printf("Exploring metrics...")
|
||||
metrics, err = p.src.Explore(ctx, p.filter, tenantID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get metrics from source %s: %w", p.src.Addr, err)
|
||||
}
|
||||
|
||||
if len(metrics) == 0 {
|
||||
return fmt.Errorf("no metrics found")
|
||||
}
|
||||
foundSeriesMsg = fmt.Sprintf("Found %d metrics to import", len(metrics))
|
||||
}
|
||||
|
||||
if len(metrics) == 0 {
|
||||
return fmt.Errorf("no metrics found")
|
||||
}
|
||||
|
||||
foundSeriesMsg := fmt.Sprintf("Found %d metrics to import", len(metrics))
|
||||
if !p.interCluster {
|
||||
// do not prompt for intercluster because there could be many tenants,
|
||||
// and we don't want to interrupt the process when moving to the next tenant.
|
||||
@ -206,7 +219,10 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
|
||||
|
||||
var bar *pb.ProgressBar
|
||||
if !silent {
|
||||
bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeBarTpl, barPrefix)).New(len(metrics) * len(ranges))
|
||||
bar = pb.ProgressBarTemplate(fmt.Sprintf(nativeWithBackoffTpl, barPrefix)).New(len(metrics) * len(ranges))
|
||||
if p.disableRetries {
|
||||
bar = pb.ProgressBarTemplate(nativeSingleProcessTpl).New(0)
|
||||
}
|
||||
bar.Start()
|
||||
defer bar.Finish()
|
||||
}
|
||||
@ -220,19 +236,26 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for f := range filterCh {
|
||||
if err := p.do(ctx, f, srcURL, dstURL); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if bar != nil {
|
||||
bar.Increment()
|
||||
if !p.disableRetries {
|
||||
if err := p.do(ctx, f, srcURL, dstURL, nil); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if bar != nil {
|
||||
bar.Increment()
|
||||
}
|
||||
} else {
|
||||
if err := p.runSingle(ctx, f, srcURL, dstURL, bar); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// any error breaks the import
|
||||
for s := range metrics {
|
||||
for _, s := range metrics {
|
||||
|
||||
match, err := buildMatchWithFilter(p.filter.Match, s)
|
||||
if err != nil {
|
||||
@ -313,11 +336,26 @@ func byteCountSI(b int64) string {
|
||||
}
|
||||
|
||||
func buildMatchWithFilter(filter string, metricName string) (string, error) {
|
||||
labels, err := promutils.NewLabelsFromString(filter)
|
||||
if filter == metricName {
|
||||
return filter, nil
|
||||
}
|
||||
|
||||
labels, err := searchutils.ParseMetricSelector(filter)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
labels.Set("__name__", metricName)
|
||||
|
||||
return labels.String(), nil
|
||||
str := make([]string, 0, len(labels))
|
||||
for _, label := range labels {
|
||||
if len(label.Key) == 0 {
|
||||
continue
|
||||
}
|
||||
str = append(str, label.String())
|
||||
}
|
||||
|
||||
nameFilter := fmt.Sprintf("__name__=%q", metricName)
|
||||
str = append(str, nameFilter)
|
||||
|
||||
match := fmt.Sprintf("{%s}", strings.Join(str, ","))
|
||||
return match, nil
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
@ -191,7 +192,7 @@ func Test_vmNativeProcessor_run(t *testing.T) {
|
||||
t.Fatalf("Error parse end time: %s", err)
|
||||
}
|
||||
|
||||
tt.fields.filter.Match = fmt.Sprintf("%s=%q", tt.fields.matchName, tt.fields.matchValue)
|
||||
tt.fields.filter.Match = fmt.Sprintf("{%s=~%q}", tt.fields.matchName, tt.fields.matchValue)
|
||||
tt.fields.filter.TimeStart = tt.start
|
||||
tt.fields.filter.TimeEnd = tt.end
|
||||
|
||||
@ -205,16 +206,16 @@ func Test_vmNativeProcessor_run(t *testing.T) {
|
||||
}
|
||||
|
||||
tt.fields.src = &native.Client{
|
||||
AuthCfg: nil,
|
||||
Addr: src.URL(),
|
||||
ExtraLabels: []string{},
|
||||
DisableHTTPKeepAlive: false,
|
||||
AuthCfg: nil,
|
||||
Addr: src.URL(),
|
||||
ExtraLabels: []string{},
|
||||
HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: false}},
|
||||
}
|
||||
tt.fields.dst = &native.Client{
|
||||
AuthCfg: nil,
|
||||
Addr: dst.URL(),
|
||||
ExtraLabels: []string{},
|
||||
DisableHTTPKeepAlive: false,
|
||||
AuthCfg: nil,
|
||||
Addr: dst.URL(),
|
||||
ExtraLabels: []string{},
|
||||
HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: false}},
|
||||
}
|
||||
|
||||
p := &vmNativeProcessor{
|
||||
@ -307,44 +308,72 @@ func Test_buildMatchWithFilter(t *testing.T) {
|
||||
name: "parsed metric with label",
|
||||
filter: `{__name__="http_request_count_total",cluster="kube1"}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{__name__="http_request_count_total",cluster="kube1"}`,
|
||||
want: `{cluster="kube1",__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "metric name with label",
|
||||
filter: `http_request_count_total{cluster="kube1"}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{__name__="http_request_count_total",cluster="kube1"}`,
|
||||
want: `{cluster="kube1",__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "parsed metric with regexp value",
|
||||
filter: `{__name__="http_request_count_total",cluster~="kube.*"}`,
|
||||
filter: `{__name__="http_request_count_total",cluster=~"kube.*"}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{__name__="http_request_count_total",cluster~="kube.*"}`,
|
||||
want: `{cluster=~"kube.*",__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "only label with regexp",
|
||||
filter: `{cluster~=".*"}`,
|
||||
filter: `{cluster=~".*"}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{cluster~=".*",__name__="http_request_count_total"}`,
|
||||
want: `{cluster=~".*",__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "many labels in filter with regexp",
|
||||
filter: `{cluster~=".*",job!=""}`,
|
||||
filter: `{cluster=~".*",job!=""}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{cluster~=".*",job!="",__name__="http_request_count_total"}`,
|
||||
want: `{cluster=~".*",job!="",__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "match with error",
|
||||
filter: `{cluster=~".*"}`,
|
||||
filter: `{cluster~=".*"}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: ``,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "all names",
|
||||
filter: `{__name__!=""}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "with many underscores labels",
|
||||
filter: `{__name__!="", __meta__!=""}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{__meta__!="",__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "metric name has regexp",
|
||||
filter: `{__name__=~".*"}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "metric name has negative regexp",
|
||||
filter: `{__name__!~".*"}`,
|
||||
metricName: "http_request_count_total",
|
||||
want: `{__name__="http_request_count_total"}`,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
@ -13,6 +13,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||
* [How to build vmauth](https://docs.victoriametrics.com/vmauth.html#how-to-build-from-sources)
|
||||
* [How to build vmctl](https://docs.victoriametrics.com/vmctl.html#how-to-build)
|
||||
|
||||
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix performance issue when using `vmctl vm-native`. Added flag to disable backoff policy `--vm-native-disable-retries`. Changed API call to explore the list of the metric names from `/api/v1/series` to `api/v1/label/__name__/values`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4092).
|
||||
|
||||
## tip
|
||||
|
||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): store backup creation and completion time in `backup_complete.ignore` file of backup contents. This is useful to determine point in time when backup was created and completed.
|
||||
|
@ -742,7 +742,7 @@ or higher.
|
||||
See `./vmctl vm-native --help` for details and full list of flags.
|
||||
|
||||
Migration in `vm-native` mode takes two steps:
|
||||
1. Explore the list of the metrics to migrate via `/api/v1/series` API;
|
||||
1. Explore the list of the metrics to migrate via `api/v1/label/__name__/values` API;
|
||||
2. Migrate explored metrics one-by-one.
|
||||
|
||||
```
|
||||
@ -770,6 +770,59 @@ Requests to make: 9 / 9 [██████████████████
|
||||
2023/03/02 09:22:06 Total time: 3.633127625s
|
||||
```
|
||||
|
||||
|
||||
`vmctl` uses retries with backoff policy by default.
|
||||
|
||||
The benefits of this retry backoff policy include:
|
||||
1. Improved success rates:
|
||||
With each retry attempt, the migration process has a higher chance of success.
|
||||
By increasing the delay between retries, the system can avoid overwhelming the service with too many requests at once.
|
||||
|
||||
2. Reduced load on the system:
|
||||
By increasing the delay between retries, the system can reduce the load on the service by limiting the number of
|
||||
requests made in a short amount of time.
|
||||
3. Can help to migrate a big amount of data
|
||||
|
||||
However, there are also some potential penalties associated with using a backoff retry policy, including:
|
||||
1. Increased migration process latency:
|
||||
`vmctl` needs to make an additional call to the `api/v1/label/__name__/values` API with the defined `--vm-native-filter-match` flag,
|
||||
and then process all the metric names with additional filters.
|
||||
|
||||
If retries with backoff policy are not needed, the `--vm-native-disable-retries` command-line flag can be used.
|
||||
When this flag is set to `true`, `vmctl` skips the additional call to the `api/v1/label/__name__/values` API and starts
|
||||
the migration process by making calls to `/api/v1/export` and `/api/v1/import`. If any error occurs, `vmctl` immediately
|
||||
stops the migration process.
|
||||
|
||||
```
|
||||
./vmctl vm-native --vm-native-src-addr=http://127.0.0.1:8481/select/0/prometheus \
|
||||
--vm-native-dst-addr=http://127.0.0.1:8428 \
|
||||
--vm-native-filter-match='{__name__!=""}' \
|
||||
--vm-native-filter-time-start='2023-04-08T11:30:30Z' \
|
||||
--vm-native-disable-retries=true
|
||||
|
||||
VictoriaMetrics Native import mode
|
||||
|
||||
2023/04/11 10:17:14 Initing import process from "http://127.0.0.1:8481/select/0/prometheus/api/v1/export/native" to "http://localhost:8428/api/v1/import/native" with filter
|
||||
filter: match[]={__name__!=""}
|
||||
start: 2023-04-08T11:30:30Z
|
||||
. Continue? [Y/n]
|
||||
2023/04/11 10:17:15 Requests to make: 1
|
||||
2023/04/11 10:17:15 number of workers decreased to 1, because vmctl calculated requests to make 1
|
||||
Total: 0 ↙ Speed: ? p/s Continue import process with filter
|
||||
filter: match[]={__name__!=""}
|
||||
start: 2023-04-08T11:30:30Z
|
||||
end: 2023-04-11T07:17:14Z:
|
||||
Total: 1.64 GiB ↖ Speed: 11.20 MiB p/s
|
||||
2023/04/11 10:19:45 Import finished!
|
||||
2023/04/11 10:19:45 VictoriaMetrics importer stats:
|
||||
time spent while importing: 2m30.813841541s;
|
||||
total bytes: 1.8 GB;
|
||||
bytes/s: 11.7 MB;
|
||||
requests: 1;
|
||||
requests retries: 0;
|
||||
2023/04/11 10:19:45 Total time: 2m30.814721125s
|
||||
```
|
||||
|
||||
Importing tips:
|
||||
|
||||
1. Migrating big volumes of data may result in reaching the safety limits on `src` side.
|
||||
|
Loading…
Reference in New Issue
Block a user