mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
opentelemetry: added cmd flag to sanitize metric names (#6035)
This commit is contained in:
parent
166b97b8d0
commit
47892b4a4c
@ -59,6 +59,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
|
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
|
||||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): split [explore phase](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics) in `vm-native` mode by time intervals when [--vm-native-step-interval](https://docs.victoriametrics.com/vmctl/#using-time-based-chunking-of-migration) is specified. This should reduce probability of exceeding complexity limits for number of selected series during explore phase. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369).
|
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): split [explore phase](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics) in `vm-native` mode by time intervals when [--vm-native-step-interval](https://docs.victoriametrics.com/vmctl/#using-time-based-chunking-of-migration) is specified. This should reduce probability of exceeding complexity limits for number of selected series during explore phase. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369).
|
||||||
* FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for `aggregateSeriesLists`, `diffSeriesLists`, `multiplySeriesLists` and `sumSeriesLists` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5809).
|
* FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for `aggregateSeriesLists`, `diffSeriesLists`, `multiplySeriesLists` and `sumSeriesLists` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5809).
|
||||||
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): added command line argument that enables OpenTelementry metric names and labels sanitization.
|
||||||
|
|
||||||
|
|
||||||
* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
|
* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
|
||||||
|
@ -1557,6 +1557,9 @@ VictoriaMetrics supports data ingestion via [OpenTelemetry protocol for metrics]
|
|||||||
VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/v1/metrics`.
|
VictoriaMetrics expects `protobuf`-encoded requests at `/opentelemetry/v1/metrics`.
|
||||||
Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/v1/metrics`.
|
Set HTTP request header `Content-Encoding: gzip` when sending gzip-compressed data to `/opentelemetry/v1/metrics`.
|
||||||
|
|
||||||
|
VictoriaMetrics automatically does not sanitize metric names for the data ingested via OpenTelemetry protocol
|
||||||
|
If you need accepting metric and label names as is with sanitizing, then pass `-opentelemetry.sanitizeMetrics=true` command-line flag to VictoriaMetrics.
|
||||||
|
|
||||||
See [How to use OpenTelemetry metrics with VictoriaMetrics](https://docs.victoriametrics.com/guides/getting-started-with-opentelemetry/).
|
See [How to use OpenTelemetry metrics with VictoriaMetrics](https://docs.victoriametrics.com/guides/getting-started-with-opentelemetry/).
|
||||||
|
|
||||||
## JSON line format
|
## JSON line format
|
||||||
|
@ -663,6 +663,11 @@ func SanitizeLabelName(name string) string {
|
|||||||
return labelNameSanitizer.Transform(name)
|
return labelNameSanitizer.Transform(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SanitizeLabelNameParts returns label name slice generated from metric name divided by unsupported characters
|
||||||
|
func SanitizeLabelNameParts(name string) []string {
|
||||||
|
return unsupportedLabelNameChars.Split(name, -1)
|
||||||
|
}
|
||||||
|
|
||||||
var labelNameSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
|
var labelNameSanitizer = bytesutil.NewFastStringTransformer(func(s string) string {
|
||||||
return unsupportedLabelNameChars.ReplaceAllString(s, "_")
|
return unsupportedLabelNameChars.ReplaceAllString(s, "_")
|
||||||
})
|
})
|
||||||
|
@ -190,6 +190,7 @@ func (sm *ScopeMetrics) unmarshalProtobuf(src []byte) (err error) {
|
|||||||
// Metric represents the corresponding OTEL protobuf message
|
// Metric represents the corresponding OTEL protobuf message
|
||||||
type Metric struct {
|
type Metric struct {
|
||||||
Name string
|
Name string
|
||||||
|
Unit string
|
||||||
Gauge *Gauge
|
Gauge *Gauge
|
||||||
Sum *Sum
|
Sum *Sum
|
||||||
Histogram *Histogram
|
Histogram *Histogram
|
||||||
@ -198,6 +199,7 @@ type Metric struct {
|
|||||||
|
|
||||||
func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
mm.AppendString(1, m.Name)
|
mm.AppendString(1, m.Name)
|
||||||
|
mm.AppendString(3, m.Unit)
|
||||||
switch {
|
switch {
|
||||||
case m.Gauge != nil:
|
case m.Gauge != nil:
|
||||||
m.Gauge.marshalProtobuf(mm.AppendMessage(5))
|
m.Gauge.marshalProtobuf(mm.AppendMessage(5))
|
||||||
@ -213,6 +215,7 @@ func (m *Metric) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
|||||||
func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
|
func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
|
||||||
// message Metric {
|
// message Metric {
|
||||||
// string name = 1;
|
// string name = 1;
|
||||||
|
// string unit = 3;
|
||||||
// oneof data {
|
// oneof data {
|
||||||
// Gauge gauge = 5;
|
// Gauge gauge = 5;
|
||||||
// Sum sum = 7;
|
// Sum sum = 7;
|
||||||
@ -233,6 +236,12 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
|
|||||||
return fmt.Errorf("cannot read metric name")
|
return fmt.Errorf("cannot read metric name")
|
||||||
}
|
}
|
||||||
m.Name = strings.Clone(name)
|
m.Name = strings.Clone(name)
|
||||||
|
case 3:
|
||||||
|
unit, ok := fc.String()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read metric unit")
|
||||||
|
}
|
||||||
|
m.Unit = strings.Clone(unit)
|
||||||
case 5:
|
case 5:
|
||||||
data, ok := fc.MessageData()
|
data, ok := fc.MessageData()
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -617,6 +626,7 @@ func (ndp *NumberDataPoint) unmarshalProtobuf(src []byte) (err error) {
|
|||||||
type Sum struct {
|
type Sum struct {
|
||||||
DataPoints []*NumberDataPoint
|
DataPoints []*NumberDataPoint
|
||||||
AggregationTemporality AggregationTemporality
|
AggregationTemporality AggregationTemporality
|
||||||
|
IsMonotonic bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// AggregationTemporality represents the corresponding OTEL protobuf enum
|
// AggregationTemporality represents the corresponding OTEL protobuf enum
|
||||||
@ -636,6 +646,7 @@ func (s *Sum) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
|||||||
dp.marshalProtobuf(mm.AppendMessage(1))
|
dp.marshalProtobuf(mm.AppendMessage(1))
|
||||||
}
|
}
|
||||||
mm.AppendInt64(2, int64(s.AggregationTemporality))
|
mm.AppendInt64(2, int64(s.AggregationTemporality))
|
||||||
|
mm.AppendBool(3, s.IsMonotonic)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sum) unmarshalProtobuf(src []byte) (err error) {
|
func (s *Sum) unmarshalProtobuf(src []byte) (err error) {
|
||||||
@ -666,6 +677,12 @@ func (s *Sum) unmarshalProtobuf(src []byte) (err error) {
|
|||||||
return fmt.Errorf("cannot read AggregationTemporality")
|
return fmt.Errorf("cannot read AggregationTemporality")
|
||||||
}
|
}
|
||||||
s.AggregationTemporality = AggregationTemporality(at)
|
s.AggregationTemporality = AggregationTemporality(at)
|
||||||
|
case 3:
|
||||||
|
im, ok := fc.Bool()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read IsMonotonic")
|
||||||
|
}
|
||||||
|
s.IsMonotonic = im
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"unicode"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
|
||||||
@ -13,11 +16,72 @@ import (
|
|||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// sanitizeMetrics controls sanitizing metric and label names ingested via OpenTelemetry protocol.
|
||||||
|
sanitizeMetrics = flag.Bool("opentelemetry.sanitizeMetrics", false, "Sanitize metric and label names for the ingested OpenTelemetry data")
|
||||||
|
)
|
||||||
|
|
||||||
|
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L19
|
||||||
|
var unitMap = []struct {
|
||||||
|
prefix string
|
||||||
|
units map[string]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
units: map[string]string{
|
||||||
|
// Time
|
||||||
|
"d": "days",
|
||||||
|
"h": "hours",
|
||||||
|
"min": "minutes",
|
||||||
|
"s": "seconds",
|
||||||
|
"ms": "milliseconds",
|
||||||
|
"us": "microseconds",
|
||||||
|
"ns": "nanoseconds",
|
||||||
|
|
||||||
|
// Bytes
|
||||||
|
"By": "bytes",
|
||||||
|
"KiBy": "kibibytes",
|
||||||
|
"MiBy": "mebibytes",
|
||||||
|
"GiBy": "gibibytes",
|
||||||
|
"TiBy": "tibibytes",
|
||||||
|
"KBy": "kilobytes",
|
||||||
|
"MBy": "megabytes",
|
||||||
|
"GBy": "gigabytes",
|
||||||
|
"TBy": "terabytes",
|
||||||
|
|
||||||
|
// SI
|
||||||
|
"m": "meters",
|
||||||
|
"V": "volts",
|
||||||
|
"A": "amperes",
|
||||||
|
"J": "joules",
|
||||||
|
"W": "watts",
|
||||||
|
"g": "grams",
|
||||||
|
|
||||||
|
// Misc
|
||||||
|
"Cel": "celsius",
|
||||||
|
"Hz": "hertz",
|
||||||
|
"1": "",
|
||||||
|
"%": "percent",
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
prefix: "per",
|
||||||
|
units: map[string]string{
|
||||||
|
"s": "second",
|
||||||
|
"m": "minute",
|
||||||
|
"h": "hour",
|
||||||
|
"d": "day",
|
||||||
|
"w": "week",
|
||||||
|
"mo": "month",
|
||||||
|
"y": "year",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
|
// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
|
||||||
//
|
//
|
||||||
// callback shouldn't hold tss items after returning.
|
// callback shouldn't hold tss items after returning.
|
||||||
@ -58,10 +122,11 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
|||||||
// skip metrics without names
|
// skip metrics without names
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
metricName := sanitizeMetricName(m)
|
||||||
switch {
|
switch {
|
||||||
case m.Gauge != nil:
|
case m.Gauge != nil:
|
||||||
for _, p := range m.Gauge.DataPoints {
|
for _, p := range m.Gauge.DataPoints {
|
||||||
wr.appendSampleFromNumericPoint(m.Name, p)
|
wr.appendSampleFromNumericPoint(metricName, p)
|
||||||
}
|
}
|
||||||
case m.Sum != nil:
|
case m.Sum != nil:
|
||||||
if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
||||||
@ -69,11 +134,11 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, p := range m.Sum.DataPoints {
|
for _, p := range m.Sum.DataPoints {
|
||||||
wr.appendSampleFromNumericPoint(m.Name, p)
|
wr.appendSampleFromNumericPoint(metricName, p)
|
||||||
}
|
}
|
||||||
case m.Summary != nil:
|
case m.Summary != nil:
|
||||||
for _, p := range m.Summary.DataPoints {
|
for _, p := range m.Summary.DataPoints {
|
||||||
wr.appendSamplesFromSummary(m.Name, p)
|
wr.appendSamplesFromSummary(metricName, p)
|
||||||
}
|
}
|
||||||
case m.Histogram != nil:
|
case m.Histogram != nil:
|
||||||
if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
||||||
@ -81,11 +146,11 @@ func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, p := range m.Histogram.DataPoints {
|
for _, p := range m.Histogram.DataPoints {
|
||||||
wr.appendSamplesFromHistogram(m.Name, p)
|
wr.appendSamplesFromHistogram(metricName, p)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
rowsDroppedUnsupportedMetricType.Inc()
|
rowsDroppedUnsupportedMetricType.Inc()
|
||||||
logger.Warnf("unsupported type for metric %q", m.Name)
|
logger.Warnf("unsupported type for metric %q", metricName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -209,7 +274,7 @@ func (wr *writeContext) appendSampleWithExtraLabel(metricName, labelName, labelV
|
|||||||
func appendAttributesToPromLabels(dst []prompbmarshal.Label, attributes []*pb.KeyValue) []prompbmarshal.Label {
|
func appendAttributesToPromLabels(dst []prompbmarshal.Label, attributes []*pb.KeyValue) []prompbmarshal.Label {
|
||||||
for _, at := range attributes {
|
for _, at := range attributes {
|
||||||
dst = append(dst, prompbmarshal.Label{
|
dst = append(dst, prompbmarshal.Label{
|
||||||
Name: at.Key,
|
Name: sanitizeLabelName(at.Key),
|
||||||
Value: at.Value.FormatString(),
|
Value: at.Value.FormatString(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -290,6 +355,74 @@ func (wr *writeContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_label.go#L26
|
||||||
|
func sanitizeLabelName(labelName string) string {
|
||||||
|
if !*sanitizeMetrics {
|
||||||
|
return labelName
|
||||||
|
}
|
||||||
|
if len(labelName) == 0 {
|
||||||
|
return labelName
|
||||||
|
}
|
||||||
|
labelName = promrelabel.SanitizeLabelName(labelName)
|
||||||
|
if unicode.IsDigit(rune(labelName[0])) {
|
||||||
|
return "key_" + labelName
|
||||||
|
} else if strings.HasPrefix(labelName, "_") && !strings.HasPrefix(labelName, "__") {
|
||||||
|
return "key" + labelName
|
||||||
|
}
|
||||||
|
return labelName
|
||||||
|
}
|
||||||
|
|
||||||
|
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L83
|
||||||
|
func sanitizeMetricName(metric *pb.Metric) string {
|
||||||
|
if !*sanitizeMetrics {
|
||||||
|
return metric.Name
|
||||||
|
}
|
||||||
|
nameTokens := promrelabel.SanitizeLabelNameParts(metric.Name)
|
||||||
|
unitTokens := strings.SplitN(metric.Unit, "/", len(unitMap))
|
||||||
|
for i, u := range unitTokens {
|
||||||
|
unitToken := strings.TrimSpace(u)
|
||||||
|
if unitToken == "" || strings.ContainsAny(unitToken, "{}") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if unit, ok := unitMap[i].units[unitToken]; ok {
|
||||||
|
unitToken = unit
|
||||||
|
}
|
||||||
|
if unitToken != "" && !containsToken(nameTokens, unitToken) {
|
||||||
|
unitPrefix := unitMap[i].prefix
|
||||||
|
if unitPrefix != "" {
|
||||||
|
nameTokens = append(nameTokens, unitPrefix, unitToken)
|
||||||
|
} else {
|
||||||
|
nameTokens = append(nameTokens, unitToken)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if metric.Sum != nil && metric.Sum.IsMonotonic {
|
||||||
|
nameTokens = moveOrAppend(nameTokens, "total")
|
||||||
|
} else if metric.Unit == "1" && metric.Gauge != nil {
|
||||||
|
nameTokens = moveOrAppend(nameTokens, "ratio")
|
||||||
|
}
|
||||||
|
return strings.Join(nameTokens, "_")
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsToken(tokens []string, value string) bool {
|
||||||
|
for _, token := range tokens {
|
||||||
|
if token == value {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveOrAppend(tokens []string, value string) []string {
|
||||||
|
for t := range tokens {
|
||||||
|
if tokens[t] == value {
|
||||||
|
tokens = append(tokens[:t], tokens[t+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return append(tokens, value)
|
||||||
|
}
|
||||||
|
|
||||||
var wrPool sync.Pool
|
var wrPool sync.Pool
|
||||||
|
|
||||||
func getWriteContext() *writeContext {
|
func getWriteContext() *writeContext {
|
||||||
|
@ -15,8 +15,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestParseStream(t *testing.T) {
|
func TestParseStream(t *testing.T) {
|
||||||
f := func(samples []*pb.Metric, tssExpected []prompbmarshal.TimeSeries) {
|
f := func(samples []*pb.Metric, tssExpected []prompbmarshal.TimeSeries, sanitize bool) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
*sanitizeMetrics = sanitize
|
||||||
|
|
||||||
checkSeries := func(tss []prompbmarshal.TimeSeries) error {
|
checkSeries := func(tss []prompbmarshal.TimeSeries) error {
|
||||||
if len(tss) != len(tssExpected) {
|
if len(tss) != len(tssExpected) {
|
||||||
@ -86,10 +87,10 @@ func TestParseStream(t *testing.T) {
|
|||||||
// Test all metric types
|
// Test all metric types
|
||||||
f(
|
f(
|
||||||
[]*pb.Metric{
|
[]*pb.Metric{
|
||||||
generateGauge("my-gauge"),
|
generateGauge("my-gauge", ""),
|
||||||
generateHistogram("my-histogram"),
|
generateHistogram("my-histogram", ""),
|
||||||
generateSum("my-sum"),
|
generateSum("my-sum", "", false),
|
||||||
generateSummary("my-summary"),
|
generateSummary("my-summary", ""),
|
||||||
},
|
},
|
||||||
[]prompbmarshal.TimeSeries{
|
[]prompbmarshal.TimeSeries{
|
||||||
newPromPBTs("my-gauge", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")),
|
newPromPBTs("my-gauge", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")),
|
||||||
@ -106,16 +107,85 @@ func TestParseStream(t *testing.T) {
|
|||||||
newPromPBTs("my-summary", 35000, 7.5, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "0.1")),
|
newPromPBTs("my-summary", 35000, 7.5, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "0.1")),
|
||||||
newPromPBTs("my-summary", 35000, 10.0, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "0.5")),
|
newPromPBTs("my-summary", 35000, 10.0, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "0.5")),
|
||||||
newPromPBTs("my-summary", 35000, 15.0, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "1")),
|
newPromPBTs("my-summary", 35000, 15.0, jobLabelValue, kvLabel("label6", "value6"), kvLabel("quantile", "1")),
|
||||||
})
|
},
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
|
||||||
// Test gauge
|
// Test gauge
|
||||||
f(
|
f(
|
||||||
[]*pb.Metric{
|
[]*pb.Metric{
|
||||||
generateGauge("my-gauge"),
|
generateGauge("my-gauge", ""),
|
||||||
},
|
},
|
||||||
[]prompbmarshal.TimeSeries{
|
[]prompbmarshal.TimeSeries{
|
||||||
newPromPBTs("my-gauge", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")),
|
newPromPBTs("my-gauge", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")),
|
||||||
},
|
},
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test gauge with unit and sanitization
|
||||||
|
f(
|
||||||
|
[]*pb.Metric{
|
||||||
|
generateGauge("my-gauge", "ms"),
|
||||||
|
},
|
||||||
|
[]prompbmarshal.TimeSeries{
|
||||||
|
newPromPBTs("my_gauge_milliseconds", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")),
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test gauge with unit inside metric
|
||||||
|
f(
|
||||||
|
[]*pb.Metric{
|
||||||
|
generateGauge("my-gauge-milliseconds", "ms"),
|
||||||
|
},
|
||||||
|
[]prompbmarshal.TimeSeries{
|
||||||
|
newPromPBTs("my_gauge_milliseconds", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")),
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test gauge with ratio suffix
|
||||||
|
f(
|
||||||
|
[]*pb.Metric{
|
||||||
|
generateGauge("my-gauge-milliseconds", "1"),
|
||||||
|
},
|
||||||
|
[]prompbmarshal.TimeSeries{
|
||||||
|
newPromPBTs("my_gauge_milliseconds_ratio", 15000, 15.0, jobLabelValue, kvLabel("label1", "value1")),
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test sum with total suffix
|
||||||
|
f(
|
||||||
|
[]*pb.Metric{
|
||||||
|
generateSum("my-sum", "ms", true),
|
||||||
|
},
|
||||||
|
[]prompbmarshal.TimeSeries{
|
||||||
|
newPromPBTs("my_sum_milliseconds_total", 150000, 15.5, jobLabelValue, kvLabel("label5", "value5")),
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test sum with total suffix, which exists in a metric name
|
||||||
|
f(
|
||||||
|
[]*pb.Metric{
|
||||||
|
generateSum("my-total-sum", "ms", true),
|
||||||
|
},
|
||||||
|
[]prompbmarshal.TimeSeries{
|
||||||
|
newPromPBTs("my_sum_milliseconds_total", 150000, 15.5, jobLabelValue, kvLabel("label5", "value5")),
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test sum with total and complex suffix
|
||||||
|
f(
|
||||||
|
[]*pb.Metric{
|
||||||
|
generateSum("my-total-sum", "m/s", true),
|
||||||
|
},
|
||||||
|
[]prompbmarshal.TimeSeries{
|
||||||
|
newPromPBTs("my_sum_meters_per_second_total", 150000, 15.5, jobLabelValue, kvLabel("label5", "value5")),
|
||||||
|
},
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,7 +222,7 @@ func attributesFromKV(k, v string) []*pb.KeyValue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateGauge(name string) *pb.Metric {
|
func generateGauge(name, unit string) *pb.Metric {
|
||||||
n := int64(15)
|
n := int64(15)
|
||||||
points := []*pb.NumberDataPoint{
|
points := []*pb.NumberDataPoint{
|
||||||
{
|
{
|
||||||
@ -163,13 +233,14 @@ func generateGauge(name string) *pb.Metric {
|
|||||||
}
|
}
|
||||||
return &pb.Metric{
|
return &pb.Metric{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
Unit: unit,
|
||||||
Gauge: &pb.Gauge{
|
Gauge: &pb.Gauge{
|
||||||
DataPoints: points,
|
DataPoints: points,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateHistogram(name string) *pb.Metric {
|
func generateHistogram(name, unit string) *pb.Metric {
|
||||||
points := []*pb.HistogramDataPoint{
|
points := []*pb.HistogramDataPoint{
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -183,6 +254,7 @@ func generateHistogram(name string) *pb.Metric {
|
|||||||
}
|
}
|
||||||
return &pb.Metric{
|
return &pb.Metric{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
Unit: unit,
|
||||||
Histogram: &pb.Histogram{
|
Histogram: &pb.Histogram{
|
||||||
AggregationTemporality: pb.AggregationTemporalityCumulative,
|
AggregationTemporality: pb.AggregationTemporalityCumulative,
|
||||||
DataPoints: points,
|
DataPoints: points,
|
||||||
@ -190,7 +262,7 @@ func generateHistogram(name string) *pb.Metric {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateSum(name string) *pb.Metric {
|
func generateSum(name, unit string, isMonotonic bool) *pb.Metric {
|
||||||
d := float64(15.5)
|
d := float64(15.5)
|
||||||
points := []*pb.NumberDataPoint{
|
points := []*pb.NumberDataPoint{
|
||||||
{
|
{
|
||||||
@ -201,14 +273,16 @@ func generateSum(name string) *pb.Metric {
|
|||||||
}
|
}
|
||||||
return &pb.Metric{
|
return &pb.Metric{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
Unit: unit,
|
||||||
Sum: &pb.Sum{
|
Sum: &pb.Sum{
|
||||||
AggregationTemporality: pb.AggregationTemporalityCumulative,
|
AggregationTemporality: pb.AggregationTemporalityCumulative,
|
||||||
DataPoints: points,
|
DataPoints: points,
|
||||||
|
IsMonotonic: isMonotonic,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateSummary(name string) *pb.Metric {
|
func generateSummary(name, unit string) *pb.Metric {
|
||||||
points := []*pb.SummaryDataPoint{
|
points := []*pb.SummaryDataPoint{
|
||||||
{
|
{
|
||||||
Attributes: attributesFromKV("label6", "value6"),
|
Attributes: attributesFromKV("label6", "value6"),
|
||||||
@ -233,6 +307,7 @@ func generateSummary(name string) *pb.Metric {
|
|||||||
}
|
}
|
||||||
return &pb.Metric{
|
return &pb.Metric{
|
||||||
Name: name,
|
Name: name,
|
||||||
|
Unit: unit,
|
||||||
Summary: &pb.Summary{
|
Summary: &pb.Summary{
|
||||||
DataPoints: points,
|
DataPoints: points,
|
||||||
},
|
},
|
||||||
|
@ -10,10 +10,10 @@ import (
|
|||||||
|
|
||||||
func BenchmarkParseStream(b *testing.B) {
|
func BenchmarkParseStream(b *testing.B) {
|
||||||
samples := []*pb.Metric{
|
samples := []*pb.Metric{
|
||||||
generateGauge("my-gauge"),
|
generateGauge("my-gauge", ""),
|
||||||
generateHistogram("my-histogram"),
|
generateHistogram("my-histogram", ""),
|
||||||
generateSum("my-sum"),
|
generateSum("my-sum", "", false),
|
||||||
generateSummary("my-summary"),
|
generateSummary("my-summary", ""),
|
||||||
}
|
}
|
||||||
b.SetBytes(1)
|
b.SetBytes(1)
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
|
Loading…
Reference in New Issue
Block a user