mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-05 22:32:20 +01:00
6875fb411a
This may reduce response times for aggregation over big number of time series with small step between output data points.
735 lines
21 KiB
Go
735 lines
21 KiB
Go
package prometheus
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/valyala/quicktemplate"
|
|
)
|
|
|
|
var (
|
|
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
|
|
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
|
|
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability")
|
|
selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
|
|
)
|
|
|
|
// Default step used if not set.
|
|
const defaultStep = 5 * 60 * 1000
|
|
|
|
// Latency for data processing pipeline, i.e. the time between data is ignested
|
|
// into the system and the time it becomes visible to search.
|
|
const latencyOffset = 60 * 1000
|
|
|
|
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
|
|
func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
ct := currentTime()
|
|
if err := r.ParseForm(); err != nil {
|
|
return fmt.Errorf("cannot parse request form values: %s", err)
|
|
}
|
|
matches := r.Form["match[]"]
|
|
if len(matches) == 0 {
|
|
return fmt.Errorf("missing `match[]` arg")
|
|
}
|
|
maxLookback, err := getDuration(r, "max_lookback", defaultStep)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
start, err := getTime(r, "start", ct-maxLookback)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
end, err := getTime(r, "end", ct)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
deadline := getDeadline(r)
|
|
if start >= end {
|
|
start = end - defaultStep
|
|
}
|
|
tagFilterss, err := getTagFilterssFromMatches(matches)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sq := &storage.SearchQuery{
|
|
AccountID: at.AccountID,
|
|
ProjectID: at.ProjectID,
|
|
MinTimestamp: start,
|
|
MaxTimestamp: end,
|
|
TagFilterss: tagFilterss,
|
|
}
|
|
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
|
}
|
|
if isPartial && getDenyPartialResponse(r) {
|
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
|
}
|
|
|
|
resultsCh := make(chan *quicktemplate.ByteBuffer)
|
|
doneCh := make(chan error)
|
|
go func() {
|
|
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
|
|
bb := quicktemplate.AcquireByteBuffer()
|
|
WriteFederate(bb, rs)
|
|
resultsCh <- bb
|
|
})
|
|
close(resultsCh)
|
|
doneCh <- err
|
|
}()
|
|
|
|
w.Header().Set("Content-Type", "text/plain")
|
|
for bb := range resultsCh {
|
|
w.Write(bb.B)
|
|
quicktemplate.ReleaseByteBuffer(bb)
|
|
}
|
|
|
|
err = <-doneCh
|
|
if err != nil {
|
|
return fmt.Errorf("error during data fetching: %s", err)
|
|
}
|
|
federateDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
|
|
|
|
// ExportHandler exports data in raw format from /api/v1/export.
|
|
func ExportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
ct := currentTime()
|
|
if err := r.ParseForm(); err != nil {
|
|
return fmt.Errorf("cannot parse request form values: %s", err)
|
|
}
|
|
matches := r.Form["match[]"]
|
|
if len(matches) == 0 {
|
|
// Maintain backwards compatibility
|
|
match := r.FormValue("match")
|
|
if len(match) == 0 {
|
|
return fmt.Errorf("missing `match[]` arg")
|
|
}
|
|
matches = []string{match}
|
|
}
|
|
start, err := getTime(r, "start", 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
end, err := getTime(r, "end", ct)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
format := r.FormValue("format")
|
|
deadline := getDeadline(r)
|
|
if start >= end {
|
|
start = end - defaultStep
|
|
}
|
|
if err := exportHandler(at, w, matches, start, end, format, deadline); err != nil {
|
|
return err
|
|
}
|
|
exportDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
|
|
|
|
func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, start, end int64, format string, deadline netstorage.Deadline) error {
|
|
writeResponseFunc := WriteExportStdResponse
|
|
writeLineFunc := WriteExportJSONLine
|
|
contentType := "application/json"
|
|
if format == "prometheus" {
|
|
contentType = "text/plain"
|
|
writeLineFunc = WriteExportPrometheusLine
|
|
} else if format == "promapi" {
|
|
writeResponseFunc = WriteExportPromAPIResponse
|
|
writeLineFunc = WriteExportPromAPILine
|
|
}
|
|
|
|
tagFilterss, err := getTagFilterssFromMatches(matches)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sq := &storage.SearchQuery{
|
|
AccountID: at.AccountID,
|
|
ProjectID: at.ProjectID,
|
|
MinTimestamp: start,
|
|
MaxTimestamp: end,
|
|
TagFilterss: tagFilterss,
|
|
}
|
|
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
|
}
|
|
if isPartial {
|
|
rss.Cancel()
|
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
|
}
|
|
|
|
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
|
|
doneCh := make(chan error)
|
|
go func() {
|
|
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
|
|
bb := quicktemplate.AcquireByteBuffer()
|
|
writeLineFunc(bb, rs)
|
|
resultsCh <- bb
|
|
})
|
|
close(resultsCh)
|
|
doneCh <- err
|
|
}()
|
|
|
|
w.Header().Set("Content-Type", contentType)
|
|
writeResponseFunc(w, resultsCh)
|
|
|
|
// Consume all the data from resultsCh in the event writeResponseFunc
|
|
// fails to consume all the data.
|
|
for bb := range resultsCh {
|
|
quicktemplate.ReleaseByteBuffer(bb)
|
|
}
|
|
err = <-doneCh
|
|
if err != nil {
|
|
return fmt.Errorf("error during data fetching: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
|
|
func DeleteHandler(at *auth.Token, r *http.Request) error {
|
|
startTime := time.Now()
|
|
if err := r.ParseForm(); err != nil {
|
|
return fmt.Errorf("cannot parse request form values: %s", err)
|
|
}
|
|
if r.FormValue("start") != "" || r.FormValue("end") != "" {
|
|
return fmt.Errorf("start and end aren't supported. Remove these args from the query in order to delete all the matching metrics")
|
|
}
|
|
matches := r.Form["match[]"]
|
|
if len(matches) == 0 {
|
|
return fmt.Errorf("missing `match[]` arg")
|
|
}
|
|
deadline := getDeadline(r)
|
|
tagFilterss, err := getTagFilterssFromMatches(matches)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sq := &storage.SearchQuery{
|
|
AccountID: at.AccountID,
|
|
ProjectID: at.ProjectID,
|
|
TagFilterss: tagFilterss,
|
|
}
|
|
deletedCount, err := netstorage.DeleteSeries(at, sq, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot delete time series matching %q: %s", matches, err)
|
|
}
|
|
if deletedCount > 0 {
|
|
// Reset rollup result cache on all the vmselect nodes,
|
|
// since the cache may contain deleted data.
|
|
// TODO: reset only cache for (account, project)
|
|
resetRollupResultCaches()
|
|
}
|
|
deleteDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var deleteDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/admin/tsdb/delete_series"}`)
|
|
|
|
func resetRollupResultCaches() {
|
|
if len(*selectNodes) == 0 {
|
|
logger.Panicf("BUG: missing -selectNode flag")
|
|
}
|
|
for _, selectNode := range *selectNodes {
|
|
callURL := fmt.Sprintf("http://%s/internal/resetRollupResultCache", selectNode)
|
|
resp, err := httpClient.Get(callURL)
|
|
if err != nil {
|
|
logger.Errorf("error when accessing %q: %s", callURL, err)
|
|
resetRollupResultCacheErrors.Inc()
|
|
continue
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
_ = resp.Body.Close()
|
|
logger.Errorf("unexpected status code at %q; got %d; want %d", callURL, resp.StatusCode, http.StatusOK)
|
|
resetRollupResultCacheErrors.Inc()
|
|
continue
|
|
}
|
|
_ = resp.Body.Close()
|
|
}
|
|
resetRollupResultCacheCalls.Inc()
|
|
}
|
|
|
|
var (
|
|
resetRollupResultCacheErrors = metrics.NewCounter("vm_reset_rollup_result_cache_errors_total")
|
|
resetRollupResultCacheCalls = metrics.NewCounter("vm_reset_rollup_result_cache_calls_total")
|
|
)
|
|
|
|
var httpClient = &http.Client{
|
|
Timeout: time.Second * 5,
|
|
}
|
|
|
|
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
|
|
func LabelValuesHandler(at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
deadline := getDeadline(r)
|
|
labelValues, isPartial, err := netstorage.GetLabelValues(at, labelName, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf(`cannot obtain label values for %q: %s`, labelName, err)
|
|
}
|
|
if isPartial && getDenyPartialResponse(r) {
|
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
WriteLabelValuesResponse(w, labelValues)
|
|
labelValuesDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`)
|
|
|
|
// LabelsCountHandler processes /api/v1/labels/count request.
|
|
func LabelsCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
deadline := getDeadline(r)
|
|
labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf(`cannot obtain label entries: %s`, err)
|
|
}
|
|
if isPartial && getDenyPartialResponse(r) {
|
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
WriteLabelsCountResponse(w, labelEntries)
|
|
labelsCountDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels/count"}`)
|
|
|
|
// LabelsHandler processes /api/v1/labels request.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
|
|
func LabelsHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
deadline := getDeadline(r)
|
|
labels, isPartial, err := netstorage.GetLabels(at, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot obtain labels: %s", err)
|
|
}
|
|
if isPartial && getDenyPartialResponse(r) {
|
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
WriteLabelsResponse(w, labels)
|
|
labelsDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`)
|
|
|
|
// SeriesCountHandler processes /api/v1/series/count request.
|
|
func SeriesCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
deadline := getDeadline(r)
|
|
n, isPartial, err := netstorage.GetSeriesCount(at, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot obtain series count: %s", err)
|
|
}
|
|
if isPartial && getDenyPartialResponse(r) {
|
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
WriteSeriesCountResponse(w, n)
|
|
seriesCountDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/series/count"}`)
|
|
|
|
// SeriesHandler processes /api/v1/series request.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
|
|
func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
ct := currentTime()
|
|
|
|
if err := r.ParseForm(); err != nil {
|
|
return fmt.Errorf("cannot parse form values: %s", err)
|
|
}
|
|
matches := r.Form["match[]"]
|
|
if len(matches) == 0 {
|
|
return fmt.Errorf("missing `match[]` arg")
|
|
}
|
|
// Set start to minTimeMsecs by default as Prometheus does.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
|
|
start, err := getTime(r, "start", minTimeMsecs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
end, err := getTime(r, "end", ct)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
deadline := getDeadline(r)
|
|
|
|
tagFilterss, err := getTagFilterssFromMatches(matches)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if start >= end {
|
|
start = end - defaultStep
|
|
}
|
|
sq := &storage.SearchQuery{
|
|
AccountID: at.AccountID,
|
|
ProjectID: at.ProjectID,
|
|
MinTimestamp: start,
|
|
MaxTimestamp: end,
|
|
TagFilterss: tagFilterss,
|
|
}
|
|
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
|
}
|
|
if isPartial && getDenyPartialResponse(r) {
|
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
|
}
|
|
|
|
resultsCh := make(chan *quicktemplate.ByteBuffer)
|
|
doneCh := make(chan error)
|
|
go func() {
|
|
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
|
|
bb := quicktemplate.AcquireByteBuffer()
|
|
writemetricNameObject(bb, &rs.MetricName)
|
|
resultsCh <- bb
|
|
})
|
|
close(resultsCh)
|
|
doneCh <- err
|
|
}()
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
WriteSeriesResponse(w, resultsCh)
|
|
|
|
// Consume all the data from resultsCh in the event WriteSeriesResponse
|
|
// fails to consume all the data.
|
|
for bb := range resultsCh {
|
|
quicktemplate.ReleaseByteBuffer(bb)
|
|
}
|
|
err = <-doneCh
|
|
if err != nil {
|
|
return fmt.Errorf("error during data fetching: %s", err)
|
|
}
|
|
seriesDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/series"}`)
|
|
|
|
// QueryHandler processes /api/v1/query request.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
|
|
func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
ct := currentTime()
|
|
|
|
query := r.FormValue("query")
|
|
if len(query) == 0 {
|
|
return fmt.Errorf("missing `query` arg")
|
|
}
|
|
start, err := getTime(r, "time", ct)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
step, err := getDuration(r, "step", latencyOffset)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
deadline := getDeadline(r)
|
|
|
|
if len(query) > *maxQueryLen {
|
|
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
|
|
}
|
|
if ct-start < latencyOffset {
|
|
start -= latencyOffset
|
|
}
|
|
if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
|
|
var window int64
|
|
if len(windowStr) > 0 {
|
|
var err error
|
|
window, err = promql.DurationValue(windowStr, step)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
var offset int64
|
|
if len(offsetStr) > 0 {
|
|
var err error
|
|
offset, err = promql.DurationValue(offsetStr, step)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
start -= offset
|
|
end := start
|
|
start = end - window
|
|
if err := exportHandler(at, w, []string{childQuery}, start, end, "promapi", deadline); err != nil {
|
|
return err
|
|
}
|
|
queryDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
ec := promql.EvalConfig{
|
|
AuthToken: at,
|
|
Start: start,
|
|
End: start,
|
|
Step: step,
|
|
Deadline: deadline,
|
|
|
|
DenyPartialResponse: getDenyPartialResponse(r),
|
|
}
|
|
result, err := promql.Exec(&ec, query, true)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot execute %q: %s", query, err)
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
WriteQueryResponse(w, result)
|
|
queryDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var queryDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query"}`)
|
|
|
|
// QueryRangeHandler processes /api/v1/query_range request.
|
|
//
|
|
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
|
|
func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
|
startTime := time.Now()
|
|
ct := currentTime()
|
|
|
|
query := r.FormValue("query")
|
|
if len(query) == 0 {
|
|
return fmt.Errorf("missing `query` arg")
|
|
}
|
|
start, err := getTime(r, "start", ct-defaultStep)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
end, err := getTime(r, "end", ct)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
step, err := getDuration(r, "step", defaultStep)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
deadline := getDeadline(r)
|
|
mayCache := !getBool(r, "nocache")
|
|
|
|
// Validate input args.
|
|
if len(query) > *maxQueryLen {
|
|
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
|
|
}
|
|
if start > end {
|
|
start = end
|
|
}
|
|
if err := promql.ValidateMaxPointsPerTimeseries(start, end, step); err != nil {
|
|
return err
|
|
}
|
|
start, end = promql.AdjustStartEnd(start, end, step)
|
|
|
|
ec := promql.EvalConfig{
|
|
AuthToken: at,
|
|
Start: start,
|
|
End: end,
|
|
Step: step,
|
|
Deadline: deadline,
|
|
MayCache: mayCache,
|
|
|
|
DenyPartialResponse: getDenyPartialResponse(r),
|
|
}
|
|
result, err := promql.Exec(&ec, query, false)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot execute %q: %s", query, err)
|
|
}
|
|
if ct-end < latencyOffset {
|
|
result = adjustLastPoints(result)
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
WriteQueryRangeResponse(w, result)
|
|
queryRangeDuration.UpdateDuration(startTime)
|
|
return nil
|
|
}
|
|
|
|
var queryRangeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query_range"}`)
|
|
|
|
// adjustLastPoints substitutes the last point values with the previous
|
|
// point values, since the last points may contain garbage.
|
|
func adjustLastPoints(tss []netstorage.Result) []netstorage.Result {
|
|
if len(tss) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Search for the last non-NaN value across all the timeseries.
|
|
lastNonNaNIdx := -1
|
|
for i := range tss {
|
|
values := tss[i].Values
|
|
j := len(values) - 1
|
|
for j >= 0 && math.IsNaN(values[j]) {
|
|
j--
|
|
}
|
|
if j > lastNonNaNIdx {
|
|
lastNonNaNIdx = j
|
|
}
|
|
}
|
|
if lastNonNaNIdx == -1 {
|
|
// All timeseries contain only NaNs.
|
|
return nil
|
|
}
|
|
|
|
// Substitute the last two values starting from lastNonNaNIdx
|
|
// with the previous values for each timeseries.
|
|
for i := range tss {
|
|
values := tss[i].Values
|
|
for j := 0; j < 2; j++ {
|
|
idx := lastNonNaNIdx + j
|
|
if idx <= 0 || idx >= len(values) || math.IsNaN(values[idx-1]) {
|
|
continue
|
|
}
|
|
values[idx] = values[idx-1]
|
|
}
|
|
}
|
|
return tss
|
|
}
|
|
|
|
func getTime(r *http.Request, argKey string, defaultValue int64) (int64, error) {
|
|
argValue := r.FormValue(argKey)
|
|
if len(argValue) == 0 {
|
|
return defaultValue, nil
|
|
}
|
|
secs, err := strconv.ParseFloat(argValue, 64)
|
|
if err != nil {
|
|
// Try parsing string format
|
|
t, err := time.Parse(time.RFC3339, argValue)
|
|
if err != nil {
|
|
// Handle Prometheus'-provided minTime and maxTime.
|
|
// See https://github.com/prometheus/client_golang/issues/614
|
|
switch argValue {
|
|
case prometheusMinTimeFormatted:
|
|
return minTimeMsecs, nil
|
|
case prometheusMaxTimeFormatted:
|
|
return maxTimeMsecs, nil
|
|
}
|
|
return 0, fmt.Errorf("cannot parse %q=%q: %s", argKey, argValue, err)
|
|
}
|
|
secs = float64(t.UnixNano()) / 1e9
|
|
}
|
|
msecs := int64(secs * 1e3)
|
|
if msecs < minTimeMsecs {
|
|
msecs = 0
|
|
}
|
|
if msecs > maxTimeMsecs {
|
|
msecs = maxTimeMsecs
|
|
}
|
|
return msecs, nil
|
|
}
|
|
|
|
var (
|
|
// These constants were obtained from https://github.com/prometheus/prometheus/blob/91d7175eaac18b00e370965f3a8186cc40bf9f55/web/api/v1/api.go#L442
|
|
// See https://github.com/prometheus/client_golang/issues/614 for details.
|
|
prometheusMinTimeFormatted = time.Unix(math.MinInt64/1000+62135596801, 0).UTC().Format(time.RFC3339Nano)
|
|
prometheusMaxTimeFormatted = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC().Format(time.RFC3339Nano)
|
|
)
|
|
|
|
const (
|
|
// These values prevent from overflow when storing msec-precision time in int64.
|
|
minTimeMsecs = 0 // use 0 instead of `int64(-1<<63) / 1e6` because the storage engine doesn't actually support negative time
|
|
maxTimeMsecs = int64(1<<63-1) / 1e6
|
|
)
|
|
|
|
func getDuration(r *http.Request, argKey string, defaultValue int64) (int64, error) {
|
|
argValue := r.FormValue(argKey)
|
|
if len(argValue) == 0 {
|
|
return defaultValue, nil
|
|
}
|
|
secs, err := strconv.ParseFloat(argValue, 64)
|
|
if err != nil {
|
|
// Try parsing string format
|
|
d, err := time.ParseDuration(argValue)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot parse %q=%q: %s", argKey, argValue, err)
|
|
}
|
|
secs = d.Seconds()
|
|
}
|
|
msecs := int64(secs * 1e3)
|
|
if msecs <= 0 || msecs > maxDurationMsecs {
|
|
return 0, fmt.Errorf("%q=%dms is out of allowed range [%d ... %d]", argKey, msecs, 0, int64(maxDurationMsecs))
|
|
}
|
|
return msecs, nil
|
|
}
|
|
|
|
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
|
|
|
|
func getDeadline(r *http.Request) netstorage.Deadline {
|
|
d, err := getDuration(r, "timeout", 0)
|
|
if err != nil {
|
|
d = 0
|
|
}
|
|
dMax := int64(maxQueryDuration.Seconds() * 1e3)
|
|
if d <= 0 || d > dMax {
|
|
d = dMax
|
|
}
|
|
timeout := time.Duration(d) * time.Millisecond
|
|
return netstorage.NewDeadline(timeout)
|
|
}
|
|
|
|
func getBool(r *http.Request, argKey string) bool {
|
|
argValue := r.FormValue(argKey)
|
|
switch strings.ToLower(argValue) {
|
|
case "", "0", "f", "false", "no":
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
func currentTime() int64 {
|
|
return int64(time.Now().UTC().Unix()) * 1e3
|
|
}
|
|
|
|
func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) {
|
|
tagFilterss := make([][]storage.TagFilter, 0, len(matches))
|
|
for _, match := range matches {
|
|
tagFilters, err := promql.ParseMetricSelector(match)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse %q: %s", match, err)
|
|
}
|
|
tagFilterss = append(tagFilterss, tagFilters)
|
|
}
|
|
return tagFilterss, nil
|
|
}
|
|
|
|
func getDenyPartialResponse(r *http.Request) bool {
|
|
if *denyPartialResponse {
|
|
return true
|
|
}
|
|
return getBool(r, "deny_partial_response")
|
|
}
|