adding AggregateSeriesLists graphite function (#5809)

* adding aggregate series list graphite function

adding also aliases for sum diff and multiply

* Adding tests for aggregateSeriesLists and aliases
This commit is contained in:
rbizos 2024-03-26 14:48:58 +01:00 committed by hagen1778
parent 51f5ac1929
commit 23ab865035
No known key found for this signature in database
GPG Key ID: 3BF75F3741CA9640
3 changed files with 307 additions and 27 deletions

View File

@ -3279,6 +3279,102 @@ func TestExecExprSuccess(t *testing.T) {
Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
},
})
f(`aggregateSeriesLists(
summarize(
time('foo.bar.baz',10),
'45s'
),
summarize(
time('bar.foo.bad',10),
'45s'
), 'sum')`, []*series{
{
Timestamps: []int64{120000, 165000},
Values: []float64{1170, 2000},
Name: `sumSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
},
})
f(`sumSeriesLists(
summarize(
time('foo.bar.baz',10),
'45s'
),
summarize(
time('bar.foo.bad',10),
'45s'
))`, []*series{
{
Timestamps: []int64{120000, 165000},
Values: []float64{1170, 2000},
Name: `sumSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
},
})
f(`aggregateSeriesLists(
summarize(
time('foo.bar.baz',10),
'45s'
),
summarize(
time('bar.foo.bad',10),
'45s'
), 'diff')`, []*series{
{
Timestamps: []int64{120000, 165000},
Values: []float64{0, 0},
Name: `diffSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
},
})
f(`diffSeriesLists(
summarize(
time('foo.bar.baz',10),
'45s'
),
summarize(
time('bar.foo.bad',10),
'45s'
))`, []*series{
{
Timestamps: []int64{120000, 165000},
Values: []float64{0, 0},
Name: `diffSeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
},
})
f(`aggregateSeriesLists(
summarize(
time('foo.bar.baz',10),
'45s'
),
summarize(
time('bar.foo.bad',10),
'45s'
), 'multiply')`, []*series{
{
Timestamps: []int64{120000, 165000},
Values: []float64{342225, 1e+06},
Name: `multiplySeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
},
})
f(`multiplySeriesLists(
summarize(
time('foo.bar.baz',10),
'45s'
),
summarize(
time('bar.foo.bad',10),
'45s'
))`, []*series{
{
Timestamps: []int64{120000, 165000},
Values: []float64{342225, 1e+06},
Name: `multiplySeries(summarize(foo.bar.baz,'45s','sum'),summarize(bar.foo.bad,'45s','sum'))`,
Tags: map[string]string{"name": "foo.bar.baz", "summarize": "45s", "summarizeFunction": "sum"},
},
})
f(`weightedAverage(
summarize(
group(

View File

@ -40,6 +40,52 @@
}
]
},
"aggregateSeriesLists": {
"name": "aggregateSeriesLists",
"function": "aggregateSeriesLists(seriesListFirstPos, seriesListSecondPos, func, xFilesFactor=None)",
"description": "Iterates over a two lists and aggregates using specified function list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length\n\nPosition of seriesList matters. For example using “sum” function aggregateSeriesLists(list1[0..n], list2[0..n], \"sum\") it would find sum for each member of the list list1[0] + list2[0], list1[1] + list2[1], list1[n] + list2[n].",
"module": "graphite.render.functions",
"group": "Combine",
"params": [
{
"name": "seriesListFirstPos",
"type": "seriesList",
"required": true
},
{
"name": "seriesListSecondPos",
"type": "seriesList",
"required": true
},
{
"name": "func",
"type": "aggFunc",
"required": true,
"options": [
"average",
"avg",
"avg_zero",
"count",
"current",
"diff",
"last",
"max",
"median",
"min",
"multiply",
"range",
"rangeOf",
"stddev",
"sum",
"total"
]
},
{
"name": "xFilesFactor",
"type": "float"
}
]
},
"aggregateWithWildcards": {
"name": "aggregateWithWildcards",
"function": "aggregateWithWildcards(seriesList, func, *positions)",
@ -211,6 +257,25 @@
}
]
},
"DiffSeriesLists": {
"name": "DiffSeriesLists",
"function": "DiffSeriesLists(seriesListFirstPos, seriesListSecondPos)",
"description": "Iterates over a two lists and subtracts series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
"module": "graphite.render.functions",
"group": "Combine",
"params": [
{
"name": "seriesListFirstPos",
"type": "seriesList",
"required": true
},
{
"name": "seriesListSecondPos",
"type": "seriesList",
"required": true
}
]
},
"divideSeries": {
"name": "divideSeries",
"function": "divideSeries(dividendSeriesList, divisorSeries)",
@ -529,6 +594,25 @@
}
]
},
"MultiplySeriesLists": {
"name": "MultiplySeriesLists",
"function": "MultiplySeriesLists(seriesListFirstPos, seriesListSecondPos)",
"description": "Iterates over a two lists and multiply series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
"module": "graphite.render.functions",
"group": "Combine",
"params": [
{
"name": "seriesListFirstPos",
"type": "seriesList",
"required": true
},
{
"name": "seriesListSecondPos",
"type": "seriesList",
"required": true
}
]
},
"multiplySeriesWithWildcards": {
"name": "multiplySeriesWithWildcards",
"function": "multiplySeriesWithWildcards(seriesList, *position)",
@ -715,6 +799,26 @@
}
]
},
"SumSeriesLists": {
"name": "SumSeriesLists",
"function": "sumSeriesLists(seriesListFirstPos, seriesListSecondPos)",
"description": "Iterates over a two lists and sums series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
"module": "graphite.render.functions",
"group": "Combine",
"params": [
{
"name": "seriesListFirstPos",
"type": "seriesList",
"required": true
},
{
"name": "seriesListSecondPos",
"type": "seriesList",
"required": true
}
]
},
"sumSeriesWithWildcards": {
"name": "sumSeriesWithWildcards",
"function": "sumSeriesWithWildcards(seriesList, *position)",
@ -728,7 +832,25 @@
"required": true
},
{
"name": "position",
"name": "position", "SumSeriesLists": {
"name": "SumSeriesLists",
"function": "sumSeriesLists(seriesListFirstPos, seriesListSecondPos)",
"description": "Iterates over a two lists and sums series lists 2 through n from series 1 list1[0] to list2[0], list1[1] to list2[1] and so on. The lists will need to be the same length",
"module": "graphite.render.functions",
"group": "Combine",
"params": [
{
"name": "seriesListFirstPos",
"type": "seriesList",
"required": true
},
{
"name": "seriesListSecondPos",
"type": "seriesList",
"required": true
}
]
},
"type": "node",
"multiple": true
}

View File

@ -36,6 +36,7 @@ func init() {
"add": transformAdd,
"aggregate": transformAggregate,
"aggregateLine": transformAggregateLine,
"aggregateSeriesLists": transformAggregateSeriesLists,
"aggregateWithWildcards": transformAggregateWithWildcards,
"alias": transformAlias,
"aliasByMetric": transformAliasByMetric,
@ -66,6 +67,7 @@ func init() {
"delay": transformDelay,
"derivative": transformDerivative,
"diffSeries": transformDiffSeries,
"diffSeriesLists": transformDiffSeriesLists,
"divideSeries": transformDivideSeries,
"divideSeriesLists": transformDivideSeriesLists,
"drawAsInfinite": transformDrawAsInfinite,
@ -125,6 +127,7 @@ func init() {
"movingSum": transformMovingSum,
"movingWindow": transformMovingWindow,
"multiplySeries": transformMultiplySeries,
"multiplySeriesLists": transformMultiplySeriesLists,
"multiplySeriesWithWildcards": transformMultiplySeriesWithWildcards,
"nPercentile": transformNPercentile,
"nonNegativeDerivative": transformNonNegativeDerivative,
@ -172,6 +175,7 @@ func init() {
"substr": transformSubstr,
"sum": transformSumSeries,
"sumSeries": transformSumSeries,
"sumSeriesLists": transformSumSeriesLists,
"sumSeriesWithWildcards": transformSumSeriesWithWildcards,
"summarize": transformSummarize,
"threshold": transformThreshold,
@ -1316,6 +1320,86 @@ func transformDivideSeries(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesF
return f, nil
}
func aggregateSeriesListsGeneric(ec *evalConfig, fe *graphiteql.FuncExpr, funcName string) (nextSeriesFunc, error) {
args := fe.Args
agg, err := getAggrFunc(funcName)
if err != nil {
return nil, err
}
nextSeriesFirst, err := evalSeriesList(ec, args, "seriesListFirstPos", 0)
if err != nil {
return nil, err
}
nextSeriesSecond, err := evalSeriesList(ec, args, "seriesListSecondPos", 1)
if err != nil {
return nil, err
}
return aggregateSeriesList(ec, fe, nextSeriesFirst, nextSeriesSecond, agg, funcName)
}
// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.aggregateSeriesLists
func transformAggregateSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
args := fe.Args
if len(args) != 3 && len(args) != 4 {
return nil, fmt.Errorf("unexpected number of args; got %d; want 3 or 4", len(args))
}
funcName, err := getString(args, "func", 2)
if err != nil {
return nil, err
}
return aggregateSeriesListsGeneric(ec, fe, funcName)
}
// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.sumSeriesLists
func transformSumSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
return aggregateSeriesListsGeneric(ec, fe, "sum")
}
// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.multiplySeriesLists
func transformMultiplySeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
return aggregateSeriesListsGeneric(ec, fe, "multiply")
}
// See https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.diffSeriesLists
func transformDiffSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
return aggregateSeriesListsGeneric(ec, fe, "diff")
}
func aggregateSeriesList(ec *evalConfig, fe *graphiteql.FuncExpr, nextSeriesFirst, nextSeriesSecond nextSeriesFunc, agg aggrFunc, funcName string) (nextSeriesFunc, error) {
ssFirst, stepFirst, err := fetchNormalizedSeries(ec, nextSeriesFirst, false)
if err != nil {
return nil, err
}
ssSecond, stepSecond, err := fetchNormalizedSeries(ec, nextSeriesSecond, false)
if err != nil {
return nil, err
}
if len(ssFirst) != len(ssSecond) {
return nil, fmt.Errorf("First and second lists must have equal number of series; got %d vs %d series", len(ssFirst), len(ssSecond))
}
if stepFirst != stepSecond {
return nil, fmt.Errorf("step mismatch for first and second: %d vs %d", stepFirst, stepSecond)
}
valuePair := make([]float64, 2)
for i, s := range ssFirst {
sSecond := ssSecond[i]
values := s.Values
secondValues := sSecond.Values
for j, v := range values {
valuePair[0], valuePair[1] = v, secondValues[j]
values[j] = agg(valuePair)
}
s.Name = fmt.Sprintf("%sSeries(%s,%s)", funcName, s.Name, sSecond.Name)
s.expr = fe
s.pathExpression = s.Name
}
return multiSeriesFunc(ssFirst), nil
}
// See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.divideSeriesLists
func transformDivideSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSeriesFunc, error) {
args := fe.Args
@ -1326,36 +1410,14 @@ func transformDivideSeriesLists(ec *evalConfig, fe *graphiteql.FuncExpr) (nextSe
if err != nil {
return nil, err
}
ssDividend, stepDivident, err := fetchNormalizedSeries(ec, nextDividend, false)
if err != nil {
return nil, err
}
nextDivisor, err := evalSeriesList(ec, args, "divisorSeriesList", 1)
if err != nil {
return nil, err
}
ssDivisor, stepDivisor, err := fetchNormalizedSeries(ec, nextDivisor, false)
if err != nil {
return nil, err
}
if len(ssDividend) != len(ssDivisor) {
return nil, fmt.Errorf("divident and divisor must have equal number of series; got %d vs %d series", len(ssDividend), len(ssDivisor))
}
if stepDivident != stepDivisor {
return nil, fmt.Errorf("step mismatch for divident and divisor: %d vs %d", stepDivident, stepDivisor)
}
for i, s := range ssDividend {
sDivisor := ssDivisor[i]
values := s.Values
divisorValues := sDivisor.Values
for j, v := range values {
values[j] = v / divisorValues[j]
}
s.Name = fmt.Sprintf("divideSeries(%s,%s)", s.Name, sDivisor.Name)
s.expr = fe
s.pathExpression = s.Name
}
return multiSeriesFunc(ssDividend), nil
return aggregateSeriesList(ec, fe, nextDividend, nextDivisor, func(values []float64) float64 {
return values[0] / values[1]
}, "divide")
}
// See https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.drawAsInfinite