mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 16:20:40 +01:00
a7f00101f5
Starting from v1.56.0 VM supports `round_digits` which allows to limit the number of digits after the decimal point in response value. The feature can be used to reduce entropy of produced by recording rules values and significantly improve the compression. See more details in link below. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/525
274 lines
7.3 KiB
Go
274 lines
7.3 KiB
Go
package datasource
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type response struct {
|
|
Status string `json:"status"`
|
|
Data struct {
|
|
ResultType string `json:"resultType"`
|
|
Result []struct {
|
|
Labels map[string]string `json:"metric"`
|
|
TV [2]interface{} `json:"value"`
|
|
} `json:"result"`
|
|
} `json:"data"`
|
|
ErrorType string `json:"errorType"`
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
func (r response) metrics() ([]Metric, error) {
|
|
var ms []Metric
|
|
var m Metric
|
|
var f float64
|
|
var err error
|
|
for i, res := range r.Data.Result {
|
|
f, err = strconv.ParseFloat(res.TV[1].(string), 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
|
|
}
|
|
m.Labels = nil
|
|
for k, v := range r.Data.Result[i].Labels {
|
|
m.AddLabel(k, v)
|
|
}
|
|
m.Timestamp = int64(res.TV[0].(float64))
|
|
m.Value = f
|
|
ms = append(ms, m)
|
|
}
|
|
return ms, nil
|
|
}
|
|
|
|
type graphiteResponse []graphiteResponseTarget
|
|
|
|
type graphiteResponseTarget struct {
|
|
Target string `json:"target"`
|
|
Tags map[string]string `json:"tags"`
|
|
DataPoints [][2]float64 `json:"datapoints"`
|
|
}
|
|
|
|
func (r graphiteResponse) metrics() []Metric {
|
|
var ms []Metric
|
|
for _, res := range r {
|
|
if len(res.DataPoints) < 1 {
|
|
continue
|
|
}
|
|
var m Metric
|
|
// add only last value to the result.
|
|
last := res.DataPoints[len(res.DataPoints)-1]
|
|
m.Value = last[0]
|
|
m.Timestamp = int64(last[1])
|
|
for k, v := range res.Tags {
|
|
m.AddLabel(k, v)
|
|
}
|
|
ms = append(ms, m)
|
|
}
|
|
return ms
|
|
}
|
|
|
|
// VMStorage represents vmstorage entity with ability to read and write metrics
|
|
type VMStorage struct {
|
|
c *http.Client
|
|
datasourceURL string
|
|
basicAuthUser string
|
|
basicAuthPass string
|
|
appendTypePrefix bool
|
|
lookBack time.Duration
|
|
queryStep time.Duration
|
|
roundDigits string
|
|
|
|
dataSourceType Type
|
|
evaluationInterval time.Duration
|
|
}
|
|
|
|
const queryPath = "/api/v1/query"
|
|
const graphitePath = "/render"
|
|
|
|
const prometheusPrefix = "/prometheus"
|
|
const graphitePrefix = "/graphite"
|
|
|
|
// QuerierParams params for Querier.
|
|
type QuerierParams struct {
|
|
DataSourceType *Type
|
|
EvaluationInterval time.Duration
|
|
}
|
|
|
|
// Clone makes clone of VMStorage, shares http client.
|
|
func (s *VMStorage) Clone() *VMStorage {
|
|
return &VMStorage{
|
|
c: s.c,
|
|
datasourceURL: s.datasourceURL,
|
|
basicAuthUser: s.basicAuthUser,
|
|
basicAuthPass: s.basicAuthPass,
|
|
lookBack: s.lookBack,
|
|
queryStep: s.queryStep,
|
|
appendTypePrefix: s.appendTypePrefix,
|
|
dataSourceType: s.dataSourceType,
|
|
}
|
|
}
|
|
|
|
// ApplyParams - changes given querier params.
|
|
func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
|
|
if params.DataSourceType != nil {
|
|
s.dataSourceType = *params.DataSourceType
|
|
}
|
|
s.evaluationInterval = params.EvaluationInterval
|
|
return s
|
|
}
|
|
|
|
// BuildWithParams - implements interface.
|
|
func (s *VMStorage) BuildWithParams(params QuerierParams) Querier {
|
|
return s.Clone().ApplyParams(params)
|
|
}
|
|
|
|
// NewVMStorage is a constructor for VMStorage
|
|
func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Duration, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage {
|
|
return &VMStorage{
|
|
c: c,
|
|
basicAuthUser: basicAuthUser,
|
|
basicAuthPass: basicAuthPass,
|
|
datasourceURL: strings.TrimSuffix(baseURL, "/"),
|
|
appendTypePrefix: appendTypePrefix,
|
|
lookBack: lookBack,
|
|
queryStep: queryStep,
|
|
dataSourceType: NewPrometheusType(),
|
|
}
|
|
}
|
|
|
|
// Query executes the given query and returns parsed response
|
|
func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
|
|
req, err := s.prepareReq(query, time.Now())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := s.do(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
_ = resp.Body.Close()
|
|
}()
|
|
|
|
parseFn := parsePrometheusResponse
|
|
if s.dataSourceType.name != prometheusType {
|
|
parseFn = parseGraphiteResponse
|
|
}
|
|
return parseFn(req, resp)
|
|
}
|
|
|
|
func (s *VMStorage) prepareReq(query string, timestamp time.Time) (*http.Request, error) {
|
|
req, err := http.NewRequest("POST", s.datasourceURL, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json; charset=utf-8")
|
|
if s.basicAuthPass != "" {
|
|
req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass)
|
|
}
|
|
|
|
switch s.dataSourceType.name {
|
|
case "", prometheusType:
|
|
s.setPrometheusReqParams(req, query, timestamp)
|
|
case graphiteType:
|
|
s.setGraphiteReqParams(req, query, timestamp)
|
|
default:
|
|
return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name)
|
|
}
|
|
return req, nil
|
|
}
|
|
|
|
func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) {
|
|
resp, err := s.c.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting response from %s: %w", req.URL, err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := ioutil.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string, timestamp time.Time) {
|
|
if s.appendTypePrefix {
|
|
r.URL.Path += prometheusPrefix
|
|
}
|
|
r.URL.Path += queryPath
|
|
q := r.URL.Query()
|
|
q.Set("query", query)
|
|
if s.lookBack > 0 {
|
|
timestamp = timestamp.Add(-s.lookBack)
|
|
}
|
|
if s.evaluationInterval > 0 {
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
|
|
timestamp = timestamp.Truncate(s.evaluationInterval)
|
|
// set step as evaluationInterval by default
|
|
q.Set("step", s.evaluationInterval.String())
|
|
}
|
|
q.Set("time", fmt.Sprintf("%d", timestamp.Unix()))
|
|
|
|
if s.queryStep > 0 {
|
|
// override step with user-specified value
|
|
q.Set("step", s.queryStep.String())
|
|
}
|
|
if s.roundDigits != "" {
|
|
q.Set("round_digits", s.roundDigits)
|
|
}
|
|
r.URL.RawQuery = q.Encode()
|
|
}
|
|
|
|
func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string, timestamp time.Time) {
|
|
if s.appendTypePrefix {
|
|
r.URL.Path += graphitePrefix
|
|
}
|
|
r.URL.Path += graphitePath
|
|
q := r.URL.Query()
|
|
q.Set("format", "json")
|
|
q.Set("target", query)
|
|
from := "-5min"
|
|
if s.lookBack > 0 {
|
|
lookBack := timestamp.Add(-s.lookBack)
|
|
from = strconv.FormatInt(lookBack.Unix(), 10)
|
|
}
|
|
q.Set("from", from)
|
|
q.Set("until", "now")
|
|
r.URL.RawQuery = q.Encode()
|
|
}
|
|
|
|
const (
|
|
statusSuccess, statusError, rtVector = "success", "error", "vector"
|
|
)
|
|
|
|
func parsePrometheusResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
|
|
r := &response{}
|
|
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
|
|
return nil, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL, err)
|
|
}
|
|
if r.Status == statusError {
|
|
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error)
|
|
}
|
|
if r.Status != statusSuccess {
|
|
return nil, fmt.Errorf("unknown status: %s, Expected success or error ", r.Status)
|
|
}
|
|
if r.Data.ResultType != rtVector {
|
|
return nil, fmt.Errorf("unknown result type:%s. Expected vector", r.Data.ResultType)
|
|
}
|
|
return r.metrics()
|
|
}
|
|
|
|
func parseGraphiteResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
|
|
r := &graphiteResponse{}
|
|
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
|
|
return nil, fmt.Errorf("error parsing graphite metrics for %s: %w", req.URL, err)
|
|
}
|
|
return r.metrics(), nil
|
|
}
|