VictoriaMetrics/app/vmctl/opentsdb/opentsdb.go
John Seekins 695cb617b2
Simplify queries to OpenTSDB for migration (#1809)
* Simplify queries to OpenTSDB (and make them properly appear in OpenTSDB query stats) and also tweak defaults a bit

Signed-off-by: John Seekins <jseekins@datto.com>

* remove extraneous printlns

Signed-off-by: John Seekins <jseekins@datto.com>

* remove empty line

Signed-off-by: John Seekins <jseekins@datto.com>

* fix bug in offset calcuation and closer to working with simpler queries

Signed-off-by: John Seekins <jseekins@datto.com>

* fix boolean eval

Signed-off-by: John Seekins <jseekins@datto.com>

* fix casting and check for multiple series

Signed-off-by: John Seekins <jseekins@datto.com>
2021-11-18 20:18:15 +03:00

312 lines
9.2 KiB
Go

package opentsdb
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
)
// Retention objects contain meta data about what to query for our run
type Retention struct {
/*
OpenTSDB has two levels of aggregation,
First, we aggregate any un-mentioned tags into the last result
Second, we aggregate into buckets over time
To simulate this with config, we have
FirstOrder (e.g. sum/avg/max/etc.)
SecondOrder (e.g. sum/avg/max/etc.)
AggTime (e.g. 1m/10m/1d/etc.)
This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none:
Or an example: m=sum:1m-avg-none
*/
FirstOrder string
SecondOrder string
AggTime string
// The actual ranges will will attempt to query (as offsets from now)
QueryRanges []TimeRange
}
// RetentionMeta objects exist to pass smaller subsets (only one retention range) of a full Retention object around
type RetentionMeta struct {
FirstOrder string
SecondOrder string
AggTime string
}
// Client object holds general config about how queries should be performed
type Client struct {
Addr string
// The meta query limit for series returned
Limit int
Retentions []Retention
Filters []string
Normalize bool
HardTS int64
}
// Config contains fields required
// for Client configuration
type Config struct {
Addr string
Limit int
Offset int64
HardTS int64
Retentions []string
Filters []string
Normalize bool
MsecsTime bool
}
// TimeRange contains data about time ranges to query
type TimeRange struct {
Start int64
End int64
}
// MetaResults contains return data from search series lookup queries
type MetaResults struct {
Type string `json:"type"`
Results []Meta `json:"results"`
//metric string
//tags interface{}
//limit int
//time int
//startIndex int
//totalResults int
}
// Meta A meta object about a metric
// only contain the tags/etc. and no data
type Meta struct {
//tsuid string
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
}
// OtsdbMetric is a single series in OpenTSDB's returned format
type OtsdbMetric struct {
Metric string
Tags map[string]string
AggregateTags []string
Dps map[int64]float64
}
// Metric holds the time series data in VictoriaMetrics format
type Metric struct {
Metric string
Tags map[string]string
Timestamps []int64
Values []float64
}
// FindMetrics discovers all metrics that OpenTSDB knows about (given a filter)
// e.g. /api/suggest?type=metrics&q=system&max=100000
func (c Client) FindMetrics(q string) ([]string, error) {
resp, err := http.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve metric data from %q: %s", q, err)
}
var metriclist []string
err = json.Unmarshal(body, &metriclist)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return metriclist, nil
}
// FindSeries discovers all series associated with a metric
// e.g. /api/search/lookup?m=system.load5&limit=1000000
func (c Client) FindSeries(metric string) ([]Meta, error) {
q := fmt.Sprintf("%s/api/search/lookup?m=%s&limit=%d", c.Addr, metric, c.Limit)
resp, err := http.Get(q)
if err != nil {
return nil, fmt.Errorf("failed to set GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var results MetaResults
err = json.Unmarshal(body, &results)
if err != nil {
return nil, fmt.Errorf("failed to read response from %q: %s", q, err)
}
return results.Results, nil
}
// GetData actually retrieves data for a series at a specified time range
// e.g. /api/query?start=1&end=200&m=sum:1m-avg-none:system.load5{host=host1}
func (c Client) GetData(series Meta, rt RetentionMeta, start int64, end int64) (Metric, error) {
/*
First, build our tag string.
It's literally just key=value,key=value,...
*/
tagStr := ""
for k, v := range series.Tags {
tagStr += fmt.Sprintf("%s=%s,", k, v)
}
// obviously we don't want trailing commas...
tagStr = strings.Trim(tagStr, ",")
/*
The aggregation policy should already be somewhat formatted:
FirstOrder (e.g. sum/avg/max/etc.)
SecondOrder (e.g. sum/avg/max/etc.)
AggTime (e.g. 1m/10m/1d/etc.)
This will build into m=<FirstOrder>:<AggTime>-<SecondOrder>-none:
Or an example: m=sum:1m-avg-none
*/
aggPol := fmt.Sprintf("%s:%s-%s-none", rt.FirstOrder, rt.AggTime, rt.SecondOrder)
/*
Our actual query string:
Start and End are just timestamps
We then add the aggregation policy, the metric, and the tag set
*/
queryStr := fmt.Sprintf("start=%v&end=%v&m=%s:%s{%s}", start, end, aggPol,
series.Metric, tagStr)
q := fmt.Sprintf("%s/api/query?%s", c.Addr, queryStr)
resp, err := http.Get(q)
if err != nil {
return Metric{}, fmt.Errorf("failed to send GET request to %q: %s", q, err)
}
if resp.StatusCode != 200 {
return Metric{}, fmt.Errorf("Bad return from OpenTSDB: %q: %v", resp.StatusCode, resp)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return Metric{}, fmt.Errorf("could not retrieve series data from %q: %s", q, err)
}
var output []OtsdbMetric
err = json.Unmarshal(body, &output)
if err != nil {
return Metric{}, fmt.Errorf("failed to unmarshal response from %q [%v]: %s", q, body, err)
}
/*
We expect results to look like:
[
{
"metric": "zfs_filesystem.available",
"tags": {
"rack": "6",
"replica": "1",
"host": "c7-bfyii-115",
"pool": "dattoarray",
"row": "c",
"dc": "us-west-3",
"group": "legonode"
},
"aggregateTags": [],
"dps": {
"1626019200": 32490602877610.668,
"1626033600": 32486439014058.668
}
}
]
There are two things that could be bad here:
1. There are no actual stats returned (an empty array -> [])
2. There are aggregate tags in the results
An empty array doesn't cast to a OtsdbMetric struct well, and there's no reason to try, so we should just skip it
Because we're trying to migrate data without transformations, seeing aggregate tags could mean
we're dropping series on the floor.
*/
if len(output) < 1 {
// no results returned...return an empty object without error
return Metric{}, nil
}
if len(output) > 1 {
// multiple series returned for a single query. We can't process this right, so...
return Metric{}, fmt.Errorf("Query returned multiple results: %v", output)
}
if len(output[0].AggregateTags) > 0 {
// This failure means we've suppressed potential series somehow...
return Metric{}, fmt.Errorf("Query somehow has aggregate tags: %v", output[0].AggregateTags)
}
data := Metric{}
data.Metric = output[0].Metric
data.Tags = output[0].Tags
/*
We evaluate data for correctness before formatting the actual values
to skip a little bit of time if the series has invalid formatting
*/
data, err = modifyData(data, c.Normalize)
if err != nil {
return Metric{}, fmt.Errorf("invalid series data from %q: %s", q, err)
}
/*
Convert data from OpenTSDB's output format ([[ts,val],[ts,val]...])
to VictoriaMetrics format: {"timestamps": [ts,ts,ts...], "values": [val,val,val...]}
The nasty part here is that because an object in each array
can be a float64, we have to initially cast _all_ objects that way
then convert the timestamp back to something reasonable.
*/
for ts, val := range output[0].Dps {
data.Timestamps = append(data.Timestamps, ts)
data.Values = append(data.Values, val)
}
return data, nil
}
// NewClient creates and returns OpenTSDB client
// configured with passed Config
func NewClient(cfg Config) (*Client, error) {
var retentions []Retention
offsetPrint := int64(time.Now().Unix())
offsetSecs := cfg.Offset * 24 * 60 * 60
if cfg.MsecsTime {
// 1000000 == Nanoseconds -> Milliseconds difference
offsetPrint = int64(time.Now().UnixNano() / 1000000)
// also bump offsetSecs to milliseconds
offsetSecs = offsetSecs * 1000
}
if cfg.HardTS > 0 {
/*
HardTS is a specific timestamp we'll be starting at.
Just present that if it is defined
*/
offsetPrint = cfg.HardTS
} else if offsetSecs > 0 {
/*
Our "offset" is the number of days (in seconds) we should step
back before starting to scan for data
*/
offsetPrint = offsetPrint - offsetSecs
}
log.Println(fmt.Sprintf("Will collect data starting at TS %v", offsetPrint))
for _, r := range cfg.Retentions {
ret, err := convertRetention(r, offsetSecs, cfg.MsecsTime)
if err != nil {
return &Client{}, fmt.Errorf("Couldn't parse retention %q :: %v", r, err)
}
retentions = append(retentions, ret)
}
client := &Client{
Addr: strings.Trim(cfg.Addr, "/"),
Retentions: retentions,
Limit: cfg.Limit,
Filters: cfg.Filters,
Normalize: cfg.Normalize,
HardTS: cfg.HardTS,
}
return client, nil
}