app/vlinsert: follow-up for d570763c91

- Switch from summary to histogram for vl_http_request_duration_seconds metric.
  This allows calculating request duration quantiles across multiple hosts
  via histogram_quantile(0.99, sum(vl_http_request_duration_seconds_bucket) by (vmrange)).
- Take into account only successfully processed data ingestion requests
  when updating vl_http_request_duration_seconds histogram.
  Failed requests are ignored, since they may significantly skew measurements.
- Clarify the description of the change at docs/VictoriaLogs/CHANGELOG.md.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934
This commit is contained in:
Aliaksandr Valialkin 2023-09-18 23:58:32 +02:00
parent de2b3ff9b0
commit cc8f2bee0d
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
6 changed files with 50 additions and 31 deletions

View File

@ -86,7 +86,6 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
return true return true
case "/_bulk": case "/_bulk":
startTime := time.Now() startTime := time.Now()
defer bulkRequestDuration.UpdateDuration(startTime)
bulkRequestsTotal.Inc() bulkRequestsTotal.Inc()
cp, err := insertutils.GetCommonParams(r) cp, err := insertutils.GetCommonParams(r)
@ -110,6 +109,12 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
defer bufferedwriter.Put(bw) defer bufferedwriter.Put(bw)
WriteBulkResponse(bw, n, tookMs) WriteBulkResponse(bw, n, tookMs)
_ = bw.Flush() _ = bw.Flush()
// Update bulkRequestDuration only for successfully parsed requests.
// There is no need to update bulkRequestDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
bulkRequestDuration.UpdateDuration(startTime)
return true return true
default: default:
return false return false
@ -118,7 +123,8 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
var ( var (
bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`) bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
bulkRequestDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`) rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
) )
func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
@ -164,8 +170,6 @@ func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
var lineBufferPool bytesutil.ByteBufferPool var lineBufferPool bytesutil.ByteBufferPool
var rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
func readBulkLine(sc *bufio.Scanner, timeField, msgField string, func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field), processLogMessage func(timestamp int64, fields []logstorage.Field),
) (bool, error) { ) (bool, error) {

View File

@ -19,11 +19,9 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var jsonlineRequestDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
// RequestHandler processes jsonline insert requests // RequestHandler processes jsonline insert requests
func RequestHandler(w http.ResponseWriter, r *http.Request) bool { func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
defer jsonlineRequestDuration.UpdateDuration(time.Now()) startTime := time.Now()
w.Header().Add("Content-Type", "application/json") w.Header().Add("Content-Type", "application/json")
if r.Method != "POST" { if r.Method != "POST" {
@ -80,6 +78,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
vlstorage.MustAddRows(lr) vlstorage.MustAddRows(lr)
logstorage.PutLogRows(lr) logstorage.PutLogRows(lr)
// Update jsonlineRequestDuration only for successfully parsed requests.
// There is no need to update jsonlineRequestDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
jsonlineRequestDuration.UpdateDuration(startTime)
return true return true
} }
@ -147,6 +150,7 @@ func parseISO8601Timestamp(s string) (int64, error) {
var lineBufferPool bytesutil.ByteBufferPool var lineBufferPool bytesutil.ByteBufferPool
var ( var (
requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`) requestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/jsonline"}`)
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`) rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="jsonline"}`)
jsonlineRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/jsonline"}`)
) )

View File

@ -2,21 +2,11 @@ package loki
import ( import (
"net/http" "net/http"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
) )
var (
lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
lokiRequestJSONDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
lokiRequestProtobufDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
)
// RequestHandler processes Loki insert requests // RequestHandler processes Loki insert requests
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
switch path { switch path {
@ -37,13 +27,9 @@ func handleInsert(r *http.Request, w http.ResponseWriter) bool {
contentType := r.Header.Get("Content-Type") contentType := r.Header.Get("Content-Type")
switch contentType { switch contentType {
case "application/json": case "application/json":
defer lokiRequestJSONDuration.UpdateDuration(time.Now())
lokiRequestsJSONTotal.Inc()
return handleJSON(r, w) return handleJSON(r, w)
default: default:
// Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki // Protobuf request body should be handled by default according to https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
defer lokiRequestProtobufDuration.UpdateDuration(time.Now())
lokiRequestsProtobufTotal.Inc()
return handleProtobuf(r, w) return handleProtobuf(r, w)
} }
} }

View File

@ -18,12 +18,11 @@ import (
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
) )
var ( var parserPool fastjson.ParserPool
rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
parserPool fastjson.ParserPool
)
func handleJSON(r *http.Request, w http.ResponseWriter) bool { func handleJSON(r *http.Request, w http.ResponseWriter) bool {
startTime := time.Now()
lokiRequestsJSONTotal.Inc()
reader := r.Body reader := r.Body
if r.Header.Get("Content-Encoding") == "gzip" { if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(reader) zr, err := common.GetGzipReader(reader)
@ -58,9 +57,21 @@ func handleJSON(r *http.Request, w http.ResponseWriter) bool {
return true return true
} }
rowsIngestedJSONTotal.Add(n) rowsIngestedJSONTotal.Add(n)
// Update lokiRequestJSONDuration only for successfully parsed requests.
// There is no need to update lokiRequestJSONDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
lokiRequestJSONDuration.UpdateDuration(startTime)
return true return true
} }
// Metrics for Loki push requests handled by handleJSON.
var (
// Incremented at the start of handleJSON, before the request body is read.
lokiRequestsJSONTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="json"}`)
// handleJSON adds n to this counter after the request has been parsed without errors
// (n is presumably the number of parsed rows; the assignment is outside the visible hunk).
rowsIngestedJSONTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="json"}`)
// Histogram of request durations; handleJSON updates it only on the success path,
// so failed requests do not contribute to it.
lokiRequestJSONDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="json"}`)
)
func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { func parseJSONRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
p := parserPool.Get() p := parserPool.Get()
defer parserPool.Put(p) defer parserPool.Put(p)

View File

@ -19,12 +19,13 @@ import (
) )
var ( var (
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`) bytesBufPool bytesutil.ByteBufferPool
bytesBufPool bytesutil.ByteBufferPool pushReqsPool sync.Pool
pushReqsPool sync.Pool
) )
func handleProtobuf(r *http.Request, w http.ResponseWriter) bool { func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
startTime := time.Now()
lokiRequestsProtobufTotal.Inc()
wcr := writeconcurrencylimiter.GetReader(r.Body) wcr := writeconcurrencylimiter.GetReader(r.Body)
data, err := io.ReadAll(wcr) data, err := io.ReadAll(wcr)
writeconcurrencylimiter.PutReader(wcr) writeconcurrencylimiter.PutReader(wcr)
@ -47,10 +48,23 @@ func handleProtobuf(r *http.Request, w http.ResponseWriter) bool {
httpserver.Errorf(w, r, "cannot parse loki request: %s", err) httpserver.Errorf(w, r, "cannot parse loki request: %s", err)
return true return true
} }
rowsIngestedProtobufTotal.Add(n) rowsIngestedProtobufTotal.Add(n)
// Update lokiRequestProtobufDuration only for successfully parsed requests.
// There is no need to update lokiRequestProtobufDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
lokiRequestProtobufDuration.UpdateDuration(startTime)
return true return true
} }
// Metrics for Loki push requests handled by handleProtobuf.
var (
// Incremented at the start of handleProtobuf, before the request body is read.
lokiRequestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/loki/api/v1/push",format="protobuf"}`)
// handleProtobuf adds n to this counter after the request has been parsed without errors
// (n is presumably the number of parsed rows; the assignment is outside the visible hunk).
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="loki",format="protobuf"}`)
// Histogram of request durations; handleProtobuf updates it only on the success path,
// so failed requests do not contribute to it.
lokiRequestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/loki/api/v1/push",format="protobuf"}`)
)
func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) { func parseProtobufRequest(data []byte, processLogMessage func(timestamp int64, fields []logstorage.Field)) (int, error) {
bb := bytesBufPool.Get() bb := bytesBufPool.Get()
defer bytesBufPool.Put(bb) defer bytesBufPool.Put(bb)

View File

@ -10,7 +10,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* `vl_data_size_bytes{type="storage"}` - on-disk size for data excluding [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes. * `vl_data_size_bytes{type="storage"}` - on-disk size for data excluding [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
* `vl_data_size_bytes{type="indexdb"}` - on-disk size for [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes. * `vl_data_size_bytes{type="indexdb"}` - on-disk size for [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) indexes.
* FEATURE: add `-insert.maxFieldsPerLine` command-line flag, which can be used for limiting the number of fields per line in logs sent to VictoriaLogs via ingestion protocols. This helps to avoid issues like [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762). * FEATURE: add `-insert.maxFieldsPerLine` command-line flag, which can be used for limiting the number of fields per line in logs sent to VictoriaLogs via ingestion protocols. This helps to avoid issues like [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762).
* FEATURE: expose `vl_http_request_duration_seconds` metric at the [/metrics](monitoring). See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934) for details. * FEATURE: expose `vl_http_request_duration_seconds` histogram at the [/metrics](https://docs.victoriametrics.com/VictoriaLogs/#monitoring) page. Thanks to @crossoverJie for [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4934).
* BUGFIX: fix possible panic when no data is written to VictoriaLogs for a long time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4895). Thanks to @crossoverJie for filing and fixing the issue. * BUGFIX: fix possible panic when no data is written to VictoriaLogs for a long time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4895). Thanks to @crossoverJie for filing and fixing the issue.
* BUGFIX: add `/insert/loki/ready` endpoint, which is used by Promtail for healthchecks. This should remove `unsupported path requested: /insert/loki/ready` warning logs. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762#issuecomment-1690966722). * BUGFIX: add `/insert/loki/ready` endpoint, which is used by Promtail for healthchecks. This should remove `unsupported path requested: /insert/loki/ready` warning logs. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762#issuecomment-1690966722).