diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 49ea0be613..6f1c3483ce 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -1349,27 +1349,40 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
 }
 
 type tmpBlocksFileWrapper struct {
-	tbfs []*tmpBlocksFile
+	shards []tmpBlocksFileWrapperShardWithPadding
+}
 
-	// metricNamesBufs are per-worker bufs for holding all the loaded unique metric names for ms and orderedMetricNamess.
+type tmpBlocksFileWrapperShard struct {
+	// tbf is a file where temporary blocks from the read time series are stored.
+	tbf *tmpBlocksFile
+
+	// metricNamesBuf is a buf for holding all the loaded unique metric names for m and orderedMetricNames.
 	// It should reduce pressure on Go GC by reducing the number of string allocations
 	// when constructing metricName string from byte slice.
-	metricNamesBufs [][]byte
+	metricNamesBuf []byte
 
-	// addrssPools are per-worker pools for holding all the blockAddrs across all the loaded time series.
+	// addrssPool is a pool for holding all the blockAddrs objects across all the loaded time series.
 	// It should reduce pressure on Go GC by reducing the number of blockAddrs object allocations.
-	addrssPools [][]blockAddrs
+	addrssPool []blockAddrs
 
-	// addrsPools are per-worker pools for holding the most of blockAddrs.addrs slices
+	// addrsPool is a pool for holding most of the blockAddrs.addrs slices.
 	// It should reduce pressure on Go GC by reducing the number of blockAddrs.addrs allocations.
-	addrsPools [][]tmpBlockAddr
+	addrsPool []tmpBlockAddr
 
-	// ms maps metricName to the addrssPools index.
-	ms []map[string]int
+	// m maps metricName to the addrssPool index.
+	m map[string]int
 
-	// orderedMetricNamess contain metric names in the order of their load.
+	// orderedMetricNames contains metric names in the order of their load.
 	// This order is important for sequential read of data from tmpBlocksFile.
-	orderedMetricNamess [][]string
+	orderedMetricNames []string
+}
+
+type tmpBlocksFileWrapperShardWithPadding struct {
+	tmpBlocksFileWrapperShard
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0.
+	_ [128 - unsafe.Sizeof(tmpBlocksFileWrapperShard{})%128]byte
 }
 
 type blockAddrs struct {
@@ -1382,58 +1395,54 @@ func haveSameBlockAddrTails(a, b []tmpBlockAddr) bool {
 	return sha.Data+uintptr(sha.Len)*unsafe.Sizeof(tmpBlockAddr{}) == shb.Data+uintptr(shb.Len)*unsafe.Sizeof(tmpBlockAddr{})
 }
 
-func (tbfw *tmpBlocksFileWrapper) newBlockAddrs(workerID uint) int {
-	addrssPool := tbfw.addrssPools[workerID]
+func (tbfwLocal *tmpBlocksFileWrapperShard) newBlockAddrs() int {
+	addrssPool := tbfwLocal.addrssPool
 	if cap(addrssPool) > len(addrssPool) {
 		addrssPool = addrssPool[:len(addrssPool)+1]
 	} else {
 		addrssPool = append(addrssPool, blockAddrs{})
 	}
-	tbfw.addrssPools[workerID] = addrssPool
+	tbfwLocal.addrssPool = addrssPool
 	idx := len(addrssPool) - 1
 	return idx
 }
 
 func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper {
 	n := len(sns)
-	tbfs := make([]*tmpBlocksFile, n)
-	for i := range tbfs {
-		tbfs[i] = getTmpBlocksFile()
-	}
-	ms := make([]map[string]int, n)
-	for i := range ms {
-		ms[i] = make(map[string]int)
+	shards := make([]tmpBlocksFileWrapperShardWithPadding, n)
+	for i := range shards {
+		shard := &shards[i]
+		shard.tbf = getTmpBlocksFile()
+		shard.m = make(map[string]int)
 	}
 	return &tmpBlocksFileWrapper{
-		tbfs:                tbfs,
-		metricNamesBufs:     make([][]byte, n),
-		addrssPools:         make([][]blockAddrs, n),
-		addrsPools:          make([][]tmpBlockAddr, n),
-		ms:                  ms,
-		orderedMetricNamess: make([][]string, n),
+		shards: shards,
 	}
 }
 
 func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerID uint) error {
+	tbfwLocal := &tbfw.shards[workerID]
+
 	bb := tmpBufPool.Get()
 	bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
-	addr, err := tbfw.tbfs[workerID].WriteBlockData(bb.B, workerID)
+
+	addr, err := tbfwLocal.tbf.WriteBlockData(bb.B, workerID)
 	tmpBufPool.Put(bb)
 	if err != nil {
 		return err
 	}
 	metricName := mb.MetricName
-	m := tbfw.ms[workerID]
+	m := tbfwLocal.m
 	addrsIdx, ok := m[string(metricName)]
 	if !ok {
-		addrsIdx = tbfw.newBlockAddrs(workerID)
+		addrsIdx = tbfwLocal.newBlockAddrs()
 	}
-	addrs := &tbfw.addrssPools[workerID][addrsIdx]
-	addrsPool := tbfw.addrsPools[workerID]
+	addrs := &tbfwLocal.addrssPool[addrsIdx]
+	addrsPool := tbfwLocal.addrsPool
 	if addrs.addrs == nil || haveSameBlockAddrTails(addrs.addrs, addrsPool) {
 		// It is safe appending addr to addrsPool, since there are no other items added there yet.
 		addrsPool = append(addrsPool, addr)
-		tbfw.addrsPools[workerID] = addrsPool
+		tbfwLocal.addrsPool = addrsPool
 		addrs.addrs = addrsPool[len(addrsPool)-len(addrs.addrs)-1 : len(addrsPool) : len(addrsPool)]
 	} else {
 		// It is unsafe appending addr to addrsPool, since there are other items added there.
@@ -1441,46 +1450,71 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
 		addrs.addrs = append(addrs.addrs, addr)
 	}
 	if len(addrs.addrs) == 1 {
-		metricNamesBuf := tbfw.metricNamesBufs[workerID]
+		metricNamesBuf := tbfwLocal.metricNamesBuf
 		metricNamesBufLen := len(metricNamesBuf)
 		metricNamesBuf = append(metricNamesBuf, metricName...)
 		metricNameStr := bytesutil.ToUnsafeString(metricNamesBuf[metricNamesBufLen:])
-		orderedMetricNames := tbfw.orderedMetricNamess[workerID]
+		orderedMetricNames := tbfwLocal.orderedMetricNames
 		orderedMetricNames = append(orderedMetricNames, metricNameStr)
 		m[metricNameStr] = addrsIdx
-		tbfw.orderedMetricNamess[workerID] = orderedMetricNames
-		tbfw.metricNamesBufs[workerID] = metricNamesBuf
+		tbfwLocal.orderedMetricNames = orderedMetricNames
+		tbfwLocal.metricNamesBuf = metricNamesBuf
 	}
 	return nil
 }
 
 func (tbfw *tmpBlocksFileWrapper) Finalize() ([]string, []blockAddrs, map[string]int, uint64, error) {
+	shards := tbfw.shards
+
 	var bytesTotal uint64
-	for i, tbf := range tbfw.tbfs {
+	for i := range shards {
+		tbf := shards[i].tbf
 		if err := tbf.Finalize(); err != nil {
-			closeTmpBlockFiles(tbfw.tbfs)
-			return nil, nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(tbfw.ms[i]), err)
+			tbfw.closeTmpBlockFiles()
+			return nil, nil, nil, 0, fmt.Errorf("cannot finalize temporary blocks file with %d series: %w", len(shards[i].m), err)
 		}
 		bytesTotal += tbf.Len()
 	}
-	orderedMetricNames := tbfw.orderedMetricNamess[0]
-	addrsByMetricName := tbfw.ms[0]
-	for i, m := range tbfw.ms[1:] {
-		addrssPools := tbfw.addrssPools[i]
-		for _, metricName := range tbfw.orderedMetricNamess[i+1] {
+
+	// merge data collected from all the shards
+	tbfwFirst := &shards[0]
+	orderedMetricNames := tbfwFirst.orderedMetricNames
+	addrsByMetricName := tbfwFirst.m
+	for i := 1; i < len(shards); i++ {
+		tbfwLocal := &shards[i]
+
+		m := tbfwLocal.m
+		addrssPool := tbfwLocal.addrssPool
+		for _, metricName := range tbfwLocal.orderedMetricNames {
 			dstAddrsIdx, ok := addrsByMetricName[metricName]
 			if !ok {
 				orderedMetricNames = append(orderedMetricNames, metricName)
-				dstAddrsIdx = tbfw.newBlockAddrs(0)
+				dstAddrsIdx = tbfwFirst.newBlockAddrs()
 				addrsByMetricName[metricName] = dstAddrsIdx
 			}
-			dstAddrs := &tbfw.addrssPools[0][dstAddrsIdx]
-			dstAddrs.addrs = append(dstAddrs.addrs, addrssPools[m[metricName]].addrs...)
+			dstAddrs := &tbfwFirst.addrssPool[dstAddrsIdx]
+			dstAddrs.addrs = append(dstAddrs.addrs, addrssPool[m[metricName]].addrs...)
 		}
 	}
-	return orderedMetricNames, tbfw.addrssPools[0], addrsByMetricName, bytesTotal, nil
+
+	return orderedMetricNames, tbfwFirst.addrssPool, addrsByMetricName, bytesTotal, nil
+}
+
+func (tbfw *tmpBlocksFileWrapper) closeTmpBlockFiles() {
+	tbfs := tbfw.getTmpBlockFiles()
+	closeTmpBlockFiles(tbfs)
+}
+
+func (tbfw *tmpBlocksFileWrapper) getTmpBlockFiles() []*tmpBlocksFile {
+	shards := tbfw.shards
+
+	tbfs := make([]*tmpBlocksFile, len(shards))
+	for i := range shards {
+		tbfs[i] = shards[i].tbf
+	}
+	return tbfs
 }
 
 var metricNamePool = &sync.Pool{
@@ -1632,7 +1666,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 	}
 	isPartial, err := processBlocks(qt, sns, denyPartialResponse, sq, processBlock, deadline)
 	if err != nil {
-		closeTmpBlockFiles(tbfw.tbfs)
+		tbfw.closeTmpBlockFiles()
 		return nil, false, fmt.Errorf("error occured during search: %w", err)
 	}
 	orderedMetricNames, addrssPool, m, bytesTotal, err := tbfw.Finalize()
@@ -1644,7 +1678,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
 	var rss Results
 	rss.tr = tr
 	rss.deadline = deadline
-	rss.tbfs = tbfw.tbfs
+	rss.tbfs = tbfw.getTmpBlockFiles()
 	pts := make([]packedTimeseries, len(orderedMetricNames))
 	for i, metricName := range orderedMetricNames {
 		pts[i] = packedTimeseries{
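
Review note: the heart of this change is swapping the per-worker parallel slices (`tbfs`, `metricNamesBufs`, `addrssPools`, `addrsPools`, `ms`, `orderedMetricNamess`) for a single `shards` slice, where each worker owns exactly one padded shard. The padding rounds every shard up to a multiple of 128 bytes, so shards owned by different workers never land on the same CPU cache line, which prevents false sharing. Below is a minimal, self-contained sketch of the same trick; the `counterShard` types and the counter workload are hypothetical stand-ins for `tmpBlocksFileWrapperShard`, not part of the patch:

```go
package main

import (
	"fmt"
	"sync"
	"unsafe"
)

// counterShard stands in for tmpBlocksFileWrapperShard: per-worker state
// that gets mutated without any locking.
type counterShard struct {
	n uint64
}

// counterShardWithPadding rounds the shard size up to the next multiple of
// 128 bytes, using the same expression as the patch, so shards owned by
// different workers never share a cache line.
type counterShardWithPadding struct {
	counterShard

	_ [128 - unsafe.Sizeof(counterShard{})%128]byte
}

func main() {
	const workers = 4
	shards := make([]counterShardWithPadding, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// Each worker writes only to shards[workerID], mirroring how
			// RegisterAndWriteBlock indexes tbfw.shards by workerID.
			for i := 0; i < 1_000_000; i++ {
				shards[workerID].n++
			}
		}(w)
	}
	wg.Wait()

	var total uint64
	for i := range shards {
		total += shards[i].n
	}
	fmt.Println("total:", total) // always 4000000; no locks needed
}
```

Without the padding field the code would still be correct, just slower under contention: adjacent 8-byte shards would share a cache line, and every write would invalidate the neighboring workers' cached copies.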
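
Review note on `Finalize`: since each worker now accumulates results into its own shard, `Finalize` must fold all shards into the first one while preserving the first-seen load order of metric names (that order drives sequential reads from `tmpBlocksFile`). A simplified, hypothetical version of that merge loop, with plain `[]int` address lists standing in for the `blockAddrs` pools:

```go
package main

import "fmt"

// mergeShards folds per-shard results into the first shard, the way
// Finalize does above: metric names keep the order in which they were
// first seen, and address lists for the same name are concatenated.
func mergeShards(ordered [][]string, addrs []map[string][]int) ([]string, map[string][]int) {
	dstOrder := ordered[0]
	dst := addrs[0]
	for i := 1; i < len(ordered); i++ {
		for _, name := range ordered[i] {
			if _, ok := dst[name]; !ok {
				// First time this metric name is seen across all shards.
				dstOrder = append(dstOrder, name)
			}
			dst[name] = append(dst[name], addrs[i][name]...)
		}
	}
	return dstOrder, dst
}

func main() {
	ordered := [][]string{{"cpu", "mem"}, {"mem", "disk"}}
	addrs := []map[string][]int{
		{"cpu": {0}, "mem": {1}},
		{"mem": {2}, "disk": {3}},
	}
	order, merged := mergeShards(ordered, addrs)
	fmt.Println(order)  // [cpu mem disk]
	fmt.Println(merged) // map[cpu:[0] disk:[3] mem:[1 2]]
}
```

Note also that the deleted loop iterated `for i, m := range tbfw.ms[1:]` while indexing `tbfw.addrssPools[i]` with the unshifted `i` (but `orderedMetricNamess[i+1]` with the shifted one); routing everything through the same `tbfwLocal` shard removes that class of off-by-one mistake.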