mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-02 01:00:07 +01:00
144 lines
4.5 KiB
Go
144 lines
4.5 KiB
Go
|
package opentelemetry
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net/http"
|
||
|
"time"
|
||
|
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||
|
"github.com/VictoriaMetrics/metrics"
|
||
|
)
|
||
|
|
||
|
// RequestHandler processes Opentelemetry insert requests
|
||
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||
|
switch path {
|
||
|
// use the same path as opentelemetry collector
|
||
|
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
|
||
|
case "/v1/logs":
|
||
|
if r.Header.Get("Content-Type") == "application/json" {
|
||
|
httpserver.Errorf(w, r, "json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
||
|
return true
|
||
|
}
|
||
|
handleProtobuf(r, w)
|
||
|
return true
|
||
|
default:
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func handleProtobuf(r *http.Request, w http.ResponseWriter) {
|
||
|
startTime := time.Now()
|
||
|
requestsProtobufTotal.Inc()
|
||
|
reader := r.Body
|
||
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
||
|
zr, err := common.GetGzipReader(reader)
|
||
|
if err != nil {
|
||
|
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
|
||
|
return
|
||
|
}
|
||
|
defer common.PutGzipReader(zr)
|
||
|
reader = zr
|
||
|
}
|
||
|
|
||
|
wcr := writeconcurrencylimiter.GetReader(reader)
|
||
|
data, err := io.ReadAll(wcr)
|
||
|
writeconcurrencylimiter.PutReader(wcr)
|
||
|
if err != nil {
|
||
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
cp, err := insertutils.GetCommonParams(r)
|
||
|
if err != nil {
|
||
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
||
|
return
|
||
|
}
|
||
|
if err := vlstorage.CanWriteData(); err != nil {
|
||
|
httpserver.Errorf(w, r, "%s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
lmp := cp.NewLogMessageProcessor()
|
||
|
n, err := pushProtobufRequest(data, lmp)
|
||
|
lmp.MustClose()
|
||
|
if err != nil {
|
||
|
httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
rowsIngestedProtobufTotal.Add(n)
|
||
|
|
||
|
// update requestProtobufDuration only for successfully parsed requests
|
||
|
// There is no need in updating requestProtobufDuration for request errors,
|
||
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
||
|
requestProtobufDuration.UpdateDuration(startTime)
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry",format="protobuf"}`)
|
||
|
|
||
|
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
||
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
||
|
|
||
|
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
||
|
)
|
||
|
|
||
|
func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) {
|
||
|
var req pb.ExportLogsServiceRequest
|
||
|
if err := req.UnmarshalProtobuf(data); err != nil {
|
||
|
errorsTotal.Inc()
|
||
|
return 0, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
|
||
|
}
|
||
|
|
||
|
var rowsIngested int
|
||
|
var commonFields []logstorage.Field
|
||
|
for _, rl := range req.ResourceLogs {
|
||
|
attributes := rl.Resource.Attributes
|
||
|
commonFields = slicesutil.SetLength(commonFields, len(attributes))
|
||
|
for i, attr := range attributes {
|
||
|
commonFields[i].Name = attr.Key
|
||
|
commonFields[i].Value = attr.Value.FormatString()
|
||
|
}
|
||
|
commonFieldsLen := len(commonFields)
|
||
|
for _, sc := range rl.ScopeLogs {
|
||
|
var scopeIngested int
|
||
|
commonFields, scopeIngested = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp)
|
||
|
rowsIngested += scopeIngested
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return rowsIngested, nil
|
||
|
}
|
||
|
|
||
|
func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutils.LogMessageProcessor) ([]logstorage.Field, int) {
|
||
|
fields := commonFields
|
||
|
for _, lr := range sc.LogRecords {
|
||
|
fields = fields[:len(commonFields)]
|
||
|
fields = append(fields, logstorage.Field{
|
||
|
Name: "_msg",
|
||
|
Value: lr.Body.FormatString(),
|
||
|
})
|
||
|
for _, attr := range lr.Attributes {
|
||
|
fields = append(fields, logstorage.Field{
|
||
|
Name: attr.Key,
|
||
|
Value: attr.Value.FormatString(),
|
||
|
})
|
||
|
}
|
||
|
fields = append(fields, logstorage.Field{
|
||
|
Name: "severity",
|
||
|
Value: lr.FormatSeverity(),
|
||
|
})
|
||
|
|
||
|
lmp.AddRow(lr.ExtractTimestampNano(), fields)
|
||
|
}
|
||
|
return fields, len(sc.LogRecords)
|
||
|
}
|