// Package writeconcurrencylimiter limits the number of concurrent write
// (insert) requests. The limit is applied via Reader, whose Read() blocks
// until a concurrency slot becomes available or -insert.maxQueueDuration
// elapses.
package writeconcurrencylimiter

import (
	"flag"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
	"github.com/VictoriaMetrics/metrics"
)

var (
	maxConcurrentInserts = flag.Int("maxConcurrentInserts", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent insert requests. "+
		"The default value should work for most cases, since it minimizes memory usage. It can be increased when clients send data over slow networks. "+
		"See also -insert.maxQueueDuration")
	maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration to wait in the queue when -maxConcurrentInserts "+
		"concurrent insert requests are executed")
)
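
// For example, a server receiving inserts from many clients over slow links
// could be started with larger values (illustrative values, not tuned
// recommendations):
//
//	./victoria-metrics -maxConcurrentInserts=32 -insert.maxQueueDuration=30s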

// Reader is a reader, which increases the concurrency after the first Read() call.
//
// The concurrency can be reduced by calling DecConcurrency().
// Then the concurrency is increased again after the next Read() call.
type Reader struct {
	r                    io.Reader
	increasedConcurrency bool
}

// GetReader returns a Reader for r.
//
// PutReader() must be called when the returned Reader is no longer needed.
func GetReader(r io.Reader) *Reader {
	v := readerPool.Get()
	if v == nil {
		return &Reader{
			r: r,
		}
	}
	rr := v.(*Reader)
	rr.r = r
	return rr
}

// PutReader returns r to the pool.
//
// It decreases the concurrency if r has increased it.
func PutReader(r *Reader) {
	r.DecConcurrency()
	r.r = nil
	readerPool.Put(r)
}
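
// A minimal usage sketch; insertHandler and parseStream are hypothetical,
// shown only to illustrate the GetReader/PutReader contract:
//
//	func insertHandler(w http.ResponseWriter, req *http.Request) {
//		r := writeconcurrencylimiter.GetReader(req.Body)
//		defer writeconcurrencylimiter.PutReader(r)
//		// A concurrency slot is acquired on the first r.Read() call
//		// inside parseStream and released by PutReader above.
//		if err := parseStream(r); err != nil {
//			http.Error(w, err.Error(), http.StatusBadRequest)
//		}
//	}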

var readerPool sync.Pool

// Read implements io.Reader.
//
// It increases the concurrency after the first call, or after the next call
// following a DecConcurrency() call.
func (r *Reader) Read(p []byte) (int, error) {
	n, err := r.r.Read(p)
	if !r.increasedConcurrency {
		// Acquire a concurrency slot only after the underlying Read() returns,
		// so the slot isn't held while waiting for the first client bytes.
		if !incConcurrency() {
			err = &httpserver.ErrorWithStatusCode{
				Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are executed. "+
					"Possible solutions: to reduce workload; to increase compute resources at the server; "+
					"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
					maxQueueDuration.Seconds(), *maxConcurrentInserts),
				StatusCode: http.StatusServiceUnavailable,
			}
			return 0, err
		}
		r.increasedConcurrency = true
	}
	return n, err
}

// DecConcurrency decreases the concurrency, so it can be increased again after the next Read() call.
func (r *Reader) DecConcurrency() {
	if r.increasedConcurrency {
		decConcurrency()
		r.increasedConcurrency = false
	}
}
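
// A hedged sketch of releasing the slot between protocol blocks, so slow
// clients do not hold a slot while idle (readBlock and processBlock are
// hypothetical helpers):
//
//	for {
//		block, err := readBlock(r) // the first Read() re-acquires a slot
//		if err != nil {
//			break
//		}
//		r.DecConcurrency() // free the slot while the block is processed
//		processBlock(block)
//	}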

func initConcurrencyLimitCh() {
	concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts)
}

var (
	concurrencyLimitCh     chan struct{}
	concurrencyLimitChOnce sync.Once
)

func incConcurrency() bool {
	concurrencyLimitChOnce.Do(initConcurrencyLimitCh)

	// Fast path: a free concurrency slot is available right away.
	select {
	case concurrencyLimitCh <- struct{}{}:
		return true
	default:
	}

	// Slow path: wait for up to -insert.maxQueueDuration until a slot frees up.
	concurrencyLimitReached.Inc()
	t := timerpool.Get(*maxQueueDuration)
	select {
	case concurrencyLimitCh <- struct{}{}:
		timerpool.Put(t)
		return true
	case <-t.C:
		timerpool.Put(t)
		concurrencyLimitTimeout.Inc()
		return false
	}
}
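
// The limiter above is a plain buffered-channel semaphore. A self-contained
// sketch of the same pattern (names are illustrative):
//
//	sem := make(chan struct{}, maxWorkers)
//	sem <- struct{}{}        // acquire; blocks while maxWorkers are active
//	defer func() { <-sem }() // release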

func decConcurrency() {
	<-concurrencyLimitCh
}

var (
	concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)
	concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)

	_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 {
		concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
		return float64(cap(concurrencyLimitCh))
	})
	_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 {
		concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
		return float64(len(concurrencyLimitCh))
	})
)
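
// The gauges above can drive capacity alerts. For example, the following
// MetricsQL/PromQL expression (illustrative, not a shipped alerting rule)
// fires when more than 90% of the concurrency slots are busy:
//
//	vm_concurrent_insert_current / vm_concurrent_insert_capacity > 0.9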