mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 14:22:15 +01:00
bc1f92d7f5
- Drop samples and return true from remotewrite.TryPush() on the fast path when all the remote storage systems are configured with the disabled on-disk queue, every in-memory queue is full and -remoteWrite.dropSamplesOnOverload is set to true. This case is quite common, so it should be optimized. Previously, additional CPU time was spent on per-remoteWriteCtx relabeling and other processing in this case. - Properly count the number of dropped samples inside remoteWriteCtx.pushInternalTrackDropped(). Previously, dropped samples were counted only if the -remoteWrite.dropSamplesOnOverload flag was set. In reality, the samples are dropped when they couldn't be sent to the queue because the in-memory queue is full and the on-disk queue is disabled. The remoteWriteCtx.pushInternalTrackDropped() function is called by streaming aggregation for pushing the aggregated data to the remote storage. Streaming aggregation cannot wait until the remote storage processes pending data, so it drops aggregated samples in this case. - Clarify the description for the -remoteWrite.disableOnDiskQueue command-line flag in the -help output, so it is clear that this flag can be set individually for each -remoteWrite.url. - Make the -remoteWrite.dropSamplesOnOverload flag global. If some of the remote storage systems are configured with the disabled on-disk queue, then there is no sense in keeping samples on some of these systems while dropping samples on the remaining systems, since this will result in a global stall on the remote storage system with the disabled on-disk queue and with the -remoteWrite.dropSamplesOnOverload=false flag. vmagent will always return false from remotewrite.TryPush() in this case. This will result in an unbounded number of duplicate samples written to the remaining remote storage systems. That's why the -remoteWrite.dropSamplesOnOverload flag is forcibly set to true if more than one -remoteWrite.disableOnDiskQueue flag is set. 
This allows proceeding with newly scraped/pushed samples by sending them to the remaining remote storage systems, while dropping them on overloaded systems with the -remoteWrite.disableOnDiskQueue flag set. - Verify that remoteWriteCtx.TryPush() returns true in the TestRemoteWriteContext_TryPush_ImmutableTimeseries test. - Mention in the vmagent docs that the -remoteWrite.disableOnDiskQueue command-line flag can be set individually for each -remoteWrite.url. See https://docs.victoriametrics.com/vmagent/#disabling-on-disk-persistence Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6248 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065
168 lines
4.1 KiB
Go
168 lines
4.1 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
func TestGetLabelsHash_Distribution(t *testing.T) {
|
|
f := func(bucketsCount int) {
|
|
t.Helper()
|
|
|
|
// Distribute itemsCount hashes returned by getLabelsHash() across bucketsCount buckets.
|
|
itemsCount := 1_000 * bucketsCount
|
|
m := make([]int, bucketsCount)
|
|
var labels []prompbmarshal.Label
|
|
for i := 0; i < itemsCount; i++ {
|
|
labels = append(labels[:0], prompbmarshal.Label{
|
|
Name: "__name__",
|
|
Value: fmt.Sprintf("some_name_%d", i),
|
|
})
|
|
for j := 0; j < 10; j++ {
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: fmt.Sprintf("label_%d", j),
|
|
Value: fmt.Sprintf("value_%d_%d", i, j),
|
|
})
|
|
}
|
|
h := getLabelsHash(labels)
|
|
m[h%uint64(bucketsCount)]++
|
|
}
|
|
|
|
// Verify that the distribution is even
|
|
expectedItemsPerBucket := itemsCount / bucketsCount
|
|
for _, n := range m {
|
|
if math.Abs(1-float64(n)/float64(expectedItemsPerBucket)) > 0.04 {
|
|
t.Fatalf("unexpected items in the bucket for %d buckets; got %d; want around %d", bucketsCount, n, expectedItemsPerBucket)
|
|
}
|
|
}
|
|
}
|
|
|
|
f(2)
|
|
f(3)
|
|
f(4)
|
|
f(5)
|
|
f(10)
|
|
}
|
|
|
|
func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
|
|
f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) {
|
|
t.Helper()
|
|
perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig))
|
|
if err != nil {
|
|
t.Fatalf("cannot load relabel configs: %s", err)
|
|
}
|
|
rcs := &relabelConfigs{
|
|
perURL: []*promrelabel.ParsedConfigs{
|
|
perURLRelabel,
|
|
},
|
|
}
|
|
allRelabelConfigs.Store(rcs)
|
|
|
|
pss := make([]*pendingSeries, 1)
|
|
pss[0] = newPendingSeries(nil, true, 0, 100)
|
|
rwctx := &remoteWriteCtx{
|
|
idx: 0,
|
|
streamAggrKeepInput: keepInput,
|
|
streamAggrDropInput: dropInput,
|
|
pss: pss,
|
|
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(`foo`),
|
|
rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`),
|
|
}
|
|
if dedupInterval > 0 {
|
|
rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "global")
|
|
}
|
|
|
|
if len(streamAggrConfig) > 0 {
|
|
sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), nil, streamaggr.Options{})
|
|
if err != nil {
|
|
t.Fatalf("cannot load streamaggr configs: %s", err)
|
|
}
|
|
rwctx.sas.Store(sas)
|
|
}
|
|
|
|
offsetMsecs := time.Now().UnixMilli()
|
|
inputTss := prompbmarshal.MustParsePromMetrics(input, offsetMsecs)
|
|
expectedTss := make([]prompbmarshal.TimeSeries, len(inputTss))
|
|
|
|
// copy inputTss to make sure it is not mutated during TryPush call
|
|
copy(expectedTss, inputTss)
|
|
if !rwctx.TryPush(inputTss, false) {
|
|
t.Fatalf("cannot push samples to rwctx")
|
|
}
|
|
|
|
if !reflect.DeepEqual(expectedTss, inputTss) {
|
|
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", inputTss, expectedTss)
|
|
}
|
|
}
|
|
|
|
f(`
|
|
- interval: 1m
|
|
outputs: [sum_samples]
|
|
- interval: 2m
|
|
outputs: [count_series]
|
|
`, `
|
|
- action: keep
|
|
source_labels: [env]
|
|
regex: "dev"
|
|
`, 0, false, false, `
|
|
metric{env="dev"} 10
|
|
metric{env="bar"} 20
|
|
metric{env="dev"} 15
|
|
metric{env="bar"} 25
|
|
`)
|
|
f(``, ``, time.Hour, false, false, `
|
|
metric{env="dev"} 10
|
|
metric{env="foo"} 20
|
|
metric{env="dev"} 15
|
|
metric{env="foo"} 25
|
|
`)
|
|
f(``, `
|
|
- action: keep
|
|
source_labels: [env]
|
|
regex: "dev"
|
|
`, time.Hour, false, false, `
|
|
metric{env="dev"} 10
|
|
metric{env="bar"} 20
|
|
metric{env="dev"} 15
|
|
metric{env="bar"} 25
|
|
`)
|
|
f(``, `
|
|
- action: keep
|
|
source_labels: [env]
|
|
regex: "dev"
|
|
`, time.Hour, true, false, `
|
|
metric{env="test"} 10
|
|
metric{env="dev"} 20
|
|
metric{env="foo"} 15
|
|
metric{env="dev"} 25
|
|
`)
|
|
f(``, `
|
|
- action: keep
|
|
source_labels: [env]
|
|
regex: "dev"
|
|
`, time.Hour, false, true, `
|
|
metric{env="foo"} 10
|
|
metric{env="dev"} 20
|
|
metric{env="foo"} 15
|
|
metric{env="dev"} 25
|
|
`)
|
|
f(``, `
|
|
- action: keep
|
|
source_labels: [env]
|
|
regex: "dev"
|
|
`, time.Hour, true, true, `
|
|
metric{env="dev"} 10
|
|
metric{env="test"} 20
|
|
metric{env="dev"} 15
|
|
metric{env="bar"} 25
|
|
`)
|
|
}
|