app/vmalert: respect batch size limit for remote write on shutdown (#6039)

During shutdown period of vmalert, remotewrite client retrieve all pending time series from buffer queue, compose them into 1 batch and execute remote write.

This final batch may exceed the limit of -remoteWrite.maxBatchSize, and be rejected by the receiver (gateway, vmcluster or others).

This changes ensures that even during shutdown vmalert won't exceed the max batch size limit for remote write
destination.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025

(cherry picked from commit 623d257faf)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Jiekun 2024-03-29 14:27:50 +01:00 committed by hagen1778
parent 36c1ca9949
commit 1e0a079143
No known key found for this signature in database
GPG Key ID: 3BF75F3741CA9640
3 changed files with 110 additions and 4 deletions

View File

@ -151,12 +151,22 @@ func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained series")
shutdownFlushCnt := 0
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
shutdownFlushCnt += len(wr.Timeseries)
c.flush(lastCtx, wr)
}
}
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
// flush the last batch. `flush` will re-check and avoid flushing empty batch.
shutdownFlushCnt += len(wr.Timeseries)
c.flush(lastCtx, wr)
logger.Infof("shutting down remote write client flushed %d series", shutdownFlushCnt)
cancel()
}
c.wg.Add(1)

View File

@ -84,6 +84,70 @@ func TestClient_Push(t *testing.T) {
}
}
func TestClient_run_maxBatchSizeDuringShutdown(t *testing.T) {
batchSize := 20
testTable := []struct {
name string // name of the test case
pushCnt int // how many time series is pushed to the client
batchCnt int // the expected batch count sent by the client
}{
{
name: "pushCnt % batchSize == 0",
pushCnt: batchSize * 40,
batchCnt: 40,
},
{
name: "pushCnt % batchSize != 0",
pushCnt: batchSize*40 + 1,
batchCnt: 40 + 1,
},
}
for _, tt := range testTable {
t.Run(tt.name, func(t *testing.T) {
// run new server
bcServer := newBatchCntRWServer()
// run new client
rwClient, err := NewClient(context.Background(), Config{
MaxBatchSize: batchSize,
// Set everything to 1 to simplify the calculation.
Concurrency: 1,
MaxQueueSize: 1000,
FlushInterval: time.Minute,
// batch count server
Addr: bcServer.URL,
})
if err != nil {
t.Fatalf("new remote write client failed, err: %v", err)
}
// push time series to the client.
for i := 0; i < tt.pushCnt; i++ {
if err = rwClient.Push(prompbmarshal.TimeSeries{}); err != nil {
t.Fatalf("push time series to the client failed, err: %v", err)
}
}
// close the client so the rest ts will be flushed in `shutdown`
if err = rwClient.Close(); err != nil {
t.Fatalf("shutdown client failed, err: %v", err)
}
// finally check how many batches is sent.
if tt.batchCnt != bcServer.acceptedBatches() {
t.Errorf("client sent batch count incorrect, want: %d, get: %d", tt.batchCnt, bcServer.acceptedBatches())
}
if tt.pushCnt != bcServer.accepted() {
t.Errorf("client sent time series count incorrect, want: %d, get: %d", tt.pushCnt, bcServer.accepted())
}
})
}
}
func newRWServer() *rwServer {
rw := &rwServer{}
rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler))
@ -184,3 +248,27 @@ func (frw *faultyRWServer) handler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("server overloaded"))
}
}
type batchCntRWServer struct {
*rwServer
batchCnt atomic.Int64 // accepted batch count, which also equals to request count
}
func newBatchCntRWServer() *batchCntRWServer {
bc := &batchCntRWServer{
rwServer: &rwServer{},
}
bc.Server = httptest.NewServer(http.HandlerFunc(bc.handler))
return bc
}
func (bc *batchCntRWServer) handler(w http.ResponseWriter, r *http.Request) {
bc.batchCnt.Add(1)
bc.rwServer.handler(w, r)
}
func (bc *batchCntRWServer) acceptedBatches() int {
return int(bc.batchCnt.Load())
}

View File

@ -31,6 +31,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
**Update note 1: the `-datasource.lookback` command-line flag at `vmalert` is no-op starting from this release. This flag will be removed in the future, so please switch to [`eval_delay` option](https://docs.victoriametrics.com/vmalert/#groups). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5155) for more details.**
**Update note 2: transform custom HTTP header keys specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options) for scrape targets to [canonical form](https://pkg.go.dev/net/http#CanonicalHeaderKey).**
* SECURITY: upgrade Go builder from Go1.21.7 to Go1.22.1. See [the list of issues addressed in Go1.22.1](https://github.com/golang/go/issues?q=milestone%3AGo1.22.1+label%3ACherryPickApproved).
@ -38,6 +39,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow discovering ip addresses for backend instances hidden behind a shared hostname, via `discover_backend_ips: true` option. This allows evenly spreading load among backend instances. See [these docs](https://docs.victoriametrics.com/vmauth/#discovering-backend-ips) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5707).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow routing incoming requests based on HTTP [query args](https://en.wikipedia.org/wiki/Query_string) via `src_query_args` option at `url_map`. See [these docs](https://docs.victoriametrics.com/vmauth/#generic-http-proxy-for-different-backends) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5878).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): allow routing incoming requests based on HTTP request headers via `src_headers` option at `url_map`. See [these docs](https://docs.victoriametrics.com/vmauth/#generic-http-proxy-for-different-backends).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): added ability to set extra headers where to expect auth token (additionally to Authorization).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels).
@ -57,6 +59,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for [native protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5748). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5824).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): support client-side TLS configuration for VictoriaMetrics destination specified via `--vm-*` cmd-line flags used in [InfluxDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-influxdb-1x), [Remote Read protocol](https://docs.victoriametrics.com/vmctl/#migrating-data-by-remote-read-protocol), [OpenTSDB](https://docs.victoriametrics.com/vmctl/#migrating-data-from-opentsdb), [Prometheus](https://docs.victoriametrics.com/vmctl/#migrating-data-from-prometheus) and [Promscale](https://docs.victoriametrics.com/vmctl/#migrating-data-from-promscale) migration modes.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): split [explore phase](https://docs.victoriametrics.com/vmctl/#migrating-data-from-victoriametrics) in `vm-native` mode by time intervals when [--vm-native-step-interval](https://docs.victoriametrics.com/vmctl/#using-time-based-chunking-of-migration) is specified. This should reduce probability of exceeding complexity limits for number of selected series during explore phase. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5369).
* FEATURE: [graphite](https://docs.victoriametrics.com/#graphite-render-api-usage): add support for `aggregateSeriesLists`, `diffSeriesLists`, `multiplySeriesLists` and `sumSeriesLists` functions. Thanks to @rbizos for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5809).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): added command line argument that enables OpenTelementry metric names and labels sanitization.
* BUGFIX: prevent from automatic deletion of newly registered time series when it is queried immediately after the addition. The probability of this bug has been increased significantly after [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) because of optimizations related to registering new time series. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5948) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5959) issue.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly set `Host` header in requests to scrape targets if it is specified via [`headers` option](https://docs.victoriametrics.com/sd_configs/#http-api-client-options). Thanks to @fholzer for [the bugreport](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5969) and [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5970).
@ -66,8 +71,11 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix VictoriaLogs UI query handling to correctly apply `_time` filter across all queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5920).
* BUGFIX: [vmselect](https://docs.victoriametrics.com/): make vmselect resilient to absence of cache folder. If cache folder was mistakenly deleted by user or OS, vmselect will try re-creating it first. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5985).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): limit duration of requests to /api/v1/labels, /api/v1/label/.../values or /api/v1/series with `-search.maxLabelsAPIDuration` duration. Before, `-search.maxExportDuration` value was used by mistake. Thanks to @kbweave for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5992).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix VictoriaLogs UI query handling to correctly apply `_time` filter across all queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5920).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix response body and headers for AWS Firehose HTTP Destination.
* BUGFIX: properly wait for force merge to be completed during the shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5944) for the details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): set correct `endsAt` value in notifications sent to the Alertmanager. Previously, a rule with evaluation intervals lower than 10s could never be triggered. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5995) for details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): respect `-remoteWrite.maxBatchSize` at shutdown period. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025).
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)