mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-11 20:52:24 +01:00
295 lines
5.7 KiB
Go
295 lines
5.7 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"slices"
|
|
"strconv"
|
|
"unsafe"
|
|
|
|
"github.com/valyala/fastrand"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
type statsQuantile struct {
|
|
fields []string
|
|
|
|
phi float64
|
|
phiStr string
|
|
}
|
|
|
|
func (sq *statsQuantile) String() string {
|
|
s := "quantile(" + sq.phiStr
|
|
if len(sq.fields) > 0 {
|
|
s += ", " + fieldNamesString(sq.fields)
|
|
}
|
|
s += ")"
|
|
return s
|
|
}
|
|
|
|
func (sq *statsQuantile) updateNeededFields(neededFields fieldsSet) {
|
|
updateNeededFieldsForStatsFunc(neededFields, sq.fields)
|
|
}
|
|
|
|
func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) {
|
|
sqp := &statsQuantileProcessor{
|
|
sq: sq,
|
|
}
|
|
return sqp, int(unsafe.Sizeof(*sqp))
|
|
}
|
|
|
|
type statsQuantileProcessor struct {
|
|
sq *statsQuantile
|
|
|
|
h histogram
|
|
}
|
|
|
|
func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int {
|
|
stateSizeIncrease := 0
|
|
|
|
fields := sqp.sq.fields
|
|
if len(fields) == 0 {
|
|
for _, c := range br.getColumns() {
|
|
stateSizeIncrease += sqp.updateStateForColumn(br, c)
|
|
}
|
|
} else {
|
|
for _, field := range fields {
|
|
c := br.getColumnByName(field)
|
|
stateSizeIncrease += sqp.updateStateForColumn(br, c)
|
|
}
|
|
}
|
|
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
|
|
h := &sqp.h
|
|
stateSizeIncrease := 0
|
|
|
|
fields := sqp.sq.fields
|
|
if len(fields) == 0 {
|
|
for _, c := range br.getColumns() {
|
|
f, ok := c.getFloatValueAtRow(br, rowIdx)
|
|
if ok {
|
|
stateSizeIncrease += h.update(f)
|
|
}
|
|
}
|
|
} else {
|
|
for _, field := range fields {
|
|
c := br.getColumnByName(field)
|
|
f, ok := c.getFloatValueAtRow(br, rowIdx)
|
|
if ok {
|
|
stateSizeIncrease += h.update(f)
|
|
}
|
|
}
|
|
}
|
|
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
func (sqp *statsQuantileProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) int {
|
|
h := &sqp.h
|
|
stateSizeIncrease := 0
|
|
|
|
if c.isConst {
|
|
f, ok := tryParseFloat64(c.valuesEncoded[0])
|
|
if ok {
|
|
for range br.timestamps {
|
|
stateSizeIncrease += h.update(f)
|
|
}
|
|
}
|
|
return stateSizeIncrease
|
|
}
|
|
if c.isTime {
|
|
return 0
|
|
}
|
|
|
|
switch c.valueType {
|
|
case valueTypeString:
|
|
for _, v := range c.getValues(br) {
|
|
f, ok := tryParseFloat64(v)
|
|
if ok {
|
|
stateSizeIncrease += h.update(f)
|
|
}
|
|
}
|
|
case valueTypeDict:
|
|
dictValues := c.dictValues
|
|
a := encoding.GetFloat64s(len(dictValues))
|
|
for i, v := range dictValues {
|
|
f, ok := tryParseFloat64(v)
|
|
if !ok {
|
|
f = nan
|
|
}
|
|
a.A[i] = f
|
|
}
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
idx := v[0]
|
|
f := a.A[idx]
|
|
if !math.IsNaN(f) {
|
|
h.update(f)
|
|
}
|
|
}
|
|
encoding.PutFloat64s(a)
|
|
case valueTypeUint8:
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
n := unmarshalUint8(v)
|
|
h.update(float64(n))
|
|
}
|
|
case valueTypeUint16:
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
n := unmarshalUint16(v)
|
|
h.update(float64(n))
|
|
}
|
|
case valueTypeUint32:
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
n := unmarshalUint32(v)
|
|
h.update(float64(n))
|
|
}
|
|
case valueTypeUint64:
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
n := unmarshalUint64(v)
|
|
h.update(float64(n))
|
|
}
|
|
case valueTypeFloat64:
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
f := unmarshalFloat64(v)
|
|
if !math.IsNaN(f) {
|
|
h.update(f)
|
|
}
|
|
}
|
|
case valueTypeIPv4:
|
|
case valueTypeTimestampISO8601:
|
|
default:
|
|
logger.Panicf("BUG: unexpected valueType=%d", c.valueType)
|
|
}
|
|
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) {
|
|
src := sfp.(*statsQuantileProcessor)
|
|
sqp.h.mergeState(&src.h)
|
|
}
|
|
|
|
func (sqp *statsQuantileProcessor) finalizeStats() string {
|
|
q := sqp.h.quantile(sqp.sq.phi)
|
|
return strconv.FormatFloat(q, 'f', -1, 64)
|
|
}
|
|
|
|
func parseStatsQuantile(lex *lexer) (*statsQuantile, error) {
|
|
if !lex.isKeyword("quantile") {
|
|
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "quantile")
|
|
}
|
|
lex.nextToken()
|
|
|
|
fields, err := parseFieldNamesInParens(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err)
|
|
}
|
|
if len(fields) < 1 {
|
|
return nil, fmt.Errorf("'quantile' must have at least phi arg")
|
|
}
|
|
|
|
// Parse phi
|
|
phiStr := fields[0]
|
|
phi, ok := tryParseFloat64(phiStr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("phi arg in 'quantile' must be floating point number; got %q", phiStr)
|
|
}
|
|
if phi < 0 || phi > 1 {
|
|
return nil, fmt.Errorf("phi arg in 'quantile' must be in the range [0..1]; got %q", phiStr)
|
|
}
|
|
|
|
// Parse fields
|
|
fields = fields[1:]
|
|
if slices.Contains(fields, "*") {
|
|
fields = nil
|
|
}
|
|
|
|
sq := &statsQuantile{
|
|
fields: fields,
|
|
|
|
phi: phi,
|
|
phiStr: phiStr,
|
|
}
|
|
return sq, nil
|
|
}
|
|
|
|
type histogram struct {
|
|
a []float64
|
|
min float64
|
|
max float64
|
|
count uint64
|
|
|
|
rng fastrand.RNG
|
|
}
|
|
|
|
func (h *histogram) update(f float64) int {
|
|
if h.count == 0 || f < h.min {
|
|
h.min = f
|
|
}
|
|
if h.count == 0 || f > h.max {
|
|
h.max = f
|
|
}
|
|
|
|
h.count++
|
|
if len(h.a) < maxHistogramSamples {
|
|
h.a = append(h.a, f)
|
|
return int(unsafe.Sizeof(f))
|
|
}
|
|
|
|
if n := h.rng.Uint32n(uint32(h.count)); n < uint32(len(h.a)) {
|
|
h.a[n] = f
|
|
}
|
|
return 0
|
|
}
|
|
|
|
const maxHistogramSamples = 100_000
|
|
|
|
func (h *histogram) mergeState(src *histogram) {
|
|
if src.count == 0 {
|
|
// Nothing to merge
|
|
return
|
|
}
|
|
if h.count == 0 {
|
|
h.a = append(h.a, src.a...)
|
|
h.min = src.min
|
|
h.max = src.max
|
|
h.count = src.count
|
|
return
|
|
}
|
|
|
|
h.a = append(h.a, src.a...)
|
|
if src.min < h.min {
|
|
h.min = src.min
|
|
}
|
|
if src.max > h.max {
|
|
h.max = src.max
|
|
}
|
|
h.count += src.count
|
|
}
|
|
|
|
func (h *histogram) quantile(phi float64) float64 {
|
|
if len(h.a) == 0 {
|
|
return nan
|
|
}
|
|
if len(h.a) == 1 {
|
|
return h.a[0]
|
|
}
|
|
if phi <= 0 {
|
|
return h.min
|
|
}
|
|
if phi >= 1 {
|
|
return h.max
|
|
}
|
|
|
|
slices.Sort(h.a)
|
|
idx := int(phi * float64(len(h.a)))
|
|
if idx == len(h.a) {
|
|
return h.max
|
|
}
|
|
return h.a[idx]
|
|
}
|