lib/handshake: add SetReadDeadline and SetWriteDeadline implementations additionally to SetDeadline

This is a follow-up for 27a5461785

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5327
This commit is contained in:
Aliaksandr Valialkin 2023-11-16 16:43:15 +01:00
parent 27a5461785
commit ef80a89a24
No known key found for this signature in database
GPG Key ID: A72BEC6CD3D0DED1
2 changed files with 33 additions and 7 deletions

View File

@ -31,6 +31,7 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: dashboards: use `version` instead of `short_version` in version change annotation for single/cluster dashboards. The update should reflect version changes even if different flavours of the same release were applied (custom builds). * FEATURE: dashboards: use `version` instead of `short_version` in version change annotation for single/cluster dashboards. The update should reflect version changes even if different flavours of the same release were applied (custom builds).
* BUGFIX: fix a bug, which could result in improper results and/or to `cannot merge series: duplicate series found` error during [range query](https://docs.victoriametrics.com/keyConcepts.html#range-query) execution. The issue has been introduced in [v1.95.0](https://docs.victoriametrics.com/CHANGELOG.html#v1950). See [this bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332) for details. * BUGFIX: fix a bug, which could result in improper results and/or to `cannot merge series: duplicate series found` error during [range query](https://docs.victoriametrics.com/keyConcepts.html#range-query) execution. The issue has been introduced in [v1.95.0](https://docs.victoriametrics.com/CHANGELOG.html#v1950). See [this bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332) for details.
* BUGFIX: improve deadline detection when using buffered connection for communication between cluster components. Before, due to nature of a buffered connection the deadline could have been exceeded while reading or writing buffered data to connection. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5327).
## [v1.95.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.0) ## [v1.95.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.0)
@ -101,7 +102,6 @@ Released at 2023-11-15
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): increment `vmalert_remotewrite_errors_total` metric if all retries to send remote-write request failed. Before, this metric was incremented only if remote-write client's buffer is overloaded. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): increment `vmalert_remotewrite_errors_total` metric if all retries to send remote-write request failed. Before, this metric was incremented only if remote-write client's buffer is overloaded.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): increment `vmalert_remotewrite_dropped_rows_total` and `vmalert_remotewrite_dropped_bytes_total` metrics if remote-write client's buffer is overloaded. Before, these metrics were incremented only after unsuccessful HTTP calls. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): increment `vmalert_remotewrite_dropped_rows_total` and `vmalert_remotewrite_dropped_bytes_total` metrics if remote-write client's buffer is overloaded. Before, these metrics were incremented only after unsuccessful HTTP calls.
* BUGFIX: `vmselect`: improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087). * BUGFIX: `vmselect`: improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087).
* BUGFIX: improve deadline detection when using buffered connection for communication between cluster components. Before, due to nature of a buffered connection the deadline could have been exceeded while reading or writing data to connection.
* BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component. * BUGFIX: dashboards: fix vminsert/vmstorage/vmselect metrics filtering when dashboard is used to display data from many sub-clusters with unique job names. Before, only one specific job could have been accounted for component-specific panels, instead of all available jobs for the component.
* BUGFIX: dashboards: respect `job` and `instance` filters for `alerts` annotation in cluster and single-node dashboards. * BUGFIX: dashboards: respect `job` and `instance` filters for `alerts` annotation in cluster and single-node dashboards.
* BUGFIX: dashboards: update description for RSS and anonymous memory panels to be consistent for single-node, cluster and vmagent dashboards. * BUGFIX: dashboards: update description for RSS and anonymous memory panels to be consistent for single-node, cluster and vmagent dashboards.

View File

@ -23,7 +23,8 @@ type BufferedConn struct {
br io.Reader br io.Reader
bw bufferedWriter bw bufferedWriter
deadline time.Time readDeadline time.Time
writeDeadline time.Time
} }
const bufferSize = 64 * 1024 const bufferSize = 64 * 1024
@ -46,17 +47,35 @@ func newBufferedConn(c net.Conn, compressionLevel int, isReadCompressed bool) *B
return bc return bc
} }
// SetDeadline sets the read and write deadlines associated with the connection. // SetDeadline sets read and write deadlines for bc to t.
// Deadline is checked on each Read call. //
// Deadline is checked on each Read and Write call.
func (bc *BufferedConn) SetDeadline(t time.Time) error { func (bc *BufferedConn) SetDeadline(t time.Time) error {
bc.deadline = t bc.readDeadline = t
bc.writeDeadline = t
return bc.Conn.SetDeadline(t) return bc.Conn.SetDeadline(t)
} }
// SetReadDeadline sets read deadline for bc to t.
//
// Deadline is checked on each Read call.
func (bc *BufferedConn) SetReadDeadline(t time.Time) error {
bc.readDeadline = t
return bc.Conn.SetReadDeadline(t)
}
// SetWriteDeadline sets write deadline for bc to t.
//
// Deadline is checked on each Write call.
func (bc *BufferedConn) SetWriteDeadline(t time.Time) error {
bc.writeDeadline = t
return bc.Conn.SetReadDeadline(t)
}
// Read reads up to len(p) from bc to p. // Read reads up to len(p) from bc to p.
func (bc *BufferedConn) Read(p []byte) (int, error) { func (bc *BufferedConn) Read(p []byte) (int, error) {
startTime := time.Now() startTime := time.Now()
if startTime.After(bc.deadline) { if deadlineExceeded(bc.readDeadline, startTime) {
return 0, os.ErrDeadlineExceeded return 0, os.ErrDeadlineExceeded
} }
n, err := bc.br.Read(p) n, err := bc.br.Read(p)
@ -71,7 +90,7 @@ func (bc *BufferedConn) Read(p []byte) (int, error) {
// Do not forget to call Flush if needed. // Do not forget to call Flush if needed.
func (bc *BufferedConn) Write(p []byte) (int, error) { func (bc *BufferedConn) Write(p []byte) (int, error) {
startTime := time.Now() startTime := time.Now()
if startTime.After(bc.deadline) { if deadlineExceeded(bc.writeDeadline, startTime) {
return 0, os.ErrDeadlineExceeded return 0, os.ErrDeadlineExceeded
} }
n, err := bc.bw.Write(p) n, err := bc.bw.Write(p)
@ -81,6 +100,13 @@ func (bc *BufferedConn) Write(p []byte) (int, error) {
return n, err return n, err
} }
func deadlineExceeded(deadline, currentTime time.Time) bool {
if deadline.IsZero() {
return false
}
return currentTime.After(deadline)
}
// Close closes bc. // Close closes bc.
func (bc *BufferedConn) Close() error { func (bc *BufferedConn) Close() error {
// Close the Conn at first. It is expected that all the required data // Close the Conn at first. It is expected that all the required data