mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
lib/uint64set: allow reusing bucket16 structs inside uint64set.Set via uint64set.Release method
This reduces the load on memory allocator in Go runtime in production workload.
This commit is contained in:
parent
db6bd69475
commit
d8e7c1ef27
@ -2021,6 +2021,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
|
||||
if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
|
||||
return nil, metricIDsForTimeRange, nil
|
||||
}
|
||||
uint64set.Release(metricIDsForTimeRange)
|
||||
return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
|
||||
maxMetrics, tr.String())
|
||||
}
|
||||
@ -2313,6 +2314,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
|
||||
}
|
||||
|
||||
sortedMetricIDs := metricIDs.AppendTo(nil)
|
||||
uint64set.Release(metricIDs)
|
||||
|
||||
// Filter out deleted metricIDs.
|
||||
dmis := is.db.s.getDeletedMetricIDs()
|
||||
@ -2340,9 +2342,11 @@ func (is *indexSearch) searchMetricIDsInternal(tfss []*TagFilters, tr TimeRange,
|
||||
}
|
||||
}
|
||||
if err := is.updateMetricIDsForTagFilters(metricIDs, tfs, tr, maxMetrics+1); err != nil {
|
||||
uint64set.Release(metricIDs)
|
||||
return nil, err
|
||||
}
|
||||
if metricIDs.Len() > maxMetrics {
|
||||
uint64set.Release(metricIDs)
|
||||
return nil, fmt.Errorf("the number of matching unique timeseries exceeds %d; either narrow down the search or increase -search.maxUniqueTimeseries", maxMetrics)
|
||||
}
|
||||
}
|
||||
@ -2380,9 +2384,11 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uint64set.Release(minMetricIDs)
|
||||
minMetricIDs = mIDs
|
||||
}
|
||||
metricIDs.UnionMayOwn(minMetricIDs)
|
||||
metricIDs.Union(minMetricIDs)
|
||||
uint64set.Release(minMetricIDs)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -2734,8 +2740,9 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
|
||||
return
|
||||
}
|
||||
if metricIDs.Len() < maxMetrics {
|
||||
metricIDs.UnionMayOwn(m)
|
||||
metricIDs.Union(m)
|
||||
}
|
||||
uint64set.Release(m)
|
||||
}(minDate)
|
||||
minDate++
|
||||
}
|
||||
@ -2763,7 +2770,8 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
metricIDs.UnionMayOwn(m)
|
||||
metricIDs.Union(m)
|
||||
uint64set.Release(m)
|
||||
atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
|
||||
return nil
|
||||
}
|
||||
@ -2790,8 +2798,9 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||
return
|
||||
}
|
||||
if metricIDs.Len() < maxMetrics {
|
||||
metricIDs.UnionMayOwn(m)
|
||||
metricIDs.Union(m)
|
||||
}
|
||||
uint64set.Release(m)
|
||||
}(minDate)
|
||||
minDate++
|
||||
}
|
||||
@ -2978,6 +2987,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||
} else {
|
||||
metricIDs.Intersect(m)
|
||||
}
|
||||
uint64set.Release(m)
|
||||
}
|
||||
if metricIDs.Len() == 0 {
|
||||
// There is no need in applying tfsPostponed, since the result is empty.
|
||||
|
@ -6,6 +6,8 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
)
|
||||
|
||||
// Set is a fast set for uint64.
|
||||
@ -35,6 +37,18 @@ func (s *bucket32Sorter) Swap(i, j int) {
|
||||
a[i], a[j] = a[j], a[i]
|
||||
}
|
||||
|
||||
// Release releases resources occupied by s.
|
||||
//
|
||||
// s mustn't be used after Release call.
|
||||
func Release(s *Set) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
for i := range s.buckets {
|
||||
releaseBucket32(&s.buckets[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Clone returns an independent copy of s.
|
||||
func (s *Set) Clone() *Set {
|
||||
if s == nil || s.itemsCount == 0 {
|
||||
@ -247,34 +261,18 @@ func (s *Set) sort() {
|
||||
|
||||
// Union adds all the items from a to s.
|
||||
func (s *Set) Union(a *Set) {
|
||||
s.union(a, false)
|
||||
}
|
||||
|
||||
// UnionMayOwn adds all the items from a to s.
|
||||
//
|
||||
// It may own a if s is empty. This means that `a` cannot be used
|
||||
// after the call to UnionMayOwn.
|
||||
func (s *Set) UnionMayOwn(a *Set) {
|
||||
s.union(a, true)
|
||||
}
|
||||
|
||||
func (s *Set) union(a *Set, mayOwn bool) {
|
||||
if a.Len() == 0 {
|
||||
// Fast path - nothing to union.
|
||||
return
|
||||
}
|
||||
if s.Len() == 0 {
|
||||
// Fast path - copy `a` to `s`.
|
||||
if !mayOwn {
|
||||
a = a.Clone()
|
||||
}
|
||||
a = a.Clone()
|
||||
*s = *a
|
||||
return
|
||||
}
|
||||
// Make shallow copy of `a`, since it can be modified by a.sort().
|
||||
if !mayOwn {
|
||||
a = a.cloneShallow()
|
||||
}
|
||||
a = a.cloneShallow()
|
||||
a.sort()
|
||||
s.sort()
|
||||
i := 0
|
||||
@ -301,7 +299,7 @@ func (s *Set) union(a *Set, mayOwn bool) {
|
||||
break
|
||||
}
|
||||
if s.buckets[i].hi == a.buckets[j].hi {
|
||||
s.buckets[i].union(&a.buckets[j], mayOwn)
|
||||
s.buckets[i].union(&a.buckets[j])
|
||||
i++
|
||||
j++
|
||||
}
|
||||
@ -412,6 +410,12 @@ type bucket32 struct {
|
||||
buckets []*bucket16
|
||||
}
|
||||
|
||||
func releaseBucket32(b32 *bucket32) {
|
||||
for _, b16 := range b32.buckets {
|
||||
putBucket16(b16)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bucket32) getLen() int {
|
||||
n := 0
|
||||
for i := range b.buckets {
|
||||
@ -420,7 +424,7 @@ func (b *bucket32) getLen() int {
|
||||
return n
|
||||
}
|
||||
|
||||
func (b *bucket32) union(a *bucket32, mayOwn bool) {
|
||||
func (b *bucket32) union(a *bucket32) {
|
||||
i := 0
|
||||
j := 0
|
||||
bBucketsLen := len(b.buckets)
|
||||
@ -431,22 +435,14 @@ func (b *bucket32) union(a *bucket32, mayOwn bool) {
|
||||
if i >= bBucketsLen {
|
||||
for j < len(a.b16his) {
|
||||
b16 := b.addBucket16(a.b16his[j])
|
||||
if mayOwn {
|
||||
*b16 = *a.buckets[j]
|
||||
} else {
|
||||
a.buckets[j].copyTo(b16)
|
||||
}
|
||||
a.buckets[j].copyTo(b16)
|
||||
j++
|
||||
}
|
||||
break
|
||||
}
|
||||
for j < len(a.b16his) && a.b16his[j] < b.b16his[i] {
|
||||
b16 := b.addBucket16(a.b16his[j])
|
||||
if mayOwn {
|
||||
*b16 = *a.buckets[j]
|
||||
} else {
|
||||
a.buckets[j].copyTo(b16)
|
||||
}
|
||||
a.buckets[j].copyTo(b16)
|
||||
j++
|
||||
}
|
||||
if j >= len(a.b16his) {
|
||||
@ -558,7 +554,7 @@ func (b *bucket32) copyTo(dst *bucket32) {
|
||||
if len(b.buckets) > 0 {
|
||||
dst.buckets = make([]*bucket16, len(b.buckets))
|
||||
for i, b16 := range b.buckets {
|
||||
b16Dst := &bucket16{}
|
||||
b16Dst := getBucket16()
|
||||
b16.copyTo(b16Dst)
|
||||
dst.buckets[i] = b16Dst
|
||||
}
|
||||
@ -632,7 +628,8 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
|
||||
|
||||
func (b *bucket32) addBucket16(hi uint16) *bucket16 {
|
||||
b.b16his = append(b.b16his, hi)
|
||||
b.buckets = append(b.buckets, &bucket16{})
|
||||
b16 := getBucket16()
|
||||
b.buckets = append(b.buckets, b16)
|
||||
return b.buckets[len(b.buckets)-1]
|
||||
}
|
||||
|
||||
@ -647,7 +644,7 @@ func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
|
||||
b.b16his = append(b.b16his[:pos+1], b.b16his[pos:]...)
|
||||
b.b16his[pos] = hi
|
||||
b.buckets = append(b.buckets[:pos+1], b.buckets[pos:]...)
|
||||
b16 := &bucket16{}
|
||||
b16 := getBucket16()
|
||||
b.buckets[pos] = b16
|
||||
return b16
|
||||
}
|
||||
@ -710,6 +707,40 @@ type bucket16 struct {
|
||||
|
||||
const smallPoolSize = 56
|
||||
|
||||
func getBucket16() *bucket16 {
|
||||
select {
|
||||
case b16 := <-bucket16Pool:
|
||||
return b16
|
||||
default:
|
||||
return &bucket16{}
|
||||
}
|
||||
}
|
||||
|
||||
func putBucket16(b16 *bucket16) {
|
||||
b16.reset()
|
||||
select {
|
||||
case bucket16Pool <- b16:
|
||||
default:
|
||||
// Drop b16 in order to reduce memory usage
|
||||
}
|
||||
}
|
||||
|
||||
// Use a chan instead of sync.Pool for reducing memory usage on systems with big number of CPU cores,
|
||||
// since sync.Pool holds per-CPU pools of potentially big bucket16 structs (more than 64KB each).
|
||||
var bucket16Pool = make(chan *bucket16, 100*cgroup.AvailableCPUs())
|
||||
|
||||
func (b *bucket16) reset() {
|
||||
if bits := b.bits; bits != nil {
|
||||
for i := range bits {
|
||||
bits[i] = 0
|
||||
}
|
||||
}
|
||||
for i := range b.smallPool {
|
||||
b.smallPool[i] = 0
|
||||
}
|
||||
b.smallPoolLen = 0
|
||||
}
|
||||
|
||||
func (b *bucket16) isZero() bool {
|
||||
return b.bits == nil && b.smallPoolLen == 0
|
||||
}
|
||||
|
@ -46,6 +46,7 @@ func TestSetOps(t *testing.T) {
|
||||
if !sbOrig.Equal(sb) {
|
||||
t.Fatalf("sbOrig must be equal to sb after sa.Union(sb); got\n%v\nvs\n%v", sbOrig, sb)
|
||||
}
|
||||
Release(sa)
|
||||
|
||||
// Verify sb.Union(sa)
|
||||
sa = saOrig.Clone()
|
||||
@ -56,27 +57,7 @@ func TestSetOps(t *testing.T) {
|
||||
if !saOrig.Equal(sa) {
|
||||
t.Fatalf("saOrig must be equal to sa after sb.Union(sa); got\n%v\nvs\n%v", saOrig, sa)
|
||||
}
|
||||
|
||||
// Verify sa.UnionMayOwn(sb)
|
||||
sa = saOrig.Clone()
|
||||
sb = sbOrig.Clone()
|
||||
sa.UnionMayOwn(sb)
|
||||
if err := expectEqual(sa, mUnion); err != nil {
|
||||
t.Fatalf("invalid sa.UnionMayOwn(sb): %s", err)
|
||||
}
|
||||
if !sbOrig.Equal(sb) {
|
||||
t.Fatalf("sbOrig must be equal to sb after sa.UnionMayOwn(sb); got\n%v\nvs\n%v", sbOrig, sb)
|
||||
}
|
||||
|
||||
// Verify sb.UnionMayOwn(sa)
|
||||
sa = saOrig.Clone()
|
||||
sb.UnionMayOwn(sa)
|
||||
if err := expectEqual(sb, mUnion); err != nil {
|
||||
t.Fatalf("invalid sb.UnionMayOwn(sa): %s", err)
|
||||
}
|
||||
if !saOrig.Equal(sa) {
|
||||
t.Fatalf("saOrig must be equal to sa after sb.UnionMayOwn(sa); got\n%v\nvs\n%v", saOrig, sa)
|
||||
}
|
||||
Release(sb)
|
||||
|
||||
// Verify sa.Intersect(sb)
|
||||
sa = saOrig.Clone()
|
||||
@ -88,6 +69,7 @@ func TestSetOps(t *testing.T) {
|
||||
if !sbOrig.Equal(sb) {
|
||||
t.Fatalf("sbOrig must be equal to sb after sa.Intersect(sb); got\n%v\nvs\n%v", sbOrig, sb)
|
||||
}
|
||||
Release(sa)
|
||||
|
||||
// Verify sb.Intersect(sa)
|
||||
sa = saOrig.Clone()
|
||||
@ -98,6 +80,8 @@ func TestSetOps(t *testing.T) {
|
||||
if !saOrig.Equal(sa) {
|
||||
t.Fatalf("saOrig must be equal to sa after sb.Intersect(sa); got\n%v\nvs\n%v", saOrig, sa)
|
||||
}
|
||||
Release(sa)
|
||||
Release(sb)
|
||||
|
||||
// Verify sa.Subtract(sb)
|
||||
mSubtractAB := make(map[uint64]bool)
|
||||
@ -116,6 +100,7 @@ func TestSetOps(t *testing.T) {
|
||||
if !sbOrig.Equal(sb) {
|
||||
t.Fatalf("sbOrig must be equal to sb after sa.Subtract(sb); got\n%v\nvs\n%v", sbOrig, sb)
|
||||
}
|
||||
Release(sa)
|
||||
|
||||
// Verify sb.Subtract(sa)
|
||||
mSubtractBA := make(map[uint64]bool)
|
||||
@ -133,6 +118,8 @@ func TestSetOps(t *testing.T) {
|
||||
if !saOrig.Equal(sa) {
|
||||
t.Fatalf("saOrig must be equal to sa after sb.Subtract(sa); got\n%v\nvs\n%v", saOrig, sa)
|
||||
}
|
||||
Release(sa)
|
||||
Release(sb)
|
||||
}
|
||||
|
||||
f(nil, nil)
|
||||
@ -277,6 +264,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
|
||||
if n := sCopy.Len(); n != 0 {
|
||||
t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
|
||||
}
|
||||
Release(sNil)
|
||||
}
|
||||
|
||||
// Verify forward Add
|
||||
@ -410,6 +398,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
|
||||
if itemsCount > 0 && calls != 1 {
|
||||
t.Fatalf("Unexpected number of ForEach callback calls; got %d; want %d", calls, 1)
|
||||
}
|
||||
Release(&s)
|
||||
|
||||
// Verify ForEach on nil set.
|
||||
var s1 *Set
|
||||
@ -449,38 +438,10 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
|
||||
if n := s3.Len(); n != expectedLen {
|
||||
t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify UnionMayOwn
|
||||
{
|
||||
const unionOffset = 12345
|
||||
var s1, s2 Set
|
||||
for i := 0; i < itemsCount; i++ {
|
||||
s1.Add(uint64(i) + offset)
|
||||
s2.Add(uint64(i) + offset + unionOffset)
|
||||
}
|
||||
s1.UnionMayOwn(&s2)
|
||||
expectedLen := 2 * itemsCount
|
||||
if itemsCount > unionOffset {
|
||||
expectedLen = itemsCount + unionOffset
|
||||
}
|
||||
if n := s1.Len(); n != expectedLen {
|
||||
t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
|
||||
}
|
||||
|
||||
// Verify union on empty set.
|
||||
var s3 Set
|
||||
expectedLen = s1.Len()
|
||||
s3.UnionMayOwn(&s1)
|
||||
if n := s3.Len(); n != expectedLen {
|
||||
t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
|
||||
}
|
||||
var s4 Set
|
||||
expectedLen = s3.Len()
|
||||
s3.UnionMayOwn(&s4)
|
||||
if n := s3.Len(); n != expectedLen {
|
||||
t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
|
||||
}
|
||||
Release(&s1)
|
||||
Release(&s2)
|
||||
Release(&s3)
|
||||
Release(&s4)
|
||||
}
|
||||
|
||||
// Verify intersect
|
||||
@ -521,6 +482,10 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
|
||||
if n := s4.Len(); n != expectedLen {
|
||||
t.Fatalf("unexpected s4.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
|
||||
}
|
||||
Release(&s1)
|
||||
Release(&s2)
|
||||
Release(&s3)
|
||||
Release(&s4)
|
||||
}
|
||||
|
||||
// Verify subtract
|
||||
@ -547,6 +512,9 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
|
||||
if n := s3.Len(); n != 0 {
|
||||
t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
|
||||
}
|
||||
Release(&s1)
|
||||
Release(&s2)
|
||||
Release(&s3)
|
||||
}
|
||||
|
||||
// Verify Del
|
||||
@ -597,6 +565,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
|
||||
t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
|
||||
}
|
||||
}
|
||||
Release(sCopy)
|
||||
}
|
||||
|
||||
func TestSetSparseItems(t *testing.T) {
|
||||
|
@ -13,6 +13,7 @@ func BenchmarkAddMulti(b *testing.B) {
|
||||
start := uint64(time.Now().UnixNano())
|
||||
sa := createRangeSet(start, itemsCount)
|
||||
a := sa.AppendTo(nil)
|
||||
Release(sa)
|
||||
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
|
||||
benchmarkAddMulti(b, a)
|
||||
})
|
||||
@ -24,6 +25,7 @@ func BenchmarkAdd(b *testing.B) {
|
||||
start := uint64(time.Now().UnixNano())
|
||||
sa := createRangeSet(start, itemsCount)
|
||||
a := sa.AppendTo(nil)
|
||||
Release(sa)
|
||||
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
|
||||
benchmarkAdd(b, a)
|
||||
})
|
||||
@ -38,6 +40,8 @@ func BenchmarkUnionNoOverlap(b *testing.B) {
|
||||
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
|
||||
benchmarkUnion(b, sa, sb)
|
||||
})
|
||||
Release(sa)
|
||||
Release(sb)
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,6 +53,8 @@ func BenchmarkUnionPartialOverlap(b *testing.B) {
|
||||
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
|
||||
benchmarkUnion(b, sa, sb)
|
||||
})
|
||||
Release(sa)
|
||||
Release(sb)
|
||||
}
|
||||
}
|
||||
|
||||
@ -60,6 +66,8 @@ func BenchmarkUnionFullOverlap(b *testing.B) {
|
||||
b.Run(fmt.Sprintf("items_%d", itemsCount), func(b *testing.B) {
|
||||
benchmarkUnion(b, sa, sb)
|
||||
})
|
||||
Release(sa)
|
||||
Release(sb)
|
||||
}
|
||||
}
|
||||
|
||||
@ -72,6 +80,7 @@ func benchmarkAdd(b *testing.B, a []uint64) {
|
||||
for _, x := range a {
|
||||
s.Add(x)
|
||||
}
|
||||
Release(&s)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -91,6 +100,7 @@ func benchmarkAddMulti(b *testing.B, a []uint64) {
|
||||
s.AddMulti(a[n:m])
|
||||
n = m
|
||||
}
|
||||
Release(&s)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -104,6 +114,8 @@ func benchmarkUnion(b *testing.B, sa, sb *Set) {
|
||||
sbCopy := sb.Clone()
|
||||
saCopy.Union(sb)
|
||||
sbCopy.Union(sa)
|
||||
Release(saCopy)
|
||||
Release(sbCopy)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -150,6 +162,8 @@ func benchmarkIntersect(b *testing.B, sa, sb *Set) {
|
||||
sbCopy := sb.Clone()
|
||||
saCopy.Intersect(sb)
|
||||
sbCopy.Intersect(sa)
|
||||
Release(saCopy)
|
||||
Release(sbCopy)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -379,6 +393,7 @@ func BenchmarkSetHasHit(b *testing.B) {
|
||||
}
|
||||
}
|
||||
})
|
||||
Release(&s)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -440,6 +455,7 @@ func BenchmarkSetHasMiss(b *testing.B) {
|
||||
}
|
||||
}
|
||||
})
|
||||
Release(&s)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user