lib/uint64set: optimize adding items to the set via Set.AddMulti

This commit is contained in:
Aliaksandr Valialkin 2020-07-21 20:56:49 +03:00
parent caa2952aa6
commit d3442b40b2
5 changed files with 246 additions and 31 deletions

View File

@ -1218,9 +1218,7 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
// atomically add deleted metricIDs to an inmemory map. // atomically add deleted metricIDs to an inmemory map.
dmis := &uint64set.Set{} dmis := &uint64set.Set{}
for _, metricID := range metricIDs { dmis.AddMulti(metricIDs)
dmis.Add(metricID)
}
db.updateDeletedMetricIDs(dmis) db.updateDeletedMetricIDs(dmis)
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs. // Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
@ -2177,9 +2175,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
return errFallbackToMetricNameMatch return errFallbackToMetricNameMatch
} }
mp.ParseMetricIDs() mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs { metricIDs.AddMulti(mp.MetricIDs)
metricIDs.Add(metricID)
}
} }
if err := ts.Error(); err != nil { if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
@ -2750,9 +2746,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
return err return err
} }
mp.ParseMetricIDs() mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs { metricIDs.AddMulti(mp.MetricIDs)
metricIDs.Add(metricID)
}
if metricIDs.Len() >= maxMetrics { if metricIDs.Len() >= maxMetrics {
return nil return nil
} }

View File

@ -861,9 +861,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time. // Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
prefetchedMetricIDsNew := prefetchedMetricIDs.Clone() prefetchedMetricIDsNew := prefetchedMetricIDs.Clone()
for _, metricID := range metricIDs { prefetchedMetricIDsNew.AddMulti(metricIDs)
prefetchedMetricIDsNew.Add(metricID)
}
s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew) s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
return nil return nil
} }

View File

@ -126,14 +126,51 @@ func (s *Set) Add(x uint64) {
return return
} }
} }
s.addAlloc(hi32, lo32) b32 := s.addBucket32()
b32.hi = hi32
_ = b32.add(lo32)
s.itemsCount++
} }
func (s *Set) addAlloc(hi, lo uint32) { // AddMulti adds all the items from a to s.
b32 := s.addBucket32() //
b32.hi = hi // It is usually faster than calling s.Add() for each item in a.
_ = b32.add(lo) //
s.itemsCount++ // The caller is responsible for splitting a into items with clustered values.
func (s *Set) AddMulti(a []uint64) {
if len(a) == 0 {
return
}
slowPath := false
hi := uint32(a[0] >> 32)
for _, x := range a[1:] {
if hi != uint32(x>>32) {
slowPath = true
break
}
}
if slowPath {
for _, x := range a {
s.Add(x)
}
return
}
// Fast path - all the items in a have identical higher 32 bits.
// Put them in a bulk into the corresponding bucket32.
bs := s.buckets
var b32 *bucket32
for i := range bs {
if bs[i].hi == hi {
b32 = &bs[i]
break
}
}
if b32 == nil {
b32 = s.addBucket32()
b32.hi = hi
}
n := b32.addMulti(a)
s.itemsCount += n
} }
func (s *Set) addBucket32() *bucket32 { func (s *Set) addBucket32() *bucket32 {
@ -568,11 +605,53 @@ func (b *bucket32) add(x uint32) bool {
return b.addSlow(hi, lo) return b.addSlow(hi, lo)
} }
func (b *bucket32) addMulti(a []uint64) int {
if len(a) == 0 {
return 0
}
hi := uint16(a[0] >> 16)
slowPath := false
for _, x := range a[1:] {
if hi != uint16(x>>16) {
slowPath = true
break
}
}
if slowPath {
count := 0
for _, x := range a {
if b.add(uint32(x)) {
count++
}
}
return count
}
// Fast path - all the items in a have identical higher 32+16 bits.
// Put them to a single bucket16 in a bulk.
var b16 *bucket16
his := b.b16his
bs := b.buckets
if n := b.getHint(); n < uint32(len(his)) && his[n] == hi {
b16 = &bs[n]
}
if b16 == nil {
n := binarySearch16(his, hi)
if n < 0 || n >= len(his) || his[n] != hi {
b16 = b.addBucketAtPos(hi, n)
} else {
b.setHint(n)
b16 = &bs[n]
}
}
return b16.addMulti(a)
}
func (b *bucket32) addSlow(hi, lo uint16) bool { func (b *bucket32) addSlow(hi, lo uint16) bool {
his := b.b16his his := b.b16his
n := binarySearch16(his, hi) n := binarySearch16(his, hi)
if n < 0 || n >= len(his) || his[n] != hi { if n < 0 || n >= len(his) || his[n] != hi {
b.addAlloc(hi, lo, n) b16 := b.addBucketAtPos(hi, n)
b16.add(lo)
return true return true
} }
b.setHint(n) b.setHint(n)
@ -586,22 +665,20 @@ func (b *bucket32) addBucket16(hi uint16) *bucket16 {
return &b.buckets[len(b.buckets)-1] return &b.buckets[len(b.buckets)-1]
} }
func (b *bucket32) addAlloc(hi, lo uint16, n int) { func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
if n < 0 { if pos < 0 {
// This is a hint to Go compiler to remove automatic bounds checks below. // This is a hint to Go compiler to remove automatic bounds checks below.
return return nil
} }
if n >= len(b.b16his) { if pos >= len(b.b16his) {
b16 := b.addBucket16(hi) return b.addBucket16(hi)
_ = b16.add(lo)
return
} }
b.b16his = append(b.b16his[:n+1], b.b16his[n:]...) b.b16his = append(b.b16his[:pos+1], b.b16his[pos:]...)
b.b16his[n] = hi b.b16his[pos] = hi
b.buckets = append(b.buckets[:n+1], b.buckets[n:]...) b.buckets = append(b.buckets[:pos+1], b.buckets[pos:]...)
b16 := &b.buckets[n] b16 := &b.buckets[pos]
*b16 = bucket16{} *b16 = bucket16{}
_ = b16.add(lo) return b16
} }
func (b *bucket32) has(x uint32) bool { func (b *bucket32) has(x uint32) bool {
@ -766,6 +843,29 @@ func (b *bucket16) add(x uint16) bool {
return ok return ok
} }
func (b *bucket16) addMulti(a []uint64) int {
count := 0
if b.bits == nil {
// Slow path
for _, x := range a {
if b.add(uint16(x)) {
count++
}
}
} else {
// Fast path
for _, x := range a {
wordNum, bitMask := getWordNumBitMask(uint16(x))
word := &b.bits[wordNum]
if *word&bitMask == 0 {
count++
}
*word |= bitMask
}
}
return count
}
func (b *bucket16) addToSmallPool(x uint16) bool { func (b *bucket16) addToSmallPool(x uint16) bool {
if b.hasInSmallPool(x) { if b.hasInSmallPool(x) {
return false return false

View File

@ -688,3 +688,72 @@ func testSetSparseItems(t *testing.T, itemsCount int) {
} }
} }
} }
func TestAddMulti(t *testing.T) {
f := func(a []uint64) {
t.Helper()
var s1, s2 Set
s1.AddMulti(a)
for _, x := range a {
s2.Add(x)
}
if s1.Len() != s2.Len() {
t.Fatalf("unexpected number of items in the set; got %d; want %d\nset:\n%d", s1.Len(), s2.Len(), s1.AppendTo(nil))
}
for _, x := range a {
if !s1.Has(x) {
t.Fatalf("missing item %d in the set", x)
}
}
a1 := s1.AppendTo(nil)
a2 := s2.AppendTo(nil)
if !reflect.DeepEqual(a1, a2) {
t.Fatalf("unexpected items in the set;\ngot\n%d\nwant\n%d", a1, a2)
}
// Try removing and then adding again all the items via AddMulti() to s1.
for _, x := range a {
s1.Del(x)
}
if s1.Len() != 0 {
t.Fatalf("non-zero number of items left after deletion: %d", s1.Len())
}
s1.AddMulti(a)
if s1.Len() != s2.Len() {
t.Fatalf("unexpected number of items in the reused set; got %d; want %d", s1.Len(), s2.Len())
}
for _, x := range a {
if !s1.Has(x) {
t.Fatalf("missing item %d in the reused set", x)
}
}
a1 = s1.AppendTo(nil)
a2 = s2.AppendTo(nil)
if !reflect.DeepEqual(a1, a2) {
t.Fatalf("unexpected items in the reused set;\ngot\n%d\nwant\n%d", a1, a2)
}
}
f(nil)
f([]uint64{1})
f([]uint64{0, 1, 2, 3, 4, 5})
f([]uint64{0, 1 << 16, 2 << 16, 2<<16 + 1})
f([]uint64{0, 1 << 16, 2 << 16, 2<<16 + 1, 1 << 32, 2 << 32, 2<<32 + 1})
var a []uint64
for i := 0; i < 32000; i++ {
a = append(a, uint64(i))
}
f(a)
a = nil
for i := 0; i < 32000; i++ {
a = append(a, 1<<16+uint64(i))
}
f(a)
a = nil
for i := 0; i < 100000; i++ {
a = append(a, 1<<32+uint64(i))
}
f(a)
}

View File

@ -8,6 +8,28 @@ import (
"github.com/valyala/fastrand" "github.com/valyala/fastrand"
) )
func BenchmarkAddMulti(b *testing.B) {
for _, itemsCount := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
start := uint64(time.Now().UnixNano())
sa := createRangeSet(start, itemsCount)
a := sa.AppendTo(nil)
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
benchmarkAddMulti(b, a)
})
}
}
func BenchmarkAdd(b *testing.B) {
for _, itemsCount := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
start := uint64(time.Now().UnixNano())
sa := createRangeSet(start, itemsCount)
a := sa.AppendTo(nil)
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
benchmarkAdd(b, a)
})
}
}
func BenchmarkUnionNoOverlap(b *testing.B) { func BenchmarkUnionNoOverlap(b *testing.B) {
for _, itemsCount := range []int{1e3, 1e4, 1e5, 1e6, 1e7} { for _, itemsCount := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
start := uint64(time.Now().UnixNano()) start := uint64(time.Now().UnixNano())
@ -41,6 +63,38 @@ func BenchmarkUnionFullOverlap(b *testing.B) {
} }
} }
func benchmarkAdd(b *testing.B, a []uint64) {
b.ReportAllocs()
b.SetBytes(int64(len(a)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var s Set
for _, x := range a {
s.Add(x)
}
}
})
}
func benchmarkAddMulti(b *testing.B, a []uint64) {
b.ReportAllocs()
b.SetBytes(int64(len(a)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var s Set
n := 0
for n < len(a) {
m := n + 64
if m > len(a) {
m = len(a)
}
s.AddMulti(a[n:m])
n = m
}
}
})
}
func benchmarkUnion(b *testing.B, sa, sb *Set) { func benchmarkUnion(b *testing.B, sa, sb *Set) {
b.ReportAllocs() b.ReportAllocs()
b.SetBytes(int64(sa.Len() + sb.Len())) b.SetBytes(int64(sa.Len() + sb.Len()))