lib/storage: log metric name plus all its labels when the metric timestamp is outside the configured retention

This should simplify debugging when the source of the metric with unexpected timestamp must be found.
This commit is contained in:
Aliaksandr Valialkin 2020-11-25 14:41:02 +02:00
parent b65236530c
commit 8a057e705a
4 changed files with 49 additions and 13 deletions

View File

@ -19,6 +19,7 @@ See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/851
* FEATURE: add remoteAddr to slow query log in order to simplify identifying the client that sends slow queries to VictoriaMetrics.
Slow query logging is controlled with `-search.logSlowQueryDuration` command-line flag.
* FEATURE: add `/tags/delSeries` handler from Graphite Tags API. See https://victoriametrics.github.io/#graphite-tags-api-usage
* FEATURE: log metric name plus all its labels when the metric timestamp is out of the configured retention. This should simplify detecting the source of metrics with unexpected timestamps.
* BUGFIX: properly parse Prometheus metrics with [exemplars](https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#exemplars-1) such as `foo 123 # {bar="baz"} 1`.
* BUGFIX: properly parse "infinity" values in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/OpenMetrics.md#abnf).

View File

@ -343,17 +343,17 @@ func hasTag(tags []string, key []byte) bool {
}
// String returns user-readable representation of the metric name.
//
// Use this function only for debug logging.
func (mn *MetricName) String() string {
mn.sortTags()
var mnCopy MetricName
mnCopy.CopyFrom(mn)
mnCopy.sortTags()
var tags []string
for i := range mn.Tags {
t := &mn.Tags[i]
tags = append(tags, fmt.Sprintf("%q=%q", t.Key, t.Value))
for i := range mnCopy.Tags {
t := &mnCopy.Tags[i]
tags = append(tags, fmt.Sprintf("%s=%q", t.Key, t.Value))
}
tagsStr := strings.Join(tags, ", ")
return fmt.Sprintf("MetricGroup=%q, tags=[%s]", mn.MetricGroup, tagsStr)
tagsStr := strings.Join(tags, ",")
return fmt.Sprintf("%s{%s}", mnCopy.MetricGroup, tagsStr)
}
// Marshal appends marshaled mn to dst and returns the result.

View File

@ -6,6 +6,32 @@ import (
"testing"
)
func TestMetricNameString(t *testing.T) {
f := func(mn *MetricName, resultExpected string) {
t.Helper()
result := mn.String()
if result != resultExpected {
t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
f(&MetricName{
MetricGroup: []byte("foobar"),
}, "foobar{}")
f(&MetricName{
MetricGroup: []byte("abc"),
Tags: []Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("baz"),
Value: []byte("123"),
},
},
}, `abc{baz="123",foo="bar"}`)
}
func TestMetricNameSortTags(t *testing.T) {
testMetricNameSortTags(t, []string{}, []string{})
testMetricNameSortTags(t, []string{"foo"}, []string{"foo"})

View File

@ -1238,9 +1238,10 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
if mr.Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention.
if firstWarn == nil {
metricName := getUserReadableMetricName(mr.MetricNameRaw)
firstWarn = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d; "+
"probably you need updating -retentionPeriod command-line flag",
mr.Timestamp, minTimestamp)
"probably you need updating -retentionPeriod command-line flag; metricName: %s",
mr.Timestamp, minTimestamp, metricName)
}
atomic.AddUint64(&s.tooSmallTimestampRows, 1)
continue
@ -1248,9 +1249,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
if mr.Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowed timestamp is %d; "+
"propbably you need updating -retentionPeriod command-line flag",
mr.Timestamp, maxTimestamp)
metricName := getUserReadableMetricName(mr.MetricNameRaw)
firstWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowed timestamp is %d; metricName: %s",
mr.Timestamp, maxTimestamp, metricName)
}
atomic.AddUint64(&s.tooBigTimestampRows, 1)
continue
@ -1359,6 +1360,14 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
return rows, nil
}
func getUserReadableMetricName(metricNameRaw []byte) string {
var mn MetricName
if err := mn.unmarshalRaw(metricNameRaw); err != nil {
return fmt.Sprintf("cannot unmarshal metricNameRaw %q: %s", metricNameRaw, err)
}
return mn.String()
}
type pendingMetricRow struct {
MetricName []byte
mr MetricRow