mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
lib/storage: more tuning for tag filters sorting according the time they take
This commit is contained in:
parent
458c89324d
commit
d61f7b7279
@ -1124,8 +1124,8 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
|
||||
defer wg.Done()
|
||||
tvssLocal := make(map[string]struct{})
|
||||
isLocal := is.db.getIndexSearch(is.deadline)
|
||||
defer is.db.putIndexSearch(isLocal)
|
||||
err := isLocal.searchTagValueSuffixesForDate(tvssLocal, date, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
is.db.putIndexSearch(isLocal)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if errGlobal != nil {
|
||||
@ -2310,7 +2310,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
|
||||
// Slow path - try searching over the whole inverted index.
|
||||
|
||||
// Sort tag filters for faster ts.Seek below.
|
||||
sort.Slice(tfs.tfs, func(i, j int) bool {
|
||||
sort.SliceStable(tfs.tfs, func(i, j int) bool {
|
||||
return tfs.tfs[i].Less(&tfs.tfs[j])
|
||||
})
|
||||
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
|
||||
@ -2685,8 +2685,8 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
|
||||
go func(date uint64) {
|
||||
defer wg.Done()
|
||||
isLocal := is.db.getIndexSearch(is.deadline)
|
||||
defer is.db.putIndexSearch(isLocal)
|
||||
m, err := isLocal.getMetricIDsForDate(date, maxMetrics)
|
||||
is.db.putIndexSearch(isLocal)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if errGlobal != nil {
|
||||
@ -2744,8 +2744,8 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||
go func(date uint64) {
|
||||
defer wg.Done()
|
||||
isLocal := is.db.getIndexSearch(is.deadline)
|
||||
defer is.db.putIndexSearch(isLocal)
|
||||
m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
|
||||
is.db.putIndexSearch(isLocal)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if errGlobal != nil {
|
||||
@ -2778,35 +2778,40 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||
|
||||
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
|
||||
// Sort tfs by the duration from previous queries.
|
||||
// This way we limit the amount of work below by applying more specific filters at first.
|
||||
// This way we limit the amount of work below by applying fast filters at first.
|
||||
type tagFilterWithWeight struct {
|
||||
tf *tagFilter
|
||||
durationSeconds float64
|
||||
lastQueryTimestamp uint64
|
||||
}
|
||||
tfsWithWeight := make([]tagFilterWithWeight, len(tfs.tfs))
|
||||
kb := &is.kb
|
||||
var buf []byte
|
||||
tfws := make([]tagFilterWithWeight, len(tfs.tfs))
|
||||
ct := fasttime.UnixTimestamp()
|
||||
for i := range tfs.tfs {
|
||||
tf := &tfs.tfs[i]
|
||||
kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf)
|
||||
buf = is.db.durationsPerDateTagFilterCache.Get(buf[:0], kb.B)
|
||||
var lastQueryTimestamp uint64
|
||||
// Assume unknwon tag filters may take up to a second for execution.
|
||||
durationSeconds := float64(1)
|
||||
if len(buf) == 16 {
|
||||
lastQueryTimestamp = encoding.UnmarshalUint64(buf)
|
||||
n := encoding.UnmarshalUint64(buf[8:])
|
||||
durationSeconds = math.Float64frombits(n)
|
||||
durationSeconds, lastQueryTimestamp := is.getDurationAndTimestampForDateFilter(date, tf)
|
||||
if ct > lastQueryTimestamp+60 {
|
||||
// It is time to update filter duration stats.
|
||||
if tf.isNegative || tf.isRegexp && len(tf.orSuffixes) == 0 {
|
||||
// Negative and regexp filters usually take the most time, so move them to the end of filters
|
||||
// in the hope they won't be executed at all.
|
||||
if durationSeconds == 0 {
|
||||
durationSeconds = 10
|
||||
}
|
||||
} else {
|
||||
// Reset duration stats for relatively fast {key="value"} and {key=~"foo|bar|baz"} filters, so it is re-populated below.
|
||||
if durationSeconds < 0.5 {
|
||||
durationSeconds = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
tfsWithWeight[i] = tagFilterWithWeight{
|
||||
tfws[i] = tagFilterWithWeight{
|
||||
tf: tf,
|
||||
durationSeconds: durationSeconds,
|
||||
lastQueryTimestamp: lastQueryTimestamp,
|
||||
}
|
||||
}
|
||||
sort.Slice(tfsWithWeight, func(i, j int) bool {
|
||||
a, b := &tfsWithWeight[i], &tfsWithWeight[j]
|
||||
sort.SliceStable(tfws, func(i, j int) bool {
|
||||
a, b := &tfws[i], &tfws[j]
|
||||
if a.durationSeconds != b.durationSeconds {
|
||||
return a.durationSeconds < b.durationSeconds
|
||||
}
|
||||
@ -2817,12 +2822,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||
var tfsPostponed []*tagFilter
|
||||
var metricIDs *uint64set.Set
|
||||
maxDateMetrics := maxMetrics * 50
|
||||
tfsRemainingWithWeight := tfsWithWeight[:0]
|
||||
for i := range tfsWithWeight {
|
||||
tfw := tfsWithWeight[i]
|
||||
tfwsRemaining := tfws[:0]
|
||||
for i := range tfws {
|
||||
tfw := tfws[i]
|
||||
tf := tfw.tf
|
||||
if tf.isNegative {
|
||||
tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfw)
|
||||
tfwsRemaining = append(tfwsRemaining, tfw)
|
||||
continue
|
||||
}
|
||||
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics)
|
||||
@ -2836,8 +2841,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||
}
|
||||
metricIDs = m
|
||||
i++
|
||||
for i < len(tfsWithWeight) {
|
||||
tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfsWithWeight[i])
|
||||
for i < len(tfws) {
|
||||
tfwsRemaining = append(tfwsRemaining, tfws[i])
|
||||
i++
|
||||
}
|
||||
break
|
||||
@ -2867,8 +2872,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||
// when the intial tag filters significantly reduce the number of found metricIDs,
|
||||
// so the remaining filters could be performed via much faster metricName matching instead
|
||||
// of slow selecting of matching metricIDs.
|
||||
for i := range tfsRemainingWithWeight {
|
||||
tfw := tfsRemainingWithWeight[i]
|
||||
for i := range tfwsRemaining {
|
||||
tfw := tfwsRemaining[i]
|
||||
tf := tfw.tf
|
||||
metricIDsLen := metricIDs.Len()
|
||||
if metricIDsLen == 0 {
|
||||
@ -3069,44 +3074,57 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
|
||||
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
|
||||
}
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||
kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
|
||||
|
||||
tfNew := *tf
|
||||
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
|
||||
tfNew.prefix = kb.B
|
||||
startTime := time.Now()
|
||||
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
|
||||
kbPool.Put(kb)
|
||||
currentTimestamp := fasttime.UnixTimestamp()
|
||||
if currentTimestamp == lastQueryTimestamp {
|
||||
if currentTimestamp > lastQueryTimestamp+5 {
|
||||
// The cache already contains quite fresh entry for the current (date, tf).
|
||||
// Do not update it too frequently.
|
||||
return metricIDs, err
|
||||
}
|
||||
// Store the duration for tag filter execution in the cache in order to sort tag filters
|
||||
// in ascending durations on the next search.
|
||||
duration := time.Since(startTime)
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
|
||||
if err != nil {
|
||||
// Set duration to big value, so the given tag filter will be moved to the end
|
||||
// of tag filters on the next search.
|
||||
duration = time.Hour
|
||||
}
|
||||
durationSeconds := duration.Seconds()
|
||||
durationSeconds := time.Since(startTime).Seconds()
|
||||
if metricIDs.Len() >= maxMetrics {
|
||||
// Increase the duration for tag filter matching too many metrics,
|
||||
// So next time it will be applied after filters matching lower number of metrics.
|
||||
durationSeconds *= 2
|
||||
}
|
||||
n := math.Float64bits(durationSeconds)
|
||||
kb.B = encoding.MarshalUint64(kb.B[:0], currentTimestamp)
|
||||
kb.B = encoding.MarshalUint64(kb.B, n)
|
||||
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||
is.storeDurationAndTimestampForDateFilter(date, tf, durationSeconds, currentTimestamp)
|
||||
return metricIDs, err
|
||||
}
|
||||
|
||||
func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tagFilter) (float64, uint64) {
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
kb.B = is.db.durationsPerDateTagFilterCache.Get(kb.B[:0], is.kb.B)
|
||||
if len(kb.B) != 16 {
|
||||
return 0, 0
|
||||
}
|
||||
n := encoding.UnmarshalUint64(kb.B)
|
||||
durationSeconds := math.Float64frombits(n)
|
||||
timestamp := encoding.UnmarshalUint64(kb.B[8:])
|
||||
return durationSeconds, timestamp
|
||||
}
|
||||
|
||||
func (is *indexSearch) storeDurationAndTimestampForDateFilter(date uint64, tf *tagFilter, durationSeconds float64, timestamp uint64) {
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
|
||||
n := math.Float64bits(durationSeconds)
|
||||
kb := kbPool.Get()
|
||||
kb.B = encoding.MarshalUint64(kb.B[:0], n)
|
||||
kb.B = encoding.MarshalUint64(kb.B, timestamp)
|
||||
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||
kbPool.Put(kb)
|
||||
}
|
||||
|
||||
func appendDateTagFilterCacheKey(dst []byte, date uint64, tf *tagFilter) []byte {
|
||||
dst = encoding.MarshalUint64(dst, date)
|
||||
dst = tf.Marshal(dst)
|
||||
|
Loading…
Reference in New Issue
Block a user