2023-06-22 04:39:22 +02:00
|
|
|
package insertutils
|
|
|
|
|
|
|
|
import (
|
|
|
|
"net/http"
|
|
|
|
|
2023-09-15 15:18:38 +02:00
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
|
2023-06-22 04:39:22 +02:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
|
|
)
|
|
|
|
|
|
|
|
// CommonParams contains common HTTP parameters used by log ingestion APIs.
|
|
|
|
//
|
|
|
|
// See https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#http-parameters
|
|
|
|
type CommonParams struct {
|
|
|
|
TenantID logstorage.TenantID
|
|
|
|
TimeField string
|
|
|
|
MsgField string
|
|
|
|
StreamFields []string
|
|
|
|
IgnoreFields []string
|
|
|
|
|
|
|
|
Debug bool
|
|
|
|
DebugRequestURI string
|
|
|
|
DebugRemoteAddr string
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetCommonParams returns CommonParams from r.
|
|
|
|
func GetCommonParams(r *http.Request) (*CommonParams, error) {
|
|
|
|
// Extract tenantID
|
|
|
|
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Extract time field name from _time_field query arg
|
|
|
|
var timeField = "_time"
|
|
|
|
if tf := r.FormValue("_time_field"); tf != "" {
|
|
|
|
timeField = tf
|
|
|
|
}
|
|
|
|
|
|
|
|
// Extract message field name from _msg_field query arg
|
|
|
|
var msgField = ""
|
|
|
|
if msgf := r.FormValue("_msg_field"); msgf != "" {
|
|
|
|
msgField = msgf
|
|
|
|
}
|
|
|
|
|
|
|
|
streamFields := httputils.GetArray(r, "_stream_fields")
|
|
|
|
ignoreFields := httputils.GetArray(r, "ignore_fields")
|
|
|
|
|
|
|
|
debug := httputils.GetBool(r, "debug")
|
|
|
|
debugRequestURI := ""
|
|
|
|
debugRemoteAddr := ""
|
|
|
|
if debug {
|
|
|
|
debugRequestURI = httpserver.GetRequestURI(r)
|
|
|
|
debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
|
|
|
|
}
|
|
|
|
|
|
|
|
cp := &CommonParams{
|
|
|
|
TenantID: tenantID,
|
|
|
|
TimeField: timeField,
|
|
|
|
MsgField: msgField,
|
|
|
|
StreamFields: streamFields,
|
|
|
|
IgnoreFields: ignoreFields,
|
|
|
|
Debug: debug,
|
|
|
|
DebugRequestURI: debugRequestURI,
|
|
|
|
DebugRemoteAddr: debugRemoteAddr,
|
|
|
|
}
|
|
|
|
return cp, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetProcessLogMessageFunc returns a function, which adds parsed log messages to lr.
|
2023-10-02 16:26:02 +02:00
|
|
|
func (cp *CommonParams) GetProcessLogMessageFunc(lr *logstorage.LogRows) func(timestamp int64, fields []logstorage.Field) {
|
|
|
|
return func(timestamp int64, fields []logstorage.Field) {
|
2023-09-15 15:18:38 +02:00
|
|
|
if len(fields) > *MaxFieldsPerLine {
|
2023-09-18 11:59:46 +02:00
|
|
|
rf := logstorage.RowFormatter(fields)
|
|
|
|
logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
|
2023-09-15 15:18:38 +02:00
|
|
|
rowsDroppedTotalTooManyFields.Inc()
|
2023-10-02 16:26:02 +02:00
|
|
|
return
|
2023-09-15 15:18:38 +02:00
|
|
|
}
|
|
|
|
|
2023-06-22 04:39:22 +02:00
|
|
|
lr.MustAdd(cp.TenantID, timestamp, fields)
|
|
|
|
if cp.Debug {
|
|
|
|
s := lr.GetRowString(0)
|
|
|
|
lr.ResetKeepSettings()
|
|
|
|
logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", cp.DebugRemoteAddr, cp.DebugRequestURI, s)
|
2023-09-15 15:18:38 +02:00
|
|
|
rowsDroppedTotalDebug.Inc()
|
2023-10-02 16:26:02 +02:00
|
|
|
return
|
2023-06-22 04:39:22 +02:00
|
|
|
}
|
|
|
|
if lr.NeedFlush() {
|
2023-10-02 16:26:02 +02:00
|
|
|
vlstorage.MustAddRows(lr)
|
2023-06-22 04:39:22 +02:00
|
|
|
lr.ResetKeepSettings()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-15 15:18:38 +02:00
|
|
|
var rowsDroppedTotalDebug = metrics.NewCounter(`vl_rows_dropped_total{reason="debug"}`)
|
|
|
|
var rowsDroppedTotalTooManyFields = metrics.NewCounter(`vl_rows_dropped_total{reason="too_many_fields"}`)
|