VictoriaMetrics/app/vlinsert/insertutils/common_params.go
Zakhar Bessarab b0c8f7f718
app/vlinsert: add flag to limit amount of fields per line (#4976)
Adding limit on ingestion allows to avoid issues like this one https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
Such issues are often caused by misconfigurtion on log persing/ingestion side and preventing such rows from being ingested allows to avoid performance implications created by storing such log rows.

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2023-09-18 12:00:02 +02:00

102 lines
3.0 KiB
Go

package insertutils
import (
"net/http"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
// CommonParams contains common HTTP parameters used by log ingestion APIs.
//
// See https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters
type CommonParams struct {
TenantID logstorage.TenantID
TimeField string
MsgField string
StreamFields []string
IgnoreFields []string
Debug bool
DebugRequestURI string
DebugRemoteAddr string
}
// GetCommonParams returns CommonParams from r.
func GetCommonParams(r *http.Request) (*CommonParams, error) {
// Extract tenantID
tenantID, err := logstorage.GetTenantIDFromRequest(r)
if err != nil {
return nil, err
}
// Extract time field name from _time_field query arg
var timeField = "_time"
if tf := r.FormValue("_time_field"); tf != "" {
timeField = tf
}
// Extract message field name from _msg_field query arg
var msgField = ""
if msgf := r.FormValue("_msg_field"); msgf != "" {
msgField = msgf
}
streamFields := httputils.GetArray(r, "_stream_fields")
ignoreFields := httputils.GetArray(r, "ignore_fields")
debug := httputils.GetBool(r, "debug")
debugRequestURI := ""
debugRemoteAddr := ""
if debug {
debugRequestURI = httpserver.GetRequestURI(r)
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
}
cp := &CommonParams{
TenantID: tenantID,
TimeField: timeField,
MsgField: msgField,
StreamFields: streamFields,
IgnoreFields: ignoreFields,
Debug: debug,
DebugRequestURI: debugRequestURI,
DebugRemoteAddr: debugRemoteAddr,
}
return cp, nil
}
// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
return func(timestamp int64, fields []logstorage.Field) {
if len(fields) > *MaxFieldsPerLine {
var rf logstorage.RowFormatter
rf = append(rf[:0], fields...)
logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf.String())
rowsDroppedTotalTooManyFields.Inc()
return
}
lr.MustAdd(cp.TenantID, timestamp, fields)
if cp.Debug {
s := lr.GetRowString(0)
lr.ResetKeepSettings()
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
rowsDroppedTotalDebug.Inc()
return
}
if lr.NeedFlush() {
vlstorage.MustAddRows(lr)
lr.ResetKeepSettings()
}
}
}
var rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)
var rowsDroppedTotalTooManyFields = metrics.NewCounter(`vl_rows_dropped_total{reason="too_many_fields"}`)