VictoriaMetrics/app/vmselect/graphite/render_api.go
Anton L 79008b712f
app/vmselect/graphite: respect denyPartialResponse for graphite requests (#6748)
VM has different responses to equivalent queries for MetricsQL and
GraphiteQL in case of failed access to one of vmstorage node of the
cluster vmstorage nodes. For GraphiteQL, the denyPartialResponse feature
is not used, it is always true, which is not always correct (depending
on the configuration).

In the PR I have removed the hardcoded denyPartialResponse for
GraphiteQL, just like MetricsQL does.

- [x] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).
2024-08-07 12:34:23 +02:00

279 lines
8.5 KiB
Go

package graphite
import (
"flag"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/metrics"
)
var (
storageStep = flag.Duration("search.graphiteStorageStep", 10*time.Second, "The interval between datapoints stored in the database. "+
"It is used at Graphite Render API handler for normalizing the interval between datapoints in case it isn't normalized. "+
"It can be overridden by sending 'storage_step' query arg to /render API or "+
"by sending the desired interval via 'Storage-Step' http header during querying /render API")
maxPointsPerSeries = flag.Int("search.graphiteMaxPointsPerSeries", 1e6, "The maximum number of points per series Graphite render API can return")
)
// RenderHandler implements /render endpoint from Graphite Render API.
//
// See https://graphite.readthedocs.io/en/stable/render_api.html
func RenderHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
deadline := searchutils.GetDeadlineForQuery(r, startTime)
format := r.FormValue("format")
if format != "json" {
return fmt.Errorf("unsupported format=%q; supported values: json", format)
}
xFilesFactor := float64(0)
if xff := r.FormValue("xFilesFactor"); len(xff) > 0 {
f, err := strconv.ParseFloat(xff, 64)
if err != nil {
return fmt.Errorf("cannot parse xFilesFactor=%q: %w", xff, err)
}
xFilesFactor = f
}
from := r.FormValue("from")
fromTime := startTime.UnixNano()/1e6 - 24*3600*1000
if len(from) != 0 {
fv, err := parseTime(startTime, from)
if err != nil {
return fmt.Errorf("cannot parse from=%q: %w", from, err)
}
fromTime = fv
}
until := r.FormValue("until")
untilTime := startTime.UnixNano() / 1e6
if len(until) != 0 {
uv, err := parseTime(startTime, until)
if err != nil {
return fmt.Errorf("cannot parse until=%q: %w", until, err)
}
untilTime = uv
}
storageStep, err := getStorageStep(r)
if err != nil {
return err
}
fromAlign := fromTime % storageStep
fromTime -= fromAlign
if fromAlign > 0 {
fromTime += storageStep
}
untilAlign := untilTime % storageStep
untilTime -= untilAlign
if untilAlign > 0 {
untilTime += storageStep
}
if untilTime < fromTime {
return fmt.Errorf("from=%s cannot exceed until=%s", from, until)
}
pointsPerSeries := (untilTime - fromTime) / storageStep
if pointsPerSeries > int64(*maxPointsPerSeries) {
return fmt.Errorf("too many points per series must be returned on the given [from=%s ... until=%s] time range and the given storageStep=%d: %d; "+
"either reduce the time range or increase -search.graphiteMaxPointsPerSeries=%d", from, until, storageStep, pointsPerSeries, *maxPointsPerSeries)
}
maxDataPoints := 0
if s := r.FormValue("maxDataPoints"); len(s) > 0 {
n, err := strconv.ParseFloat(s, 64)
if err != nil {
return fmt.Errorf("cannot parse maxDataPoints=%q: %w", maxDataPoints, err)
}
if n <= 0 {
return fmt.Errorf("maxDataPoints must be greater than 0; got %f", n)
}
maxDataPoints = int(n)
}
etfs, err := searchutils.GetExtraTagFilters(r)
if err != nil {
return fmt.Errorf("cannot setup tag filters: %w", err)
}
denyPartialResponse := httputils.GetDenyPartialResponse(r)
var nextSeriess []nextSeriesFunc
targets := r.Form["target"]
for _, target := range targets {
ec := &evalConfig{
at: at,
startTime: fromTime,
endTime: untilTime,
storageStep: storageStep,
denyPartialResponse: denyPartialResponse,
deadline: deadline,
currentTime: startTime,
xFilesFactor: xFilesFactor,
etfs: etfs,
originalQuery: target,
}
nextSeries, err := execExpr(ec, target)
if err != nil {
for _, f := range nextSeriess {
_, _ = drainAllSeries(f)
}
return fmt.Errorf("cannot eval target=%q: %w", target, err)
}
// do not use nextSeriesConcurrentWrapper here in order to preserve series order.
if maxDataPoints > 0 {
step := (ec.endTime - ec.startTime) / int64(maxDataPoints)
nextSeries = nextSeriesSerialWrapper(nextSeries, func(s *series) (*series, error) {
aggrFunc := s.consolidateFunc
if aggrFunc == nil {
aggrFunc = aggrAvg
}
xFilesFactor := s.xFilesFactor
if s.xFilesFactor <= 0 {
xFilesFactor = ec.xFilesFactor
}
if len(s.Values) > maxDataPoints {
s.summarize(aggrFunc, ec.startTime, ec.endTime, step, xFilesFactor)
}
return s, nil
})
}
nextSeriess = append(nextSeriess, nextSeries)
}
f := nextSeriesGroup(nextSeriess, nil)
jsonp := r.FormValue("jsonp")
contentType := getContentType(jsonp)
w.Header().Set("Content-Type", contentType)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteRenderJSONResponse(bw, f, jsonp)
if err := bw.Flush(); err != nil {
return err
}
renderDuration.UpdateDuration(startTime)
return nil
}
var renderDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/render"}`)
const msecsPerDay = 24 * 3600 * 1000
// parseTime parses Graphite time in s.
//
// If the time in s is relative, then it is relative to startTime.
func parseTime(startTime time.Time, s string) (int64, error) {
switch s {
case "now":
return startTime.UnixNano() / 1e6, nil
case "today":
ts := startTime.UnixNano() / 1e6
return ts - ts%msecsPerDay, nil
case "yesterday":
ts := startTime.UnixNano() / 1e6
return ts - (ts % msecsPerDay) - msecsPerDay, nil
}
// Attempt to parse RFC3339 (YYYY-MM-DDTHH:mm:SSZTZ:00)
if t, err := time.Parse(time.RFC3339, s); err == nil {
return t.UnixNano() / 1e6, nil
}
// Attempt to parse HH:MM_YYYYMMDD
if t, err := time.Parse("15:04_20060102", s); err == nil {
return t.UnixNano() / 1e6, nil
}
// Attempt to parse HH:MMYYYYMMDD
if t, err := time.Parse("15:0420060102", s); err == nil {
return t.UnixNano() / 1e6, nil
}
// Attempt to parse YYYYMMDD
if t, err := time.Parse("20060102", s); err == nil {
return t.UnixNano() / 1e6, nil
}
// Attempt to parse HH:MM YYYYMMDD
if t, err := time.Parse("15:04 20060102", s); err == nil {
return t.UnixNano() / 1e6, nil
}
// Attempt to parse YYYY-MM-DD
if t, err := time.Parse("2006-01-02", s); err == nil {
return t.UnixNano() / 1e6, nil
}
// Attempt to parse MM/DD/YY
if t, err := time.Parse("01/02/06", s); err == nil {
return t.UnixNano() / 1e6, nil
}
// Attempt to parse time as unix timestamp
if n, err := strconv.ParseInt(s, 10, 64); err == nil {
return n * 1000, nil
}
// Attempt to parse interval
if interval, err := parseInterval(s); err == nil {
return startTime.UnixNano()/1e6 + interval, nil
}
return 0, fmt.Errorf("unsupported time %q", s)
}
func parseInterval(s string) (int64, error) {
s = strings.TrimSpace(s)
prefix := s
var suffix string
for i := 0; i < len(s); i++ {
ch := s[i]
if ch != '-' && ch != '+' && ch != '.' && (ch < '0' || ch > '9') {
prefix = s[:i]
suffix = s[i:]
break
}
}
n, err := strconv.ParseFloat(prefix, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse interval %q: %w", s, err)
}
suffix = strings.TrimSpace(suffix)
if len(suffix) == 0 {
return 0, fmt.Errorf("missing suffix for interval %q; expecting s, min, h, d, w, mon or y suffix", s)
}
var m float64
switch {
case strings.HasPrefix(suffix, "ms"):
m = 1
case strings.HasPrefix(suffix, "s"):
m = 1000
case strings.HasPrefix(suffix, "mi"),
strings.HasPrefix(suffix, "m") && !strings.HasPrefix(suffix, "mo"):
m = 60 * 1000
case strings.HasPrefix(suffix, "h"):
m = 3600 * 1000
case strings.HasPrefix(suffix, "d"):
m = 24 * 3600 * 1000
case strings.HasPrefix(suffix, "w"):
m = 7 * 24 * 3600 * 1000
case strings.HasPrefix(suffix, "mo"):
m = 30 * 24 * 3600 * 1000
case strings.HasPrefix(suffix, "y"):
m = 365 * 24 * 3600 * 1000
default:
return 0, fmt.Errorf("unsupported interval %q", s)
}
return int64(n * m), nil
}
func getStorageStep(r *http.Request) (int64, error) {
s := r.FormValue("storage_step")
if len(s) == 0 {
s = r.Header.Get("Storage-Step")
}
if len(s) == 0 {
step := int64(storageStep.Seconds() * 1000)
if step <= 0 {
return 0, fmt.Errorf("the `-search.graphiteStorageStep` command-line flag value must be positive; got %s", storageStep.String())
}
return step, nil
}
step, err := parseInterval(s)
if err != nil {
return 0, fmt.Errorf("cannot parse datapoints interval %s: %w", s, err)
}
if step <= 0 {
return 0, fmt.Errorf("storage_step cannot be negative; got %s", s)
}
return step, nil
}