mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-14 16:12:15 +01:00
{lib/streamaggr,vmagent/remotewrite}: breaking change for keepInput flag (#4575)
* {lib/streamaggr,vmagent/remotewrite}: breaking change for keepInput flag Changes default behaviour of keepInput flag to write series which did not match any aggregators to the remote write. See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243 Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * Update app/vmagent/remotewrite/remotewrite.go Co-authored-by: Roman Khavronenko <roman@victoriametrics.com> --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Roman Khavronenko <roman@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
9f440b1013
commit
470afac5ff
@ -620,7 +620,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
v = tssRelabelPool.Get().(*[]prompbmarshal.TimeSeries)
|
||||
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||
tss = rctx.applyRelabeling(tss, nil, pcs)
|
||||
@ -630,20 +630,41 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
|
||||
rowsCount := getRowsCount(tss)
|
||||
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
|
||||
|
||||
// Apply stream aggregation if any
|
||||
sas := rwctx.sas.Load()
|
||||
sas.Push(tss)
|
||||
if sas == nil || rwctx.streamAggrKeepInput {
|
||||
// Push samples to the remote storage
|
||||
rwctx.pushInternal(tss)
|
||||
defer func() {
|
||||
// Return back relabeling contexts to the pool
|
||||
if rctx != nil {
|
||||
*v = prompbmarshal.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
putRelabelCtx(rctx)
|
||||
}
|
||||
}
|
||||
|
||||
// Return back relabeling contexts to the pool
|
||||
if rctx != nil {
|
||||
*v = prompbmarshal.ResetTimeSeries(tss)
|
||||
tssRelabelPool.Put(v)
|
||||
putRelabelCtx(rctx)
|
||||
// Load stream aggregagation config
|
||||
sas := rwctx.sas.Load()
|
||||
|
||||
// Fast path, no need to track used series
|
||||
if sas == nil || rwctx.streamAggrKeepInput {
|
||||
// Apply stream aggregation to the input samples
|
||||
// it's safe to call sas.Push with sas == nil
|
||||
sas.Push(tss, nil)
|
||||
|
||||
// Push all samples to the remote storage
|
||||
rwctx.pushInternal(tss)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Track series which were used for stream aggregation.
|
||||
ut := streamaggr.NewTssUsageTracker(len(tss))
|
||||
sas.Push(tss, ut.Matched)
|
||||
|
||||
unmatchedSeries := tssPool.Get().(*[]prompbmarshal.TimeSeries)
|
||||
// Push only unmatched series to the remote storage
|
||||
*unmatchedSeries = ut.GetUnmatched(tss, *unmatchedSeries)
|
||||
rwctx.pushInternal(*unmatchedSeries)
|
||||
|
||||
*unmatchedSeries = prompbmarshal.ResetTimeSeries(*unmatchedSeries)
|
||||
tssPool.Put(unmatchedSeries)
|
||||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
|
||||
@ -682,7 +703,7 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
|
||||
}
|
||||
|
||||
var tssRelabelPool = &sync.Pool{
|
||||
var tssPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
a := []prompbmarshal.TimeSeries{}
|
||||
return &a
|
||||
|
@ -24,6 +24,10 @@ The following `tip` changes can be tested by building VictoriaMetrics components
|
||||
|
||||
## tip
|
||||
|
||||
**Update notes:** release contains breaking change to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) `-remoteWrite.streamAggr.keepInput` command-line flag.
|
||||
Default behaviour has changed to keep metrics which were not matched by any aggregation rule when `-remoteWrite.streamAggr.keepInput` is set to false (default value).
|
||||
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) for details.
|
||||
|
||||
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
|
||||
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
|
||||
|
||||
|
@ -10,6 +10,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
@ -18,7 +20,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
var supportedOutputs = []string{
|
||||
@ -194,12 +195,12 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
|
||||
}
|
||||
|
||||
// Push pushes tss to a.
|
||||
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
|
||||
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
for _, aggr := range a.as {
|
||||
aggr.Push(tss)
|
||||
aggr.Push(tss, matched)
|
||||
}
|
||||
}
|
||||
|
||||
@ -449,7 +450,7 @@ func (a *aggregator) dedupFlush() {
|
||||
skipAggrSuffix: true,
|
||||
}
|
||||
a.dedupAggr.appendSeriesForFlush(ctx)
|
||||
a.push(ctx.tss, false)
|
||||
a.push(ctx.tss, nil)
|
||||
}
|
||||
|
||||
func (a *aggregator) flush() {
|
||||
@ -498,9 +499,9 @@ func (a *aggregator) MustStop() {
|
||||
}
|
||||
|
||||
// Push pushes tss to a.
|
||||
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matched func(id int)) {
|
||||
if a.dedupAggr == nil {
|
||||
a.push(tss, true)
|
||||
a.push(tss, matched)
|
||||
return
|
||||
}
|
||||
|
||||
@ -509,20 +510,14 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
pushSample := a.dedupAggr.pushSample
|
||||
inputKey := ""
|
||||
bb := bbPool.Get()
|
||||
labels := promutils.GetLabels()
|
||||
for _, ts := range tss {
|
||||
for idx, ts := range tss {
|
||||
if !a.match.Match(ts.Labels) {
|
||||
continue
|
||||
}
|
||||
labels.Labels = append(labels.Labels[:0], ts.Labels...)
|
||||
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
|
||||
if len(labels.Labels) == 0 {
|
||||
// The metric has been deleted by the relabeling
|
||||
continue
|
||||
if matched != nil {
|
||||
matched(idx)
|
||||
}
|
||||
labels.Sort()
|
||||
|
||||
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
|
||||
bb.B = marshalLabelsFast(bb.B[:0], ts.Labels)
|
||||
outputKey := bytesutil.InternBytes(bb.B)
|
||||
for _, sample := range ts.Samples {
|
||||
pushSample(inputKey, outputKey, sample.Value)
|
||||
@ -532,14 +527,17 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
|
||||
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, applyFilters bool) {
|
||||
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, tracker func(id int)) {
|
||||
labels := promutils.GetLabels()
|
||||
tmpLabels := promutils.GetLabels()
|
||||
bb := bbPool.Get()
|
||||
for _, ts := range tss {
|
||||
if applyFilters && !a.match.Match(ts.Labels) {
|
||||
for idx, ts := range tss {
|
||||
if !a.match.Match(ts.Labels) {
|
||||
continue
|
||||
}
|
||||
if tracker != nil {
|
||||
tracker(idx)
|
||||
}
|
||||
labels.Labels = append(labels.Labels[:0], ts.Labels...)
|
||||
if applyFilters {
|
||||
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
|
||||
@ -804,3 +802,30 @@ func sortAndRemoveDuplicates(a []string) []string {
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// TssUsageTracker tracks used series for streaming aggregation.
|
||||
type TssUsageTracker struct {
|
||||
usedSeries map[int]struct{}
|
||||
}
|
||||
|
||||
// NewTssUsageTracker returns new TssUsageTracker.
|
||||
func NewTssUsageTracker(totalSeries int) *TssUsageTracker {
|
||||
return &TssUsageTracker{usedSeries: make(map[int]struct{}, totalSeries)}
|
||||
}
|
||||
|
||||
// Matched marks series with id as used.
|
||||
// Not safe for concurrent use. The caller must
|
||||
// ensure that there are no concurrent calls to Matched.
|
||||
func (tut *TssUsageTracker) Matched(id int) {
|
||||
tut.usedSeries[id] = struct{}{}
|
||||
}
|
||||
|
||||
// GetUnmatched returns unused series from tss.
|
||||
func (tut *TssUsageTracker) GetUnmatched(tss, dst []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
|
||||
for k := range tss {
|
||||
if _, ok := tut.usedSeries[k]; !ok {
|
||||
dst = append(dst, tss[k])
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
@ -183,7 +183,7 @@ func TestAggregatorsSuccess(t *testing.T) {
|
||||
|
||||
// Push the inputMetrics to Aggregators
|
||||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
a.Push(tssInput, nil)
|
||||
a.MustStop()
|
||||
|
||||
// Verify the tssOutput contains the expected metrics
|
||||
@ -703,7 +703,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
|
||||
|
||||
// Push the inputMetrics to Aggregators
|
||||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
a.Push(tssInput, nil)
|
||||
if a != nil {
|
||||
for _, aggr := range a.as {
|
||||
aggr.dedupFlush()
|
||||
|
@ -37,8 +37,13 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
|
||||
without: [job]
|
||||
outputs: [%q]
|
||||
`, output)
|
||||
i := 0
|
||||
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
||||
panic(fmt.Errorf("unexpected pushFunc call"))
|
||||
i++
|
||||
if i > 1 {
|
||||
// pushFunc is expected to be called exactly once at MustStop
|
||||
panic(fmt.Errorf("unexpected pushFunc call"))
|
||||
}
|
||||
}
|
||||
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
||||
if err != nil {
|
||||
@ -50,16 +55,78 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
|
||||
b.SetBytes(int64(len(benchSeries)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
a.Push(benchSeries)
|
||||
a.Push(benchSeries, nil)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSeries {
|
||||
func BenchmarkAggregatorsPushWithSeriesTracker(b *testing.B) {
|
||||
config := fmt.Sprintf(`
|
||||
- match: http_requests_total
|
||||
interval: 24h
|
||||
without: [job]
|
||||
outputs: [%q]
|
||||
`, "total")
|
||||
i := 0
|
||||
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
||||
i++
|
||||
if i > 1 {
|
||||
// pushFunc is expected to be called exactly once at MustStop
|
||||
panic(fmt.Errorf("unexpected pushFunc call"))
|
||||
}
|
||||
}
|
||||
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error when initializing aggregators: %s", err)
|
||||
}
|
||||
defer a.MustStop()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
series []prompbmarshal.TimeSeries
|
||||
}{
|
||||
{
|
||||
name: "all matches",
|
||||
series: benchSeries,
|
||||
},
|
||||
{
|
||||
name: "no matches",
|
||||
series: benchSeriesWithRandomNames100,
|
||||
},
|
||||
{
|
||||
name: "50% matches",
|
||||
series: benchSeriesWithRandomNames50,
|
||||
},
|
||||
{
|
||||
name: "10% matches",
|
||||
series: benchSeriesWithRandomNames10,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
b.Run(tt.name, func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(tt.series)))
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
ut := NewTssUsageTracker(len(tt.series))
|
||||
a.Push(tt.series, ut.Matched)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newBenchSeries(seriesCount, samplesPerSeries int, randomNameFraction float64) []prompbmarshal.TimeSeries {
|
||||
a := make([]string, seriesCount*samplesPerSeries)
|
||||
for i := 0; i < samplesPerSeries; i++ {
|
||||
for j := 0; j < seriesCount; j++ {
|
||||
s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo",instance="bar"} %d`, j, i*10)
|
||||
metricName := "http_requests_total"
|
||||
if randomNameFraction > 0 && j%int(1/randomNameFraction) == 0 {
|
||||
metricName = fmt.Sprintf("random_other_name_%d", j)
|
||||
}
|
||||
|
||||
s := fmt.Sprintf(`%s{path="/foo/%d",job="foo",instance="bar"} %d`, metricName, j, i*10)
|
||||
a = append(a, s)
|
||||
}
|
||||
}
|
||||
@ -70,4 +137,7 @@ func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSerie
|
||||
const seriesCount = 10000
|
||||
const samplesPerSeries = 10
|
||||
|
||||
var benchSeries = newBenchSeries(seriesCount, samplesPerSeries)
|
||||
var benchSeries = newBenchSeries(seriesCount, samplesPerSeries, 0)
|
||||
var benchSeriesWithRandomNames10 = newBenchSeries(seriesCount, samplesPerSeries, 0.1)
|
||||
var benchSeriesWithRandomNames50 = newBenchSeries(seriesCount, samplesPerSeries, 0.5)
|
||||
var benchSeriesWithRandomNames100 = newBenchSeries(seriesCount, samplesPerSeries, 1)
|
||||
|
Loading…
Reference in New Issue
Block a user