VictoriaMetrics/app/vmalert/datasource/vm_prom_api.go
Roman Khavronenko 31ca22109e
vmalert: fix replay step param (#3428)
The recent change in modifying default value
of `datasource.queryStep` flag resulted in situation
where replay mode was always running queries with
step=`datasource.queryStep`. When it should always
use rule's evaluation interval.

The fix is related not to replay mode only, but
for all Range requests. Now step param is set
individually for each mode.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

Signed-off-by: hagen1778 <roman@victoriametrics.com>
2022-12-02 19:09:30 -08:00

198 lines
5.8 KiB
Go

package datasource
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"strconv"
"time"
)
var (
disablePathAppend = flag.Bool("remoteRead.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/query' path "+
"to the configured -datasource.url and -remoteRead.url")
)
type promResponse struct {
Status string `json:"status"`
ErrorType string `json:"errorType"`
Error string `json:"error"`
Data struct {
ResultType string `json:"resultType"`
Result json.RawMessage `json:"result"`
} `json:"data"`
}
type promInstant struct {
Result []struct {
Labels map[string]string `json:"metric"`
TV [2]interface{} `json:"value"`
} `json:"result"`
}
func (r promInstant) metrics() ([]Metric, error) {
var result []Metric
for i, res := range r.Result {
f, err := strconv.ParseFloat(res.TV[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
}
var m Metric
for k, v := range r.Result[i].Labels {
m.AddLabel(k, v)
}
m.Timestamps = append(m.Timestamps, int64(res.TV[0].(float64)))
m.Values = append(m.Values, f)
result = append(result, m)
}
return result, nil
}
type promRange struct {
Result []struct {
Labels map[string]string `json:"metric"`
TVs [][2]interface{} `json:"values"`
} `json:"result"`
}
func (r promRange) metrics() ([]Metric, error) {
var result []Metric
for i, res := range r.Result {
var m Metric
for _, tv := range res.TVs {
f, err := strconv.ParseFloat(tv[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, tv[1], err)
}
m.Values = append(m.Values, f)
m.Timestamps = append(m.Timestamps, int64(tv[0].(float64)))
}
if len(m.Values) < 1 || len(m.Timestamps) < 1 {
return nil, fmt.Errorf("metric %v contains no values", res)
}
m.Labels = nil
for k, v := range r.Result[i].Labels {
m.AddLabel(k, v)
}
result = append(result, m)
}
return result, nil
}
type promScalar [2]interface{}
func (r promScalar) metrics() ([]Metric, error) {
var m Metric
f, err := strconv.ParseFloat(r[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", r, r[1], err)
}
m.Values = append(m.Values, f)
m.Timestamps = append(m.Timestamps, int64(r[0].(float64)))
return []Metric{m}, nil
}
const (
statusSuccess, statusError = "success", "error"
rtVector, rtMatrix, rScalar = "vector", "matrix", "scalar"
)
func parsePrometheusResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
r := &promResponse{}
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
return nil, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL.Redacted(), err)
}
if r.Status == statusError {
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL.Redacted(), r.ErrorType, r.Error)
}
if r.Status != statusSuccess {
return nil, fmt.Errorf("unknown status: %s, Expected success or error ", r.Status)
}
switch r.Data.ResultType {
case rtVector:
var pi promInstant
if err := json.Unmarshal(r.Data.Result, &pi.Result); err != nil {
return nil, fmt.Errorf("umarshal err %s; \n %#v", err, string(r.Data.Result))
}
return pi.metrics()
case rtMatrix:
var pr promRange
if err := json.Unmarshal(r.Data.Result, &pr.Result); err != nil {
return nil, err
}
return pr.metrics()
case rScalar:
var ps promScalar
if err := json.Unmarshal(r.Data.Result, &ps); err != nil {
return nil, err
}
return ps.metrics()
default:
return nil, fmt.Errorf("unknown result type %q", r.Data.ResultType)
}
}
func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
if s.appendTypePrefix {
r.URL.Path += "/prometheus"
}
if !*disablePathAppend {
r.URL.Path += "/api/v1/query"
}
q := r.URL.Query()
if s.lookBack > 0 {
timestamp = timestamp.Add(-s.lookBack)
}
if *queryTimeAlignment && s.evaluationInterval > 0 {
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
timestamp = timestamp.Truncate(s.evaluationInterval)
}
q.Set("time", fmt.Sprintf("%d", timestamp.Unix()))
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
if s.queryStep > 0 { // override step with user-specified value
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
}
func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) {
if s.appendTypePrefix {
r.URL.Path += "/prometheus"
}
if !*disablePathAppend {
r.URL.Path += "/api/v1/query_range"
}
q := r.URL.Query()
q.Add("start", fmt.Sprintf("%d", start.Unix()))
q.Add("end", fmt.Sprintf("%d", end.Unix()))
if s.evaluationInterval > 0 { // set step as evaluationInterval by default
// always convert to seconds to keep compatibility with older
// Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
}
func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
q := r.URL.Query()
for k, vs := range s.extraParams {
if q.Has(k) { // extraParams are prior to params in URL
q.Del(k)
}
for _, v := range vs {
q.Add(k, v)
}
}
q.Set("query", query)
r.URL.RawQuery = q.Encode()
}