app/vmselect/netstorage: substitute sorting packedTimeseries with the natural order of the fetched blocks

This should minimize the number of disk seeks when reading data from the temporary file.
Aliaksandr Valialkin 2020-04-26 16:45:51 +03:00
parent 31861c5b8e
commit 23a310cc68


@@ -814,6 +814,7 @@ type tmpBlocksFileWrapper struct {
 	mu  sync.Mutex
 	tbf *tmpBlocksFile
 	m   map[string][]tmpBlockAddr
+	orderedMetricNames []string
 }
 
 func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
@@ -824,7 +825,11 @@ func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
 	tmpBufPool.Put(bb)
 	if err == nil {
 		metricName := mb.MetricName
-		tbfw.m[string(metricName)] = append(tbfw.m[string(metricName)], addr)
+		addrs := tbfw.m[string(metricName)]
+		if len(addrs) == 0 {
+			tbfw.orderedMetricNames = append(tbfw.orderedMetricNames, string(metricName))
+		}
+		tbfw.m[string(metricName)] = append(addrs, addr)
 	}
 	tbfw.mu.Unlock()
 	return err
@@ -889,28 +894,19 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool,
 	}
 
 	var rss Results
-	rss.packedTimeseries = make([]packedTimeseries, len(tbfw.m))
 	rss.at = at
 	rss.tr = tr
 	rss.fetchData = fetchData
 	rss.deadline = deadline
 	rss.tbf = tbfw.tbf
-	i := 0
-	for metricName, addrs := range tbfw.m {
-		pts := &rss.packedTimeseries[i]
-		i++
-		pts.metricName = metricName
-		pts.addrs = addrs
-	}
-
-	// Sort rss.packedTimeseries by the first addr offset in order
-	// to reduce the number of disk seeks during unpacking in RunParallel.
-	// In this case tmpBlocksFile must be read almost sequentially.
-	sort.Slice(rss.packedTimeseries, func(i, j int) bool {
-		pts := rss.packedTimeseries
-		return pts[i].addrs[0].offset < pts[j].addrs[0].offset
-	})
-
+	pts := make([]packedTimeseries, len(tbfw.orderedMetricNames))
+	for i, metricName := range tbfw.orderedMetricNames {
+		pts[i] = packedTimeseries{
+			metricName: metricName,
+			addrs:      tbfw.m[metricName],
+		}
+	}
+	rss.packedTimeseries = pts
 	return &rss, isPartialResult, nil
 }
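
For illustration, a minimal standalone sketch of the pattern the diff introduces: a plain map keyed by metric name plus a parallel slice that records each name the first time it is written, so later iteration follows write order instead of Go's randomized map order. The blockStore, addr and writeBlock names and the fixed block size are invented for this example and are not part of the VictoriaMetrics code.

// Sketch of the insertion-order pattern; blockStore/addr/writeBlock are
// made-up stand-ins for tmpBlocksFileWrapper/tmpBlockAddr/WriteBlock.
package main

import "fmt"

type addr struct {
	offset int
	size   int
}

type blockStore struct {
	nextOffset         int
	m                  map[string][]addr
	orderedMetricNames []string // metric names in first-write order
}

func newBlockStore() *blockStore {
	return &blockStore{m: make(map[string][]addr)}
}

// writeBlock appends a block for metricName to the (virtual) temporary file
// and records the name on its first appearance, preserving write order.
func (bs *blockStore) writeBlock(metricName string, size int) {
	a := addr{offset: bs.nextOffset, size: size}
	bs.nextOffset += size
	addrs := bs.m[metricName]
	if len(addrs) == 0 {
		bs.orderedMetricNames = append(bs.orderedMetricNames, metricName)
	}
	bs.m[metricName] = append(addrs, a)
}

func main() {
	bs := newBlockStore()
	// Blocks arrive in whatever order the storage nodes return them and are
	// appended to the file sequentially.
	for _, name := range []string{"cpu", "cpu", "mem", "disk", "mem"} {
		bs.writeBlock(name, 100)
	}

	// Ranging over orderedMetricNames (rather than over the map) visits the
	// blocks roughly in ascending file-offset order, so reads stay mostly
	// sequential without an explicit sort.
	for _, name := range bs.orderedMetricNames {
		fmt.Println(name, bs.m[name])
	}
}

Because blocks are appended to the temporary file in arrival order, the first-seen order of metric names already tracks ascending offsets closely, which is presumably why the explicit sort.Slice by addrs[0].offset could be dropped.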