mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/vmselect/promql: add histogram_over_time(m[d])
rollup function
This commit is contained in:
parent
3e09d38f29
commit
a8360d04c0
@ -491,6 +491,13 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *
|
||||
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
|
||||
preFunc(values, timestamps)
|
||||
for _, rc := range rcs {
|
||||
if tsm := newTimeseriesMap(name, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
|
||||
rc.DoTimeseriesMap(tsm, values, timestamps)
|
||||
tssLock.Lock()
|
||||
tss = tsm.AppendTimeseriesTo(tss)
|
||||
tssLock.Unlock()
|
||||
continue
|
||||
}
|
||||
var ts timeseries
|
||||
doRollupForTimeseries(rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps, removeMetricGroup)
|
||||
tssLock.Lock()
|
||||
@ -638,9 +645,9 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
|
||||
removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
|
||||
var tss []*timeseries
|
||||
if iafc != nil {
|
||||
tss, err = evalRollupWithIncrementalAggregate(iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup)
|
||||
tss, err = evalRollupWithIncrementalAggregate(name, iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup)
|
||||
} else {
|
||||
tss, err = evalRollupNoIncrementalAggregate(rss, rcs, preFunc, sharedTimestamps, removeMetricGroup)
|
||||
tss, err = evalRollupNoIncrementalAggregate(name, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -662,13 +669,20 @@ func getRollupMemoryLimiter() *memoryLimiter {
|
||||
return &rollupMemoryLimiter
|
||||
}
|
||||
|
||||
func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
|
||||
func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
|
||||
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
|
||||
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
|
||||
preFunc(rs.Values, rs.Timestamps)
|
||||
ts := getTimeseries()
|
||||
defer putTimeseries(ts)
|
||||
for _, rc := range rcs {
|
||||
if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil {
|
||||
rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
|
||||
for _, ts := range tsm.m {
|
||||
iafc.updateTimeseries(ts, workerID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
ts.Reset()
|
||||
doRollupForTimeseries(rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup)
|
||||
iafc.updateTimeseries(ts, workerID)
|
||||
@ -685,13 +699,20 @@ func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *n
|
||||
return tss, nil
|
||||
}
|
||||
|
||||
func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConfig,
|
||||
func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs []*rollupConfig,
|
||||
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
|
||||
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
|
||||
var tssLock sync.Mutex
|
||||
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
|
||||
preFunc(rs.Values, rs.Timestamps)
|
||||
for _, rc := range rcs {
|
||||
if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil {
|
||||
rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
|
||||
tssLock.Lock()
|
||||
tss = tsm.AppendTimeseriesTo(tss)
|
||||
tssLock.Unlock()
|
||||
continue
|
||||
}
|
||||
var ts timeseries
|
||||
doRollupForTimeseries(rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup)
|
||||
tssLock.Lock()
|
||||
|
@ -3083,6 +3083,120 @@ func TestExecSuccess(t *testing.T) {
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`histogram_over_time`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))`
|
||||
r1 := netstorage.Result{
|
||||
Values: []float64{13, 14, 12, 8, 12, 13},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("vmrange"),
|
||||
Value: []byte("1.0e0...1.5e0"),
|
||||
},
|
||||
}
|
||||
r2 := netstorage.Result{
|
||||
Values: []float64{14, 15, 12, 13, 15, 11},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("vmrange"),
|
||||
Value: []byte("2.0e0...2.5e0"),
|
||||
},
|
||||
}
|
||||
r3 := netstorage.Result{
|
||||
Values: []float64{13, 11, 16, 19, 13, 16},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r3.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("vmrange"),
|
||||
Value: []byte("1.5e0...2.0e0"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`sum(histogram_over_time) by (vmrange)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort(sum(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s])) by (vmrange))`
|
||||
r1 := netstorage.Result{
|
||||
Values: []float64{13, 14, 12, 8, 12, 13},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("vmrange"),
|
||||
Value: []byte("1.0e0...1.5e0"),
|
||||
},
|
||||
}
|
||||
r2 := netstorage.Result{
|
||||
Values: []float64{14, 15, 12, 13, 15, 11},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("vmrange"),
|
||||
Value: []byte("2.0e0...2.5e0"),
|
||||
},
|
||||
}
|
||||
r3 := netstorage.Result{
|
||||
Values: []float64{13, 11, 16, 19, 13, 16},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r3.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("vmrange"),
|
||||
Value: []byte("1.5e0...2.0e0"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`sum(histogram_over_time)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sum(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))`
|
||||
r := netstorage.Result{
|
||||
Values: []float64{40, 40, 40, 40, 40, 40},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`topk_min(histogram_over_time)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `topk_min(1, histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))`
|
||||
r := netstorage.Result{
|
||||
Values: []float64{14, 15, 12, 13, 15, 11},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("vmrange"),
|
||||
Value: []byte("2.0e0...2.5e0"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`share_gt_over_time`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `share_gt_over_time(rand(0)[200s:10s], 0.7)`
|
||||
|
@ -8,6 +8,8 @@ import (
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/histogram"
|
||||
)
|
||||
|
||||
@ -51,6 +53,7 @@ var rollupFuncs = map[string]newRollupFunc{
|
||||
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
|
||||
"share_le_over_time": newRollupShareLE,
|
||||
"share_gt_over_time": newRollupShareGT,
|
||||
"histogram_over_time": newRollupFuncOneArg(rollupHistogram),
|
||||
"rollup": newRollupFuncOneArg(rollupFake),
|
||||
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||
"rollup_deriv": newRollupFuncOneArg(rollupFake),
|
||||
@ -119,6 +122,8 @@ type rollupFuncArg struct {
|
||||
// Real previous value even if it is located too far from the current window.
|
||||
// It matches prevValue if prevValue is not nan.
|
||||
realPrevValue float64
|
||||
|
||||
tsm *timeseriesMap
|
||||
}
|
||||
|
||||
func (rfa *rollupFuncArg) reset() {
|
||||
@ -131,6 +136,7 @@ func (rfa *rollupFuncArg) reset() {
|
||||
rfa.step = 0
|
||||
rfa.scrapeInterval = 0
|
||||
rfa.realPrevValue = nan
|
||||
rfa.tsm = nil
|
||||
}
|
||||
|
||||
// rollupFunc must return rollup value for the given rfa.
|
||||
@ -169,6 +175,54 @@ var (
|
||||
// The maximum interval without previous rows.
|
||||
const maxSilenceInterval = 5 * 60 * 1000
|
||||
|
||||
type timeseriesMap struct {
|
||||
origin *timeseries
|
||||
labelName string
|
||||
h metrics.Histogram
|
||||
m map[string]*timeseries
|
||||
}
|
||||
|
||||
func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap {
|
||||
if funcName != "histogram_over_time" {
|
||||
return nil
|
||||
}
|
||||
|
||||
values := make([]float64, len(sharedTimestamps))
|
||||
for i := range values {
|
||||
values[i] = nan
|
||||
}
|
||||
var origin timeseries
|
||||
origin.MetricName.CopyFrom(mnSrc)
|
||||
origin.MetricName.ResetMetricGroup()
|
||||
origin.Timestamps = sharedTimestamps
|
||||
origin.Values = values
|
||||
return ×eriesMap{
|
||||
origin: &origin,
|
||||
labelName: "vmrange",
|
||||
m: make(map[string]*timeseries),
|
||||
}
|
||||
}
|
||||
|
||||
func (tsm *timeseriesMap) AppendTimeseriesTo(dst []*timeseries) []*timeseries {
|
||||
for _, ts := range tsm.m {
|
||||
dst = append(dst, ts)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func (tsm *timeseriesMap) GetOrCreateTimeseries(labelValue string) *timeseries {
|
||||
ts := tsm.m[labelValue]
|
||||
if ts != nil {
|
||||
return ts
|
||||
}
|
||||
ts = ×eries{}
|
||||
ts.CopyFromShallowTimestamps(tsm.origin)
|
||||
ts.MetricName.RemoveTag(tsm.labelName)
|
||||
ts.MetricName.AddTag(tsm.labelName, labelValue)
|
||||
tsm.m[labelValue] = ts
|
||||
return ts
|
||||
}
|
||||
|
||||
// Do calculates rollups for the given timestamps and values, appends
|
||||
// them to dstValues and returns results.
|
||||
//
|
||||
@ -176,8 +230,19 @@ const maxSilenceInterval = 5 * 60 * 1000
|
||||
//
|
||||
// timestamps must cover time range [rc.Start - rc.Window - maxSilenceInterval ... rc.End + rc.Step].
|
||||
//
|
||||
// Cannot be called from concurrent goroutines.
|
||||
// Do cannot be called from concurrent goroutines.
|
||||
func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []int64) []float64 {
|
||||
return rc.doInternal(dstValues, nil, values, timestamps)
|
||||
}
|
||||
|
||||
// DoTimeseriesMap calculates rollups for the given timestamps and values and puts them to tsm.
|
||||
func (rc *rollupConfig) DoTimeseriesMap(tsm *timeseriesMap, values []float64, timestamps []int64) {
|
||||
ts := getTimeseries()
|
||||
ts.Values = rc.doInternal(ts.Values[:0], tsm, values, timestamps)
|
||||
putTimeseries(ts)
|
||||
}
|
||||
|
||||
func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, values []float64, timestamps []int64) []float64 {
|
||||
// Sanity checks.
|
||||
if rc.Step <= 0 {
|
||||
logger.Panicf("BUG: Step must be bigger than 0; got %d", rc.Step)
|
||||
@ -212,6 +277,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
||||
rfa.step = rc.Step
|
||||
rfa.scrapeInterval = scrapeInterval
|
||||
rfa.realPrevValue = nan
|
||||
rfa.tsm = tsm
|
||||
|
||||
i := 0
|
||||
j := 0
|
||||
@ -612,6 +678,21 @@ func newRollupQuantile(args []interface{}) (rollupFunc, error) {
|
||||
return rf, nil
|
||||
}
|
||||
|
||||
func rollupHistogram(rfa *rollupFuncArg) float64 {
|
||||
values := rfa.values
|
||||
tsm := rfa.tsm
|
||||
tsm.h.Reset()
|
||||
for _, v := range values {
|
||||
tsm.h.Update(v)
|
||||
}
|
||||
idx := rfa.idx
|
||||
tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
||||
ts := tsm.GetOrCreateTimeseries(vmrange)
|
||||
ts.Values[idx] = float64(count)
|
||||
})
|
||||
return nan
|
||||
}
|
||||
|
||||
func rollupAvg(rfa *rollupFuncArg) float64 {
|
||||
// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
|
||||
// since it is slower and has no significant benefits in precision.
|
||||
|
@ -79,6 +79,9 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
|
||||
- `increases_over_time(m[d])` and `decreases_over_time(m[d])` - returns the number of `m` increases or decreases over the given duration `d`.
|
||||
- `prometheus_buckets(q)` - converts [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) buckets to Prometheus buckets with `le` labels.
|
||||
- `histogram(q)` - calculates aggregate histogram over `q` time series for each point on the graph. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details.
|
||||
- `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`.
|
||||
For example, the following query calculates median temperature by country over the last 24 hours:
|
||||
`histogram_quantile(0.5, sum(histogram_over_time(temperature[24h])) by (vmbucket, country))`.
|
||||
- `topk_*` and `bottomk_*` aggregate functions, which return up to K time series. Note that the standard `topk` function may return more than K time series -
|
||||
see [this article](https://www.robustperception.io/graph-top-n-time-series-in-grafana) for details.
|
||||
- `topk_min(k, q)` - returns top K time series with the max minimums on the given time range
|
||||
|
@ -43,6 +43,7 @@ var rollupFuncs = map[string]bool{
|
||||
"scrape_interval": true,
|
||||
"share_le_over_time": true,
|
||||
"share_gt_over_time": true,
|
||||
"histogram_over_time": true,
|
||||
"rollup": true,
|
||||
"rollup_rate": true,
|
||||
"rollup_deriv": true,
|
||||
|
Loading…
Reference in New Issue
Block a user