mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
681d29654a
Loki uses default labels format without "or" operator. This format can't create a list of LabelFilters, so only first set of LabelFilters should be used. Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
140 lines
3.9 KiB
Go
140 lines
3.9 KiB
Go
package loki
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"github.com/golang/snappy"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
)
|
|
|
|
var (
|
|
rowsIngestedTotalProtobuf = metrics.NewCounter(`vl_rows_ingested_total{type="loki", format="protobuf"}`)
|
|
bytesBufPool bytesutil.ByteBufferPool
|
|
pushReqsPool sync.Pool
|
|
)
|
|
|
|
func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
|
|
wcr := writeconcurrencylimiter.GetReader(r.Body)
|
|
defer writeconcurrencylimiter.PutReader(wcr)
|
|
|
|
cp, err := getCommonParams(r)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot parse request: %s", err)
|
|
return true
|
|
}
|
|
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
|
|
defer logstorage.PutLogRows(lr)
|
|
|
|
processLogMessage := cp.GetProcessLogMessageFunc(lr)
|
|
n, err := processProtobufRequest(wcr, processLogMessage)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot decode loki request: %s", err)
|
|
return true
|
|
}
|
|
|
|
rowsIngestedTotalProtobuf.Add(n)
|
|
|
|
return true
|
|
}
|
|
|
|
func processProtobufRequest(r io.Reader, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
|
|
wcr := writeconcurrencylimiter.GetReader(r)
|
|
defer writeconcurrencylimiter.PutReader(wcr)
|
|
|
|
bytes, err := io.ReadAll(wcr)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot read request body: %s", err)
|
|
}
|
|
|
|
bb := bytesBufPool.Get()
|
|
defer bytesBufPool.Put(bb)
|
|
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], bytes)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot decode snappy from request body: %s", err)
|
|
}
|
|
|
|
req := getPushReq()
|
|
defer putPushReq(req)
|
|
err = req.Unmarshal(bb.B)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse request body: %s", err)
|
|
}
|
|
|
|
var commonFields []logstorage.Field
|
|
rowsIngested := 0
|
|
for stIdx, st := range req.Streams {
|
|
// st.Labels contains labels for the stream.
|
|
// Labels are same for all entries in the stream.
|
|
commonFields, err = parseLogFields(st.Labels, commonFields)
|
|
if err != nil {
|
|
return rowsIngested, fmt.Errorf("failed to unmarshal labels in stream %d: %q; %s", stIdx, st.Labels, err)
|
|
}
|
|
msgFieldIDx := len(commonFields) - 1
|
|
commonFields[msgFieldIDx].Name = msgField
|
|
|
|
for _, v := range st.Entries {
|
|
commonFields[msgFieldIDx].Value = v.Line
|
|
processLogMessage(v.Timestamp.UnixNano(), commonFields)
|
|
rowsIngested++
|
|
}
|
|
}
|
|
return rowsIngested, nil
|
|
}
|
|
|
|
// Parses logs fields s and returns the corresponding log fields.
|
|
// Cannot use searchutils.ParseMetricSelector here because its dependencies
|
|
// bring flags which clashes with logstorage flags.
|
|
//
|
|
// Loki encodes labels in the PromQL labels format.
|
|
// See test data of promtail for examples: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
|
|
func parseLogFields(s string, dst []logstorage.Field) ([]logstorage.Field, error) {
|
|
expr, err := metricsql.Parse(s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
me, ok := expr.(*metricsql.MetricExpr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("failed to parse stream labels; got %q", expr.AppendString(nil))
|
|
}
|
|
|
|
// Expecting only label filters without MetricsQL "or" operator.
|
|
if len(me.LabelFilterss) != 1 {
|
|
return nil, fmt.Errorf("unexpected format of log fields; got %q", s)
|
|
}
|
|
|
|
// Allocate space for labels + msg field.
|
|
// Msg field is added by caller.
|
|
dst = slicesutil.ResizeNoCopyMayOverallocate(dst, len(me.LabelFilterss[0]))
|
|
for i, l := range me.LabelFilterss[0] {
|
|
dst[i].Name = l.Label
|
|
dst[i].Value = l.Value
|
|
}
|
|
|
|
return dst, nil
|
|
}
|
|
|
|
func getPushReq() *PushRequest {
|
|
v := pushReqsPool.Get()
|
|
if v == nil {
|
|
return &PushRequest{}
|
|
}
|
|
return v.(*PushRequest)
|
|
}
|
|
|
|
func putPushReq(reqs *PushRequest) {
|
|
reqs.Reset()
|
|
pushReqsPool.Put(reqs)
|
|
}
|