Fix inconsistent error handling in Storage.AddRows() (#6583)

`Storage.AddRows()` returns an error only in one case: when
`Storage.updatePerDateData()` fails to unmarshal a `metricNameRaw`. But
the same error is treated as a warning when it happens inside
`Storage.add()` or returned by `Storage.prefillNextIndexDB()`.

This commit fixes this inconsistency by treating the error returned by
`Storage.updatePerDateData()` as a warning as well. As a result
`Storage.add()` does not need a return value anymore and so doesn't
`Storage.AddRows()`.

Additionally, this commit adds a unit test that checks all cases that
result in a row not being added to the storage.

---------

Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>
Co-authored-by: Nikolay <nik@victoriametrics.com>
This commit is contained in:
rtm0 2024-07-17 12:07:14 +02:00 committed by Aliaksandr Valialkin
parent e1f6926d2a
commit 1b03d7e6de
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
7 changed files with 199 additions and 65 deletions

View File

@ -123,7 +123,8 @@ func (s *VMInsertServer) run() {
logger.Infof("processing vminsert conn from %s", c.RemoteAddr())
err = stream.Parse(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits))
s.storage.AddRows(rows, uint8(*precisionBits))
return nil
}, s.storage.IsReadOnly)
if err != nil {
if s.isStopping() {

View File

@ -67,6 +67,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix input cursor position reset in modal settings. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6530).
* BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): fix `vm_backup_last_run_failed` metric not being properly initialized during startup. Previously, it could imply an error even if the backup have been completed successfully. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6550) for the details.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) over Prometheus buckets with inconsistent values. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580#issuecomment-2186659102) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6547). Updates [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Fix inconsistent error handling in storage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6583) for details.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): fix panic when using multiple topics with the same name when [ingesting metrics from Kafka](https://docs.victoriametrics.com/vmagent/#kafka-integration). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6636) for the details.
## [v1.102.0-rc2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.102.0-rc2)

View File

@ -1573,9 +1573,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
timeMin := currentDayTimestamp - 24*3600*1000
timeMax := currentDayTimestamp + 24*3600*1000
mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
// verify the storage contains rows.
@ -1627,9 +1625,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
}
// Re-insert rows again and verify that all the entries belong to new generation
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
for _, mr := range mrs {

View File

@ -112,15 +112,11 @@ func TestSearch(t *testing.T) {
blockRowsCount++
if blockRowsCount == rowsPerBlock {
if err := st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits); err != nil {
t.Fatalf("cannot add rows %d-%d: %s", i-blockRowsCount+1, i+1, err)
}
st.AddRows(mrs[i-blockRowsCount+1:i+1], defaultPrecisionBits)
blockRowsCount = 0
}
}
if err := st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits); err != nil {
t.Fatalf("cannot add rows %v-%v: %s", rowsCount-blockRowsCount, rowsCount, err)
}
st.AddRows(mrs[rowsCount-blockRowsCount:], defaultPrecisionBits)
endTimestamp := mrs[len(mrs)-1].Timestamp
// Re-open the storage in order to flush all the pending cached data.

View File

@ -1720,13 +1720,12 @@ var rowsAddedTotal atomic.Uint64
//
// The caller should limit the number of concurrent AddRows calls to the number
// of available CPU cores in order to limit memory usage.
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) {
if len(mrs) == 0 {
return nil
return
}
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
var firstErr error
ic := getMetricRowsInsertCtx()
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 {
@ -1737,17 +1736,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
} else {
mrs = nil
}
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
rowsAddedTotal.Add(uint64(len(mrsBlock)))
}
putMetricRowsInsertCtx(ic)
return firstErr
}
type metricRowsInsertCtx struct {
@ -1886,7 +1878,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
}
}
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) {
idb := s.idb()
generation := idb.generation
is := idb.getIndexSearch(0, 0, noDeadline)
@ -1910,7 +1902,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
var genTSID generationTSID
// Return only the first error, since it has no sense in returning all errors.
// Log only the first error, since it has no sense in logging all errors.
var firstWarn error
j := 0
@ -2076,23 +2068,20 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
if firstWarn == nil {
firstWarn = err
firstWarn = fmt.Errorf("cannot prefill next indexdb: %w", err)
}
}
if err := s.updatePerDateData(rows, dstMrs); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot not update per-day index: %w", err)
}
}
if firstWarn != nil {
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
}
err := s.updatePerDateData(rows, dstMrs)
if err != nil {
err = fmt.Errorf("cannot update per-date data: %w", err)
} else {
s.tb.MustAddRows(rows)
}
if err != nil {
return fmt.Errorf("error occurred during rows addition: %w", err)
}
return nil
s.tb.MustAddRows(rows)
}
var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)

View File

@ -2,12 +2,12 @@ package storage
import (
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"testing"
"testing/quick"
@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
@ -655,12 +656,7 @@ func testStorageRandTimestamps(s *Storage) error {
}
mrs = append(mrs, mr)
}
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
errStr := err.Error()
if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
}
s.AddRows(mrs, defaultPrecisionBits)
}
// Verify the storage contains rows.
@ -780,9 +776,7 @@ func testStorageDeleteSeries(s *Storage, workerNum int) error {
}
mrs = append(mrs, mr)
}
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
s.AddRows(mrs, defaultPrecisionBits)
}
s.DebugFlush()
@ -1200,9 +1194,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
minTimestamp := maxTimestamp - s.retentionMsecs + 3600*1000
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
s.AddRows(mrs, defaultPrecisionBits)
}
// Verify the storage contains rows.
@ -1343,9 +1335,7 @@ func testStorageAddMetrics(s *Storage, workerNum int) error {
Timestamp: timestamp,
Value: value,
}
if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
}
// Verify the storage contains rows.
@ -1369,9 +1359,7 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
minTimestamp := maxTimestamp - s.retentionMsecs
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
s.AddRows(mrs, defaultPrecisionBits)
}
// Try creating a snapshot from the storage.
snapshotName, err := s.CreateSnapshot()
@ -1441,9 +1429,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
rng := rand.New(rand.NewSource(1))
mrs := testGenerateMetricRowsForTenant(accountID, projectID, rng, 20, tr.MinTimestamp, tr.MaxTimestamp)
// populate storage with some rows
if err := s.AddRows(mrs[:10], defaultPrecisionBits); err != nil {
t.Fatal("error when adding mrs: %w", err)
}
s.AddRows(mrs[:10], defaultPrecisionBits)
s.DebugFlush()
// verify ingested rows are searchable
@ -1462,9 +1448,7 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
for i := 0; i < len(mrs); i = i + 2 {
mrs[i].Value = decimal.StaleNaN
}
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatal("error when adding mrs: %w", err)
}
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
// verify that rows marked as stale aren't searchable
@ -1475,3 +1459,172 @@ func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
// testRemoveAll removes all storage data produced by a test if the test hasn't
// failed. For this to work, the storage must use t.Name() as the base dir in
// its data path.
//
// In case of failure, the data is kept for further debugging.
func testRemoveAll(t *testing.T) {
defer func() {
if !t.Failed() {
fs.MustRemoveAll(t.Name())
}
}()
}
func TestStorageRowsNotAdded(t *testing.T) {
const accountID = 123
const projectID = 456
defer testRemoveAll(t)
type options struct {
name string
retention time.Duration
mrs []MetricRow
tr TimeRange
}
f := func(opts *options) {
t.Helper()
var gotMetrics Metrics
path := fmt.Sprintf("%s/%s", t.Name(), opts.name)
s := MustOpenStorage(path, opts.retention, 0, 0)
defer s.MustClose()
s.AddRows(opts.mrs, defaultPrecisionBits)
s.DebugFlush()
s.UpdateMetrics(&gotMetrics)
got := testCountAllMetricNames(s, accountID, projectID, opts.tr)
if got != 0 {
t.Fatalf("unexpected metric name count: got %d, want 0", got)
}
}
const numRows = 1000
var (
rng = rand.New(rand.NewSource(1))
retention time.Duration
minTimestamp int64
maxTimestamp int64
mrs []MetricRow
)
minTimestamp = -1000
maxTimestamp = -1
f(&options{
name: "NegativeTimestamps",
retention: retentionMax,
mrs: testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
})
retention = 48 * time.Hour
minTimestamp = time.Now().Add(-retention - time.Hour).UnixMilli()
maxTimestamp = minTimestamp + 1000
f(&options{
name: "TooSmallTimestamps",
retention: retention,
mrs: testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
})
retention = 48 * time.Hour
minTimestamp = time.Now().Add(7 * 24 * time.Hour).UnixMilli()
maxTimestamp = minTimestamp + 1000
f(&options{
name: "TooBigTimestamps",
retention: retention,
mrs: testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
})
minTimestamp = time.Now().UnixMilli()
maxTimestamp = minTimestamp + 1000
mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
for i := range numRows {
mrs[i].Value = math.NaN()
}
f(&options{
name: "NaN",
mrs: mrs,
tr: TimeRange{minTimestamp, maxTimestamp},
})
minTimestamp = time.Now().UnixMilli()
maxTimestamp = minTimestamp + 1000
mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
for i := range numRows {
mrs[i].Value = decimal.StaleNaN
}
f(&options{
name: "StaleNaN",
mrs: mrs,
tr: TimeRange{minTimestamp, maxTimestamp},
})
minTimestamp = time.Now().UnixMilli()
maxTimestamp = minTimestamp + 1000
mrs = testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
for i := range numRows {
mrs[i].MetricNameRaw = []byte("garbage")
}
f(&options{
name: "InvalidMetricNameRaw",
mrs: mrs,
tr: TimeRange{minTimestamp, maxTimestamp},
})
}
func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) {
const accountID = 123
const projectID = 456
defer testRemoveAll(t)
f := func(name string, maxHourlySeries int, maxDailySeries int) {
t.Helper()
rng := rand.New(rand.NewSource(1))
numRows := uint64(1000)
minTimestamp := time.Now().UnixMilli()
maxTimestamp := minTimestamp + 1000
mrs := testGenerateMetricRowsForTenant(accountID, projectID, rng, numRows, minTimestamp, maxTimestamp)
var gotMetrics Metrics
path := fmt.Sprintf("%s/%s", t.Name(), name)
s := MustOpenStorage(path, 0, maxHourlySeries, maxDailySeries)
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
s.UpdateMetrics(&gotMetrics)
want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped)
if got := testCountAllMetricNames(s, accountID, projectID, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want {
t.Fatalf("unexpected metric name count: %d, want %d", got, want)
}
}
maxHourlySeries := 1
maxDailySeries := 0 // No limit
f("HourlyLimitExceeded", maxHourlySeries, maxDailySeries)
maxHourlySeries = 0 // No limit
maxDailySeries = 1
f("DailyLimitExceeded", maxHourlySeries, maxDailySeries)
}
// testCountAllMetricNames is a test helper function that counts the names of
// all time series within the given time range.
func testCountAllMetricNames(s *Storage, accountID, projectID uint32, tr TimeRange) int {
tfsAll := NewTagFilters(accountID, projectID)
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
}
names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
if err != nil {
panic(fmt.Sprintf("SeachMetricNames() failed unexpectedly: %v", err))
}
return len(names)
}

View File

@ -48,9 +48,7 @@ func benchmarkStorageAddRows(b *testing.B, rowsPerBatch int) {
mr.Timestamp = int64(offset + i)
mr.Value = float64(offset + i)
}
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
panic(fmt.Errorf("cannot add rows to storage: %w", err))
}
s.AddRows(mrs, defaultPrecisionBits)
}
})
b.StopTimer()