app/vmselect/netstorage: remove duplicate limiter on concurrent queries

It duplicates the `-search.maxConcurrentRequests` limiter.
This commit is contained in:
Aliaksandr Valialkin 2021-05-24 19:11:35 +03:00
parent 25ed1f0c4f
commit a0b001bfec

View File

@@ -1449,8 +1449,8 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co
 type storageNode struct {
 	connPool *netutil.ConnPool

-	// The channel for limiting the maximum number of concurrent queries to storageNode.
-	concurrentQueriesCh chan struct{}
+	// The number of concurrent queries to storageNode.
+	concurrentQueries *metrics.Counter

 	// The number of RegisterMetricNames requests to storageNode.
 	registerMetricNamesRequests *metrics.Counter
@@ -1750,14 +1750,8 @@ func (sn *storageNode) execOnConnWithPossibleRetry(funcName string, f func(bc *h
 }

 func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
-	select {
-	case sn.concurrentQueriesCh <- struct{}{}:
-	default:
-		return fmt.Errorf("too many concurrent queries (more than %d)", cap(sn.concurrentQueriesCh))
-	}
-	defer func() {
-		<-sn.concurrentQueriesCh
-	}()
+	sn.concurrentQueries.Inc()
+	defer sn.concurrentQueries.Dec()

 	d := time.Unix(int64(deadline.Deadline()), 0)
 	nowSecs := fasttime.UnixTimestamp()
@@ -2457,7 +2451,7 @@ func InitStorageNodes(addrs []string) {
 			// There is no need in requests compression, since they are usually very small.
 			connPool: netutil.NewConnPool("vmselect", addr, handshake.VMSelectClient, 0),

-			concurrentQueriesCh: make(chan struct{}, maxConcurrentQueriesPerStorageNode),
+			concurrentQueries: metrics.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),

 			registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
 			registerMetricNamesErrors:   metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
@@ -2488,9 +2482,6 @@ func InitStorageNodes(addrs []string) {
 			metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
 			metricRowsRead:   metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
 		}
-		metrics.NewGauge(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr), func() float64 {
-			return float64(len(sn.concurrentQueriesCh))
-		})
 		storageNodes = append(storageNodes, sn)
 	}
 }
@@ -2513,9 +2504,6 @@ var (
 	partialSearchResults = metrics.NewCounter(`vm_partial_results_total{type="search", name="vmselect"}`)
 )

-// The maximum number of concurrent queries per storageNode.
-const maxConcurrentQueriesPerStorageNode = 100
-
 func applyGraphiteRegexpFilter(filter string, ss []string) ([]string, error) {
 	// Anchor filter regexp to the beginning of the string as Graphite does.
 	// See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157