mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-07 08:32:18 +01:00
79008b712f
VM has different responses to equivalent queries for MetricsQL and GraphiteQL in case of failed access to one of vmstorage node of the cluster vmstorage nodes. For GraphiteQL, the denyPartialResponse feature is not used, it is always true, which is not always correct (depending on the configuration). In the PR I have removed the hardcoded denyPartialResponse for GraphiteQL, just like MetricsQL does. - [x] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).
214 lines
6.7 KiB
Go
214 lines
6.7 KiB
Go
package graphite
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphiteql"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
|
)
|
|
|
|
var maxGraphiteSeries = flag.Int("search.maxGraphiteSeries", 300e3, "The maximum number of time series, which can be scanned during queries to Graphite Render API. "+
|
|
"See https://docs.victoriametrics.com/#graphite-render-api-usage")
|
|
|
|
type evalConfig struct {
|
|
at *auth.Token
|
|
startTime int64
|
|
endTime int64
|
|
storageStep int64
|
|
denyPartialResponse bool
|
|
deadline searchutils.Deadline
|
|
|
|
currentTime time.Time
|
|
|
|
// xFilesFactor is used for determining when consolidateFunc must be applied.
|
|
//
|
|
// 0 means that consolidateFunc should be applied if at least a single non-NaN data point exists on the given step.
|
|
// 1 means that consolidateFunc should be applied if all the data points are non-NaN on the given step.
|
|
xFilesFactor float64
|
|
|
|
// Enforced tag filters
|
|
etfs [][]storage.TagFilter
|
|
|
|
// originalQuery contains the original query - used for debug logging.
|
|
originalQuery string
|
|
}
|
|
|
|
func (ec *evalConfig) pointsLen(step int64) int {
|
|
return int((ec.endTime - ec.startTime) / step)
|
|
}
|
|
|
|
func (ec *evalConfig) newTimestamps(step int64) []int64 {
|
|
pointsLen := ec.pointsLen(step)
|
|
timestamps := make([]int64, pointsLen)
|
|
ts := ec.startTime
|
|
for i := 0; i < pointsLen; i++ {
|
|
timestamps[i] = ts
|
|
ts += step
|
|
}
|
|
return timestamps
|
|
}
|
|
|
|
type series struct {
|
|
Name string
|
|
Tags map[string]string
|
|
Timestamps []int64
|
|
Values []float64
|
|
|
|
// holds current path expression like graphite does.
|
|
pathExpression string
|
|
|
|
expr graphiteql.Expr
|
|
|
|
// consolidateFunc is applied to raw samples in order to generate data points algined to the given step.
|
|
// see series.consolidate() function for details.
|
|
consolidateFunc aggrFunc
|
|
|
|
// xFilesFactor is used for determining when consolidateFunc must be applied.
|
|
//
|
|
// 0 means that consolidateFunc should be applied if at least a single non-NaN data point exists on the given step.
|
|
// 1 means that consolidateFunc should be applied if all the data points are non-NaN on the given step.
|
|
xFilesFactor float64
|
|
|
|
step int64
|
|
}
|
|
|
|
func (s *series) consolidate(ec *evalConfig, step int64) {
|
|
aggrFunc := s.consolidateFunc
|
|
if aggrFunc == nil {
|
|
aggrFunc = aggrAvg
|
|
}
|
|
xFilesFactor := s.xFilesFactor
|
|
if s.xFilesFactor <= 0 {
|
|
xFilesFactor = ec.xFilesFactor
|
|
}
|
|
s.summarize(aggrFunc, ec.startTime, ec.endTime, step, xFilesFactor)
|
|
}
|
|
|
|
func (s *series) summarize(aggrFunc aggrFunc, startTime, endTime, step int64, xFilesFactor float64) {
|
|
pointsLen := int((endTime - startTime) / step)
|
|
timestamps := s.Timestamps
|
|
values := s.Values
|
|
dstTimestamps := make([]int64, 0, pointsLen)
|
|
dstValues := make([]float64, 0, pointsLen)
|
|
ts := startTime
|
|
i := 0
|
|
for len(dstTimestamps) < pointsLen {
|
|
tsEnd := ts + step
|
|
j := i
|
|
for j < len(timestamps) && timestamps[j] < tsEnd {
|
|
j++
|
|
}
|
|
if i == j && i > 0 && ts-timestamps[i-1] <= 2000 {
|
|
// The current [ts ... tsEnd) interval has no samples,
|
|
// but the last sample on the previous interval [ts - step ... ts)
|
|
// is closer than 2 seconds to the current interval.
|
|
// Let's consider that this sample belongs to the current interval,
|
|
// since such discrepancy could appear because of small jitter in samples' ingestion.
|
|
i--
|
|
}
|
|
v := aggrFunc.apply(xFilesFactor, values[i:j])
|
|
dstTimestamps = append(dstTimestamps, ts)
|
|
dstValues = append(dstValues, v)
|
|
ts = tsEnd
|
|
i = j
|
|
}
|
|
// Do not reuse s.Timestamps and s.Values, since they can be too big
|
|
s.Timestamps = dstTimestamps
|
|
s.Values = dstValues
|
|
s.step = step
|
|
}
|
|
|
|
func execExpr(ec *evalConfig, query string) (nextSeriesFunc, error) {
|
|
expr, err := graphiteql.Parse(query)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse %q: %w", query, err)
|
|
}
|
|
return evalExpr(ec, expr)
|
|
}
|
|
|
|
func evalExpr(ec *evalConfig, expr graphiteql.Expr) (nextSeriesFunc, error) {
|
|
switch t := expr.(type) {
|
|
case *graphiteql.MetricExpr:
|
|
return evalMetricExpr(ec, t)
|
|
case *graphiteql.FuncExpr:
|
|
return evalFuncExpr(ec, t)
|
|
default:
|
|
return nil, fmt.Errorf("unexpected expression type %T; want graphiteql.MetricExpr or graphiteql.FuncExpr; expr: %q", t, t.AppendString(nil))
|
|
}
|
|
}
|
|
|
|
func evalMetricExpr(ec *evalConfig, me *graphiteql.MetricExpr) (nextSeriesFunc, error) {
|
|
tfs := []storage.TagFilter{{
|
|
Key: []byte("__graphite__"),
|
|
Value: []byte(me.Query),
|
|
}}
|
|
tfss := joinTagFilterss(tfs, ec.etfs)
|
|
sq := storage.NewSearchQuery(ec.at.AccountID, ec.at.ProjectID, ec.startTime, ec.endTime, tfss, *maxGraphiteSeries)
|
|
return newNextSeriesForSearchQuery(ec, sq, me)
|
|
}
|
|
|
|
func newNextSeriesForSearchQuery(ec *evalConfig, sq *storage.SearchQuery, expr graphiteql.Expr) (nextSeriesFunc, error) {
|
|
rss, _, err := netstorage.ProcessSearchQuery(nil, ec.denyPartialResponse, sq, ec.deadline)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot fetch data for %q: %w", sq, err)
|
|
}
|
|
seriesCh := make(chan *series, cgroup.AvailableCPUs())
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
err := rss.RunParallel(nil, func(rs *netstorage.Result, _ uint) error {
|
|
nameWithTags := getCanonicalPath(&rs.MetricName)
|
|
tags := unmarshalTags(nameWithTags)
|
|
s := &series{
|
|
Name: tags["name"],
|
|
Tags: tags,
|
|
Timestamps: append([]int64{}, rs.Timestamps...),
|
|
Values: append([]float64{}, rs.Values...),
|
|
expr: expr,
|
|
pathExpression: string(expr.AppendString(nil)),
|
|
}
|
|
s.summarize(aggrAvg, ec.startTime, ec.endTime, ec.storageStep, 0)
|
|
t := timerpool.Get(30 * time.Second)
|
|
select {
|
|
case seriesCh <- s:
|
|
case <-t.C:
|
|
logger.Errorf("resource leak when processing the %s (full query: %s); please report this error to VictoriaMetrics developers",
|
|
expr.AppendString(nil), ec.originalQuery)
|
|
}
|
|
timerpool.Put(t)
|
|
return nil
|
|
})
|
|
close(seriesCh)
|
|
errCh <- err
|
|
}()
|
|
f := func() (*series, error) {
|
|
s := <-seriesCh
|
|
if s != nil {
|
|
return s, nil
|
|
}
|
|
err := <-errCh
|
|
return nil, err
|
|
}
|
|
return f, nil
|
|
}
|
|
|
|
func evalFuncExpr(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
|
|
// Do not lowercase the fe.FuncName, since Graphite function names are case-sensitive.
|
|
tf := transformFuncs[fe.FuncName]
|
|
if tf == nil {
|
|
return nil, fmt.Errorf("unknown function %q", fe.FuncName)
|
|
}
|
|
nextSeries, err := tf(ec, fe)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot evaluate %s: %w", fe.AppendString(nil), err)
|
|
}
|
|
return nextSeries, nil
|
|
}
|