From bb87949d5c40d70308fc46940e8b7885681c6097 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 28 Oct 2021 12:46:27 +0300 Subject: [PATCH] lib/protoparser/influx: automatically detect timestamp precision depending on the number of decimal digits in the timestamp --- docs/CHANGELOG.md | 1 + lib/protoparser/influx/streamparser.go | 32 +++++++++++++++++++-- lib/protoparser/influx/streamparser_test.go | 30 +++++++++++++++++++ 3 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 lib/protoparser/influx/streamparser_test.go diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 330d475c0..5022c901e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,7 @@ sort: 15 * FEATURE: vmalert: allow groups with empty rules list like Prometheus does. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1742). * FEATURE: vmagent: add `collapse` and `expand` buttons per each group of targets with the same `job_name` at `http://vmagent:8429/targets` page. +* FEATURE: automatically detect timestamp precision (ns, us, ms or s) for the data ingested into VictoriaMetrics via [InfluxDB line protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). * BUGFIX: vmagent: properly display `proxy_url` config option at `http://vmagent:8429/config` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1755). * BUGFIX: fix tests for Apple M1. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1653). diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index 7bc98a0fd..96f865b6a 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -36,8 +36,7 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun r = zr } - // Default precision is 'ns'. 
// detectTimestamp converts ts to milliseconds, guessing the precision of ts
// from its magnitude.
//
// A zero ts means that the timestamp is missing, so currentTs is returned.
// Otherwise the absolute value of ts selects the source unit:
//
//	|ts| >= 1e17 - nanoseconds
//	|ts| >= 1e14 - microseconds
//	|ts| >= 1e11 - milliseconds
//	otherwise    - seconds
//
// Negative timestamps (points before the Unix epoch) are classified by the
// same thresholds as positive ones. Treating every negative value as seconds
// would multiply already-large values by 1e3 and silently overflow int64.
func detectTimestamp(ts, currentTs int64) int64 {
	if ts == 0 {
		return currentTs
	}
	// Both signs are compared explicitly instead of negating ts,
	// because -ts overflows for the minimum int64 value.
	switch {
	case ts >= 1e17 || ts <= -1e17:
		// convert nanoseconds to milliseconds
		return ts / 1e6
	case ts >= 1e14 || ts <= -1e14:
		// convert microseconds to milliseconds
		return ts / 1e3
	case ts >= 1e11 || ts <= -1e11:
		// the ts is already in milliseconds
		return ts
	default:
		// convert seconds to milliseconds
		return ts * 1e3
	}
}

// TestDetectTimestamp verifies the unit detection at every decimal magnitude
// boundary for both positive and negative timestamps.
func TestDetectTimestamp(t *testing.T) {
	tsDefault := int64(123)
	f := func(ts, tsExpected int64) {
		t.Helper()
		tsResult := detectTimestamp(ts, tsDefault)
		if tsResult != tsExpected {
			t.Fatalf("unexpected timestamp for detectTimestamp(%d, %d); got %d; want %d", ts, tsDefault, tsResult, tsExpected)
		}
	}

	// Missing timestamp falls back to the default.
	f(0, tsDefault)

	// Seconds.
	f(1, 1e3)
	f(1e7, 1e10)
	f(1e8, 1e11)
	f(1e9, 1e12)
	f(1e10, 1e13)

	// Milliseconds.
	f(1e11, 1e11)
	f(1e12, 1e12)
	f(1e13, 1e13)

	// Microseconds.
	f(1e14, 1e11)
	f(1e15, 1e12)
	f(1e16, 1e13)

	// Nanoseconds.
	f(1e17, 1e11)
	f(1e18, 1e12)

	// Timestamps before the Unix epoch use the same thresholds.
	f(-1, -1e3)
	f(-1e10, -1e13)
	f(-1e11, -1e11)
	f(-1e13, -1e13)
	f(-1e14, -1e11)
	f(-1e16, -1e13)
	f(-1e17, -1e11)
	f(-1e18, -1e12)
}