mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/storage: do not drop stale NaN samples (#6936)
This patch reverts 1fd3385
After discussing it we've come to conclusion that this is a valid
behavior which can be avoided by deleting the time series only once the
corresponding stale NaNs have been received.
On the other hand, the fix leads to lost stale NaNs in some rare but
valid use cases. For example:
- In a cluster configuration the samples for a given time series are
normally sent to the same vmstorage replica. However, wminsert may
reroute the samples to another replica because the original one is down
or is overloaded. In this case the stale NaN may end up on a replica
that has no data for that time series, but we still want to record that
sample.
Thus, reverting that fix.
---
related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069
Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
This commit is contained in:
parent
b48f5f3e59
commit
39294b4919
@ -1815,14 +1815,12 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
j := 0
|
||||
for i := range mrs {
|
||||
mr := &mrs[i]
|
||||
var isStaleNan bool
|
||||
if math.IsNaN(mr.Value) {
|
||||
if !decimal.IsStaleNaN(mr.Value) {
|
||||
// Skip NaNs other than Prometheus staleness marker, since the underlying encoding
|
||||
// doesn't know how to work with them.
|
||||
continue
|
||||
}
|
||||
isStaleNan = true
|
||||
}
|
||||
if mr.Timestamp < minTimestamp {
|
||||
// Skip rows with too small timestamps outside the retention.
|
||||
@ -1936,13 +1934,6 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||
continue
|
||||
}
|
||||
|
||||
// If sample is stale and its TSID wasn't found in cache and in indexdb,
|
||||
// then we skip it. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069
|
||||
if isStaleNan {
|
||||
j--
|
||||
continue
|
||||
}
|
||||
|
||||
// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
|
||||
generateTSID(&genTSID.TSID, mn)
|
||||
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
"testing/quick"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
@ -1232,65 +1231,6 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
|
||||
path := "TestStorageSeriesAreNotCreatedOnStaleMarkers"
|
||||
s := MustOpenStorage(path, -1, 1e5, 1e6)
|
||||
|
||||
tr := TimeRange{MinTimestamp: 0, MaxTimestamp: 2e10}
|
||||
tfsAll := NewTagFilters()
|
||||
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
|
||||
t.Fatalf("unexpected error in TagFilters.Add: %s", err)
|
||||
}
|
||||
|
||||
findN := func(n int) {
|
||||
t.Helper()
|
||||
lns, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e5, noDeadline)
|
||||
if err != nil {
|
||||
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
|
||||
}
|
||||
if len(lns) != n {
|
||||
fmt.Println(lns)
|
||||
t.Fatalf("expected to find %d metric names, found %d instead", n, len(lns))
|
||||
}
|
||||
}
|
||||
|
||||
// db is empty, so should be search results
|
||||
findN(0)
|
||||
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
mrs := testGenerateMetricRows(rng, 20, tr.MinTimestamp, tr.MaxTimestamp)
|
||||
// populate storage with some rows
|
||||
s.AddRows(mrs[:10], defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
|
||||
// verify ingested rows are searchable
|
||||
findN(10)
|
||||
|
||||
// clean up ingested data
|
||||
_, err := s.DeleteSeries(nil, []*TagFilters{tfsAll})
|
||||
if err != nil {
|
||||
t.Fatalf("DeleteSeries failed: %s", err)
|
||||
}
|
||||
|
||||
// verify that data was actually deleted
|
||||
findN(0)
|
||||
|
||||
// mark every 2nd row as stale, simulating a stale target
|
||||
for i := 0; i < len(mrs); i = i + 2 {
|
||||
mrs[i].Value = decimal.StaleNaN
|
||||
}
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
s.DebugFlush()
|
||||
|
||||
// verify that rows marked as stale aren't searchable
|
||||
findN(10)
|
||||
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
// testRemoveAll removes all storage data produced by a test if the test hasn't
|
||||
// failed. For this to work, the storage must use t.Name() as the base dir in
|
||||
// its data path.
|
||||
@ -1380,18 +1320,6 @@ func TestStorageRowsNotAdded(t *testing.T) {
|
||||
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||
})
|
||||
|
||||
minTimestamp = time.Now().UnixMilli()
|
||||
maxTimestamp = minTimestamp + 1000
|
||||
mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
|
||||
for i := range numRows {
|
||||
mrs[i].Value = decimal.StaleNaN
|
||||
}
|
||||
f(&options{
|
||||
name: "StaleNaN",
|
||||
mrs: mrs,
|
||||
tr: TimeRange{minTimestamp, maxTimestamp},
|
||||
})
|
||||
|
||||
minTimestamp = time.Now().UnixMilli()
|
||||
maxTimestamp = minTimestamp + 1000
|
||||
mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
|
||||
|
Loading…
Reference in New Issue
Block a user