app/vmselect/netstorage: group per-vmstorage fields at tmpBlocksFileWrapperShard

This improves code readability a bit
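For context, the pattern this commit applies can be sketched in isolation: the per-vmstorage (per-worker) fields that previously lived in parallel slices indexed by workerID are grouped into a single shard struct, and every shard is padded to a multiple of 128 bytes so that shards owned by different workers do not share a CPU cache line. A minimal standalone sketch of that pattern, using hypothetical names rather than the repository's types:

package main

import (
	"fmt"
	"unsafe"
)

// shard groups the per-worker state that was previously spread across
// parallel slices indexed by workerID (one slice per field).
type shard struct {
	buf []byte
	m   map[string]int
}

// paddedShard rounds every shard up to a multiple of 128 bytes so that
// two adjacent shards cannot land on the same CPU cache line, which
// would cause false sharing between workers.
type paddedShard struct {
	shard
	_ [128 - unsafe.Sizeof(shard{})%128]byte
}

func main() {
	shards := make([]paddedShard, 4) // one shard per worker
	for i := range shards {
		shards[i].m = make(map[string]int)
	}
	// Each worker touches only shards[workerID], so no locking is needed.
	workerID := 2
	shards[workerID].m["some_metric"] = 1
	fmt.Println(unsafe.Sizeof(paddedShard{})%128 == 0) // true
}

In the diff below, tmpBlocksFileWrapperShard plays the role of shard and tmpBlocksFileWrapperShardWithPadding the role of paddedShard.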
Aliaksandr Valialkin 2024-01-23 00:50:32 +02:00
parent d52b121222
commit f8a9ef8cbd


@@ -1349,27 +1349,40 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
 }
 
 type tmpBlocksFileWrapper struct {
-	tbfs []*tmpBlocksFile
+	shards []tmpBlocksFileWrapperShardWithPadding
+}
+
+type tmpBlocksFileWrapperShard struct {
+	// tbf is a file where temporary blocks are stored from the read time series.
+	tbf *tmpBlocksFile
 
-	// metricNamesBufs are per-worker bufs for holding all the loaded unique metric names for ms and orderedMetricNamess.
+	// metricNamesBuf is a buf for holding all the loaded unique metric names for m and orderedMetricNames.
 	// It should reduce pressure on Go GC by reducing the number of string allocations
 	// when constructing metricName string from byte slice.
-	metricNamesBufs [][]byte
+	metricNamesBuf []byte
 
-	// addrssPools are per-worker pools for holding all the blockAddrs across all the loaded time series.
+	// addrssPool is a pool for holding all the blockAddrs objects across all the loaded time series.
 	// It should reduce pressure on Go GC by reducing the number of blockAddrs object allocations.
-	addrssPools [][]blockAddrs
+	addrssPool []blockAddrs
 
-	// addrsPools are per-worker pools for holding the most of blockAddrs.addrs slices
+	// addrsPool is a pool for holding the most of blockAddrs.addrs slices.
 	// It should reduce pressure on Go GC by reducing the number of blockAddrs.addrs allocations.
-	addrsPools [][]tmpBlockAddr
+	addrsPool []tmpBlockAddr
 
-	// ms maps metricName to the addrssPools index.
-	ms []map[string]int
+	// m maps metricName to the addrssPool index.
+	m map[string]int
 
-	// orderedMetricNamess contain metric names in the order of their load.
+	// orderedMetricNames contains metric names in the order of their load.
 	// This order is important for sequential read of data from tmpBlocksFile.
-	orderedMetricNamess [][]string
+	orderedMetricNames []string
+}
+
+type tmpBlocksFileWrapperShardWithPadding struct {
+	tmpBlocksFileWrapperShard
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(tmpBlocksFileWrapperShard{})%128]byte
 }
 
 type blockAddrs struct {
@@ -1382,58 +1395,54 @@ func haveSameBlockAddrTails(a, b []tmpBlockAddr) bool {
 	return sha.Data+uintptr(sha.Len)*unsafe.Sizeof(tmpBlockAddr{}) == shb.Data+uintptr(shb.Len)*unsafe.Sizeof(tmpBlockAddr{})
 }
 
-func (tbfw *tmpBlocksFileWrapper) newBlockAddrs(workerID uint) int {
-	addrssPool := tbfw.addrssPools[workerID]
+func (tbfwLocal *tmpBlocksFileWrapperShard) newBlockAddrs() int {
+	addrssPool := tbfwLocal.addrssPool
 	if cap(addrssPool) > len(addrssPool) {
 		addrssPool = addrssPool[:len(addrssPool)+1]
 	} else {
 		addrssPool = append(addrssPool, blockAddrs{})
 	}
-	tbfw.addrssPools[workerID] = addrssPool
+	tbfwLocal.addrssPool = addrssPool
 	idx := len(addrssPool) - 1
 	return idx
 }
 
 func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
 	n := len(sns)
-	tbfs := make([]*tmpBlocksFile, n)
-	for i := range tbfs {
-		tbfs[i] = getTmpBlocksFile()
-	}
-	ms := make([]map[string]int, n)
-	for i := range ms {
-		ms[i] = make(map[string]int)
+	shards := make([]tmpBlocksFileWrapperShardWithPadding, n)
+	for i := range shards {
+		shard := &shards[i]
+		shard.tbf = getTmpBlocksFile()
+		shard.m = make(map[string]int)
 	}
 	return &tmpBlocksFileWrapper{
-		tbfs:                tbfs,
-		metricNamesBufs:     make([][]byte, n),
-		addrssPools:         make([][]blockAddrs, n),
-		addrsPools:          make([][]tmpBlockAddr, n),
-		ms:                  ms,
-		orderedMetricNamess: make([][]string, n),
+		shards: shards,
 	}
 }
 
 func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerID uint) error {
+	tbfwLocal := &tbfw.shards[workerID]
+
 	bb := tmpBufPool.Get()
 	bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
-	addr, err := tbfw.tbfs[workerID].WriteBlockData(bb.B, workerID)
+	addr, err := tbfwLocal.tbf.WriteBlockData(bb.B, workerID)
 	tmpBufPool.Put(bb)
 	if err != nil {
 		return err
 	}
 	metricName := mb.MetricName
-	m := tbfw.ms[workerID]
+	m := tbfwLocal.m
 	addrsIdx, ok := m[string(metricName)]
 	if !ok {
-		addrsIdx = tbfw.newBlockAddrs(workerID)
+		addrsIdx = tbfwLocal.newBlockAddrs()
 	}
-	addrs := &tbfw.addrssPools[workerID][addrsIdx]
-	addrsPool := tbfw.addrsPools[workerID]
+	addrs := &tbfwLocal.addrssPool[addrsIdx]
+	addrsPool := tbfwLocal.addrsPool
 	if addrs.addrs == nil || haveSameBlockAddrTails(addrs.addrs, addrsPool) {
 		// It is safe appending addr to addrsPool, since there are no other items added there yet.
 		addrsPool = append(addrsPool, addr)
-		tbfw.addrsPools[workerID] = addrsPool
+		tbfwLocal.addrsPool = addrsPool
 		addrs.addrs = addrsPool[len(addrsPool)-len(addrs.addrs)-1 : len(addrsPool) : len(addrsPool)]
 	} else {
 		// It is unsafe appending addr to addrsPool, since there are other items added there.
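
A side note on the full slice expression in the hunk above (addrsPool[len(addrsPool)-len(addrs.addrs)-1 : len(addrsPool) : len(addrsPool)]): the third index caps the capacity of the returned view, so a later append through the view reallocates instead of silently overwriting pool entries added afterwards. A small standalone illustration with hypothetical values, unrelated to the repository:

package main

import "fmt"

func main() {
	pool := make([]int, 0, 8)
	pool = append(pool, 1, 2, 3)

	// pool[low:high:max] limits the view's capacity to max-low, so the
	// view cannot reuse the pool's spare capacity beyond index max.
	view := pool[1:3:3]
	fmt.Println(len(view), cap(view)) // 2 2

	pool = append(pool, 4)  // grows in place: pool still has spare capacity
	view = append(view, 99) // cap exceeded: view reallocates, pool[3] stays 4
	fmt.Println(pool) // [1 2 3 4]
	fmt.Println(view) // [2 3 99]
}
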
@@ -1441,46 +1450,71 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
 		addrs.addrs = append(addrs.addrs, addr)
 	}
 	if len(addrs.addrs) == 1 {
-		metricNamesBuf := tbfw.metricNamesBufs[workerID]
+		metricNamesBuf := tbfwLocal.metricNamesBuf
 		metricNamesBufLen := len(metricNamesBuf)
 		metricNamesBuf = append(metricNamesBuf, metricName...)
 		metricNameStr := bytesutil.ToUnsafeString(metricNamesBuf[metricNamesBufLen:])
-		orderedMetricNames := tbfw.orderedMetricNamess[workerID]
+		orderedMetricNames := tbfwLocal.orderedMetricNames
 		orderedMetricNames = append(orderedMetricNames, metricNameStr)
 		m[metricNameStr] = addrsIdx
-		tbfw.orderedMetricNamess[workerID] = orderedMetricNames
-		tbfw.metricNamesBufs[workerID] = metricNamesBuf
+		tbfwLocal.orderedMetricNames = orderedMetricNames
+		tbfwLocal.metricNamesBuf = metricNamesBuf
 	}
 	return nil
 }
 
 func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, []blockAddrs, map[string]int, uint64, error) {
+	shards := tbfw.shards
+
 	var bytesTotal uint64
-	for i, tbf := range tbfw.tbfs {
+	for i := range shards {
+		tbf := shards[i].tbf
 		if err := tbf.Finalize(); err != nil {
-			closeTmpBlockFiles(tbfw.tbfs)
-			return nil, nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
+			tbfw.closeTmpBlockFiles()
+			return nil, nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(shards[i].m), err)
 		}
 		bytesTotal += tbf.Len()
 	}
 
-	orderedMetricNames := tbfw.orderedMetricNamess[0]
-	addrsByMetricName := tbfw.ms[0]
-	for i, m := range tbfw.ms[1:] {
-		addrssPools := tbfw.addrssPools[i]
-		for _, metricName := range tbfw.orderedMetricNamess[i+1] {
+	// merge data collected from all the shards
+	tbfwFirst := &shards[0]
+	orderedMetricNames := tbfwFirst.orderedMetricNames
+	addrsByMetricName := tbfwFirst.m
+	for i := 1; i < len(shards); i++ {
+		tbfwLocal := &shards[i]
+		m := tbfwLocal.m
+		addrssPool := tbfwLocal.addrssPool
+		for _, metricName := range tbfwLocal.orderedMetricNames {
 			dstAddrsIdx, ok := addrsByMetricName[metricName]
 			if !ok {
 				orderedMetricNames = append(orderedMetricNames, metricName)
-				dstAddrsIdx = tbfw.newBlockAddrs(0)
+				dstAddrsIdx = tbfwFirst.newBlockAddrs()
 				addrsByMetricName[metricName] = dstAddrsIdx
 			}
-			dstAddrs := &tbfw.addrssPools[0][dstAddrsIdx]
-			dstAddrs.addrs = append(dstAddrs.addrs, addrssPools[m[metricName]].addrs...)
+			dstAddrs := &tbfwFirst.addrssPool[dstAddrsIdx]
+			dstAddrs.addrs = append(dstAddrs.addrs, addrssPool[m[metricName]].addrs...)
 		}
 	}
-	return orderedMetricNames, tbfw.addrssPools[0], addrsByMetricName, bytesTotal, nil
+
+	return orderedMetricNames, tbfwFirst.addrssPool, addrsByMetricName, bytesTotal, nil
+}
+
+func (tbfw *tmpBlocksFileWrapper) closeTmpBlockFiles() {
+	tbfs := tbfw.getTmpBlockFiles()
+	closeTmpBlockFiles(tbfs)
+}
+
+func (tbfw *tmpBlocksFileWrapper) getTmpBlockFiles() []*tmpBlocksFile {
+	shards := tbfw.shards
+	tbfs := make([]*tmpBlocksFile, len(shards))
+	for i := range shards {
+		tbfs[i] = shards[i].tbf
+	}
+	return tbfs
 }
 
 var metricNamePool = &sync.Pool{
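
The merge step in Finalize above folds the per-shard maps and ordered name lists into shard 0, keeping the order in which metric names were first seen across shards. A simplified sketch of the same idea, using a hypothetical shardResult type rather than the repository's code:

package main

import "fmt"

// shardResult is a simplified stand-in for one tmpBlocksFileWrapperShard:
// the per-shard insertion order plus the per-name values.
type shardResult struct {
	ordered []string
	byName  map[string][]int
}

// merge folds all shards into the first one, preserving first-seen order,
// similar in spirit to tmpBlocksFileWrapper.Finalize.
func merge(shards []shardResult) ([]string, map[string][]int) {
	ordered := shards[0].ordered
	byName := shards[0].byName
	for _, sh := range shards[1:] {
		for _, name := range sh.ordered {
			if _, ok := byName[name]; !ok {
				ordered = append(ordered, name)
			}
			byName[name] = append(byName[name], sh.byName[name]...)
		}
	}
	return ordered, byName
}

func main() {
	shards := []shardResult{
		{ordered: []string{"a", "b"}, byName: map[string][]int{"a": {1}, "b": {2}}},
		{ordered: []string{"b", "c"}, byName: map[string][]int{"b": {3}, "c": {4}}},
	}
	ordered, byName := merge(shards)
	fmt.Println(ordered) // [a b c]
	fmt.Println(byName)  // map[a:[1] b:[2 3] c:[4]]
}
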
@@ -1632,7 +1666,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 	}
 	isPartial, err := processBlocks(qt, sns, denyPartialResponse, sq, processBlock, deadline)
 	if err != nil {
-		closeTmpBlockFiles(tbfw.tbfs)
+		tbfw.closeTmpBlockFiles()
 		return nil, false, fmt.Errorf("error occured during search: %w", err)
 	}
 	orderedMetricNames, addrssPool, m, bytesTotal, err := tbfw.Finalize()
@@ -1644,7 +1678,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 	var rss Results
 	rss.tr = tr
 	rss.deadline = deadline
-	rss.tbfs = tbfw.tbfs
+	rss.tbfs = tbfw.getTmpBlockFiles()
 	pts := make([]packedTimeseries, len(orderedMetricNames))
 	for i, metricName := range orderedMetricNames {
 		pts[i] = packedTimeseries{