mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
ed523b5bbc
This allows performing online de-duplication of incoming samples
197 lines
4.1 KiB
Go
197 lines
4.1 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// Deduplicator deduplicates samples per each time series.
|
|
type Deduplicator struct {
|
|
da *dedupAggr
|
|
lc promutils.LabelsCompressor
|
|
|
|
wg sync.WaitGroup
|
|
stopCh chan struct{}
|
|
|
|
ms *metrics.Set
|
|
}
|
|
|
|
// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.
|
|
//
|
|
// The de-duplicated samples are passed to pushFunc once per dedupInterval.
|
|
//
|
|
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
|
|
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration) *Deduplicator {
|
|
d := &Deduplicator{
|
|
da: newDedupAggr(),
|
|
stopCh: make(chan struct{}),
|
|
ms: metrics.NewSet(),
|
|
}
|
|
|
|
ms := d.ms
|
|
_ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 {
|
|
return float64(d.da.sizeBytes())
|
|
})
|
|
_ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 {
|
|
return float64(d.da.itemsCount())
|
|
})
|
|
|
|
_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
|
|
return float64(d.lc.SizeBytes())
|
|
})
|
|
_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
|
|
return float64(d.lc.ItemsCount())
|
|
})
|
|
metrics.RegisterSet(ms)
|
|
|
|
d.wg.Add(1)
|
|
go func() {
|
|
defer d.wg.Done()
|
|
d.runFlusher(pushFunc, dedupInterval)
|
|
}()
|
|
|
|
return d
|
|
}
|
|
|
|
// MustStop stops d.
|
|
func (d *Deduplicator) MustStop() {
|
|
metrics.UnregisterSet(d.ms)
|
|
d.ms = nil
|
|
|
|
close(d.stopCh)
|
|
d.wg.Wait()
|
|
}
|
|
|
|
// Push pushes tss to d.
|
|
func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
|
|
ctx := getDeduplicatorPushCtx()
|
|
pss := ctx.pss
|
|
buf := ctx.buf
|
|
|
|
for _, ts := range tss {
|
|
buf = d.lc.Compress(buf[:0], ts.Labels)
|
|
key := bytesutil.InternBytes(buf)
|
|
for _, s := range ts.Samples {
|
|
pss = append(pss, pushSample{
|
|
key: key,
|
|
value: s.Value,
|
|
})
|
|
}
|
|
}
|
|
|
|
d.da.pushSamples(pss)
|
|
|
|
ctx.pss = pss
|
|
ctx.buf = buf
|
|
putDeduplicatorPushCtx(ctx)
|
|
}
|
|
|
|
func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration) {
|
|
t := time.NewTicker(dedupInterval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-d.stopCh:
|
|
return
|
|
case <-t.C:
|
|
d.flush(pushFunc)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Deduplicator) flush(pushFunc PushFunc) {
|
|
timestamp := time.Now().UnixMilli()
|
|
d.da.flush(func(pss []pushSample) {
|
|
ctx := getDeduplicatorFlushCtx()
|
|
|
|
tss := ctx.tss
|
|
labels := ctx.labels
|
|
samples := ctx.samples
|
|
for _, ps := range pss {
|
|
labelsLen := len(labels)
|
|
labels = decompressLabels(labels, &d.lc, ps.key)
|
|
|
|
samplesLen := len(samples)
|
|
samples = append(samples, prompbmarshal.Sample{
|
|
Value: ps.value,
|
|
Timestamp: timestamp,
|
|
})
|
|
|
|
tss = append(tss, prompbmarshal.TimeSeries{
|
|
Labels: labels[labelsLen:],
|
|
Samples: samples[samplesLen:],
|
|
})
|
|
}
|
|
pushFunc(tss)
|
|
|
|
ctx.tss = tss
|
|
ctx.labels = labels
|
|
ctx.samples = samples
|
|
putDeduplicatorFlushCtx(ctx)
|
|
}, true)
|
|
}
|
|
|
|
type deduplicatorPushCtx struct {
|
|
pss []pushSample
|
|
buf []byte
|
|
}
|
|
|
|
func (ctx *deduplicatorPushCtx) reset() {
|
|
clear(ctx.pss)
|
|
ctx.pss = ctx.pss[:0]
|
|
|
|
ctx.buf = ctx.buf[:0]
|
|
}
|
|
|
|
func getDeduplicatorPushCtx() *deduplicatorPushCtx {
|
|
v := deduplicatorPushCtxPool.Get()
|
|
if v == nil {
|
|
return &deduplicatorPushCtx{}
|
|
}
|
|
return v.(*deduplicatorPushCtx)
|
|
}
|
|
|
|
func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) {
|
|
ctx.reset()
|
|
deduplicatorPushCtxPool.Put(ctx)
|
|
}
|
|
|
|
// deduplicatorPushCtxPool recycles deduplicatorPushCtx values between Push calls.
// It has no New func, so getDeduplicatorPushCtx handles the empty-pool case.
var deduplicatorPushCtxPool = sync.Pool{}
|
|
|
|
type deduplicatorFlushCtx struct {
|
|
tss []prompbmarshal.TimeSeries
|
|
labels []prompbmarshal.Label
|
|
samples []prompbmarshal.Sample
|
|
}
|
|
|
|
func (ctx *deduplicatorFlushCtx) reset() {
|
|
clear(ctx.tss)
|
|
ctx.tss = ctx.tss[:0]
|
|
|
|
clear(ctx.labels)
|
|
ctx.labels = ctx.labels[:0]
|
|
|
|
clear(ctx.samples)
|
|
ctx.samples = ctx.samples[:0]
|
|
}
|
|
|
|
func getDeduplicatorFlushCtx() *deduplicatorFlushCtx {
|
|
v := deduplicatorFlushCtxPool.Get()
|
|
if v == nil {
|
|
return &deduplicatorFlushCtx{}
|
|
}
|
|
return v.(*deduplicatorFlushCtx)
|
|
}
|
|
|
|
func putDeduplicatorFlushCtx(ctx *deduplicatorFlushCtx) {
|
|
ctx.reset()
|
|
deduplicatorFlushCtxPool.Put(ctx)
|
|
}
|
|
|
|
// deduplicatorFlushCtxPool recycles deduplicatorFlushCtx values between flushes.
// It has no New func, so getDeduplicatorFlushCtx handles the empty-pool case.
var deduplicatorFlushCtxPool = sync.Pool{}
|