mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-25 20:00:06 +01:00
83217b7473
This commit returns back limits for these endpoints, which have been removed at 5d66ee88bd
,
since it has been appeared that missing limits result in high CPU usage, while the introduced concurrency limiter
results in failed lightweight requests to these endpoints because of timeout when heavyweight requests are executed.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5055
230 lines
7.5 KiB
Go
230 lines
7.5 KiB
Go
package searchutils
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
)
|
|
|
|
var (
|
|
maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
|
|
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
|
|
maxStatusRequestDuration = flag.Duration("search.maxStatusRequestDuration", time.Minute*5, "The maximum duration for /api/v1/status/* requests")
|
|
maxLabelsAPIDuration = flag.Duration("search.maxLabelsAPIDuration", time.Second*5, "The maximum duration for /api/v1/labels, /api/v1/label/.../values and /api/v1/series requests. "+
|
|
"See also -search.maxLabelsAPISeries")
|
|
)
|
|
|
|
// GetMaxQueryDuration returns the maximum duration for query from r.
|
|
func GetMaxQueryDuration(r *http.Request) time.Duration {
|
|
dms, err := httputils.GetDuration(r, "timeout", 0)
|
|
if err != nil {
|
|
dms = 0
|
|
}
|
|
d := time.Duration(dms) * time.Millisecond
|
|
if d <= 0 || d > *maxQueryDuration {
|
|
d = *maxQueryDuration
|
|
}
|
|
return d
|
|
}
|
|
|
|
// GetDeadlineForQuery returns deadline for the given query r.
|
|
func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline {
|
|
dMax := maxQueryDuration.Milliseconds()
|
|
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration")
|
|
}
|
|
|
|
// GetDeadlineForStatusRequest returns deadline for the given request to /api/v1/status/*.
|
|
func GetDeadlineForStatusRequest(r *http.Request, startTime time.Time) Deadline {
|
|
dMax := maxStatusRequestDuration.Milliseconds()
|
|
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxStatusRequestDuration")
|
|
}
|
|
|
|
// GetDeadlineForExport returns deadline for the given request to /api/v1/export.
|
|
func GetDeadlineForExport(r *http.Request, startTime time.Time) Deadline {
|
|
dMax := maxExportDuration.Milliseconds()
|
|
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration")
|
|
}
|
|
|
|
// GetDeadlineForLabelsAPI returns deadline for the given request to /api/v1/labels, /api/v1/label/.../values or /api/v1/series
|
|
func GetDeadlineForLabelsAPI(r *http.Request, startTime time.Time) Deadline {
|
|
dMax := maxLabelsAPIDuration.Milliseconds()
|
|
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxLabelsAPIDuration")
|
|
}
|
|
|
|
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) Deadline {
|
|
d, err := httputils.GetDuration(r, "timeout", 0)
|
|
if err != nil {
|
|
d = 0
|
|
}
|
|
if d <= 0 || d > dMax {
|
|
d = dMax
|
|
}
|
|
timeout := time.Duration(d) * time.Millisecond
|
|
return NewDeadline(startTime, timeout, flagHint)
|
|
}
|
|
|
|
// Deadline contains deadline with the corresponding timeout for pretty error messages.
|
|
type Deadline struct {
|
|
deadline uint64
|
|
|
|
timeout time.Duration
|
|
flagHint string
|
|
}
|
|
|
|
// NewDeadline returns deadline for the given timeout.
|
|
//
|
|
// flagHint must contain a hit for command-line flag, which could be used
|
|
// in order to increase timeout.
|
|
func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
|
|
return Deadline{
|
|
deadline: uint64(startTime.Add(timeout).Unix()),
|
|
timeout: timeout,
|
|
flagHint: flagHint,
|
|
}
|
|
}
|
|
|
|
// DeadlineFromTimestamp returns deadline from the given timestamp in seconds.
|
|
func DeadlineFromTimestamp(timestamp uint64) Deadline {
|
|
startTime := time.Now()
|
|
timeout := time.Unix(int64(timestamp), 0).Sub(startTime)
|
|
return NewDeadline(startTime, timeout, "")
|
|
}
|
|
|
|
// Exceeded returns true if deadline is exceeded.
|
|
func (d *Deadline) Exceeded() bool {
|
|
return fasttime.UnixTimestamp() > d.deadline
|
|
}
|
|
|
|
// Deadline returns deadline in unix timestamp seconds.
|
|
func (d *Deadline) Deadline() uint64 {
|
|
return d.deadline
|
|
}
|
|
|
|
// String returns human-readable string representation for d.
|
|
func (d *Deadline) String() string {
|
|
startTime := time.Unix(int64(d.deadline), 0).Add(-d.timeout)
|
|
elapsed := time.Since(startTime)
|
|
msg := fmt.Sprintf("%.3f seconds (elapsed %.3f seconds)", d.timeout.Seconds(), elapsed.Seconds())
|
|
if float64(elapsed)/float64(d.timeout) > 0.9 && d.flagHint != "" {
|
|
msg += fmt.Sprintf("; the timeout can be adjusted with `%s` command-line flag", d.flagHint)
|
|
}
|
|
return msg
|
|
}
|
|
|
|
// GetExtraTagFilters returns additional label filters from request.
|
|
//
|
|
// Label filters can be present in extra_label and extra_filters[] query args.
|
|
// They are combined. For example, the following query args:
|
|
//
|
|
// extra_label=t1=v1&extra_label=t2=v2&extra_filters[]={env="prod",team="devops"}&extra_filters={env=~"dev|staging",team!="devops"}
|
|
//
|
|
// should be translated to the following filters joined with "or":
|
|
//
|
|
// {env="prod",team="devops",t1="v1",t2="v2"}
|
|
// {env=~"dev|staging",team!="devops",t1="v1",t2="v2"}
|
|
func GetExtraTagFilters(r *http.Request) ([][]storage.TagFilter, error) {
|
|
var tagFilters []storage.TagFilter
|
|
for _, match := range r.Form["extra_label"] {
|
|
tmp := strings.SplitN(match, "=", 2)
|
|
if len(tmp) != 2 {
|
|
return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", match)
|
|
}
|
|
if tmp[0] == "__name__" {
|
|
// This is required for storage.Search.
|
|
tmp[0] = ""
|
|
}
|
|
tagFilters = append(tagFilters, storage.TagFilter{
|
|
Key: []byte(tmp[0]),
|
|
Value: []byte(tmp[1]),
|
|
})
|
|
}
|
|
extraFilters := append([]string{}, r.Form["extra_filters"]...)
|
|
extraFilters = append(extraFilters, r.Form["extra_filters[]"]...)
|
|
if len(extraFilters) == 0 {
|
|
if len(tagFilters) == 0 {
|
|
return nil, nil
|
|
}
|
|
return [][]storage.TagFilter{tagFilters}, nil
|
|
}
|
|
var etfs [][]storage.TagFilter
|
|
for _, extraFilter := range extraFilters {
|
|
tfss, err := ParseMetricSelector(extraFilter)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse extra_filters=%s: %w", extraFilter, err)
|
|
}
|
|
for i := range tfss {
|
|
tfss[i] = append(tfss[i], tagFilters...)
|
|
}
|
|
etfs = append(etfs, tfss...)
|
|
}
|
|
return etfs, nil
|
|
}
|
|
|
|
// JoinTagFilterss adds etfs to every src filter and returns the result.
|
|
func JoinTagFilterss(src, etfs [][]storage.TagFilter) [][]storage.TagFilter {
|
|
if len(src) == 0 {
|
|
return etfs
|
|
}
|
|
if len(etfs) == 0 {
|
|
return src
|
|
}
|
|
var dst [][]storage.TagFilter
|
|
for _, tf := range src {
|
|
for _, etf := range etfs {
|
|
tfs := append([]storage.TagFilter{}, tf...)
|
|
tfs = append(tfs, etf...)
|
|
dst = append(dst, tfs)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// ParseMetricSelector parses s containing PromQL metric selector and returns the corresponding LabelFilters.
|
|
func ParseMetricSelector(s string) ([][]storage.TagFilter, error) {
|
|
expr, err := metricsql.Parse(s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
me, ok := expr.(*metricsql.MetricExpr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("expecting metricSelector; got %q", expr.AppendString(nil))
|
|
}
|
|
if len(me.LabelFilterss) == 0 {
|
|
return nil, fmt.Errorf("labelFilterss cannot be empty")
|
|
}
|
|
tfss := ToTagFilterss(me.LabelFilterss)
|
|
return tfss, nil
|
|
}
|
|
|
|
// ToTagFilterss converts lfss to or-delimited slices of storage.TagFilter
|
|
func ToTagFilterss(lfss [][]metricsql.LabelFilter) [][]storage.TagFilter {
|
|
tfss := make([][]storage.TagFilter, len(lfss))
|
|
for i, lfs := range lfss {
|
|
tfs := make([]storage.TagFilter, len(lfs))
|
|
for j := range lfs {
|
|
toTagFilter(&tfs[j], &lfs[j])
|
|
}
|
|
tfss[i] = tfs
|
|
}
|
|
return tfss
|
|
}
|
|
|
|
func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) {
|
|
if src.Label != "__name__" {
|
|
dst.Key = []byte(src.Label)
|
|
} else {
|
|
// This is required for storage.Search.
|
|
dst.Key = nil
|
|
}
|
|
dst.Value = []byte(src.Value)
|
|
dst.IsRegexp = src.IsRegexp
|
|
dst.IsNegative = src.IsNegative
|
|
}
|