mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-22 08:26:19 +01:00
447 lines
13 KiB
Go
447 lines
13 KiB
Go
package stream
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"unicode"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
)
|
|
|
|
var (
|
|
// sanitizeMetrics controls sanitizing metric and label names ingested via OpenTelemetry protocol.
|
|
sanitizeMetrics = flag.Bool("opentelemetry.sanitizeMetrics", false, "Sanitize metric and label names for the ingested OpenTelemetry data")
|
|
)
|
|
|
|
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L19
|
|
var unitMap = []struct {
|
|
prefix string
|
|
units map[string]string
|
|
}{
|
|
{
|
|
units: map[string]string{
|
|
// Time
|
|
"d": "days",
|
|
"h": "hours",
|
|
"min": "minutes",
|
|
"s": "seconds",
|
|
"ms": "milliseconds",
|
|
"us": "microseconds",
|
|
"ns": "nanoseconds",
|
|
|
|
// Bytes
|
|
"By": "bytes",
|
|
"KiBy": "kibibytes",
|
|
"MiBy": "mebibytes",
|
|
"GiBy": "gibibytes",
|
|
"TiBy": "tibibytes",
|
|
"KBy": "kilobytes",
|
|
"MBy": "megabytes",
|
|
"GBy": "gigabytes",
|
|
"TBy": "terabytes",
|
|
|
|
// SI
|
|
"m": "meters",
|
|
"V": "volts",
|
|
"A": "amperes",
|
|
"J": "joules",
|
|
"W": "watts",
|
|
"g": "grams",
|
|
|
|
// Misc
|
|
"Cel": "celsius",
|
|
"Hz": "hertz",
|
|
"1": "",
|
|
"%": "percent",
|
|
},
|
|
}, {
|
|
prefix: "per",
|
|
units: map[string]string{
|
|
"s": "second",
|
|
"m": "minute",
|
|
"h": "hour",
|
|
"d": "day",
|
|
"w": "week",
|
|
"mo": "month",
|
|
"y": "year",
|
|
},
|
|
},
|
|
}
|
|
|
|
// ParseStream parses OpenTelemetry protobuf or json data from r and calls callback for the parsed rows.
|
|
//
|
|
// callback shouldn't hold tss items after returning.
|
|
//
|
|
// optional processBody can be used for pre-processing the read request body from r before parsing it in OpenTelemetry format.
|
|
func ParseStream(r io.Reader, isGzipped bool, processBody func([]byte) ([]byte, error), callback func(tss []prompbmarshal.TimeSeries) error) error {
|
|
wcr := writeconcurrencylimiter.GetReader(r)
|
|
defer writeconcurrencylimiter.PutReader(wcr)
|
|
r = wcr
|
|
|
|
if isGzipped {
|
|
zr, err := common.GetGzipReader(r)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot read gzip-compressed OpenTelemetry protocol data: %w", err)
|
|
}
|
|
defer common.PutGzipReader(zr)
|
|
r = zr
|
|
}
|
|
|
|
wr := getWriteContext()
|
|
defer putWriteContext(wr)
|
|
req, err := wr.readAndUnpackRequest(r, processBody)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unpack OpenTelemetry metrics: %w", err)
|
|
}
|
|
wr.parseRequestToTss(req)
|
|
|
|
if err := callback(wr.tss); err != nil {
|
|
return fmt.Errorf("error when processing OpenTelemetry samples: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (wr *writeContext) appendSamplesFromScopeMetrics(sc *pb.ScopeMetrics) {
|
|
for _, m := range sc.Metrics {
|
|
if len(m.Name) == 0 {
|
|
// skip metrics without names
|
|
continue
|
|
}
|
|
metricName := sanitizeMetricName(m)
|
|
switch {
|
|
case m.Gauge != nil:
|
|
for _, p := range m.Gauge.DataPoints {
|
|
wr.appendSampleFromNumericPoint(metricName, p)
|
|
}
|
|
case m.Sum != nil:
|
|
if m.Sum.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
|
rowsDroppedUnsupportedSum.Inc()
|
|
continue
|
|
}
|
|
for _, p := range m.Sum.DataPoints {
|
|
wr.appendSampleFromNumericPoint(metricName, p)
|
|
}
|
|
case m.Summary != nil:
|
|
for _, p := range m.Summary.DataPoints {
|
|
wr.appendSamplesFromSummary(metricName, p)
|
|
}
|
|
case m.Histogram != nil:
|
|
if m.Histogram.AggregationTemporality != pb.AggregationTemporalityCumulative {
|
|
rowsDroppedUnsupportedHistogram.Inc()
|
|
continue
|
|
}
|
|
for _, p := range m.Histogram.DataPoints {
|
|
wr.appendSamplesFromHistogram(metricName, p)
|
|
}
|
|
default:
|
|
rowsDroppedUnsupportedMetricType.Inc()
|
|
logger.Warnf("unsupported type for metric %q", metricName)
|
|
}
|
|
}
|
|
}
|
|
|
|
// appendSampleFromNumericPoint appends p to wr.tss
|
|
func (wr *writeContext) appendSampleFromNumericPoint(metricName string, p *pb.NumberDataPoint) {
|
|
var v float64
|
|
switch {
|
|
case p.IntValue != nil:
|
|
v = float64(*p.IntValue)
|
|
case p.DoubleValue != nil:
|
|
v = *p.DoubleValue
|
|
}
|
|
|
|
t := int64(p.TimeUnixNano / 1e6)
|
|
isStale := (p.Flags)&uint32(1) != 0
|
|
wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
|
|
|
|
wr.appendSample(metricName, t, v, isStale)
|
|
}
|
|
|
|
// appendSamplesFromSummary appends summary p to wr.tss
|
|
func (wr *writeContext) appendSamplesFromSummary(metricName string, p *pb.SummaryDataPoint) {
|
|
t := int64(p.TimeUnixNano / 1e6)
|
|
isStale := (p.Flags)&uint32(1) != 0
|
|
wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
|
|
|
|
wr.appendSample(metricName+"_sum", t, p.Sum, isStale)
|
|
wr.appendSample(metricName+"_count", t, float64(p.Count), isStale)
|
|
for _, q := range p.QuantileValues {
|
|
qValue := strconv.FormatFloat(q.Quantile, 'f', -1, 64)
|
|
wr.appendSampleWithExtraLabel(metricName, "quantile", qValue, t, q.Value, isStale)
|
|
}
|
|
}
|
|
|
|
// appendSamplesFromHistogram appends histogram p to wr.tss
|
|
func (wr *writeContext) appendSamplesFromHistogram(metricName string, p *pb.HistogramDataPoint) {
|
|
if len(p.BucketCounts) == 0 {
|
|
// nothing to append
|
|
return
|
|
}
|
|
if len(p.BucketCounts) != len(p.ExplicitBounds)+1 {
|
|
// fast path, broken data format
|
|
logger.Warnf("opentelemetry bad histogram format: %q, size of buckets: %d, size of bounds: %d", metricName, len(p.BucketCounts), len(p.ExplicitBounds))
|
|
return
|
|
}
|
|
|
|
t := int64(p.TimeUnixNano / 1e6)
|
|
isStale := (p.Flags)&uint32(1) != 0
|
|
wr.pointLabels = appendAttributesToPromLabels(wr.pointLabels[:0], p.Attributes)
|
|
wr.appendSample(metricName+"_count", t, float64(p.Count), isStale)
|
|
if p.Sum == nil {
|
|
// fast path, convert metric as simple counter.
|
|
// given buckets cannot be used for histogram functions.
|
|
// Negative threshold buckets MAY be used, but then the Histogram MetricPoint MUST NOT contain a sum value as it would no longer be a counter semantically.
|
|
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#histogram
|
|
return
|
|
}
|
|
|
|
wr.appendSample(metricName+"_sum", t, *p.Sum, isStale)
|
|
|
|
var cumulative uint64
|
|
for index, bound := range p.ExplicitBounds {
|
|
cumulative += p.BucketCounts[index]
|
|
boundLabelValue := strconv.FormatFloat(bound, 'f', -1, 64)
|
|
wr.appendSampleWithExtraLabel(metricName+"_bucket", "le", boundLabelValue, t, float64(cumulative), isStale)
|
|
}
|
|
cumulative += p.BucketCounts[len(p.BucketCounts)-1]
|
|
wr.appendSampleWithExtraLabel(metricName+"_bucket", "le", "+Inf", t, float64(cumulative), isStale)
|
|
}
|
|
|
|
// appendSample appends sample with the given metricName to wr.tss
|
|
func (wr *writeContext) appendSample(metricName string, t int64, v float64, isStale bool) {
|
|
wr.appendSampleWithExtraLabel(metricName, "", "", t, v, isStale)
|
|
}
|
|
|
|
// appendSampleWithExtraLabel appends sample with the given metricName and the given (labelName=labelValue) extra label to wr.tss
|
|
func (wr *writeContext) appendSampleWithExtraLabel(metricName, labelName, labelValue string, t int64, v float64, isStale bool) {
|
|
if isStale {
|
|
v = decimal.StaleNaN
|
|
}
|
|
if t <= 0 {
|
|
// Set the current timestamp if t isn't set.
|
|
t = int64(fasttime.UnixTimestamp()) * 1000
|
|
}
|
|
|
|
labelsPool := wr.labelsPool
|
|
labelsLen := len(labelsPool)
|
|
labelsPool = append(labelsPool, prompbmarshal.Label{
|
|
Name: "__name__",
|
|
Value: metricName,
|
|
})
|
|
labelsPool = append(labelsPool, wr.baseLabels...)
|
|
labelsPool = append(labelsPool, wr.pointLabels...)
|
|
if labelName != "" && labelValue != "" {
|
|
labelsPool = append(labelsPool, prompbmarshal.Label{
|
|
Name: labelName,
|
|
Value: labelValue,
|
|
})
|
|
}
|
|
|
|
samplesPool := wr.samplesPool
|
|
samplesLen := len(samplesPool)
|
|
samplesPool = append(samplesPool, prompbmarshal.Sample{
|
|
Timestamp: t,
|
|
Value: v,
|
|
})
|
|
|
|
wr.tss = append(wr.tss, prompbmarshal.TimeSeries{
|
|
Labels: labelsPool[labelsLen:],
|
|
Samples: samplesPool[samplesLen:],
|
|
})
|
|
|
|
wr.labelsPool = labelsPool
|
|
wr.samplesPool = samplesPool
|
|
|
|
rowsRead.Inc()
|
|
}
|
|
|
|
// appendAttributesToPromLabels appends attributes to dst and returns the result.
|
|
func appendAttributesToPromLabels(dst []prompbmarshal.Label, attributes []*pb.KeyValue) []prompbmarshal.Label {
|
|
for _, at := range attributes {
|
|
dst = append(dst, prompbmarshal.Label{
|
|
Name: sanitizeLabelName(at.Key),
|
|
Value: at.Value.FormatString(),
|
|
})
|
|
}
|
|
return dst
|
|
}
|
|
|
|
type writeContext struct {
|
|
// bb holds the original data (json or protobuf), which must be parsed.
|
|
bb bytesutil.ByteBuffer
|
|
|
|
// tss holds parsed time series
|
|
tss []prompbmarshal.TimeSeries
|
|
|
|
// baseLabels are labels, which must be added to all the ingested samples
|
|
baseLabels []prompbmarshal.Label
|
|
|
|
// pointLabels are labels, which must be added to the ingested OpenTelemetry points
|
|
pointLabels []prompbmarshal.Label
|
|
|
|
// pools are used for reducing memory allocations when parsing time series
|
|
labelsPool []prompbmarshal.Label
|
|
samplesPool []prompbmarshal.Sample
|
|
}
|
|
|
|
func (wr *writeContext) reset() {
|
|
wr.bb.Reset()
|
|
|
|
tss := wr.tss
|
|
for i := range tss {
|
|
ts := &tss[i]
|
|
ts.Labels = nil
|
|
ts.Samples = nil
|
|
}
|
|
wr.tss = tss[:0]
|
|
|
|
wr.baseLabels = resetLabels(wr.baseLabels)
|
|
wr.pointLabels = resetLabels(wr.pointLabels)
|
|
|
|
wr.labelsPool = resetLabels(wr.labelsPool)
|
|
wr.samplesPool = wr.samplesPool[:0]
|
|
}
|
|
|
|
func resetLabels(labels []prompbmarshal.Label) []prompbmarshal.Label {
|
|
for i := range labels {
|
|
labels[i] = prompbmarshal.Label{}
|
|
}
|
|
return labels[:0]
|
|
}
|
|
|
|
func (wr *writeContext) readAndUnpackRequest(r io.Reader, processBody func([]byte) ([]byte, error)) (*pb.ExportMetricsServiceRequest, error) {
|
|
if _, err := wr.bb.ReadFrom(r); err != nil {
|
|
return nil, fmt.Errorf("cannot read request: %w", err)
|
|
}
|
|
var req pb.ExportMetricsServiceRequest
|
|
if processBody != nil {
|
|
data, err := processBody(wr.bb.B)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot process request body: %w", err)
|
|
}
|
|
wr.bb.B = append(wr.bb.B[:0], data...)
|
|
}
|
|
if err := req.UnmarshalProtobuf(wr.bb.B); err != nil {
|
|
return nil, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(wr.bb.B), err)
|
|
}
|
|
return &req, nil
|
|
}
|
|
|
|
func (wr *writeContext) parseRequestToTss(req *pb.ExportMetricsServiceRequest) {
|
|
for _, rm := range req.ResourceMetrics {
|
|
var attributes []*pb.KeyValue
|
|
if rm.Resource != nil {
|
|
attributes = rm.Resource.Attributes
|
|
}
|
|
wr.baseLabels = appendAttributesToPromLabels(wr.baseLabels[:0], attributes)
|
|
for _, sc := range rm.ScopeMetrics {
|
|
wr.appendSamplesFromScopeMetrics(sc)
|
|
}
|
|
}
|
|
}
|
|
|
|
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_label.go#L26
|
|
func sanitizeLabelName(labelName string) string {
|
|
if !*sanitizeMetrics {
|
|
return labelName
|
|
}
|
|
if len(labelName) == 0 {
|
|
return labelName
|
|
}
|
|
labelName = promrelabel.SanitizeLabelName(labelName)
|
|
if unicode.IsDigit(rune(labelName[0])) {
|
|
return "key_" + labelName
|
|
} else if strings.HasPrefix(labelName, "_") && !strings.HasPrefix(labelName, "__") {
|
|
return "key" + labelName
|
|
}
|
|
return labelName
|
|
}
|
|
|
|
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/b8655058501bed61a06bb660869051491f46840b/pkg/translator/prometheus/normalize_name.go#L83
|
|
func sanitizeMetricName(metric *pb.Metric) string {
|
|
if !*sanitizeMetrics {
|
|
return metric.Name
|
|
}
|
|
nameTokens := promrelabel.SanitizeLabelNameParts(metric.Name)
|
|
unitTokens := strings.SplitN(metric.Unit, "/", len(unitMap))
|
|
for i, u := range unitTokens {
|
|
unitToken := strings.TrimSpace(u)
|
|
if unitToken == "" || strings.ContainsAny(unitToken, "{}") {
|
|
continue
|
|
}
|
|
if unit, ok := unitMap[i].units[unitToken]; ok {
|
|
unitToken = unit
|
|
}
|
|
if unitToken != "" && !containsToken(nameTokens, unitToken) {
|
|
unitPrefix := unitMap[i].prefix
|
|
if unitPrefix != "" {
|
|
nameTokens = append(nameTokens, unitPrefix, unitToken)
|
|
} else {
|
|
nameTokens = append(nameTokens, unitToken)
|
|
}
|
|
}
|
|
}
|
|
if metric.Sum != nil && metric.Sum.IsMonotonic {
|
|
nameTokens = moveOrAppend(nameTokens, "total")
|
|
} else if metric.Unit == "1" && metric.Gauge != nil {
|
|
nameTokens = moveOrAppend(nameTokens, "ratio")
|
|
}
|
|
return strings.Join(nameTokens, "_")
|
|
}
|
|
|
|
func containsToken(tokens []string, value string) bool {
|
|
for _, token := range tokens {
|
|
if token == value {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func moveOrAppend(tokens []string, value string) []string {
|
|
for t := range tokens {
|
|
if tokens[t] == value {
|
|
tokens = append(tokens[:t], tokens[t+1:]...)
|
|
break
|
|
}
|
|
}
|
|
return append(tokens, value)
|
|
}
|
|
|
|
var wrPool sync.Pool
|
|
|
|
func getWriteContext() *writeContext {
|
|
v := wrPool.Get()
|
|
if v == nil {
|
|
return &writeContext{}
|
|
}
|
|
return v.(*writeContext)
|
|
}
|
|
|
|
func putWriteContext(wr *writeContext) {
|
|
wr.reset()
|
|
wrPool.Put(wr)
|
|
}
|
|
|
|
var (
|
|
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentelemetry"}`)
|
|
rowsDroppedUnsupportedHistogram = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_histogram_aggregation"}`)
|
|
rowsDroppedUnsupportedSum = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_sum_aggregation"}`)
|
|
rowsDroppedUnsupportedMetricType = metrics.NewCounter(`vm_protoparser_rows_dropped_total{type="opentelemetry",reason="unsupported_metric_type"}`)
|
|
)
|