VictoriaMetrics/lib/logstorage/storage_search.go
Aliaksandr Valialkin a4ea3b87d7
lib/logstorage: optimize query imeediately after its parsing
This eliminates possible bugs related to forgotten Query.Optimize() calls.

This also allows removing optimize() function from pipe interface.

While at it, drop filterNoop inside filterAnd.

(cherry picked from commit 66b2987f49)
2024-11-08 17:07:56 +01:00

1300 lines
32 KiB
Go

package logstorage
import (
"context"
"fmt"
"math"
"slices"
"sort"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// genericSearchOptions contain options used for search.
type genericSearchOptions struct {
// tenantIDs must contain the list of tenantIDs for the search.
tenantIDs []TenantID
// streamIDs is an optional sorted list of streamIDs for the search.
// If it is empty, then the search is performed by tenantIDs
streamIDs []streamID
// minTimestamp is the minimum timestamp for the search
minTimestamp int64
// maxTimestamp is the maximum timestamp for the search
maxTimestamp int64
// filter is the filter to use for the search
filter filter
// neededColumnNames contains names of columns to return in the result
neededColumnNames []string
// unneededColumnNames contains names of columns, which mustn't be returned in the result.
//
// This list is consulted if needAllColumns=true
unneededColumnNames []string
// needAllColumns is set to true when all the columns except of unneededColumnNames must be returned in the result
needAllColumns bool
}
type searchOptions struct {
// Optional sorted list of tenantIDs for the search.
// If it is empty, then the search is performed by streamIDs
tenantIDs []TenantID
// Optional sorted list of streamIDs for the search.
// If it is empty, then the search is performed by tenantIDs
streamIDs []streamID
// minTimestamp is the minimum timestamp for the search
minTimestamp int64
// maxTimestamp is the maximum timestamp for the search
maxTimestamp int64
// filter is the filter to use for the search
filter filter
// neededColumnNames contains names of columns to return in the result
neededColumnNames []string
// unneededColumnNames contains names of columns, which mustn't be returned in the result.
//
// This list is consulted when needAllColumns=true.
unneededColumnNames []string
// needAllColumns is set to true when all the columns except of unneededColumnNames must be returned in the result
needAllColumns bool
}
// WriteBlockFunc must write a block with the given timestamps and columns.
//
// WriteBlockFunc cannot hold references to timestamps and columns after returning.
type WriteBlockFunc func(workerID uint, timestamps []int64, columns []BlockColumn)
// RunQuery runs the given q and calls writeBlock for results.
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error {
writeBlockResult := func(workerID uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
brs := getBlockRows()
csDst := brs.cs
cs := br.getColumns()
for _, c := range cs {
values := c.getValues(br)
csDst = append(csDst, BlockColumn{
Name: c.name,
Values: values,
})
}
timestamps := br.getTimestamps()
writeBlock(workerID, timestamps, csDst)
brs.cs = csDst
putBlockRows(brs)
}
return s.runQuery(ctx, tenantIDs, q, writeBlockResult)
}
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error {
qNew, err := s.initFilterInValues(ctx, tenantIDs, q)
if err != nil {
return err
}
qNew, err = s.initJoinMaps(ctx, tenantIDs, qNew)
if err != nil {
return err
}
q = qNew
streamIDs := q.getStreamIDs()
sort.Slice(streamIDs, func(i, j int) bool {
return streamIDs[i].less(&streamIDs[j])
})
minTimestamp, maxTimestamp := q.GetFilterTimeRange()
neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
streamIDs: streamIDs,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
filter: q.f,
neededColumnNames: neededColumnNames,
unneededColumnNames: unneededColumnNames,
needAllColumns: slices.Contains(neededColumnNames, "*"),
}
workersCount := cgroup.AvailableCPUs()
ppMain := newDefaultPipeProcessor(writeBlockResultFunc)
pp := ppMain
stopCh := ctx.Done()
cancels := make([]func(), len(q.pipes))
pps := make([]pipeProcessor, len(q.pipes))
var errPipe error
for i := len(q.pipes) - 1; i >= 0; i-- {
p := q.pipes[i]
ctxChild, cancel := context.WithCancel(ctx)
pp = p.newPipeProcessor(workersCount, stopCh, cancel, pp)
pcp, ok := pp.(*pipeStreamContextProcessor)
if ok {
pcp.init(s, neededColumnNames, unneededColumnNames)
if i > 0 {
errPipe = fmt.Errorf("[%s] pipe must go after [%s] filter; now it goes after the [%s] pipe", p, q.f, q.pipes[i-1])
}
}
stopCh = ctxChild.Done()
ctx = ctxChild
cancels[i] = cancel
pps[i] = pp
}
if errPipe == nil {
s.search(workersCount, so, stopCh, pp.writeBlock)
}
var errFlush error
for i, pp := range pps {
if err := pp.flush(); err != nil && errFlush == nil {
errFlush = err
}
cancel := cancels[i]
cancel()
}
if err := ppMain.flush(); err != nil && errFlush == nil {
errFlush = err
}
if errPipe != nil {
return errPipe
}
return errFlush
}
// GetFieldNames returns field names from q results for the given tenantIDs.
func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
pipes := append([]pipe{}, q.pipes...)
pipeStr := "field_names"
lex := newLexer(pipeStr)
pf, err := parsePipeFieldNames(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'field_names' pipe at [%s]: %s", pipeStr, err)
}
pf.isFirstPipe = len(pipes) == 0
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pf)
q = &Query{
f: q.f,
pipes: pipes,
}
return s.runValuesWithHitsQuery(ctx, tenantIDs, q)
}
func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query, byFields []string, prefix string) (map[string][][]Field, error) {
// TODO: track memory usage
m := make(map[string][][]Field)
var mLock sync.Mutex
writeBlockResult := func(_ uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
cs := br.getColumns()
columnNames := make([]string, len(cs))
byValuesIdxs := make([]int, len(cs))
for i := range cs {
name := strings.Clone(cs[i].name)
idx := slices.Index(byFields, name)
if prefix != "" && idx < 0 {
name = prefix + name
}
columnNames[i] = name
byValuesIdxs[i] = idx
}
byValues := make([]string, len(byFields))
var tmpBuf []byte
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
fields := make([]Field, 0, len(cs))
clear(byValues)
for j := range cs {
name := columnNames[j]
v := cs[j].getValueAtRow(br, rowIdx)
if cIdx := byValuesIdxs[j]; cIdx >= 0 {
byValues[cIdx] = v
continue
}
if v == "" {
continue
}
value := strings.Clone(v)
fields = append(fields, Field{
Name: name,
Value: value,
})
}
tmpBuf = marshalStrings(tmpBuf[:0], byValues)
k := string(tmpBuf)
mLock.Lock()
m[k] = append(m[k], fields)
mLock.Unlock()
}
}
if err := s.runQuery(ctx, tenantIDs, q, writeBlockResult); err != nil {
return nil, err
}
return m, nil
}
func marshalStrings(dst []byte, a []string) []byte {
for _, v := range a {
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(v))
}
return dst
}
func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string) ([]string, error) {
// TODO: track memory usage
pipes := append([]pipe{}, q.pipes...)
quotedFieldName := quoteTokenIfNeeded(fieldName)
pipeStr := fmt.Sprintf("uniq by (%s)", quotedFieldName)
lex := newLexer(pipeStr)
pu, err := parsePipeUniq(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pu)
q = &Query{
f: q.f,
pipes: pipes,
}
var values []string
var valuesLock sync.Mutex
writeBlockResult := func(_ uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
cs := br.getColumns()
if len(cs) != 1 {
logger.Panicf("BUG: expecting one column; got %d columns", len(cs))
}
columnValues := cs[0].getValues(br)
columnValuesCopy := make([]string, len(columnValues))
for i := range columnValues {
columnValuesCopy[i] = strings.Clone(columnValues[i])
}
valuesLock.Lock()
values = append(values, columnValuesCopy...)
valuesLock.Unlock()
}
if err := s.runQuery(ctx, tenantIDs, q, writeBlockResult); err != nil {
return nil, err
}
return values, nil
}
// GetFieldValues returns unique values with the number of hits for the given fieldName returned by q for the given tenantIDs.
//
// If limit > 0, then up to limit unique values are returned.
func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) {
pipes := append([]pipe{}, q.pipes...)
quotedFieldName := quoteTokenIfNeeded(fieldName)
pipeStr := fmt.Sprintf("field_values %s limit %d", quotedFieldName, limit)
lex := newLexer(pipeStr)
pu, err := parsePipeFieldValues(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'field_values' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pu)
q = &Query{
f: q.f,
pipes: pipes,
}
return s.runValuesWithHitsQuery(ctx, tenantIDs, q)
}
// ValueWithHits contains value and hits.
type ValueWithHits struct {
Value string
Hits uint64
}
func toValuesWithHits(m map[string]*uint64) []ValueWithHits {
results := make([]ValueWithHits, 0, len(m))
for k, pHits := range m {
results = append(results, ValueWithHits{
Value: k,
Hits: *pHits,
})
}
sortValuesWithHits(results)
return results
}
func sortValuesWithHits(results []ValueWithHits) {
slices.SortFunc(results, func(a, b ValueWithHits) int {
if a.Hits == b.Hits {
if a.Value == b.Value {
return 0
}
if lessString(a.Value, b.Value) {
return -1
}
return 1
}
// Sort in descending order of hits
if a.Hits < b.Hits {
return 1
}
return -1
})
}
// GetStreamFieldNames returns stream field names from q results for the given tenantIDs.
func (s *Storage) GetStreamFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
m := make(map[string]*uint64)
forEachStreamField(streams, func(f Field, hits uint64) {
pHits := m[f.Name]
if pHits == nil {
nameCopy := strings.Clone(f.Name)
hitsLocal := uint64(0)
pHits = &hitsLocal
m[nameCopy] = pHits
}
*pHits += hits
})
names := toValuesWithHits(m)
return names, nil
}
// GetStreamFieldValues returns stream field values for the given fieldName from q results for the given tenantIDs.
//
// If limit > 9, then up to limit unique values are returned.
func (s *Storage) GetStreamFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
m := make(map[string]*uint64)
forEachStreamField(streams, func(f Field, hits uint64) {
if f.Name != fieldName {
return
}
pHits := m[f.Value]
if pHits == nil {
valueCopy := strings.Clone(f.Value)
hitsLocal := uint64(0)
pHits = &hitsLocal
m[valueCopy] = pHits
}
*pHits += hits
})
values := toValuesWithHits(m)
if limit > 0 && uint64(len(values)) > limit {
values = values[:limit]
}
return values, nil
}
// GetStreams returns streams from q results for the given tenantIDs.
//
// If limit > 0, then up to limit unique streams are returned.
func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]ValueWithHits, error) {
return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit)
}
// GetStreamIDs returns stream_id field values from q results for the given tenantIDs.
//
// If limit > 0, then up to limit unique streams are returned.
func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]ValueWithHits, error) {
return s.GetFieldValues(ctx, tenantIDs, q, "_stream_id", limit)
}
func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
var results []ValueWithHits
var resultsLock sync.Mutex
writeBlockResult := func(_ uint, br *blockResult) {
if br.rowsLen == 0 {
return
}
cs := br.getColumns()
if len(cs) != 2 {
logger.Panicf("BUG: expecting two columns; got %d columns", len(cs))
}
columnValues := cs[0].getValues(br)
columnHits := cs[1].getValues(br)
valuesWithHits := make([]ValueWithHits, len(columnValues))
for i := range columnValues {
x := &valuesWithHits[i]
hits, _ := tryParseUint64(columnHits[i])
x.Value = strings.Clone(columnValues[i])
x.Hits = hits
}
resultsLock.Lock()
results = append(results, valuesWithHits...)
resultsLock.Unlock()
}
err := s.runQuery(ctx, tenantIDs, q, writeBlockResult)
if err != nil {
return nil, err
}
sortValuesWithHits(results)
return results, nil
}
func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) {
if !hasFilterInWithQueryForFilter(q.f) && !hasFilterInWithQueryForPipes(q.pipes) {
return q, nil
}
getFieldValues := func(q *Query, fieldName string) ([]string, error) {
return s.getFieldValuesNoHits(ctx, tenantIDs, q, fieldName)
}
cache := make(map[string][]string)
fNew, err := initFilterInValuesForFilter(cache, q.f, getFieldValues)
if err != nil {
return nil, err
}
pipesNew, err := initFilterInValuesForPipes(cache, q.pipes, getFieldValues)
if err != nil {
return nil, err
}
qNew := &Query{
f: fNew,
pipes: pipesNew,
}
return qNew, nil
}
type getJoinMapFunc func(q *Query, byFields []string, prefix string) (map[string][][]Field, error)
func (s *Storage) initJoinMaps(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) {
if !hasJoinPipes(q.pipes) {
return q, nil
}
getJoinMap := func(q *Query, byFields []string, prefix string) (map[string][][]Field, error) {
return s.getJoinMap(ctx, tenantIDs, q, byFields, prefix)
}
pipesNew := make([]pipe, len(q.pipes))
for i := range q.pipes {
p := q.pipes[i]
if pj, ok := p.(*pipeJoin); ok {
pNew, err := pj.initJoinMap(getJoinMap)
if err != nil {
return nil, err
}
p = pNew
}
pipesNew[i] = p
}
qNew := &Query{
f: q.f,
pipes: pipesNew,
}
return qNew, nil
}
func hasJoinPipes(pipes []pipe) bool {
for _, p := range pipes {
if _, ok := p.(*pipeJoin); ok {
return true
}
}
return false
}
func (iff *ifFilter) hasFilterInWithQuery() bool {
if iff == nil {
return false
}
return hasFilterInWithQueryForFilter(iff.f)
}
func hasFilterInWithQueryForFilter(f filter) bool {
if f == nil {
return false
}
visitFunc := func(f filter) bool {
switch t := f.(type) {
case *filterIn:
return t.needExecuteQuery
case *filterStreamID:
return t.needExecuteQuery
default:
return false
}
}
return visitFilter(f, visitFunc)
}
func hasFilterInWithQueryForPipes(pipes []pipe) bool {
for _, p := range pipes {
if p.hasFilterInWithQuery() {
return true
}
}
return false
}
type getFieldValuesFunc func(q *Query, fieldName string) ([]string, error)
func (iff *ifFilter) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (*ifFilter, error) {
if iff == nil {
return nil, nil
}
f, err := initFilterInValuesForFilter(cache, iff.f, getFieldValuesFunc)
if err != nil {
return nil, err
}
iffNew := *iff
iffNew.f = f
return &iffNew, nil
}
func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldValuesFunc getFieldValuesFunc) (filter, error) {
if f == nil {
return nil, nil
}
visitFunc := func(f filter) bool {
switch t := f.(type) {
case *filterIn:
return t.needExecuteQuery
case *filterStreamID:
return t.needExecuteQuery
default:
return false
}
}
copyFunc := func(f filter) (filter, error) {
switch t := f.(type) {
case *filterIn:
values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc)
if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err)
}
fiNew := &filterIn{
fieldName: t.fieldName,
q: t.q,
values: values,
}
return fiNew, nil
case *filterStreamID:
values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc)
if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err)
}
// convert values to streamID list
streamIDs := make([]streamID, 0, len(values))
for _, v := range values {
var sid streamID
if sid.tryUnmarshalFromString(v) {
streamIDs = append(streamIDs, sid)
}
}
fsNew := &filterStreamID{
streamIDs: streamIDs,
q: t.q,
}
return fsNew, nil
default:
return f, nil
}
}
return copyFilter(f, visitFunc, copyFunc)
}
func getValuesForQuery(q *Query, qFieldName string, cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) ([]string, error) {
qStr := q.String()
values, ok := cache[qStr]
if ok {
return values, nil
}
vs, err := getFieldValuesFunc(q, qFieldName)
if err != nil {
return nil, err
}
cache[qStr] = vs
return vs, nil
}
func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) {
pipesNew := make([]pipe, len(pipes))
for i, p := range pipes {
pNew, err := p.initFilterInValues(cache, getFieldValuesFunc)
if err != nil {
return nil, err
}
pipesNew[i] = pNew
}
return pipesNew, nil
}
type blockRows struct {
cs []BlockColumn
}
func (brs *blockRows) reset() {
cs := brs.cs
for i := range cs {
cs[i].reset()
}
brs.cs = cs[:0]
}
func getBlockRows() *blockRows {
v := blockRowsPool.Get()
if v == nil {
return &blockRows{}
}
return v.(*blockRows)
}
func putBlockRows(brs *blockRows) {
brs.reset()
blockRowsPool.Put(brs)
}
var blockRowsPool sync.Pool
// BlockColumn is a single column of a block of data
type BlockColumn struct {
// Name is the column name
Name string
// Values is column values
Values []string
}
func (c *BlockColumn) reset() {
c.Name = ""
c.Values = nil
}
// searchResultFunc must process sr.
//
// The callback is called at the worker with the given workerID.
type searchResultFunc func(workerID uint, br *blockResult)
// search searches for the matching rows according to so.
//
// It calls processBlockResult for each matching block.
func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) {
// Spin up workers
var wgWorkers sync.WaitGroup
workCh := make(chan *blockSearchWorkBatch, workersCount)
wgWorkers.Add(workersCount)
for i := 0; i < workersCount; i++ {
go func(workerID uint) {
bs := getBlockSearch()
bm := getBitmap(0)
for bswb := range workCh {
bsws := bswb.bsws
for i := range bsws {
bsw := &bsws[i]
if needStop(stopCh) {
// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
bsw.reset()
continue
}
bs.search(bsw, bm)
if bs.br.rowsLen > 0 {
processBlockResult(workerID, &bs.br)
}
bsw.reset()
}
bswb.bsws = bswb.bsws[:0]
putBlockSearchWorkBatch(bswb)
}
putBlockSearch(bs)
putBitmap(bm)
wgWorkers.Done()
}(uint(i))
}
// Select partitions according to the selected time range
s.partitionsLock.Lock()
ptws := s.partitions
minDay := so.minTimestamp / nsecsPerDay
n := sort.Search(len(ptws), func(i int) bool {
return ptws[i].day >= minDay
})
ptws = ptws[n:]
maxDay := so.maxTimestamp / nsecsPerDay
n = sort.Search(len(ptws), func(i int) bool {
return ptws[i].day > maxDay
})
ptws = ptws[:n]
// Copy the selected partitions, so they don't interfere with s.partitions.
ptws = append([]*partitionWrapper{}, ptws...)
for _, ptw := range ptws {
ptw.incRef()
}
s.partitionsLock.Unlock()
// Obtain common filterStream from f
sf, f := getCommonStreamFilter(so.filter)
// Schedule concurrent search across matching partitions.
psfs := make([]partitionSearchFinalizer, len(ptws))
var wgSearchers sync.WaitGroup
for i, ptw := range ptws {
partitionSearchConcurrencyLimitCh <- struct{}{}
wgSearchers.Add(1)
go func(idx int, pt *partition) {
psfs[idx] = pt.search(sf, f, so, workCh, stopCh)
wgSearchers.Done()
<-partitionSearchConcurrencyLimitCh
}(i, ptw.pt)
}
wgSearchers.Wait()
// Wait until workers finish their work
close(workCh)
wgWorkers.Wait()
// Finalize partition search
for _, psf := range psfs {
psf()
}
// Decrement references to partitions
for _, ptw := range ptws {
ptw.decRef()
}
}
// partitionSearchConcurrencyLimitCh limits the number of concurrent searches in partition.
//
// This is needed for limiting memory usage under high load.
var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
type partitionSearchFinalizer func()
func (pt *partition) search(sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
if needStop(stopCh) {
// Do not spend CPU time on search, since it is already stopped.
return func() {}
}
tenantIDs := so.tenantIDs
var streamIDs []streamID
if sf != nil {
streamIDs = pt.idb.searchStreamIDs(tenantIDs, sf)
if len(so.streamIDs) > 0 {
streamIDs = intersectStreamIDs(streamIDs, so.streamIDs)
}
tenantIDs = nil
} else if len(so.streamIDs) > 0 {
streamIDs = getStreamIDsForTenantIDs(so.streamIDs, tenantIDs)
tenantIDs = nil
}
if hasStreamFilters(f) {
f = initStreamFilters(tenantIDs, pt.idb, f)
}
soInternal := &searchOptions{
tenantIDs: tenantIDs,
streamIDs: streamIDs,
minTimestamp: so.minTimestamp,
maxTimestamp: so.maxTimestamp,
filter: f,
neededColumnNames: so.neededColumnNames,
unneededColumnNames: so.unneededColumnNames,
needAllColumns: so.needAllColumns,
}
return pt.ddb.search(soInternal, workCh, stopCh)
}
func intersectStreamIDs(a, b []streamID) []streamID {
m := make(map[streamID]struct{}, len(b))
for _, streamID := range b {
m[streamID] = struct{}{}
}
result := make([]streamID, 0, len(a))
for _, streamID := range a {
if _, ok := m[streamID]; ok {
result = append(result, streamID)
}
}
return result
}
func getStreamIDsForTenantIDs(streamIDs []streamID, tenantIDs []TenantID) []streamID {
m := make(map[TenantID]struct{}, len(tenantIDs))
for _, tenantID := range tenantIDs {
m[tenantID] = struct{}{}
}
result := make([]streamID, 0, len(streamIDs))
for _, streamID := range streamIDs {
if _, ok := m[streamID.tenantID]; ok {
result = append(result, streamID)
}
}
return result
}
func hasStreamFilters(f filter) bool {
visitFunc := func(f filter) bool {
_, ok := f.(*filterStream)
return ok
}
return visitFilter(f, visitFunc)
}
func initStreamFilters(tenantIDs []TenantID, idb *indexdb, f filter) filter {
visitFunc := func(f filter) bool {
_, ok := f.(*filterStream)
return ok
}
copyFunc := func(f filter) (filter, error) {
fs := f.(*filterStream)
fsNew := &filterStream{
f: fs.f,
tenantIDs: tenantIDs,
idb: idb,
}
return fsNew, nil
}
f, err := copyFilter(f, visitFunc, copyFunc)
if err != nil {
logger.Panicf("BUG: unexpected error: %s", err)
}
return f
}
func (ddb *datadb) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
// Select parts with data for the given time range
ddb.partsLock.Lock()
pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp)
pws = appendPartsInTimeRange(pws, ddb.smallParts, so.minTimestamp, so.maxTimestamp)
pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp)
// Increase references to the searched parts, so they aren't deleted during search.
// References to the searched parts must be decremented by calling the returned partitionSearchFinalizer.
for _, pw := range pws {
pw.incRef()
}
ddb.partsLock.Unlock()
// Apply search to matching parts
for _, pw := range pws {
pw.p.search(so, workCh, stopCh)
}
return func() {
for _, pw := range pws {
pw.decRef()
}
}
}
func (p *part) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
bhss := getBlockHeaders()
if len(so.tenantIDs) > 0 {
p.searchByTenantIDs(so, bhss, workCh, stopCh)
} else {
p.searchByStreamIDs(so, bhss, workCh, stopCh)
}
putBlockHeaders(bhss)
}
func getBlockHeaders() *blockHeaders {
v := blockHeadersPool.Get()
if v == nil {
return &blockHeaders{}
}
return v.(*blockHeaders)
}
func putBlockHeaders(bhss *blockHeaders) {
bhss.reset()
blockHeadersPool.Put(bhss)
}
var blockHeadersPool sync.Pool
type blockHeaders struct {
bhs []blockHeader
}
func (bhss *blockHeaders) reset() {
bhs := bhss.bhs
for i := range bhs {
bhs[i].reset()
}
bhss.bhs = bhs[:0]
}
func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
// it is assumed that tenantIDs are sorted
tenantIDs := so.tenantIDs
bswb := getBlockSearchWorkBatch()
scheduleBlockSearch := func(bh *blockHeader) bool {
if bswb.appendBlockSearchWork(p, so, bh) {
return true
}
select {
case <-stopCh:
return false
case workCh <- bswb:
bswb = getBlockSearchWorkBatch()
return true
}
}
// it is assumed that ibhs are sorted
ibhs := p.indexBlockHeaders
for len(ibhs) > 0 && len(tenantIDs) > 0 {
if needStop(stopCh) {
return
}
// locate tenantID equal or bigger than the tenantID in ibhs[0]
tenantID := &tenantIDs[0]
if tenantID.less(&ibhs[0].streamID.tenantID) {
tenantID = &ibhs[0].streamID.tenantID
n := sort.Search(len(tenantIDs), func(i int) bool {
return !tenantIDs[i].less(tenantID)
})
if n == len(tenantIDs) {
tenantIDs = nil
break
}
tenantID = &tenantIDs[n]
tenantIDs = tenantIDs[n:]
}
// locate indexBlockHeader with equal or bigger tenantID than the given tenantID
n := 0
if ibhs[0].streamID.tenantID.less(tenantID) {
n = sort.Search(len(ibhs), func(i int) bool {
return !ibhs[i].streamID.tenantID.less(tenantID)
})
// The end of ibhs[n-1] may contain blocks for the given tenantID, so move it backwards
n--
}
ibh := &ibhs[n]
ibhs = ibhs[n+1:]
if so.minTimestamp > ibh.maxTimestamp || so.maxTimestamp < ibh.minTimestamp {
// Skip the ibh, since it doesn't contain entries on the requested time range
continue
}
bhss.bhs = ibh.mustReadBlockHeaders(bhss.bhs[:0], p)
bhs := bhss.bhs
for len(bhs) > 0 {
// search for blocks with the given tenantID
n = sort.Search(len(bhs), func(i int) bool {
return !bhs[i].streamID.tenantID.less(tenantID)
})
bhs = bhs[n:]
for len(bhs) > 0 && bhs[0].streamID.tenantID.equal(tenantID) {
bh := &bhs[0]
bhs = bhs[1:]
th := &bh.timestampsHeader
if so.minTimestamp > th.maxTimestamp || so.maxTimestamp < th.minTimestamp {
continue
}
if !scheduleBlockSearch(bh) {
return
}
}
if len(bhs) == 0 {
break
}
// search for the next tenantID, which can potentially match tenantID from bhs[0]
tenantID = &bhs[0].streamID.tenantID
n = sort.Search(len(tenantIDs), func(i int) bool {
return !tenantIDs[i].less(tenantID)
})
if n == len(tenantIDs) {
tenantIDs = nil
break
}
tenantID = &tenantIDs[n]
tenantIDs = tenantIDs[n:]
}
}
// Flush the remaining work
select {
case <-stopCh:
case workCh <- bswb:
}
}
func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
// it is assumed that streamIDs are sorted
streamIDs := so.streamIDs
bswb := getBlockSearchWorkBatch()
scheduleBlockSearch := func(bh *blockHeader) bool {
if bswb.appendBlockSearchWork(p, so, bh) {
return true
}
select {
case <-stopCh:
return false
case workCh <- bswb:
bswb = getBlockSearchWorkBatch()
return true
}
}
// it is assumed that ibhs are sorted
ibhs := p.indexBlockHeaders
for len(ibhs) > 0 && len(streamIDs) > 0 {
if needStop(stopCh) {
return
}
// locate streamID equal or bigger than the streamID in ibhs[0]
streamID := &streamIDs[0]
if streamID.less(&ibhs[0].streamID) {
streamID = &ibhs[0].streamID
n := sort.Search(len(streamIDs), func(i int) bool {
return !streamIDs[i].less(streamID)
})
if n == len(streamIDs) {
streamIDs = nil
break
}
streamID = &streamIDs[n]
streamIDs = streamIDs[n:]
}
// locate indexBlockHeader with equal or bigger streamID than the given streamID
n := 0
if ibhs[0].streamID.less(streamID) {
n = sort.Search(len(ibhs), func(i int) bool {
return !ibhs[i].streamID.less(streamID)
})
// The end of ibhs[n-1] may contain blocks for the given streamID, so move it backwards.
n--
}
ibh := &ibhs[n]
ibhs = ibhs[n+1:]
if so.minTimestamp > ibh.maxTimestamp || so.maxTimestamp < ibh.minTimestamp {
// Skip the ibh, since it doesn't contain entries on the requested time range
continue
}
bhss.bhs = ibh.mustReadBlockHeaders(bhss.bhs[:0], p)
bhs := bhss.bhs
for len(bhs) > 0 {
// search for blocks with the given streamID
n = sort.Search(len(bhs), func(i int) bool {
return !bhs[i].streamID.less(streamID)
})
bhs = bhs[n:]
for len(bhs) > 0 && bhs[0].streamID.equal(streamID) {
bh := &bhs[0]
bhs = bhs[1:]
th := &bh.timestampsHeader
if so.minTimestamp > th.maxTimestamp || so.maxTimestamp < th.minTimestamp {
continue
}
if !scheduleBlockSearch(bh) {
return
}
}
if len(bhs) == 0 {
break
}
// search for the next streamID, which can potentially match streamID from bhs[0]
streamID = &bhs[0].streamID
n = sort.Search(len(streamIDs), func(i int) bool {
return !streamIDs[i].less(streamID)
})
if n == len(streamIDs) {
streamIDs = nil
break
}
streamID = &streamIDs[n]
streamIDs = streamIDs[n:]
}
}
// Flush the remaining work
select {
case <-stopCh:
case workCh <- bswb:
}
}
func appendPartsInTimeRange(dst, src []*partWrapper, minTimestamp, maxTimestamp int64) []*partWrapper {
for _, pw := range src {
if maxTimestamp < pw.p.ph.MinTimestamp || minTimestamp > pw.p.ph.MaxTimestamp {
continue
}
dst = append(dst, pw)
}
return dst
}
func getCommonStreamFilter(f filter) (*StreamFilter, filter) {
switch t := f.(type) {
case *filterAnd:
filters := t.filters
for i, filter := range filters {
sf, ok := filter.(*filterStream)
if ok && !sf.f.isEmpty() {
// Remove sf from filters, since it doesn't filter out anything then.
fa := &filterAnd{
filters: append(filters[:i:i], filters[i+1:]...),
}
return sf.f, fa
}
}
case *filterStream:
return t.f, &filterNoop{}
}
return nil, f
}
func forEachStreamField(streams []ValueWithHits, f func(f Field, hits uint64)) {
var fields []Field
for i := range streams {
var err error
fields, err = parseStreamFields(fields[:0], streams[i].Value)
if err != nil {
continue
}
hits := streams[i].Hits
for j := range fields {
f(fields[j], hits)
}
}
}
func parseStreamFields(dst []Field, s string) ([]Field, error) {
if len(s) == 0 || s[0] != '{' {
return dst, fmt.Errorf("missing '{' at the beginning of stream name")
}
s = s[1:]
if len(s) == 0 || s[len(s)-1] != '}' {
return dst, fmt.Errorf("missing '}' at the end of stream name")
}
s = s[:len(s)-1]
if len(s) == 0 {
return dst, nil
}
for {
n := strings.Index(s, `="`)
if n < 0 {
return dst, fmt.Errorf("cannot find field value in double quotes at [%s]", s)
}
name := s[:n]
s = s[n+1:]
value, nOffset := tryUnquoteString(s, "")
if nOffset < 0 {
return dst, fmt.Errorf("cannot find parse field value in double quotes at [%s]", s)
}
s = s[nOffset:]
dst = append(dst, Field{
Name: name,
Value: value,
})
if len(s) == 0 {
return dst, nil
}
if s[0] != ',' {
return dst, fmt.Errorf("missing ',' after %s=%q", name, value)
}
s = s[1:]
}
}