lib/protoparser/influx: automatically detect timestamp precision depending on the number of decimal digits in the timestamp

This commit is contained in:
Aliaksandr Valialkin 2021-10-28 12:46:27 +03:00
parent 105deb164c
commit 6873d6d893
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
3 changed files with 60 additions and 3 deletions

View File

@ -8,6 +8,7 @@ sort: 15
* FEATURE: vmalert: allow groups with empty rules list like Prometheus does. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1742).
* FEATURE: vmagent: add `collapse` and `expand` buttons per each group of targets with the same `job_name` at `http://vmagent:8429/targets` page.
* FEATURE: automatically detect timestamp precision (ns, us, ms or s) for the data ingested into VictoriaMetrics via [InfluxDB line protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
* BUGFIX: vmagent: properly display `proxy_url` config option at `http://vmagent:8429/config` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1755).
* BUGFIX: fix tests for Apple M1. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1653).

View File

@ -36,8 +36,7 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
r = zr
}
// Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp
tsMultiplier := int64(1e6)
tsMultiplier := int64(0)
switch precision {
case "ns":
tsMultiplier = 1e6
@ -189,7 +188,14 @@ func (uw *unmarshalWork) Unmarshal() {
// Adjust timestamps according to uw.tsMultiplier
currentTs := time.Now().UnixNano() / 1e6
tsMultiplier := uw.tsMultiplier
if tsMultiplier >= 1 {
if tsMultiplier == 0 {
// Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp
// But it can be in ns, us, ms or s depending on the number of digits in practice.
for i := range rows {
tsPtr := &rows[i].Timestamp
*tsPtr = detectTimestamp(*tsPtr, currentTs)
}
} else if tsMultiplier >= 1 {
for i := range rows {
row := &rows[i]
if row.Timestamp == 0 {
@ -237,3 +243,23 @@ func putUnmarshalWork(uw *unmarshalWork) {
}
var unmarshalWorkPool sync.Pool
func detectTimestamp(ts, currentTs int64) int64 {
if ts == 0 {
return currentTs
}
if ts >= 1e17 {
// convert nanoseconds to milliseconds
return ts / 1e6
}
if ts >= 1e14 {
// convert microseconds to milliseconds
return ts / 1e3
}
if ts >= 1e11 {
// the ts is in milliseconds
return ts
}
// convert seconds to milliseconds
return ts * 1e3
}

View File

@ -0,0 +1,30 @@
package influx
import (
"testing"
)
func TestDetectTimestamp(t *testing.T) {
tsDefault := int64(123)
f := func(ts, tsExpected int64) {
t.Helper()
tsResult := detectTimestamp(ts, tsDefault)
if tsResult != tsExpected {
t.Fatalf("unexpected timestamp for detectTimestamp(%d, %d); got %d; want %d", ts, tsDefault, tsResult, tsExpected)
}
}
f(0, tsDefault)
f(1, 1e3)
f(1e7, 1e10)
f(1e8, 1e11)
f(1e9, 1e12)
f(1e10, 1e13)
f(1e11, 1e11)
f(1e12, 1e12)
f(1e13, 1e13)
f(1e14, 1e11)
f(1e15, 1e12)
f(1e16, 1e13)
f(1e17, 1e11)
f(1e18, 1e12)
}