From cc8f2bee0d40c8d6b206e186ff74eedf50eabcf5 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 18 Sep 2023 23:58:32 +0200
Subject: [PATCH] app/vlinsert: follow-up for d570763c91a235cfcbaf1b799cbfbefa557ee7e1

- Switch from summary to histogram for the vl_http_request_duration_seconds metric.
  This allows calculating request duration quantiles across multiple hosts via
  histogram_quantile(0.99, sum(vl_http_request_duration_seconds_bucket) by (vmrange)).

- Take into account only successfully processed data ingestion requests
  when updating the vl_http_request_duration_seconds histogram.
  Failed requests are ignored, since they may significantly skew measurements.

- Clarify the description of the change at docs/VictoriaLogs/CHANGELOG.md.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934
---
 app/vlinsert/elasticsearch/elasticsearch.go | 12 ++++++++----
 app/vlinsert/jsonline/jsonline.go           | 14 +++++++++-----
 app/vlinsert/loki/loki.go                   | 14 --------------
 app/vlinsert/loki/loki_json.go              | 19 +++++++++++++++----
 app/vlinsert/loki/loki_protobuf.go          | 20 +++++++++++++++++---
 docs/VictoriaLogs/CHANGELOG.md              |  2 +-
 6 files changed, 50 insertions(+), 31 deletions(-)

diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go
index aaa8457450..a3ae68f19e 100644
--- a/app/vlinsert/elasticsearch/elasticsearch.go
+++ b/app/vlinsert/elasticsearch/elasticsearch.go
@@ -86,7 +86,6 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
         return true
     case "/_bulk":
         startTime := time.Now()
-        defer bulkRequestDuration.UpdateDuration(startTime)
         bulkRequestsTotal.Inc()
 
         cp, err := insertutils.GetCommonParams(r)
@@ -110,6 +109,12 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
         defer bufferedwriter.Put(bw)
         WriteBulkResponse(bw, n, tookMs)
         _ = bw.Flush()
+
+        // update bulkRequestDuration only for successfully parsed requests
+        // There is no need to update bulkRequestDuration for request errors,
+        // since their timings are usually much smaller than the timing for successful request parsing.
+        bulkRequestDuration.UpdateDuration(startTime)
+
         return true
     default:
         return false
@@ -118,7 +123,8 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 
 var (
     bulkRequestsTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
-    bulkRequestDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
+    rowsIngestedTotal   = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
+    bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
 )
 
 func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
@@ -164,8 +170,6 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 
 var lineBufferPool bytesutil.ByteBufferPool
 
-var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
-
 func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
     processLogMessage func(timestamp int64, fields []logstorage.Field),
 ) (bool, error) {
diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go
index 965f1b15c9..bf8d4760ec 100644
--- a/app/vlinsert/jsonline/jsonline.go
+++ b/app/vlinsert/jsonline/jsonline.go
@@ -19,11 +19,9 @@ import (
     "github.com/VictoriaMetrics/metrics"
 )
 
-var jsonlineRequestDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
-
 // RequestHandler processes jsonline insert requests
 func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
-    defer jsonlineRequestDuration.UpdateDuration(time.Now())
+    startTime := time.Now()
     w.Header().Add("Content-Type", "application/json")
 
     if r.Method != "POST" {
@@ -80,6 +78,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
     vlstorage.MustAddRows(lr)
     logstorage.PutLogRows(lr)
 
+    // update jsonlineRequestDuration only for successfully parsed requests.
+    // There is no need to update jsonlineRequestDuration for request errors,
+    // since their timings are usually much smaller than the timing for successful request parsing.
+    jsonlineRequestDuration.UpdateDuration(startTime)
+
     return true
 }
 
@@ -147,6 +150,7 @@ func parseISO8601Timestamp(s string) (int64, error) {
 var lineBufferPool bytesutil.ByteBufferPool
 
 var (
-    requestsTotal     = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
-    rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
+    requestsTotal           = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
+    rowsIngestedTotal       = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
+    jsonlineRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
 )
diff --git a/app/vlinsert/loki/loki.go b/app/vlinsert/loki/loki.go
index 98bb688117..e7a3c0b7d0 100644
--- a/app/vlinsert/loki/loki.go
+++ b/app/vlinsert/loki/loki.go
@@ -2,21 +2,11 @@ package loki
 
 import (
     "net/http"
-    "time"
-
-    "github.com/VictoriaMetrics/metrics"
 
     "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
 
-var (
-    lokiRequestsJSONTotal       = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
-    lokiRequestsProtobufTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
-    lokiRequestJSONDuration     = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
-    lokiRequestProtobufDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
-)
-
 // RequestHandler processes Loki insert requests
 func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
     switch path {
@@ -37,13 +27,9 @@ func handleInsert(r *http.Request, w http.ResponseWriter) bool {
     contentType := r.Header.Get("Content-Type")
     switch contentType {
     case "application/json":
-        defer lokiRequestJSONDuration.UpdateDuration(time.Now())
-        lokiRequestsJSONTotal.Inc()
         return handleJSON(r, w)
     default:
         // Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
-        defer lokiRequestProtobufDuration.UpdateDuration(time.Now())
-        lokiRequestsProtobufTotal.Inc()
         return handleProtobuf(r, w)
     }
 }
diff --git a/app/vlinsert/loki/loki_json.go b/app/vlinsert/loki/loki_json.go
index 381aa34364..88a75df1d0 100644
--- a/app/vlinsert/loki/loki_json.go
+++ b/app/vlinsert/loki/loki_json.go
@@ -18,12 +18,11 @@ import (
     "github.com/valyala/fastjson"
 )
 
-var (
-    rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
-    parserPool            fastjson.ParserPool
-)
+var parserPool fastjson.ParserPool
 
 func handleJSON(r *http.Request, w http.ResponseWriter) bool {
+    startTime := time.Now()
+    lokiRequestsJSONTotal.Inc()
     reader := r.Body
     if r.Header.Get("Content-Encoding") == "gzip" {
         zr, err := common.GetGzipReader(reader)
@@ -58,9 +57,21 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
         return true
     }
     rowsIngestedJSONTotal.Add(n)
+
+    // update lokiRequestJSONDuration only for successfully parsed requests
+    // There is no need to update lokiRequestJSONDuration for request errors,
+    // since their timings are usually much smaller than the timing for successful request parsing.
+    lokiRequestJSONDuration.UpdateDuration(startTime)
+
     return true
 }
 
+var (
+    lokiRequestsJSONTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
+    rowsIngestedJSONTotal   = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
+    lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
+)
+
 func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
     p := parserPool.Get()
     defer parserPool.Put(p)
diff --git a/app/vlinsert/loki/loki_protobuf.go b/app/vlinsert/loki/loki_protobuf.go
index a75bf93d00..aa4e6b592f 100644
--- a/app/vlinsert/loki/loki_protobuf.go
+++ b/app/vlinsert/loki/loki_protobuf.go
@@ -19,12 +19,13 @@ import (
 )
 
 var (
-    rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
-    bytesBufPool              bytesutil.ByteBufferPool
-    pushReqsPool              sync.Pool
+    bytesBufPool bytesutil.ByteBufferPool
+    pushReqsPool sync.Pool
 )
 
 func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
+    startTime := time.Now()
+    lokiRequestsProtobufTotal.Inc()
     wcr := writeconcurrencylimiter.GetReader(r.Body)
     data, err := io.ReadAll(wcr)
     writeconcurrencylimiter.PutReader(wcr)
@@ -47,10 +48,23 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
         httpserver.Errorf(w, r, "cannot parse loki request: %s", err)
         return true
     }
+
     rowsIngestedProtobufTotal.Add(n)
+
+    // update lokiRequestProtobufDuration only for successfully parsed requests
+    // There is no need to update lokiRequestProtobufDuration for request errors,
+    // since their timings are usually much smaller than the timing for successful request parsing.
+    lokiRequestProtobufDuration.UpdateDuration(startTime)
+
     return true
 }
 
+var (
+    lokiRequestsProtobufTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
+    rowsIngestedProtobufTotal   = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
+    lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
+)
+
 func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
     bb := bytesBufPool.Get()
     defer bytesBufPool.Put(bb)
diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 6a0c121ba0..f2bb6d29aa 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -10,7 +10,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
   * `vl_data_size_bytes{type="storage"}` - on-disk size for data excluding [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
   * `vl_data_size_bytes{type="indexdb"}` - on-disk size for [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
 * FEATURE: add `-insert.maxFieldsPerLine` command-line flag, which can be used for limiting the number of fields per line in logs sent to VictoriaLogs via ingestion protocols. This helps to avoid issues like [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762).
-* FEATURE: expose `vl_http_request_duration_seconds` metric at the [/metrics](monitoring). See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934) for details.
+* FEATURE: expose `vl_http_request_duration_seconds` histogram at the [/metrics](https://docs.victoriametrics.com/VictoriaLogs/#monitoring) page. Thanks to @crossoverJie for [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934).
 * BUGFIX: fix possible panic when no data is written to VictoriaLogs for a long time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4895). Thanks to @crossoverJie for filing and fixing the issue.
 * BUGFIX: add `/insert/loki/ready` endpoint, which is used by Promtail for healthchecks. This should remove `unsupported path requested: /insert/loki/ready` warning logs. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762#issuecomment-1690966722).
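
For reference, the pattern this follow-up applies to every ingestion handler (count the request up front, update the vl_http_request_duration_seconds histogram only after the request has been parsed and ingested successfully) boils down to the standalone sketch below. It uses the same github.com/VictoriaMetrics/metrics calls the patch relies on (NewCounter, NewHistogram, UpdateDuration, WritePrometheus); the /insert/example path, the example* metric variables and the processRequest placeholder are illustrative only and do not appear in the patch.

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/VictoriaMetrics/metrics"
)

// Illustrative metrics only; the real handlers in this patch declare
// per-path counters and histograms next to each handler.
var (
    exampleRequestsTotal   = metrics.NewCounter(`vl_http_requests_total{path="/insert/example"}`)
    exampleRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/example"}`)
)

// exampleHandler counts every request, but records the request duration
// only when the request has been processed successfully, so that failed
// (and typically much faster) requests do not skew the histogram.
func exampleHandler(w http.ResponseWriter, r *http.Request) {
    startTime := time.Now()
    exampleRequestsTotal.Inc()

    if err := processRequest(r); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    exampleRequestDuration.UpdateDuration(startTime)
    fmt.Fprintln(w, "ok")
}

// processRequest is a placeholder for parsing and ingesting the request body.
func processRequest(r *http.Request) error {
    return nil
}

func main() {
    http.HandleFunc("/insert/example", exampleHandler)
    // Expose the collected metrics, including the histogram vmrange buckets.
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        metrics.WritePrometheus(w, false)
    })
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Once the histogram buckets are exposed this way, the 99th percentile across hosts can be computed with histogram_quantile(0.99, sum(vl_http_request_duration_seconds_bucket) by (vmrange)), as noted in the commit message.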