From 97fafce02899d637d100f1a71962814044c39baa Mon Sep 17 00:00:00 2001 From: John Seekins Date: Thu, 8 Apr 2021 13:58:06 -0600 Subject: [PATCH] OpenTSDB migration to VictoriaMetrics (#1089) --- app/vmctl/README.md | 56 +++ app/vmctl/flags.go | 72 ++++ app/vmctl/main.go | 33 ++ app/vmctl/opentsdb.go | 159 +++++++ app/vmctl/opentsdb/opentsdb.go | 347 +++++++++++++++ app/vmctl/opentsdb/parser.go | 173 ++++++++ app/vmctl/opentsdb/parser_test.go | 217 ++++++++++ .../opentsdb/testdata/exampleOutput.json | 398 ++++++++++++++++++ app/vmctl/opentsdb/testdata/exampleQuery.json | 62 +++ 9 files changed, 1517 insertions(+) create mode 100644 app/vmctl/opentsdb.go create mode 100644 app/vmctl/opentsdb/opentsdb.go create mode 100644 app/vmctl/opentsdb/parser.go create mode 100644 app/vmctl/opentsdb/parser_test.go create mode 100644 app/vmctl/opentsdb/testdata/exampleOutput.json create mode 100644 app/vmctl/opentsdb/testdata/exampleQuery.json diff --git a/app/vmctl/README.md b/app/vmctl/README.md index f26acba854..b9c6dc0706 100644 --- a/app/vmctl/README.md +++ b/app/vmctl/README.md @@ -7,8 +7,37 @@ Features: - [x] Thanos: migrate data from Thanos to VictoriaMetrics - [ ] ~~Prometheus: migrate data from Prometheus to VictoriaMetrics by query~~(discarded) - [x] InfluxDB: migrate data from InfluxDB to VictoriaMetrics +- [x] OpenTSDB: migrate data from OpenTSDB to VictoriaMetrics - [ ] Storage Management: data re-balancing between nodes +# Table of contents + +* [Articles](#articles) +* [How to build](#how-to-build) +* [Migrating data from OpenTSDB](#migrating-data-from-opentsdb) +* [Migrating data from InfluxDB 1.x](#migrating-data-from-influxdb-1x) + * [Data mapping](#data-mapping) + * [Configuration](#configuration) + * [Filtering](#filtering) +* [Migrating data from InfluxDB 2.x](#migrating-data-from-influxdb-2x) +* [Migrating data from Prometheus](#migrating-data-from-prometheus) + * [Data mapping](#data-mapping-1) + * [Configuration](#configuration-1) + * 
[Filtering](#filtering-1) +* [Migrating data from Thanos](#migrating-data-from-thanos) + * [Current data](#current-data) + * [Historical data](#historical-data) +* [Migrating data from VictoriaMetrics](#migrating-data-from-victoriametrics) + * [Native protocol](#native-protocol) +* [Tuning](#tuning) + * [Influx mode](#influx-mode) + * [Prometheus mode](#prometheus-mode) + * [VictoriaMetrics importer](#victoriametrics-importer) + * [Importer stats](#importer-stats) +* [Significant figures](#significant-figures) +* [Adding extra labels](#adding-extra-labels) + + ## Articles * [How to migrate data from Prometheus](https://medium.com/@romanhavronenko/victoriametrics-how-to-migrate-data-from-prometheus-d44a6728f043) @@ -60,6 +89,33 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b 2. Run `make vmctl-arm-prod` or `make vmctl-arm64-prod` from the root folder of [the repository](https://github.com/VictoriaMetrics/VictoriaMetrics). It builds `vmctl-arm-prod` or `vmctl-arm64-prod` binary respectively and puts it into the `bin` folder. +## Migrating data from OpenTSDB + +`vmctl` supports the `opentsdb` mode to migrate data from OpenTSDB to VictoriaMetrics time-series database. + +See `./vmctl opentsdb --help` for details and full list of flags. + +*OpenTSDB migration is not possible without a functioning [meta](http://opentsdb.net/docs/build/html/user_guide/metadata.html) table to search for metrics/series.* + +OpenTSDB migration works like so: + +1. Find metrics based on selected filters (or the default filter set ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']) + * e.g. `curl -Ss "http://opentsdb:4242/api/suggest?type=metrics&q=sys"` +2. Find series associated with each returned metric + * e.g. `curl -Ss "http://opentsdb:4242/api/search/lookup?m=system.load5&limit=1000000"` +3. Download data for each series in chunks defined in the CLI switches + * e.g. 
`--otsdb-retentions=sum-1m-avg:1h:90d` == + * `curl -Ss "http://opentsdb:4242/api/query?start=1h-ago&end=now&m=sum:1m-avg-none:system.load5\{host=host1\}"` + * `curl -Ss "http://opentsdb:4242/api/query?start=2h-ago&end=1h-ago&m=sum:1m-avg-none:system.load5\{host=host1\}"` + * `curl -Ss "http://opentsdb:4242/api/query?start=3h-ago&end=2h-ago&m=sum:1m-avg-none:system.load5\{host=host1\}"` + * ... + * `curl -Ss "http://opentsdb:4242/api/query?start=2160h-ago&end=2159h-ago&m=sum:1m-avg-none:system.load5\{host=host1\}"` + +This means that we must stream data from OpenTSDB to VictoriaMetrics in chunks. This is where concurrency for OpenTSDB comes in. We can query multiple chunks at once, but we shouldn't perform too many chunks at a time to avoid overloading the OpenTSDB cluster. + +### Restarting OpenTSDB migrations + +One important note for OpenTSDB migration: Queries/HBase scans can "get stuck" within OpenTSDB itself. This can cause instability and performance issues within an OpenTSDB cluster, so stopping the migrator to deal with it may be necessary. Because of this, we provide the timestamp we started collecting data from at the beginning of the run. You can stop and restart the importer using this "hard timestamp" to ensure you collect data from the same time range over multiple runs. 
## Migrating data from InfluxDB (1.x) diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index 947135506b..bf32c708f3 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -96,6 +96,78 @@ var ( } ) +const ( + otsdbAddr = "otsdb-addr" + otsdbConcurrency = "otsdb-concurrency" + otsdbQueryLimit = "otsdb-query-limit" + otsdbOffsetDays = "otsdb-offset-days" + otsdbHardTSStart = "otsdb-hard-ts-start" + otsdbRetentions = "otsdb-retentions" + otsdbFilters = "otsdb-filters" + otsdbNormalize = "otsdb-normalize" + otsdbMsecsTime = "otsdb-msecstime" +) + +var ( + otsdbFlags = []cli.Flag{ + &cli.StringFlag{ + Name: otsdbAddr, + Value: "http://localhost:4242", + Required: true, + Usage: "OpenTSDB server addr", + }, + &cli.IntFlag{ + Name: otsdbConcurrency, + Usage: "Number of concurrently running fetch queries to OpenTSDB per metric", + Value: 1, + }, + &cli.StringSliceFlag{ + Name: otsdbRetentions, + Value: nil, + Required: true, + Usage: "Retentions patterns to collect on. Each pattern should describe the aggregation performed " + + "for the query, the row size (in HBase) that will define how long each individual query is, " + + "and the time range to query for. e.g. sum-1m-avg:1h:3d. " + + "The first time range defined should be a multiple of the row size in HBase. " + + "e.g. if the row size is 2 hours, 4h is good, 5h less so. 
We want each query to land on unique rows.", + }, + &cli.StringSliceFlag{ + Name: otsdbFilters, + Value: cli.NewStringSlice("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"), + Usage: "Filters to process for discovering metrics in OpenTSDB", + }, + &cli.Int64Flag{ + Name: otsdbOffsetDays, + Usage: "Days to offset our 'starting' point for collecting data from OpenTSDB", + Value: 0, + }, + &cli.Int64Flag{ + Name: otsdbHardTSStart, + Usage: "A specific timestamp to start from, will override using an offset", + Value: 0, + }, + /* + because the defaults are set *extremely* low in OpenTSDB (10-25 results), we will + set a larger default limit, but still allow a user to increase/decrease it + */ + &cli.IntFlag{ + Name: otsdbQueryLimit, + Usage: "Result limit on meta queries to OpenTSDB (affects both metric name and tag value queries, recommended to use a value exceeding your largest series)", + Value: 100e3, + }, + &cli.BoolFlag{ + Name: otsdbMsecsTime, + Value: false, + Usage: "Whether OpenTSDB is writing values in milliseconds or seconds", + }, + &cli.BoolFlag{ + Name: otsdbNormalize, + Value: false, + Usage: "Whether to normalize all data received to lower case before forwarding to VictoriaMetrics", + }, + } +) + const ( influxAddr = "influx-addr" influxUser = "influx-user" diff --git a/app/vmctl/main.go b/app/vmctl/main.go index d3268f3538..0c88f12718 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" @@ -23,6 +24,38 @@ func main() { Usage: "Victoria metrics command-line tool", Version: buildinfo.Version, Commands: []*cli.Command{ + { + Name: 
"opentsdb", + Usage: "Migrate timeseries from OpenTSDB", + Flags: mergeFlags(globalFlags, otsdbFlags, vmFlags), + Action: func(c *cli.Context) error { + fmt.Println("OpenTSDB import mode") + + oCfg := opentsdb.Config{ + Addr: c.String(otsdbAddr), + Limit: c.Int(otsdbQueryLimit), + Offset: c.Int64(otsdbOffsetDays), + HardTS: c.Int64(otsdbHardTSStart), + Retentions: c.StringSlice(otsdbRetentions), + Filters: c.StringSlice(otsdbFilters), + Normalize: c.Bool(otsdbNormalize), + MsecsTime: c.Bool(otsdbMsecsTime), + } + otsdbClient, err := opentsdb.NewClient(oCfg) + if err != nil { + return fmt.Errorf("failed to create opentsdb client: %s", err) + } + + vmCfg := initConfigVM(c) + importer, err := vm.NewImporter(vmCfg) + if err != nil { + return fmt.Errorf("failed to create VM importer: %s", err) + } + + otsdbProcessor := newOtsdbProcessor(otsdbClient, importer, c.Int(otsdbConcurrency)) + return otsdbProcessor.run(c.Bool(globalSilent)) + }, + }, { Name: "influx", Usage: "Migrate timeseries from InfluxDB", diff --git a/app/vmctl/opentsdb.go b/app/vmctl/opentsdb.go new file mode 100644 index 0000000000..8135fba3eb --- /dev/null +++ b/app/vmctl/opentsdb.go @@ -0,0 +1,159 @@ +package main + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/opentsdb" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" + "github.com/cheggaaa/pb/v3" +) + +type otsdbProcessor struct { + oc *opentsdb.Client + im *vm.Importer + otsdbcc int +} + +type queryObj struct { + Series opentsdb.Meta + Rt opentsdb.RetentionMeta + Tr opentsdb.TimeRange + StartTime int64 +} + +func newOtsdbProcessor(oc *opentsdb.Client, im *vm.Importer, otsdbcc int) *otsdbProcessor { + if otsdbcc < 1 { + otsdbcc = 1 + } + return &otsdbProcessor{ + oc: oc, + im: im, + otsdbcc: otsdbcc, + } +} + +func (op *otsdbProcessor) run(silent bool) error { + log.Println("Loading all metrics from OpenTSDB for filters: ", op.oc.Filters) + var metrics []string + for _, filter := 
range op.oc.Filters { + q := fmt.Sprintf("%s/api/suggest?type=metrics&q=%s&max=%d", op.oc.Addr, filter, op.oc.Limit) + m, err := op.oc.FindMetrics(q) + if err != nil { + return fmt.Errorf("metric discovery failed for %q: %s", q, err) + } + metrics = append(metrics, m...) + } + if len(metrics) < 1 { + return fmt.Errorf("found no timeseries to import with filters %q", op.oc.Filters) + } + + question := fmt.Sprintf("Found %d metrics to import. Continue?", len(metrics)) + if !silent && !prompt(question) { + return nil + } + op.im.ResetStats() + startTime := time.Now().Unix() + queryRanges := 0 + // pre-calculate the number of query ranges we'll be processing + for _, rt := range op.oc.Retentions { + queryRanges += len(rt.QueryRanges) + } + for _, metric := range metrics { + log.Println(fmt.Sprintf("Starting work on %s", metric)) + serieslist, err := op.oc.FindSeries(metric) + if err != nil { + return fmt.Errorf("couldn't retrieve series list for %s : %s", metric, err) + } + /* + Create channels for collecting/processing series and errors + We'll create them per metric to reduce pressure against OpenTSDB + + Limit the size of seriesCh so we can't get too far ahead of actual processing + */ + seriesCh := make(chan queryObj, op.otsdbcc) + errCh := make(chan error) + // we're going to make serieslist * queryRanges queries, so we should represent that in the progress bar + bar := pb.StartNew(len(serieslist) * queryRanges) + var wg sync.WaitGroup + wg.Add(op.otsdbcc) + for i := 0; i < op.otsdbcc; i++ { + go func() { + defer wg.Done() + for s := range seriesCh { + if err := op.do(s); err != nil { + errCh <- fmt.Errorf("couldn't retrieve series for %s : %s", metric, err) + return + } + bar.Increment() + } + }() + } + /* + Loop through all series for this metric, processing all retentions and time ranges + requested. This loop is our primary "collect data from OpenTSDB loop" and should + be async, sending data to VictoriaMetrics over time. 
+ + The idea with having the select at the inner-most loop is to ensure quick + short-circuiting on error. + */ + for _, series := range serieslist { + for _, rt := range op.oc.Retentions { + for _, tr := range rt.QueryRanges { + select { + case otsdbErr := <-errCh: + return fmt.Errorf("opentsdb error: %s", otsdbErr) + case vmErr := <-op.im.Errors(): + return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + case seriesCh <- queryObj{ + Tr: tr, StartTime: startTime, + Series: series, Rt: opentsdb.RetentionMeta{ + FirstOrder: rt.FirstOrder, SecondOrder: rt.SecondOrder, AggTime: rt.AggTime}}: + } + } + } + } + // Drain channels per metric + close(seriesCh) + wg.Wait() + close(errCh) + // check for any lingering errors on the query side + for otsdbErr := range errCh { + return fmt.Errorf("Import process failed: \n%s", otsdbErr) + } + bar.Finish() + log.Print(op.im.Stats()) + } + op.im.Close() + for vmErr := range op.im.Errors() { + return fmt.Errorf("Import process failed: \n%s", wrapErr(vmErr)) + } + log.Println("Import finished!") + log.Print(op.im.Stats()) + return nil +} + +func (op *otsdbProcessor) do(s queryObj) error { + start := s.StartTime - s.Tr.Start + end := s.StartTime - s.Tr.End + data, err := op.oc.GetData(s.Series, s.Rt, start, end) + if err != nil { + return fmt.Errorf("failed to collect data for %v in %v:%v :: %v", s.Series, s.Rt, s.Tr, err) + } + if len(data.Timestamps) < 1 || len(data.Values) < 1 { + return nil + } + labels := make([]vm.LabelPair, len(data.Tags)) + for k, v := range data.Tags { + labels = append(labels, vm.LabelPair{Name: k, Value: v}) + } + op.im.Input() <- &vm.TimeSeries{ + Name: data.Metric, + LabelPairs: labels, + Timestamps: data.Timestamps, + Values: data.Values, + } + return nil +} diff --git a/app/vmctl/opentsdb/opentsdb.go b/app/vmctl/opentsdb/opentsdb.go new file mode 100644 index 0000000000..b2c32931b7 --- /dev/null +++ b/app/vmctl/opentsdb/opentsdb.go @@ -0,0 +1,347 @@ +package opentsdb + +import ( + "bytes" + 
"encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "strings" + "time" +) + +// Retention objects contain meta data about what to query for our run +type Retention struct { + /* + OpenTSDB has two levels of aggregation, + First, we aggregate any un-mentioned tags into the last result + Second, we aggregate into buckets over time + To simulate this with config, we have + FirstOrder (e.g. sum/avg/max/etc.) + SecondOrder (e.g. sum/avg/max/etc.) + AggTime (e.g. 1m/10m/1d/etc.) + This will build into m=:--none: + Or an example: m=sum:1m-avg-none + */ + FirstOrder string + SecondOrder string + AggTime string + // The actual ranges will will attempt to query (as offsets from now) + QueryRanges []TimeRange +} + +// RetentionMeta objects exist to pass smaller subsets (only one retention range) of a full Retention object around +type RetentionMeta struct { + FirstOrder string + SecondOrder string + AggTime string +} + +// Client object holds general config about how queries should be performed +type Client struct { + Addr string + // The meta query limit for series returned + Limit int + Retentions []Retention + Filters []string + Normalize bool +} + +// Config contains fields required +// for Client configuration +type Config struct { + Addr string + Limit int + Offset int64 + HardTS int64 + Retentions []string + Filters []string + Normalize bool + MsecsTime bool +} + +// TimeRange contains data about time ranges to query +type TimeRange struct { + Start int64 + End int64 +} + +// MetaResults contains return data from search series lookup queries +type MetaResults struct { + Type string `json:"type"` + Results []Meta `json:"results"` + //metric string + //tags interface{} + //limit int + //time int + //startIndex int + //totalResults int +} + +// Meta A meta object about a metric +// only contain the tags/etc. 
and no data +type Meta struct { + //tsuid string + Metric string `json:"metric"` + Tags map[string]string `json:"tags"` +} + +// Metric holds the time series data +type Metric struct { + Metric string + Tags map[string]string + Timestamps []int64 + Values []float64 +} + +// ExpressionOutput contains results from actual data queries +type ExpressionOutput struct { + Outputs []qoObj `json:"outputs"` + Query interface{} `json:"query"` +} + +// QoObj contains actual timeseries data from the returned data query +type qoObj struct { + ID string `json:"id"` + Alias string `json:"alias"` + Dps [][]float64 `json:"dps"` + //dpsMeta interface{} + //meta interface{} +} + +// Expression objects format our data queries +/* +All of the following structs are to build a OpenTSDB expression object +*/ +type Expression struct { + Time timeObj `json:"time"` + Filters []filterObj `json:"filters"` + Metrics []metricObj `json:"metrics"` + // this just needs to be an empty object, so the value doesn't matter + Expressions []int `json:"expressions"` + Outputs []outputObj `json:"outputs"` +} + +type timeObj struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Aggregator string `json:"aggregator"` + Downsampler dSObj `json:"downsampler"` +} + +type dSObj struct { + Interval string `json:"interval"` + Aggregator string `json:"aggregator"` + FillPolicy fillObj `json:"fillPolicy"` +} + +type fillObj struct { + // we'll always hard-code to NaN here, so we don't need value + Policy string `json:"policy"` +} + +type filterObj struct { + Tags []tagObj `json:"tags"` + ID string `json:"id"` +} + +type tagObj struct { + Type string `json:"type"` + Tagk string `json:"tagk"` + Filter string `json:"filter"` + GroupBy bool `json:"groupBy"` +} + +type metricObj struct { + ID string `json:"id"` + Metric string `json:"metric"` + Filter string `json:"filter"` + FillPolicy fillObj `json:"fillPolicy"` +} + +type outputObj struct { + ID string `json:"id"` + Alias string `json:"alias"` +} + +/* End 
expression object structs */ + +var ( + exprOutput = outputObj{ID: "a", Alias: "query"} + exprFillPolicy = fillObj{Policy: "nan"} +) + +// FindMetrics discovers all metrics that OpenTSDB knows about (given a filter) +// e.g. /api/suggest?type=metrics&q=system&max=100000 +func (c Client) FindMetrics(q string) ([]string, error) { + resp, err := http.Get(q) + if err != nil { + return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err) + } + if resp.StatusCode != 200 { + return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp) + } + defer func() { _ = resp.Body.Close() }() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err) + } + var metriclist []string + err = json.Unmarshal(body, &metriclist) + if err != nil { + return nil, fmt.Errorf("failed to read response from %q: %s", q, err) + } + return metriclist, nil +} + +// FindSeries discovers all series associated with a metric +// e.g. 
/api/search/lookup?m=system.load5&limit=1000000 +func (c Client) FindSeries(metric string) ([]Meta, error) { + q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit) + resp, err := http.Get(q) + if err != nil { + return nil, fmt.Errorf("failed to set GET request to %q: %s", q, err) + } + if resp.StatusCode != 200 { + return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp) + } + defer func() { _ = resp.Body.Close() }() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err) + } + var results MetaResults + err = json.Unmarshal(body, &results) + if err != nil { + return nil, fmt.Errorf("failed to read response from %q: %s", q, err) + } + return results.Results, nil +} + +// GetData actually retrieves data for a series at a specified time range +func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) { + /* + Here we build the actual exp query we'll send to OpenTSDB + + This is comprised of a number of different settings. We hard-code + a few to simplify the JSON object creation. + There are examples queries available, so not too much detail here... 
+ */ + expr := Expression{} + expr.Outputs = []outputObj{exprOutput} + expr.Metrics = append(expr.Metrics, metricObj{ID: "a", Metric: series.Metric, + Filter: "f1", FillPolicy: exprFillPolicy}) + expr.Time = timeObj{Start: start, End: end, Aggregator: rt.FirstOrder, + Downsampler: dSObj{Interval: rt.AggTime, + Aggregator: rt.SecondOrder, + FillPolicy: exprFillPolicy}} + var TagList []tagObj + for k, v := range series.Tags { + /* + every tag should be a literal_or because that's the closest to a full "==" that + this endpoint allows for + */ + TagList = append(TagList, tagObj{Type: "literal_or", Tagk: k, + Filter: v, GroupBy: true}) + } + expr.Filters = append(expr.Filters, filterObj{ID: "f1", Tags: TagList}) + // "expressions" is required in the query object or we get a 5xx, so force it to exist + expr.Expressions = make([]int, 0) + inputData, err := json.Marshal(expr) + if err != nil { + return Metric{}, fmt.Errorf("failed to marshal query JSON %s", err) + } + q := fmt.Sprintf("%s/api/query/exp", c.Addr) + resp, err := http.Post(q, "application/json", bytes.NewBuffer(inputData)) + if err != nil { + return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err) + } + if resp.StatusCode != 200 { + return Metric{}, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp) + } + defer func() { _ = resp.Body.Close() }() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return Metric{}, fmt.Errorf("could not retrieve series data from %q: %s", q, err) + } + var output ExpressionOutput + err = json.Unmarshal(body, &output) + if err != nil { + return Metric{}, fmt.Errorf("failed to unmarshal response from %q: %s", q, err) + } + if len(output.Outputs) < 1 { + // no results returned...return an empty object without error + return Metric{}, nil + } + data := Metric{} + data.Metric = series.Metric + data.Tags = series.Tags + /* + We evaluate data for correctness before formatting the actual values + to skip a little bit of time if the series 
has invalid formatting + + First step is to enforce Prometheus' data model + */ + data, err = modifyData(data, c.Normalize) + if err != nil { + return Metric{}, fmt.Errorf("invalid series data from %q: %s", q, err) + } + /* + Convert data from OpenTSDB's output format ([[ts,val],[ts,val]...]) + to VictoriaMetrics format: {"timestamps": [ts,ts,ts...], "values": [val,val,val...]} + The nasty part here is that because an object in each array + can be a float64, we have to initially cast _all_ objects that way + then convert the timestamp back to something reasonable. + */ + for _, tsobj := range output.Outputs[0].Dps { + data.Timestamps = append(data.Timestamps, int64(tsobj[0])) + data.Values = append(data.Values, tsobj[1]) + } + return data, nil +} + +// NewClient creates and returns OpenTSDB client +// configured with passed Config +func NewClient(cfg Config) (*Client, error) { + var retentions []Retention + offsetPrint := int64(time.Now().Unix()) + if cfg.MsecsTime { + // 1000000 == Nanoseconds -> Milliseconds difference + offsetPrint = int64(time.Now().UnixNano() / 1000000) + } + if cfg.HardTS > 0 { + /* + "Hard" offsets are specific timestamps, rather than + a relative number of days. 
To use them effectively + we should subtract them from our default offset (Now) + */ + offsetPrint = offsetPrint - cfg.HardTS + } else if cfg.Offset > 0 { + /* + Our "offset" is the number of days we should step + back before starting to scan for data + */ + if cfg.MsecsTime { + offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60 * 1000) + } else { + offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60) + } + } + log.Println(fmt.Sprintf("Will collect data starting at TS %v", offsetPrint)) + for _, r := range cfg.Retentions { + ret, err := convertRetention(r, offsetPrint, cfg.MsecsTime) + if err != nil { + return &Client{}, fmt.Errorf("Couldn't parse retention %q :: %v", r, err) + } + retentions = append(retentions, ret) + } + client := &Client{ + Addr: strings.Trim(cfg.Addr, "/"), + Retentions: retentions, + Limit: cfg.Limit, + Filters: cfg.Filters, + Normalize: cfg.Normalize, + } + return client, nil +} diff --git a/app/vmctl/opentsdb/parser.go b/app/vmctl/opentsdb/parser.go new file mode 100644 index 0000000000..32e8f9f31e --- /dev/null +++ b/app/vmctl/opentsdb/parser.go @@ -0,0 +1,173 @@ +package opentsdb + +import ( + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +var ( + allowedNames = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9_:]*$") + allowedFirstChar = regexp.MustCompile("^[a-zA-Z]") + replaceChars = regexp.MustCompile("[^a-zA-Z0-9_:]") + allowedTagKeys = regexp.MustCompile("^[a-zA-Z][a-zA-Z0-9_]*$") +) + +func convertDuration(duration string) (time.Duration, error) { + /* + Golang's time library doesn't support many different + string formats (year, month, week, day) because they + aren't consistent ranges. But Java's library _does_. + Consequently, we'll need to handle all the custom + time ranges, and, to make the internal API call consistent, + we'll need to allow for durations that Go supports, too. + + The nice thing is all the "broken" time ranges are > 1 hour, + so we can just make assumptions to convert them to a range in hours. 
+ They aren't *good* assumptions, but they're reasonable + for this function. + */ + var actualDuration time.Duration + var err error + var timeValue int + if strings.HasSuffix(duration, "y") { + timeValue, err = strconv.Atoi(strings.Trim(duration, "y")) + if err != nil { + return 0, fmt.Errorf("invalid time range: %q", duration) + } + timeValue = timeValue * 365 * 24 + actualDuration, err = time.ParseDuration(fmt.Sprintf("%vh", timeValue)) + if err != nil { + return 0, fmt.Errorf("invalid time range: %q", duration) + } + } else if strings.HasSuffix(duration, "w") { + timeValue, err = strconv.Atoi(strings.Trim(duration, "w")) + if err != nil { + return 0, fmt.Errorf("invalid time range: %q", duration) + } + timeValue = timeValue * 7 * 24 + actualDuration, err = time.ParseDuration(fmt.Sprintf("%vh", timeValue)) + if err != nil { + return 0, fmt.Errorf("invalid time range: %q", duration) + } + } else if strings.HasSuffix(duration, "d") { + timeValue, err = strconv.Atoi(strings.Trim(duration, "d")) + if err != nil { + return 0, fmt.Errorf("invalid time range: %q", duration) + } + timeValue = timeValue * 24 + actualDuration, err = time.ParseDuration(fmt.Sprintf("%vh", timeValue)) + if err != nil { + return 0, fmt.Errorf("invalid time range: %q", duration) + } + } else if strings.HasSuffix(duration, "h") || strings.HasSuffix(duration, "m") || strings.HasSuffix(duration, "s") || strings.HasSuffix(duration, "ms") { + actualDuration, err = time.ParseDuration(duration) + if err != nil { + return 0, fmt.Errorf("invalid time range: %q", duration) + } + } else { + return 0, fmt.Errorf("invalid time duration string: %q", duration) + } + return actualDuration, nil +} + +// Convert an incoming retention "string" into the component parts +func convertRetention(retention string, offset int64, msecTime bool) (Retention, error) { + /* + A retention string coming in looks like + sum-1m-avg:1h:30d + So we: + 1. split on the : + 2. split on the - in slice 0 + 3. 
create the time ranges we actually need + */ + chunks := strings.Split(retention, ":") + if len(chunks) != 3 { + return Retention{}, fmt.Errorf("invalid retention string: %q", retention) + } + rowLengthDuration, err := convertDuration(chunks[1]) + if err != nil { + return Retention{}, fmt.Errorf("invalid row length (first order) duration string: %q: %s", chunks[1], err) + } + // set length of each row in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds + rowLength := rowLengthDuration.Milliseconds() + if !msecTime { + rowLength = rowLength / 1000 + } + ttlDuration, err := convertDuration(chunks[2]) + if err != nil { + return Retention{}, fmt.Errorf("invalid ttl (second order) duration string: %q: %s", chunks[2], err) + } + // set ttl in milliseconds, unless we aren't using millisecond time in OpenTSDB...then use seconds + ttl := ttlDuration.Milliseconds() + if !msecTime { + ttl = ttl / 1000 + } + // bump by the offset so we don't look at empty ranges any time offset > ttl + ttl += offset + var timeChunks []TimeRange + var i int64 + for i = offset; i <= ttl; i = i + rowLength { + timeChunks = append(timeChunks, TimeRange{Start: i + rowLength, End: i}) + } + // first/second order aggregations for queries defined in chunk 0... 
+ aggregates := strings.Split(chunks[0], "-") + if len(aggregates) != 3 { + return Retention{}, fmt.Errorf("invalid aggregation string: %q", chunks[0]) + } + + ret := Retention{FirstOrder: aggregates[0], + SecondOrder: aggregates[2], + AggTime: aggregates[1], + QueryRanges: timeChunks} + return ret, nil +} + +// This ensures any incoming data from OpenTSDB matches the Prometheus data model +// https://prometheus.io/docs/concepts/data_model +func modifyData(msg Metric, normalize bool) (Metric, error) { + finalMsg := Metric{ + Metric: "", Tags: make(map[string]string), + Timestamps: msg.Timestamps, Values: msg.Values, + } + // if the metric name has invalid characters, the data model says to drop it + if !allowedFirstChar.MatchString(msg.Metric) { + return Metric{}, fmt.Errorf("%s has a bad first character", msg.Metric) + } + name := msg.Metric + // if normalization requested, lowercase the name + if normalize { + name = strings.ToLower(name) + } + /* + replace bad characters in metric name with _ per the data model + only replace if needed to reduce string processing time + */ + if !allowedNames.MatchString(name) { + finalMsg.Metric = replaceChars.ReplaceAllString(name, "_") + } else { + finalMsg.Metric = name + } + // replace bad characters in tag keys with _ per the data model + for key, value := range msg.Tags { + // if normalization requested, lowercase the key and value + if normalize { + key = strings.ToLower(key) + value = strings.ToLower(value) + } + /* + replace all explicitly bad characters with _ + only replace if needed to reduce string processing time + */ + if !allowedTagKeys.MatchString(key) { + key = replaceChars.ReplaceAllString(key, "_") + } + // tags that start with __ are considered custom stats for internal prometheus stuff, we should drop them + if !strings.HasPrefix(key, "__") { + finalMsg.Tags[key] = value + } + } + return finalMsg, nil +} diff --git a/app/vmctl/opentsdb/parser_test.go b/app/vmctl/opentsdb/parser_test.go new file mode 100644 
index 0000000000..0aa93b7cd0 --- /dev/null +++ b/app/vmctl/opentsdb/parser_test.go @@ -0,0 +1,217 @@ +package opentsdb + +import ( + "testing" +) + +func TestConvertRetention(t *testing.T) { + /* + 2592000 seconds in 30 days + 3600 in one hour + 2592000 / 3600 = 720 individual query "ranges" should exist, plus one because time ranges can be weird + First order should == "sum" + Second order should == "avg" + AggTime should == "1m" + */ + res, err := convertRetention("sum-1m-avg:1h:30d", 0, false) + if err != nil { + t.Fatalf("Error parsing valid retention string: %v", err) + } + if len(res.QueryRanges) != 721 { + t.Fatalf("Found %v query ranges. Should have found 720", len(res.QueryRanges)) + } + if res.FirstOrder != "sum" { + t.Fatalf("Incorrect first order aggregation %q. Should have been 'sum'", res.FirstOrder) + } + if res.SecondOrder != "avg" { + t.Fatalf("Incorrect second order aggregation %q. Should have been 'avg'", res.SecondOrder) + } + if res.AggTime != "1m" { + t.Fatalf("Incorrect aggregation time length %q. 
Should have been '1m'", res.AggTime) + } + /* + Invalid retention string + */ + res, err = convertRetention("sum-1m-avg:30d", 0, false) + if err == nil { + t.Fatalf("Bad retention string (sum-1m-avg:30d) didn't fail: %v", res) + } + /* + Invalid aggregation string + */ + res, err = convertRetention("sum-1m:1h:30d", 0, false) + if err == nil { + t.Fatalf("Bad aggregation string (sum-1m:1h:30d) didn't fail: %v", res) + } +} + +func TestModifyData(t *testing.T) { + /* + Good metric metadata + */ + m := Metric{ + Metric: "cpu", + Tags: map[string]string{ + "core": "0", + }, + Values: []float64{ + 0, + }, + Timestamps: []int64{ + 0, + }, + } + res, err := modifyData(m, false) + if err != nil { + t.Fatalf("Valid metric %v failed to parse: %v", m, err) + } + if res.Metric != "cpu" { + t.Fatalf("Valid metric name %q was converted: %q", m.Metric, res.Metric) + } + found := false + for k := range res.Tags { + if k == "core" { + found = true + break + } + } + if !found { + t.Fatalf("Valid metric tag name 'core' missing: %v", res.Tags) + } + + /* + Bad first character in metric name + metric names cannot start with _, so this + metric should fail entirely + */ + m = Metric{ + Metric: "_cpu", + Tags: map[string]string{ + "core": "0", + }, + Values: []float64{ + 0, + }, + Timestamps: []int64{ + 0, + }, + } + res, err = modifyData(m, false) + if err == nil { + t.Fatalf("Invalid metric %v parsed?", m) + } + + /* + Bad character in metric name + metric names cannot have `.`, so this + should be converted to `_` + */ + m = Metric{ + Metric: "cpu.name", + Tags: map[string]string{ + "core": "0", + }, + Values: []float64{ + 0, + }, + Timestamps: []int64{ + 0, + }, + } + res, err = modifyData(m, false) + if err != nil { + t.Fatalf("Valid metric failed to parse? 
%v", err) + } + if res.Metric != "cpu_name" { + t.Fatalf("Metric name not correctly converted from 'cpu.name' to 'cpu_name': %q", res.Metric) + } + + /* + bad tag prefix (__) + Prometheus considers tags beginning with __ + to be internal use only. They should not show up in incoming data. + this tag should be dropped from the result + */ + m = Metric{ + Metric: "cpu", + Tags: map[string]string{ + "__core": "0", + }, + Values: []float64{ + 0, + }, + Timestamps: []int64{ + 0, + }, + } + res, err = modifyData(m, false) + if err != nil { + t.Fatalf("Valid metric failed to parse? %v", err) + } + found = false + for k := range res.Tags { + if k == "__core" { + found = true + break + } + } + if found { + t.Fatalf("Bad tag key prefix (__) found") + } + + /* + bad tag key + tag keys cannot contain `.`, this should be + replaced with `_` + */ + m = Metric{ + Metric: "cpu", + Tags: map[string]string{ + "core.name": "0", + }, + Values: []float64{ + 0, + }, + Timestamps: []int64{ + 0, + }, + } + res, err = modifyData(m, false) + if err != nil { + t.Fatalf("Valid metric failed to parse? %v", err) + } + found = false + for k := range res.Tags { + if k == "core.name" { + found = true + break + } + } + if found { + t.Fatalf("Bad tag key 'core.name' not converted") + } + + /* + test normalize + All characters should be returned lowercase + */ + m = Metric{ + Metric: "CPU", + Tags: map[string]string{ + "core": "0", + }, + Values: []float64{ + 0, + }, + Timestamps: []int64{ + 0, + }, + } + res, err = modifyData(m, true) + if err != nil { + t.Fatalf("Valid metric failed to parse? 
%v", err) + } + if res.Metric != "cpu" { + t.Fatalf("Normalization of metric name didn't happen!") + } +} diff --git a/app/vmctl/opentsdb/testdata/exampleOutput.json b/app/vmctl/opentsdb/testdata/exampleOutput.json new file mode 100644 index 0000000000..8e4c1a6adb --- /dev/null +++ b/app/vmctl/opentsdb/testdata/exampleOutput.json @@ -0,0 +1,398 @@ +{ + "outputs": [ + { + "id": "a", + "alias": "query", + "dps": [ + [ + 1614099600000, + 0.28 + ], + [ + 1614099660000, + 0.22 + ], + [ + 1614099720000, + 0.18 + ], + [ + 1614099780000, + 0.14 + ], + [ + 1614099840000, + 0.24 + ], + [ + 1614099900000, + 0.19 + ], + [ + 1614099960000, + 0.22 + ], + [ + 1614100020000, + 0.2 + ], + [ + 1614100080000, + 0.18 + ], + [ + 1614100140000, + 0.22 + ], + [ + 1614100200000, + 0.17 + ], + [ + 1614100260000, + 0.16 + ], + [ + 1614100320000, + 0.22 + ], + [ + 1614100380000, + 0.3 + ], + [ + 1614100440000, + 0.28 + ], + [ + 1614100500000, + 0.27 + ], + [ + 1614100560000, + 0.26 + ], + [ + 1614100620000, + 0.23 + ], + [ + 1614100680000, + 0.18 + ], + [ + 1614100740000, + 0.3 + ], + [ + 1614100800000, + 0.24 + ], + [ + 1614100860000, + 0.19 + ], + [ + 1614100920000, + 0.16 + ], + [ + 1614100980000, + 0.19 + ], + [ + 1614101040000, + 0.23 + ], + [ + 1614101100000, + 0.18 + ], + [ + 1614101160000, + 0.15 + ], + [ + 1614101220000, + 0.12 + ], + [ + 1614101280000, + 0.1 + ], + [ + 1614101340000, + 0.24 + ], + [ + 1614101400000, + 0.19 + ], + [ + 1614101460000, + 0.16 + ], + [ + 1614101520000, + 0.14 + ], + [ + 1614101580000, + 0.12 + ], + [ + 1614101640000, + 0.14 + ], + [ + 1614101700000, + 0.12 + ], + [ + 1614101760000, + 0.13 + ], + [ + 1614101820000, + 0.12 + ], + [ + 1614101880000, + 0.11 + ], + [ + 1614101940000, + 0.36 + ], + [ + 1614102000000, + 0.35 + ], + [ + 1614102060000, + 0.3 + ], + [ + 1614102120000, + 0.32 + ], + [ + 1614102180000, + 0.27 + ], + [ + 1614102240000, + 0.26 + ], + [ + 1614102300000, + 0.21 + ], + [ + 1614102360000, + 0.18 + ], + [ + 1614102420000, + 0.15 + ], + [ 
+ 1614102480000, + 0.12 + ], + [ + 1614102540000, + 0.24 + ], + [ + 1614102600000, + 0.2 + ], + [ + 1614102660000, + 0.17 + ], + [ + 1614102720000, + 0.18 + ], + [ + 1614102780000, + 0.14 + ], + [ + 1614102840000, + 0.39 + ], + [ + 1614102900000, + 0.31 + ], + [ + 1614102960000, + 0.3 + ], + [ + 1614103020000, + 0.24 + ], + [ + 1614103080000, + 0.26 + ], + [ + 1614103140000, + 0.21 + ], + [ + 1614103200000, + 0.17 + ], + [ + 1614103260000, + 0.15 + ], + [ + 1614103320000, + 0.2 + ], + [ + 1614103380000, + 0.2 + ], + [ + 1614103440000, + 0.22 + ], + [ + 1614103500000, + 0.19 + ], + [ + 1614103560000, + 0.22 + ], + [ + 1614103620000, + 0.29 + ], + [ + 1614103680000, + 0.31 + ], + [ + 1614103740000, + 0.28 + ], + [ + 1614103800000, + 0.23 + ] + ], + "dpsMeta": { + "firstTimestamp": 1614099600000, + "lastTimestamp": 1614103800000, + "setCount": 71, + "series": 1 + }, + "meta": [ + { + "index": 0, + "metrics": [ + "timestamp" + ] + }, + { + "index": 1, + "metrics": [ + "system.load5" + ], + "commonTags": { + "rack": "undef", + "host": "use1-mon-metrics-1", + "row": "undef", + "dc": "us-east-1", + "group": "monitoring" + }, + "aggregatedTags": [] + } + ] + } + ], + "query": { + "name": null, + "time": { + "start": "1h-ago", + "end": null, + "timezone": null, + "downsampler": { + "interval": "1m", + "aggregator": "avg", + "fillPolicy": { + "policy": "nan", + "value": "NaN" + } + }, + "aggregator": "sum", + "rate": false + }, + "filters": [ + { + "id": "f1", + "tags": [ + { + "tagk": "host", + "filter": "use1-mon-metrics-1", + "group_by": true, + "type": "literal_or" + }, + { + "tagk": "group", + "filter": "monitoring", + "group_by": true, + "type": "literal_or" + }, + { + "tagk": "dc", + "filter": "us-east-1", + "group_by": true, + "type": "literal_or" + }, + { + "tagk": "rack", + "filter": "undef", + "group_by": true, + "type": "literal_or" + }, + { + "tagk": "row", + "filter": "undef", + "group_by": true, + "type": "literal_or" + } + ], + "explicitTags": false + } + ], 
+ "metrics": [ + { + "metric": "system.load5", + "id": "a", + "filter": "f1", + "aggregator": null, + "timeOffset": null, + "fillPolicy": { + "policy": "nan", + "value": "NaN" + } + } + ], + "expressions": [], + "outputs": [ + { + "id": "a", + "alias": "query" + } + ] + } +} diff --git a/app/vmctl/opentsdb/testdata/exampleQuery.json b/app/vmctl/opentsdb/testdata/exampleQuery.json new file mode 100644 index 0000000000..b5fac0a119 --- /dev/null +++ b/app/vmctl/opentsdb/testdata/exampleQuery.json @@ -0,0 +1,62 @@ +{ + "time": { + "start": "1h-ago", + "aggregator":"sum", + "downsampler": { + "interval": "1m", + "aggregator": "avg", + "fillPolicy": { + "policy": "nan" + } + } + }, + "filters": [ + { + "tags": [ + { + "type": "literal_or", + "tagk": "host", + "filter": "use1-mon-metrics-1", + "groupBy": true + }, + { + "type": "literal_or", + "tagk": "group", + "filter": "monitoring", + "groupBy": true + }, + { + "type": "literal_or", + "tagk": "dc", + "filter": "us-east-1", + "groupBy": true + }, + { + "type": "literal_or", + "tagk": "rack", + "filter": "undef", + "groupBy": true + }, + { + "type": "literal_or", + "tagk": "row", + "filter": "undef", + "groupBy": true + } + ], + "id": "f1" + } + ], + "metrics": [ + { + "id": "a", + "metric": "system.load5", + "filter": "f1", + "fillPolicy":{"policy":"nan"} + } + ], + "expressions": [], + "outputs":[ + {"id":"a", "alias":"query"} + ] +}