lib/storage: do not drop stale NaN samples (#6936)

This patch reverts 1fd3385

After discussing it we've come to conclusion that this is a valid
behavior which can be avoided by deleting the time series only once the
corresponding stale NaNs have been received.

On the other hand, the fix leads to lost stale NaNs in some rare but
valid use cases. For example:

- In a cluster configuration the samples for a given time series are
normally sent to the same vmstorage replica. However, wminsert may
reroute the samples to another replica because the original one is down
or is overloaded. In this case the stale NaN may end up on a replica
that has no data for that time series, but we still want to record that
sample.

Thus, reverting that fix.

---

related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069

Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
This commit is contained in:
Artem Fetishev 2024-09-05 17:45:09 +03:00 committed by f41gh7
parent 9cb1704d3c
commit 8bdf52977f
No known key found for this signature in database
GPG Key ID: 4558311CF775EC72
2 changed files with 0 additions and 83 deletions

View File

@ -1923,14 +1923,12 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
j := 0 j := 0
for i := range mrs { for i := range mrs {
mr := &mrs[i] mr := &mrs[i]
var isStaleNan bool
if math.IsNaN(mr.Value) { if math.IsNaN(mr.Value) {
if !decimal.IsStaleNaN(mr.Value) { if !decimal.IsStaleNaN(mr.Value) {
// Skip NaNs other than Prometheus staleness marker, since the underlying encoding // Skip NaNs other than Prometheus staleness marker, since the underlying encoding
// doesn't know how to work with them. // doesn't know how to work with them.
continue continue
} }
isStaleNan = true
} }
if mr.Timestamp < minTimestamp { if mr.Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention. // Skip rows with too small timestamps outside the retention.
@ -2044,13 +2042,6 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue continue
} }
// If sample is stale and its TSID wasn't found in cache and in indexdb,
// then we skip it. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5069
if isStaleNan {
j--
continue
}
// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it. // Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
generateTSID(&genTSID.TSID, mn) generateTSID(&genTSID.TSID, mn)

View File

@ -13,7 +13,6 @@ import (
"testing/quick" "testing/quick"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
@ -1428,67 +1427,6 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
} }
} }
func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
path := "TestStorageSeriesAreNotCreatedOnStaleMarkers"
const accountID = 2344
const projectID = 89823
s := MustOpenStorage(path, -1, 1e5, 1e6)
tr := TimeRange{MinTimestamp: 0, MaxTimestamp: 2e10}
tfsAll := NewTagFilters(accountID, projectID)
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
t.Fatalf("unexpected error in TagFilters.Add: %s", err)
}
findN := func(n int) {
t.Helper()
lns, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e5, noDeadline)
if err != nil {
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
}
if len(lns) != n {
fmt.Println(lns)
t.Fatalf("expected to find %d metric names, found %d instead", n, len(lns))
}
}
// db is empty, so should be search results
findN(0)
rng := rand.New(rand.NewSource(1))
mrs := testGenerateMetricRowsForTenant(accountID, projectID, rng, 20, tr.MinTimestamp, tr.MaxTimestamp)
// populate storage with some rows
s.AddRows(mrs[:10], defaultPrecisionBits)
s.DebugFlush()
// verify ingested rows are searchable
findN(10)
// clean up ingested data
_, err := s.DeleteSeries(nil, []*TagFilters{tfsAll})
if err != nil {
t.Fatalf("DeleteSeries failed: %s", err)
}
// verify that data was actually deleted
findN(0)
// mark every 2nd row as stale, simulating a stale target
for i := 0; i < len(mrs); i = i + 2 {
mrs[i].Value = decimal.StaleNaN
}
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
// verify that rows marked as stale aren't searchable
findN(10)
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
// testRemoveAll removes all storage data produced by a test if the test hasn't // testRemoveAll removes all storage data produced by a test if the test hasn't
// failed. For this to work, the storage must use t.Name() as the base dir in // failed. For this to work, the storage must use t.Name() as the base dir in
// its data path. // its data path.
@ -1581,18 +1519,6 @@ func TestStorageRowsNotAdded(t *testing.T) {
tr: TimeRange{minTimestamp, maxTimestamp}, tr: TimeRange{minTimestamp, maxTimestamp},
}) })
minTimestamp = time.Now().UnixMilli()
maxTimestamp = minTimestamp + 1000
mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
for i := range numRows {
mrs[i].Value = decimal.StaleNaN
}
f(&options{
name: "StaleNaN",
mrs: mrs,
tr: TimeRange{minTimestamp, maxTimestamp},
})
minTimestamp = time.Now().UnixMilli() minTimestamp = time.Now().UnixMilli()
maxTimestamp = minTimestamp + 1000 maxTimestamp = minTimestamp + 1000
mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp) mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)