mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-22 08:10:44 +01:00
341 lines
11 KiB
Go
341 lines
11 KiB
Go
package searchutils
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
)
|
|
|
|
var (
|
|
maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
|
|
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
|
|
maxStatusRequestDuration = flag.Duration("search.maxStatusRequestDuration", time.Minute*5, "The maximum duration for /api/v1/status/* requests")
|
|
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses if a part of -storageNode instances fail to perform queries; "+
|
|
"this trades availability over consistency; see also -search.maxQueryDuration")
|
|
)
|
|
|
|
func roundToSeconds(ms int64) int64 {
|
|
return ms - ms%1000
|
|
}
|
|
|
|
// GetInt returns integer value from the given argKey.
|
|
func GetInt(r *http.Request, argKey string) (int, error) {
|
|
argValue := r.FormValue(argKey)
|
|
if len(argValue) == 0 {
|
|
return 0, nil
|
|
}
|
|
n, err := strconv.Atoi(argValue)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse integer %q=%q: %w", argKey, argValue, err)
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
// GetTime returns time from the given argKey query arg.
|
|
//
|
|
// If argKey is missing in r, then defaultMs rounded to seconds is returned.
|
|
// The rounding is needed in order to align query results in Grafana
|
|
// executed at different times. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/720
|
|
func GetTime(r *http.Request, argKey string, defaultMs int64) (int64, error) {
|
|
argValue := r.FormValue(argKey)
|
|
if len(argValue) == 0 {
|
|
return roundToSeconds(defaultMs), nil
|
|
}
|
|
secs, err := strconv.ParseFloat(argValue, 64)
|
|
if err != nil {
|
|
// Try parsing string format
|
|
t, err := time.Parse(time.RFC3339, argValue)
|
|
if err != nil {
|
|
// Handle Prometheus'-provided minTime and maxTime.
|
|
// See https://github.com/prometheus/client_golang/issues/614
|
|
switch argValue {
|
|
case prometheusMinTimeFormatted:
|
|
return minTimeMsecs, nil
|
|
case prometheusMaxTimeFormatted:
|
|
return maxTimeMsecs, nil
|
|
}
|
|
// Try parsing duration relative to the current time
|
|
d, err1 := promutils.ParseDuration(argValue)
|
|
if err1 != nil {
|
|
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
|
|
}
|
|
if d > 0 {
|
|
d = -d
|
|
}
|
|
t = time.Now().Add(d)
|
|
}
|
|
secs = float64(t.UnixNano()) / 1e9
|
|
}
|
|
msecs := int64(secs * 1e3)
|
|
if msecs < minTimeMsecs {
|
|
msecs = 0
|
|
}
|
|
if msecs > maxTimeMsecs {
|
|
msecs = maxTimeMsecs
|
|
}
|
|
return msecs, nil
|
|
}
|
|
|
|
var (
|
|
// These constants were obtained from https://github.com/prometheus/prometheus/blob/91d7175eaac18b00e370965f3a8186cc40bf9f55/web/api/v1/api.go#L442
|
|
// See https://github.com/prometheus/client_golang/issues/614 for details.
|
|
prometheusMinTimeFormatted = time.Unix(math.MinInt64/1000+62135596801, 0).UTC().Format(time.RFC3339Nano)
|
|
prometheusMaxTimeFormatted = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC().Format(time.RFC3339Nano)
|
|
)
|
|
|
|
const (
|
|
// These values prevent from overflow when storing msec-precision time in int64.
|
|
minTimeMsecs = 0 // use 0 instead of `int64(-1<<63) / 1e6` because the storage engine doesn't actually support negative time
|
|
maxTimeMsecs = int64(1<<63-1) / 1e6
|
|
)
|
|
|
|
// GetDuration returns duration from the given argKey query arg.
|
|
func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, error) {
|
|
argValue := r.FormValue(argKey)
|
|
if len(argValue) == 0 {
|
|
return defaultValue, nil
|
|
}
|
|
secs, err := strconv.ParseFloat(argValue, 64)
|
|
if err != nil {
|
|
// Try parsing string format
|
|
d, err := promutils.ParseDuration(argValue)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
|
|
}
|
|
secs = d.Seconds()
|
|
}
|
|
msecs := int64(secs * 1e3)
|
|
if msecs <= 0 || msecs > maxDurationMsecs {
|
|
return 0, fmt.Errorf("%q=%dms is out of allowed range [%d ... %d]", argKey, msecs, 0, int64(maxDurationMsecs))
|
|
}
|
|
return msecs, nil
|
|
}
|
|
|
|
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
|
|
|
|
// GetMaxQueryDuration returns the maximum duration for query from r.
|
|
func GetMaxQueryDuration(r *http.Request) time.Duration {
|
|
dms, err := GetDuration(r, "timeout", 0)
|
|
if err != nil {
|
|
dms = 0
|
|
}
|
|
d := time.Duration(dms) * time.Millisecond
|
|
if d <= 0 || d > *maxQueryDuration {
|
|
d = *maxQueryDuration
|
|
}
|
|
return d
|
|
}
|
|
|
|
// GetDeadlineForQuery returns deadline for the given query r.
|
|
func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline {
|
|
dMax := maxQueryDuration.Milliseconds()
|
|
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration")
|
|
}
|
|
|
|
// GetDeadlineForStatusRequest returns deadline for the given request to /api/v1/status/*.
|
|
func GetDeadlineForStatusRequest(r *http.Request, startTime time.Time) Deadline {
|
|
dMax := maxStatusRequestDuration.Milliseconds()
|
|
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxStatusRequestDuration")
|
|
}
|
|
|
|
// GetDeadlineForExport returns deadline for the given request to /api/v1/export.
|
|
func GetDeadlineForExport(r *http.Request, startTime time.Time) Deadline {
|
|
dMax := maxExportDuration.Milliseconds()
|
|
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration")
|
|
}
|
|
|
|
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) Deadline {
|
|
d, err := GetDuration(r, "timeout", 0)
|
|
if err != nil {
|
|
d = 0
|
|
}
|
|
if d <= 0 || d > dMax {
|
|
d = dMax
|
|
}
|
|
timeout := time.Duration(d) * time.Millisecond
|
|
return NewDeadline(startTime, timeout, flagHint)
|
|
}
|
|
|
|
// GetBool returns boolean value from the given argKey query arg.
|
|
func GetBool(r *http.Request, argKey string) bool {
|
|
argValue := r.FormValue(argKey)
|
|
switch strings.ToLower(argValue) {
|
|
case "", "0", "f", "false", "no":
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// GetDenyPartialResponse returns whether partial responses are denied.
|
|
func GetDenyPartialResponse(r *http.Request) bool {
|
|
if *denyPartialResponse {
|
|
return true
|
|
}
|
|
if r == nil {
|
|
return false
|
|
}
|
|
return GetBool(r, "deny_partial_response")
|
|
}
|
|
|
|
// Deadline contains deadline with the corresponding timeout for pretty error messages.
|
|
type Deadline struct {
|
|
deadline uint64
|
|
|
|
timeout time.Duration
|
|
flagHint string
|
|
}
|
|
|
|
// NewDeadline returns deadline for the given timeout.
|
|
//
|
|
// flagHint must contain a hit for command-line flag, which could be used
|
|
// in order to increase timeout.
|
|
func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
|
|
return Deadline{
|
|
deadline: uint64(startTime.Add(timeout).Unix()),
|
|
timeout: timeout,
|
|
flagHint: flagHint,
|
|
}
|
|
}
|
|
|
|
// DeadlineFromTimestamp returns deadline from the given timestamp in seconds.
|
|
func DeadlineFromTimestamp(timestamp uint64) Deadline {
|
|
startTime := time.Now()
|
|
timeout := time.Unix(int64(timestamp), 0).Sub(startTime)
|
|
return NewDeadline(startTime, timeout, "")
|
|
}
|
|
|
|
// Exceeded returns true if deadline is exceeded.
|
|
func (d *Deadline) Exceeded() bool {
|
|
return fasttime.UnixTimestamp() > d.deadline
|
|
}
|
|
|
|
// Deadline returns deadline in unix timestamp seconds.
|
|
func (d *Deadline) Deadline() uint64 {
|
|
return d.deadline
|
|
}
|
|
|
|
// String returns human-readable string representation for d.
|
|
func (d *Deadline) String() string {
|
|
startTime := time.Unix(int64(d.deadline), 0).Add(-d.timeout)
|
|
elapsed := time.Since(startTime)
|
|
msg := fmt.Sprintf("%.3f seconds (elapsed %.3f seconds)", d.timeout.Seconds(), elapsed.Seconds())
|
|
if d.flagHint != "" {
|
|
msg += fmt.Sprintf("; the timeout can be adjusted with `%s` command-line flag", d.flagHint)
|
|
}
|
|
return msg
|
|
}
|
|
|
|
// GetExtraTagFilters returns additional label filters from request.
|
|
//
|
|
// Label filters can be present in extra_label and extra_filters[] query args.
|
|
// They are combined. For example, the following query args:
|
|
//
|
|
// extra_label=t1=v1&extra_label=t2=v2&extra_filters[]={env="prod",team="devops"}&extra_filters={env=~"dev|staging",team!="devops"}
|
|
//
|
|
// should be translated to the following filters joined with "or":
|
|
//
|
|
// {env="prod",team="devops",t1="v1",t2="v2"}
|
|
// {env=~"dev|staging",team!="devops",t1="v1",t2="v2"}
|
|
func GetExtraTagFilters(r *http.Request) ([][]storage.TagFilter, error) {
|
|
var tagFilters []storage.TagFilter
|
|
for _, match := range r.Form["extra_label"] {
|
|
tmp := strings.SplitN(match, "=", 2)
|
|
if len(tmp) != 2 {
|
|
return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", match)
|
|
}
|
|
if tmp[0] == "__name__" {
|
|
// This is required for storage.Search.
|
|
tmp[0] = ""
|
|
}
|
|
tagFilters = append(tagFilters, storage.TagFilter{
|
|
Key: []byte(tmp[0]),
|
|
Value: []byte(tmp[1]),
|
|
})
|
|
}
|
|
extraFilters := append([]string{}, r.Form["extra_filters"]...)
|
|
extraFilters = append(extraFilters, r.Form["extra_filters[]"]...)
|
|
if len(extraFilters) == 0 {
|
|
if len(tagFilters) == 0 {
|
|
return nil, nil
|
|
}
|
|
return [][]storage.TagFilter{tagFilters}, nil
|
|
}
|
|
var etfs [][]storage.TagFilter
|
|
for _, extraFilter := range extraFilters {
|
|
tfs, err := ParseMetricSelector(extraFilter)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse extra_filters=%s: %w", extraFilter, err)
|
|
}
|
|
tfs = append(tfs, tagFilters...)
|
|
etfs = append(etfs, tfs)
|
|
}
|
|
return etfs, nil
|
|
}
|
|
|
|
// JoinTagFilterss adds etfs to every src filter and returns the result.
|
|
func JoinTagFilterss(src, etfs [][]storage.TagFilter) [][]storage.TagFilter {
|
|
if len(src) == 0 {
|
|
return etfs
|
|
}
|
|
if len(etfs) == 0 {
|
|
return src
|
|
}
|
|
var dst [][]storage.TagFilter
|
|
for _, tf := range src {
|
|
for _, etf := range etfs {
|
|
tfs := append([]storage.TagFilter{}, tf...)
|
|
tfs = append(tfs, etf...)
|
|
dst = append(dst, tfs)
|
|
}
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// ParseMetricSelector parses s containing PromQL metric selector and returns the corresponding LabelFilters.
|
|
func ParseMetricSelector(s string) ([]storage.TagFilter, error) {
|
|
expr, err := metricsql.Parse(s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
me, ok := expr.(*metricsql.MetricExpr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("expecting metricSelector; got %q", expr.AppendString(nil))
|
|
}
|
|
if len(me.LabelFilters) == 0 {
|
|
return nil, fmt.Errorf("labelFilters cannot be empty")
|
|
}
|
|
tfs := ToTagFilters(me.LabelFilters)
|
|
return tfs, nil
|
|
}
|
|
|
|
// ToTagFilters converts lfs to a slice of storage.TagFilter
|
|
func ToTagFilters(lfs []metricsql.LabelFilter) []storage.TagFilter {
|
|
tfs := make([]storage.TagFilter, len(lfs))
|
|
for i := range lfs {
|
|
toTagFilter(&tfs[i], &lfs[i])
|
|
}
|
|
return tfs
|
|
}
|
|
|
|
func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) {
|
|
if src.Label != "__name__" {
|
|
dst.Key = []byte(src.Label)
|
|
} else {
|
|
// This is required for storage.Search.
|
|
dst.Key = nil
|
|
}
|
|
dst.Value = []byte(src.Value)
|
|
dst.IsRegexp = src.IsRegexp
|
|
dst.IsNegative = src.IsNegative
|
|
}
|