VictoriaMetrics/app/vlinsert/elasticsearch/elasticsearch.go

284 lines
8.3 KiB
Go
Raw Normal View History

package elasticsearch
import (
"bufio"
"errors"
"flag"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
)
var (
elasticsearchVersion = flag.String("elasticsearch.version", "8.9.0", "Elasticsearch version to report to client")
)
// RequestHandler processes Elasticsearch insert requests
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
w.Header().Add("Content-Type", "application/json")
// This header is needed for Logstash
w.Header().Set("X-Elastic-Product", "Elasticsearch")
if strings.HasPrefix(path, "/_ilm/policy") {
// Return fake response for Elasticsearch ilm request.
fmt.Fprintf(w, `{}`)
return true
}
if strings.HasPrefix(path, "/_index_template") {
// Return fake response for Elasticsearch index template request.
fmt.Fprintf(w, `{}`)
return true
}
if strings.HasPrefix(path, "/_ingest") {
// Return fake response for Elasticsearch ingest pipeline request.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/put-pipeline-api.html
fmt.Fprintf(w, `{}`)
return true
}
if strings.HasPrefix(path, "/_nodes") {
// Return fake response for Elasticsearch nodes discovery request.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/cluster.html
fmt.Fprintf(w, `{}`)
return true
}
switch path {
case "/":
switch r.Method {
case http.MethodGet:
// Return fake response for Elasticsearch ping request.
// See the latest available version for Elasticsearch at https://github.com/elastic/elasticsearch/releases
fmt.Fprintf(w, `{
"version": {
"number": %q
}
}`, *elasticsearchVersion)
case http.MethodHead:
// Return empty response for Logstash ping request.
}
return true
case "/_license":
// Return fake response for Elasticsearch license request.
fmt.Fprintf(w, `{
"license": {
"uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
"type": "oss",
"status": "active",
"expiry_date_in_millis" : 4000000000000
}
}`)
return true
case "/_bulk":
startTime := time.Now()
bulkRequestsTotal.Inc()
cp, err := insertutils.GetCommonParams(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
lr := logstorage.GetLogRows(cp.StreamFields, cp.IgnoreFields)
processLogMessage := cp.GetProcessLogMessageFunc(lr)
isGzip := r.Header.Get("Content-Encoding") == "gzip"
n, err := readBulkRequest(r.Body, isGzip, cp.TimeField, cp.MsgField, processLogMessage)
if err != nil {
logger.Warnf("cannot decode log message #%d in /_bulk request: %s", n, err)
return true
}
err = vlstorage.AddRows(lr)
logstorage.PutLogRows(lr)
if err != nil {
httpserver.Errorf(w, r, "cannot insert rows: %s", err)
}
tookMs := time.Since(startTime).Milliseconds()
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteBulkResponse(bw, n, tookMs)
_ = bw.Flush()
// update bulkRequestDuration only for successfully parsed requests
// There is no need in updating bulkRequestDuration for request errors,
// since their timings are usually much smaller than the timing for successful request parsing.
bulkRequestDuration.UpdateDuration(startTime)
return true
default:
return false
}
}
var (
bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
rowsIngestedTotal = metrics.NewCounter(`vl_rows_ingested_total{type="elasticsearch_bulk"}`)
bulkRequestDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}`)
)
func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field) error,
) (int, error) {
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
if isGzip {
zr, err := common.GetGzipReader(r)
if err != nil {
return 0, fmt.Errorf("cannot read gzipped _bulk request: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
lb := lineBufferPool.Get()
defer lineBufferPool.Put(lb)
lb.B = bytesutil.ResizeNoCopyNoOverallocate(lb.B, insertutils.MaxLineSizeBytes.IntN())
sc := bufio.NewScanner(wcr)
sc.Buffer(lb.B, len(lb.B))
n := 0
nCheckpoint := 0
for {
ok, err := readBulkLine(sc, timeField, msgField, processLogMessage)
wcr.DecConcurrency()
if err != nil || !ok {
rowsIngestedTotal.Add(n - nCheckpoint)
return n, err
}
n++
if batchSize := n - nCheckpoint; n >= 1000 {
rowsIngestedTotal.Add(batchSize)
nCheckpoint = n
}
}
}
var lineBufferPool bytesutil.ByteBufferPool
func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
processLogMessage func(timestamp int64, fields []logstorage.Field) error,
) (bool, error) {
var line []byte
// Read the command, must be "create" or "index"
for len(line) == 0 {
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf(`cannot read "create" or "index" command, since its size exceeds -insert.maxLineSizeBytes=%d`,
insertutils.MaxLineSizeBytes.IntN())
}
return false, err
}
return false, nil
}
line = sc.Bytes()
}
lineStr := bytesutil.ToUnsafeString(line)
if !strings.Contains(lineStr, `"create"`) && !strings.Contains(lineStr, `"index"`) {
return false, fmt.Errorf(`unexpected command %q; expecting "create" or "index"`, line)
}
// Decode log message
if !sc.Scan() {
if err := sc.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return false, fmt.Errorf("cannot read log message, since its size exceeds -insert.maxLineSizeBytes=%d", insertutils.MaxLineSizeBytes.IntN())
}
return false, err
}
return false, fmt.Errorf(`missing log message after the "create" or "index" command`)
}
line = sc.Bytes()
p := logjson.GetParser()
if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
}
app/vlinsert/loki: follow-up after 09df5b66fd7bf33f171d6179748e6f32394ef56e - Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header - Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler - Check JSON field types more strictly. - Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients, which store timestamps in float64 instead of int64. - Optimize parsing of Loki labels in Prometheus text exposition format. - Simplify tests. - Remove lib/slicesutil, since there are no more users for it. - Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels as stream fields in most Loki setups. - Allow empty of missing timestamps in the ingested logs. The current timestamp at VictoriaLogs side is then used for the ingested logs. This simplifies debugging and testing of the provided HTTP-based data ingestion APIs. The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 . This is because of shitty protobuf dependencies. They must be replaced with another protobuf implementation similar to the one used at lib/prompb or lib/prompbmarshal .
2023-07-21 01:21:47 +02:00
ts, err := extractTimestampFromFields(timeField, p.Fields)
if err != nil {
return false, fmt.Errorf("cannot parse timestamp: %w", err)
}
app/vlinsert/loki: follow-up after 09df5b66fd7bf33f171d6179748e6f32394ef56e - Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header - Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler - Check JSON field types more strictly. - Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients, which store timestamps in float64 instead of int64. - Optimize parsing of Loki labels in Prometheus text exposition format. - Simplify tests. - Remove lib/slicesutil, since there are no more users for it. - Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels as stream fields in most Loki setups. - Allow empty of missing timestamps in the ingested logs. The current timestamp at VictoriaLogs side is then used for the ingested logs. This simplifies debugging and testing of the provided HTTP-based data ingestion APIs. The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 . This is because of shitty protobuf dependencies. They must be replaced with another protobuf implementation similar to the one used at lib/prompb or lib/prompbmarshal .
2023-07-21 01:21:47 +02:00
if ts == 0 {
ts = time.Now().UnixNano()
}
p.RenameField(msgField, "_msg")
err = processLogMessage(ts, p.Fields)
logjson.PutParser(p)
if err != nil {
return false, err
}
return true, nil
}
func extractTimestampFromFields(timeField string, fields []logstorage.Field) (int64, error) {
for i := range fields {
f := &fields[i]
if f.Name != timeField {
continue
}
timestamp, err := parseElasticsearchTimestamp(f.Value)
if err != nil {
return 0, err
}
f.Value = ""
return timestamp, nil
}
app/vlinsert/loki: follow-up after 09df5b66fd7bf33f171d6179748e6f32394ef56e - Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header - Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler - Check JSON field types more strictly. - Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients, which store timestamps in float64 instead of int64. - Optimize parsing of Loki labels in Prometheus text exposition format. - Simplify tests. - Remove lib/slicesutil, since there are no more users for it. - Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels as stream fields in most Loki setups. - Allow empty of missing timestamps in the ingested logs. The current timestamp at VictoriaLogs side is then used for the ingested logs. This simplifies debugging and testing of the provided HTTP-based data ingestion APIs. The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 . This is because of shitty protobuf dependencies. They must be replaced with another protobuf implementation similar to the one used at lib/prompb or lib/prompbmarshal .
2023-07-21 01:21:47 +02:00
return 0, nil
}
func parseElasticsearchTimestamp(s string) (int64, error) {
app/vlinsert/loki: follow-up after 09df5b66fd7bf33f171d6179748e6f32394ef56e - Parse protobuf if Content-Type isn't set to `application/json` - this behavior is documented at https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki - Properly handle gzip'ped JSON requests. The `gzip` header must be read from `Content-Encoding` instead of `Content-Type` header - Properly flush all the parsed logs with the explicit call to vlstorage.MustAddRows() at the end of query handler - Check JSON field types more strictly. - Allow parsing Loki timestamp as floating-point number. Such a timestamp can be generated by some clients, which store timestamps in float64 instead of int64. - Optimize parsing of Loki labels in Prometheus text exposition format. - Simplify tests. - Remove lib/slicesutil, since there are no more users for it. - Update docs with missing info and fix various typos. For example, it should be enough to have `instance` and `job` labels as stream fields in most Loki setups. - Allow empty of missing timestamps in the ingested logs. The current timestamp at VictoriaLogs side is then used for the ingested logs. This simplifies debugging and testing of the provided HTTP-based data ingestion APIs. The remaining MAJOR issue, which needs to be addressed: victoria-logs binary size increased from 13MB to 22MB after adding support for Loki data ingestion protocol at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4482 . This is because of shitty protobuf dependencies. They must be replaced with another protobuf implementation similar to the one used at lib/prompb or lib/prompbmarshal .
2023-07-21 01:21:47 +02:00
if s == "0" || s == "" {
// Special case - zero or empty timestamp must be substituted
// with the current time by the caller.
return 0, nil
}
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
// Try parsing timestamp in milliseconds
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp in milliseconds from %q: %w", s, err)
}
if n > int64(math.MaxInt64)/1e6 {
return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
}
if n < int64(math.MinInt64)/1e6 {
return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
}
n *= 1e6
return n, nil
}
if len(s) == len("YYYY-MM-DD") {
t, err := time.Parse("2006-01-02", s)
if err != nil {
return 0, fmt.Errorf("cannot parse date %q: %w", s, err)
}
return t.UnixNano(), nil
}
t, err := time.Parse(time.RFC3339, s)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
}
return t.UnixNano(), nil
}