mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
lib/storage: remove metricID with missing metricID->metricName entry
The metricID->metricName entry can be missing in the indexdb after unclean shutdown when only a part of entries for new time series is written into indexdb. Recover from such a situation by removing the broken metricID. New metricID will be automatically created for time series with the given metricName when new data point will arive to it.
This commit is contained in:
parent
387f62f468
commit
4e22b521c2
@ -359,6 +359,10 @@ func registerStorageMetrics(strg *storage.Storage) {
|
||||
return float64(idbm().DateRangeSearchHits)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_missing_metric_names_for_metric_id_total`, func() float64 {
|
||||
return float64(idbm().MissingMetricNamesForMetricID)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_date_metric_id_cache_syncs_total`, func() float64 {
|
||||
return float64(m().DateMetricIDCacheSyncsCount)
|
||||
})
|
||||
|
@ -330,10 +330,7 @@ func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error {
|
||||
sizeBuf: make([]byte, 8),
|
||||
}
|
||||
for {
|
||||
err := s.processVMSelectRequest(ctx)
|
||||
n := atomic.LoadUint64(&ctx.sr.MissingMetricNamesForMetricID)
|
||||
missingMetricNamesForMetricID.Add(int(n))
|
||||
if err != nil {
|
||||
if err := s.processVMSelectRequest(ctx); err != nil {
|
||||
if err == io.EOF {
|
||||
// Remote client gracefully closed the connection.
|
||||
return nil
|
||||
@ -346,8 +343,6 @@ func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error {
|
||||
}
|
||||
}
|
||||
|
||||
var missingMetricNamesForMetricID = metrics.NewCounter(`vm_missing_metric_names_for_metric_id_total`)
|
||||
|
||||
type vmselectRequestCtx struct {
|
||||
bc *handshake.BufferedConn
|
||||
sizeBuf []byte
|
||||
|
@ -101,6 +101,11 @@ type indexDB struct {
|
||||
// The number of hits for date range searches.
|
||||
dateRangeSearchHits uint64
|
||||
|
||||
// missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries.
|
||||
// High rate may mean corrupted indexDB due to unclean shutdown.
|
||||
// The db must be automatically recovered after that.
|
||||
missingMetricNamesForMetricID uint64
|
||||
|
||||
mustDrop uint64
|
||||
|
||||
// Start date fully covered by per-day inverted index.
|
||||
@ -228,6 +233,8 @@ type IndexDBMetrics struct {
|
||||
DateRangeSearchCalls uint64
|
||||
DateRangeSearchHits uint64
|
||||
|
||||
MissingMetricNamesForMetricID uint64
|
||||
|
||||
IndexBlocksWithMetricIDsProcessed uint64
|
||||
IndexBlocksWithMetricIDsIncorrectOrder uint64
|
||||
|
||||
@ -269,6 +276,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
|
||||
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
|
||||
m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
|
||||
|
||||
m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID)
|
||||
|
||||
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
|
||||
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
|
||||
|
||||
@ -903,6 +912,13 @@ func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, proj
|
||||
// Cannot find MetricName for the given metricID. This may be the case
|
||||
// when indexDB contains incomplete set of metricID -> metricName entries
|
||||
// after a snapshot or due to unflushed entries.
|
||||
atomic.AddUint64(&db.missingMetricNamesForMetricID, 1)
|
||||
|
||||
// Mark the metricID as deleted, so it will be created again when new data point
|
||||
// for the given time series will arrive.
|
||||
if err := db.deleteMetricIDs([]uint64{metricID}); err != nil {
|
||||
return dst, fmt.Errorf("cannot delete metricID for missing metricID->metricName entry; metricID=%d; error: %s", metricID, err)
|
||||
}
|
||||
return dst, io.EOF
|
||||
}
|
||||
|
||||
@ -927,9 +943,28 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := db.deleteMetricIDs(metricIDs); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Delete TSIDs in the extDB.
|
||||
deletedCount := len(metricIDs)
|
||||
if db.doExtDB(func(extDB *indexDB) {
|
||||
var n int
|
||||
n, err = extDB.DeleteTSIDs(tfss)
|
||||
deletedCount += n
|
||||
}) {
|
||||
if err != nil {
|
||||
return deletedCount, fmt.Errorf("cannot delete tsids in extDB: %s", err)
|
||||
}
|
||||
}
|
||||
return deletedCount, nil
|
||||
}
|
||||
|
||||
func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
|
||||
if len(metricIDs) == 0 {
|
||||
// Nothing to delete
|
||||
return 0, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mark the found metricIDs as deleted.
|
||||
@ -939,12 +974,11 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
||||
items.B = encoding.MarshalUint64(items.B, metricID)
|
||||
items.Next()
|
||||
}
|
||||
err = db.tb.AddItems(items.Items)
|
||||
err := db.tb.AddItems(items.Items)
|
||||
putIndexItems(items)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return err
|
||||
}
|
||||
deletedCount := len(metricIDs)
|
||||
|
||||
// atomically add deleted metricIDs to an inmemory map.
|
||||
dmis := &uint64set.Set{}
|
||||
@ -958,18 +992,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
||||
|
||||
// Do not reset uselessTagFiltersCache, since the found metricIDs
|
||||
// on cache miss are filtered out later with deletedMetricIDs.
|
||||
|
||||
// Delete TSIDs in the extDB.
|
||||
if db.doExtDB(func(extDB *indexDB) {
|
||||
var n int
|
||||
n, err = extDB.DeleteTSIDs(tfss)
|
||||
deletedCount += n
|
||||
}) {
|
||||
if err != nil {
|
||||
return deletedCount, fmt.Errorf("cannot delete tsids in extDB: %s", err)
|
||||
}
|
||||
}
|
||||
return deletedCount, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *indexDB) getDeletedMetricIDs() *uint64set.Set {
|
||||
|
@ -3,7 +3,6 @@ package storage
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
@ -89,11 +88,6 @@ type Search struct {
|
||||
err error
|
||||
|
||||
needClosing bool
|
||||
|
||||
// MissingMetricNamesForMetricID is a counter of missing MetricID -> MetricName
|
||||
// entries during the search.
|
||||
// High rate may mean corrupted indexDB.
|
||||
MissingMetricNamesForMetricID uint64
|
||||
}
|
||||
|
||||
func (s *Search) reset() {
|
||||
@ -104,7 +98,6 @@ func (s *Search) reset() {
|
||||
s.ts.reset()
|
||||
s.err = nil
|
||||
s.needClosing = false
|
||||
s.MissingMetricNamesForMetricID = 0
|
||||
}
|
||||
|
||||
// Init initializes s from the given storage, tfss and tr.
|
||||
@ -161,8 +154,8 @@ func (s *Search) NextMetricBlock() bool {
|
||||
s.MetricBlock.MetricName, err = s.storage.searchMetricName(s.MetricBlock.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
// Missing metricName for tsid.MetricID. Increment error counter and skip it.
|
||||
atomic.AddUint64(&s.MissingMetricNamesForMetricID, 1)
|
||||
// Skip missing metricName for tsid.MetricID.
|
||||
// It should be automatically fixed. See indexDB.searchMetricName for details.
|
||||
continue
|
||||
}
|
||||
s.err = err
|
||||
|
Loading…
Reference in New Issue
Block a user