mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/vmctl: add flag to handle Prometheus remote_write to InfluxDB (#2545)
Make it possible to migrate timeseries while restoring the original timeseries name previously written from Prometheus to InfluxDB v1 via remote_write. Fixes: https://github.com/VictoriaMetrics/vmctl/issues/8
This commit is contained in:
parent
e736de8e5c
commit
1580f751d6
@ -202,6 +202,7 @@ const (
|
||||
influxFilterTimeEnd = "influx-filter-time-end"
|
||||
influxMeasurementFieldSeparator = "influx-measurement-field-separator"
|
||||
influxSkipDatabaseLabel = "influx-skip-database-label"
|
||||
influxPrometheusMode = "influx-prometheus-mode"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -264,6 +265,11 @@ var (
|
||||
Usage: "Wether to skip adding the label 'db' to timeseries.",
|
||||
Value: false,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: influxPrometheusMode,
|
||||
Usage: "Wether to restore the original timeseries name previously written from Prometheus to InfluxDB v1 via remote_write.",
|
||||
Value: false,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -17,9 +17,10 @@ type influxProcessor struct {
|
||||
cc int
|
||||
separator string
|
||||
skipDbLabel bool
|
||||
promMode bool
|
||||
}
|
||||
|
||||
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel bool) *influxProcessor {
|
||||
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string, skipDbLabel bool, promMode bool) *influxProcessor {
|
||||
if cc < 1 {
|
||||
cc = 1
|
||||
}
|
||||
@ -29,6 +30,7 @@ func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator st
|
||||
cc: cc,
|
||||
separator: separator,
|
||||
skipDbLabel: skipDbLabel,
|
||||
promMode: promMode,
|
||||
}
|
||||
}
|
||||
|
||||
@ -101,6 +103,8 @@ func (ip *influxProcessor) run(silent, verbose bool) error {
|
||||
}
|
||||
|
||||
const dbLabel = "db"
|
||||
const nameLabel = "__name__"
|
||||
const valueField = "value"
|
||||
|
||||
func (ip *influxProcessor) do(s *influx.Series) error {
|
||||
cr, err := ip.ic.FetchDataPoints(s)
|
||||
@ -122,6 +126,8 @@ func (ip *influxProcessor) do(s *influx.Series) error {
|
||||
for i, lp := range s.LabelPairs {
|
||||
if lp.Name == dbLabel {
|
||||
containsDBLabel = true
|
||||
} else if lp.Name == nameLabel && s.Field == valueField && ip.promMode {
|
||||
name = lp.Value
|
||||
}
|
||||
labels[i] = vm.LabelPair{
|
||||
Name: lp.Name,
|
||||
|
@ -105,7 +105,8 @@ func main() {
|
||||
importer,
|
||||
c.Int(influxConcurrency),
|
||||
c.String(influxMeasurementFieldSeparator),
|
||||
c.Bool(influxSkipDatabaseLabel))
|
||||
c.Bool(influxSkipDatabaseLabel),
|
||||
c.Bool(influxPrometheusMode))
|
||||
return processor.run(c.Bool(globalSilent), c.Bool(globalVerbose))
|
||||
},
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user