2020-02-23 12:35:47 +01:00
package writeconcurrencylimiter
2019-05-22 23:16:55 +02:00
import (
2019-05-29 11:35:47 +02:00
"flag"
2019-05-22 23:16:55 +02:00
"fmt"
2019-08-23 08:46:45 +02:00
"net/http"
2019-05-22 23:16:55 +02:00
"time"
2019-05-28 16:17:19 +02:00
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2019-08-23 08:46:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-28 16:17:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2019-05-29 11:35:47 +02:00
"github.com/VictoriaMetrics/metrics"
2019-05-22 23:16:55 +02:00
)
var (
2020-12-08 19:49:32 +01:00
maxConcurrentInserts = flag . Int ( "maxConcurrentInserts" , cgroup . AvailableCPUs ( ) * 4 , "The maximum number of concurrent inserts. Default value should work for most cases, " +
2020-04-20 20:02:54 +02:00
"since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration" )
maxQueueDuration = flag . Duration ( "insert.maxQueueDuration" , time . Minute , "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts" )
2019-05-22 23:16:55 +02:00
)
2020-01-17 12:24:37 +01:00
// ch holds one empty struct per call currently running inside Do.
//
// It stays nil until Init creates it; its capacity is the concurrency limit.
var ch chan struct{}
2019-05-29 11:35:47 +02:00
// Init allocates the channel that Do uses for limiting concurrency.
//
// The channel capacity is read from the -maxConcurrentInserts flag,
// so Init must be called after flag.Parse.
func Init() {
	slots := *maxConcurrentInserts
	ch = make(chan struct{}, slots)
}
2019-05-22 23:16:55 +02:00
// Do calls f with the limited concurrency.
func Do ( f func ( ) error ) error {
2019-05-29 11:35:47 +02:00
// Limit the number of conurrent f calls in order to prevent from excess
2019-05-22 23:16:55 +02:00
// memory usage and CPU trashing.
2019-08-05 17:27:50 +02:00
select {
case ch <- struct { } { } :
err := f ( )
<- ch
return err
default :
}
// All the workers are busy.
2020-01-17 12:24:37 +01:00
// Sleep for up to *maxQueueDuration.
2019-08-05 17:27:50 +02:00
concurrencyLimitReached . Inc ( )
2020-01-17 12:24:37 +01:00
t := timerpool . Get ( * maxQueueDuration )
2019-05-22 23:16:55 +02:00
select {
case ch <- struct { } { } :
2019-05-28 16:17:19 +02:00
timerpool . Put ( t )
2019-05-22 23:16:55 +02:00
err := f ( )
<- ch
return err
case <- t . C :
2019-05-28 16:17:19 +02:00
timerpool . Put ( t )
2019-08-05 17:27:50 +02:00
concurrencyLimitTimeout . Inc ( )
2019-08-23 08:46:45 +02:00
return & httpserver . ErrorWithStatusCode {
2020-01-17 12:24:37 +01:00
Err : fmt . Errorf ( "cannot handle more than %d concurrent inserts during %s; possible solutions: " +
2020-02-23 12:35:47 +01:00
"increase `-insert.maxQueueDuration`, increase `-maxConcurrentInserts`, increase server capacity" , * maxConcurrentInserts , * maxQueueDuration ) ,
2019-08-23 08:46:45 +02:00
StatusCode : http . StatusServiceUnavailable ,
}
2019-05-22 23:16:55 +02:00
}
}
2019-05-29 11:35:47 +02:00
2019-08-05 17:27:50 +02:00
// Metrics exposed by the insert concurrency limiter.
//
// The metric names must not carry leading or trailing whitespace:
// spaces are not valid in Prometheus metric names.
var (
	// concurrencyLimitReached counts Do calls that found no free slot
	// on the first attempt and had to wait.
	concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)

	// concurrencyLimitTimeout counts Do calls that gave up waiting for a slot.
	concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)

	// vm_concurrent_insert_capacity reports the capacity of ch.
	// It is 0 while ch is still nil.
	_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 {
		return float64(cap(ch))
	})

	// vm_concurrent_insert_current reports the number of slots of ch
	// that are occupied at the moment.
	_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 {
		return float64(len(ch))
	})
)