VictoriaMetrics/app/vmalert/datasource/vm.go
Dmitry Shevchuk 2baf98082b
Adds ability to query right vmselect endpoint based on the query type (#1050)
* Adds ability to query right vmselect endpoint based on the query type

Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
2021-02-03 21:26:30 +00:00

207 lines
5.7 KiB
Go

package datasource
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
)
type response struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Labels map[string]string `json:"metric"`
TV [2]interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
ErrorType string `json:"errorType"`
Error string `json:"error"`
}
func (r response) metrics() ([]Metric, error) {
var ms []Metric
var m Metric
var f float64
var err error
for i, res := range r.Data.Result {
f, err = strconv.ParseFloat(res.TV[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
}
m.Labels = nil
for k, v := range r.Data.Result[i].Labels {
m.AddLabel(k, v)
}
m.Timestamp = int64(res.TV[0].(float64))
m.Value = f
ms = append(ms, m)
}
return ms, nil
}
type graphiteResponse []graphiteResponseTarget
type graphiteResponseTarget struct {
Target string `json:"target"`
Tags map[string]string `json:"tags"`
DataPoints [][2]float64 `json:"datapoints"`
}
func (r graphiteResponse) metrics() []Metric {
var ms []Metric
for _, res := range r {
if len(res.DataPoints) < 1 {
continue
}
var m Metric
// add only last value to the result.
last := res.DataPoints[len(res.DataPoints)-1]
m.Value = last[0]
m.Timestamp = int64(last[1])
for k, v := range res.Tags {
m.AddLabel(k, v)
}
ms = append(ms, m)
}
return ms
}
// VMStorage represents vmstorage entity with ability to read and write metrics
type VMStorage struct {
c *http.Client
datasourceURL string
basicAuthUser string
basicAuthPass string
appendTypePrefix bool
lookBack time.Duration
queryStep time.Duration
}
const queryPath = "/api/v1/query"
const graphitePath = "/render"
const prometheusPrefix = "/prometheus"
const graphitePrefix = "/graphite"
// NewVMStorage is a constructor for VMStorage
func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Duration, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage {
return &VMStorage{
c: c,
basicAuthUser: basicAuthUser,
basicAuthPass: basicAuthPass,
datasourceURL: strings.TrimSuffix(baseURL, "/"),
appendTypePrefix: appendTypePrefix,
lookBack: lookBack,
queryStep: queryStep,
}
}
// Query reads metrics from datasource by given query and type
func (s *VMStorage) Query(ctx context.Context, query string, dataSourceType Type) ([]Metric, error) {
switch dataSourceType.name {
case "", prometheusType:
return s.queryDataSource(ctx, query, s.setPrometheusReqParams, parsePrometheusResponse)
case graphiteType:
return s.queryDataSource(ctx, query, s.setGraphiteReqParams, parseGraphiteResponse)
default:
return nil, fmt.Errorf("engine not found: %q", dataSourceType)
}
}
func (s *VMStorage) queryDataSource(
ctx context.Context,
query string,
setReqParams func(r *http.Request, query string),
processResponse func(r *http.Request, resp *http.Response,
) ([]Metric, error)) ([]Metric, error) {
req, err := http.NewRequest("POST", s.datasourceURL, nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
if s.basicAuthPass != "" {
req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass)
}
setReqParams(req, query)
resp, err := s.c.Do(req.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("error getting response from %s: %w", req.URL, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("datasource returns unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
}
return processResponse(req, resp)
}
func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
if s.appendTypePrefix {
r.URL.Path += prometheusPrefix
}
r.URL.Path += queryPath
q := r.URL.Query()
q.Set("query", query)
if s.lookBack > 0 {
lookBack := time.Now().Add(-s.lookBack)
q.Set("time", fmt.Sprintf("%d", lookBack.Unix()))
}
if s.queryStep > 0 {
q.Set("step", s.queryStep.String())
}
r.URL.RawQuery = q.Encode()
}
func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string) {
if s.appendTypePrefix {
r.URL.Path += graphitePrefix
}
r.URL.Path += graphitePath
q := r.URL.Query()
q.Set("format", "json")
q.Set("target", query)
from := "-5min"
if s.lookBack > 0 {
lookBack := time.Now().Add(-s.lookBack)
from = strconv.FormatInt(lookBack.Unix(), 10)
}
q.Set("from", from)
q.Set("until", "now")
r.URL.RawQuery = q.Encode()
}
const (
statusSuccess, statusError, rtVector = "success", "error", "vector"
)
func parsePrometheusResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
r := &response{}
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
return nil, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL, err)
}
if r.Status == statusError {
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error)
}
if r.Status != statusSuccess {
return nil, fmt.Errorf("unknown status: %s, Expected success or error ", r.Status)
}
if r.Data.ResultType != rtVector {
return nil, fmt.Errorf("unknown result type:%s. Expected vector", r.Data.ResultType)
}
return r.metrics()
}
func parseGraphiteResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
r := &graphiteResponse{}
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
return nil, fmt.Errorf("error parsing graphite metrics for %s: %w", req.URL, err)
}
return r.metrics(), nil
}