lib/storage: limit the number of rows per each block in Storage.AddRows()

This should reduce memory usage when ingesting large blocks of rows.
This commit is contained in:
Aliaksandr Valialkin 2021-05-24 15:24:04 +03:00
parent 95b735a883
commit 2601844de3

View File

@ -1480,18 +1480,56 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
} }
} }
// Add rows to the storage. // Add rows to the storage in blocks with limited size in order to reduce memory usage.
var err error var err error
rr := getRawRowsWithSize(len(mrs)) ic := getMetricRowsInsertCtx()
rr.rows, err = s.add(rr.rows[:0], mrs, precisionBits) maxBlockLen := len(ic.rrs)
putRawRows(rr) for len(mrs) > 0 && err == nil {
mrsBlock := mrs
if len(mrs) > maxBlockLen {
mrsBlock = mrs[:maxBlockLen]
mrs = mrs[maxBlockLen:]
} else {
mrs = nil
}
err = s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
putMetricRowsInsertCtx(ic)
<-addRowsConcurrencyCh <-addRowsConcurrencyCh
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
return err return err
} }
// metricRowsInsertCtx holds reusable scratch buffers for adding metric rows
// to the storage in fixed-size blocks of up to maxMetricRowsPerBlock rows.
// Contexts are pooled via getMetricRowsInsertCtx / putMetricRowsInsertCtx
// in order to reduce memory allocations in Storage.AddRows.
type metricRowsInsertCtx struct {
// rrs is the scratch buffer of raw rows; len(rrs) caps the per-block
// row count processed by a single Storage.add call.
rrs []rawRow
// tmpMrs holds per-row *MetricRow scratch pointers passed to Storage.add;
// its entries are nil-ed out before the context is returned to the pool,
// so pooled contexts do not keep MetricRow objects alive.
tmpMrs []*MetricRow
}
// getMetricRowsInsertCtx returns a metricRowsInsertCtx from the pool.
//
// A fresh context with maxMetricRowsPerBlock-sized buffers is allocated
// when the pool is empty. The caller must release the context
// via putMetricRowsInsertCtx when it is no longer needed.
func getMetricRowsInsertCtx() *metricRowsInsertCtx {
if v := metricRowsInsertCtxPool.Get(); v != nil {
return v.(*metricRowsInsertCtx)
}
return &metricRowsInsertCtx{
rrs:    make([]rawRow, maxMetricRowsPerBlock),
tmpMrs: make([]*MetricRow, maxMetricRowsPerBlock),
}
}
// putMetricRowsInsertCtx returns ic to the pool.
//
// The *MetricRow scratch pointers are zeroed first, so the pooled context
// doesn't hold references to MetricRow objects and they can be garbage-collected.
func putMetricRowsInsertCtx(ic *metricRowsInsertCtx) {
for i := range ic.tmpMrs {
ic.tmpMrs[i] = nil
}
metricRowsInsertCtxPool.Put(ic)
}
// metricRowsInsertCtxPool is a pool of metricRowsInsertCtx objects
// reused across Storage.AddRows calls in order to reduce memory allocations.
var metricRowsInsertCtxPool sync.Pool
// maxMetricRowsPerBlock limits the number of rows processed per block
// inside Storage.AddRows in order to reduce memory usage.
const maxMetricRowsPerBlock = 8000
var ( var (
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation // Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent // is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
@ -1564,13 +1602,8 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
return nil return nil
} }
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
idb := s.idb() idb := s.idb()
dstMrs := make([]*MetricRow, len(mrs))
if n := len(mrs) - cap(rows); n > 0 {
rows = append(rows[:cap(rows)], make([]rawRow, n)...)
}
rows = rows[:len(mrs)]
j := 0 j := 0
var ( var (
// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName. // These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
@ -1714,9 +1747,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
firstError = fmt.Errorf("cannot update per-date data: %w", err) firstError = fmt.Errorf("cannot update per-date data: %w", err)
} }
if firstError != nil { if firstError != nil {
return rows, fmt.Errorf("error occurred during rows addition: %w", firstError) return fmt.Errorf("error occurred during rows addition: %w", firstError)
} }
return rows, nil return nil
} }
func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool { func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool {