VictoriaMetrics/app/vmalert/datasource/client.go
Hui Wang 9616814728
Some checks are pending
build / Build (push) Waiting to run
CodeQL Go / Analyze (push) Waiting to run
main / lint (push) Waiting to run
main / test (test-full) (push) Blocked by required conditions
main / test (test-full-386) (push) Blocked by required conditions
main / test (test-pure) (push) Blocked by required conditions
vmalert: integrate with victorialogs (#7255)
Addresses https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6706.
See
https://github.com/VictoriaMetrics/VictoriaMetrics/blob/vmalert-support-vlog-ds/docs/VictoriaLogs/vmalert.md.

Related fix
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7254.

Note: in this pull request, vmalert doesn't support
[backfilling](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/vmalert-support-vlog-ds/docs/VictoriaLogs/vmalert.md#rules-backfilling)
for rules with a customized time filter. It might be added in the
future, see [this
issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7289)
for details.

Feature can be tested with image
`victoriametrics/vmalert:heads-vmalert-support-vlog-ds-0-g420629c-scratch`.

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 68bad22fd2)
2024-10-29 16:32:00 +01:00

334 lines
10 KiB
Go

package datasource
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// datasourceType names the kind of datasource a Client works with.
// It selects how requests are built and how responses are parsed.
type datasourceType string
// Datasource types accepted by toDatasourceType.
const (
datasourcePrometheus datasourceType = "prometheus"
datasourceGraphite datasourceType = "graphite"
datasourceVLogs datasourceType = "vlogs"
)
// toDatasourceType converts s into the matching datasourceType.
// Unknown values are reported via logger.Panicf; an empty type is
// returned should that call ever return.
func toDatasourceType(s string) datasourceType {
	dsType := datasourceType(s)
	switch dsType {
	case datasourcePrometheus, datasourceGraphite, datasourceVLogs:
		return dsType
	}
	logger.Panicf("BUG: unknown datasource type %q", s)
	return ""
}
// Client is a datasource entity for reading data,
// supported clients are enumerated in datasourceType.
// WARN: when adding a new field, remember to check if Clone() method needs to be updated.
type Client struct {
// c is the HTTP client used for executing requests.
c *http.Client
// authCfg, when non-nil, sets auth headers on every created request.
authCfg *promauth.Config
// datasourceURL is the URL every request is created against.
datasourceURL string
appendTypePrefix bool
queryStep time.Duration
// dataSourceType selects how requests are built and which parser
// is applied to responses.
dataSourceType datasourceType
// applyIntervalAsTimeFilter is only valid for vlogs datasource.
// Set to true if there is no [timeFilter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) in the rule expression,
// and we will add evaluation interval as an additional timeFilter when querying.
// QueryRange rejects vlogs queries when this is false.
applyIntervalAsTimeFilter bool
// evaluationInterval will help setting request's `step` param,
// or adding time filter for LogsQL expression.
evaluationInterval time.Duration
// extraParams contains params to be attached to each HTTP request
extraParams url.Values
// extraHeaders are headers to be attached to each HTTP request
extraHeaders []keyValue
// whether to print additional log messages
// for each sent request
debug bool
}
// keyValue is a single HTTP header name/value pair,
// as stored in Client.extraHeaders.
type keyValue struct {
key string
value string
}
// Clone returns a new Client which shares the http client and auth config
// with c, while carrying its own copies of every other setting.
//
// extraHeaders and extraParams are copied so that changes made to the clone
// (e.g. via ApplyParams) never leak into c and vice versa.
func (c *Client) Clone() *Client {
	ns := &Client{
		c:                c.c,
		authCfg:          c.authCfg,
		datasourceURL:    c.datasourceURL,
		appendTypePrefix: c.appendTypePrefix,
		queryStep:        c.queryStep,
		dataSourceType:   c.dataSourceType,
		// applyIntervalAsTimeFilter must be carried over as well,
		// otherwise a bare Clone() silently resets it to false.
		applyIntervalAsTimeFilter: c.applyIntervalAsTimeFilter,
		evaluationInterval:        c.evaluationInterval,
		// init map so it can be populated below
		extraParams: make(url.Values, len(c.extraParams)),
		debug:       c.debug,
	}
	if len(c.extraHeaders) > 0 {
		ns.extraHeaders = make([]keyValue, len(c.extraHeaders))
		copy(ns.extraHeaders, c.extraHeaders)
	}
	for k, vs := range c.extraParams {
		// Copy the values instead of sharing the slice: url.Values.Add appends
		// to the stored slice, so a shared backing array could be overwritten
		// by the other client.
		ns.extraParams[k] = append([]string(nil), vs...)
	}
	return ns
}
// ApplyParams overrides settings of c with the given querier params
// and returns c itself.
//
// Query params from params replace same-named entries in extraParams,
// while headers from params are appended to extraHeaders.
func (c *Client) ApplyParams(params QuerierParams) *Client {
	if dsType := params.DataSourceType; dsType != "" {
		c.dataSourceType = toDatasourceType(dsType)
	}
	c.evaluationInterval = params.EvaluationInterval
	c.applyIntervalAsTimeFilter = params.ApplyIntervalAsTimeFilter
	c.debug = params.Debug

	if params.QueryParams != nil && c.extraParams == nil {
		c.extraParams = url.Values{}
	}
	for name, values := range params.QueryParams {
		// custom query params are prior to default ones,
		// so drop whatever was registered under this name before
		c.extraParams.Del(name)
		// Add() is used on purpose instead of Set(), since it is allowed
		// for GET params to be duplicated
		// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4908
		for _, value := range values {
			c.extraParams.Add(name, value)
		}
	}

	for name, value := range params.Headers {
		c.extraHeaders = append(c.extraHeaders, keyValue{key: name, value: value})
	}
	return c
}
// BuildWithParams - implements interface.
// It applies params to a clone of c, so c itself is left untouched.
func (c *Client) BuildWithParams(params QuerierParams) Querier {
	cloned := c.Clone()
	return cloned.ApplyParams(params)
}
// NewPrometheusClient returns a new datasource client of prometheus type
// for the given baseURL. A trailing "/" is stripped from baseURL.
func NewPrometheusClient(baseURL string, authCfg *promauth.Config, appendTypePrefix bool, c *http.Client) *Client {
	client := new(Client)
	client.c = c
	client.authCfg = authCfg
	client.datasourceURL = strings.TrimSuffix(baseURL, "/")
	client.appendTypePrefix = appendTypePrefix
	client.queryStep = *queryStep
	client.dataSourceType = datasourcePrometheus
	client.extraParams = url.Values{}
	return client
}
// Query executes the given instant query at ts and returns the parsed
// response together with the request which produced it.
//
// The request is re-sent once if the first attempt fails with io.EOF,
// io.ErrUnexpectedEOF or a trivial network error.
func (c *Client) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
	req, err := c.newQueryRequest(ctx, query, ts)
	if err != nil {
		return Result{}, nil, err
	}

	resp, err := c.do(req)
	if err != nil {
		retriable := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || netutil.IsTrivialNetworkError(err)
		if !retriable {
			// Unexpected errors go straight back to the caller.
			return Result{}, nil, err
		}
		// The connection might have been closed by something sitting between
		// the client and the datasource, so give the request one more try.
		if req, err = c.newQueryRequest(ctx, query, ts); err != nil {
			return Result{}, nil, fmt.Errorf("second attempt: %w", err)
		}
		if resp, err = c.do(req); err != nil {
			return Result{}, nil, fmt.Errorf("second attempt: %w", err)
		}
	}

	// Pick the response parser matching the datasource type.
	var parse func(req *http.Request, resp *http.Response) (Result, error)
	switch dsType := c.dataSourceType; dsType {
	case datasourcePrometheus:
		parse = parsePrometheusResponse
	case datasourceGraphite:
		parse = parseGraphiteResponse
	case datasourceVLogs:
		parse = parseVLogsResponse
	default:
		logger.Panicf("BUG: unsupported datasource type %q to parse query response", dsType)
	}

	parsed, parseErr := parse(req, resp)
	_ = resp.Body.Close()
	return parsed, req, parseErr
}
// QueryRange executes the given query on the [start, end] time range.
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
// Graphite type isn't supported.
//
// The request is re-sent once if the first attempt fails with io.EOF,
// io.ErrUnexpectedEOF or a trivial network error.
func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) {
	// Validate preconditions before building any request.
	switch {
	case c.dataSourceType == datasourceGraphite:
		return res, fmt.Errorf("%q is not supported for QueryRange", c.dataSourceType)
	case c.dataSourceType == datasourceVLogs && !c.applyIntervalAsTimeFilter:
		// TODO: range queries for LogsQL expressions with a time filter are disabled for now
		return res, fmt.Errorf("range query is not supported for LogsQL expression %q because it contains time filter. Remove time filter from the expression and try again", query)
	case start.IsZero():
		return res, fmt.Errorf("start param is missing")
	case end.IsZero():
		return res, fmt.Errorf("end param is missing")
	}

	req, err := c.newQueryRangeRequest(ctx, query, start, end)
	if err != nil {
		return res, err
	}

	resp, err := c.do(req)
	if err != nil {
		retriable := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || netutil.IsTrivialNetworkError(err)
		if !retriable {
			// Unexpected errors go straight back to the caller.
			return res, err
		}
		// The connection might have been closed by something sitting between
		// the client and the datasource, so give the request one more try.
		if req, err = c.newQueryRangeRequest(ctx, query, start, end); err != nil {
			return res, fmt.Errorf("second attempt: %w", err)
		}
		if resp, err = c.do(req); err != nil {
			return res, fmt.Errorf("second attempt: %w", err)
		}
	}

	// Pick the response parser matching the datasource type.
	var parse func(req *http.Request, resp *http.Response) (Result, error)
	switch dsType := c.dataSourceType; dsType {
	case datasourcePrometheus:
		parse = parsePrometheusResponse
	case datasourceVLogs:
		parse = parseVLogsResponse
	default:
		logger.Panicf("BUG: unsupported datasource type %q to parse query range response", dsType)
	}

	res, err = parse(req, resp)
	_ = resp.Body.Close()
	return res, err
}
// do sends req via the underlying http client and returns the response
// only if its status code is 200. For any other status code the response
// body is read into the returned error and closed.
//
// The request URL is redacted in logs and errors unless showDatasourceURL is set.
func (c *Client) do(req *http.Request) (*http.Response, error) {
	var target string
	if *showDatasourceURL {
		target = req.URL.String()
	} else {
		target = req.URL.Redacted()
	}
	if c.debug {
		logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, target)
	}

	resp, err := c.c.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error getting response from %s: %w", target, err)
	}
	if resp.StatusCode == http.StatusOK {
		return resp, nil
	}

	// Read error is ignored: whatever part of the body was read is still reported.
	body, _ := io.ReadAll(resp.Body)
	_ = resp.Body.Close()
	return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, target, body)
}
// newQueryRangeRequest creates a request for executing query on the
// [start, end] time range, with params set according to the datasource type.
// Only prometheus and vlogs datasource types are handled.
func (c *Client) newQueryRangeRequest(ctx context.Context, query string, start, end time.Time) (*http.Request, error) {
	r, err := c.newRequest(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", c.datasourceURL, err)
	}

	switch dsType := c.dataSourceType; dsType {
	case datasourceVLogs:
		c.setVLogsRangeReqParams(r, query, start, end)
	case datasourcePrometheus:
		c.setPrometheusRangeReqParams(r, query, start, end)
	default:
		logger.Panicf("BUG: unsupported datasource type %q to create range query request", dsType)
	}
	return r, nil
}
// newQueryRequest creates a request for executing an instant query at ts,
// with params set according to the datasource type.
func (c *Client) newQueryRequest(ctx context.Context, query string, ts time.Time) (*http.Request, error) {
	r, err := c.newRequest(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot create query request to datasource %q: %w", c.datasourceURL, err)
	}

	switch dsType := c.dataSourceType; dsType {
	case datasourceVLogs:
		c.setVLogsInstantReqParams(r, query, ts)
	case datasourceGraphite:
		c.setGraphiteReqParams(r, query)
	case datasourcePrometheus:
		c.setPrometheusInstantReqParams(r, query, ts)
	default:
		logger.Panicf("BUG: unsupported datasource type %q to create query request", dsType)
	}
	return r, nil
}
// newRequest creates a POST request to datasourceURL with the JSON content type,
// auth headers from authCfg (if configured) and all the extraHeaders set.
// An error is returned only when auth headers cannot be set.
func (c *Client) newRequest(ctx context.Context) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.datasourceURL, nil)
	if err != nil {
		logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", c.datasourceURL, err)
	}
	req.Header.Set("Content-Type", "application/json")

	if cfg := c.authCfg; cfg != nil {
		if err := cfg.SetHeaders(req, true); err != nil {
			return nil, err
		}
	}

	// extraHeaders are applied last, so they override headers set above.
	for i := range c.extraHeaders {
		h := &c.extraHeaders[i]
		req.Header.Set(h.key, h.value)
	}
	return req, nil
}
// setReqParams puts extraParams and the given query into the URL query
// string of r. extraParams replace same-named params already present in
// the URL, and the "query" param is always set to query.
func (c *Client) setReqParams(r *http.Request, query string) {
	params := r.URL.Query()
	for name, values := range c.extraParams {
		// extraParams are prior to params in URL
		params.Del(name)
		for _, value := range values {
			params.Add(name, value)
		}
	}
	params.Set("query", query)
	r.URL.RawQuery = params.Encode()
}