mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 20:37:12 +01:00
lib/uint64set: optimize Intersect, Subtract and Union functions
This should improve performance for queries over big number of time series.
This commit is contained in:
parent
cd53f7d177
commit
9f027ec176
@ -32,18 +32,23 @@ func (s *bucket32Sorter) Swap(i, j int) {
|
|||||||
|
|
||||||
// Clone returns an independent copy of s.
|
// Clone returns an independent copy of s.
|
||||||
func (s *Set) Clone() *Set {
|
func (s *Set) Clone() *Set {
|
||||||
if s == nil {
|
if s == nil || s.itemsCount == 0 {
|
||||||
// Return an empty set, so data could be added into it later.
|
// Return an empty set, so data could be added into it later.
|
||||||
return &Set{}
|
return &Set{}
|
||||||
}
|
}
|
||||||
var dst Set
|
var dst Set
|
||||||
dst.itemsCount = s.itemsCount
|
dst.itemsCount = s.itemsCount
|
||||||
if len(s.buckets) > 0 {
|
|
||||||
dst.buckets = make([]bucket32, len(s.buckets))
|
dst.buckets = make([]bucket32, len(s.buckets))
|
||||||
for i := range s.buckets {
|
for i := range s.buckets {
|
||||||
s.buckets[i].copyTo(&dst.buckets[i])
|
s.buckets[i].copyTo(&dst.buckets[i])
|
||||||
}
|
}
|
||||||
}
|
return &dst
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Set) cloneShallow() *Set {
|
||||||
|
var dst Set
|
||||||
|
dst.itemsCount = s.itemsCount
|
||||||
|
dst.buckets = append(dst.buckets[:0], s.buckets...)
|
||||||
return &dst
|
return &dst
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,16 +149,20 @@ func (s *Set) AppendTo(dst []uint64) []uint64 {
|
|||||||
dst = append(dst[:cap(dst)], make([]uint64, n)...)
|
dst = append(dst[:cap(dst)], make([]uint64, n)...)
|
||||||
dst = dst[:dstLen]
|
dst = dst[:dstLen]
|
||||||
}
|
}
|
||||||
// sort s.buckets if it isn't sorted yet
|
s.sort()
|
||||||
if !sort.IsSorted(&s.buckets) {
|
|
||||||
sort.Sort(&s.buckets)
|
|
||||||
}
|
|
||||||
for i := range s.buckets {
|
for i := range s.buckets {
|
||||||
dst = s.buckets[i].appendTo(dst)
|
dst = s.buckets[i].appendTo(dst)
|
||||||
}
|
}
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Set) sort() {
|
||||||
|
// sort s.buckets if it isn't sorted yet
|
||||||
|
if !sort.IsSorted(&s.buckets) {
|
||||||
|
sort.Sort(&s.buckets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Union adds all the items from a to s.
|
// Union adds all the items from a to s.
|
||||||
func (s *Set) Union(a *Set) {
|
func (s *Set) Union(a *Set) {
|
||||||
if s.Len() == 0 {
|
if s.Len() == 0 {
|
||||||
@ -181,14 +190,38 @@ func (s *Set) Intersect(a *Set) {
|
|||||||
*s = Set{}
|
*s = Set{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.ForEach(func(part []uint64) bool {
|
// Make shallow copy of `a`, since it can be modified below.
|
||||||
for _, x := range part {
|
a = a.cloneShallow()
|
||||||
if !a.Has(x) {
|
a.sort()
|
||||||
s.Del(x)
|
s.sort()
|
||||||
|
itemsCount := 0
|
||||||
|
i := 0
|
||||||
|
j := 0
|
||||||
|
for {
|
||||||
|
for i < len(s.buckets) && j <= len(a.buckets) && s.buckets[i].hi < a.buckets[j].hi {
|
||||||
|
s.buckets[i] = bucket32{}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
if i >= len(s.buckets) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for j < len(a.buckets) && a.buckets[j].hi < s.buckets[i].hi {
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
if j >= len(a.buckets) {
|
||||||
|
for i < len(s.buckets) {
|
||||||
|
s.buckets[i] = bucket32{}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if s.buckets[i].hi == a.buckets[j].hi {
|
||||||
|
itemsCount += s.buckets[i].intersect(&a.buckets[j])
|
||||||
|
i++
|
||||||
|
j++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
s.itemsCount = itemsCount
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subtract removes from s all the shared items between s and a.
|
// Subtract removes from s all the shared items between s and a.
|
||||||
@ -242,6 +275,53 @@ type bucket32 struct {
|
|||||||
hi uint32
|
hi uint32
|
||||||
b16his []uint16
|
b16his []uint16
|
||||||
buckets []bucket16
|
buckets []bucket16
|
||||||
|
|
||||||
|
// hint may contain bucket index for the last successful add or del operation.
|
||||||
|
// This allows saving CPU time on subsequent calls to the same bucket.
|
||||||
|
hint int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bucket32) cloneShallow() *bucket32 {
|
||||||
|
var dst bucket32
|
||||||
|
dst.hi = b.hi
|
||||||
|
dst.b16his = append(dst.b16his[:0], b.b16his...)
|
||||||
|
dst.buckets = append(dst.buckets[:0], b.buckets...)
|
||||||
|
dst.hint = b.hint
|
||||||
|
return &dst
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bucket32) intersect(a *bucket32) int {
|
||||||
|
a = a.cloneShallow() // clone a, since is is sorted below.
|
||||||
|
a.sort()
|
||||||
|
b.sort()
|
||||||
|
itemsCount := 0
|
||||||
|
i := 0
|
||||||
|
j := 0
|
||||||
|
for {
|
||||||
|
for i < len(b.b16his) && j < len(a.b16his) && b.b16his[i] < a.b16his[j] {
|
||||||
|
b.buckets[i] = bucket16{}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
if i >= len(b.b16his) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for j < len(a.b16his) && a.b16his[j] < b.b16his[i] {
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
if j >= len(a.b16his) {
|
||||||
|
for i < len(b.b16his) {
|
||||||
|
b.buckets[i] = bucket16{}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if b.b16his[i] == a.b16his[j] {
|
||||||
|
itemsCount += b.buckets[i].intersect(&a.buckets[j])
|
||||||
|
i++
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return itemsCount
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bucket32) forEach(f func(part []uint64) bool) bool {
|
func (b *bucket32) forEach(f func(part []uint64) bool) bool {
|
||||||
@ -288,6 +368,7 @@ func (b *bucket32) copyTo(dst *bucket32) {
|
|||||||
b.buckets[i].copyTo(&dst.buckets[i])
|
b.buckets[i].copyTo(&dst.buckets[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
dst.hint = b.hint
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is for sort.Interface
|
// This is for sort.Interface
|
||||||
@ -305,11 +386,26 @@ const maxUnsortedBuckets = 32
|
|||||||
func (b *bucket32) add(x uint32) bool {
|
func (b *bucket32) add(x uint32) bool {
|
||||||
hi := uint16(x >> 16)
|
hi := uint16(x >> 16)
|
||||||
lo := uint16(x)
|
lo := uint16(x)
|
||||||
if len(b.buckets) > maxUnsortedBuckets {
|
if n := b.hint; n < len(b.b16his) && b.b16his[n] == hi {
|
||||||
|
// Fast path - add to the previously used bucket.
|
||||||
|
return n < len(b.buckets) && b.buckets[n].add(lo)
|
||||||
|
}
|
||||||
return b.addSlow(hi, lo)
|
return b.addSlow(hi, lo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bucket32) addSlow(hi, lo uint16) bool {
|
||||||
|
if len(b.buckets) > maxUnsortedBuckets {
|
||||||
|
n := binarySearch16(b.b16his, hi)
|
||||||
|
b.hint = n
|
||||||
|
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
|
||||||
|
b.addAllocBig(hi, lo, n)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return n < len(b.buckets) && b.buckets[n].add(lo)
|
||||||
}
|
}
|
||||||
for i, hi16 := range b.b16his {
|
for i, hi16 := range b.b16his {
|
||||||
if hi16 == hi {
|
if hi16 == hi {
|
||||||
|
b.hint = i
|
||||||
return i < len(b.buckets) && b.buckets[i].add(lo)
|
return i < len(b.buckets) && b.buckets[i].add(lo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -331,15 +427,6 @@ func (b *bucket32) addBucket16() *bucket16 {
|
|||||||
return &b.buckets[len(b.buckets)-1]
|
return &b.buckets[len(b.buckets)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bucket32) addSlow(hi, lo uint16) bool {
|
|
||||||
n := binarySearch16(b.b16his, hi)
|
|
||||||
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
|
|
||||||
b.addAllocBig(hi, lo, n)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return n < len(b.buckets) && b.buckets[n].add(lo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
|
func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
// This is a hint to Go compiler to remove automatic bounds checks below.
|
// This is a hint to Go compiler to remove automatic bounds checks below.
|
||||||
@ -384,28 +471,34 @@ func (b *bucket32) hasSlow(hi, lo uint16) bool {
|
|||||||
func (b *bucket32) del(x uint32) bool {
|
func (b *bucket32) del(x uint32) bool {
|
||||||
hi := uint16(x >> 16)
|
hi := uint16(x >> 16)
|
||||||
lo := uint16(x)
|
lo := uint16(x)
|
||||||
if len(b.buckets) > maxUnsortedBuckets {
|
if n := b.hint; n < len(b.b16his) && b.b16his[n] == hi {
|
||||||
|
// Fast path - use the bucket from the previous operation.
|
||||||
|
return n < len(b.buckets) && b.buckets[n].del(lo)
|
||||||
|
}
|
||||||
return b.delSlow(hi, lo)
|
return b.delSlow(hi, lo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bucket32) delSlow(hi, lo uint16) bool {
|
||||||
|
if len(b.buckets) > maxUnsortedBuckets {
|
||||||
|
n := binarySearch16(b.b16his, hi)
|
||||||
|
b.hint = n
|
||||||
|
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return n < len(b.buckets) && b.buckets[n].del(lo)
|
||||||
}
|
}
|
||||||
for i, hi16 := range b.b16his {
|
for i, hi16 := range b.b16his {
|
||||||
if hi16 == hi {
|
if hi16 == hi {
|
||||||
|
b.hint = i
|
||||||
return i < len(b.buckets) && b.buckets[i].del(lo)
|
return i < len(b.buckets) && b.buckets[i].del(lo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bucket32) delSlow(hi, lo uint16) bool {
|
|
||||||
n := binarySearch16(b.b16his, hi)
|
|
||||||
if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return n < len(b.buckets) && b.buckets[n].del(lo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *bucket32) appendTo(dst []uint64) []uint64 {
|
func (b *bucket32) appendTo(dst []uint64) []uint64 {
|
||||||
if len(b.buckets) <= maxUnsortedBuckets && !sort.IsSorted(b) {
|
if len(b.buckets) <= maxUnsortedBuckets {
|
||||||
sort.Sort(b)
|
b.sort()
|
||||||
}
|
}
|
||||||
for i := range b.buckets {
|
for i := range b.buckets {
|
||||||
hi16 := b.b16his[i]
|
hi16 := b.b16his[i]
|
||||||
@ -414,6 +507,12 @@ func (b *bucket32) appendTo(dst []uint64) []uint64 {
|
|||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *bucket32) sort() {
|
||||||
|
if !sort.IsSorted(b) {
|
||||||
|
sort.Sort(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
bitsPerBucket = 1 << 16
|
bitsPerBucket = 1 << 16
|
||||||
wordsPerBucket = bitsPerBucket / 64
|
wordsPerBucket = bitsPerBucket / 64
|
||||||
@ -425,6 +524,38 @@ type bucket16 struct {
|
|||||||
smallPool [56]uint16
|
smallPool [56]uint16
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *bucket16) intersect(a *bucket16) int {
|
||||||
|
itemsCount := 0
|
||||||
|
if a.bits != nil && b.bits != nil {
|
||||||
|
// Fast path - use bitwise ops
|
||||||
|
for i, ax := range a.bits {
|
||||||
|
bx := b.bits[i]
|
||||||
|
bx &= ax
|
||||||
|
if bx > 0 {
|
||||||
|
itemsCount += bits.OnesCount64(bx)
|
||||||
|
}
|
||||||
|
b.bits[i] = bx
|
||||||
|
}
|
||||||
|
return itemsCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path
|
||||||
|
xbuf := partBufPool.Get().(*[]uint64)
|
||||||
|
buf := *xbuf
|
||||||
|
buf = b.appendTo(buf[:0], 0, 0)
|
||||||
|
itemsCount = len(buf)
|
||||||
|
for _, x := range buf {
|
||||||
|
x16 := uint16(x)
|
||||||
|
if !a.has(x16) {
|
||||||
|
b.del(x16)
|
||||||
|
itemsCount--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*xbuf = buf
|
||||||
|
partBufPool.Put(xbuf)
|
||||||
|
return itemsCount
|
||||||
|
}
|
||||||
|
|
||||||
func (b *bucket16) sizeBytes() uint64 {
|
func (b *bucket16) sizeBytes() uint64 {
|
||||||
return uint64(unsafe.Sizeof(*b)) + uint64(unsafe.Sizeof(*b.bits))
|
return uint64(unsafe.Sizeof(*b)) + uint64(unsafe.Sizeof(*b.bits))
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user