From d3442b40b289ef75003089152f456e8d7ab20dfd Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 21 Jul 2020 20:56:49 +0300
Subject: [PATCH] lib/uint64set: optimize adding items to the set via
 Set.AddMulti

---
 lib/storage/index_db.go                |  12 +--
 lib/storage/storage.go                 |   4 +-
 lib/uint64set/uint64set.go             | 138 +++++++++++++++++++++----
 lib/uint64set/uint64set_test.go        |  69 +++++++++++++
 lib/uint64set/uint64set_timing_test.go |  54 ++++++++++
 5 files changed, 246 insertions(+), 31 deletions(-)

diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index e55f486ab..65e598ff8 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -1218,9 +1218,7 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
 
 	// atomically add deleted metricIDs to an inmemory map.
 	dmis := &uint64set.Set{}
-	for _, metricID := range metricIDs {
-		dmis.Add(metricID)
-	}
+	dmis.AddMulti(metricIDs)
 	db.updateDeletedMetricIDs(dmis)
 
 	// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
@@ -2177,9 +2175,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
 			return errFallbackToMetricNameMatch
 		}
 		mp.ParseMetricIDs()
-		for _, metricID := range mp.MetricIDs {
-			metricIDs.Add(metricID)
-		}
+		metricIDs.AddMulti(mp.MetricIDs)
 	}
 	if err := ts.Error(); err != nil {
 		return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
@@ -2750,9 +2746,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
 			return err
 		}
 		mp.ParseMetricIDs()
-		for _, metricID := range mp.MetricIDs {
-			metricIDs.Add(metricID)
-		}
+		metricIDs.AddMulti(mp.MetricIDs)
 		if metricIDs.Len() >= maxMetrics {
 			return nil
 		}
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 0bf71257b..a2a0915e3 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -861,9 +861,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
 
 	// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
 	prefetchedMetricIDsNew := prefetchedMetricIDs.Clone()
-	for _, metricID := range metricIDs {
-		prefetchedMetricIDsNew.Add(metricID)
-	}
+	prefetchedMetricIDsNew.AddMulti(metricIDs)
 	s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew)
 	return nil
 }
diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go
index 194b4aeeb..ecd7b2efa 100644
--- a/lib/uint64set/uint64set.go
+++ b/lib/uint64set/uint64set.go
@@ -126,14 +126,51 @@ func (s *Set) Add(x uint64) {
 			return
 		}
 	}
-	s.addAlloc(hi32, lo32)
+	b32 := s.addBucket32()
+	b32.hi = hi32
+	_ = b32.add(lo32)
+	s.itemsCount++
 }
 
-func (s *Set) addAlloc(hi, lo uint32) {
-	b32 := s.addBucket32()
-	b32.hi = hi
-	_ = b32.add(lo)
-	s.itemsCount++
+// AddMulti adds all the items from a to s.
+//
+// It is usually faster than calling s.Add() for each item in a.
+//
+// The caller is responsible for splitting a into batches of clustered values.
+func (s *Set) AddMulti(a []uint64) {
+	if len(a) == 0 {
+		return
+	}
+	slowPath := false
+	hi := uint32(a[0] >> 32)
+	for _, x := range a[1:] {
+		if hi != uint32(x>>32) {
+			slowPath = true
+			break
+		}
+	}
+	if slowPath {
+		for _, x := range a {
+			s.Add(x)
+		}
+		return
+	}
+	// Fast path - all the items in a have identical higher 32 bits.
+	// Put them in bulk into the corresponding bucket32.
+	bs := s.buckets
+	var b32 *bucket32
+	for i := range bs {
+		if bs[i].hi == hi {
+			b32 = &bs[i]
+			break
+		}
+	}
+	if b32 == nil {
+		b32 = s.addBucket32()
+		b32.hi = hi
+	}
+	n := b32.addMulti(a)
+	s.itemsCount += n
 }
 
 func (s *Set) addBucket32() *bucket32 {
@@ -568,11 +605,53 @@ func (b *bucket32) add(x uint32) bool {
 	return b.addSlow(hi, lo)
 }
 
+func (b *bucket32) addMulti(a []uint64) int {
+	if len(a) == 0 {
+		return 0
+	}
+	hi := uint16(a[0] >> 16)
+	slowPath := false
+	for _, x := range a[1:] {
+		if hi != uint16(x>>16) {
+			slowPath = true
+			break
+		}
+	}
+	if slowPath {
+		count := 0
+		for _, x := range a {
+			if b.add(uint32(x)) {
+				count++
+			}
+		}
+		return count
+	}
+	// Fast path - all the items in a have identical higher 32+16 bits.
+	// Put them in bulk into a single bucket16.
+	var b16 *bucket16
+	his := b.b16his
+	bs := b.buckets
+	if n := b.getHint(); n < uint32(len(his)) && his[n] == hi {
+		b16 = &bs[n]
+	}
+	if b16 == nil {
+		n := binarySearch16(his, hi)
+		if n < 0 || n >= len(his) || his[n] != hi {
+			b16 = b.addBucketAtPos(hi, n)
+		} else {
+			b.setHint(n)
+			b16 = &bs[n]
+		}
+	}
+	return b16.addMulti(a)
+}
+
 func (b *bucket32) addSlow(hi, lo uint16) bool {
 	his := b.b16his
 	n := binarySearch16(his, hi)
 	if n < 0 || n >= len(his) || his[n] != hi {
-		b.addAlloc(hi, lo, n)
+		b16 := b.addBucketAtPos(hi, n)
+		b16.add(lo)
 		return true
 	}
 	b.setHint(n)
@@ -586,22 +665,20 @@ func (b *bucket32) addBucket16(hi uint16) *bucket16 {
 	return &b.buckets[len(b.buckets)-1]
 }
 
-func (b *bucket32) addAlloc(hi, lo uint16, n int) {
-	if n < 0 {
+func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
+	if pos < 0 {
 		// This is a hint to Go compiler to remove automatic bounds checks below.
-		return
+		return nil
 	}
-	if n >= len(b.b16his) {
-		b16 := b.addBucket16(hi)
-		_ = b16.add(lo)
-		return
+	if pos >= len(b.b16his) {
+		return b.addBucket16(hi)
 	}
-	b.b16his = append(b.b16his[:n+1], b.b16his[n:]...)
-	b.b16his[n] = hi
-	b.buckets = append(b.buckets[:n+1], b.buckets[n:]...)
-	b16 := &b.buckets[n]
+	b.b16his = append(b.b16his[:pos+1], b.b16his[pos:]...)
+	b.b16his[pos] = hi
+	b.buckets = append(b.buckets[:pos+1], b.buckets[pos:]...)
+	b16 := &b.buckets[pos]
 	*b16 = bucket16{}
-	_ = b16.add(lo)
+	return b16
 }
 
 func (b *bucket32) has(x uint32) bool {
@@ -766,6 +843,29 @@ func (b *bucket16) add(x uint16) bool {
 	return ok
 }
 
+func (b *bucket16) addMulti(a []uint64) int {
+	count := 0
+	if b.bits == nil {
+		// Slow path - the items are stored in the small pool.
+		for _, x := range a {
+			if b.add(uint16(x)) {
+				count++
+			}
+		}
+	} else {
+		// Fast path - set the bits in the bitmap directly.
+		for _, x := range a {
+			wordNum, bitMask := getWordNumBitMask(uint16(x))
+			word := &b.bits[wordNum]
+			if *word&bitMask == 0 {
+				count++
+			}
+			*word |= bitMask
+		}
+	}
+	return count
+}
+
 func (b *bucket16) addToSmallPool(x uint16) bool {
 	if b.hasInSmallPool(x) {
 		return false
diff --git a/lib/uint64set/uint64set_test.go b/lib/uint64set/uint64set_test.go
index 7fcb0523e..5cd97cf95 100644
--- a/lib/uint64set/uint64set_test.go
+++ b/lib/uint64set/uint64set_test.go
@@ -688,3 +688,72 @@ func testSetSparseItems(t *testing.T, itemsCount int) {
 		}
 	}
 }
+
+func TestAddMulti(t *testing.T) {
+	f := func(a []uint64) {
+		t.Helper()
+		var s1, s2 Set
+		s1.AddMulti(a)
+		for _, x := range a {
+			s2.Add(x)
+		}
+		if s1.Len() != s2.Len() {
+			t.Fatalf("unexpected number of items in the set; got %d; want %d\nset:\n%d", s1.Len(), s2.Len(), s1.AppendTo(nil))
+		}
+		for _, x := range a {
+			if !s1.Has(x) {
+				t.Fatalf("missing item %d in the set", x)
+			}
+		}
+		a1 := s1.AppendTo(nil)
+		a2 := s2.AppendTo(nil)
+		if !reflect.DeepEqual(a1, a2) {
+			t.Fatalf("unexpected items in the set;\ngot\n%d\nwant\n%d", a1, a2)
+		}
+
+		// Try removing and then re-adding all the items to s1 via AddMulti().
+		for _, x := range a {
+			s1.Del(x)
+		}
+		if s1.Len() != 0 {
+			t.Fatalf("non-zero number of items left after deletion: %d", s1.Len())
+		}
+		s1.AddMulti(a)
+		if s1.Len() != s2.Len() {
+			t.Fatalf("unexpected number of items in the reused set; got %d; want %d", s1.Len(), s2.Len())
+		}
+		for _, x := range a {
+			if !s1.Has(x) {
+				t.Fatalf("missing item %d in the reused set", x)
+			}
+		}
+		a1 = s1.AppendTo(nil)
+		a2 = s2.AppendTo(nil)
+		if !reflect.DeepEqual(a1, a2) {
+			t.Fatalf("unexpected items in the reused set;\ngot\n%d\nwant\n%d", a1, a2)
+		}
+	}
+	f(nil)
+	f([]uint64{1})
+	f([]uint64{0, 1, 2, 3, 4, 5})
+	f([]uint64{0, 1 << 16, 2 << 16, 2<<16 + 1})
+	f([]uint64{0, 1 << 16, 2 << 16, 2<<16 + 1, 1 << 32, 2 << 32, 2<<32 + 1})
+
+	var a []uint64
+	for i := 0; i < 32000; i++ {
+		a = append(a, uint64(i))
+	}
+	f(a)
+
+	a = nil
+	for i := 0; i < 32000; i++ {
+		a = append(a, 1<<16+uint64(i))
+	}
+	f(a)
+
+	a = nil
+	for i := 0; i < 100000; i++ {
+		a = append(a, 1<<32+uint64(i))
+	}
+	f(a)
+}
diff --git a/lib/uint64set/uint64set_timing_test.go b/lib/uint64set/uint64set_timing_test.go
index 1b0d6955c..ebd981d01 100644
--- a/lib/uint64set/uint64set_timing_test.go
+++ b/lib/uint64set/uint64set_timing_test.go
@@ -8,6 +8,28 @@ import (
 	"github.com/valyala/fastrand"
 )
 
+func BenchmarkAddMulti(b *testing.B) {
+	for _, itemsCount := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
+		start := uint64(time.Now().UnixNano())
+		sa := createRangeSet(start, itemsCount)
+		a := sa.AppendTo(nil)
+		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
+			benchmarkAddMulti(b, a)
+		})
+	}
+}
+
+func BenchmarkAdd(b *testing.B) {
+	for _, itemsCount := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
+		start := uint64(time.Now().UnixNano())
+		sa := createRangeSet(start, itemsCount)
+		a := sa.AppendTo(nil)
+		b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
+			benchmarkAdd(b, a)
+		})
+	}
+}
+
 func BenchmarkUnionNoOverlap(b *testing.B) {
 	for _, itemsCount := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
 		start := uint64(time.Now().UnixNano())
@@ -41,6 +63,38 @@ func BenchmarkUnionFullOverlap(b *testing.B) {
 	}
 }
 
+func benchmarkAdd(b *testing.B, a []uint64) {
+	b.ReportAllocs()
+	b.SetBytes(int64(len(a)))
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			var s Set
+			for _, x := range a {
+				s.Add(x)
+			}
+		}
+	})
+}
+
+func benchmarkAddMulti(b *testing.B, a []uint64) {
+	b.ReportAllocs()
+	b.SetBytes(int64(len(a)))
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			var s Set
+			n := 0
+			for n < len(a) {
+				m := n + 64
+				if m > len(a) {
+					m = len(a)
+				}
+				s.AddMulti(a[n:m])
+				n = m
+			}
+		}
+	})
+}
+
 func benchmarkUnion(b *testing.B, sa, sb *Set) {
 	b.ReportAllocs()
 	b.SetBytes(int64(sa.Len() + sb.Len()))
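
Usage sketch (an illustration, not part of the patch): AddMulti only takes its fast path when every item in the slice shares the same upper 32 bits, so a caller with arbitrary IDs can sort them and feed the set in small consecutive chunks, mirroring what benchmarkAddMulti above does. The helper name addChunked and the chunk size are assumptions for this sketch.

	package main

	import (
		"fmt"
		"sort"

		"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
	)

	// addChunked adds items to s in fixed-size chunks via Set.AddMulti.
	// It sorts items in place first, so that each chunk is likely to hold
	// clustered values, which keeps AddMulti on its fast path.
	func addChunked(s *uint64set.Set, items []uint64, chunkSize int) {
		sort.Slice(items, func(i, j int) bool { return items[i] < items[j] })
		for n := 0; n < len(items); n += chunkSize {
			m := n + chunkSize
			if m > len(items) {
				m = len(items)
			}
			s.AddMulti(items[n:m])
		}
	}

	func main() {
		var s uint64set.Set
		addChunked(&s, []uint64{43, 1 << 32, 42, 1<<32 + 1}, 64)
		fmt.Println(s.Len()) // prints 4
	}

The chunk size of 64 matches the one used by benchmarkAddMulti; larger chunks amortize the per-call overhead further, as long as the values in each chunk stay clustered, which is the caller's responsibility per the AddMulti doc comment.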