mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-14 16:12:15 +01:00
lib/promscrape: shard targets among cluster nodes after relabeling is applied
This guarantees that targets with the same set of labels go to the same vmagent node. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
This commit is contained in:
parent
aeedfe2fe2
commit
c3a729d458
@ -7,6 +7,7 @@ sort: 15
|
||||
## tip
|
||||
|
||||
* FEATURE: vmagent: expose `-promscrape.config` contents at `/config` page as Prometheus does. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1695).
|
||||
* FEATURE: vmagent: shard targets among cluster nodes after the relabeling is applied. This should guarantee that targets with the same set of labels go to the same `vmagent` node in the cluster. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687).
|
||||
* FEATURE: add trigonometric functions, which are going to be added in [Prometheus 2.31](https://github.com/prometheus/prometheus/pull/9239): [acosh](https://docs.victoriametrics.com/MetricsQL.html#acosh), [asinh](https://docs.victoriametrics.com/MetricsQL.html#asinh), [atan](https://docs.victoriametrics.com/MetricsQL.html#atan), [atanh](https://docs.victoriametrics.com/MetricsQL.html#atanh), [cosh](https://docs.victoriametrics.com/MetricsQL.html#cosh), [deg](https://docs.victoriametrics.com/MetricsQL.html#deg), [rad](https://docs.victoriametrics.com/MetricsQL.html#rad), [sinh](https://docs.victoriametrics.com/MetricsQL.html#sinh), [tan](https://docs.victoriametrics.com/MetricsQL.html#tan), [tanh](https://docs.victoriametrics.com/MetricsQL.html#tanh). Also add `atan2` binary operator. See [this pull request](https://github.com/prometheus/prometheus/pull/9248).
|
||||
* FEATURE: consistently return the same set of time series from [limitk](https://docs.victoriametrics.com/MetricsQL.html#limitk) function. This improves the usability of periodically refreshed graphs.
|
||||
|
||||
|
@ -926,11 +926,14 @@ func (stc *StaticConfig) appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConf
|
||||
return dst
|
||||
}
|
||||
|
||||
func appendScrapeWorkKey(dst []byte, target string, extraLabels, metaLabels map[string]string) []byte {
|
||||
dst = append(dst, target...)
|
||||
dst = append(dst, ',')
|
||||
dst = appendSortedKeyValuePairs(dst, extraLabels)
|
||||
dst = appendSortedKeyValuePairs(dst, metaLabels)
|
||||
func appendScrapeWorkKey(dst []byte, labels []prompbmarshal.Label) []byte {
|
||||
for _, label := range labels {
|
||||
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
|
||||
dst = append(dst, label.Name...)
|
||||
dst = append(dst, '=')
|
||||
dst = append(dst, label.Value...)
|
||||
dst = append(dst, ',')
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
@ -955,37 +958,9 @@ func needSkipScrapeWork(key string, membersCount, replicasCount, memberNum int)
|
||||
return true
|
||||
}
|
||||
|
||||
func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte {
|
||||
keys := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, k := range keys {
|
||||
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
|
||||
dst = append(dst, k...)
|
||||
dst = append(dst, '=')
|
||||
dst = append(dst, m[k]...)
|
||||
dst = append(dst, ',')
|
||||
}
|
||||
dst = append(dst, '\n')
|
||||
return dst
|
||||
}
|
||||
|
||||
var scrapeWorkKeyBufPool bytesutil.ByteBufferPool
|
||||
|
||||
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
|
||||
// Verify whether the scrape work must be skipped because of `-promscrape.cluster.*` configs.
|
||||
if *clusterMembersCount > 1 {
|
||||
bb := scrapeWorkKeyBufPool.Get()
|
||||
bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels)
|
||||
needSkip := needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor, *clusterMemberNum)
|
||||
scrapeWorkKeyBufPool.Put(bb)
|
||||
if needSkip {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
labels := mergeLabels(swc, target, extraLabels, metaLabels)
|
||||
var originalLabels []prompbmarshal.Label
|
||||
if !*dropOriginalLabels {
|
||||
@ -1001,6 +976,19 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825 for details.
|
||||
labels = append([]prompbmarshal.Label{}, labels...)
|
||||
|
||||
// Verify whether the scrape work must be skipped because of `-promscrape.cluster.*` configs.
|
||||
// Perform the verification on labels after the relabeling in order to guarantee that targets with the same set of labels
|
||||
// go to the same vmagent shard.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
|
||||
if *clusterMembersCount > 1 {
|
||||
bb := scrapeWorkKeyBufPool.Get()
|
||||
bb.B = appendScrapeWorkKey(bb.B[:0], labels)
|
||||
needSkip := needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor, *clusterMemberNum)
|
||||
scrapeWorkKeyBufPool.Put(bb)
|
||||
if needSkip {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
if len(labels) == 0 {
|
||||
// Drop target without labels.
|
||||
droppedTargetsMap.Register(originalLabels)
|
||||
|
Loading…
Reference in New Issue
Block a user