lib/storage: create and use lib/uint64set instead of map[uint64]struct{}

This should improve inverted index search performance for filters matching a big number of time series,
since `lib/uint64set.Set` is faster than `map[uint64]struct{}` for both `Add` and `Has` calls.
See the corresponding benchmarks in `lib/uint64set`.
This commit is contained in:
Aliaksandr Valialkin 2019-09-24 21:10:22 +03:00
parent 2212d0e421
commit de0e4eee2c
12 changed files with 1044 additions and 191 deletions

1
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/klauspost/compress v1.8.3
github.com/lithammer/go-jump-consistent-hash v1.0.1
github.com/valyala/fastjson v1.4.1
github.com/valyala/fastrand v1.0.0
github.com/valyala/gozstd v1.6.2
github.com/valyala/histogram v1.0.1
github.com/valyala/quicktemplate v1.2.0

View File

@ -371,7 +371,7 @@ func binarySearchKey(items [][]byte, key []byte) int {
i, j := uint(0), n
for i < j {
h := uint(i+j) >> 1
if string(key) > string(items[h]) {
if h >= 0 && h < uint(len(items)) && string(key) > string(items[h]) {
i = h + 1
} else {
j = h

View File

@ -18,6 +18,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
xxhash "github.com/cespare/xxhash/v2"
@ -67,12 +68,12 @@ type indexDB struct {
indexSearchPool sync.Pool
// An inmemory map[uint64]struct{} of deleted metricIDs.
// An inmemory set of deleted metricIDs.
//
// The map holds deleted metricIDs for the current db and for the extDB.
// The set holds deleted metricIDs for the current db and for the extDB.
//
// It is safe to keep the map in memory even for big number of deleted
// metricIDs, since it occupies only 8 bytes per deleted metricID.
// It is safe to keep the set in memory even for a big number of deleted
// metricIDs, since it usually requires 1 bit per deleted metricID.
deletedMetricIDs atomic.Value
deletedMetricIDsUpdateLock sync.Mutex
@ -199,7 +200,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.UselessTagFiltersCacheRequests += cs.GetBigCalls
m.UselessTagFiltersCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(len(db.getDeletedMetricIDs()))
m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len())
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
@ -237,7 +238,7 @@ func (db *indexDB) SetExtDB(extDB *indexDB) {
// Add deleted metricIDs from extDB to db.
if extDB != nil {
dmisExt := extDB.getDeletedMetricIDs()
metricIDs := getSortedMetricIDs(dmisExt)
metricIDs := dmisExt.AppendTo(nil)
db.updateDeletedMetricIDs(metricIDs)
}
@ -894,30 +895,27 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
return deletedCount, nil
}
func (db *indexDB) getDeletedMetricIDs() map[uint64]struct{} {
return db.deletedMetricIDs.Load().(map[uint64]struct{})
func (db *indexDB) getDeletedMetricIDs() *uint64set.Set {
return db.deletedMetricIDs.Load().(*uint64set.Set)
}
func (db *indexDB) setDeletedMetricIDs(dmis map[uint64]struct{}) {
func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) {
db.deletedMetricIDs.Store(dmis)
}
func (db *indexDB) updateDeletedMetricIDs(metricIDs []uint64) {
db.deletedMetricIDsUpdateLock.Lock()
dmisOld := db.getDeletedMetricIDs()
dmisNew := make(map[uint64]struct{}, len(dmisOld)+len(metricIDs))
for metricID := range dmisOld {
dmisNew[metricID] = struct{}{}
}
dmisNew := dmisOld.Clone()
for _, metricID := range metricIDs {
dmisNew[metricID] = struct{}{}
dmisNew.Add(metricID)
}
db.setDeletedMetricIDs(dmisNew)
db.deletedMetricIDsUpdateLock.Unlock()
}
func (is *indexSearch) loadDeletedMetricIDs() (map[uint64]struct{}, error) {
dmis := make(map[uint64]struct{})
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
dmis := &uint64set.Set{}
ts := &is.ts
kb := &is.kb
kb.B = append(kb.B[:0], nsPrefixDeteletedMetricID)
@ -932,7 +930,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (map[uint64]struct{}, error) {
return nil, fmt.Errorf("unexpected item len; got %d bytes; want %d bytes", len(item), 8)
}
metricID := encoding.UnmarshalUint64(item)
dmis[metricID] = struct{}{}
dmis.Add(metricID)
}
if err := ts.Error(); err != nil {
return nil, err
@ -1024,9 +1022,9 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
}
if len(dmis) > 0 {
if dmis.Len() > 0 {
// Verify whether the dst is marked as deleted.
if _, deleted := dmis[dst.MetricID]; deleted {
if dmis.Has(dst.MetricID) {
// The dst is deleted. Continue searching.
continue
}
@ -1193,9 +1191,9 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
// and adds matching metrics to metricIDs.
func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs map[uint64]struct{}, tfs []*tagFilter, accountID, projectID uint32) error {
func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter, accountID, projectID uint32) error {
// sort srcMetricIDs in order to speed up Seek below.
sortedMetricIDs := getSortedMetricIDs(srcMetricIDs)
sortedMetricIDs := srcMetricIDs.AppendTo(nil)
metricName := kbPool.Get()
defer kbPool.Put(metricName)
@ -1219,12 +1217,12 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
if !ok {
continue
}
metricIDs[metricID] = struct{}{}
metricIDs.Add(metricID)
}
return nil
}
func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) {
func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
// Try fast path with the minimized number of maxMetrics.
maxMetricsAdjusted := is.adjustMaxMetricsAdaptive(tr, maxMetrics)
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetricsAdjusted)
@ -1261,7 +1259,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
if err != nil {
return nil, nil, err
}
if len(metricIDsForTimeRange) <= maxTimeRangeMetrics {
if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
return nil, metricIDsForTimeRange, nil
}
@ -1291,7 +1289,7 @@ func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) in
if !hmPrev.isFull {
return maxMetrics
}
hourMetrics := len(hmPrev.m)
hourMetrics := hmPrev.m.Len()
if hourMetrics >= 256 && maxMetrics > hourMetrics/4 {
// It is cheaper to filter on the hour or day metrics if the minimum
// number of matching metrics across tfs exceeds hourMetrics / 4.
@ -1300,7 +1298,7 @@ func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) in
return maxMetrics
}
func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) {
func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
kb := &is.kb
kb.B = append(kb.B[:0], uselessMultiTagFiltersKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
@ -1322,7 +1320,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters
if err != nil {
return nil, nil, err
}
if len(minMetricIDs) < maxAllowedMetrics {
if minMetricIDs.Len() < maxAllowedMetrics {
// Found the tag filter with the minimum number of metrics.
return minTf, minMetricIDs, nil
}
@ -1348,8 +1346,8 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters
var errTooManyMetrics = errors.New("all the tag filters match too many metrics")
func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) {
var minMetricIDs map[uint64]struct{}
func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
var minMetricIDs *uint64set.Set
var minTf *tagFilter
kb := &is.kb
uselessTagFilters := 0
@ -1382,7 +1380,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
}
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %s", tf, err)
}
if len(metricIDs) >= maxMetrics {
if metricIDs.Len() >= maxMetrics {
// The tf matches at least maxMetrics. Skip it
kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
@ -1394,7 +1392,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
minMetricIDs = metricIDs
minTf = tf
maxMetrics = len(minMetricIDs)
maxMetrics = minMetricIDs.Len()
if maxMetrics <= 1 {
// There is no need in inspecting other filters, since minTf
// already matches 0 or 1 metric.
@ -1417,11 +1415,11 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
return nil, nil, errTooManyMetrics
}
metricIDs := make(map[uint64]struct{})
metricIDs := &uint64set.Set{}
if err := is.updateMetricIDsAll(metricIDs, tfs.accountID, tfs.projectID, maxMetrics); err != nil {
return nil, nil, err
}
if len(metricIDs) >= maxMetrics {
if metricIDs.Len() >= maxMetrics {
kb.B = append(kb.B[:0], uselessNegativeTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
kb.B = tfs.marshal(kb.B)
@ -1492,14 +1490,14 @@ func matchTagFilter(b []byte, tf *tagFilter) (bool, error) {
}
func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
metricIDs := make(map[uint64]struct{})
metricIDs := &uint64set.Set{}
for _, tfs := range tfss {
if len(tfs.tfs) == 0 {
// Return all the metric ids
if err := is.updateMetricIDsAll(metricIDs, tfs.accountID, tfs.projectID, maxMetrics+1); err != nil {
return nil, err
}
if len(metricIDs) > maxMetrics {
if metricIDs.Len() > maxMetrics {
return nil, fmt.Errorf("the number or unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
}
// Stop the iteration, since we cannot find more metric ids with the remaining tfss.
@ -1508,23 +1506,23 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
return nil, err
}
if len(metricIDs) > maxMetrics {
if metricIDs.Len() > maxMetrics {
return nil, fmt.Errorf("the number or matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
}
}
if len(metricIDs) == 0 {
if metricIDs.Len() == 0 {
// Nothing found
return nil, nil
}
sortedMetricIDs := getSortedMetricIDs(metricIDs)
sortedMetricIDs := metricIDs.AppendTo(nil)
// Filter out deleted metricIDs.
dmis := is.db.getDeletedMetricIDs()
if len(dmis) > 0 {
if dmis.Len() > 0 {
metricIDsFiltered := sortedMetricIDs[:0]
for _, metricID := range sortedMetricIDs {
if _, deleted := dmis[metricID]; !deleted {
if !dmis.Has(metricID) {
metricIDsFiltered = append(metricIDsFiltered, metricID)
}
}
@ -1534,7 +1532,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
return sortedMetricIDs, nil
}
func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs map[uint64]struct{}, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
// Sort tag filters for faster ts.Seek below.
sort.Slice(tfs.tfs, func(i, j int) bool { return bytes.Compare(tfs.tfs[i].prefix, tfs.tfs[j].prefix) < 0 })
@ -1577,8 +1575,8 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs map[uint64]struct{
}
minMetricIDs = mIDs
}
for metricID := range minMetricIDs {
metricIDs[metricID] = struct{}{}
for _, metricID := range minMetricIDs.AppendTo(nil) {
metricIDs.Add(metricID)
}
return nil
}
@ -1591,11 +1589,11 @@ const (
var uselessTagFilterCacheValue = []byte("1")
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (map[uint64]struct{}, error) {
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (*uint64set.Set, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
metricIDs := make(map[uint64]struct{}, maxMetrics)
metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil {
@ -1610,8 +1608,8 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
// Slow path - scan for all the rows with the given prefix.
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
metricIDs[metricID] = struct{}{}
return len(metricIDs) < maxMetrics
metricIDs.Add(metricID)
return metricIDs.Len() < maxMetrics
})
if err != nil {
if err == errFallbackToMetricNameMatch {
@ -1706,7 +1704,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
return nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs map[uint64]struct{}) error {
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) error {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
@ -1719,15 +1717,15 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMe
if err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs); err != nil {
return err
}
if len(metricIDs) >= maxMetrics {
if metricIDs.Len() >= maxMetrics {
return nil
}
}
return nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter map[uint64]struct{}) error {
sortedFilter := getSortedMetricIDs(filter)
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error {
sortedFilter := filter.AppendTo(nil)
kb := kbPool.Get()
defer kbPool.Put(kb)
for _, orSuffix := range tf.orSuffixes {
@ -1741,14 +1739,14 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met
return nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs map[uint64]struct{}) error {
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) error {
ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
loops := 0
ts.Seek(prefix)
for len(metricIDs) < maxMetrics && ts.NextItem() {
for metricIDs.Len() < maxMetrics && ts.NextItem() {
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
return nil
@ -1762,7 +1760,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
metricIDs[metricID] = struct{}{}
metricIDs.Add(metricID)
}
}
if err := ts.Error(); err != nil {
@ -1771,7 +1769,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
return nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs map[uint64]struct{}, sortedFilter []uint64, isNegative bool) error {
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error {
if len(sortedFilter) == 0 {
return nil
}
@ -1827,9 +1825,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
continue
}
if isNegative {
delete(metricIDs, metricID)
metricIDs.Del(metricID)
} else {
metricIDs[metricID] = struct{}{}
metricIDs.Add(metricID)
}
sf = sf[1:]
}
@ -1844,7 +1842,7 @@ var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMet
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, accountID, projectID uint32) (map[uint64]struct{}, error) {
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, error) {
if tr.isZero() {
return nil, errMissingMetricIDsForDate
}
@ -1868,7 +1866,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
// Too many dates must be covered. Give up.
return nil, errMissingMetricIDsForDate
}
metricIDs = make(map[uint64]struct{}, maxMetrics)
metricIDs = &uint64set.Set{}
for minDate <= maxDate {
if err := is.getMetricIDsForDate(minDate, metricIDs, maxMetrics, accountID, projectID); err != nil {
return nil, err
@ -1879,7 +1877,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
return metricIDs, nil
}
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (map[uint64]struct{}, bool, error) {
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool, error) {
metricIDs, ok := is.getMetricIDsForRecentHoursAll(tr, maxMetrics)
if !ok {
return nil, false, nil
@ -1887,7 +1885,7 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
// Filter out metricIDs for non-matching (accountID, projectID).
// Sort metricIDs for faster lookups below.
sortedMetricIDs := getSortedMetricIDs(metricIDs)
sortedMetricIDs := metricIDs.AppendTo(nil)
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID)
@ -1901,7 +1899,7 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
break
}
if !bytes.HasPrefix(ts.Item, kb.B) {
delete(metricIDs, metricID)
metricIDs.Del(metricID)
}
}
if err := ts.Error(); err != nil {
@ -1910,7 +1908,7 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
return metricIDs, true, nil
}
func (is *indexSearch) getMetricIDsForRecentHoursAll(tr TimeRange, maxMetrics int) (map[uint64]struct{}, bool) {
func (is *indexSearch) getMetricIDsForRecentHoursAll(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
// Return all the metricIDs for all the (AccountID, ProjectID) entries.
// The caller is responsible for proper filtering later.
minHour := uint64(tr.MinTimestamp) / msecPerHour
@ -1920,46 +1918,35 @@ func (is *indexSearch) getMetricIDsForRecentHoursAll(tr TimeRange, maxMetrics in
// The tr fits the current hour.
// Return a copy of hmCurr.m, because the caller may modify
// the returned set.
if len(hmCurr.m) > maxMetrics {
if hmCurr.m.Len() > maxMetrics {
return nil, false
}
return getMetricIDsCopy(hmCurr.m), true
return hmCurr.m.Clone(), true
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour.
// Return a copy of hmPrev.m, because the caller may modify
// the returned set.
if len(hmPrev.m) > maxMetrics {
if hmPrev.m.Len() > maxMetrics {
return nil, false
}
return getMetricIDsCopy(hmPrev.m), true
return hmPrev.m.Clone(), true
}
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours.
if len(hmCurr.m)+len(hmPrev.m) > maxMetrics {
if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics {
return nil, false
}
metricIDs := make(map[uint64]struct{}, len(hmCurr.m)+len(hmPrev.m))
for metricID := range hmCurr.m {
metricIDs[metricID] = struct{}{}
}
for metricID := range hmPrev.m {
metricIDs[metricID] = struct{}{}
metricIDs := hmCurr.m.Clone()
for _, metricID := range hmPrev.m.AppendTo(nil) {
metricIDs.Add(metricID)
}
return metricIDs, true
}
return nil, false
}
func getMetricIDsCopy(src map[uint64]struct{}) map[uint64]struct{} {
dst := make(map[uint64]struct{}, len(src))
for metricID := range src {
dst[metricID] = struct{}{}
}
return dst
}
func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error {
is := db.getIndexSearch()
ok, err := is.hasDateMetricID(date, metricID, accountID, projectID)
@ -2001,14 +1988,14 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64, accountID, project
return true, nil
}
func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs map[uint64]struct{}, maxMetrics int, accountID, projectID uint32) error {
func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs *uint64set.Set, maxMetrics int, accountID, projectID uint32) error {
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID, accountID, projectID)
kb.B = encoding.MarshalUint64(kb.B, date)
ts.Seek(kb.B)
items := 0
for len(metricIDs) < maxMetrics && ts.NextItem() {
for metricIDs.Len() < maxMetrics && ts.NextItem() {
if !bytes.HasPrefix(ts.Item, kb.B) {
break
}
@ -2018,7 +2005,7 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, metricIDs map[uint64]str
return fmt.Errorf("cannot extract metricID from k; want %d bytes; got %d bytes", 8, len(v))
}
metricID := encoding.UnmarshalUint64(v)
metricIDs[metricID] = struct{}{}
metricIDs.Add(metricID)
items++
}
if err := ts.Error(); err != nil {
@ -2054,7 +2041,7 @@ func (is *indexSearch) containsTimeRange(tr TimeRange, accountID, projectID uint
return true, nil
}
func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, accountID, projectID uint32, maxMetrics int) error {
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, accountID, projectID uint32, maxMetrics int) error {
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID)
@ -2070,8 +2057,8 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, account
return fmt.Errorf("cannot unmarshal metricID from item with size %d; need at least 9 bytes; item=%q", len(tail), tail)
}
metricID := encoding.UnmarshalUint64(tail)
metricIDs[metricID] = struct{}{}
if len(metricIDs) >= maxMetrics {
metricIDs.Add(metricID)
if metricIDs.Len() >= maxMetrics {
return nil
}
}
@ -2086,13 +2073,13 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs map[uint64]struct{}, account
// over the found metrics.
const maxIndexScanLoopsPerMetric = 400
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map[uint64]struct{}) (map[uint64]struct{}, error) {
if len(filter) == 0 {
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
if filter.Len() == 0 {
return nil, nil
}
metricIDs := filter
if !tf.isNegative {
metricIDs = make(map[uint64]struct{}, len(filter))
metricIDs = &uint64set.Set{}
}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
@ -2106,15 +2093,15 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter map
}
// Slow path - scan for all the rows with the given prefix.
maxLoops := len(filter) * maxIndexScanLoopsPerMetric
maxLoops := filter.Len() * maxIndexScanLoopsPerMetric
err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, func(metricID uint64) bool {
if tf.isNegative {
// filter must be equal to metricIDs
delete(metricIDs, metricID)
metricIDs.Del(metricID)
return true
}
if _, ok := filter[metricID]; ok {
metricIDs[metricID] = struct{}{}
if filter.Has(metricID) {
metricIDs.Add(metricID)
}
return true
})
@ -2159,18 +2146,6 @@ func unmarshalCommonPrefix(src []byte) ([]byte, byte, uint32, uint32, error) {
// 1 byte for prefix, 4 bytes for accountID, 4 bytes for projectID
const commonPrefixLen = 9
func getSortedMetricIDs(m map[uint64]struct{}) []uint64 {
a := make(uint64Sorter, len(m))
i := 0
for metricID := range m {
a[i] = metricID
i++
}
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations
sort.Sort(a)
return a
}
type tagToMetricIDsRowParser struct {
// AccountID contains parsed value after Init call
AccountID uint32
@ -2287,13 +2262,13 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs() {
// IsDeletedTag verifies whether the tag from mp is deleted according to dmis.
//
// dmis must contain deleted MetricIDs.
func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis map[uint64]struct{}) bool {
if len(dmis) == 0 {
func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool {
if dmis.Len() == 0 {
return false
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
if _, ok := dmis[metricID]; !ok {
if !dmis.Has(metricID) {
return false
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
// mergeBlockStreams merges bsrs into bsw and updates ph.
@ -14,7 +15,7 @@ import (
//
// rowsMerged is atomically updated with the number of merged rows during the merge.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, rowsMerged *uint64,
deletedMetricIDs map[uint64]struct{}, rowsDeleted *uint64) error {
deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error {
ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger)
@ -41,7 +42,7 @@ var bsmPool = &sync.Pool{
var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, rowsMerged *uint64,
deletedMetricIDs map[uint64]struct{}, rowsDeleted *uint64) error {
deletedMetricIDs *uint64set.Set, rowsDeleted *uint64) error {
// Search for the first block to merge
var pendingBlock *Block
for bsm.NextBlock() {
@ -50,7 +51,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
return errForciblyStopped
default:
}
if _, deleted := deletedMetricIDs[bsm.Block.bh.TSID.MetricID]; deleted {
if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
@ -72,7 +73,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
return errForciblyStopped
default:
}
if _, deleted := deletedMetricIDs[bsm.Block.bh.TSID.MetricID]; deleted {
if deletedMetricIDs.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue

View File

@ -19,6 +19,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
func maxRowsPerSmallPart() uint64 {
@ -93,7 +94,7 @@ type partition struct {
bigPartsPath string
// The callback that returns deleted metric ids which must be skipped during merge.
getDeletedMetricIDs func() map[uint64]struct{}
getDeletedMetricIDs func() *uint64set.Set
// Name is the name of the partition in the form YYYY_MM.
name string
@ -183,7 +184,7 @@ func (pw *partWrapper) decRef() {
// createPartition creates new partition for the given timestamp and the given paths
// to small and big partitions.
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() map[uint64]struct{}) (*partition, error) {
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
name := timestampToPartitionName(timestamp)
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@ -218,7 +219,7 @@ func (pt *partition) Drop() {
}
// openPartition opens the existing partition from the given paths.
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() map[uint64]struct{}) (*partition, error) {
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
smallPartsPath = filepath.Clean(smallPartsPath)
bigPartsPath = filepath.Clean(bigPartsPath)
@ -255,7 +256,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return pt, nil
}
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() map[uint64]struct{}) *partition {
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition {
return &partition{
name: name,
smallPartsPath: smallPartsPath,

View File

@ -7,6 +7,8 @@ import (
"sort"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
func TestPartitionSearch(t *testing.T) {
@ -284,6 +286,6 @@ func testPartitionSearchSerial(pt *partition, tsids []TSID, tr TimeRange, rbsExp
return nil
}
func nilGetDeletedMetricIDs() map[uint64]struct{} {
func nilGetDeletedMetricIDs() *uint64set.Set {
return nil
}

View File

@ -20,6 +20,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
)
@ -59,7 +60,7 @@ type Storage struct {
// Pending MetricID values to be added to currHourMetricIDs.
pendingHourMetricIDsLock sync.Mutex
pendingHourMetricIDs map[uint64]struct{}
pendingHourMetricIDs *uint64set.Set
stop chan struct{}
@ -122,7 +123,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev)
s.pendingHourMetricIDs = make(map[uint64]struct{})
s.pendingHourMetricIDs = &uint64set.Set{}
// Load indexdb
idbPath := path + "/indexdb"
@ -158,7 +159,7 @@ func (s *Storage) debugFlush() {
s.idb().tb.DebugFlush()
}
func (s *Storage) getDeletedMetricIDs() map[uint64]struct{} {
func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
return s.idb().getDeletedMetricIDs()
}
@ -364,9 +365,9 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
hourMetricIDsLen := len(hmPrev.m)
if len(hmCurr.m) > hourMetricIDsLen {
hourMetricIDsLen = len(hmCurr.m)
hourMetricIDsLen := hmPrev.m.Len()
if hmCurr.m.Len() > hourMetricIDsLen {
hourMetricIDsLen = hmCurr.m.Len()
}
m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
@ -508,11 +509,11 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
return &hourMetricIDs{}
}
m := make(map[uint64]struct{}, hmLen)
m := &uint64set.Set{}
for i := uint64(0); i < hmLen; i++ {
metricID := encoding.UnmarshalUint64(src)
src = src[8:]
m[metricID] = struct{}{}
m.Add(metricID)
}
logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen)
return &hourMetricIDs{
@ -526,21 +527,21 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) {
path := s.cachePath + "/" + name
logger.Infof("saving %s to %q...", name, path)
startTime := time.Now()
dst := make([]byte, 0, len(hm.m)*8+24)
dst := make([]byte, 0, hm.m.Len()*8+24)
isFull := uint64(0)
if hm.isFull {
isFull = 1
}
dst = encoding.MarshalUint64(dst, isFull)
dst = encoding.MarshalUint64(dst, hm.hour)
dst = encoding.MarshalUint64(dst, uint64(len(hm.m)))
for metricID := range hm.m {
dst = encoding.MarshalUint64(dst, uint64(hm.m.Len()))
for _, metricID := range hm.m.AppendTo(nil) {
dst = encoding.MarshalUint64(dst, metricID)
}
if err := ioutil.WriteFile(path, dst, 0644); err != nil {
logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err)
}
logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), len(hm.m), len(dst))
logger.Infof("saved %s to %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hm.m.Len(), len(dst))
}
func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcache.Cache {
@ -814,11 +815,11 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
r.Value = mr.Value
r.PrecisionBits = precisionBits
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
if len(dmis) == 0 {
if dmis.Len() == 0 {
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
continue
}
if _, deleted := dmis[r.TSID.MetricID]; !deleted {
if !dmis.Has(r.TSID.MetricID) {
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
continue
}
@ -888,12 +889,12 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache.
if _, ok := hm.m[metricID]; ok {
if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
continue
}
s.pendingHourMetricIDsLock.Lock()
s.pendingHourMetricIDs[metricID] = struct{}{}
s.pendingHourMetricIDs.Add(metricID)
s.pendingHourMetricIDsLock.Unlock()
}
@ -919,7 +920,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
func (s *Storage) updateCurrHourMetricIDs() {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourMetricIDsLock.Lock()
newMetricIDsLen := len(s.pendingHourMetricIDs)
newMetricIDsLen := s.pendingHourMetricIDs.Len()
s.pendingHourMetricIDsLock.Unlock()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
if newMetricIDsLen == 0 && hm.hour == hour {
@ -928,23 +929,20 @@ func (s *Storage) updateCurrHourMetricIDs() {
}
// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
var m map[uint64]struct{}
var m *uint64set.Set
isFull := hm.isFull
if hm.hour == hour {
m = make(map[uint64]struct{}, len(hm.m)+newMetricIDsLen)
for metricID := range hm.m {
m[metricID] = struct{}{}
}
m = hm.m.Clone()
} else {
m = make(map[uint64]struct{}, newMetricIDsLen)
m = &uint64set.Set{}
isFull = true
}
s.pendingHourMetricIDsLock.Lock()
newMetricIDs := s.pendingHourMetricIDs
s.pendingHourMetricIDs = make(map[uint64]struct{}, len(newMetricIDs))
newMetricIDs := s.pendingHourMetricIDs.AppendTo(nil)
s.pendingHourMetricIDs = &uint64set.Set{}
s.pendingHourMetricIDsLock.Unlock()
for metricID := range newMetricIDs {
m[metricID] = struct{}{}
for _, metricID := range newMetricIDs {
m.Add(metricID)
}
hmNew := &hourMetricIDs{
@ -959,7 +957,7 @@ func (s *Storage) updateCurrHourMetricIDs() {
}
type hourMetricIDs struct {
m map[uint64]struct{}
m *uint64set.Set
hour uint64
isFull bool
}

View File

@ -9,6 +9,8 @@ import (
"testing"
"testing/quick"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
func TestUpdateCurrHourMetricIDs(t *testing.T) {
@ -16,19 +18,18 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
var s Storage
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{})
s.pendingHourMetricIDs = make(map[uint64]struct{})
s.pendingHourMetricIDs = &uint64set.Set{}
return &s
}
t.Run("empty_pedning_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{
12: {},
34: {},
},
m: &uint64set.Set{},
hour: 123,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -39,8 +40,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
}
if len(hmCurr.m) != 0 {
t.Fatalf("unexpected length of hm.m; got %d; want %d", len(hmCurr.m), 0)
if hmCurr.m.Len() != 0 {
t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
}
if !hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
@ -51,20 +52,19 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
}
})
t.Run("empty_pedning_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{
12: {},
34: {},
},
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -90,27 +90,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
pendingHourMetricIDs := map[uint64]struct{}{
343: {},
32424: {},
8293432: {},
}
pendingHourMetricIDs := &uint64set.Set{}
pendingHourMetricIDs.Add(343)
pendingHourMetricIDs.Add(32424)
pendingHourMetricIDs.Add(8293432)
s.pendingHourMetricIDs = pendingHourMetricIDs
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{
12: {},
34: {},
},
m: &uint64set.Set{},
hour: 123,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -133,27 +131,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
pendingHourMetricIDs := map[uint64]struct{}{
343: {},
32424: {},
8293432: {},
}
pendingHourMetricIDs := &uint64set.Set{}
pendingHourMetricIDs.Add(343)
pendingHourMetricIDs.Add(32424)
pendingHourMetricIDs.Add(8293432)
s.pendingHourMetricIDs = pendingHourMetricIDs
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmOrig := &hourMetricIDs{
m: map[uint64]struct{}{
12: {},
34: {},
},
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
@ -166,9 +162,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
// Do not run other checks, since they may fail.
return
}
m := getMetricIDsCopy(pendingHourMetricIDs)
for metricID := range hmOrig.m {
m[metricID] = struct{}{}
m := pendingHourMetricIDs.Clone()
origMetricIDs := hmOrig.m.AppendTo(nil)
for _, metricID := range origMetricIDs {
m.Add(metricID)
}
if !reflect.DeepEqual(hmCurr.m, m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
@ -183,8 +180,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if len(s.pendingHourMetricIDs) != 0 {
t.Fatalf("unexpected len(s.pendingHourMetricIDs); got %d; want %d", len(s.pendingHourMetricIDs), 0)
if s.pendingHourMetricIDs.Len() != 0 {
t.Fatalf("unexpected s.pendingHourMetricIDs.Len(); got %d; want %d", s.pendingHourMetricIDs.Len(), 0)
}
})
}

View File

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
// table represents a single table with time series data.
@ -18,7 +19,7 @@ type table struct {
smallPartitionsPath string
bigPartitionsPath string
getDeletedMetricIDs func() map[uint64]struct{}
getDeletedMetricIDs func() *uint64set.Set
ptws []*partitionWrapper
ptwsLock sync.Mutex
@ -75,7 +76,7 @@ func (ptw *partitionWrapper) scheduleToDrop() {
// The table is created if it doesn't exist.
//
// Data older than the retentionMonths may be dropped at any time.
func openTable(path string, retentionMonths int, getDeletedMetricIDs func() map[uint64]struct{}) (*table, error) {
func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uint64set.Set) (*table, error) {
path = filepath.Clean(path)
// Create a directory for the table if it doesn't exist yet.
@ -430,7 +431,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
}
}
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() map[uint64]struct{}) ([]*partition, error) {
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) {
smallD, err := os.Open(smallPartitionsPath)
if err != nil {
return nil, fmt.Errorf("cannot open directory with small partitions %q: %s", smallPartitionsPath, err)

332
lib/uint64set/uint64set.go Normal file
View File

@ -0,0 +1,332 @@
package uint64set
import (
"sort"
)
// Set is a fast set for uint64.
//
// It should work faster than map[uint64]struct{} for semi-sparse uint64 values
// such as MetricIDs generated by lib/storage.
//
// The zero value is an empty set ready for use.
//
// It is unsafe to call Set methods from concurrent goroutines.
type Set struct {
	// itemsCount is the number of distinct values currently stored in the set.
	itemsCount int

	// buckets holds one bucket32 per distinct high 32 bits of the stored values.
	// The slice is sorted lazily by AppendTo.
	buckets bucket32Sorter
}
// bucket32Sorter implements sort.Interface, ordering buckets by their hi field.
type bucket32Sorter []*bucket32

// Len returns the number of buckets.
func (s *bucket32Sorter) Len() int {
	return len(*s)
}

// Less reports whether the i-th bucket has a smaller hi than the j-th one.
func (s *bucket32Sorter) Less(i, j int) bool {
	return (*s)[i].hi < (*s)[j].hi
}

// Swap exchanges the i-th and the j-th buckets.
func (s *bucket32Sorter) Swap(i, j int) {
	(*s)[i], (*s)[j] = (*s)[j], (*s)[i]
}
// Clone returns a deep copy of s, which shares no buckets with s.
//
// Nil is returned for a nil s.
func (s *Set) Clone() *Set {
	if s == nil {
		return nil
	}
	clones := make([]*bucket32, len(s.buckets))
	for i := range s.buckets {
		clones[i] = s.buckets[i].clone()
	}
	return &Set{
		itemsCount: s.itemsCount,
		buckets:    clones,
	}
}
// Len returns the number of distinct uint64 values stored in s.
//
// A nil s is treated as an empty set.
func (s *Set) Len() int {
	if s != nil {
		return s.itemsCount
	}
	return 0
}
// Add adds x to s.
//
// The value is split into high and low 32-bit halves: the high half selects
// the bucket32, while the low half is stored inside it.
func (s *Set) Add(x uint64) {
	hi, lo := uint32(x>>32), uint32(x)
	for i := range s.buckets {
		b32 := s.buckets[i]
		if b32.hi != hi {
			continue
		}
		// Count the item only if it was missing from the bucket.
		if b32.add(lo) {
			s.itemsCount++
		}
		return
	}
	// There is no bucket for the given hi yet.
	s.addAlloc(hi, lo)
}
// addAlloc creates a new bucket32 for the given hi, stores lo in it
// and appends the bucket to s.
func (s *Set) addAlloc(hi, lo uint32) {
	b32 := &bucket32{hi: hi}
	_ = b32.add(lo)
	s.buckets = append(s.buckets, b32)
	s.itemsCount++
}
// Has reports whether x exists in s.
//
// False is returned for a nil s.
func (s *Set) Has(x uint64) bool {
	if s == nil {
		return false
	}
	hi, lo := uint32(x>>32), uint32(x)
	for i := range s.buckets {
		if b32 := s.buckets[i]; b32.hi == hi {
			return b32.has(lo)
		}
	}
	return false
}
// Del deletes x from s.
//
// It is a no-op if x is missing from s or if s is nil.
func (s *Set) Del(x uint64) {
	if s == nil {
		// Stay consistent with Len, Has, AppendTo and Clone, which accept a nil set.
		return
	}
	hi := uint32(x >> 32)
	lo := uint32(x)
	for _, b32 := range s.buckets {
		if b32.hi == hi {
			// Decrement the counter only if the item was actually present.
			if b32.del(lo) {
				s.itemsCount--
			}
			return
		}
	}
}
// AppendTo appends all the items from the set to dst and returns the result.
//
// The returned items are sorted.
//
// The call may reorder the internal buckets of s, so it mutates s.
func (s *Set) AppendTo(dst []uint64) []uint64 {
	if s == nil {
		return dst
	}

	// Grow dst up front if its spare capacity cannot hold all the items.
	origLen := len(dst)
	missing := s.Len() - (cap(dst) - origLen)
	if missing > 0 {
		dst = append(dst[:cap(dst)], make([]uint64, missing)...)[:origLen]
	}

	// Buckets are appended in arbitrary order by Add, so sort them on demand.
	if !sort.IsSorted(&s.buckets) {
		sort.Sort(&s.buckets)
	}

	for i := range s.buckets {
		dst = s.buckets[i].appendTo(dst)
	}
	return dst
}
// bucket32 holds the low 32-bit halves of the set members sharing the same high 32 bits.
type bucket32 struct {
	// hi is the high 32 bits shared by all the values in the bucket.
	hi uint32

	// b16his contains the high 16 bits of the stored 32-bit values;
	// b16his[i] corresponds to buckets[i].
	b16his []uint16

	// buckets contains bitmaps with the low 16 bits of the stored 32-bit values.
	buckets []*bucket16
}
// clone returns a deep copy of b.
func (b *bucket32) clone() *bucket32 {
	dst := &bucket32{
		hi:      b.hi,
		buckets: make([]*bucket16, len(b.buckets)),
	}
	dst.b16his = append(dst.b16his, b.b16his...)
	for i := range b.buckets {
		dst.buckets[i] = b.buckets[i].clone()
	}
	return dst
}
// Len, Less and Swap implement sort.Interface, which orders the bucket16 entries by b16his.

// Len returns the number of b16his entries.
func (b *bucket32) Len() int {
	return len(b.b16his)
}

// Less reports whether the i-th entry has smaller high bits than the j-th one.
func (b *bucket32) Less(i, j int) bool {
	return b.b16his[j] > b.b16his[i]
}

// Swap exchanges the i-th and the j-th entries in both b16his and buckets,
// so the two slices stay aligned.
func (b *bucket32) Swap(i, j int) {
	b.b16his[i], b.b16his[j] = b.b16his[j], b.b16his[i]
	b.buckets[i], b.buckets[j] = b.buckets[j], b.buckets[i]
}
// maxUnsortedBuckets is the maximum number of bucket16 entries per bucket32,
// which are looked up with a linear scan. Bigger bucket32 entries are kept sorted
// and are looked up with binary search.
const maxUnsortedBuckets = 32

// add adds x to b and reports whether x was missing from b before the call.
func (b *bucket32) add(x uint32) bool {
	hi, lo := uint16(x>>16), uint16(x)
	if len(b.buckets) > maxUnsortedBuckets {
		return b.addSlow(hi, lo)
	}
	for i := range b.b16his {
		if b.b16his[i] != hi {
			continue
		}
		if i >= len(b.buckets) {
			return false
		}
		return b.buckets[i].add(lo)
	}
	// There is no bucket16 for the given hi yet.
	b.addAllocSmall(hi, lo)
	return true
}
// addAllocSmall appends a new bucket16 containing lo for the given hi.
//
// The entries are sorted as soon as their number exceeds maxUnsortedBuckets,
// since binary search is used for lookups from that point on.
func (b *bucket32) addAllocSmall(hi, lo uint16) {
	b16 := new(bucket16)
	_ = b16.add(lo)
	b.b16his = append(b.b16his, hi)
	b.buckets = append(b.buckets, b16)
	if len(b.buckets) <= maxUnsortedBuckets {
		return
	}
	sort.Sort(b)
}
// addSlow adds (hi, lo) to b using binary search over the sorted b16his.
//
// It reports whether the value was missing from b before the call.
func (b *bucket32) addSlow(hi, lo uint16) bool {
	n := binarySearch16(b.b16his, hi)
	found := n >= 0 && n < len(b.b16his) && b.b16his[n] == hi
	if !found {
		// Insert a new bucket16 at position n in order to keep b16his sorted.
		b.addAllocBig(hi, lo, n)
		return true
	}
	if n >= len(b.buckets) {
		return false
	}
	return b.buckets[n].add(lo)
}
// addAllocBig inserts a new bucket16 containing lo for the given hi at position n.
//
// Nothing is done for a negative n.
func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
	if n < 0 {
		return
	}
	b16 := &bucket16{}
	_ = b16.add(lo)
	if n < len(b.b16his) {
		// Shift the tail starting at n by one position to the right
		// and put the new entry into the freed slot.
		b.b16his = append(b.b16his[:n+1], b.b16his[n:]...)
		b.b16his[n] = hi
		b.buckets = append(b.buckets[:n+1], b.buckets[n:]...)
		b.buckets[n] = b16
		return
	}
	// The new entry goes after all the existing entries.
	b.b16his = append(b.b16his, hi)
	b.buckets = append(b.buckets, b16)
}
// has reports whether x exists in b.
func (b *bucket32) has(x uint32) bool {
	hi, lo := uint16(x>>16), uint16(x)
	if len(b.buckets) > maxUnsortedBuckets {
		return b.hasSlow(hi, lo)
	}
	for i := range b.b16his {
		if b.b16his[i] != hi {
			continue
		}
		if i >= len(b.buckets) {
			return false
		}
		return b.buckets[i].has(lo)
	}
	return false
}
// hasSlow reports whether (hi, lo) exists in b using binary search over b16his.
func (b *bucket32) hasSlow(hi, lo uint16) bool {
	n := binarySearch16(b.b16his, hi)
	if n < 0 || n >= len(b.b16his) {
		return false
	}
	if b.b16his[n] != hi {
		return false
	}
	if n >= len(b.buckets) {
		return false
	}
	return b.buckets[n].has(lo)
}
// del deletes x from b and reports whether x existed in b before the call.
func (b *bucket32) del(x uint32) bool {
	hi, lo := uint16(x>>16), uint16(x)
	if len(b.buckets) > maxUnsortedBuckets {
		return b.delSlow(hi, lo)
	}
	for i := range b.b16his {
		if b.b16his[i] != hi {
			continue
		}
		if i >= len(b.buckets) {
			return false
		}
		return b.buckets[i].del(lo)
	}
	return false
}
// delSlow deletes (hi, lo) from b using binary search over b16his.
//
// It reports whether the value existed in b before the call.
func (b *bucket32) delSlow(hi, lo uint16) bool {
	n := binarySearch16(b.b16his, hi)
	if n < 0 || n >= len(b.b16his) {
		return false
	}
	if b.b16his[n] != hi {
		return false
	}
	if n >= len(b.buckets) {
		return false
	}
	return b.buckets[n].del(lo)
}
// appendTo appends all the values stored in b to dst and returns the result.
//
// Entries are sorted first if b is small enough to be kept unsorted.
func (b *bucket32) appendTo(dst []uint64) []uint64 {
	if len(b.buckets) <= maxUnsortedBuckets {
		if !sort.IsSorted(b) {
			sort.Sort(b)
		}
	}
	for i := range b.buckets {
		dst = b.buckets[i].appendTo(dst, b.hi, b.b16his[i])
	}
	return dst
}
const (
	// bitsPerBucket is the number of distinct uint16 values a bucket16 can hold.
	bitsPerBucket = 1 << 16

	// wordsPerBucket is the number of 64-bit words needed for storing bitsPerBucket bits.
	wordsPerBucket = bitsPerBucket / 64
)

// bucket16 is a bitmap with a single bit per every possible uint16 value.
type bucket16 struct {
	bits [wordsPerBucket]uint64
}

// clone returns an independent copy of b.
func (b *bucket16) clone() *bucket16 {
	dst := *b
	return &dst
}

// add sets the bit for x and reports whether the bit was unset before the call.
func (b *bucket16) add(x uint16) bool {
	wordNum, bitMask := getWordNumBitMask(x)
	word := &b.bits[wordNum]
	ok := *word&bitMask == 0
	*word |= bitMask
	return ok
}

// has reports whether the bit for x is set.
func (b *bucket16) has(x uint16) bool {
	wordNum, bitMask := getWordNumBitMask(x)
	return b.bits[wordNum]&bitMask != 0
}

// del clears the bit for x and reports whether the bit was set before the call.
func (b *bucket16) del(x uint16) bool {
	wordNum, bitMask := getWordNumBitMask(x)
	word := &b.bits[wordNum]
	ok := *word&bitMask != 0
	*word &^= bitMask
	return ok
}

// appendTo appends all the values stored in b to dst in ascending order
// and returns the result.
//
// hi and hi16 are put into the upper 32 bits and into the bits 16..31
// of every appended value respectively.
func (b *bucket16) appendTo(dst []uint64, hi uint32, hi16 uint16) []uint64 {
	hi64 := uint64(hi)<<32 | uint64(hi16)<<16
	// Iterate by index, since ranging over the array by value copies the whole array.
	for wordNum := range b.bits {
		word := b.bits[wordNum]
		if word == 0 {
			// Fast path: skip empty words without probing their 64 bits.
			continue
		}
		base := hi64 | uint64(wordNum)*64
		for bitNum := uint64(0); bitNum < 64; bitNum++ {
			if word&(uint64(1)<<bitNum) != 0 {
				dst = append(dst, base|bitNum)
			}
		}
	}
	return dst
}

// getWordNumBitMask returns the index of the 64-bit word holding the bit for x
// and the mask selecting this bit inside the word.
func getWordNumBitMask(x uint16) (uint16, uint64) {
	wordNum := x / 64
	bitMask := uint64(1) << (x & 63)
	return wordNum, bitMask
}
// binarySearch16 returns the smallest index i such that u16[i] >= x.
//
// len(u16) is returned if all the items in u16 are smaller than x.
// u16 must be sorted in ascending order.
func binarySearch16(u16 []uint16, x uint16) int {
	// The code has been adapted from sort.Search.
	lo, hi := 0, len(u16)
	for lo < hi {
		// The uint conversion avoids overflow on lo+hi.
		mid := int(uint(lo+hi) >> 1)
		if mid < 0 || mid >= len(u16) || u16[mid] >= x {
			hi = mid
			continue
		}
		lo = mid + 1
	}
	return lo
}

View File

@ -0,0 +1,224 @@
package uint64set
import (
"fmt"
"math/rand"
"sort"
"testing"
"time"
)
// TestSetBasicOps runs testSetBasicOps for various set sizes,
// including a size spanning many bucket16 entries.
func TestSetBasicOps(t *testing.T) {
	itemsCounts := []int{1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2}
	for _, itemsCount := range itemsCounts {
		name := fmt.Sprintf("items_%d", itemsCount)
		t.Run(name, func(t *testing.T) {
			testSetBasicOps(t, itemsCount)
		})
	}
}
// testSetBasicOps verifies Add, Has, Clone, AppendTo and Del on a Set
// filled with itemsCount sequential values starting from a time-based offset.
//
// The checks depend on the state left by the preceding checks, so their order matters.
func testSetBasicOps(t *testing.T, itemsCount int) {
	var s Set

	// The offset varies between runs, so different value ranges are exercised.
	offset := uint64(time.Now().UnixNano())

	// Verify forward Add
	for i := 0; i < itemsCount/2; i++ {
		s.Add(uint64(i) + offset)
	}
	if n := s.Len(); n != itemsCount/2 {
		t.Fatalf("unexpected s.Len() after forward Add; got %d; want %d", n, itemsCount/2)
	}

	// Verify backward Add
	for i := 0; i < itemsCount/2; i++ {
		s.Add(uint64(itemsCount-i-1) + offset)
	}
	if n := s.Len(); n != itemsCount {
		t.Fatalf("unexpected s.Len() after backward Add; got %d; want %d", n, itemsCount)
	}

	// Verify repeated Add
	for i := 0; i < itemsCount/2; i++ {
		s.Add(uint64(i) + offset)
	}
	if n := s.Len(); n != itemsCount {
		t.Fatalf("unexpected s.Len() after repeated Add; got %d; want %d", n, itemsCount)
	}

	// Verify Has on existing bits
	for i := 0; i < itemsCount; i++ {
		if !s.Has(uint64(i) + offset) {
			t.Fatalf("missing bit %d", i)
		}
	}

	// Verify Has on missing bits
	for i := itemsCount; i < 2*itemsCount; i++ {
		if s.Has(uint64(i) + offset) {
			t.Fatalf("unexpected bit found: %d", i)
		}
	}

	// Verify Clone
	sCopy := s.Clone()
	if n := sCopy.Len(); n != itemsCount {
		t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount)
	}
	for i := 0; i < itemsCount; i++ {
		if !sCopy.Has(uint64(i) + offset) {
			t.Fatalf("missing bit %d on sCopy", i)
		}
	}

	// Verify AppendTo
	a := s.AppendTo(nil)
	if len(a) != itemsCount {
		t.Fatalf("unexpected len of exported array; got %d; want %d; array:\n%d", len(a), itemsCount, a)
	}
	if !sort.SliceIsSorted(a, func(i, j int) bool { return a[i] < a[j] }) {
		t.Fatalf("unsorted result returned from AppendTo: %d", a)
	}
	m := make(map[uint64]bool)
	for _, x := range a {
		m[x] = true
	}
	for i := 0; i < itemsCount; i++ {
		if !m[uint64(i)+offset] {
			t.Fatalf("missing bit %d in the exported bits; array:\n%d", i, a)
		}
	}

	// Verify Del
	// Items in the range [itemsCount/2 ... itemsCount-itemsCount/4) are deleted.
	for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
		s.Del(uint64(i) + offset)
	}
	if n := s.Len(); n != itemsCount-itemsCount/4 {
		t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsCount/4)
	}
	// Re-use the backing array of a for the second export.
	a = s.AppendTo(a[:0])
	if len(a) != itemsCount-itemsCount/4 {
		t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsCount/4)
	}
	m = make(map[uint64]bool)
	for _, x := range a {
		m[x] = true
	}
	for i := 0; i < itemsCount; i++ {
		if i >= itemsCount/2 && i < itemsCount-itemsCount/4 {
			if m[uint64(i)+offset] {
				t.Fatalf("unexpected bit found after deleting: %d", i)
			}
		} else {
			if !m[uint64(i)+offset] {
				t.Fatalf("missing bit %d in the exported bits after deleting", i)
			}
		}
	}

	// Try Del for non-existing items
	for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
		s.Del(uint64(i) + offset)
		s.Del(uint64(i) + offset)
		s.Del(uint64(i) + offset + uint64(itemsCount))
	}
	if n := s.Len(); n != itemsCount-itemsCount/4 {
		t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsCount/4)
	}

	// Verify sCopy has the original data
	if n := sCopy.Len(); n != itemsCount {
		t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount)
	}
	for i := 0; i < itemsCount; i++ {
		if !sCopy.Has(uint64(i) + offset) {
			t.Fatalf("missing bit %d on sCopy", i)
		}
	}
}
// TestSetSparseItems runs testSetSparseItems for various numbers of random items.
func TestSetSparseItems(t *testing.T) {
	itemsCounts := []int{1e2, 1e3, 1e4}
	for _, itemsCount := range itemsCounts {
		name := fmt.Sprintf("items_%d", itemsCount)
		t.Run(name, func(t *testing.T) {
			testSetSparseItems(t, itemsCount)
		})
	}
}
// testSetSparseItems verifies Has, Clone, AppendTo and Del on a Set
// filled with itemsCount random uint64 values.
//
// A map with the same values serves as the reference implementation.
func testSetSparseItems(t *testing.T, itemsCount int) {
	var s Set
	m := make(map[uint64]bool)
	for i := 0; i < itemsCount; i++ {
		x := rand.Uint64()
		s.Add(x)
		m[x] = true
	}
	// Compare with len(m) instead of itemsCount, since random values may repeat.
	if n := s.Len(); n != len(m) {
		t.Fatalf("unexpected Len(); got %d; want %d", n, len(m))
	}

	// Check Has
	for x := range m {
		if !s.Has(x) {
			t.Fatalf("missing item %d", x)
		}
	}
	// Small values must be missing unless they were randomly generated above.
	for i := 0; i < itemsCount; i++ {
		x := uint64(i)
		if m[x] {
			continue
		}
		if s.Has(x) {
			t.Fatalf("unexpected item found %d", x)
		}
	}

	// Check Clone
	sCopy := s.Clone()
	if n := sCopy.Len(); n != len(m) {
		t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, len(m))
	}
	for x := range m {
		if !sCopy.Has(x) {
			t.Fatalf("missing item %d on sCopy", x)
		}
	}

	// Check AppendTo
	a := s.AppendTo(nil)
	if len(a) != len(m) {
		t.Fatalf("unexpected len for AppendTo result; got %d; want %d", len(a), len(m))
	}
	if !sort.SliceIsSorted(a, func(i, j int) bool { return a[i] < a[j] }) {
		t.Fatalf("unsorted result returned from AppendTo: %d", a)
	}
	for _, x := range a {
		if !m[x] {
			t.Fatalf("unexpected item found in AppendTo result: %d", x)
		}
	}

	// Check Del
	// Every item is deleted twice and its neighbours are deleted too,
	// so Del is also exercised on already deleted and on probably missing items.
	for x := range m {
		s.Del(x)
		s.Del(x)
		s.Del(x + 1)
		s.Del(x - 1)
	}
	if n := s.Len(); n != 0 {
		t.Fatalf("unexpected number of items left after Del; got %d; want 0", n)
	}
	a = s.AppendTo(a[:0])
	if len(a) != 0 {
		t.Fatalf("unexpected number of items returned from AppendTo after Del; got %d; want 0; items\n%d", len(a), a)
	}

	// Check items in sCopy
	if n := sCopy.Len(); n != len(m) {
		t.Fatalf("unexpected sCopy.Len() after Del; got %d; want %d", n, len(m))
	}
	for x := range m {
		if !sCopy.Has(x) {
			t.Fatalf("missing item %d on sCopy after Del", x)
		}
	}
}

View File

@ -0,0 +1,321 @@
package uint64set
import (
"fmt"
"testing"
"time"
"github.com/valyala/fastrand"
)
// BenchmarkSetAddRandomLastBits measures Set.Add for values built by OR-ing
// a time-based start with random bits limited to the lowest lastBits bits.
func BenchmarkSetAddRandomLastBits(b *testing.B) {
	const itemsCount = 1e5
	bitCounts := []uint64{20, 24, 28, 32}
	for _, lastBits := range bitCounts {
		mask := uint64(1)<<lastBits - 1
		name := fmt.Sprintf("lastBits_%d", lastBits)
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					// Every iteration fills a fresh set.
					base := uint64(time.Now().UnixNano())
					var s Set
					var rng fastrand.RNG
					for i := 0; i < itemsCount; i++ {
						s.Add(base | (uint64(rng.Uint32()) & mask))
					}
				}
			})
		})
	}
}
// BenchmarkMapAddRandomLastBits is the map[uint64]struct{} counterpart
// of BenchmarkSetAddRandomLastBits.
func BenchmarkMapAddRandomLastBits(b *testing.B) {
	const itemsCount = 1e5
	bitCounts := []uint64{20, 24, 28, 32}
	for _, lastBits := range bitCounts {
		mask := uint64(1)<<lastBits - 1
		name := fmt.Sprintf("lastBits_%d", lastBits)
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					// Every iteration fills a fresh map.
					base := uint64(time.Now().UnixNano())
					m := make(map[uint64]struct{})
					var rng fastrand.RNG
					for i := 0; i < itemsCount; i++ {
						m[base|(uint64(rng.Uint32())&mask)] = struct{}{}
					}
				}
			})
		})
	}
}
// BenchmarkSetAddWithAllocs measures adding itemsCount sequential values
// to a fresh zero Set.
func BenchmarkSetAddWithAllocs(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					first := uint64(time.Now().UnixNano())
					last := first + itemsCount
					var s Set
					for n := first; n < last; n++ {
						s.Add(n)
					}
				}
			})
		})
	}
}
// BenchmarkMapAddWithAllocs measures adding itemsCount sequential values
// to a fresh map created without a size hint.
func BenchmarkMapAddWithAllocs(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					first := uint64(time.Now().UnixNano())
					last := first + itemsCount
					m := make(map[uint64]struct{})
					for n := first; n < last; n++ {
						m[n] = struct{}{}
					}
				}
			})
		})
	}
}
// BenchmarkMapAddNoAllocs measures adding itemsCount sequential values
// to a fresh map created with the itemsCount size hint.
func BenchmarkMapAddNoAllocs(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					first := uint64(time.Now().UnixNano())
					last := first + itemsCount
					m := make(map[uint64]struct{}, itemsCount)
					for n := first; n < last; n++ {
						m[n] = struct{}{}
					}
				}
			})
		})
	}
}
// BenchmarkMapAddReuse measures adding itemsCount sequential values to a map,
// which is created once per goroutine and emptied before every iteration.
func BenchmarkMapAddReuse(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				m := make(map[uint64]struct{}, itemsCount)
				for pb.Next() {
					first := uint64(time.Now().UnixNano())
					last := first + itemsCount
					// Drop the items left from the previous iteration.
					for k := range m {
						delete(m, k)
					}
					for n := first; n < last; n++ {
						m[n] = struct{}{}
					}
				}
			})
		})
	}
}
// BenchmarkSetHasHitRandomLastBits measures Set.Has for existing items
// in a set filled with values having random bits among the lowest lastBits bits.
func BenchmarkSetHasHitRandomLastBits(b *testing.B) {
	const itemsCount = 1e5
	bitCounts := []uint64{20, 24, 28, 32}
	for _, lastBits := range bitCounts {
		mask := uint64(1)<<lastBits - 1
		name := fmt.Sprintf("lastBits_%d", lastBits)
		b.Run(name, func(b *testing.B) {
			// Prepare the set before the timer is reset.
			base := uint64(time.Now().UnixNano())
			var s Set
			var rng fastrand.RNG
			for i := 0; i < itemsCount; i++ {
				s.Add(base | (uint64(rng.Uint32()) & mask))
			}
			a := s.AppendTo(nil)

			b.ResetTimer()
			b.ReportAllocs()
			b.SetBytes(int64(len(a)))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					for i := range a {
						if !s.Has(a[i]) {
							panic("unexpected miss")
						}
					}
				}
			})
		})
	}
}
// BenchmarkMapHasHitRandomLastBits is the map[uint64]struct{} counterpart
// of BenchmarkSetHasHitRandomLastBits.
//
// The measured loop iterates over the map itself, so the result includes
// the map iteration cost in addition to the lookups.
func BenchmarkMapHasHitRandomLastBits(b *testing.B) {
	const itemsCount = 1e5
	bitCounts := []uint64{20, 24, 28, 32}
	for _, lastBits := range bitCounts {
		mask := uint64(1)<<lastBits - 1
		name := fmt.Sprintf("lastBits_%d", lastBits)
		b.Run(name, func(b *testing.B) {
			// Prepare the map before the timer is reset.
			base := uint64(time.Now().UnixNano())
			m := make(map[uint64]struct{})
			var rng fastrand.RNG
			for i := 0; i < itemsCount; i++ {
				m[base|(uint64(rng.Uint32())&mask)] = struct{}{}
			}

			b.ResetTimer()
			b.ReportAllocs()
			b.SetBytes(int64(len(m)))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					for n := range m {
						_, found := m[n]
						if !found {
							panic("unexpected miss")
						}
					}
				}
			})
		})
	}
}
// BenchmarkSetHasHit measures Set.Has for existing items
// in a set filled with itemsCount sequential values.
func BenchmarkSetHasHit(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			// Prepare the set before the timer is reset.
			first := uint64(time.Now().UnixNano())
			last := first + itemsCount
			var s Set
			for n := first; n < last; n++ {
				s.Add(n)
			}

			b.ResetTimer()
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					for n := first; n < last; n++ {
						if !s.Has(n) {
							panic("unexpected miss")
						}
					}
				}
			})
		})
	}
}
// BenchmarkMapHasHit measures map lookups for existing items
// in a map filled with itemsCount sequential values.
func BenchmarkMapHasHit(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			// Prepare the map before the timer is reset.
			first := uint64(time.Now().UnixNano())
			last := first + itemsCount
			m := make(map[uint64]struct{}, itemsCount)
			for n := first; n < last; n++ {
				m[n] = struct{}{}
			}

			b.ResetTimer()
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					for n := first; n < last; n++ {
						_, found := m[n]
						if !found {
							panic("unexpected miss")
						}
					}
				}
			})
		})
	}
}
// BenchmarkSetHasMiss measures Set.Has for the itemsCount values
// that immediately follow the sequential values stored in the set.
func BenchmarkSetHasMiss(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			// Prepare the set before the timer is reset.
			first := uint64(time.Now().UnixNano())
			last := first + itemsCount
			var s Set
			for n := first; n < last; n++ {
				s.Add(n)
			}

			b.ResetTimer()
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					missEnd := last + itemsCount
					for n := last; n < missEnd; n++ {
						if s.Has(n) {
							panic("unexpected hit")
						}
					}
				}
			})
		})
	}
}
// BenchmarkMapHasMiss measures map lookups for the itemsCount values
// that immediately follow the sequential values stored in the map.
func BenchmarkMapHasMiss(b *testing.B) {
	sizes := []uint64{1e3, 1e4, 1e5, 1e6, 1e7}
	for _, itemsCount := range sizes {
		name := fmt.Sprintf("items_%d", itemsCount)
		b.Run(name, func(b *testing.B) {
			// Prepare the map before the timer is reset.
			first := uint64(time.Now().UnixNano())
			last := first + itemsCount
			m := make(map[uint64]struct{}, itemsCount)
			for n := first; n < last; n++ {
				m[n] = struct{}{}
			}

			b.ResetTimer()
			b.ReportAllocs()
			b.SetBytes(int64(itemsCount))
			b.RunParallel(func(pb *testing.PB) {
				for pb.Next() {
					missEnd := last + itemsCount
					for n := last; n < missEnd; n++ {
						_, found := m[n]
						if found {
							panic("unexpected hit")
						}
					}
				}
			})
		})
	}
}