lib/protoparser/graphite: accept whitespace in metric names and tags according to the specification

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/99
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102

See the specification https://graphite.readthedocs.io/en/latest/tags.html
This commit is contained in:
Aliaksandr Valialkin 2022-09-26 15:17:22 +03:00
parent 938d7a932c
commit 2b98f2bc1a
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
3 changed files with 117 additions and 51 deletions

View File

@ -21,6 +21,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105). * FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105).
* FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113). * FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113).
* FEATURE: accept whitespace in metric names and tags ingested via [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) according to [the specs](https://graphite.readthedocs.io/en/latest/tags.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102).
* FEATURE: check the correctess of raw sample timestamps stored on disk when reading them. This reduces the probability of possible silent corruption of the data stored on disk. This should help [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011). * FEATURE: check the correctess of raw sample timestamps stored on disk when reading them. This reduces the probability of possible silent corruption of the data stored on disk. This should help [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011).
* FEATURE: atomically delete directories with snapshots, parts and partitions at [storage level](https://docs.victoriametrics.com/#storage). Previously such directories can be left in partially deleted state when the deletion operation was interrupted by unclean shutdown. This may result in `cannot open file ...: no such file or directory` error on the next start. The probability of this error was quite high when NFS or EFS was used as persistent storage for VictoriaMetrics data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038). * FEATURE: atomically delete directories with snapshots, parts and partitions at [storage level](https://docs.victoriametrics.com/#storage). Previously such directories can be left in partially deleted state when the deletion operation was interrupted by unclean shutdown. This may result in `cannot open file ...: no such file or directory` error on the next start. The probability of this error was quite high when NFS or EFS was used as persistent storage for VictoriaMetrics data. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3038).
* FEATURE: set the `start` arg to `end - 5 minutes` if isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3052). * FEATURE: set the `start` arg to `end - 5 minutes` if isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3052).

View File

@ -61,9 +61,6 @@ func (r *Row) reset() {
// UnmarshalMetricAndTags unmarshals metric and optional tags from s. // UnmarshalMetricAndTags unmarshals metric and optional tags from s.
func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) { func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
if strings.Contains(s, " ") {
return tagsPool, fmt.Errorf("unexpected whitespace found in %q", s)
}
n := strings.IndexByte(s, ';') n := strings.IndexByte(s, ';')
if n < 0 { if n < 0 {
// No tags // No tags
@ -72,11 +69,7 @@ func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
// Tags found // Tags found
r.Metric = s[:n] r.Metric = s[:n]
tagsStart := len(tagsPool) tagsStart := len(tagsPool)
var err error tagsPool = unmarshalTags(tagsPool, s[n+1:])
tagsPool, err = unmarshalTags(tagsPool, s[n+1:])
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal tags: %w", err)
}
tags := tagsPool[tagsStart:] tags := tagsPool[tagsStart:]
r.Tags = tags[:len(tags):len(tags)] r.Tags = tags[:len(tags):len(tags)]
} }
@ -88,40 +81,43 @@ func (r *Row) UnmarshalMetricAndTags(s string, tagsPool []Tag) ([]Tag, error) {
func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) { func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
r.reset() r.reset()
n := strings.IndexAny(s, graphiteSeparators) sOrig := s
s = stripTrailingWhitespace(s)
n := strings.LastIndexAny(s, graphiteSeparators)
if n < 0 { if n < 0 {
return tagsPool, fmt.Errorf("cannot find separator between metric and value in %q", s) return tagsPool, fmt.Errorf("cannot find separator between value and timestamp in %q", s)
} }
metricAndTags := s[:n] timestampStr := s[n+1:]
tail := stripLeadingWhitespace(s[n+1:]) valueStr := ""
metricAndTags := ""
s = stripTrailingWhitespace(s[:n])
n = strings.LastIndexAny(s, graphiteSeparators)
if n < 0 {
// Missing timestamp
metricAndTags = stripLeadingWhitespace(s)
valueStr = timestampStr
timestampStr = ""
} else {
metricAndTags = stripLeadingWhitespace(s[:n])
valueStr = s[n+1:]
}
metricAndTags = stripTrailingWhitespace(metricAndTags)
tagsPool, err := r.UnmarshalMetricAndTags(metricAndTags, tagsPool) tagsPool, err := r.UnmarshalMetricAndTags(metricAndTags, tagsPool)
if err != nil { if err != nil {
return tagsPool, err return tagsPool, fmt.Errorf("cannot parse metric and tags from %q: %w; original line: %q", metricAndTags, err, sOrig)
} }
if len(timestampStr) > 0 {
n = strings.IndexAny(tail, graphiteSeparators) ts, err := fastfloat.Parse(timestampStr)
if n < 0 {
// There is no timestamp. Use default timestamp instead.
v, err := fastfloat.Parse(tail)
if err != nil { if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w", tail, err) return tagsPool, fmt.Errorf("cannot unmarshal timestamp from %q: %w; orignal line: %q", timestampStr, err, sOrig)
} }
r.Value = v
return tagsPool, nil
}
v, err := fastfloat.Parse(tail[:n])
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w", tail[:n], err)
}
tail = stripLeadingWhitespace(tail[n+1:])
tail = stripTrailingWhitespace(tail)
ts, err := fastfloat.Parse(tail)
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal timestamp from %q: %w", tail, err)
}
r.Value = v
r.Timestamp = int64(ts) r.Timestamp = int64(ts)
}
v, err := fastfloat.Parse(valueStr)
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, sOrig)
}
r.Value = v
return tagsPool, nil return tagsPool, nil
} }
@ -142,11 +138,11 @@ func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
if len(s) > 0 && s[len(s)-1] == '\r' { if len(s) > 0 && s[len(s)-1] == '\r' {
s = s[:len(s)-1] s = s[:len(s)-1]
} }
s = stripLeadingWhitespace(s)
if len(s) == 0 { if len(s) == 0 {
// Skip empty line // Skip empty line
return dst, tagsPool return dst, tagsPool
} }
if cap(dst) > len(dst) { if cap(dst) > len(dst) {
dst = dst[:len(dst)+1] dst = dst[:len(dst)+1]
} else { } else {
@ -165,7 +161,7 @@ func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="graphite"}`) var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="graphite"}`)
func unmarshalTags(dst []Tag, s string) ([]Tag, error) { func unmarshalTags(dst []Tag, s string) []Tag {
for { for {
if cap(dst) > len(dst) { if cap(dst) > len(dst) {
dst = dst[:len(dst)+1] dst = dst[:len(dst)+1]
@ -182,7 +178,7 @@ func unmarshalTags(dst []Tag, s string) ([]Tag, error) {
// Skip empty tag // Skip empty tag
dst = dst[:len(dst)-1] dst = dst[:len(dst)-1]
} }
return dst, nil return dst
} }
tag.unmarshal(s[:n]) tag.unmarshal(s[:n])
s = s[n+1:] s = s[n+1:]

View File

@ -19,13 +19,6 @@ func TestUnmarshalMetricAndTagsFailure(t *testing.T) {
} }
f("") f("")
f(";foo=bar") f(";foo=bar")
f(" ")
f("foo ;bar=baz")
f("f oo;bar=baz")
f("foo;bar=baz ")
f("foo;bar= baz")
f("foo;bar=b az")
f("foo;b ar=baz")
} }
func TestUnmarshalMetricAndTagsSuccess(t *testing.T) { func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
@ -40,10 +33,67 @@ func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &r, rExpected) t.Fatalf("unexpected row;\ngot\n%+v\nwant\n%+v", &r, rExpected)
} }
} }
f(" ", &Row{
Metric: " ",
})
f("foo ;bar=baz", &Row{
Metric: "foo ",
Tags: []Tag{
{
Key: "bar",
Value: "baz",
},
},
})
f("f oo;bar=baz", &Row{
Metric: "f oo",
Tags: []Tag{
{
Key: "bar",
Value: "baz",
},
},
})
f("foo;bar=baz ", &Row{
Metric: "foo",
Tags: []Tag{
{
Key: "bar",
Value: "baz ",
},
},
})
f("foo;bar= baz", &Row{
Metric: "foo",
Tags: []Tag{
{
Key: "bar",
Value: " baz",
},
},
})
f("foo;bar=b az", &Row{
Metric: "foo",
Tags: []Tag{
{
Key: "bar",
Value: "b az",
},
},
})
f("foo;b ar=baz", &Row{
Metric: "foo",
Tags: []Tag{
{
Key: "b ar",
Value: "baz",
},
},
})
f("foo", &Row{ f("foo", &Row{
Metric: "foo", Metric: "foo",
}) })
f("foo;bar=123;baz=aabb", &Row{ f("foo;bar=123;baz=aa=bb", &Row{
Metric: "foo", Metric: "foo",
Tags: []Tag{ Tags: []Tag{
{ {
@ -52,7 +102,7 @@ func TestUnmarshalMetricAndTagsSuccess(t *testing.T) {
}, },
{ {
Key: "baz", Key: "baz",
Value: "aabb", Value: "aa=bb",
}, },
}, },
}) })
@ -74,16 +124,9 @@ func TestRowsUnmarshalFailure(t *testing.T) {
} }
} }
// Missing metric
f(" 123 455")
// Missing value // Missing value
f("aaa") f("aaa")
// unexpected space in tag value
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/99
f("s;tag1=aaa1;tag2=bb b2;tag3=ccc3 1")
// invalid value // invalid value
f("aa bb") f("aa bb")
@ -103,7 +146,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
// Try unmarshaling again // Try unmarshaling again
rows.Unmarshal(s) rows.Unmarshal(s)
if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) t.Fatalf("unexpected rows on second unmarshal;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
} }
rows.Reset() rows.Reset()
@ -119,6 +162,12 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
f("\n\r\n", &Rows{}) f("\n\r\n", &Rows{})
// Single line // Single line
f(" 123 455", &Rows{
Rows: []Row{{
Metric: "123",
Value: 455,
}},
})
f("foobar -123.456 789", &Rows{ f("foobar -123.456 789", &Rows{
Rows: []Row{{ Rows: []Row{{
Metric: "foobar", Metric: "foobar",
@ -134,6 +183,26 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}}, }},
}) })
// Whitespace in metric name, tag name and tag value
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102
f("s a;ta g1=aaa1;tag2=bb b2;tag3 1 23", &Rows{
Rows: []Row{{
Metric: "s a",
Value: 1,
Timestamp: 23,
Tags: []Tag{
{
Key: "ta g1",
Value: "aaa1",
},
{
Key: "tag2",
Value: "bb b2",
},
},
}},
})
// Missing timestamp // Missing timestamp
f("aaa 1123", &Rows{ f("aaa 1123", &Rows{
Rows: []Row{{ Rows: []Row{{