mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
app/vmselect/netstorage: do not spend CPU time on unpacking empty blocks during /api/v1/series
calls
This commit is contained in:
parent
533bf76a12
commit
973df09686
@ -180,7 +180,6 @@ type unpackWorkItem struct {
|
|||||||
type unpackWork struct {
|
type unpackWork struct {
|
||||||
ws []unpackWorkItem
|
ws []unpackWorkItem
|
||||||
tbf *tmpBlocksFile
|
tbf *tmpBlocksFile
|
||||||
fetchData bool
|
|
||||||
at *auth.Token
|
at *auth.Token
|
||||||
sbs []*sortBlock
|
sbs []*sortBlock
|
||||||
doneCh chan error
|
doneCh chan error
|
||||||
@ -195,7 +194,6 @@ func (upw *unpackWork) reset() {
|
|||||||
}
|
}
|
||||||
upw.ws = upw.ws[:0]
|
upw.ws = upw.ws[:0]
|
||||||
upw.tbf = nil
|
upw.tbf = nil
|
||||||
upw.fetchData = false
|
|
||||||
upw.at = nil
|
upw.at = nil
|
||||||
sbs := upw.sbs
|
sbs := upw.sbs
|
||||||
for i := range sbs {
|
for i := range sbs {
|
||||||
@ -210,7 +208,7 @@ func (upw *unpackWork) reset() {
|
|||||||
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
|
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
|
||||||
for _, w := range upw.ws {
|
for _, w := range upw.ws {
|
||||||
sb := getSortBlock()
|
sb := getSortBlock()
|
||||||
if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr, upw.fetchData, upw.at); err != nil {
|
if err := sb.unpackFrom(tmpBlock, upw.tbf, w.addr, w.tr, upw.at); err != nil {
|
||||||
putSortBlock(sb)
|
putSortBlock(sb)
|
||||||
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
|
||||||
return
|
return
|
||||||
@ -258,17 +256,19 @@ var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
|
|||||||
// Unpack unpacks pts to dst.
|
// Unpack unpacks pts to dst.
|
||||||
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
|
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
|
||||||
dst.reset()
|
dst.reset()
|
||||||
|
|
||||||
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
|
||||||
}
|
}
|
||||||
|
if !fetchData {
|
||||||
|
// Do not spend resources on data reading and unpacking.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Feed workers with work
|
// Feed workers with work
|
||||||
addrsLen := len(pts.addrs)
|
addrsLen := len(pts.addrs)
|
||||||
upws := make([]*unpackWork, 0, 1+addrsLen/unpackBatchSize)
|
upws := make([]*unpackWork, 0, 1+addrsLen/unpackBatchSize)
|
||||||
upw := getUnpackWork()
|
upw := getUnpackWork()
|
||||||
upw.tbf = tbf
|
upw.tbf = tbf
|
||||||
upw.fetchData = fetchData
|
|
||||||
upw.at = at
|
upw.at = at
|
||||||
for _, addr := range pts.addrs {
|
for _, addr := range pts.addrs {
|
||||||
if len(upw.ws) >= unpackBatchSize {
|
if len(upw.ws) >= unpackBatchSize {
|
||||||
@ -276,7 +276,6 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
|
|||||||
upws = append(upws, upw)
|
upws = append(upws, upw)
|
||||||
upw = getUnpackWork()
|
upw = getUnpackWork()
|
||||||
upw.tbf = tbf
|
upw.tbf = tbf
|
||||||
upw.fetchData = fetchData
|
|
||||||
upw.at = at
|
upw.at = at
|
||||||
}
|
}
|
||||||
upw.ws = append(upw.ws, unpackWorkItem{
|
upw.ws = append(upw.ws, unpackWorkItem{
|
||||||
@ -394,14 +393,12 @@ func (sb *sortBlock) reset() {
|
|||||||
sb.NextIdx = 0
|
sb.NextIdx = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool, at *auth.Token) error {
|
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, at *auth.Token) error {
|
||||||
tmpBlock.Reset()
|
tmpBlock.Reset()
|
||||||
tbf.MustReadBlockAt(tmpBlock, addr)
|
tbf.MustReadBlockAt(tmpBlock, addr)
|
||||||
if fetchData {
|
|
||||||
if err := tmpBlock.UnmarshalData(); err != nil {
|
if err := tmpBlock.UnmarshalData(); err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal block: %w", err)
|
return fmt.Errorf("cannot unmarshal block: %w", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
timestamps := tmpBlock.Timestamps()
|
timestamps := tmpBlock.Timestamps()
|
||||||
|
|
||||||
// Skip timestamps smaller than tr.MinTimestamp.
|
// Skip timestamps smaller than tr.MinTimestamp.
|
||||||
@ -982,7 +979,19 @@ type tmpBlocksFileWrapper struct {
|
|||||||
orderedMetricNames []string
|
orderedMetricNames []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
|
func (tbfw *tmpBlocksFileWrapper) RegisterEmptyBlock(mb *storage.MetricBlock) {
|
||||||
|
metricName := mb.MetricName
|
||||||
|
tbfw.mu.Lock()
|
||||||
|
if addrs := tbfw.m[string(metricName)]; addrs == nil {
|
||||||
|
// An optimization for big number of time series with long names: store only a single copy of metricNameStr
|
||||||
|
// in both tbfw.orderedMetricNames and tbfw.m.
|
||||||
|
tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
|
||||||
|
tbfw.m[tbfw.orderedMetricNames[len(tbfw.orderedMetricNames)-1]] = []tmpBlockAddr{{}}
|
||||||
|
}
|
||||||
|
tbfw.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock) error {
|
||||||
bb := tmpBufPool.Get()
|
bb := tmpBufPool.Get()
|
||||||
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
|
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
|
||||||
tbfw.mu.Lock()
|
tbfw.mu.Lock()
|
||||||
@ -1754,7 +1763,11 @@ func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *
|
|||||||
blocksRead++
|
blocksRead++
|
||||||
sn.metricBlocksRead.Inc()
|
sn.metricBlocksRead.Inc()
|
||||||
sn.metricRowsRead.Add(mb.Block.RowsCount())
|
sn.metricRowsRead.Add(mb.Block.RowsCount())
|
||||||
if err := tbfw.WriteBlock(&mb); err != nil {
|
if !fetchData {
|
||||||
|
tbfw.RegisterEmptyBlock(&mb)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := tbfw.RegisterAndWriteBlock(&mb); err != nil {
|
||||||
return blocksRead, fmt.Errorf("cannot write MetricBlock #%d to temporary blocks file: %w", blocksRead, err)
|
return blocksRead, fmt.Errorf("cannot write MetricBlock #%d to temporary blocks file: %w", blocksRead, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user