From 7e526effaa982f79dcecea9cbda9bccf014ad243 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 20 May 2021 13:13:40 +0300 Subject: [PATCH] app/vmagent: add ability to limit series cardinality on a per-hour and per-day basis --- README.md | 2 +- app/vmagent/README.md | 26 ++++++- app/vmagent/remotewrite/remotewrite.go | 92 +++++++++++++++++++++- docs/CHANGELOG.md | 1 + docs/vmagent.md | 26 ++++++- lib/bloomfilter/filter.go | 90 ++++++++++++++++++++++ lib/bloomfilter/filter_test.go | 101 +++++++++++++++++++++++++ lib/bloomfilter/filter_timing_test.go | 82 ++++++++++++++++++++ lib/bloomfilter/limiter.go | 59 +++++++++++++++ lib/bloomfilter/limiter_test.go | 90 ++++++++++++++++++++++ 10 files changed, 564 insertions(+), 5 deletions(-) create mode 100644 lib/bloomfilter/filter.go create mode 100644 lib/bloomfilter/filter_test.go create mode 100644 lib/bloomfilter/filter_timing_test.go create mode 100644 lib/bloomfilter/limiter.go create mode 100644 lib/bloomfilter/limiter_test.go diff --git a/README.md b/README.md index c57afb2fe..86bc45ed1 100644 --- a/README.md +++ b/README.md @@ -1613,7 +1613,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -http.pathPrefix string An optional prefix to add to all the paths handled by http server. For example, if '-http.pathPrefix=/foo/bar' is set, then all the http requests will be handled on '/foo/bar/*' paths. This may be useful for proxied requests. See https://www.robustperception.io/using-external-urls-and-proxies-with-prometheus -http.shutdownDelay duration - Optional delay before http server shutdown. During this delay, the servier returns non-OK responses from /health page, so load balancers can route new requests to other servers + Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers -httpAuth.password string Password for HTTP Basic Auth. The authentication is disabled if -httpAuth.username is empty -httpAuth.username string diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 65c0cac0e..dbcd0c5ab 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -34,7 +34,9 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as the connection to the remote storage is repaired. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`. * Uses lower amounts of RAM, CPU, disk IO and network bandwidth compared with Prometheus. -* Scrape targets can be spread among multiple `vmagent` instances when big number of targets must be scraped. See [these docs](#scraping-big-number-of-targets) for details. +* Scrape targets can be spread among multiple `vmagent` instances when big number of targets must be scraped. See [these docs](#scraping-big-number-of-targets). +* Can efficiently scrape targets that expose millions of time series such as [/federate endpoint in Prometheus](https://prometheus.io/docs/prometheus/latest/federation/). See [these docs](#stream-parsing-mode). +* Can deal with high cardinality and high churn rate issues by limiting the number of unique time series sent to remote storage systems. See [these docs](#cardinality-limiter). ## Quick Start @@ -316,6 +318,22 @@ scrape_configs: server_name: real-server-name ``` +## Cardinality limiter + +By default `vmagent` doesn't limit the number of time series written to remote storage systems specified at `-remoteWrite.url`. The limit can be enforced by setting the following command-line flags: + +* `-remoteWrite.maxHourlySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last hour. Useful for limiting the number of active time series. +* `-remoteWrite.maxDailySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last day. Useful for limiting daily churn rate. + +Both limits can be set simultaneously. It any of these limits is reached, then new time series are dropped before sending the data to remote storage systems. A sample of dropped series is put in the log with `WARNING` level. + +The exceeded limits can be [monitored](#monitoring) with the following metrics: + +* `vmagent_hourly_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding hourly limit on the number of unique time series. +* `vmagent_daily_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding daily limit on the number of unique time series. + +These limits are approximate, so `vmagent` can underflow/overflow the limit by a small percentage (usually less than 1%). + ## Monitoring @@ -343,6 +361,12 @@ It may be useful to perform `vmagent` rolling update without any scrape loss. * We recommend you increase the maximum number of open files in the system (`ulimit -n`) when scraping a big number of targets, as `vmagent` establishes at least a single TCP connection per target. +* If `vmagent` uses too big amounts of memory, then the following options can help: + * Enabling stream parsing. See [these docs](#stream-parsing-mode). + * Reducing the number of output queues with `-remoteWrite.queues` command-line option. + * Reducing the amounts of RAM vmagent can use for in-memory buffering with `-memory.allowedPercent` or `-memory.allowedBytes` command-line option. Another option is to reduce memory limits in Docker and/or Kuberntes if `vmagent` runs under these systems. + * Reducing the number of CPU cores vmagent can use by passing `GOMAXPROCS=N` environment variable to `vmagent`, where `N` is the desired limit on CPU cores. Another option is to reduce CPU limits in Docker or Kubernetes if `vmagent` runs under these systems. + * When `vmagent` scrapes many unreliable targets, it can flood the error log with scrape errors. These errors can be suppressed by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` and `http://vmagent-host:8429/api/v1/targets`. diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index de07aa7b4..c8c037562 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -3,9 +3,13 @@ package remotewrite import ( "flag" "fmt" + "strconv" "sync" "sync/atomic" + "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -43,6 +47,10 @@ var ( `This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. `+ `For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}`+ `Enabled sorting for labels can slow down ingestion performance a bit`) + maxHourlySeries = flag.Int("remoteWrite.maxHourlySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last hour. "+ + "Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxDailySeries") + maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+ + "Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxHourlySeries") ) var rwctxs []*remoteWriteCtx @@ -71,6 +79,12 @@ func Init() { if len(*remoteWriteURLs) == 0 { logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set") } + if *maxHourlySeries > 0 { + hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour) + } + if *maxDailySeries > 0 { + dailySeriesLimiter = bloomfilter.NewLimiter(*maxDailySeries, 24*time.Hour) + } if *queues > maxQueues { *queues = maxQueues } @@ -179,8 +193,11 @@ func Push(wr *prompbmarshal.WriteRequest) { globalRelabelMetricsDropped.Add(tssBlockLen - len(tssBlock)) } sortLabelsIfNeeded(tssBlock) - for _, rwctx := range rwctxs { - rwctx.Push(tssBlock) + tssBlock = limitSeriesCardinality(tssBlock) + if len(tssBlock) > 0 { + for _, rwctx := range rwctxs { + rwctx.Push(tssBlock) + } } if rctx != nil { rctx.reset() @@ -201,6 +218,77 @@ func sortLabelsIfNeeded(tss []prompbmarshal.TimeSeries) { } } +func limitSeriesCardinality(tss []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries { + if hourlySeriesLimiter == nil && dailySeriesLimiter == nil { + return tss + } + dst := make([]prompbmarshal.TimeSeries, 0, len(tss)) + for i := range tss { + labels := tss[i].Labels + h := getLabelsHash(labels) + if hourlySeriesLimiter != nil && !hourlySeriesLimiter.Add(h) { + hourlySeriesLimit.Add(len(tss[i].Samples)) + logSkippedSeries(labels, "-remoteWrite.maxHourlySeries", *maxHourlySeries) + continue + } + if dailySeriesLimiter != nil && !dailySeriesLimiter.Add(h) { + dailySeriesLimit.Add(len(tss[i].Samples)) + logSkippedSeries(labels, "-remoteWrite.maxDailySeries", *maxDailySeries) + continue + } + dst = append(dst, tss[i]) + } + return dst +} + +var ( + hourlySeriesLimiter *bloomfilter.Limiter + dailySeriesLimiter *bloomfilter.Limiter + + hourlySeriesLimit = metrics.NewCounter(`vmagent_hourly_series_limit_samples_dropped_total`) + dailySeriesLimit = metrics.NewCounter(`vmagent_daily_series_limit_samples_dropped_total`) +) + +func getLabelsHash(labels []prompbmarshal.Label) uint64 { + bb := labelsHashBufPool.Get() + b := bb.B[:0] + for _, label := range labels { + b = append(b, label.Name...) + b = append(b, label.Value...) + } + h := xxhash.Sum64(b) + bb.B = b + labelsHashBufPool.Put(bb) + return h +} + +var labelsHashBufPool bytesutil.ByteBufferPool + +func logSkippedSeries(labels []prompbmarshal.Label, flagName string, flagValue int) { + select { + case <-logSkippedSeriesTicker.C: + logger.Warnf("skip series %s because %s=%d reached", labelsToString(labels), flagName, flagValue) + default: + } +} + +var logSkippedSeriesTicker = time.NewTicker(5 * time.Second) + +func labelsToString(labels []prompbmarshal.Label) string { + var b []byte + b = append(b, '{') + for i, label := range labels { + b = append(b, label.Name...) + b = append(b, '=') + b = strconv.AppendQuote(b, label.Value) + if i+1 < len(labels) { + b = append(b, ',') + } + } + b = append(b, '}') + return string(b) +} + var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total") type remoteWriteCtx struct { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7fe195c18..7097c3fc6 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,7 @@ sort: 15 # CHANGELOG +* FEATURE: vmagent: add ability to limit the number of unique time series, which can be sent to remote storage systems per hour and per day. This can help dealing with high cardinality and high churn rate issues. See [these docs](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). * FEATURE: vmalert: add ability to run alerting and recording rules for multiple tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/740) and [these docs](https://docs.victoriametrics.com/vmalert.html#multitenancy). * FEATURE: vminsert: add support for data ingestion via other `vminsert` nodes. This allows building multi-level data ingestion paths in VictoriaMetrics cluster by writing data from one level of `vminsert` nodes to another level of `vminsert` nodes. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup) and [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/541#issuecomment-835487858) for details. * FEATURE: vmagent: reload `bearer_token_file`, `credentials_file` and `password_file` contents every second. This allows dynamically changing the contents of these files during target scraping and service discovery without the need to restart `vmagent`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1297). diff --git a/docs/vmagent.md b/docs/vmagent.md index 2d9d25e00..4cc78e836 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -38,7 +38,9 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as the connection to the remote storage is repaired. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`. * Uses lower amounts of RAM, CPU, disk IO and network bandwidth compared with Prometheus. -* Scrape targets can be spread among multiple `vmagent` instances when big number of targets must be scraped. See [these docs](#scraping-big-number-of-targets) for details. +* Scrape targets can be spread among multiple `vmagent` instances when big number of targets must be scraped. See [these docs](#scraping-big-number-of-targets). +* Can efficiently scrape targets that expose millions of time series such as [/federate endpoint in Prometheus](https://prometheus.io/docs/prometheus/latest/federation/). See [these docs](#stream-parsing-mode). +* Can deal with high cardinality and high churn rate issues by limiting the number of unique time series sent to remote storage systems. See [these docs](#cardinality-limiter). ## Quick Start @@ -320,6 +322,22 @@ scrape_configs: server_name: real-server-name ``` +## Cardinality limiter + +By default `vmagent` doesn't limit the number of time series written to remote storage systems specified at `-remoteWrite.url`. The limit can be enforced by setting the following command-line flags: + +* `-remoteWrite.maxHourlySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last hour. Useful for limiting the number of active time series. +* `-remoteWrite.maxDailySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last day. Useful for limiting daily churn rate. + +Both limits can be set simultaneously. It any of these limits is reached, then new time series are dropped before sending the data to remote storage systems. A sample of dropped series is put in the log with `WARNING` level. + +The exceeded limits can be [monitored](#monitoring) with the following metrics: + +* `vmagent_hourly_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding hourly limit on the number of unique time series. +* `vmagent_daily_series_limit_samples_dropped_total` - the number of metrics dropped due to exceeding daily limit on the number of unique time series. + +These limits are approximate, so `vmagent` can underflow/overflow the limit by a small percentage (usually less than 1%). + ## Monitoring @@ -347,6 +365,12 @@ It may be useful to perform `vmagent` rolling update without any scrape loss. * We recommend you increase the maximum number of open files in the system (`ulimit -n`) when scraping a big number of targets, as `vmagent` establishes at least a single TCP connection per target. +* If `vmagent` uses too big amounts of memory, then the following options can help: + * Enabling stream parsing. See [these docs](#stream-parsing-mode). + * Reducing the number of output queues with `-remoteWrite.queues` command-line option. + * Reducing the amounts of RAM vmagent can use for in-memory buffering with `-memory.allowedPercent` or `-memory.allowedBytes` command-line option. Another option is to reduce memory limits in Docker and/or Kuberntes if `vmagent` runs under these systems. + * Reducing the number of CPU cores vmagent can use by passing `GOMAXPROCS=N` environment variable to `vmagent`, where `N` is the desired limit on CPU cores. Another option is to reduce CPU limits in Docker or Kubernetes if `vmagent` runs under these systems. + * When `vmagent` scrapes many unreliable targets, it can flood the error log with scrape errors. These errors can be suppressed by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` and `http://vmagent-host:8429/api/v1/targets`. diff --git a/lib/bloomfilter/filter.go b/lib/bloomfilter/filter.go new file mode 100644 index 000000000..07a561bb5 --- /dev/null +++ b/lib/bloomfilter/filter.go @@ -0,0 +1,90 @@ +package bloomfilter + +import ( + "sync/atomic" + "unsafe" + + "github.com/cespare/xxhash/v2" +) + +const hashesCount = 4 +const bitsPerItem = 16 + +type filter struct { + maxItems int + bits []uint64 +} + +func newFilter(maxItems int) *filter { + bitsCount := maxItems * bitsPerItem + bits := make([]uint64, (bitsCount+63)/64) + return &filter{ + maxItems: maxItems, + bits: bits, + } +} + +// Reset resets f to initial state. +// +// It is expected no other goroutines call f methods during Reset call. +func (f *filter) Reset() { + bits := f.bits + for i := range bits { + bits[i] = 0 + } +} + +// Has checks whether h presents in f. +// +// Has can be called from concurrent goroutines. +func (f *filter) Has(h uint64) bool { + bits := f.bits + maxBits := uint64(len(bits)) * 64 + bp := (*[8]byte)(unsafe.Pointer(&h)) + b := bp[:] + for i := 0; i < hashesCount; i++ { + hi := xxhash.Sum64(b) + h++ + idx := hi % maxBits + i := idx / 64 + j := idx % 64 + mask := uint64(1) << j + w := atomic.LoadUint64(&bits[i]) + if (w & mask) == 0 { + return false + } + } + return true +} + +// Add adds h to f. +// +// True is returned if h was missing in f. +// +// Add can be called from concurrent goroutines. +// If the same h is added to f from concurrent goroutines, then both goroutines may return true. +func (f *filter) Add(h uint64) bool { + bits := f.bits + maxBits := uint64(len(bits)) * 64 + bp := (*[8]byte)(unsafe.Pointer(&h)) + b := bp[:] + isNew := false + for i := 0; i < hashesCount; i++ { + hi := xxhash.Sum64(b) + h++ + idx := hi % maxBits + i := idx / 64 + j := idx % 64 + mask := uint64(1) << j + w := atomic.LoadUint64(&bits[i]) + for (w & mask) == 0 { + wNew := w | mask + if atomic.CompareAndSwapUint64(&bits[i], w, wNew) { + isNew = true + break + } + w = atomic.LoadUint64(&bits[i]) + } + } + return isNew +} diff --git a/lib/bloomfilter/filter_test.go b/lib/bloomfilter/filter_test.go new file mode 100644 index 000000000..1ba86c723 --- /dev/null +++ b/lib/bloomfilter/filter_test.go @@ -0,0 +1,101 @@ +package bloomfilter + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +func TestFilter(t *testing.T) { + for _, maxItems := range []int{1e0, 1e1, 1e2, 1e3, 1e4, 1e5} { + testFilter(t, maxItems) + } +} + +func testFilter(t *testing.T, maxItems int) { + r := rand.New(rand.NewSource(int64(0))) + f := newFilter(maxItems) + items := make(map[uint64]struct{}, maxItems) + + // Populate f with maxItems + collisions := 0 + for i := 0; i < maxItems; i++ { + h := r.Uint64() + items[h] = struct{}{} + if !f.Add(h) { + collisions++ + } + if f.Add(h) { + t.Fatalf("unexpected double addition of item %d on iteration %d for maxItems %d", h, i, maxItems) + } + } + p := float64(collisions) / float64(maxItems) + if p > 0.0006 { + t.Fatalf("too big collision share for maxItems=%d: %.5f, collisions: %d", maxItems, p, collisions) + } + + // Verify that the added items exist in f. + i := 0 + for h := range items { + if !f.Has(h) { + t.Fatalf("cannot find item %d on iteration %d for maxItems %d", h, i, maxItems) + } + i++ + } + + // Verify false hits rate. + i = 0 + falseHits := 0 + for i < maxItems { + h := r.Uint64() + if _, ok := items[h]; ok { + continue + } + i++ + if f.Has(h) { + falseHits++ + } + } + p = float64(falseHits) / float64(maxItems) + if p > 0.003 { + t.Fatalf("too big false hits share for maxItems=%d: %.5f, falseHits: %d", maxItems, p, falseHits) + } + + // Check filter reset + f.Reset() + for i := 0; i < maxItems; i++ { + h := r.Uint64() + if f.Has(h) { + t.Fatalf("unexpected item found in empty filter: %d", h) + } + } +} + +func TestFilterConcurrent(t *testing.T) { + concurrency := 3 + maxItems := 10000 + doneCh := make(chan struct{}, concurrency) + f := newFilter(maxItems) + for i := 0; i < concurrency; i++ { + go func(randSeed int) { + r := rand.New(rand.NewSource(int64(randSeed))) + for i := 0; i < maxItems; i++ { + h := r.Uint64() + f.Add(h) + if !f.Has(h) { + panic(fmt.Errorf("the item %d must exist", h)) + } + } + doneCh <- struct{}{} + }(i) + } + tC := time.After(time.Second * 5) + for i := 0; i < concurrency; i++ { + select { + case <-doneCh: + case <-tC: + t.Fatalf("timeout!") + } + } +} diff --git a/lib/bloomfilter/filter_timing_test.go b/lib/bloomfilter/filter_timing_test.go new file mode 100644 index 000000000..3c3d12148 --- /dev/null +++ b/lib/bloomfilter/filter_timing_test.go @@ -0,0 +1,82 @@ +package bloomfilter + +import ( + "fmt" + "testing" +) + +func BenchmarkFilterAdd(b *testing.B) { + for _, maxItems := range []int{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("maxItems=%d", maxItems), func(b *testing.B) { + benchmarkFilterAdd(b, maxItems) + }) + } +} + +func benchmarkFilterAdd(b *testing.B, maxItems int) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + f := newFilter(maxItems) + for pb.Next() { + h := uint64(0) + for i := 0; i < 10000; i++ { + h += uint64(maxItems) + f.Add(h) + } + f.Reset() + } + }) +} + +func BenchmarkFilterHasHit(b *testing.B) { + for _, maxItems := range []int{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("maxItems=%d", maxItems), func(b *testing.B) { + benchmarkFilterHasHit(b, maxItems) + }) + } +} + +func benchmarkFilterHasHit(b *testing.B, maxItems int) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + f := newFilter(maxItems) + h := uint64(0) + for i := 0; i < 10000; i++ { + h += uint64(maxItems) + f.Add(h) + } + for pb.Next() { + h = 0 + for i := 0; i < 10000; i++ { + h += uint64(maxItems) + if !f.Has(h) { + panic(fmt.Errorf("missing item %d", h)) + } + } + } + }) +} + +func BenchmarkFilterHasMiss(b *testing.B) { + for _, maxItems := range []int{1e3, 1e4, 1e5, 1e6, 1e7} { + b.Run(fmt.Sprintf("maxItems=%d", maxItems), func(b *testing.B) { + benchmarkFilterHasMiss(b, maxItems) + }) + } +} + +func benchmarkFilterHasMiss(b *testing.B, maxItems int) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + f := newFilter(maxItems) + for pb.Next() { + h := uint64(0) + for i := 0; i < 10000; i++ { + h += uint64(maxItems) + if f.Has(h) { + panic(fmt.Errorf("unexpected item %d", h)) + } + } + } + }) +} diff --git a/lib/bloomfilter/limiter.go b/lib/bloomfilter/limiter.go new file mode 100644 index 000000000..edaa75fd3 --- /dev/null +++ b/lib/bloomfilter/limiter.go @@ -0,0 +1,59 @@ +package bloomfilter + +import ( + "sync/atomic" + "time" +) + +// Limiter limits the number of added items. +// +// It is safe using the Limiter from concurrent goroutines. +type Limiter struct { + v atomic.Value +} + +// NewLimiter creates new Limiter, which can hold up to maxItems unique items during the given refreshInterval. +func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter { + var l Limiter + l.v.Store(newLimiter(maxItems)) + go func() { + for { + time.Sleep(refreshInterval) + l.v.Store(newLimiter(maxItems)) + } + }() + return &l +} + +// Add adds h to the limiter. +// +// It is safe calling Add from concurrent goroutines. +// +// True is returned if h is added or already exists in l. +// False is returned if h cannot be added to l, since it already has maxItems unique items. +func (l *Limiter) Add(h uint64) bool { + lm := l.v.Load().(*limiter) + return lm.Add(h) +} + +type limiter struct { + currentItems uint64 + f *filter +} + +func newLimiter(maxItems int) *limiter { + return &limiter{ + f: newFilter(maxItems), + } +} + +func (l *limiter) Add(h uint64) bool { + currentItems := atomic.LoadUint64(&l.currentItems) + if currentItems >= uint64(l.f.maxItems) { + return l.f.Has(h) + } + if l.f.Add(h) { + atomic.AddUint64(&l.currentItems, 1) + } + return true +} diff --git a/lib/bloomfilter/limiter_test.go b/lib/bloomfilter/limiter_test.go new file mode 100644 index 000000000..34cf3385b --- /dev/null +++ b/lib/bloomfilter/limiter_test.go @@ -0,0 +1,90 @@ +package bloomfilter + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +func TestLimiter(t *testing.T) { + for _, maxItems := range []int{1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6} { + testLimiter(t, maxItems) + } +} + +func testLimiter(t *testing.T, maxItems int) { + r := rand.New(rand.NewSource(int64(0))) + l := NewLimiter(maxItems, time.Hour) + items := make(map[uint64]struct{}, maxItems) + + // Populate the l with new items. + for i := 0; i < maxItems; i++ { + h := r.Uint64() + if !l.Add(h) { + t.Fatalf("cannot add item %d on iteration %d out of %d", h, i, maxItems) + } + items[h] = struct{}{} + } + + // Verify that already registered items can be added. + i := 0 + for h := range items { + if !l.Add(h) { + t.Fatalf("cannot add already existing item %d on iteration %d out of %d", h, i, maxItems) + } + i++ + } + + // Verify that new items are rejected with high probability. + falseAdditions := 0 + for i := 0; i < maxItems; i++ { + h := r.Uint64() + if l.Add(h) { + falseAdditions++ + } + } + p := float64(falseAdditions) / float64(maxItems) + if p > 0.003 { + t.Fatalf("too big false additions share=%.5f: %d out of %d", p, falseAdditions, maxItems) + } +} + +func TestLimiterConcurrent(t *testing.T) { + concurrency := 3 + maxItems := 10000 + l := NewLimiter(maxItems, time.Hour) + doneCh := make(chan struct{}, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + r := rand.New(rand.NewSource(0)) + for i := 0; i < maxItems; i++ { + h := r.Uint64() + // Do not check whether the item is added, since less than maxItems can be added to l + // due to passible (expected) race in l.f.Add + l.Add(h) + } + // Verify that new items are rejected with high probability. + falseAdditions := 0 + for i := 0; i < maxItems; i++ { + h := r.Uint64() + if l.Add(h) { + falseAdditions++ + } + } + p := float64(falseAdditions) / float64(maxItems) + if p > 0.003 { + panic(fmt.Errorf("too big false additions share=%.5f: %d out of %d", p, falseAdditions, maxItems)) + } + doneCh <- struct{}{} + }() + } + tC := time.After(time.Second * 5) + for i := 0; i < concurrency; i++ { + select { + case <-doneCh: + case <-tC: + t.Fatalf("timeout!") + } + } +}