mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 20:22:48 +01:00
147 lines
2.9 KiB
Go
147 lines
2.9 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
|
"github.com/cheggaaa/pb/v3"
|
|
)
|
|
|
|
type influxProcessor struct {
|
|
ic *influx.Client
|
|
im *vm.Importer
|
|
cc int
|
|
separator string
|
|
}
|
|
|
|
func newInfluxProcessor(ic *influx.Client, im *vm.Importer, cc int, separator string) *influxProcessor {
|
|
if cc < 1 {
|
|
cc = 1
|
|
}
|
|
return &influxProcessor{
|
|
ic: ic,
|
|
im: im,
|
|
cc: cc,
|
|
separator: separator,
|
|
}
|
|
}
|
|
|
|
func (ip *influxProcessor) run(silent bool) error {
|
|
series, err := ip.ic.Explore()
|
|
if err != nil {
|
|
return fmt.Errorf("explore query failed: %s", err)
|
|
}
|
|
if len(series) < 1 {
|
|
return fmt.Errorf("found no timeseries to import")
|
|
}
|
|
|
|
question := fmt.Sprintf("Found %d timeseries to import. Continue?", len(series))
|
|
if !silent && !prompt(question) {
|
|
return nil
|
|
}
|
|
|
|
bar := pb.StartNew(len(series))
|
|
seriesCh := make(chan *influx.Series)
|
|
errCh := make(chan error)
|
|
ip.im.ResetStats()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(ip.cc)
|
|
for i := 0; i < ip.cc; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for s := range seriesCh {
|
|
if err := ip.do(s); err != nil {
|
|
errCh <- fmt.Errorf("request failed for %q.%q: %s", s.Measurement, s.Field, err)
|
|
return
|
|
}
|
|
bar.Increment()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// any error breaks the import
|
|
for _, s := range series {
|
|
select {
|
|
case infErr := <-errCh:
|
|
return fmt.Errorf("influx error: %s", infErr)
|
|
case vmErr := <-ip.im.Errors():
|
|
return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
|
|
case seriesCh <- s:
|
|
}
|
|
}
|
|
|
|
close(seriesCh)
|
|
wg.Wait()
|
|
ip.im.Close()
|
|
// drain import errors channel
|
|
for vmErr := range ip.im.Errors() {
|
|
return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr))
|
|
}
|
|
bar.Finish()
|
|
log.Println("Import finished!")
|
|
log.Print(ip.im.Stats())
|
|
return nil
|
|
}
|
|
|
|
const dbLabel = "db"
|
|
|
|
func (ip *influxProcessor) do(s *influx.Series) error {
|
|
cr, err := ip.ic.FetchDataPoints(s)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch datapoints: %s", err)
|
|
}
|
|
defer func() {
|
|
_ = cr.Close()
|
|
}()
|
|
var name string
|
|
if s.Measurement != "" {
|
|
name = fmt.Sprintf("%s%s%s", s.Measurement, ip.separator, s.Field)
|
|
} else {
|
|
name = s.Field
|
|
}
|
|
|
|
labels := make([]vm.LabelPair, len(s.LabelPairs))
|
|
var containsDBLabel bool
|
|
for i, lp := range s.LabelPairs {
|
|
if lp.Name == dbLabel {
|
|
containsDBLabel = true
|
|
break
|
|
}
|
|
labels[i] = vm.LabelPair{
|
|
Name: lp.Name,
|
|
Value: lp.Value,
|
|
}
|
|
}
|
|
if !containsDBLabel {
|
|
labels = append(labels, vm.LabelPair{
|
|
Name: dbLabel,
|
|
Value: ip.ic.Database(),
|
|
})
|
|
}
|
|
|
|
for {
|
|
time, values, err := cr.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
// skip empty results
|
|
if len(time) < 1 {
|
|
continue
|
|
}
|
|
ip.im.Input() <- &vm.TimeSeries{
|
|
Name: name,
|
|
LabelPairs: labels,
|
|
Timestamps: time,
|
|
Values: values,
|
|
}
|
|
}
|
|
}
|