mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/vmselect: optimize /api/v1/series
by skipping storage data
Fetch and process only time series metainfo.
This commit is contained in:
parent
241170dc05
commit
47e4b50112
@ -50,6 +50,7 @@ func (r *Result) reset() {
|
||||
// Results holds results returned from ProcessSearchQuery.
|
||||
type Results struct {
|
||||
tr storage.TimeRange
|
||||
fetchData bool
|
||||
deadline Deadline
|
||||
|
||||
tbf *tmpBlocksFile
|
||||
@ -103,10 +104,10 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
||||
err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.Timeout)
|
||||
break
|
||||
}
|
||||
if err = pts.Unpack(rss.tbf, rs, rss.tr, maxWorkersCount); err != nil {
|
||||
if err = pts.Unpack(rss.tbf, rs, rss.tr, rss.fetchData, maxWorkersCount); err != nil {
|
||||
break
|
||||
}
|
||||
if len(rs.Timestamps) == 0 {
|
||||
if len(rs.Timestamps) == 0 && rss.fetchData {
|
||||
// Skip empty blocks.
|
||||
continue
|
||||
}
|
||||
@ -149,7 +150,7 @@ type packedTimeseries struct {
|
||||
}
|
||||
|
||||
// Unpack unpacks pts to dst.
|
||||
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, maxWorkersCount int) error {
|
||||
func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.TimeRange, fetchData bool, maxWorkersCount int) error {
|
||||
dst.reset()
|
||||
|
||||
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
||||
@ -176,7 +177,7 @@ func (pts *packedTimeseries) Unpack(tbf *tmpBlocksFile, dst *Result, tr storage.
|
||||
var err error
|
||||
for addr := range workCh {
|
||||
sb := getSortBlock()
|
||||
if err = sb.unpackFrom(tbf, addr, tr); err != nil {
|
||||
if err = sb.unpackFrom(tbf, addr, tr, fetchData); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
@ -295,11 +296,13 @@ func (sb *sortBlock) reset() {
|
||||
sb.NextIdx = 0
|
||||
}
|
||||
|
||||
func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange) error {
|
||||
func (sb *sortBlock) unpackFrom(tbf *tmpBlocksFile, addr tmpBlockAddr, tr storage.TimeRange, fetchData bool) error {
|
||||
tbf.MustReadBlockAt(&sb.b, addr)
|
||||
if fetchData {
|
||||
if err := sb.b.UnmarshalData(); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal block: %s", err)
|
||||
}
|
||||
}
|
||||
timestamps := sb.b.Timestamps()
|
||||
|
||||
// Skip timestamps smaller than tr.MinTimestamp.
|
||||
@ -460,7 +463,7 @@ var ssPool sync.Pool
|
||||
var missingMetricNamesForMetricID = metrics.NewCounter(`vm_missing_metric_names_for_metric_id_total`)
|
||||
|
||||
// ProcessSearchQuery performs sq on storage nodes until the given deadline.
|
||||
func ProcessSearchQuery(sq *storage.SearchQuery, deadline Deadline) (*Results, error) {
|
||||
func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) {
|
||||
// Setup search.
|
||||
tfss, err := setupTfss(sq.TagFilterss)
|
||||
if err != nil {
|
||||
@ -476,7 +479,7 @@ func ProcessSearchQuery(sq *storage.SearchQuery, deadline Deadline) (*Results, e
|
||||
|
||||
sr := getStorageSearch()
|
||||
defer putStorageSearch(sr)
|
||||
sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch)
|
||||
sr.Init(vmstorage.Storage, tfss, tr, fetchData, *maxMetricsPerSearch)
|
||||
|
||||
tbf := getTmpBlocksFile()
|
||||
m := make(map[string][]tmpBlockAddr)
|
||||
@ -507,6 +510,7 @@ func ProcessSearchQuery(sq *storage.SearchQuery, deadline Deadline) (*Results, e
|
||||
var rss Results
|
||||
rss.packedTimeseries = make([]packedTimeseries, len(m))
|
||||
rss.tr = tr
|
||||
rss.fetchData = fetchData
|
||||
rss.deadline = deadline
|
||||
rss.tbf = tbf
|
||||
i := 0
|
||||
|
@ -65,7 +65,7 @@ func FederateHandler(w http.ResponseWriter, r *http.Request) error {
|
||||
MaxTimestamp: end,
|
||||
TagFilterss: tagFilterss,
|
||||
}
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, deadline)
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
||||
}
|
||||
@ -157,7 +157,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
|
||||
MaxTimestamp: end,
|
||||
TagFilterss: tagFilterss,
|
||||
}
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, deadline)
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
||||
}
|
||||
@ -336,7 +336,7 @@ func SeriesHandler(w http.ResponseWriter, r *http.Request) error {
|
||||
MaxTimestamp: end,
|
||||
TagFilterss: tagFilterss,
|
||||
}
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, deadline)
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, false, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
||||
}
|
||||
|
@ -570,7 +570,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
||||
MaxTimestamp: ec.End + ec.Step,
|
||||
TagFilterss: [][]storage.TagFilter{me.TagFilters},
|
||||
}
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, ec.Deadline)
|
||||
rss, err := netstorage.ProcessSearchQuery(sq, true, ec.Deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -28,6 +28,9 @@ type partSearch struct {
|
||||
// tr is a time range to search.
|
||||
tr TimeRange
|
||||
|
||||
// Skip populating timestampsData and valuesData in Block if fetchData=false.
|
||||
fetchData bool
|
||||
|
||||
metaindex []metaindexRow
|
||||
|
||||
ibCache *indexBlockCache
|
||||
@ -61,7 +64,7 @@ func (ps *partSearch) reset() {
|
||||
}
|
||||
|
||||
// Init initializes the ps with the given p, tsids and tr.
|
||||
func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
|
||||
func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
|
||||
ps.reset()
|
||||
ps.p = p
|
||||
|
||||
@ -72,6 +75,7 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
|
||||
ps.tsids = append(ps.tsids[:0], tsids...)
|
||||
}
|
||||
ps.tr = tr
|
||||
ps.fetchData = fetchData
|
||||
ps.metaindex = p.metaindex
|
||||
ps.ibCache = &p.ibCache
|
||||
|
||||
@ -281,11 +285,14 @@ func (ps *partSearch) searchBHS() bool {
|
||||
|
||||
func (ps *partSearch) readBlock(bh *blockHeader) {
|
||||
ps.Block.Reset()
|
||||
ps.Block.bh = *bh
|
||||
if !ps.fetchData {
|
||||
return
|
||||
}
|
||||
|
||||
ps.Block.timestampsData = bytesutil.Resize(ps.Block.timestampsData[:0], int(bh.TimestampsBlockSize))
|
||||
ps.p.timestampsFile.ReadAt(ps.Block.timestampsData, int64(bh.TimestampsBlockOffset))
|
||||
|
||||
ps.Block.valuesData = bytesutil.Resize(ps.Block.valuesData[:0], int(bh.ValuesBlockSize))
|
||||
ps.p.valuesFile.ReadAt(ps.Block.valuesData, int64(bh.ValuesBlockOffset))
|
||||
|
||||
ps.Block.bh = *bh
|
||||
}
|
||||
|
@ -1247,7 +1247,7 @@ func testPartSearch(t *testing.T, p *part, tsids []TSID, tr TimeRange, expectedR
|
||||
|
||||
func testPartSearchSerial(p *part, tsids []TSID, tr TimeRange, expectedRawBlocks []rawBlock) error {
|
||||
var ps partSearch
|
||||
ps.Init(p, tsids, tr)
|
||||
ps.Init(p, tsids, tr, true)
|
||||
var bs []Block
|
||||
for ps.NextBlock() {
|
||||
var b Block
|
||||
|
@ -56,7 +56,7 @@ func (pts *partitionSearch) reset() {
|
||||
// Init initializes the search in the given partition for the given tsid and tr.
|
||||
//
|
||||
// MustClose must be called when partition search is done.
|
||||
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
|
||||
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
|
||||
if pts.needClosing {
|
||||
logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
|
||||
}
|
||||
@ -85,7 +85,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
|
||||
}
|
||||
pts.psPool = pts.psPool[:len(pts.pws)]
|
||||
for i, pw := range pts.pws {
|
||||
pts.psPool[i].Init(pw.p, tsids, tr)
|
||||
pts.psPool[i].Init(pw.p, tsids, tr, fetchData)
|
||||
}
|
||||
|
||||
// Initialize the psHeap.
|
||||
|
@ -238,7 +238,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
|
||||
|
||||
bs := []Block{}
|
||||
var pts partitionSearch
|
||||
pts.Init(pt, tsids, tr)
|
||||
pts.Init(pt, tsids, tr, true)
|
||||
for pts.NextBlock() {
|
||||
var b Block
|
||||
b.CopyFrom(pts.Block)
|
||||
@ -263,7 +263,7 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
|
||||
}
|
||||
|
||||
// verify that empty tsids returns empty result
|
||||
pts.Init(pt, []TSID{}, tr)
|
||||
pts.Init(pt, []TSID{}, tr, true)
|
||||
if pts.NextBlock() {
|
||||
return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
|
||||
}
|
||||
@ -271,6 +271,16 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
|
||||
return fmt.Errorf("unexpected error on empty tsids list: %s", err)
|
||||
}
|
||||
pts.MustClose()
|
||||
|
||||
pts.Init(pt, []TSID{}, tr, false)
|
||||
if pts.NextBlock() {
|
||||
return fmt.Errorf("unexpected block got for an empty tsids list: %+v", pts.Block)
|
||||
}
|
||||
if err := pts.Error(); err != nil {
|
||||
return fmt.Errorf("unexpected error on empty tsids list: %s", err)
|
||||
}
|
||||
pts.MustClose()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -110,7 +110,7 @@ func (s *Search) reset() {
|
||||
// Init initializes s from the given storage, tfss and tr.
|
||||
//
|
||||
// MustClose must be called when the search is done.
|
||||
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) {
|
||||
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, fetchData bool, maxMetrics int) {
|
||||
if s.needClosing {
|
||||
logger.Panicf("BUG: missing MustClose call before the next call to Init")
|
||||
}
|
||||
@ -123,7 +123,7 @@ func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMet
|
||||
// It is ok to call Init on error from storage.searchTSIDs.
|
||||
// Init must be called before returning because it will fail
|
||||
// on Seach.MustClose otherwise.
|
||||
s.ts.Init(storage.tb, tsids, tr)
|
||||
s.ts.Init(storage.tb, tsids, tr, fetchData)
|
||||
|
||||
if err != nil {
|
||||
s.err = err
|
||||
|
@ -193,7 +193,7 @@ func testSearch(st *Storage, tr TimeRange, mrs []MetricRow, accountsCount int) e
|
||||
}
|
||||
|
||||
// Search
|
||||
s.Init(st, []*TagFilters{tfs}, tr, 1e5)
|
||||
s.Init(st, []*TagFilters{tfs}, tr, true, 1e5)
|
||||
var mbs []MetricBlock
|
||||
for s.NextMetricBlock() {
|
||||
var b Block
|
||||
|
@ -502,12 +502,24 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
||||
MaxTimestamp: 2e10,
|
||||
}
|
||||
metricBlocksCount := func(tfs *TagFilters) int {
|
||||
// Verify the number of blocks with fetchData=true
|
||||
n := 0
|
||||
sr.Init(s, []*TagFilters{tfs}, tr, 1e5)
|
||||
sr.Init(s, []*TagFilters{tfs}, tr, true, 1e5)
|
||||
for sr.NextMetricBlock() {
|
||||
n++
|
||||
}
|
||||
sr.MustClose()
|
||||
|
||||
// Make sure the number of blocks with fetchData=false is the same.
|
||||
m := 0
|
||||
sr.Init(s, []*TagFilters{tfs}, tr, false, 1e5)
|
||||
for sr.NextMetricBlock() {
|
||||
m++
|
||||
}
|
||||
sr.MustClose()
|
||||
if n != m {
|
||||
return -1
|
||||
}
|
||||
return n
|
||||
}
|
||||
for i := 0; i < metricsCount; i++ {
|
||||
|
@ -55,7 +55,7 @@ func (ts *tableSearch) reset() {
|
||||
// Init initializes the ts.
|
||||
//
|
||||
// MustClose must be called then the tableSearch is done.
|
||||
func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
|
||||
func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
|
||||
if ts.needClosing {
|
||||
logger.Panicf("BUG: missing MustClose call before the next call to Init")
|
||||
}
|
||||
@ -86,7 +86,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
|
||||
}
|
||||
ts.ptsPool = ts.ptsPool[:len(ts.ptws)]
|
||||
for i, ptw := range ts.ptws {
|
||||
ts.ptsPool[i].Init(ptw.pt, tsids, tr)
|
||||
ts.ptsPool[i].Init(ptw.pt, tsids, tr, fetchData)
|
||||
}
|
||||
|
||||
// Initialize the ptsHeap.
|
||||
|
@ -251,7 +251,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
|
||||
|
||||
bs := []Block{}
|
||||
var ts tableSearch
|
||||
ts.Init(tb, tsids, tr)
|
||||
ts.Init(tb, tsids, tr, true)
|
||||
for ts.NextBlock() {
|
||||
var b Block
|
||||
b.CopyFrom(ts.Block)
|
||||
@ -276,7 +276,7 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
|
||||
}
|
||||
|
||||
// verify that empty tsids returns empty result
|
||||
ts.Init(tb, []TSID{}, tr)
|
||||
ts.Init(tb, []TSID{}, tr, true)
|
||||
if ts.NextBlock() {
|
||||
return fmt.Errorf("unexpected block got for an empty tsids list: %+v", ts.Block)
|
||||
}
|
||||
@ -284,5 +284,15 @@ func testTableSearchSerial(tb *table, tsids []TSID, tr TimeRange, rbsExpected []
|
||||
return fmt.Errorf("unexpected error on empty tsids list: %s", err)
|
||||
}
|
||||
ts.MustClose()
|
||||
|
||||
ts.Init(tb, []TSID{}, tr, false)
|
||||
if ts.NextBlock() {
|
||||
return fmt.Errorf("unexpected block got for an empty tsids list with fetchData=false: %+v", ts.Block)
|
||||
}
|
||||
if err := ts.Error(); err != nil {
|
||||
return fmt.Errorf("unexpected error on empty tsids list with fetchData=false: %s", err)
|
||||
}
|
||||
ts.MustClose()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -26,7 +26,11 @@ func BenchmarkTableSearch(b *testing.B) {
|
||||
b.Run(fmt.Sprintf("tsidsCount_%d", tsidsCount), func(b *testing.B) {
|
||||
for _, tsidsSearch := range []int{1, 1e1, 1e2, 1e3, 1e4} {
|
||||
b.Run(fmt.Sprintf("tsidsSearch_%d", tsidsSearch), func(b *testing.B) {
|
||||
benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch)
|
||||
for _, fetchData := range []bool{true, false} {
|
||||
b.Run(fmt.Sprintf("fetchData_%v", fetchData), func(b *testing.B) {
|
||||
benchmarkTableSearch(b, rowsCount, tsidsCount, tsidsSearch, fetchData)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
@ -103,9 +107,9 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
|
||||
tb.MustClose()
|
||||
}
|
||||
|
||||
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
|
||||
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, fetchData bool) {
|
||||
startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
|
||||
rowsPerInsert := int(maxRawRowsPerPartition)
|
||||
rowsPerInsert := getMaxRawRowsPerPartition()
|
||||
|
||||
tb := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
|
||||
tr := TimeRange{
|
||||
@ -127,7 +131,7 @@ func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int)
|
||||
for i := range tsids {
|
||||
tsids[i].MetricID = 1 + uint64(i)
|
||||
}
|
||||
ts.Init(tb, tsids, tr)
|
||||
ts.Init(tb, tsids, tr, fetchData)
|
||||
for ts.NextBlock() {
|
||||
}
|
||||
ts.MustClose()
|
||||
|
Loading…
Reference in New Issue
Block a user