app/vmselect/netstorage: fix a bug introduced in 1a254ea20c

The bug results in `duplicate output time series` error
because the same time series is added two times into the orderedMetricNames list
inside the tmpBlocksFileWrapper.Finalize().

While at it, properly release all the tmpBlocksFile structs on tbf.Finalize() error.
Previously only the remaining tbf entries were released. This could result in resource leak.
This commit is contained in:
Aliaksandr Valialkin 2022-08-17 14:07:49 +03:00
parent 1812d33a2d
commit 87e0d69bf4
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1

View File

@ -1139,30 +1139,6 @@ func newTmpBlocksFileWrapper() *tmpBlocksFileWrapper {
}
}
func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, map[string][]tmpBlockAddr, uint64, error) {
var bytesTotal uint64
for i, tbf := range tbfw.tbfs {
if err := tbf.Finalize(); err != nil {
// Close the remaining tbfs before returning the error
closeTmpBlockFiles(tbfw.tbfs[i:])
return nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
}
bytesTotal += tbf.Len()
}
orderedMetricNames := tbfw.orderedMetricNamess[0]
addrsByMetricName := make(map[string][]tmpBlockAddr)
for i, m := range tbfw.ms {
for _, metricName := range tbfw.orderedMetricNamess[i] {
dstAddrs, ok := addrsByMetricName[metricName]
if !ok {
orderedMetricNames = append(orderedMetricNames, metricName)
}
addrsByMetricName[metricName] = append(dstAddrs, m[metricName]...)
}
}
return orderedMetricNames, addrsByMetricName, bytesTotal, nil
}
func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerIdx int) error {
bb := tmpBufPool.Get()
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
@ -1180,13 +1156,38 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
} else {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNamess and tbfw.ms.
tbfw.orderedMetricNamess[workerIdx] = append(tbfw.orderedMetricNamess[workerIdx], string(metricName))
metricNameStr := tbfw.orderedMetricNamess[workerIdx][len(tbfw.orderedMetricNamess[workerIdx])-1]
orderedMetricNames := tbfw.orderedMetricNamess[workerIdx]
orderedMetricNames = append(orderedMetricNames, string(metricName))
metricNameStr := orderedMetricNames[len(orderedMetricNames)-1]
m[metricNameStr] = addrs
tbfw.orderedMetricNamess[workerIdx] = orderedMetricNames
}
return nil
}
func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, map[string][]tmpBlockAddr, uint64, error) {
var bytesTotal uint64
for i, tbf := range tbfw.tbfs {
if err := tbf.Finalize(); err != nil {
closeTmpBlockFiles(tbfw.tbfs)
return nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
}
bytesTotal += tbf.Len()
}
orderedMetricNames := tbfw.orderedMetricNamess[0]
addrsByMetricName := tbfw.ms[0]
for i, m := range tbfw.ms[1:] {
for _, metricName := range tbfw.orderedMetricNamess[i] {
dstAddrs, ok := addrsByMetricName[metricName]
if !ok {
orderedMetricNames = append(orderedMetricNames, metricName)
}
addrsByMetricName[metricName] = append(dstAddrs, m[metricName]...)
}
}
return orderedMetricNames, addrsByMetricName, bytesTotal, nil
}
var metricNamePool = &sync.Pool{
New: func() interface{} {
return &storage.MetricName{}