mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
lib/protoparser/influx: automatically detect timestamp precision depending on the number of decimal digits in the timestamp
This commit is contained in:
parent
d0e7c0535e
commit
bb87949d5c
@ -8,6 +8,7 @@ sort: 15
|
||||
|
||||
* FEATURE: vmalert: allow groups with empty rules list like Prometheus does. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1742).
|
||||
* FEATURE: vmagent: add `collapse` and `expand` buttons per each group of targets with the same `job_name` at `http://vmagent:8429/targets` page.
|
||||
* FEATURE: automatically detect timestamp precision (ns, us, ms or s) for the data ingested into VictoriaMetrics via [InfluxDB line protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
|
||||
|
||||
* BUGFIX: vmagent: properly display `proxy_url` config option at `http://vmagent:8429/config` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1755).
|
||||
* BUGFIX: fix tests for Apple M1. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1653).
|
||||
|
@ -36,8 +36,7 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
|
||||
r = zr
|
||||
}
|
||||
|
||||
// Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp
|
||||
tsMultiplier := int64(1e6)
|
||||
tsMultiplier := int64(0)
|
||||
switch precision {
|
||||
case "ns":
|
||||
tsMultiplier = 1e6
|
||||
@ -189,7 +188,14 @@ func (uw *unmarshalWork) Unmarshal() {
|
||||
// Adjust timestamps according to uw.tsMultiplier
|
||||
currentTs := time.Now().UnixNano() / 1e6
|
||||
tsMultiplier := uw.tsMultiplier
|
||||
if tsMultiplier >= 1 {
|
||||
if tsMultiplier == 0 {
|
||||
// Default precision is 'ns'. See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/#timestamp
|
||||
// But it can be in ns, us, ms or s depending on the number of digits in practice.
|
||||
for i := range rows {
|
||||
tsPtr := &rows[i].Timestamp
|
||||
*tsPtr = detectTimestamp(*tsPtr, currentTs)
|
||||
}
|
||||
} else if tsMultiplier >= 1 {
|
||||
for i := range rows {
|
||||
row := &rows[i]
|
||||
if row.Timestamp == 0 {
|
||||
@ -237,3 +243,23 @@ func putUnmarshalWork(uw *unmarshalWork) {
|
||||
}
|
||||
|
||||
var unmarshalWorkPool sync.Pool
|
||||
|
||||
func detectTimestamp(ts, currentTs int64) int64 {
|
||||
if ts == 0 {
|
||||
return currentTs
|
||||
}
|
||||
if ts >= 1e17 {
|
||||
// convert nanoseconds to milliseconds
|
||||
return ts / 1e6
|
||||
}
|
||||
if ts >= 1e14 {
|
||||
// convert microseconds to milliseconds
|
||||
return ts / 1e3
|
||||
}
|
||||
if ts >= 1e11 {
|
||||
// the ts is in milliseconds
|
||||
return ts
|
||||
}
|
||||
// convert seconds to milliseconds
|
||||
return ts * 1e3
|
||||
}
|
||||
|
30
lib/protoparser/influx/streamparser_test.go
Normal file
30
lib/protoparser/influx/streamparser_test.go
Normal file
@ -0,0 +1,30 @@
|
||||
package influx
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDetectTimestamp(t *testing.T) {
|
||||
tsDefault := int64(123)
|
||||
f := func(ts, tsExpected int64) {
|
||||
t.Helper()
|
||||
tsResult := detectTimestamp(ts, tsDefault)
|
||||
if tsResult != tsExpected {
|
||||
t.Fatalf("unexpected timestamp for detectTimestamp(%d, %d); got %d; want %d", ts, tsDefault, tsResult, tsExpected)
|
||||
}
|
||||
}
|
||||
f(0, tsDefault)
|
||||
f(1, 1e3)
|
||||
f(1e7, 1e10)
|
||||
f(1e8, 1e11)
|
||||
f(1e9, 1e12)
|
||||
f(1e10, 1e13)
|
||||
f(1e11, 1e11)
|
||||
f(1e12, 1e12)
|
||||
f(1e13, 1e13)
|
||||
f(1e14, 1e11)
|
||||
f(1e15, 1e12)
|
||||
f(1e16, 1e13)
|
||||
f(1e17, 1e11)
|
||||
f(1e18, 1e12)
|
||||
}
|
Loading…
Reference in New Issue
Block a user