From e9ce67adb8bd5ba9807bafb66e4180233f03c657 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Wed, 19 Apr 2023 10:18:32 +0200 Subject: [PATCH] vmalert: retry datasource requests with EOF or unexpected EOF errors (#4146) * vmalert: retry datasource requests with EOF or unexpected EOF errors Retry failed read request on the closed connection one more time. This may improve rules execution reliability when connection between vmalert and datasource closes unexpectedly. Signed-off-by: hagen1778 * vmalert: fix old tests Signed-off-by: hagen1778 --------- Signed-off-by: hagen1778 --- app/vmalert/datasource/vm.go | 6 ++ app/vmalert/datasource/vm_test.go | 103 +++++++++++++++++++++++++----- docs/CHANGELOG.md | 1 + 3 files changed, 93 insertions(+), 17 deletions(-) diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index e9d579c694..fc538266f1 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -2,6 +2,7 @@ package datasource import ( "context" + "errors" "fmt" "io" "net/http" @@ -162,6 +163,11 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, req.URL.RawQuery) } resp, err := s.c.Do(req.WithContext(ctx)) + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + // something in the middle between client and datasource might be closing + // the connection. So we do a one more attempt in hope request will succeed. + resp, err = s.c.Do(req.WithContext(ctx)) + } if err != nil { return nil, fmt.Errorf("error getting response from %s: %w", req.URL.Redacted(), err) } diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go index e9d7287638..d6413ab6e8 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/vm_test.go @@ -38,7 +38,7 @@ func TestVMInstantQuery(t *testing.T) { mux.HandleFunc("/render", func(w http.ResponseWriter, request *http.Request) { c++ switch c { - case 8: + case 7: w.Write([]byte(`[{"target":"constantLine(10)","tags":{"name":"constantLine(10)"},"datapoints":[[10,1611758343],[10,1611758373],[10,1611758403]]}]`)) } }) @@ -62,21 +62,18 @@ func TestVMInstantQuery(t *testing.T) { } switch c { case 0: - conn, _, _ := w.(http.Hijacker).Hijack() - _ = conn.Close() - case 1: w.WriteHeader(500) - case 2: + case 1: w.Write([]byte("[]")) - case 3: + case 2: w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`)) - case 4: + case 3: w.Write([]byte(`{"status":"unknown"}`)) - case 5: + case 4: w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`)) - case 6: + case 5: w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows","foo":"bar"},"value":[1583786142,"13763"]},{"metric":{"__name__":"vm_requests","foo":"baz"},"value":[1583786140,"2000"]}]}}`)) - case 7: + case 6: w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`)) } }) @@ -95,17 +92,20 @@ func TestVMInstantQuery(t *testing.T) { ts := time.Now() expErr := func(err string) { - if _, _, err := pq.Query(ctx, query, ts); err == nil { + _, _, gotErr := pq.Query(ctx, query, ts) + if gotErr == nil { t.Fatalf("expected %q got nil", err) } + if !strings.Contains(gotErr.Error(), err) { + t.Fatalf("expected err %q; got %q", err, gotErr) + } } - expErr("connection error") // 0 - expErr("invalid response status error") // 1 - expErr("response body error") // 2 - expErr("error status") // 3 - expErr("unknown status") // 4 - expErr("non-vector resultType error") // 5 + expErr("500") // 0 + expErr("error parsing prometheus metrics") // 1 + expErr("response error") // 2 + expErr("unknown status") // 3 + expErr("unexpected end of JSON input") // 4 m, _, err := pq.Query(ctx, query, ts) // 6 - vector if err != nil { @@ -165,6 +165,75 @@ func TestVMInstantQuery(t *testing.T) { }, } metricsEqual(t, m, exp) + +} + +func TestVMInstantQueryWithRetry(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { + t.Errorf("should not be called") + }) + c := -1 + mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) { + c++ + if r.URL.Query().Get("query") != query { + t.Errorf("expected %s in query param, got %s", query, r.URL.Query().Get("query")) + } + switch c { + case 0: + w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`)) + case 1: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 2: + w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "2"]}}`)) + case 3: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 4: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + } + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + + s := NewVMStorage(srv.URL, nil, time.Minute, 0, false, srv.Client()) + pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus)}) + + expErr := func(err string) { + _, _, gotErr := pq.Query(ctx, query, time.Now()) + if gotErr == nil { + t.Fatalf("expected %q got nil", err) + } + if !strings.Contains(gotErr.Error(), err) { + t.Fatalf("expected err %q; got %q", err, gotErr) + } + } + + expValue := func(v float64) { + m, _, err := pq.Query(ctx, query, time.Now()) + if err != nil { + t.Fatalf("unexpected %s", err) + } + if len(m) != 1 { + t.Fatalf("expected 1 metrics got %d in %+v", len(m), m) + } + expected := []Metric{ + { + Timestamps: []int64{1583786142}, + Values: []float64{v}, + }, + } + if !reflect.DeepEqual(m, expected) { + t.Fatalf("unexpected metric %+v want %+v", m, expected) + } + } + + expValue(1) // 0 + expValue(2) // 1 - fail, 2 - retry + expErr("EOF") // 3, 4 - retries } func metricsEqual(t *testing.T, gotM, expectedM []Metric) { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 156af5d324..dcd03a3459 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -26,6 +26,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores. * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix performance issue when migrating data from VictoriaMetrics according to [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-from-victoriametrics). Add the ability to speed up the data migration via `--vm-native-disable-retries` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4092). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix a panic when the duration in the query contains uppercase `M` suffix. Such a suffix isn't allowed to use in durations, since it clashes with `a million` suffix, e.g. it isn't clear whether `rate(metric[5M])` means rate over 5 minutes, 5 months or 5 million seconds. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4120) issues. +* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): retry failed read request on the closed connection one more time. This improves rules execution reliability when connection between vmalert and datasource closes unexpectedly. * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent from possible panic when the number of vmstorage nodes increases when [automatic vmstorage discovery](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled. ## [v1.90.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.90.0)