mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-04 22:02:12 +01:00
d5a599badc
- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup don't prevent from processing the corresponding scrape targets after the file becomes correct, without the need to restart vmagent. Previously scrape targets with invalid TLS CA file or TLS client certificate files were permanently dropped after the first attempt to initialize them, and they didn't appear until the next vmagent reload or the next change in other places of the loaded scrape configs. - Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent. Previously the old TLS CA was used until vmagent restart. - Properly handle errors during http request creation for the second attempt to send data to remote system at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing, since the returned request is nil on error. - Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent. Previously it could miss details on the source of the request. - Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header of every http request issued by vmagent during service discovery or target scraping. Re-use the HTTP client instead until the corresponding scrape config changes. - Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached, e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server when auth header cannot be obtained because of temporary error. - Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config. Cache the loaded certificate and the error for one second. This should significantly reduce CPU load when scraping big number of targets with the same tls_config. - Allow loading TLS certificates from HTTP and HTTPs urls by specifying these urls at `tls_config->cert_file` and `tls_config->key_file`. - Improve test coverage at lib/promauth - Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later. Previously vmagent was exitting in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
419 lines
9.9 KiB
Go
419 lines
9.9 KiB
Go
package influx
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/valyala/fastjson/fastfloat"
|
|
)
|
|
|
|
// Rows contains parsed influx rows.
|
|
type Rows struct {
|
|
Rows []Row
|
|
|
|
tagsPool []Tag
|
|
fieldsPool []Field
|
|
}
|
|
|
|
// Reset resets rs.
|
|
func (rs *Rows) Reset() {
|
|
// Reset rows, tags and fields in order to remove references to old data,
|
|
// so GC could collect it.
|
|
|
|
for i := range rs.Rows {
|
|
rs.Rows[i].reset()
|
|
}
|
|
rs.Rows = rs.Rows[:0]
|
|
|
|
for i := range rs.tagsPool {
|
|
rs.tagsPool[i].reset()
|
|
}
|
|
rs.tagsPool = rs.tagsPool[:0]
|
|
|
|
for i := range rs.fieldsPool {
|
|
rs.fieldsPool[i].reset()
|
|
}
|
|
rs.fieldsPool = rs.fieldsPool[:0]
|
|
}
|
|
|
|
// Unmarshal unmarshals influx line protocol rows from s.
|
|
//
|
|
// See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
|
|
//
|
|
// s shouldn't be modified when rs is in use.
|
|
func (rs *Rows) Unmarshal(s string) {
|
|
rs.Rows, rs.tagsPool, rs.fieldsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], rs.fieldsPool[:0])
|
|
}
|
|
|
|
// Row is a single influx row.
|
|
type Row struct {
|
|
Measurement string
|
|
Tags []Tag
|
|
Fields []Field
|
|
Timestamp int64
|
|
}
|
|
|
|
func (r *Row) reset() {
|
|
r.Measurement = ""
|
|
r.Tags = nil
|
|
r.Fields = nil
|
|
r.Timestamp = 0
|
|
}
|
|
|
|
func (r *Row) unmarshal(s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Tag, []Field, error) {
|
|
r.reset()
|
|
n := nextUnescapedChar(s, ' ', noEscapeChars)
|
|
if n < 0 {
|
|
return tagsPool, fieldsPool, fmt.Errorf("cannot find Whitespace I in %q", s)
|
|
}
|
|
measurementTags := s[:n]
|
|
s = stripLeadingWhitespace(s[n+1:])
|
|
|
|
// Parse measurement and tags
|
|
var err error
|
|
n = nextUnescapedChar(measurementTags, ',', noEscapeChars)
|
|
if n >= 0 {
|
|
tagsStart := len(tagsPool)
|
|
tagsPool, err = unmarshalTags(tagsPool, measurementTags[n+1:], noEscapeChars)
|
|
if err != nil {
|
|
return tagsPool, fieldsPool, err
|
|
}
|
|
tags := tagsPool[tagsStart:]
|
|
r.Tags = tags[:len(tags):len(tags)]
|
|
measurementTags = measurementTags[:n]
|
|
}
|
|
r.Measurement = unescapeTagValue(measurementTags, noEscapeChars)
|
|
// Allow empty r.Measurement. In this case metric name is constructed directly from field keys.
|
|
|
|
// Parse fields
|
|
fieldsStart := len(fieldsPool)
|
|
hasQuotedFields := nextUnescapedChar(s, '"', noEscapeChars) >= 0
|
|
n = nextUnquotedChar(s, ' ', noEscapeChars, hasQuotedFields)
|
|
if n < 0 {
|
|
// No timestamp.
|
|
fieldsPool, err = unmarshalInfluxFields(fieldsPool, s, noEscapeChars, hasQuotedFields)
|
|
if err != nil {
|
|
return tagsPool, fieldsPool, err
|
|
}
|
|
fields := fieldsPool[fieldsStart:]
|
|
r.Fields = fields[:len(fields):len(fields)]
|
|
return tagsPool, fieldsPool, nil
|
|
}
|
|
fieldsPool, err = unmarshalInfluxFields(fieldsPool, s[:n], noEscapeChars, hasQuotedFields)
|
|
if err != nil {
|
|
if strings.HasPrefix(s[n+1:], "HTTP/") {
|
|
return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
|
|
"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
|
|
}
|
|
return tagsPool, fieldsPool, err
|
|
}
|
|
r.Fields = fieldsPool[fieldsStart:]
|
|
s = stripLeadingWhitespace(s[n+1:])
|
|
|
|
// Parse timestamp
|
|
timestamp, err := fastfloat.ParseInt64(s)
|
|
if err != nil {
|
|
if strings.HasPrefix(s, "HTTP/") {
|
|
return tagsPool, fieldsPool, fmt.Errorf("please switch from tcp to http protocol for data ingestion; " +
|
|
"do not set `-influxListenAddr` command-line flag, since it is needed for tcp protocol only")
|
|
}
|
|
return tagsPool, fieldsPool, fmt.Errorf("cannot parse timestamp %q: %w", s, err)
|
|
}
|
|
r.Timestamp = timestamp
|
|
return tagsPool, fieldsPool, nil
|
|
}
|
|
|
|
// Tag represents influx tag.
|
|
type Tag struct {
|
|
Key string
|
|
Value string
|
|
}
|
|
|
|
func (tag *Tag) reset() {
|
|
tag.Key = ""
|
|
tag.Value = ""
|
|
}
|
|
|
|
func (tag *Tag) unmarshal(s string, noEscapeChars bool) error {
|
|
tag.reset()
|
|
n := nextUnescapedChar(s, '=', noEscapeChars)
|
|
if n < 0 {
|
|
return fmt.Errorf("missing tag value for %q", s)
|
|
}
|
|
tag.Key = unescapeTagValue(s[:n], noEscapeChars)
|
|
tag.Value = unescapeTagValue(s[n+1:], noEscapeChars)
|
|
return nil
|
|
}
|
|
|
|
// Field represents influx field.
|
|
type Field struct {
|
|
Key string
|
|
Value float64
|
|
}
|
|
|
|
func (f *Field) reset() {
|
|
f.Key = ""
|
|
f.Value = 0
|
|
}
|
|
|
|
func (f *Field) unmarshal(s string, noEscapeChars, hasQuotedFields bool) error {
|
|
f.reset()
|
|
n := nextUnescapedChar(s, '=', noEscapeChars)
|
|
if n < 0 {
|
|
return fmt.Errorf("missing field value for %q", s)
|
|
}
|
|
f.Key = unescapeTagValue(s[:n], noEscapeChars)
|
|
if len(f.Key) == 0 {
|
|
return fmt.Errorf("field key cannot be empty")
|
|
}
|
|
v, err := parseFieldValue(s[n+1:], hasQuotedFields)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse field value for %q: %w", f.Key, err)
|
|
}
|
|
f.Value = v
|
|
return nil
|
|
}
|
|
|
|
func unmarshalRows(dst []Row, s string, tagsPool []Tag, fieldsPool []Field) ([]Row, []Tag, []Field) {
|
|
noEscapeChars := strings.IndexByte(s, '\\') < 0
|
|
for len(s) > 0 {
|
|
n := strings.IndexByte(s, '\n')
|
|
if n < 0 {
|
|
// The last line.
|
|
return unmarshalRow(dst, s, tagsPool, fieldsPool, noEscapeChars)
|
|
}
|
|
dst, tagsPool, fieldsPool = unmarshalRow(dst, s[:n], tagsPool, fieldsPool, noEscapeChars)
|
|
s = s[n+1:]
|
|
}
|
|
return dst, tagsPool, fieldsPool
|
|
}
|
|
|
|
func unmarshalRow(dst []Row, s string, tagsPool []Tag, fieldsPool []Field, noEscapeChars bool) ([]Row, []Tag, []Field) {
|
|
if len(s) > 0 && s[len(s)-1] == '\r' {
|
|
s = s[:len(s)-1]
|
|
}
|
|
if len(s) == 0 {
|
|
// Skip empty line
|
|
return dst, tagsPool, fieldsPool
|
|
}
|
|
if s[0] == '#' {
|
|
// Skip comment
|
|
return dst, tagsPool, fieldsPool
|
|
}
|
|
|
|
if cap(dst) > len(dst) {
|
|
dst = dst[:len(dst)+1]
|
|
} else {
|
|
dst = append(dst, Row{})
|
|
}
|
|
r := &dst[len(dst)-1]
|
|
var err error
|
|
tagsPool, fieldsPool, err = r.unmarshal(s, tagsPool, fieldsPool, noEscapeChars)
|
|
if err != nil {
|
|
dst = dst[:len(dst)-1]
|
|
logger.Errorf("skipping InfluxDB line %q because of error: %s", s, err)
|
|
invalidLines.Inc()
|
|
}
|
|
return dst, tagsPool, fieldsPool
|
|
}
|
|
|
|
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="influx"}`)
|
|
|
|
func unmarshalTags(dst []Tag, s string, noEscapeChars bool) ([]Tag, error) {
|
|
for {
|
|
if cap(dst) > len(dst) {
|
|
dst = dst[:len(dst)+1]
|
|
} else {
|
|
dst = append(dst, Tag{})
|
|
}
|
|
tag := &dst[len(dst)-1]
|
|
n := nextUnescapedChar(s, ',', noEscapeChars)
|
|
if n < 0 {
|
|
if err := tag.unmarshal(s, noEscapeChars); err != nil {
|
|
return dst[:len(dst)-1], err
|
|
}
|
|
if len(tag.Key) == 0 || len(tag.Value) == 0 {
|
|
// Skip empty tag
|
|
dst = dst[:len(dst)-1]
|
|
}
|
|
return dst, nil
|
|
}
|
|
if err := tag.unmarshal(s[:n], noEscapeChars); err != nil {
|
|
return dst[:len(dst)-1], err
|
|
}
|
|
s = s[n+1:]
|
|
if len(tag.Key) == 0 || len(tag.Value) == 0 {
|
|
// Skip empty tag
|
|
dst = dst[:len(dst)-1]
|
|
}
|
|
}
|
|
}
|
|
|
|
func unmarshalInfluxFields(dst []Field, s string, noEscapeChars, hasQuotedFields bool) ([]Field, error) {
|
|
for {
|
|
if cap(dst) > len(dst) {
|
|
dst = dst[:len(dst)+1]
|
|
} else {
|
|
dst = append(dst, Field{})
|
|
}
|
|
f := &dst[len(dst)-1]
|
|
n := nextUnquotedChar(s, ',', noEscapeChars, hasQuotedFields)
|
|
if n < 0 {
|
|
if err := f.unmarshal(s, noEscapeChars, hasQuotedFields); err != nil {
|
|
return dst, err
|
|
}
|
|
return dst, nil
|
|
}
|
|
if err := f.unmarshal(s[:n], noEscapeChars, hasQuotedFields); err != nil {
|
|
return dst, err
|
|
}
|
|
s = s[n+1:]
|
|
}
|
|
}
|
|
|
|
func unescapeTagValue(s string, noEscapeChars bool) string {
|
|
if noEscapeChars {
|
|
// Fast path - no escape chars.
|
|
return s
|
|
}
|
|
n := strings.IndexByte(s, '\\')
|
|
if n < 0 {
|
|
return s
|
|
}
|
|
|
|
// Slow path. Remove escape chars.
|
|
dst := make([]byte, 0, len(s))
|
|
for {
|
|
dst = append(dst, s[:n]...)
|
|
s = s[n+1:]
|
|
if len(s) == 0 {
|
|
return string(append(dst, '\\'))
|
|
}
|
|
ch := s[0]
|
|
if ch != ' ' && ch != ',' && ch != '=' && ch != '\\' {
|
|
dst = append(dst, '\\')
|
|
}
|
|
dst = append(dst, ch)
|
|
s = s[1:]
|
|
n = strings.IndexByte(s, '\\')
|
|
if n < 0 {
|
|
return string(append(dst, s...))
|
|
}
|
|
}
|
|
}
|
|
|
|
func parseFieldValue(s string, hasQuotedFields bool) (float64, error) {
|
|
if len(s) == 0 {
|
|
return 0, fmt.Errorf("field value cannot be empty")
|
|
}
|
|
if hasQuotedFields && s[0] == '"' {
|
|
if len(s) < 2 || s[len(s)-1] != '"' {
|
|
return 0, fmt.Errorf("missing closing quote for quoted field value %s", s)
|
|
}
|
|
// Try converting quoted string to number, since sometimes InfluxDB agents
|
|
// send numbers as strings.
|
|
s = s[1 : len(s)-1]
|
|
return fastfloat.ParseBestEffort(s), nil
|
|
}
|
|
ch := s[len(s)-1]
|
|
if ch == 'i' {
|
|
// Integer value
|
|
ss := s[:len(s)-1]
|
|
n, err := fastfloat.ParseInt64(ss)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return float64(n), nil
|
|
}
|
|
if ch == 'u' {
|
|
// Unsigned integer value
|
|
ss := s[:len(s)-1]
|
|
n, err := fastfloat.ParseUint64(ss)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return float64(n), nil
|
|
}
|
|
if s == "t" || s == "T" || s == "true" || s == "True" || s == "TRUE" {
|
|
return 1, nil
|
|
}
|
|
if s == "f" || s == "F" || s == "false" || s == "False" || s == "FALSE" {
|
|
return 0, nil
|
|
}
|
|
return fastfloat.ParseBestEffort(s), nil
|
|
}
|
|
|
|
func nextUnescapedChar(s string, ch byte, noEscapeChars bool) int {
|
|
if noEscapeChars {
|
|
// Fast path: just search for ch in s, since s has no escape chars.
|
|
return strings.IndexByte(s, ch)
|
|
}
|
|
|
|
sOrig := s
|
|
again:
|
|
n := strings.IndexByte(s, ch)
|
|
if n < 0 {
|
|
return -1
|
|
}
|
|
if n == 0 {
|
|
return len(sOrig) - len(s) + n
|
|
}
|
|
if s[n-1] != '\\' {
|
|
return len(sOrig) - len(s) + n
|
|
}
|
|
nOrig := n
|
|
slashes := 0
|
|
for n > 0 && s[n-1] == '\\' {
|
|
slashes++
|
|
n--
|
|
}
|
|
if slashes&1 == 0 {
|
|
return len(sOrig) - len(s) + nOrig
|
|
}
|
|
s = s[nOrig+1:]
|
|
goto again
|
|
}
|
|
|
|
func nextUnquotedChar(s string, ch byte, noEscapeChars, hasQuotedFields bool) int {
|
|
if !hasQuotedFields {
|
|
return nextUnescapedChar(s, ch, noEscapeChars)
|
|
}
|
|
sOrig := s
|
|
for {
|
|
n := nextUnescapedChar(s, ch, noEscapeChars)
|
|
if n < 0 {
|
|
return -1
|
|
}
|
|
if !isInQuote(s[:n], noEscapeChars) {
|
|
return n + len(sOrig) - len(s)
|
|
}
|
|
s = s[n+1:]
|
|
n = nextUnescapedChar(s, '"', noEscapeChars)
|
|
if n < 0 {
|
|
return -1
|
|
}
|
|
s = s[n+1:]
|
|
}
|
|
}
|
|
|
|
func isInQuote(s string, noEscapeChars bool) bool {
|
|
isQuote := false
|
|
for {
|
|
n := nextUnescapedChar(s, '"', noEscapeChars)
|
|
if n < 0 {
|
|
return isQuote
|
|
}
|
|
isQuote = !isQuote
|
|
s = s[n+1:]
|
|
}
|
|
}
|
|
|
|
func stripLeadingWhitespace(s string) string {
|
|
for len(s) > 0 && s[0] == ' ' {
|
|
s = s[1:]
|
|
}
|
|
return s
|
|
}
|