lib/mergeset: panic when too long item is passed to Table.AddItems()

This commit is contained in:
Aliaksandr Valialkin 2022-12-03 23:30:31 -08:00
parent dccd70ce10
commit a13d21513e
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
6 changed files with 38 additions and 171 deletions

View File

@ -138,12 +138,11 @@ func (riss *rawItemsShards) init() {
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
}
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error {
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
n := atomic.AddUint32(&riss.shardIdx, 1)
shards := riss.shards
idx := n % uint32(len(shards))
shard := &shards[idx]
return shard.addItems(tb, items)
shards[idx].addItems(tb, items)
}
func (riss *rawItemsShards) Len() int {
@ -180,8 +179,7 @@ func (ris *rawItemsShard) Len() int {
return n
}
func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
var err error
func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) {
var blocksToFlush []*inmemoryBlock
ris.mu.Lock()
@ -193,17 +191,18 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
}
ib := ibs[len(ibs)-1]
for _, item := range items {
if !ib.Add(item) {
if ib.Add(item) {
continue
}
ib = getInmemoryBlock()
if !ib.Add(item) {
putInmemoryBlock(ib)
err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))
break
}
if ib.Add(item) {
ibs = append(ibs, ib)
continue
}
putInmemoryBlock(ib)
logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items", len(item))
}
ris.ibs = ibs
}
}
if len(ibs) >= maxBlocksPerShard {
blocksToFlush = append(blocksToFlush, ibs...)
for i := range ibs {
@ -215,7 +214,6 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
ris.mu.Unlock()
tb.mergeRawItemsBlocks(blocksToFlush, false)
return err
}
type partWrapper struct {
@ -457,17 +455,17 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
}
// AddItems adds the given items to the tb.
func (tb *Table) AddItems(items [][]byte) error {
if err := tb.rawItems.addItems(tb, items); err != nil {
return fmt.Errorf("cannot insert data into %q: %w", tb.path, err)
}
//
// The function panics when items contains an item with length exceeding maxInmemoryBlockSize.
// It is caller's responsibility to make sure there are no too long items.
func (tb *Table) AddItems(items [][]byte) {
tb.rawItems.addItems(tb, items)
atomic.AddUint64(&tb.itemsAdded, uint64(len(items)))
n := 0
for _, item := range items {
n += len(item)
}
atomic.AddUint64(&tb.itemsAddedSizeBytes, uint64(n))
return nil
}
// getParts appends parts snapshot to dst and returns it.

View File

@ -161,9 +161,7 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) {
items := make([]string, itemsCount)
for i := 0; i < itemsCount; i++ {
item := fmt.Sprintf("%d:%d", rand.Intn(1e9), i)
if err := tb.AddItems([][]byte{[]byte(item)}); err != nil {
return nil, nil, fmt.Errorf("cannot add item: %w", err)
}
tb.AddItems([][]byte{[]byte(item)})
items[i] = item
}
tb.DebugFlush()

View File

@ -7,8 +7,6 @@ import (
"sync"
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
func TestTableOpenClose(t *testing.T) {
@ -120,9 +118,7 @@ func testAddItemsSerial(tb *Table, itemsCount int) {
if len(item) > maxInmemoryBlockSize {
item = item[:maxInmemoryBlockSize]
}
if err := tb.AddItems([][]byte{item}); err != nil {
logger.Panicf("BUG: cannot add item to table: %s", err)
}
tb.AddItems([][]byte{item})
}
}
@ -146,9 +142,7 @@ func TestTableCreateSnapshotAt(t *testing.T) {
const itemsCount = 3e5
for i := 0; i < itemsCount; i++ {
item := []byte(fmt.Sprintf("item %d", i))
if err := tb.AddItems([][]byte{item}); err != nil {
t.Fatalf("cannot add item to table: %s", err)
}
tb.AddItems([][]byte{item})
}
tb.DebugFlush()
@ -276,9 +270,7 @@ func testAddItemsConcurrent(tb *Table, itemsCount int) {
if len(item) > maxInmemoryBlockSize {
item = item[:maxInmemoryBlockSize]
}
if err := tb.AddItems([][]byte{item}); err != nil {
logger.Panicf("BUG: cannot add item to table: %s", err)
}
tb.AddItems([][]byte{item})
}
}()
}

View File

@ -408,12 +408,8 @@ func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
}
mn.sortTags()
if err := is.createGlobalIndexes(tsid, mn); err != nil {
return false, fmt.Errorf("cannot create global indexes: %w", err)
}
if err := is.createPerDayIndexes(date, tsid.MetricID, mn); err != nil {
return false, fmt.Errorf("cannot create per-day indexes for date=%s: %w", dateToString(date), err)
}
is.createGlobalIndexes(tsid, mn)
is.createPerDayIndexes(date, tsid.MetricID, mn)
PutMetricName(mn)
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
return true, nil
@ -619,12 +615,8 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b
if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
if err := is.createGlobalIndexes(dst, mn); err != nil {
return fmt.Errorf("cannot create global indexes: %w", err)
}
if err := is.createPerDayIndexes(date, dst.MetricID, mn); err != nil {
return fmt.Errorf("cannot create per-day indexes for date=%s: %w", dateToString(date), err)
}
is.createGlobalIndexes(dst, mn)
is.createPerDayIndexes(date, dst.MetricID, mn)
// There is no need in invalidating tag cache, since it is invalidated
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to OpenTable.
@ -690,7 +682,7 @@ func generateTSID(dst *TSID, mn *MetricName) {
dst.MetricID = generateUniqueMetricID()
}
func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) error {
func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
// The order of index items is important.
// It guarantees index consistency.
@ -721,7 +713,7 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) error {
ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
kbPool.Put(prefix)
return is.db.tb.AddItems(ii.Items)
is.db.tb.AddItems(ii.Items)
}
type indexItems struct {
@ -1792,9 +1784,7 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64, accoun
// Mark the metricID as deleted, so it will be created again when new data point
// for the given time series will arrive.
if err := db.deleteMetricIDs([]uint64{metricID}); err != nil {
return dst, fmt.Errorf("cannot delete metricID for missing metricID->metricName entry; metricID=%d; error: %w", metricID, err)
}
db.deleteMetricIDs([]uint64{metricID})
return dst, io.EOF
}
@ -1821,9 +1811,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int,
if err != nil {
return 0, err
}
if err := db.deleteMetricIDs(metricIDs); err != nil {
return 0, err
}
db.deleteMetricIDs(metricIDs)
// Delete TSIDs in the extDB.
deletedCount := len(metricIDs)
@ -1841,10 +1829,10 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int,
return deletedCount, nil
}
func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
func (db *indexDB) deleteMetricIDs(metricIDs []uint64) {
if len(metricIDs) == 0 {
// Nothing to delete
return nil
return
}
// atomically add deleted metricIDs to an inmemory map.
@ -1869,9 +1857,8 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
err := db.tb.AddItems(items.Items)
db.tb.AddItems(items.Items)
putIndexItems(items)
return err
}
func (db *indexDB) loadDeletedMetricIDs() (*uint64set.Set, error) {
@ -2947,7 +2934,7 @@ const (
int64Max = int64((1 << 63) - 1)
)
func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) error {
func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) {
ii := getIndexItems()
defer putIndexItems(ii)
@ -2962,12 +2949,8 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, mn.AccountID, mn.ProjectID)
kb.B = encoding.MarshalUint64(kb.B, date)
ii.registerTagIndexes(kb.B, mn, metricID)
if err := is.db.tb.AddItems(ii.Items); err != nil {
return fmt.Errorf("cannot add per-day entires for metricID %d: %w", metricID, err)
}
is.db.tb.AddItems(ii.Items)
is.db.s.dateMetricIDCache.Set(date, metricID)
return nil
}
func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {

View File

@ -541,22 +541,13 @@ func TestIndexDB(t *testing.T) {
}
}()
if err := testIndexDBBigMetricName(db); err != nil {
t.Fatalf("unexpected error: %s", err)
}
mns, tsids, tenants, err := testIndexDBGetOrCreateTSIDByName(db, accountsCount, projectsCount, metricGroups)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := testIndexDBBigMetricName(db); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := testIndexDBCheckTSIDByName(db, mns, tsids, tenants, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := testIndexDBBigMetricName(db); err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Re-open the db and verify it works as expected.
db.MustClose()
@ -564,15 +555,9 @@ func TestIndexDB(t *testing.T) {
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
if err := testIndexDBBigMetricName(db); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := testIndexDBCheckTSIDByName(db, mns, tsids, tenants, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if err := testIndexDBBigMetricName(db); err != nil {
t.Fatalf("unexpected error: %s", err)
}
})
t.Run("concurrent", func(t *testing.T) {
@ -595,27 +580,15 @@ func TestIndexDB(t *testing.T) {
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
if err := testIndexDBBigMetricName(db); err != nil {
ch <- err
return
}
mns, tsid, tenants, err := testIndexDBGetOrCreateTSIDByName(db, accountsCount, projectsCount, metricGroups)
if err != nil {
ch <- err
return
}
if err := testIndexDBBigMetricName(db); err != nil {
ch <- err
return
}
if err := testIndexDBCheckTSIDByName(db, mns, tsid, tenants, true); err != nil {
ch <- err
return
}
if err := testIndexDBBigMetricName(db); err != nil {
ch <- err
return
}
ch <- nil
}()
}
@ -636,74 +609,6 @@ func TestIndexDB(t *testing.T) {
})
}
func testIndexDBBigMetricName(db *indexDB) error {
var bigBytes []byte
for i := 0; i < 128*1000; i++ {
bigBytes = append(bigBytes, byte(i))
}
var mn MetricName
var tsid TSID
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
// Try creating too big metric group
mn.Reset()
mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...)
mn.sortTags()
metricName := mn.Marshal(nil)
metricNameRaw := mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup")
}
// Try creating too big tag key
mn.Reset()
mn.MetricGroup = append(mn.MetricGroup[:0], "xxx"...)
mn.Tags = []Tag{{
Key: append([]byte(nil), bigBytes...),
Value: []byte("foobar"),
}}
mn.sortTags()
metricName = mn.Marshal(nil)
metricNameRaw = mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key")
}
// Try creating too big tag value
mn.Reset()
mn.MetricGroup = append(mn.MetricGroup[:0], "xxx"...)
mn.Tags = []Tag{{
Key: []byte("foobar"),
Value: append([]byte(nil), bigBytes...),
}}
mn.sortTags()
metricName = mn.Marshal(nil)
metricNameRaw = mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value")
}
// Try creating metric name with too many tags
mn.Reset()
mn.MetricGroup = append(mn.MetricGroup[:0], "xxx"...)
for i := 0; i < 60000; i++ {
mn.Tags = append(mn.Tags, Tag{
Key: []byte(fmt.Sprintf("foobar %d", i)),
Value: []byte(fmt.Sprintf("sdfjdslkfj %d", i)),
})
}
mn.sortTags()
metricName = mn.Marshal(nil)
metricNameRaw = mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags")
}
return nil
}
func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, metricGroups int) ([]MetricName, []TSID, []string, error) {
// Create tsids.
var mns []MetricName
@ -756,9 +661,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil {
return nil, nil, nil, fmt.Errorf("error in createPerDayIndexes(%d, %d): %w", date, tsid.MetricID, err)
}
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
}
// Flush index to disk, so it becomes visible for search
@ -1838,9 +1741,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
for i := range tsids {
tsid := &tsids[i]
metricIDs.Add(tsid.MetricID)
if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil {
t.Fatalf("error in createPerDayIndexes(%d, %d): %s", date, tsid.MetricID, err)
}
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
}
allMetricIDs.Union(&metricIDs)
perDayMetricIDs[date] = &metricIDs

View File

@ -2196,12 +2196,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
continue
}
mn.sortTags()
if err := is.createPerDayIndexes(date, metricID, mn); err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when storing per-date inverted index for (date=%s, metricID=%d): %w", dateToString(date), metricID, err)
}
continue
}
is.createPerDayIndexes(date, metricID, mn)
}
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
date: date,