diff --git a/app/vlinsert/datadog/datadog.go b/app/vlinsert/datadog/datadog.go index 8bd7f96d8..e53d3a7e9 100644 --- a/app/vlinsert/datadog/datadog.go +++ b/app/vlinsert/datadog/datadog.go @@ -84,7 +84,7 @@ func datadogLogsIngestion(w http.ResponseWriter, r *http.Request) bool { return true } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("datadog") n, err := readLogsRequest(ts, data, lmp.AddRow) lmp.MustClose() if n > 0 { diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index b0e941cd2..4472572ef 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -101,7 +101,7 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { httpserver.Errorf(w, r, "%s", err) return true } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("elasticsearch_bulk") isGzip := r.Header.Get("Content-Encoding") == "gzip" n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgFields, lmp) lmp.MustClose() diff --git a/app/vlinsert/insertutils/common_params.go b/app/vlinsert/insertutils/common_params.go index c357d3f87..4a85bc49f 100644 --- a/app/vlinsert/insertutils/common_params.go +++ b/app/vlinsert/insertutils/common_params.go @@ -154,6 +154,8 @@ type logMessageProcessor struct { cp *CommonParams lr *logstorage.LogRows + + processedBytesTotal *metrics.Counter } func (lmp *logMessageProcessor) initPeriodicFlush() { @@ -187,6 +189,9 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel lmp.mu.Lock() defer lmp.mu.Unlock() + n := getApproxJSONRowLen(fields) + lmp.processedBytesTotal.Add(n) + if len(fields) > *MaxFieldsPerLine { rf := logstorage.RowFormatter(fields) logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf) @@ -207,6 +212,16 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields []logstorage.Fiel } } +// getApproxJSONRowLen returns an approximate length of the log entry with the given fields if represented as JSON. +func getApproxJSONRowLen(fields []logstorage.Field) int { + n := len("{}\n") + n += len(`"_time":""`) + len(time.RFC3339Nano) + for _, f := range fields { + n += len(`,"":""`) + len(f.Name) + len(f.Value) + } + return n +} + // flushLocked must be called under locked lmp.mu. func (lmp *logMessageProcessor) flushLocked() { lmp.lastFlushTime = time.Now() @@ -227,12 +242,15 @@ func (lmp *logMessageProcessor) MustClose() { // NewLogMessageProcessor returns new LogMessageProcessor for the given cp. // // MustClose() must be called on the returned LogMessageProcessor when it is no longer needed. -func (cp *CommonParams) NewLogMessageProcessor() LogMessageProcessor { +func (cp *CommonParams) NewLogMessageProcessor(protocolName string) LogMessageProcessor { lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields, cp.ExtraFields, *defaultMsgValue) + processedBytesTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName)) lmp := &logMessageProcessor{ cp: cp, lr: lr, + processedBytesTotal: processedBytesTotal, + stopCh: make(chan struct{}), } lmp.initPeriodicFlush() diff --git a/app/vlinsert/journald/journald.go b/app/vlinsert/journald/journald.go index 1cf76c354..68509a571 100644 --- a/app/vlinsert/journald/journald.go +++ b/app/vlinsert/journald/journald.go @@ -120,7 +120,7 @@ func handleJournald(r *http.Request, w http.ResponseWriter) { return } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("journald") n, err := parseJournaldRequest(data, lmp, cp) lmp.MustClose() if err != nil { @@ -138,12 +138,12 @@ func handleJournald(r *http.Request, w http.ResponseWriter) { } var ( - rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald", format="journald"}`) + rowsIngestedJournaldTotal = metrics.NewCounter(`vl_rows_ingested_total{type="journald"}`) - requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload",format="journald"}`) - errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload",format="journald"}`) + requestsJournaldTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/journald/upload"}`) + errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/journald/upload"}`) - requestJournaldDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload",format="journald"}`) + requestJournaldDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/journald/upload"}`) ) // See https://systemd.io/JOURNAL_EXPORT_FORMATS/#journal-export-format diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 1db2cf7ca..732d2a2dc 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -52,7 +52,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) { reader = zr } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("jsonline") err = processStreamInternal(reader, cp.TimeField, cp.MsgFields, lmp) lmp.MustClose() diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go index 768691c11..8b8f18415 100644 --- a/app/vlinsert/loki/loki_json.go +++ b/app/vlinsert/loki/loki_json.go @@ -53,7 +53,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) { httpserver.Errorf(w, r, "%s", err) return } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("loki_json") n, err := parseJSONRequest(data, lmp) lmp.MustClose() if err != nil { @@ -71,7 +71,7 @@ func handleJSON(r *http.Request, w http.ResponseWriter) { var ( requestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`) - rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`) + rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki_json"}`) requestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`) ) diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go index 2c1ac6b39..014b1bc53 100644 --- a/app/vlinsert/loki/loki_protobuf.go +++ b/app/vlinsert/loki/loki_protobuf.go @@ -44,7 +44,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { httpserver.Errorf(w, r, "%s", err) return } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("loki_protobuf") n, err := parseProtobufRequest(data, lmp) lmp.MustClose() if err != nil { @@ -62,7 +62,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { var ( requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`) - rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki_protobuf"}`) requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`) ) diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go index b300500ca..5a5882c60 100644 --- a/app/vlinsert/opentelemetry/opentelemetry.go +++ b/app/vlinsert/opentelemetry/opentelemetry.go @@ -66,7 +66,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { return } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("opentelelemtry_protobuf") n, err := pushProtobufRequest(data, lmp) lmp.MustClose() if err != nil { @@ -83,7 +83,7 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) { } var ( - rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry",format="protobuf"}`) + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry_protobuf"}`) requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) diff --git a/app/vlinsert/syslog/syslog.go b/app/vlinsert/syslog/syslog.go index 7211204ab..4ad43fe70 100644 --- a/app/vlinsert/syslog/syslog.go +++ b/app/vlinsert/syslog/syslog.go @@ -314,7 +314,7 @@ func serveUDP(ln net.PacketConn, tenantID logstorage.TenantID, compressMethod st } bb.B = bb.B[:n] udpRequestsTotal.Inc() - if err := processStream(bb.NewReader(), compressMethod, useLocalTimestamp, cp); err != nil { + if err := processStream("udp", bb.NewReader(), compressMethod, useLocalTimestamp, cp); err != nil { logger.Errorf("syslog: cannot process UDP data from %s at %s: %s", remoteAddr, localAddr, err) } } @@ -354,7 +354,7 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri wg.Add(1) go func() { cp := insertutils.GetCommonParamsForSyslog(tenantID, streamFields, ignoreFields, extraFields) - if err := processStream(c, compressMethod, useLocalTimestamp, cp); err != nil { + if err := processStream("tcp", c, compressMethod, useLocalTimestamp, cp); err != nil { logger.Errorf("syslog: cannot process TCP data at %q: %s", addr, err) } @@ -369,12 +369,12 @@ func serveTCP(ln net.Listener, tenantID logstorage.TenantID, compressMethod stri } // processStream parses a stream of syslog messages from r and ingests them into vlstorage. -func processStream(r io.Reader, compressMethod string, useLocalTimestamp bool, cp *insertutils.CommonParams) error { +func processStream(protocol string, r io.Reader, compressMethod string, useLocalTimestamp bool, cp *insertutils.CommonParams) error { if err := vlstorage.CanWriteData(); err != nil { return err } - lmp := cp.NewLogMessageProcessor() + lmp := cp.NewLogMessageProcessor("syslog_" + protocol) err := processStreamInternal(r, compressMethod, useLocalTimestamp, lmp) lmp.MustClose() diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d80a1dbe0..be1ca475a 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -10,6 +10,7 @@ menu: aliases: - /VictoriaLogs/CHANGELOG.html --- + The following `tip` changes can be tested by building VictoriaLogs from the latest commit of [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/) repository according to [these docs](https://docs.victoriametrics.com/victorialogs/quickstart/#building-from-source-code) @@ -18,7 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add frontend-only pagination for table view. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): improve memory consumption during data processing. This enhancement reduces the overall memory footprint, leading to better performance and stability. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): reduce memory usage across all tabs for improved performance and stability. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7185). -* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for template alias in predefined panels. This allows creating more readable metric names in the legend using constructions like `{{label_name}}`, where `label_name` is the name of the label. [See this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/116101da78a4dee8bd7c4ba0e66458fd05a10469#diff-95141489b32468cf852d2705d96eaa48c50a8b1cdd0424a29e7ca289912a6dcbR140-R151) +* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): expose `vl_bytes_ingested_total` [counter](https://docs.victoriametrics.com/keyconcepts/#counter) at `/metrics` page. This counter tracks an estimated number of bytes processed when parsing the ingested logs. This counter is exposed individually per every [supported data ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/) - the protocol name is exposed in the `type` label. For example, `vl_bytes_ingested_total{type="jsonline"}` tracks an estimated number of bytes processed when reading the ingested logs via [json line protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api). Thanks to @tenmozes for the idea and [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix for `showLegend` and `alias` flags in predefined panels. [See this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7565)