mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 08:23:34 +01:00
lib/storage: calculate the maximum number of rows per small part from -memory.allowedPercent
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/159 This simplifies error detection additionally to the `vm_rows_ignored_total` counters.
This commit is contained in:
parent
0a8dd9cc9a
commit
e2eac858b5
@ -773,7 +773,9 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
|
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
|
||||||
var errors []error
|
// Return only the last error, since it has no sense in returning all errors.
|
||||||
|
var lastError error
|
||||||
|
|
||||||
var is *indexSearch
|
var is *indexSearch
|
||||||
var mn *MetricName
|
var mn *MetricName
|
||||||
var kb *bytesutil.ByteBuffer
|
var kb *bytesutil.ByteBuffer
|
||||||
@ -796,11 +798,13 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
|||||||
}
|
}
|
||||||
if mr.Timestamp < minTimestamp {
|
if mr.Timestamp < minTimestamp {
|
||||||
// Skip rows with too small timestamps outside the retention.
|
// Skip rows with too small timestamps outside the retention.
|
||||||
|
lastError = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp)
|
||||||
atomic.AddUint64(&s.tooSmallTimestampRows, 1)
|
atomic.AddUint64(&s.tooSmallTimestampRows, 1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if mr.Timestamp > maxTimestamp {
|
if mr.Timestamp > maxTimestamp {
|
||||||
// Skip rows with too big timestamps significantly exceeding the current time.
|
// Skip rows with too big timestamps significantly exceeding the current time.
|
||||||
|
lastError = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp)
|
||||||
atomic.AddUint64(&s.tooBigTimestampRows, 1)
|
atomic.AddUint64(&s.tooBigTimestampRows, 1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -830,8 +834,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
|||||||
// Do not stop adding rows on error - just skip invalid row.
|
// Do not stop adding rows on error - just skip invalid row.
|
||||||
// This guarantees that invalid rows don't prevent
|
// This guarantees that invalid rows don't prevent
|
||||||
// from adding valid rows into the storage.
|
// from adding valid rows into the storage.
|
||||||
err = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err)
|
lastError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err)
|
||||||
errors = append(errors, err)
|
|
||||||
j--
|
j--
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -841,8 +844,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
|||||||
// Do not stop adding rows on error - just skip invalid row.
|
// Do not stop adding rows on error - just skip invalid row.
|
||||||
// This guarantees that invalid rows don't prevent
|
// This guarantees that invalid rows don't prevent
|
||||||
// from adding valid rows into the storage.
|
// from adding valid rows into the storage.
|
||||||
err = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err)
|
lastError = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err)
|
||||||
errors = append(errors, err)
|
|
||||||
j--
|
j--
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -856,18 +858,16 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
|||||||
rows = rows[:rowsLen+j]
|
rows = rows[:rowsLen+j]
|
||||||
|
|
||||||
if err := s.tb.AddRows(rows); err != nil {
|
if err := s.tb.AddRows(rows); err != nil {
|
||||||
err = fmt.Errorf("cannot add rows to table: %s", err)
|
lastError = fmt.Errorf("cannot add rows to table: %s", err)
|
||||||
errors = append(errors, err)
|
|
||||||
}
|
}
|
||||||
errors = s.updateDateMetricIDCache(rows, errors)
|
lastError = s.updateDateMetricIDCache(rows, lastError)
|
||||||
if len(errors) > 0 {
|
if lastError != nil {
|
||||||
// Return only the first error, since it has no sense in returning all errors.
|
return rows, fmt.Errorf("errors occurred during rows addition: %s", lastError)
|
||||||
return rows, fmt.Errorf("errors occurred during rows addition: %s", errors[0])
|
|
||||||
}
|
}
|
||||||
return rows, nil
|
return rows, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error {
|
func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error {
|
||||||
var date uint64
|
var date uint64
|
||||||
var hour uint64
|
var hour uint64
|
||||||
var prevTimestamp int64
|
var prevTimestamp int64
|
||||||
@ -909,11 +909,11 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error
|
|||||||
// by concurrent goroutines.
|
// by concurrent goroutines.
|
||||||
s.dateMetricIDCache.Set(keyBuf, nil)
|
s.dateMetricIDCache.Set(keyBuf, nil)
|
||||||
if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil {
|
if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil {
|
||||||
errors = append(errors, err)
|
lastError = err
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return errors
|
return lastError
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Storage) updateCurrHourMetricIDs() {
|
func (s *Storage) updateCurrHourMetricIDs() {
|
||||||
|
@ -349,7 +349,8 @@ func testStorageRandTimestamps(s *Storage) error {
|
|||||||
mrs = append(mrs, mr)
|
mrs = append(mrs, mr)
|
||||||
}
|
}
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
||||||
if !strings.Contains(err.Error(), "too big timestamp") {
|
errStr := err.Error()
|
||||||
|
if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
|
||||||
return fmt.Errorf("unexpected error when adding mrs: %s", err)
|
return fmt.Errorf("unexpected error when adding mrs: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user