VictoriaMetrics/lib/writeconcurrencylimiter/concurrencylimiter.go
Aliaksandr Valialkin b275983403
lib/writeconcurrencylimiter: improve the logic behind -maxConcurrentInserts limit
Previously, the -maxConcurrentInserts option limited the number of established client connections
that write data to VictoriaMetrics. Some of these connections could be idle.
Such connections do not consume significant amounts of CPU and RAM, so there is little sense in limiting
the number of such connections. So now the -maxConcurrentInserts command-line option
limits the number of concurrently executed insert requests, not including idle connections.

It is recommended to remove the -maxConcurrentInserts command-line option, since the default value
for this option should work well for most cases.
2023-01-06 22:07:16 -08:00

133 lines
3.5 KiB
Go

package writeconcurrencylimiter
import (
"flag"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
// maxConcurrentInserts is the -maxConcurrentInserts command-line flag.
//
// It defaults to twice the value returned by cgroup.AvailableCPUs().
var maxConcurrentInserts = flag.Int("maxConcurrentInserts", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent insert requests. "+
	"Default value should work for most cases, since it minimizes the overhead. See also -insert.maxQueueDuration")

// maxQueueDuration is the -insert.maxQueueDuration command-line flag.
//
// It defaults to one minute.
var maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration to wait in the queue when -maxConcurrentInserts "+
	"concurrent insert requests are already executed")
// Reader is a reader, which increases the concurrency after the first Read() call
//
// The concurrency can be reduced by calling DecConcurrency().
// Then the concurrency is increased after the next Read() call.
//
// Obtain a Reader via GetReader() and release it via PutReader().
type Reader struct {
// r is the wrapped reader; every Read() call is forwarded to it.
r io.Reader
// increasedConcurrency is set by Read() after the concurrency has been increased
// and is cleared by DecConcurrency() after the concurrency has been decreased.
increasedConcurrency bool
}
// GetReader returns a Reader, which wraps r.
//
// A Reader is reused from readerPool when one is available there;
// otherwise a new Reader is allocated.
//
// The caller must pass the returned Reader to PutReader() once it is no longer needed.
func GetReader(r io.Reader) *Reader {
	if pooled := readerPool.Get(); pooled != nil {
		rr := pooled.(*Reader)
		rr.r = r
		return rr
	}
	// The pool is empty, so allocate a fresh Reader.
	return &Reader{r: r}
}
// PutReader releases r back to the pool, so it can be reused by GetReader().
//
// If r has increased the concurrency, then the concurrency is decreased.
// r must not be used after the call.
func PutReader(r *Reader) {
	// Drop the reference to the wrapped reader, so the pool does not keep it alive.
	r.r = nil
	r.DecConcurrency()
	readerPool.Put(r)
}
// readerPool holds Reader objects released via PutReader() for reuse by GetReader().
var readerPool sync.Pool
// Read implements io.Reader.
//
// The call is forwarded to the wrapped reader first. The concurrency is increased
// after that if r doesn't hold it yet, e.g. on the first call or on the first call
// after DecConcurrency().
//
// If the concurrency cannot be increased, then zero bytes and the corresponding error
// are returned instead of the result obtained from the wrapped reader.
func (r *Reader) Read(p []byte) (int, error) {
	n, readErr := r.r.Read(p)
	if r.increasedConcurrency {
		// The concurrency has been already increased by a previous call.
		return n, readErr
	}

	limitErr := incConcurrency()
	if limitErr != nil {
		return 0, limitErr
	}
	r.increasedConcurrency = true
	return n, readErr
}
// DecConcurrency decreases the concurrency if r has increased it.
//
// The concurrency is increased again by the next Read() call.
// The call is a no-op if r doesn't hold the concurrency.
func (r *Reader) DecConcurrency() {
	if !r.increasedConcurrency {
		return
	}
	decConcurrency()
	r.increasedConcurrency = false
}
// initConcurrencyLimitCh creates concurrencyLimitCh with the capacity set to -maxConcurrentInserts.
//
// It must be called only via concurrencyLimitChOnce, so the channel is created exactly once.
func initConcurrencyLimitCh() {
	concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts)
}

var (
	// concurrencyLimitCh contains an item per every insert request, which has increased the concurrency.
	//
	// It is nil until initConcurrencyLimitCh is called.
	concurrencyLimitCh chan struct{}

	// concurrencyLimitChOnce guards the lazy initialization of concurrencyLimitCh.
	concurrencyLimitChOnce sync.Once
)

// incConcurrency increases the concurrency by putting an item into concurrencyLimitCh.
//
// If the channel is full, then the call waits for a free slot for up to -insert.maxQueueDuration.
// An httpserver.ErrorWithStatusCode with http.StatusServiceUnavailable status code
// is returned if no slot is freed during this time.
func incConcurrency() error {
	concurrencyLimitChOnce.Do(initConcurrencyLimitCh)

	// Fast path: a free slot is available right now.
	select {
	case concurrencyLimitCh <- struct{}{}:
		return nil
	default:
	}

	// Slow path: wait in the queue until a slot is freed or the timer fires.
	concurrencyLimitReached.Inc()
	t := timerpool.Get(*maxQueueDuration)
	select {
	case concurrencyLimitCh <- struct{}{}:
		timerpool.Put(t)
		return nil
	case <-t.C:
		timerpool.Put(t)
		concurrencyLimitTimeout.Inc()
		return &httpserver.ErrorWithStatusCode{
			Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are already executed. "+
				"Possible solutions: to reduce workload; to increase compute resources at the server; "+
				"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
				maxQueueDuration.Seconds(), *maxConcurrentInserts),
			StatusCode: http.StatusServiceUnavailable,
		}
	}
}

// decConcurrency decreases the concurrency by removing an item from concurrencyLimitCh.
//
// It must be called only after a successful incConcurrency() call; otherwise it may block.
func decConcurrency() {
	<-concurrencyLimitCh
}

var (
	// concurrencyLimitReached counts incConcurrency() calls, which had to wait for a free slot.
	concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)

	// concurrencyLimitTimeout counts incConcurrency() calls, which failed to obtain a slot in time.
	concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)

	// The gauges below make sure concurrencyLimitCh is initialized before reading it.
	// Otherwise they would report zeroes for the nil channel until the first insert request,
	// and the read would race with the write in initConcurrencyLimitCh.
	//
	// NOTE(review): this assumes that metrics are collected only after command-line flags
	// are parsed, since the channel capacity is fixed on the first initialization - confirm.
	_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 {
		concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
		return float64(cap(concurrencyLimitCh))
	})
	_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 {
		concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
		return float64(len(concurrencyLimitCh))
	})
)