VictoriaMetrics/app/vmselect/prometheus/prometheus.go

671 lines
19 KiB
Go

package prometheus
import (
"flag"
"fmt"
"math"
"net/http"
"runtime"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/quicktemplate"
)
var (
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
selectNodes flagutil.Array
)
func init() {
flag.Var(&selectNodes, "selectNode", "vmselect address, usage -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
}
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// Latency for data processing pipeline, i.e. the time between data is ignested
// into the system and the time it becomes visible to search.
const latencyOffset = 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime()
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %s", err)
}
matches := r.Form["match[]"]
maxLookback, err := getDuration(r, "max_lookback", defaultStep)
if err != nil {
return err
}
start, err := getTime(r, "start", ct-maxLookback)
if err != nil {
return err
}
end, err := getTime(r, "end", ct)
if err != nil {
return err
}
deadline := getDeadline(r)
if start >= end {
start = end - defaultStep
}
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
sq := &storage.SearchQuery{
AccountID: at.AccountID,
ProjectID: at.ProjectID,
MinTimestamp: start,
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
rss, _, err := netstorage.ProcessSearchQuery(at, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
}
resultsCh := make(chan *quicktemplate.ByteBuffer)
doneCh := make(chan error)
go func() {
err := rss.RunParallel(func(rs *netstorage.Result) {
bb := quicktemplate.AcquireByteBuffer()
WriteFederate(bb, rs)
resultsCh <- bb
})
close(resultsCh)
doneCh <- err
}()
w.Header().Set("Content-Type", "text/plain")
for bb := range resultsCh {
w.Write(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
}
err = <-doneCh
if err != nil {
return fmt.Errorf("error during data fetching: %s", err)
}
federateDuration.UpdateDuration(startTime)
return nil
}
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
// ExportHandler exports data in raw format from /api/v1/export.
func ExportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime()
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %s", err)
}
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
matches = []string{match}
}
start, err := getTime(r, "start", 0)
if err != nil {
return err
}
end, err := getTime(r, "end", ct)
if err != nil {
return err
}
format := r.FormValue("format")
deadline := getDeadline(r)
if start >= end {
start = end - defaultStep
}
if err := exportHandler(at, w, matches, start, end, format, deadline); err != nil {
return err
}
exportDuration.UpdateDuration(startTime)
return nil
}
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, start, end int64, format string, deadline netstorage.Deadline) error {
writeResponseFunc := WriteExportStdResponse
writeLineFunc := WriteExportJSONLine
contentType := "application/json"
if format == "prometheus" {
contentType = "text/plain"
writeLineFunc = WriteExportPrometheusLine
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = WriteExportPromAPILine
}
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
sq := &storage.SearchQuery{
AccountID: at.AccountID,
ProjectID: at.ProjectID,
MinTimestamp: start,
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
}
if isPartial {
rss.Cancel()
return fmt.Errorf("some of the storage nodes are unavailable at the moment")
}
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
doneCh := make(chan error)
go func() {
err := rss.RunParallel(func(rs *netstorage.Result) {
bb := quicktemplate.AcquireByteBuffer()
writeLineFunc(bb, rs)
resultsCh <- bb
})
close(resultsCh)
doneCh <- err
}()
w.Header().Set("Content-Type", contentType)
writeResponseFunc(w, resultsCh)
// Consume all the data from resultsCh in the event writeResponseFunc
// fails to consume all the data.
for bb := range resultsCh {
quicktemplate.ReleaseByteBuffer(bb)
}
err = <-doneCh
if err != nil {
return fmt.Errorf("error during data fetching: %s", err)
}
return nil
}
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
func DeleteHandler(at *auth.Token, r *http.Request) error {
startTime := time.Now()
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %s", err)
}
if r.FormValue("start") != "" || r.FormValue("end") != "" {
return fmt.Errorf("start and end aren't supported. Remove these args from the query in order to delete all the matching metrics")
}
matches := r.Form["match[]"]
deadline := getDeadline(r)
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
sq := &storage.SearchQuery{
AccountID: at.AccountID,
ProjectID: at.ProjectID,
TagFilterss: tagFilterss,
}
deletedCount, err := netstorage.DeleteSeries(at, sq, deadline)
if err != nil {
return fmt.Errorf("cannot delete time series matching %q: %s", matches, err)
}
if deletedCount > 0 {
// Reset rollup result cache on all the vmselect nodes,
// since the cache may contain deleted data.
// TODO: reset only cache for (account, project)
resetRollupResultCaches()
}
deleteDuration.UpdateDuration(startTime)
return nil
}
var deleteDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/admin/tsdb/delete_series"}`)
func resetRollupResultCaches() {
if len(selectNodes) == 0 {
logger.Panicf("BUG: missing -selectNode flag")
}
for _, selectNode := range selectNodes {
callURL := fmt.Sprintf("http://%s/internal/resetRollupResultCache", selectNode)
resp, err := httpClient.Get(callURL)
if err != nil {
logger.Errorf("error when accessing %q: %s", callURL, err)
resetRollupResultCacheErrors.Inc()
continue
}
if resp.StatusCode != http.StatusOK {
_ = resp.Body.Close()
logger.Errorf("unexpected status code at %q; got %d; want %d", callURL, resp.StatusCode, http.StatusOK)
resetRollupResultCacheErrors.Inc()
continue
}
_ = resp.Body.Close()
}
resetRollupResultCacheCalls.Inc()
}
var (
resetRollupResultCacheErrors = metrics.NewCounter("vm_reset_rollup_result_cache_errors_total")
resetRollupResultCacheCalls = metrics.NewCounter("vm_reset_rollup_result_cache_calls_total")
)
var httpClient = &http.Client{
Timeout: time.Second * 5,
}
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
func LabelValuesHandler(at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadline(r)
labelValues, _, err := netstorage.GetLabelValues(at, labelName, deadline)
if err != nil {
return fmt.Errorf(`cannot obtain label values for %q: %s`, labelName, err)
}
w.Header().Set("Content-Type", "application/json")
WriteLabelValuesResponse(w, labelValues)
labelValuesDuration.UpdateDuration(startTime)
return nil
}
var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`)
// LabelsCountHandler processes /api/v1/labels/count request.
func LabelsCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadline(r)
labelEntries, _, err := netstorage.GetLabelEntries(at, deadline)
if err != nil {
return fmt.Errorf(`cannot obtain label entries: %s`, err)
}
w.Header().Set("Content-Type", "application/json")
WriteLabelsCountResponse(w, labelEntries)
labelsCountDuration.UpdateDuration(startTime)
return nil
}
var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels/count"}`)
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
func LabelsHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadline(r)
labels, _, err := netstorage.GetLabels(at, deadline)
if err != nil {
return fmt.Errorf("cannot obtain labels: %s", err)
}
w.Header().Set("Content-Type", "application/json")
WriteLabelsResponse(w, labels)
labelsDuration.UpdateDuration(startTime)
return nil
}
var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`)
// SeriesCountHandler processes /api/v1/series/count request.
func SeriesCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
deadline := getDeadline(r)
n, _, err := netstorage.GetSeriesCount(at, deadline)
if err != nil {
return fmt.Errorf("cannot obtain series count: %s", err)
}
w.Header().Set("Content-Type", "application/json")
WriteSeriesCountResponse(w, n)
seriesCountDuration.UpdateDuration(startTime)
return nil
}
var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/series/count"}`)
// SeriesHandler processes /api/v1/series request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime()
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %s", err)
}
matches := r.Form["match[]"]
start, err := getTime(r, "start", ct-defaultStep)
if err != nil {
return err
}
end, err := getTime(r, "end", ct)
if err != nil {
return err
}
deadline := getDeadline(r)
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
if start >= end {
start = end - defaultStep
}
sq := &storage.SearchQuery{
AccountID: at.AccountID,
ProjectID: at.ProjectID,
MinTimestamp: start,
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
rss, _, err := netstorage.ProcessSearchQuery(at, sq, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
}
resultsCh := make(chan *quicktemplate.ByteBuffer)
doneCh := make(chan error)
go func() {
err := rss.RunParallel(func(rs *netstorage.Result) {
bb := quicktemplate.AcquireByteBuffer()
writemetricNameObject(bb, &rs.MetricName)
resultsCh <- bb
})
close(resultsCh)
doneCh <- err
}()
w.Header().Set("Content-Type", "application/json")
WriteSeriesResponse(w, resultsCh)
// Consume all the data from resultsCh in the event WriteSeriesResponse
// fails to consume all the data.
for bb := range resultsCh {
quicktemplate.ReleaseByteBuffer(bb)
}
err = <-doneCh
if err != nil {
return fmt.Errorf("error during data fetching: %s", err)
}
seriesDuration.UpdateDuration(startTime)
return nil
}
var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/series"}`)
// QueryHandler processes /api/v1/query request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime()
query := r.FormValue("query")
start, err := getTime(r, "time", ct)
if err != nil {
return err
}
step, err := getDuration(r, "step", latencyOffset)
if err != nil {
return err
}
deadline := getDeadline(r)
if len(query) > *maxQueryLen {
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
}
if ct-start < latencyOffset {
start -= latencyOffset
}
if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
var window int64
if len(windowStr) > 0 {
var err error
window, err = promql.DurationValue(windowStr, step)
if err != nil {
return err
}
}
var offset int64
if len(offsetStr) > 0 {
var err error
offset, err = promql.DurationValue(offsetStr, step)
if err != nil {
return err
}
}
start -= offset
end := start
start = end - window
if err := exportHandler(at, w, []string{childQuery}, start, end, "promapi", deadline); err != nil {
return err
}
queryDuration.UpdateDuration(startTime)
return nil
}
ec := promql.EvalConfig{
AuthToken: at,
Start: start,
End: start,
Step: step,
Deadline: deadline,
}
result, err := promql.Exec(&ec, query)
if err != nil {
return fmt.Errorf("cannot execute %q: %s", query, err)
}
w.Header().Set("Content-Type", "application/json")
WriteQueryResponse(w, result)
queryDuration.UpdateDuration(startTime)
return nil
}
var queryDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query"}`)
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
startTime := time.Now()
ct := currentTime()
query := r.FormValue("query")
start, err := getTime(r, "start", ct-defaultStep)
if err != nil {
return err
}
end, err := getTime(r, "end", ct)
if err != nil {
return err
}
step, err := getDuration(r, "step", defaultStep)
if err != nil {
return err
}
deadline := getDeadline(r)
mayCache := !getBool(r, "nocache")
// Validate input args.
if len(query) > *maxQueryLen {
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
}
if start > end {
start = end
}
if err := promql.ValidateMaxPointsPerTimeseries(start, end, step); err != nil {
return err
}
start, end = promql.AdjustStartEnd(start, end, step)
ec := promql.EvalConfig{
AuthToken: at,
Start: start,
End: end,
Step: step,
Deadline: deadline,
MayCache: mayCache,
}
result, err := promql.Exec(&ec, query)
if err != nil {
return fmt.Errorf("cannot execute %q: %s", query, err)
}
if ct-end < latencyOffset {
adjustLastPoints(result)
}
w.Header().Set("Content-Type", "application/json")
WriteQueryRangeResponse(w, result)
queryRangeDuration.UpdateDuration(startTime)
return nil
}
var queryRangeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query_range"}`)
// adjustLastPoints substitutes the last point values with the previous
// point values, since the last points may contain garbage.
func adjustLastPoints(tss []netstorage.Result) {
if len(tss) == 0 {
return
}
// Search for the last non-NaN value across all the timeseries.
lastNonNaNIdx := -1
for i := range tss {
r := &tss[i]
j := len(r.Values) - 1
for j >= 0 && math.IsNaN(r.Values[j]) {
j--
}
if j > lastNonNaNIdx {
lastNonNaNIdx = j
}
}
if lastNonNaNIdx == -1 {
// All timeseries contain only NaNs.
return
}
// Substitute last three values starting from lastNonNaNIdx
// with the previous values for each timeseries.
for i := range tss {
r := &tss[i]
for j := 0; j < 3; j++ {
idx := lastNonNaNIdx + j
if idx <= 0 || idx >= len(r.Values) {
continue
}
r.Values[idx] = r.Values[idx-1]
}
}
}
func getTime(r *http.Request, argKey string, defaultValue int64) (int64, error) {
argValue := r.FormValue(argKey)
if len(argValue) == 0 {
return defaultValue, nil
}
secs, err := strconv.ParseFloat(argValue, 64)
if err != nil {
// Try parsing string format
t, err := time.Parse(time.RFC3339, argValue)
if err != nil {
return 0, fmt.Errorf("cannot parse %q=%q: %s", argKey, argValue, err)
}
secs = float64(t.UnixNano()) / 1e9
}
msecs := int64(secs * 1e3)
if msecs < minTimeMsecs || msecs > maxTimeMsecs {
return 0, fmt.Errorf("%q=%dms is out of allowed range [%d ... %d]", argKey, msecs, minTimeMsecs, maxTimeMsecs)
}
return msecs, nil
}
const (
// These values prevent from overflow when storing msec-precision time in int64.
minTimeMsecs = int64(-1<<63) / 1e6
maxTimeMsecs = int64(1<<63-1) / 1e6
)
func getDuration(r *http.Request, argKey string, defaultValue int64) (int64, error) {
argValue := r.FormValue(argKey)
if len(argValue) == 0 {
return defaultValue, nil
}
secs, err := strconv.ParseFloat(argValue, 64)
if err != nil {
// Try parsing string format
d, err := time.ParseDuration(argValue)
if err != nil {
return 0, fmt.Errorf("cannot parse %q=%q: %s", argKey, argValue, err)
}
secs = d.Seconds()
}
msecs := int64(secs * 1e3)
if msecs <= 0 || msecs > maxDurationMsecs {
return 0, fmt.Errorf("%q=%dms is out of allowed range [%d ... %d]", argKey, msecs, 0, maxDurationMsecs)
}
return msecs, nil
}
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
func getDeadline(r *http.Request) netstorage.Deadline {
d, err := getDuration(r, "timeout", 0)
if err != nil {
d = 0
}
dMax := int64(maxQueryDuration.Seconds() * 1e3)
if d <= 0 || d > dMax {
d = dMax
}
timeout := time.Duration(d) * time.Millisecond
return netstorage.NewDeadline(timeout)
}
func getBool(r *http.Request, argKey string) bool {
argValue := r.FormValue(argKey)
switch strings.ToLower(argValue) {
case "", "0", "f", "false", "no":
return false
default:
return true
}
}
func currentTime() int64 {
return int64(time.Now().UTC().Unix()) * 1e3
}
func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) {
tagFilterss := make([][]storage.TagFilter, 0, len(matches))
for _, match := range matches {
tagFilters, err := promql.ParseMetricSelector(match)
if err != nil {
return nil, fmt.Errorf("cannot parse %q: %s", match, err)
}
tagFilterss = append(tagFilterss, tagFilters)
}
return tagFilterss, nil
}