app/vmagent/remotewrite: sort labels before sending the series to per-remoteWrite.url queues

This commit is contained in:
Aliaksandr Valialkin 2021-05-20 02:12:36 +03:00
parent eb8093ca6b
commit f7e73fbe8b
3 changed files with 16 additions and 52 deletions

View File

@ -128,7 +128,6 @@ func (wr *writeRequest) reset() {
}
func (wr *writeRequest) flush() {
sortLabelsIfNeeded(wr.tss)
wr.wr.Timeseries = wr.tss
wr.adjustSampleValues()
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())

View File

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/metrics"
xxhash "github.com/cespare/xxhash/v2"
)
@ -38,6 +39,10 @@ var (
"Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. "+
"By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. "+
"This option may be used for improving data compression for the stored metrics")
sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+
`This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+
`Enabled sorting for labels can slow down ingestion performance a bit`)
)
var rwctxs []*remoteWriteCtx
@ -173,6 +178,7 @@ func Push(wr *prompbmarshal.WriteRequest) {
tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, pcsGlobal)
globalRelabelMetricsDropped.Add(tssBlockLen - len(tssBlock))
}
sortLabelsIfNeeded(tssBlock)
for _, rwctx := range rwctxs {
rwctx.Push(tssBlock)
}
@ -185,6 +191,16 @@ func Push(wr *prompbmarshal.WriteRequest) {
}
}
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
if !*sortLabels {
return
}
for i := range tss {
promrelabel.SortLabels(tss[i].Labels)
}
}
var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total")
type remoteWriteCtx struct {

View File

@ -1,51 +0,0 @@
package remotewrite
import (
"flag"
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
var sortLabels = flag.Bool("sortLabels", false, `Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. `+
`This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+
`For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+
`Enabled sorting for labels can slow down ingestion performance a bit`)
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) {
if !*sortLabels {
return
}
// The slc is used for avoiding memory allocation when passing labels to sort.Sort.
slc := sortLabelsCtxPool.Get().(*sortLabelsCtx)
for i := range tss {
slc.labels = tss[i].Labels
sort.Sort(&slc.labels)
}
slc.labels = nil
sortLabelsCtxPool.Put(slc)
}
type sortLabelsCtx struct {
labels sortedLabels
}
var sortLabelsCtxPool = &sync.Pool{
New: func() interface{} {
return &sortLabelsCtx{}
},
}
type sortedLabels []prompbmarshal.Label
func (sl *sortedLabels) Len() int { return len(*sl) }
func (sl *sortedLabels) Less(i, j int) bool {
a := *sl
return a[i].Name < a[j].Name
}
func (sl *sortedLabels) Swap(i, j int) {
a := *sl
a[i], a[j] = a[j], a[i]
}