From 36a1a21d6e7b9045971b3ab8aa16d2a56b9ab8db Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 24 Jan 2020 20:10:03 +0200 Subject: [PATCH] lib/protoparser: add parser for Prometheus exposition text format This parser will be used by vmagent --- lib/protoparser/prometheus/parser.go | 292 ++++++++++++++++++ lib/protoparser/prometheus/parser_test.go | 270 ++++++++++++++++ .../prometheus/parser_timing_test.go | 25 ++ 3 files changed, 587 insertions(+) create mode 100644 lib/protoparser/prometheus/parser.go create mode 100644 lib/protoparser/prometheus/parser_test.go create mode 100644 lib/protoparser/prometheus/parser_timing_test.go diff --git a/lib/protoparser/prometheus/parser.go b/lib/protoparser/prometheus/parser.go new file mode 100644 index 0000000000..e76a3026cd --- /dev/null +++ b/lib/protoparser/prometheus/parser.go @@ -0,0 +1,292 @@ +package prometheus + +import ( + "fmt" + "strconv" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastjson/fastfloat" +) + +// Rows contains parsed Prometheus rows. +type Rows struct { + Rows []Row + + tagsPool []Tag +} + +// Reset resets rs. +func (rs *Rows) Reset() { + // Reset items, so they can be GC'ed + + for i := range rs.Rows { + rs.Rows[i].reset() + } + rs.Rows = rs.Rows[:0] + + for i := range rs.tagsPool { + rs.tagsPool[i].reset() + } + rs.tagsPool = rs.tagsPool[:0] +} + +// Unmarshal unmarshals Prometheus exposition text rows from s. +// +// See https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details +// +// s must be unchanged until rs is in use. +func (rs *Rows) Unmarshal(s string) { + noEscapes := strings.IndexByte(s, '\\') < 0 + rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0], noEscapes) +} + +// Row is a single Prometheus row. +type Row struct { + Metric string + Tags []Tag + Value float64 + Timestamp int64 +} + +func (r *Row) reset() { + r.Metric = "" + r.Tags = nil + r.Value = 0 + r.Timestamp = 0 +} + +func skipLeadingWhitespace(s string) string { + // Prometheus treats ' ' and '\t' as whitespace + // according to https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details + for len(s) > 0 && (s[0] == ' ' || s[0] == '\t') { + s = s[1:] + } + return s +} + +func skipTrailingWhitespace(s string) string { + // Prometheus treats ' ' and '\t' as whitespace + // according to https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-format-details + for len(s) > 0 && (s[len(s)-1] == ' ' || s[len(s)-1] == '\t') { + s = s[:len(s)-1] + } + return s +} + +func nextWhitespace(s string) int { + n := strings.IndexByte(s, ' ') + if n < 0 { + return strings.IndexByte(s, '\t') + } + n1 := strings.IndexByte(s, '\t') + if n1 < 0 || n1 > n { + return n + } + return n1 +} + +func (r *Row) unmarshal(s string, tagsPool []Tag, noEscapes bool) ([]Tag, error) { + r.reset() + s = skipLeadingWhitespace(s) + n := strings.IndexByte(s, '{') + if n >= 0 { + // Tags found. Parse them. + r.Metric = skipTrailingWhitespace(s[:n]) + s = s[n+1:] + tagsStart := len(tagsPool) + var err error + s, tagsPool, err = unmarshalTags(tagsPool, s, noEscapes) + if err != nil { + return tagsPool, fmt.Errorf("cannot unmarshal tags: %s", err) + } + if len(s) > 0 && s[0] == ' ' { + // Fast path - skip whitespace. + s = s[1:] + } + tags := tagsPool[tagsStart:] + r.Tags = tags[:len(tags):len(tags)] + } else { + // Tags weren't found. Search for value after whitespace + n = nextWhitespace(s) + if n < 0 { + return tagsPool, fmt.Errorf("missing value") + } + r.Metric = s[:n] + s = s[n+1:] + } + if len(r.Metric) == 0 { + return tagsPool, fmt.Errorf("metric cannot be empty") + } + s = skipLeadingWhitespace(s) + if len(s) == 0 { + return tagsPool, fmt.Errorf("value cannot be empty") + } + n = nextWhitespace(s) + if n < 0 { + // There is no timestamp. + r.Value = fastfloat.ParseBestEffort(s) + return tagsPool, nil + } + // There is timestamp. + r.Value = fastfloat.ParseBestEffort(s[:n]) + s = skipLeadingWhitespace(s[n+1:]) + r.Timestamp = fastfloat.ParseInt64BestEffort(s) + return tagsPool, nil +} + +func unmarshalRows(dst []Row, s string, tagsPool []Tag, noEscapes bool) ([]Row, []Tag) { + for len(s) > 0 { + n := strings.IndexByte(s, '\n') + if n < 0 { + // The last line. + return unmarshalRow(dst, s, tagsPool, noEscapes) + } + dst, tagsPool = unmarshalRow(dst, s[:n], tagsPool, noEscapes) + s = s[n+1:] + } + return dst, tagsPool +} + +func unmarshalRow(dst []Row, s string, tagsPool []Tag, noEscapes bool) ([]Row, []Tag) { + if len(s) > 0 && s[len(s)-1] == '\r' { + s = s[:len(s)-1] + } + s = skipLeadingWhitespace(s) + if len(s) == 0 { + // Skip empty line + return dst, tagsPool + } + if s[0] == '#' { + // Skip comment + return dst, tagsPool + } + if cap(dst) > len(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, Row{}) + } + r := &dst[len(dst)-1] + var err error + tagsPool, err = r.unmarshal(s, tagsPool, noEscapes) + if err != nil { + dst = dst[:len(dst)-1] + logger.Errorf("cannot unmarshal Prometheus line %q: %s", s, err) + invalidLines.Inc() + } + return dst, tagsPool +} + +var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="prometheus"}`) + +func unmarshalTags(dst []Tag, s string, noEscapes bool) (string, []Tag, error) { + s = skipLeadingWhitespace(s) + if len(s) > 0 && s[0] == '}' { + // End of tags found. + return s[1:], dst, nil + } + for { + n := strings.IndexByte(s, '=') + if n < 0 { + return s, dst, fmt.Errorf("missing value for tag %q", s) + } + key := skipTrailingWhitespace(s[:n]) + s = skipLeadingWhitespace(s[n+1:]) + if len(s) == 0 || s[0] != '"' { + return s, dst, fmt.Errorf("expecting quoted value for tag %q; got %q", key, s) + } + value := s[1:] + if noEscapes { + // Fast path - the line has no escape chars + n = strings.IndexByte(value, '"') + if n < 0 { + return s, dst, fmt.Errorf("missing closing quote for tag value %q", s) + } + s = value[n+1:] + value = value[:n] + } else { + // Slow path - the line contains escape chars + n = findClosingQuote(s) + if n < 0 { + return s, dst, fmt.Errorf("missing closing quote for tag value %q", s) + } + var err error + value, err = unescapeValue(s[:n+1]) + if err != nil { + return s, dst, fmt.Errorf("cannot unescape value %q for tag %q: %s", s[:n+1], key, err) + } + s = s[n+1:] + } + if len(key) > 0 && len(value) > 0 { + if cap(dst) > len(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, Tag{}) + } + tag := &dst[len(dst)-1] + tag.Key = key + tag.Value = value + } + s = skipLeadingWhitespace(s) + if len(s) > 0 && s[0] == '}' { + // End of tags found. + return s[1:], dst, nil + } + if len(s) == 0 || s[0] != ',' { + return s, dst, fmt.Errorf("missing comma after tag %s=%q", key, value) + } + s = s[1:] + } +} + +// Tag is a Prometheus tag. +type Tag struct { + Key string + Value string +} + +func (t *Tag) reset() { + t.Key = "" + t.Value = "" +} + +func findClosingQuote(s string) int { + if len(s) == 0 || s[0] != '"' { + return -1 + } + off := 1 + s = s[1:] + for { + n := strings.IndexByte(s, '"') + if n < 0 { + return -1 + } + if prevBackslashesCount(s[:n])%2 == 0 { + return off + n + } + off += n + 1 + s = s[n+1:] + } +} + +func unescapeValue(s string) (string, error) { + if len(s) < 2 || s[0] != '"' || s[len(s)-1] != '"' { + return "", fmt.Errorf("unexpected tag value: %q", s) + } + n := strings.IndexByte(s, '\\') + if n < 0 { + // Fast path - nothing to unescape + return s[1 : len(s)-1], nil + } + return strconv.Unquote(s) +} + +func prevBackslashesCount(s string) int { + n := 0 + for len(s) > 0 && s[len(s)-1] == '\\' { + n++ + s = s[:len(s)-1] + } + return n +} diff --git a/lib/protoparser/prometheus/parser_test.go b/lib/protoparser/prometheus/parser_test.go new file mode 100644 index 0000000000..b1517da6d3 --- /dev/null +++ b/lib/protoparser/prometheus/parser_test.go @@ -0,0 +1,270 @@ +package prometheus + +import ( + "reflect" + "testing" +) + +func TestPrevBackslashesCount(t *testing.T) { + f := func(s string, nExpected int) { + t.Helper() + n := prevBackslashesCount(s) + if n != nExpected { + t.Fatalf("unexpected value returned from prevBackslashesCount(%q); got %d; want %d", s, n, nExpected) + } + } + f(``, 0) + f(`foo`, 0) + f(`\`, 1) + f(`\\`, 2) + f(`\\\`, 3) + f(`\\\a`, 0) + f(`foo\bar`, 0) + f(`foo\\`, 2) + f(`\\foo\`, 1) + f(`\\foo\\\\`, 4) +} + +func TestFindClosingQuote(t *testing.T) { + f := func(s string, nExpected int) { + t.Helper() + n := findClosingQuote(s) + if n != nExpected { + t.Fatalf("unexpected value returned from findClosingQuote(%q); got %d; want %d", s, n, nExpected) + } + } + f(``, -1) + f(`x`, -1) + f(`"`, -1) + f(`""`, 1) + f(`foobar"`, -1) + f(`"foo"`, 4) + f(`"\""`, 3) + f(`"\\"`, 3) + f(`"\"`, -1) + f(`"foo\"bar\"baz"`, 14) +} + +func TestUnescapeValueFailure(t *testing.T) { + f := func(s string) { + t.Helper() + ss, err := unescapeValue(s) + if err == nil { + t.Fatalf("expecting error") + } + if ss != "" { + t.Fatalf("expecting empty string; got %q", ss) + } + } + f(``) + f(`foobar`) + f(`"foobar`) + f(`foobar"`) + f(`"foobar\"`) + f(` "foobar"`) + f(`"foobar" `) +} + +func TestUnescapeValueSuccess(t *testing.T) { + f := func(s, resultExpected string) { + t.Helper() + result, err := unescapeValue(s) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if result != resultExpected { + t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) + } + } + f(`""`, "") + f(`"f"`, "f") + f(`"foobar"`, "foobar") + f(`"\"\n\t"`, "\"\n\t") +} + +func TestRowsUnmarshalFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var rows Rows + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("unexpected number of rows parsed; got %d; want 0;\nrows:%#v", len(rows.Rows), rows.Rows) + } + + // Try again + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("unexpected number of rows parsed; got %d; want 0;\nrows:%#v", len(rows.Rows), rows.Rows) + } + } + + // Empty lines and comments + f("") + f(" ") + f("\t") + f("\t \r") + f("\t\t \n\n # foobar") + f("#foobar") + f("#foobar\n") + + // invalid tags + f("a{") + f("a { ") + f("a {foo") + f("a {foo}") + f("a {foo =") + f(`a {foo ="bar`) + f(`a {foo ="b\ar`) + f(`a {foo = "bar"`) + f(`a {foo ="bar",`) + f(`a {foo ="bar" , `) + f(`a {foo ="bar" , }`) + f(`a {foo ="bar" , baz }`) + + // empty metric name + f(`{foo="bar"}`) + + // Missing value + f("aaa") + f(" aaa") + f(" aaa ") + f(" aaa \n") + f(` aa{foo="bar"} ` + "\n") +} + +func TestRowsUnmarshalSuccess(t *testing.T) { + f := func(s string, rowsExpected *Rows) { + t.Helper() + var rows Rows + rows.Unmarshal(s) + if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { + t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) + } + + // Try unmarshaling again + rows.Unmarshal(s) + if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { + t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) + } + + rows.Reset() + if len(rows.Rows) != 0 { + t.Fatalf("non-empty rows after reset: %+v", rows.Rows) + } + } + + // Empty line or comment + f("", &Rows{}) + f("\r", &Rows{}) + f("\n\n", &Rows{}) + f("\n\r\n", &Rows{}) + f("\t \t\n\r\n#foobar\n # baz", &Rows{}) + + // Single line + f("foobar 78.9", &Rows{ + Rows: []Row{{ + Metric: "foobar", + Value: 78.9, + }}, + }) + f("foobar 123.456 789\n", &Rows{ + Rows: []Row{{ + Metric: "foobar", + Value: 123.456, + Timestamp: 789, + }}, + }) + f("foobar{} 123.456 789\n", &Rows{ + Rows: []Row{{ + Metric: "foobar", + Value: 123.456, + Timestamp: 789, + }}, + }) + + // Timestamp bigger than 1<<31 + f("aaa 1123 429496729600", &Rows{ + Rows: []Row{{ + Metric: "aaa", + Value: 1123, + Timestamp: 429496729600, + }}, + }) + + // Tags + f(`foo{bar="baz"} 1 2`, &Rows{ + Rows: []Row{{ + Metric: "foo", + Tags: []Tag{{ + Key: "bar", + Value: "baz", + }}, + Value: 1, + Timestamp: 2, + }}, + }) + f(`foo{bar="b\"a\\z"} -1.2`, &Rows{ + Rows: []Row{{ + Metric: "foo", + Tags: []Tag{{ + Key: "bar", + Value: "b\"a\\z", + }}, + Value: -1.2, + }}, + }) + // Empty tags + f(`foo {bar="baz",aa="",x="y",="z"} 1 2`, &Rows{ + Rows: []Row{{ + Metric: "foo", + Tags: []Tag{ + { + Key: "bar", + Value: "baz", + }, + { + Key: "x", + Value: "y", + }, + }, + Value: 1, + Timestamp: 2, + }}, + }) + + // Multi lines + f("# foo\n # bar ba zzz\nfoo 0.3 2\naaa 3\nbar.baz 0.34 43\n", &Rows{ + Rows: []Row{ + { + Metric: "foo", + Value: 0.3, + Timestamp: 2, + }, + { + Metric: "aaa", + Value: 3, + }, + { + Metric: "bar.baz", + Value: 0.34, + Timestamp: 43, + }, + }, + }) + + // Multi lines with invalid line + f("\t foo\t {} 0.3\t 2\naaa\n bar.baz 0.34 43\n", &Rows{ + Rows: []Row{ + { + Metric: "foo", + Value: 0.3, + Timestamp: 2, + }, + { + Metric: "bar.baz", + Value: 0.34, + Timestamp: 43, + }, + }, + }) +} diff --git a/lib/protoparser/prometheus/parser_timing_test.go b/lib/protoparser/prometheus/parser_timing_test.go new file mode 100644 index 0000000000..893e829b11 --- /dev/null +++ b/lib/protoparser/prometheus/parser_timing_test.go @@ -0,0 +1,25 @@ +package prometheus + +import ( + "fmt" + "testing" +) + +func BenchmarkRowsUnmarshal(b *testing.B) { + s := `cpu_usage{mode="user"} 1.23 +cpu_usage{mode="system"} 23.344 +cpu_usage{mode="iowait"} 3.3443 +cpu_usage{mode="irq"} 0.34432 +` + b.SetBytes(int64(len(s))) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var rows Rows + for pb.Next() { + rows.Unmarshal(s) + if len(rows.Rows) != 4 { + panic(fmt.Errorf("unexpected number of rows unmarshaled: got %d; want 4", len(rows.Rows))) + } + } + }) +}