diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index fa751f5666..0f381d7f97 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1449,8 +1449,8 @@ func (snr *storageNodesRequest) collectResults(partialResultsCounter *metrics.Co type storageNode struct { connPool *netutil.ConnPool - // The channel for limiting the maximum number of concurrent queries to storageNode. - concurrentQueriesCh chan struct{} + // The number of concurrent queries to storageNode. + concurrentQueries *metrics.Counter // The number of RegisterMetricNames requests to storageNode. registerMetricNamesRequests *metrics.Counter @@ -1750,14 +1750,8 @@ func (sn *storageNode) execOnConnWithPossibleRetry(funcName string, f func(bc *h } func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error { - select { - case sn.concurrentQueriesCh <- struct{}{}: - default: - return fmt.Errorf("too many concurrent queries (more than %d)", cap(sn.concurrentQueriesCh)) - } - defer func() { - <-sn.concurrentQueriesCh - }() + sn.concurrentQueries.Inc() + defer sn.concurrentQueries.Dec() d := time.Unix(int64(deadline.Deadline()), 0) nowSecs := fasttime.UnixTimestamp() @@ -2457,7 +2451,7 @@ func InitStorageNodes(addrs []string) { // There is no need in requests compression, since they are usually very small. connPool: netutil.NewConnPool("vmselect", addr, handshake.VMSelectClient, 0), - concurrentQueriesCh: make(chan struct{}, maxConcurrentQueriesPerStorageNode), + concurrentQueries: metrics.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)), registerMetricNamesRequests: metrics.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)), registerMetricNamesErrors: metrics.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)), @@ -2488,9 +2482,6 @@ func InitStorageNodes(addrs []string) { metricBlocksRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)), metricRowsRead: metrics.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)), } - metrics.NewGauge(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr), func() float64 { - return float64(len(sn.concurrentQueriesCh)) - }) storageNodes = append(storageNodes, sn) } } @@ -2513,9 +2504,6 @@ var ( partialSearchResults = metrics.NewCounter(`vm_partial_results_total{type="search", name="vmselect"}`) ) -// The maximum number of concurrent queries per storageNode. -const maxConcurrentQueriesPerStorageNode = 100 - func applyGraphiteRegexpFilter(filter string, ss []string) ([]string, error) { // Anchor filter regexp to the beginning of the string as Graphite does. // See https://github.com/graphite-project/graphite-web/blob/3ad279df5cb90b211953e39161df416e54a84948/webapp/graphite/tags/localdatabase.py#L157