VictoriaMetrics/lib/streamaggr/deduplicator.go
Aliaksandr Valialkin cbc637d1dd
app/vmagent/remotewrite: follow-up for f153f54d11
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
  This improves code maintainability a bit.

- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
  This prevents from potential resource leaks.

- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
  This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.

- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
  in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.

- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
  This label should simplify investigation of the exposed metrics.

- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
  it is hard to use these labels in query filters and aggregation functions.

- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
  This metric shows the number of samples generated by the corresponding streaming aggregation rule.
  This metric has been added in the commit 861852f262 .
  See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462

- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
  This metric has been added in the commit 861852f262 .
  See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462

- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
  which could modify the behaviour of the constructed streaming aggregator.
  Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.

- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
  This also allows passing nil instead of Options when default options are enough.

- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
  so they have consistent set of labels comparing to the rest of streaming aggregation metrics.

- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
  `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
  `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
  metrics. This is a follow-up for the commit 2eb1bc4f81 .
  See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604

- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
  figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
  in the commit 2eb1bc4f81 .

- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
  While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 20:25:36 +02:00

248 lines
5.8 KiB
Go

package streamaggr
import (
"fmt"
"slices"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
)
// Deduplicator deduplicates samples per each time series.
type Deduplicator struct {
da *dedupAggr
dropLabels []string
wg sync.WaitGroup
stopCh chan struct{}
ms *metrics.Set
dedupFlushDuration *metrics.Histogram
dedupFlushTimeouts *metrics.Counter
}
// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.
//
// The de-duplicated samples are passed to pushFunc once per dedupInterval.
//
// An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples.
// Common case is to drop `replica`-like labels from samples received from HA datasources.
//
// alias is url label used in metrics exposed by the returned Deduplicator.
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
dropLabels: dropLabels,
stopCh: make(chan struct{}),
ms: metrics.NewSet(),
}
ms := d.ms
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.da.sizeBytes())
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
return float64(d.da.itemsCount())
})
d.dedupFlushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.dedupFlushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
metrics.RegisterSet(ms)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.runFlusher(pushFunc, dedupInterval)
}()
return d
}
// MustStop stops d.
func (d *Deduplicator) MustStop() {
metrics.UnregisterSet(d.ms, true)
d.ms = nil
close(d.stopCh)
d.wg.Wait()
}
// Push pushes tss to d.
func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
ctx := getDeduplicatorPushCtx()
pss := ctx.pss
labels := &ctx.labels
buf := ctx.buf
dropLabels := d.dropLabels
for _, ts := range tss {
if len(dropLabels) > 0 {
labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
} else {
labels.Labels = append(labels.Labels[:0], ts.Labels...)
}
if len(labels.Labels) == 0 {
continue
}
labels.Sort()
bufLen := len(buf)
buf = lc.Compress(buf, labels.Labels)
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, s := range ts.Samples {
pss = append(pss, pushSample{
key: key,
value: s.Value,
timestamp: s.Timestamp,
})
}
}
d.da.pushSamples(pss)
ctx.pss = pss
ctx.buf = buf
putDeduplicatorPushCtx(ctx)
}
func dropSeriesLabels(dst, src []prompbmarshal.Label, labelNames []string) []prompbmarshal.Label {
for _, label := range src {
if !slices.Contains(labelNames, label.Name) {
dst = append(dst, label)
}
}
return dst
}
func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration) {
t := time.NewTicker(dedupInterval)
defer t.Stop()
for {
select {
case <-d.stopCh:
return
case <-t.C:
d.flush(pushFunc, dedupInterval)
}
}
}
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
startTime := time.Now()
timestamp := startTime.UnixMilli()
d.da.flush(func(pss []pushSample) {
ctx := getDeduplicatorFlushCtx()
tss := ctx.tss
labels := ctx.labels
samples := ctx.samples
for _, ps := range pss {
labelsLen := len(labels)
labels = decompressLabels(labels, ps.key)
samplesLen := len(samples)
samples = append(samples, prompbmarshal.Sample{
Value: ps.value,
Timestamp: timestamp,
})
tss = append(tss, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
pushFunc(tss)
ctx.tss = tss
ctx.labels = labels
ctx.samples = samples
putDeduplicatorFlushCtx(ctx)
})
duration := time.Since(startTime)
d.dedupFlushDuration.Update(duration.Seconds())
if duration > dedupInterval {
d.dedupFlushTimeouts.Inc()
logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
"possible solutions: increase dedupInterval; reduce samples' ingestion rate", dedupInterval, duration.Seconds())
}
}
type deduplicatorPushCtx struct {
pss []pushSample
labels promutils.Labels
buf []byte
}
func (ctx *deduplicatorPushCtx) reset() {
clear(ctx.pss)
ctx.pss = ctx.pss[:0]
ctx.labels.Reset()
ctx.buf = ctx.buf[:0]
}
func getDeduplicatorPushCtx() *deduplicatorPushCtx {
v := deduplicatorPushCtxPool.Get()
if v == nil {
return &deduplicatorPushCtx{}
}
return v.(*deduplicatorPushCtx)
}
func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) {
ctx.reset()
deduplicatorPushCtxPool.Put(ctx)
}
var deduplicatorPushCtxPool sync.Pool
type deduplicatorFlushCtx struct {
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
}
func (ctx *deduplicatorFlushCtx) reset() {
clear(ctx.tss)
ctx.tss = ctx.tss[:0]
clear(ctx.labels)
ctx.labels = ctx.labels[:0]
clear(ctx.samples)
ctx.samples = ctx.samples[:0]
}
func getDeduplicatorFlushCtx() *deduplicatorFlushCtx {
v := deduplicatorFlushCtxPool.Get()
if v == nil {
return &deduplicatorFlushCtx{}
}
return v.(*deduplicatorFlushCtx)
}
func putDeduplicatorFlushCtx(ctx *deduplicatorFlushCtx) {
ctx.reset()
deduplicatorFlushCtxPool.Put(ctx)
}
var deduplicatorFlushCtxPool sync.Pool