lib/storage: do not stop data ingestion on the first error in Storage.AddRows

Continue data ingestion for the rest of blocks.
This commit is contained in:
Aliaksandr Valialkin 2021-05-24 15:30:39 +03:00
parent 2601844de3
commit 1c16cbacf5

View File

@ -1481,10 +1481,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
} }
// Add rows to the storage in blocks with limited size in order to reduce memory usage. // Add rows to the storage in blocks with limited size in order to reduce memory usage.
var err error var firstErr error
ic := getMetricRowsInsertCtx() ic := getMetricRowsInsertCtx()
maxBlockLen := len(ic.rrs) maxBlockLen := len(ic.rrs)
for len(mrs) > 0 && err == nil { for len(mrs) > 0 {
mrsBlock := mrs mrsBlock := mrs
if len(mrs) > maxBlockLen { if len(mrs) > maxBlockLen {
mrsBlock = mrs[:maxBlockLen] mrsBlock = mrs[:maxBlockLen]
@ -1492,14 +1492,19 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
} else { } else {
mrs = nil mrs = nil
} }
err = s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits) if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock))) atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
} }
putMetricRowsInsertCtx(ic) putMetricRowsInsertCtx(ic)
<-addRowsConcurrencyCh <-addRowsConcurrencyCh
return err return firstErr
} }
type metricRowsInsertCtx struct { type metricRowsInsertCtx struct {