mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-12 12:46:23 +01:00
vmalert: retry datasource requests with EOF or unexpected EOF errors (#4146)
* vmalert: retry datasource requests with EOF or unexpected EOF errors Retry failed read request on the closed connection one more time. This may improve rules execution reliability when connection between vmalert and datasource closes unexpectedly. Signed-off-by: hagen1778 <roman@victoriametrics.com> * vmalert: fix old tests Signed-off-by: hagen1778 <roman@victoriametrics.com> --------- Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
79ee1749a1
commit
e9ce67adb8
@ -2,6 +2,7 @@ package datasource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -162,6 +163,11 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response,
|
||||
logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, req.URL.RawQuery)
|
||||
}
|
||||
resp, err := s.c.Do(req.WithContext(ctx))
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
|
||||
// something in the middle between client and datasource might be closing
|
||||
// the connection. So we do a one more attempt in hope request will succeed.
|
||||
resp, err = s.c.Do(req.WithContext(ctx))
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting response from %s: %w", req.URL.Redacted(), err)
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func TestVMInstantQuery(t *testing.T) {
|
||||
mux.HandleFunc("/render", func(w http.ResponseWriter, request *http.Request) {
|
||||
c++
|
||||
switch c {
|
||||
case 8:
|
||||
case 7:
|
||||
w.Write([]byte(`[{"target":"constantLine(10)","tags":{"name":"constantLine(10)"},"datapoints":[[10,1611758343],[10,1611758373],[10,1611758403]]}]`))
|
||||
}
|
||||
})
|
||||
@ -62,21 +62,18 @@ func TestVMInstantQuery(t *testing.T) {
|
||||
}
|
||||
switch c {
|
||||
case 0:
|
||||
conn, _, _ := w.(http.Hijacker).Hijack()
|
||||
_ = conn.Close()
|
||||
case 1:
|
||||
w.WriteHeader(500)
|
||||
case 2:
|
||||
case 1:
|
||||
w.Write([]byte("[]"))
|
||||
case 3:
|
||||
case 2:
|
||||
w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`))
|
||||
case 4:
|
||||
case 3:
|
||||
w.Write([]byte(`{"status":"unknown"}`))
|
||||
case 5:
|
||||
case 4:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`))
|
||||
case 6:
|
||||
case 5:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows","foo":"bar"},"value":[1583786142,"13763"]},{"metric":{"__name__":"vm_requests","foo":"baz"},"value":[1583786140,"2000"]}]}}`))
|
||||
case 7:
|
||||
case 6:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`))
|
||||
}
|
||||
})
|
||||
@ -95,17 +92,20 @@ func TestVMInstantQuery(t *testing.T) {
|
||||
ts := time.Now()
|
||||
|
||||
expErr := func(err string) {
|
||||
if _, _, err := pq.Query(ctx, query, ts); err == nil {
|
||||
_, _, gotErr := pq.Query(ctx, query, ts)
|
||||
if gotErr == nil {
|
||||
t.Fatalf("expected %q got nil", err)
|
||||
}
|
||||
if !strings.Contains(gotErr.Error(), err) {
|
||||
t.Fatalf("expected err %q; got %q", err, gotErr)
|
||||
}
|
||||
}
|
||||
|
||||
expErr("connection error") // 0
|
||||
expErr("invalid response status error") // 1
|
||||
expErr("response body error") // 2
|
||||
expErr("error status") // 3
|
||||
expErr("unknown status") // 4
|
||||
expErr("non-vector resultType error") // 5
|
||||
expErr("500") // 0
|
||||
expErr("error parsing prometheus metrics") // 1
|
||||
expErr("response error") // 2
|
||||
expErr("unknown status") // 3
|
||||
expErr("unexpected end of JSON input") // 4
|
||||
|
||||
m, _, err := pq.Query(ctx, query, ts) // 6 - vector
|
||||
if err != nil {
|
||||
@ -165,6 +165,75 @@ func TestVMInstantQuery(t *testing.T) {
|
||||
},
|
||||
}
|
||||
metricsEqual(t, m, exp)
|
||||
|
||||
}
|
||||
|
||||
func TestVMInstantQueryWithRetry(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
|
||||
t.Errorf("should not be called")
|
||||
})
|
||||
c := -1
|
||||
mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) {
|
||||
c++
|
||||
if r.URL.Query().Get("query") != query {
|
||||
t.Errorf("expected %s in query param, got %s", query, r.URL.Query().Get("query"))
|
||||
}
|
||||
switch c {
|
||||
case 0:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`))
|
||||
case 1:
|
||||
conn, _, _ := w.(http.Hijacker).Hijack()
|
||||
_ = conn.Close()
|
||||
case 2:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "2"]}}`))
|
||||
case 3:
|
||||
conn, _, _ := w.(http.Hijacker).Hijack()
|
||||
_ = conn.Close()
|
||||
case 4:
|
||||
conn, _, _ := w.(http.Hijacker).Hijack()
|
||||
_ = conn.Close()
|
||||
}
|
||||
})
|
||||
|
||||
srv := httptest.NewServer(mux)
|
||||
defer srv.Close()
|
||||
|
||||
s := NewVMStorage(srv.URL, nil, time.Minute, 0, false, srv.Client())
|
||||
pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus)})
|
||||
|
||||
expErr := func(err string) {
|
||||
_, _, gotErr := pq.Query(ctx, query, time.Now())
|
||||
if gotErr == nil {
|
||||
t.Fatalf("expected %q got nil", err)
|
||||
}
|
||||
if !strings.Contains(gotErr.Error(), err) {
|
||||
t.Fatalf("expected err %q; got %q", err, gotErr)
|
||||
}
|
||||
}
|
||||
|
||||
expValue := func(v float64) {
|
||||
m, _, err := pq.Query(ctx, query, time.Now())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected %s", err)
|
||||
}
|
||||
if len(m) != 1 {
|
||||
t.Fatalf("expected 1 metrics got %d in %+v", len(m), m)
|
||||
}
|
||||
expected := []Metric{
|
||||
{
|
||||
Timestamps: []int64{1583786142},
|
||||
Values: []float64{v},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(m, expected) {
|
||||
t.Fatalf("unexpected metric %+v want %+v", m, expected)
|
||||
}
|
||||
}
|
||||
|
||||
expValue(1) // 0
|
||||
expValue(2) // 1 - fail, 2 - retry
|
||||
expErr("EOF") // 3, 4 - retries
|
||||
}
|
||||
|
||||
func metricsEqual(t *testing.T, gotM, expectedM []Metric) {
|
||||
|
@ -26,6 +26,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||
* BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores.
|
||||
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix performance issue when migrating data from VictoriaMetrics according to [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-from-victoriametrics). Add the ability to speed up the data migration via `--vm-native-disable-retries` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4092).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix a panic when the duration in the query contains uppercase `M` suffix. Such a suffix isn't allowed to use in durations, since it clashes with `a million` suffix, e.g. it isn't clear whether `rate(metric[5M])` means rate over 5 minutes, 5 months or 5 million seconds. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4120) issues.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): retry failed read request on the closed connection one more time. This improves rules execution reliability when connection between vmalert and datasource closes unexpectedly.
|
||||
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent from possible panic when the number of vmstorage nodes increases when [automatic vmstorage discovery](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled.
|
||||
|
||||
## [v1.90.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.90.0)
|
||||
|
Loading…
Reference in New Issue
Block a user