diff --git a/app/vmctl/flags.go b/app/vmctl/flags.go index 4ff0e999fd..eae82bdcb6 100644 --- a/app/vmctl/flags.go +++ b/app/vmctl/flags.go @@ -155,7 +155,7 @@ var ( &cli.IntFlag{ Name: otsdbQueryLimit, Usage: "Result limit on meta queries to OpenTSDB (affects both metric name and tag value queries, recommended to use a value exceeding your largest series)", - Value: 100e3, + Value: 100e6, }, &cli.BoolFlag{ Name: otsdbMsecsTime, diff --git a/app/vmctl/opentsdb/opentsdb.go b/app/vmctl/opentsdb/opentsdb.go index 18084c52c6..02a375c081 100644 --- a/app/vmctl/opentsdb/opentsdb.go +++ b/app/vmctl/opentsdb/opentsdb.go @@ -1,7 +1,6 @@ package opentsdb import ( - "bytes" "encoding/json" "fmt" "io/ioutil" @@ -88,7 +87,15 @@ type Meta struct { Tags map[string]string `json:"tags"` } -// Metric holds the time series data +// OtsdbMetric is a single series in OpenTSDB's returned format +type OtsdbMetric struct { + Metric string + Tags map[string]string + AggregateTags []string + Dps map[int64]float64 +} + +// Metric holds the time series data in VictoriaMetrics format type Metric struct { Metric string Tags map[string]string @@ -96,83 +103,6 @@ type Metric struct { Values []float64 } -// ExpressionOutput contains results from actual data queries -type ExpressionOutput struct { - Outputs []qoObj `json:"outputs"` - Query interface{} `json:"query"` -} - -// QoObj contains actual timeseries data from the returned data query -type qoObj struct { - ID string `json:"id"` - Alias string `json:"alias"` - Dps [][]float64 `json:"dps"` - //dpsMeta interface{} - //meta interface{} -} - -// Expression objects format our data queries -/* -All of the following structs are to build a OpenTSDB expression object -*/ -type Expression struct { - Time timeObj `json:"time"` - Filters []filterObj `json:"filters"` - Metrics []metricObj `json:"metrics"` - // this just needs to be an empty object, so the value doesn't matter - Expressions []int `json:"expressions"` - Outputs []outputObj `json:"outputs"` 
-} - -type timeObj struct { - Start int64 `json:"start"` - End int64 `json:"end"` - Aggregator string `json:"aggregator"` - Downsampler dSObj `json:"downsampler"` -} - -type dSObj struct { - Interval string `json:"interval"` - Aggregator string `json:"aggregator"` - FillPolicy fillObj `json:"fillPolicy"` -} - -type fillObj struct { - // we'll always hard-code to NaN here, so we don't need value - Policy string `json:"policy"` -} - -type filterObj struct { - Tags []tagObj `json:"tags"` - ID string `json:"id"` -} - -type tagObj struct { - Type string `json:"type"` - Tagk string `json:"tagk"` - Filter string `json:"filter"` - GroupBy bool `json:"groupBy"` -} - -type metricObj struct { - ID string `json:"id"` - Metric string `json:"metric"` - Filter string `json:"filter"` - FillPolicy fillObj `json:"fillPolicy"` -} - -type outputObj struct { - ID string `json:"id"` - Alias string `json:"alias"` -} - -/* End expression object structs */ - -var ( - exprOutput = outputObj{ID: "a", Alias: "query"} - exprFillPolicy = fillObj{Policy: "nan"} -) - // FindMetrics discovers all metrics that OpenTSDB knows about (given a filter) // e.g. /api/suggest?type=metrics&q=system&max=100000 func (c Client) FindMetrics(q string) ([]string, error) { @@ -221,41 +151,39 @@ func (c Client) FindSeries(metric string) ([]Meta, error) { } // GetData actually retrieves data for a series at a specified time range +// e.g. /api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1} func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) { /* - Here we build the actual exp query we'll send to OpenTSDB - - This is comprised of a number of different settings. We hard-code - a few to simplify the JSON object creation. - There are examples queries available, so not too much detail here... + First, build our tag string. + It's literally just key=value,key=value,... 
*/ - expr := Expression{} - expr.Outputs = []outputObj{exprOutput} - expr.Metrics = append(expr.Metrics, metricObj{ID: "a", Metric: series.Metric, - Filter: "f1", FillPolicy: exprFillPolicy}) - expr.Time = timeObj{Start: start, End: end, Aggregator: rt.FirstOrder, - Downsampler: dSObj{Interval: rt.AggTime, - Aggregator: rt.SecondOrder, - FillPolicy: exprFillPolicy}} - var TagList []tagObj + tagStr := "" for k, v := range series.Tags { - /* - every tag should be a literal_or because that's the closest to a full "==" that - this endpoint allows for - */ - TagList = append(TagList, tagObj{Type: "literal_or", Tagk: k, - Filter: v, GroupBy: true}) - } - expr.Filters = append(expr.Filters, filterObj{ID: "f1", Tags: TagList}) - // "expressions" is required in the query object or we get a 5xx, so force it to exist - expr.Expressions = make([]int, 0) - inputData, err := json.Marshal(expr) - if err != nil { - return Metric{}, fmt.Errorf("failed to marshal query JSON %s", err) + tagStr += fmt.Sprintf("%s=%s,", k, v) } + // obviously we don't want trailing commas... + tagStr = strings.Trim(tagStr, ",") - q := fmt.Sprintf("%s/api/query/exp", c.Addr) - resp, err := http.Post(q, "application/json", bytes.NewBuffer(inputData)) + /* + The aggregation policy should already be somewhat formatted: + FirstOrder (e.g. sum/avg/max/etc.) + SecondOrder (e.g. sum/avg/max/etc.) + AggTime (e.g. 1m/10m/1d/etc.) 
+ This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none: + Or an example: m=sum:1m-avg-none + */ + aggPol := fmt.Sprintf("%s:%s-%s-none", rt.FirstOrder, rt.AggTime, rt.SecondOrder) + + /* + Our actual query string: + Start and End are just timestamps + We then add the aggregation policy, the metric, and the tag set + */ + queryStr := fmt.Sprintf("start=%v&end=%v&m=%s:%s{%s}", start, end, aggPol, + series.Metric, tagStr) + + q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr) + resp, err := http.Get(q) if err != nil { return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err) } @@ -267,28 +195,63 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) ( if err != nil { return Metric{}, fmt.Errorf("could not retrieve series data from %q: %s", q, err) } - var output ExpressionOutput + var output []OtsdbMetric err = json.Unmarshal(body, &output) if err != nil { - return Metric{}, fmt.Errorf("failed to unmarshal response from %q: %s", q, err) + return Metric{}, fmt.Errorf("failed to unmarshal response from %q [%v]: %s", q, body, err) } - if len(output.Outputs) < 1 { + /* + We expect results to look like: + [ + { + "metric": "zfs_filesystem.available", + "tags": { + "rack": "6", + "replica": "1", + "host": "c7-bfyii-115", + "pool": "dattoarray", + "row": "c", + "dc": "us-west-3", + "group": "legonode" + }, + "aggregateTags": [], + "dps": { + "1626019200": 32490602877610.668, + "1626033600": 32486439014058.668 + } + } + ] + There are two things that could be bad here: + 1. There are no actual stats returned (an empty array -> []) + 2. There are aggregate tags in the results + An empty array doesn't cast to a OtsdbMetric struct well, and there's no reason to try, so we should just skip it + Because we're trying to migrate data without transformations, seeing aggregate tags could mean + we're dropping series on the floor. 
+ */ + if len(output) < 1 { // no results returned...return an empty object without error return Metric{}, nil } + if len(output) > 1 { + // multiple series returned for a single query. We can't process this right, so... + return Metric{}, fmt.Errorf("Query returned multiple results: %v", output) + } + if len(output[0].AggregateTags) > 0 { + // This failure means we've suppressed potential series somehow... + return Metric{}, fmt.Errorf("Query somehow has aggregate tags: %v", output[0].AggregateTags) + } data := Metric{} - data.Metric = series.Metric - data.Tags = series.Tags + data.Metric = output[0].Metric + data.Tags = output[0].Tags /* We evaluate data for correctness before formatting the actual values to skip a little bit of time if the series has invalid formatting - - First step is to enforce Prometheus' data model */ data, err = modifyData(data, c.Normalize) if err != nil { return Metric{}, fmt.Errorf("invalid series data from %q: %s", q, err) } + /* Convert data from OpenTSDB's output format ([[ts,val],[ts,val]...]) to VictoriaMetrics format: {"timestamps": [ts,ts,ts...], "values": [val,val,val...]} @@ -296,9 +259,9 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) ( can be a float64, we have to initially cast _all_ objects that way then convert the timestamp back to something reasonable. 
*/ - for _, tsobj := range output.Outputs[0].Dps { - data.Timestamps = append(data.Timestamps, int64(tsobj[0])) - data.Values = append(data.Values, tsobj[1]) + for ts, val := range output[0].Dps { + data.Timestamps = append(data.Timestamps, ts) + data.Values = append(data.Values, val) } return data, nil } @@ -308,9 +271,12 @@ func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) ( func NewClient(cfg Config) (*Client, error) { var retentions []Retention offsetPrint := int64(time.Now().Unix()) + offsetSecs := cfg.Offset * 24 * 60 * 60 if cfg.MsecsTime { // 1000000 == Nanoseconds -> Milliseconds difference offsetPrint = int64(time.Now().UnixNano() / 1000000) + // also bump offsetSecs to milliseconds + offsetSecs = offsetSecs * 1000 } if cfg.HardTS > 0 { /* @@ -318,20 +284,16 @@ func NewClient(cfg Config) (*Client, error) { Just present that if it is defined */ offsetPrint = cfg.HardTS - } else if cfg.Offset > 0 { + } else if offsetSecs > 0 { /* - Our "offset" is the number of days we should step + Our "offset" is the number of days (in seconds) we should step back before starting to scan for data */ - if cfg.MsecsTime { - offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60 * 1000) - } else { - offsetPrint = offsetPrint - (cfg.Offset * 24 * 60 * 60) - } + offsetPrint = offsetPrint - offsetSecs } log.Println(fmt.Sprintf("Will collect data starting at TS %v", offsetPrint)) for _, r := range cfg.Retentions { - ret, err := convertRetention(r, cfg.Offset, cfg.MsecsTime) + ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime) if err != nil { return &Client{}, fmt.Errorf("Couldn't parse retention %q :: %v", r, err) } diff --git a/app/vmctl/opentsdb/parser.go b/app/vmctl/opentsdb/parser.go index 32e8f9f31e..a463129208 100644 --- a/app/vmctl/opentsdb/parser.go +++ b/app/vmctl/opentsdb/parser.go @@ -107,6 +107,7 @@ func convertRetention(retention string, offset int64, msecTime bool) (Retention, } // bump by the offset so we don't look at empty 
ranges any time offset > ttl ttl += offset + var timeChunks []TimeRange var i int64 for i = offset; i <= ttl; i = i + rowLength { diff --git a/app/vmctl/opentsdb/testdata/exampleOutput.json b/app/vmctl/opentsdb/testdata/exampleOutput.json deleted file mode 100644 index 8e4c1a6adb..0000000000 --- a/app/vmctl/opentsdb/testdata/exampleOutput.json +++ /dev/null @@ -1,398 +0,0 @@ -{ - "outputs": [ - { - "id": "a", - "alias": "query", - "dps": [ - [ - 1614099600000, - 0.28 - ], - [ - 1614099660000, - 0.22 - ], - [ - 1614099720000, - 0.18 - ], - [ - 1614099780000, - 0.14 - ], - [ - 1614099840000, - 0.24 - ], - [ - 1614099900000, - 0.19 - ], - [ - 1614099960000, - 0.22 - ], - [ - 1614100020000, - 0.2 - ], - [ - 1614100080000, - 0.18 - ], - [ - 1614100140000, - 0.22 - ], - [ - 1614100200000, - 0.17 - ], - [ - 1614100260000, - 0.16 - ], - [ - 1614100320000, - 0.22 - ], - [ - 1614100380000, - 0.3 - ], - [ - 1614100440000, - 0.28 - ], - [ - 1614100500000, - 0.27 - ], - [ - 1614100560000, - 0.26 - ], - [ - 1614100620000, - 0.23 - ], - [ - 1614100680000, - 0.18 - ], - [ - 1614100740000, - 0.3 - ], - [ - 1614100800000, - 0.24 - ], - [ - 1614100860000, - 0.19 - ], - [ - 1614100920000, - 0.16 - ], - [ - 1614100980000, - 0.19 - ], - [ - 1614101040000, - 0.23 - ], - [ - 1614101100000, - 0.18 - ], - [ - 1614101160000, - 0.15 - ], - [ - 1614101220000, - 0.12 - ], - [ - 1614101280000, - 0.1 - ], - [ - 1614101340000, - 0.24 - ], - [ - 1614101400000, - 0.19 - ], - [ - 1614101460000, - 0.16 - ], - [ - 1614101520000, - 0.14 - ], - [ - 1614101580000, - 0.12 - ], - [ - 1614101640000, - 0.14 - ], - [ - 1614101700000, - 0.12 - ], - [ - 1614101760000, - 0.13 - ], - [ - 1614101820000, - 0.12 - ], - [ - 1614101880000, - 0.11 - ], - [ - 1614101940000, - 0.36 - ], - [ - 1614102000000, - 0.35 - ], - [ - 1614102060000, - 0.3 - ], - [ - 1614102120000, - 0.32 - ], - [ - 1614102180000, - 0.27 - ], - [ - 1614102240000, - 0.26 - ], - [ - 1614102300000, - 0.21 - ], - [ - 1614102360000, - 0.18 - ], - [ - 
1614102420000, - 0.15 - ], - [ - 1614102480000, - 0.12 - ], - [ - 1614102540000, - 0.24 - ], - [ - 1614102600000, - 0.2 - ], - [ - 1614102660000, - 0.17 - ], - [ - 1614102720000, - 0.18 - ], - [ - 1614102780000, - 0.14 - ], - [ - 1614102840000, - 0.39 - ], - [ - 1614102900000, - 0.31 - ], - [ - 1614102960000, - 0.3 - ], - [ - 1614103020000, - 0.24 - ], - [ - 1614103080000, - 0.26 - ], - [ - 1614103140000, - 0.21 - ], - [ - 1614103200000, - 0.17 - ], - [ - 1614103260000, - 0.15 - ], - [ - 1614103320000, - 0.2 - ], - [ - 1614103380000, - 0.2 - ], - [ - 1614103440000, - 0.22 - ], - [ - 1614103500000, - 0.19 - ], - [ - 1614103560000, - 0.22 - ], - [ - 1614103620000, - 0.29 - ], - [ - 1614103680000, - 0.31 - ], - [ - 1614103740000, - 0.28 - ], - [ - 1614103800000, - 0.23 - ] - ], - "dpsMeta": { - "firstTimestamp": 1614099600000, - "lastTimestamp": 1614103800000, - "setCount": 71, - "series": 1 - }, - "meta": [ - { - "index": 0, - "metrics": [ - "timestamp" - ] - }, - { - "index": 1, - "metrics": [ - "system.load5" - ], - "commonTags": { - "rack": "undef", - "host": "use1-mon-metrics-1", - "row": "undef", - "dc": "us-east-1", - "group": "monitoring" - }, - "aggregatedTags": [] - } - ] - } - ], - "query": { - "name": null, - "time": { - "start": "1h-ago", - "end": null, - "timezone": null, - "downsampler": { - "interval": "1m", - "aggregator": "avg", - "fillPolicy": { - "policy": "nan", - "value": "NaN" - } - }, - "aggregator": "sum", - "rate": false - }, - "filters": [ - { - "id": "f1", - "tags": [ - { - "tagk": "host", - "filter": "use1-mon-metrics-1", - "group_by": true, - "type": "literal_or" - }, - { - "tagk": "group", - "filter": "monitoring", - "group_by": true, - "type": "literal_or" - }, - { - "tagk": "dc", - "filter": "us-east-1", - "group_by": true, - "type": "literal_or" - }, - { - "tagk": "rack", - "filter": "undef", - "group_by": true, - "type": "literal_or" - }, - { - "tagk": "row", - "filter": "undef", - "group_by": true, - "type": "literal_or" - } - ], - 
"explicitTags": false - } - ], - "metrics": [ - { - "metric": "system.load5", - "id": "a", - "filter": "f1", - "aggregator": null, - "timeOffset": null, - "fillPolicy": { - "policy": "nan", - "value": "NaN" - } - } - ], - "expressions": [], - "outputs": [ - { - "id": "a", - "alias": "query" - } - ] - } -} diff --git a/app/vmctl/opentsdb/testdata/exampleQuery.json b/app/vmctl/opentsdb/testdata/exampleQuery.json deleted file mode 100644 index b5fac0a119..0000000000 --- a/app/vmctl/opentsdb/testdata/exampleQuery.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "time": { - "start": "1h-ago", - "aggregator":"sum", - "downsampler": { - "interval": "1m", - "aggregator": "avg", - "fillPolicy": { - "policy": "nan" - } - } - }, - "filters": [ - { - "tags": [ - { - "type": "literal_or", - "tagk": "host", - "filter": "use1-mon-metrics-1", - "groupBy": true - }, - { - "type": "literal_or", - "tagk": "group", - "filter": "monitoring", - "groupBy": true - }, - { - "type": "literal_or", - "tagk": "dc", - "filter": "us-east-1", - "groupBy": true - }, - { - "type": "literal_or", - "tagk": "rack", - "filter": "undef", - "groupBy": true - }, - { - "type": "literal_or", - "tagk": "row", - "filter": "undef", - "groupBy": true - } - ], - "id": "f1" - } - ], - "metrics": [ - { - "id": "a", - "metric": "system.load5", - "filter": "f1", - "fillPolicy":{"policy":"nan"} - } - ], - "expressions": [], - "outputs":[ - {"id":"a", "alias":"query"} - ] -}