2019-05-22 23:16:55 +02:00
package prometheus
import (
"flag"
"fmt"
"math"
"net/http"
"runtime"
2019-08-04 22:09:18 +02:00
"sort"
2019-05-22 23:16:55 +02:00
"strconv"
2020-10-12 19:01:51 +02:00
"strings"
2019-08-04 22:09:18 +02:00
"sync"
2019-05-22 23:16:55 +02:00
"time"
2020-09-27 22:17:14 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
2020-09-10 23:28:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2020-09-26 03:29:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-05-14 21:01:51 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-08-16 16:05:52 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-07-31 17:00:21 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-08-04 22:09:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2020-04-28 14:28:22 +02:00
"github.com/VictoriaMetrics/metricsql"
2020-03-10 20:45:15 +01:00
"github.com/valyala/fastjson/fastfloat"
2019-05-22 23:16:55 +02:00
"github.com/valyala/quicktemplate"
)
var (
2020-09-11 13:16:40 +02:00
latencyOffset = flag . Duration ( "search.latencyOffset" , time . Second * 30 , "The time when data points become visible in query results after the collection. " +
2019-10-28 11:30:50 +01:00
"Too small value can result in incomplete last points for query results" )
2020-09-10 23:28:19 +02:00
maxQueryLen = flagutil . NewBytes ( "search.maxQueryLen" , 16 * 1024 , "The maximum search query length in bytes" )
maxLookback = flag . Duration ( "search.maxLookback" , 0 , "Synonim to -search.lookback-delta from Prometheus. " +
2020-04-20 18:25:32 +02:00
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. " +
"See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons" )
maxStalenessInterval = flag . Duration ( "search.maxStalenessInterval" , 0 , "The maximum interval for staleness calculations. " +
2020-04-20 18:41:59 +02:00
"By default it is automatically calculated from the median interval between samples. This flag could be useful for tuning " +
2020-04-20 18:25:32 +02:00
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. " +
2020-09-15 13:24:27 +02:00
"See also '-search.maxLookback' flag, which has the same meaning due to historical reasons" )
2019-05-22 23:16:55 +02:00
)
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
2020-02-04 15:13:59 +01:00
func FederateHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
matches := r . Form [ "match[]" ]
2019-06-20 13:05:07 +02:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-10-15 18:12:27 +02:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2019-10-15 18:12:27 +02:00
if lookbackDelta <= 0 {
lookbackDelta = defaultStep
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , ct - lookbackDelta )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
if start >= end {
start = end - defaultStep
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-08-04 21:15:33 +02:00
rss , err := netstorage . ProcessSearchQuery ( sq , true , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-05-22 23:16:55 +02:00
}
w . Header ( ) . Set ( "Content-Type" , "text/plain" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
bb := quicktemplate . AcquireByteBuffer ( )
WriteFederate ( bb , rs )
2020-09-29 10:36:12 +02:00
_ , err := bw . Write ( bb . B )
2019-05-22 23:16:55 +02:00
quicktemplate . ReleaseByteBuffer ( bb )
2020-09-29 10:36:12 +02:00
return err
2020-09-27 22:17:14 +02:00
} )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error during data fetching: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
federateDuration . UpdateDuration ( startTime )
return nil
}
var federateDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/federate"} ` )
2020-10-12 19:01:51 +02:00
// ExportCSVHandler exports data in CSV format from /api/v1/export/csv
func ExportCSVHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
ct := startTime . UnixNano ( ) / 1e6
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %w" , err )
}
format := r . FormValue ( "format" )
if len ( format ) == 0 {
return fmt . Errorf ( "missing `format` arg; see https://victoriametrics.github.io/#how-to-export-csv-data" )
}
fieldNames := strings . Split ( format , "," )
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
// Maintain backwards compatibility
match := r . FormValue ( "match" )
if len ( match ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
matches = [ ] string { match }
}
start , err := searchutils . GetTime ( r , "start" , 0 )
if err != nil {
return err
}
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
deadline := searchutils . GetDeadlineForExport ( r , startTime )
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
w . Header ( ) . Set ( "Content-Type" , "text/csv" )
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
resultsCh := make ( chan * quicktemplate . ByteBuffer , runtime . GOMAXPROCS ( - 1 ) )
doneCh := make ( chan error )
go func ( ) {
err := netstorage . ExportBlocks ( sq , deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
if len ( xb . timestamps ) > 0 {
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportCSVLine ( bb , xb , fieldNames )
resultsCh <- bb
}
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
close ( resultsCh )
doneCh <- err
} ( )
2020-10-13 08:35:13 +02:00
// Consume all the data from resultsCh.
2020-10-12 19:01:51 +02:00
for bb := range resultsCh {
2020-10-13 08:35:13 +02:00
// Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above.
_ , _ = bw . Write ( bb . B )
2020-10-12 19:01:51 +02:00
quicktemplate . ReleaseByteBuffer ( bb )
}
if err := bw . Flush ( ) ; err != nil {
return err
}
err = <- doneCh
if err != nil {
return fmt . Errorf ( "error during exporting data to csv: %w" , err )
}
exportCSVDuration . UpdateDuration ( startTime )
return nil
}
var exportCSVDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export/csv"} ` )
2020-09-26 03:29:45 +02:00
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
ct := startTime . UnixNano ( ) / 1e6
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %w" , err )
}
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
// Maintain backwards compatibility
match := r . FormValue ( "match" )
if len ( match ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
matches = [ ] string { match }
}
start , err := searchutils . GetTime ( r , "start" , 0 )
if err != nil {
return err
}
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
deadline := searchutils . GetDeadlineForExport ( r , startTime )
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
w . Header ( ) . Set ( "Content-Type" , "VictoriaMetrics/native" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-09-26 03:29:45 +02:00
// Marshal tr
trBuf := make ( [ ] byte , 0 , 16 )
trBuf = encoding . MarshalInt64 ( trBuf , start )
trBuf = encoding . MarshalInt64 ( trBuf , end )
2020-09-29 10:36:12 +02:00
_ , _ = bw . Write ( trBuf )
2020-09-26 03:29:45 +02:00
2020-09-27 22:17:14 +02:00
// Marshal native blocks.
2020-09-26 03:29:45 +02:00
err = netstorage . ExportBlocks ( sq , deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
dstBuf := bbPool . Get ( )
tmpBuf := bbPool . Get ( )
dst := dstBuf . B
tmp := tmpBuf . B
// Marshal mn
tmp = mn . Marshal ( tmp [ : 0 ] )
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
// Marshal b
tmp = b . MarshalPortable ( tmp [ : 0 ] )
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
tmpBuf . B = tmp
bbPool . Put ( tmpBuf )
2020-09-29 10:36:12 +02:00
_ , err := bw . Write ( dst )
2020-09-26 03:29:45 +02:00
dstBuf . B = dst
bbPool . Put ( dstBuf )
2020-09-29 10:36:12 +02:00
return err
2020-09-26 03:29:45 +02:00
} )
2020-09-27 22:17:14 +02:00
if err != nil {
return err
}
if err := bw . Flush ( ) ; err != nil {
return err
}
exportNativeDuration . UpdateDuration ( startTime )
return nil
2020-09-26 03:29:45 +02:00
}
2020-09-27 22:17:14 +02:00
var exportNativeDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export/native"} ` )
2020-09-26 03:29:45 +02:00
var bbPool bytesutil . ByteBufferPool
2019-05-22 23:16:55 +02:00
// ExportHandler exports data in raw format from /api/v1/export.
2020-02-04 15:13:59 +01:00
func ExportHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
// Maintain backwards compatibility
match := r . FormValue ( "match" )
2019-06-20 13:05:07 +02:00
if len ( match ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-05-22 23:16:55 +02:00
matches = [ ] string { match }
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , 0 )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2019-05-22 23:16:55 +02:00
format := r . FormValue ( "format" )
2020-03-10 20:45:15 +01:00
maxRowsPerLine := int ( fastfloat . ParseInt64BestEffort ( r . FormValue ( "max_rows_per_line" ) ) )
2020-09-26 03:29:45 +02:00
reduceMemUsage := searchutils . GetBool ( r , "reduce_mem_usage" )
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForExport ( r , startTime )
2019-05-22 23:16:55 +02:00
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
2020-09-26 03:29:45 +02:00
if err := exportHandler ( w , matches , start , end , format , maxRowsPerLine , reduceMemUsage , deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when exporting data for queries=%q on the time range (start=%d, end=%d): %w" , matches , start , end , err )
2019-05-22 23:16:55 +02:00
}
exportDuration . UpdateDuration ( startTime )
return nil
}
var exportDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export"} ` )
2020-09-26 03:29:45 +02:00
func exportHandler ( w http . ResponseWriter , matches [ ] string , start , end int64 , format string , maxRowsPerLine int , reduceMemUsage bool , deadline searchutils . Deadline ) error {
2019-05-22 23:16:55 +02:00
writeResponseFunc := WriteExportStdResponse
2020-09-26 03:29:45 +02:00
writeLineFunc := func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
2020-08-10 19:57:18 +02:00
bb := quicktemplate . AcquireByteBuffer ( )
2020-09-26 03:29:45 +02:00
WriteExportJSONLine ( bb , xb )
2020-08-10 19:57:18 +02:00
resultsCh <- bb
}
2020-09-26 03:29:45 +02:00
contentType := "application/stream+json"
if format == "prometheus" {
contentType = "text/plain"
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportPrometheusLine ( bb , xb )
resultsCh <- bb
}
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportPromAPILine ( bb , xb )
resultsCh <- bb
}
}
2020-03-10 20:45:15 +01:00
if maxRowsPerLine > 0 {
2020-09-26 03:29:45 +02:00
writeLineFuncOrig := writeLineFunc
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
valuesOrig := xb . values
timestampsOrig := xb . timestamps
2020-03-10 20:45:15 +01:00
values := valuesOrig
timestamps := timestampsOrig
for len ( values ) > 0 {
var valuesChunk [ ] float64
var timestampsChunk [ ] int64
if len ( values ) > maxRowsPerLine {
valuesChunk = values [ : maxRowsPerLine ]
timestampsChunk = timestamps [ : maxRowsPerLine ]
values = values [ maxRowsPerLine : ]
timestamps = timestamps [ maxRowsPerLine : ]
} else {
valuesChunk = values
timestampsChunk = timestamps
values = nil
timestamps = nil
}
2020-09-26 03:29:45 +02:00
xb . values = valuesChunk
xb . timestamps = timestampsChunk
writeLineFuncOrig ( xb , resultsCh )
2020-03-10 20:45:15 +01:00
}
2020-09-26 03:29:45 +02:00
xb . values = valuesOrig
xb . timestamps = timestampsOrig
2020-08-10 19:57:18 +02:00
}
2019-05-22 23:16:55 +02:00
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2020-09-27 22:17:14 +02:00
w . Header ( ) . Set ( "Content-Type" , contentType )
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2019-05-22 23:16:55 +02:00
resultsCh := make ( chan * quicktemplate . ByteBuffer , runtime . GOMAXPROCS ( - 1 ) )
doneCh := make ( chan error )
2020-09-26 03:29:45 +02:00
if ! reduceMemUsage {
rss , err := netstorage . ProcessSearchQuery ( sq , true , deadline )
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
go func ( ) {
2020-09-27 22:17:14 +02:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = & rs . MetricName
xb . timestamps = rs . Timestamps
xb . values = rs . Values
writeLineFunc ( xb , resultsCh )
xb . reset ( )
exportBlockPool . Put ( xb )
2020-09-27 22:17:14 +02:00
return nil
2020-09-26 03:29:45 +02:00
} )
close ( resultsCh )
doneCh <- err
} ( )
} else {
go func ( ) {
err := netstorage . ExportBlocks ( sq , deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
if len ( xb . timestamps ) > 0 {
writeLineFunc ( xb , resultsCh )
}
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
close ( resultsCh )
doneCh <- err
} ( )
}
2019-05-22 23:16:55 +02:00
2020-09-27 22:17:14 +02:00
// writeResponseFunc must consume all the data from resultsCh.
2020-09-26 03:29:45 +02:00
writeResponseFunc ( bw , resultsCh )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
return err
2019-05-24 13:54:31 +02:00
}
2019-05-22 23:16:55 +02:00
err = <- doneCh
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error during data fetching: %w" , err )
2019-05-22 23:16:55 +02:00
}
return nil
}
2020-09-26 03:29:45 +02:00
type exportBlock struct {
mn * storage . MetricName
timestamps [ ] int64
values [ ] float64
}
func ( xb * exportBlock ) reset ( ) {
xb . mn = nil
xb . timestamps = xb . timestamps [ : 0 ]
xb . values = xb . values [ : 0 ]
}
var exportBlockPool = & sync . Pool {
New : func ( ) interface { } {
return & exportBlock { }
} ,
}
2019-05-22 23:16:55 +02:00
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
2020-02-04 15:13:59 +01:00
func DeleteHandler ( startTime time . Time , r * http . Request ) error {
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
if r . FormValue ( "start" ) != "" || r . FormValue ( "end" ) != "" {
return fmt . Errorf ( "start and end aren't supported. Remove these args from the query in order to delete all the matching metrics" )
}
matches := r . Form [ "match[]" ]
2019-06-20 13:05:07 +02:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-05-22 23:16:55 +02:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
TagFilterss : tagFilterss ,
}
deletedCount , err := netstorage . DeleteSeries ( sq )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot delete time series matching %q: %w" , matches , err )
2019-05-22 23:16:55 +02:00
}
if deletedCount > 0 {
promql . ResetRollupResultCache ( )
}
deleteDuration . UpdateDuration ( startTime )
return nil
}
var deleteDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/admin/tsdb/delete_series"} ` )
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
2020-02-04 15:13:59 +01:00
func LabelValuesHandler ( startTime time . Time , labelName string , w http . ResponseWriter , r * http . Request ) error {
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-08-04 22:09:18 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-08-04 22:09:18 +02:00
}
var labelValues [ ] string
if len ( r . Form [ "match[]" ] ) == 0 && len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
var err error
labelValues , err = netstorage . GetLabelValues ( labelName , deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( ` cannot obtain label values for %q: %w ` , labelName , err )
2019-08-04 22:09:18 +02:00
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/label/foo/values?match[]=foobar{baz="abc"}&start=...&end=...
// is equivalent to `label_values(foobar{baz="abc"}, foo)` call on the selected
// time range in Grafana templating.
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
matches = [ ] string { fmt . Sprintf ( "{%s!=''}" , labelName ) }
}
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-08-04 22:09:18 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-08-04 22:09:18 +02:00
if err != nil {
return err
}
labelValues , err = labelValuesWithMatches ( labelName , matches , start , end , deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain label values for %q, match[]=%q, start=%d, end=%d: %w" , labelName , matches , start , end , err )
2019-08-04 22:09:18 +02:00
}
2019-05-22 23:16:55 +02:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteLabelValuesResponse ( bw , labelValues )
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
labelValuesDuration . UpdateDuration ( startTime )
return nil
}
2020-09-11 12:18:57 +02:00
func labelValuesWithMatches ( labelName string , matches [ ] string , start , end int64 , deadline searchutils . Deadline ) ( [ ] string , error ) {
2019-08-04 22:09:18 +02:00
if len ( matches ) == 0 {
logger . Panicf ( "BUG: matches must be non-empty" )
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
2020-02-28 22:35:47 +01:00
2020-02-28 11:17:27 +01:00
// Add `labelName!=''` tag filter in order to filter out series without the labelName.
2020-02-28 22:35:47 +01:00
// There is no need in adding `__name__!=''` filter, since all the time series should
// already have non-empty name.
if labelName != "__name__" {
key := [ ] byte ( labelName )
for i , tfs := range tagFilterss {
tagFilterss [ i ] = append ( tfs , storage . TagFilter {
Key : key ,
IsNegative : true ,
} )
}
2019-12-14 23:07:09 +01:00
}
2019-08-04 22:09:18 +02:00
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-08-04 22:09:18 +02:00
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
rss , err := netstorage . ProcessSearchQuery ( sq , false , deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-08-04 22:09:18 +02:00
}
m := make ( map [ string ] struct { } )
var mLock sync . Mutex
2020-09-27 22:17:14 +02:00
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
2019-08-04 22:09:18 +02:00
labelValue := rs . MetricName . GetTagValue ( labelName )
if len ( labelValue ) == 0 {
2020-09-27 22:17:14 +02:00
return nil
2019-08-04 22:09:18 +02:00
}
mLock . Lock ( )
m [ string ( labelValue ) ] = struct { } { }
mLock . Unlock ( )
2020-09-27 22:17:14 +02:00
return nil
2019-08-04 22:09:18 +02:00
} )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "error when data fetching: %w" , err )
2019-08-04 22:09:18 +02:00
}
labelValues := make ( [ ] string , 0 , len ( m ) )
for labelValue := range m {
labelValues = append ( labelValues , labelValue )
}
sort . Strings ( labelValues )
return labelValues , nil
}
2019-05-22 23:16:55 +02:00
var labelValuesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/label/ { }/values"} ` )
2019-06-10 17:55:20 +02:00
// LabelsCountHandler processes /api/v1/labels/count request.
2020-02-04 15:13:59 +01:00
func LabelsCountHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-06-10 17:55:20 +02:00
labelEntries , err := netstorage . GetLabelEntries ( deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( ` cannot obtain label entries: %w ` , err )
2019-06-10 17:55:20 +02:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteLabelsCountResponse ( bw , labelEntries )
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-06-10 17:55:20 +02:00
labelsCountDuration . UpdateDuration ( startTime )
return nil
}
var labelsCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels/count"} ` )
2020-04-22 18:57:36 +02:00
const secsPerDay = 3600 * 24
// TSDBStatusHandler processes /api/v1/status/tsdb request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func TSDBStatusHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2020-04-22 18:57:36 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2020-04-22 18:57:36 +02:00
}
2020-05-14 21:01:51 +02:00
date := fasttime . UnixDate ( )
2020-04-22 18:57:36 +02:00
dateStr := r . FormValue ( "date" )
if len ( dateStr ) > 0 {
t , err := time . Parse ( "2006-01-02" , dateStr )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse `date` arg %q: %w" , dateStr , err )
2020-04-22 18:57:36 +02:00
}
2020-05-14 21:01:51 +02:00
date = uint64 ( t . Unix ( ) ) / secsPerDay
2020-04-22 18:57:36 +02:00
}
topN := 10
topNStr := r . FormValue ( "topN" )
if len ( topNStr ) > 0 {
n , err := strconv . Atoi ( topNStr )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse `topN` arg %q: %w" , topNStr , err )
2020-04-22 18:57:36 +02:00
}
if n <= 0 {
n = 1
}
if n > 1000 {
n = 1000
}
topN = n
}
2020-05-14 21:01:51 +02:00
status , err := netstorage . GetTSDBStatusForDate ( deadline , date , topN )
2020-04-22 18:57:36 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( ` cannot obtain tsdb status for date=%d, topN=%d: %w ` , date , topN , err )
2020-04-22 18:57:36 +02:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteTSDBStatusResponse ( bw , status )
if err := bw . Flush ( ) ; err != nil {
return err
}
2020-04-22 18:57:36 +02:00
tsdbStatusDuration . UpdateDuration ( startTime )
return nil
}
var tsdbStatusDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/status/tsdb"} ` )
2019-05-22 23:16:55 +02:00
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
2020-02-04 15:13:59 +01:00
func LabelsHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-12-14 23:07:09 +01:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-12-14 23:07:09 +01:00
}
var labels [ ] string
if len ( r . Form [ "match[]" ] ) == 0 && len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
var err error
labels , err = netstorage . GetLabels ( deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain labels: %w" , err )
2019-12-14 23:07:09 +01:00
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/labels?match[]=foobar{baz="abc"}&start=...&end=...
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
matches = [ ] string { "{__name__!=''}" }
}
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-12-14 23:07:09 +01:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-12-14 23:07:09 +01:00
if err != nil {
return err
}
labels , err = labelsWithMatches ( matches , start , end , deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain labels for match[]=%q, start=%d, end=%d: %w" , matches , start , end , err )
2019-12-14 23:07:09 +01:00
}
2019-05-22 23:16:55 +02:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteLabelsResponse ( bw , labels )
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
labelsDuration . UpdateDuration ( startTime )
return nil
}
2020-09-11 12:18:57 +02:00
func labelsWithMatches ( matches [ ] string , start , end int64 , deadline searchutils . Deadline ) ( [ ] string , error ) {
2019-12-14 23:07:09 +01:00
if len ( matches ) == 0 {
logger . Panicf ( "BUG: matches must be non-empty" )
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
if start >= end {
end = start + defaultStep
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
rss , err := netstorage . ProcessSearchQuery ( sq , false , deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-12-14 23:07:09 +01:00
}
m := make ( map [ string ] struct { } )
var mLock sync . Mutex
2020-09-27 22:17:14 +02:00
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
2019-12-14 23:07:09 +01:00
mLock . Lock ( )
tags := rs . MetricName . Tags
for i := range tags {
t := & tags [ i ]
m [ string ( t . Key ) ] = struct { } { }
}
m [ "__name__" ] = struct { } { }
mLock . Unlock ( )
2020-09-27 22:17:14 +02:00
return nil
2019-12-14 23:07:09 +01:00
} )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "error when data fetching: %w" , err )
2019-12-14 23:07:09 +01:00
}
labels := make ( [ ] string , 0 , len ( m ) )
for label := range m {
labels = append ( labels , label )
}
sort . Strings ( labels )
return labels , nil
}
2019-05-22 23:16:55 +02:00
var labelsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels"} ` )
// SeriesCountHandler processes /api/v1/series/count request.
2020-02-04 15:13:59 +01:00
func SeriesCountHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
n , err := netstorage . GetSeriesCount ( deadline )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain series count: %w" , err )
2019-05-22 23:16:55 +02:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteSeriesCountResponse ( bw , n )
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
seriesCountDuration . UpdateDuration ( startTime )
return nil
}
var seriesCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series/count"} ` )
// SeriesHandler processes /api/v1/series request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
2020-02-04 15:13:59 +01:00
func SeriesHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
matches := r . Form [ "match[]" ]
2019-06-20 13:05:07 +02:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
// Do not set start to searchutils.minTimeMsecs by default as Prometheus does,
2019-08-04 18:42:36 +02:00
// since this leads to fetching and scanning all the data from the storage,
// which can take a lot of time for big storages.
// It is better setting start as end-defaultStep by default.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-08-04 21:15:33 +02:00
rss , err := netstorage . ProcessSearchQuery ( sq , false , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-05-22 23:16:55 +02:00
}
2020-09-27 22:17:14 +02:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2019-05-22 23:16:55 +02:00
resultsCh := make ( chan * quicktemplate . ByteBuffer )
doneCh := make ( chan error )
go func ( ) {
2020-09-27 22:17:14 +02:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
bb := quicktemplate . AcquireByteBuffer ( )
writemetricNameObject ( bb , & rs . MetricName )
resultsCh <- bb
2020-09-27 22:17:14 +02:00
return nil
2019-05-22 23:16:55 +02:00
} )
close ( resultsCh )
doneCh <- err
} ( )
2020-09-27 22:17:14 +02:00
// WriteSeriesResponse must consume all the data from resultsCh.
WriteSeriesResponse ( bw , resultsCh )
if err := bw . Flush ( ) ; err != nil {
return err
2019-05-22 23:16:55 +02:00
}
err = <- doneCh
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error during data fetching: %w" , err )
2019-05-22 23:16:55 +02:00
}
seriesDuration . UpdateDuration ( startTime )
return nil
}
var seriesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series"} ` )
// QueryHandler processes /api/v1/query request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
2020-02-04 15:13:59 +01:00
func QueryHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
query := r . FormValue ( "query" )
2019-06-20 13:05:07 +02:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "time" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-04-17 11:24:10 +02:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
step , err := searchutils . GetDuration ( r , "step" , lookbackDelta )
2019-10-15 18:12:27 +02:00
if err != nil {
return err
}
2020-04-17 11:24:10 +02:00
if step <= 0 {
step = defaultStep
}
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
2020-08-16 16:05:52 +02:00
if len ( query ) > maxQueryLen . N {
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 23:16:55 +02:00
}
if childQuery , windowStr , offsetStr := promql . IsMetricSelectorWithRollup ( query ) ; childQuery != "" {
2019-12-10 23:40:36 +01:00
window , err := parsePositiveDuration ( windowStr , step )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse window: %w" , err )
2019-05-22 23:16:55 +02:00
}
2019-12-10 23:40:36 +01:00
offset , err := parseDuration ( offsetStr , step )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse offset: %w" , err )
2019-05-22 23:16:55 +02:00
}
start -= offset
end := start
start = end - window
2020-09-26 03:29:45 +02:00
if err := exportHandler ( w , [ ] string { childQuery } , start , end , "promapi" , 0 , false , deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when exporting data for query=%q on the time range (start=%d, end=%d): %w" , childQuery , start , end , err )
2019-05-22 23:16:55 +02:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2019-12-10 23:40:36 +01:00
if childQuery , windowStr , stepStr , offsetStr := promql . IsRollup ( query ) ; childQuery != "" {
newStep , err := parsePositiveDuration ( stepStr , step )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse step: %w" , err )
2019-12-10 23:40:36 +01:00
}
if newStep > 0 {
step = newStep
}
window , err := parsePositiveDuration ( windowStr , step )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse window: %w" , err )
2019-12-10 23:40:36 +01:00
}
offset , err := parseDuration ( offsetStr , step )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse offset: %w" , err )
2019-12-10 23:40:36 +01:00
}
start -= offset
end := start
start = end - window
2020-07-21 17:34:59 +02:00
if err := queryRangeHandler ( startTime , w , childQuery , start , end , step , r , ct ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , childQuery , start , end , step , err )
2019-12-10 23:40:36 +01:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2019-05-22 23:16:55 +02:00
2020-09-23 11:58:46 +02:00
queryOffset := getLatencyOffsetMilliseconds ( )
if ! searchutils . GetBool ( r , "nocache" ) && ct - start < queryOffset && start - ct < queryOffset {
// Adjust start time only if `nocache` arg isn't set.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/241
startPrev := start
start = ct - queryOffset
queryOffset = startPrev - start
} else {
queryOffset = 0
}
2019-05-22 23:16:55 +02:00
ec := promql . EvalConfig {
2020-07-31 17:00:21 +02:00
Start : start ,
End : start ,
Step : step ,
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
LookbackDelta : lookbackDelta ,
2019-05-22 23:16:55 +02:00
}
2019-07-01 16:14:49 +02:00
result , err := promql . Exec ( & ec , query , true )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q for (time=%d, step=%d): %w" , query , start , step , err )
2019-05-22 23:16:55 +02:00
}
2020-09-23 11:58:46 +02:00
if queryOffset > 0 {
for i := range result {
2020-09-23 12:04:17 +02:00
timestamps := result [ i ] . Timestamps
2020-09-23 11:58:46 +02:00
for j := range timestamps {
timestamps [ j ] += queryOffset
}
}
}
2019-05-22 23:16:55 +02:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteQueryResponse ( bw , result )
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
queryDuration . UpdateDuration ( startTime )
return nil
}
var queryDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query"} ` )
2019-12-10 23:40:36 +01:00
func parseDuration ( s string , step int64 ) ( int64 , error ) {
if len ( s ) == 0 {
return 0 , nil
}
2019-12-25 20:35:47 +01:00
return metricsql . DurationValue ( s , step )
2019-12-10 23:40:36 +01:00
}
func parsePositiveDuration ( s string , step int64 ) ( int64 , error ) {
if len ( s ) == 0 {
return 0 , nil
}
2019-12-25 20:35:47 +01:00
return metricsql . PositiveDurationValue ( s , step )
2019-12-10 23:40:36 +01:00
}
2019-05-22 23:16:55 +02:00
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
2020-02-04 15:13:59 +01:00
func QueryRangeHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
query := r . FormValue ( "query" )
2019-06-20 13:05:07 +02:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , ct - defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
step , err := searchutils . GetDuration ( r , "step" , defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-07-21 17:34:59 +02:00
if err := queryRangeHandler ( startTime , w , query , start , end , step , r , ct ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , query , start , end , step , err )
2019-12-10 23:40:36 +01:00
}
queryRangeDuration . UpdateDuration ( startTime )
return nil
}
2020-07-21 17:34:59 +02:00
func queryRangeHandler ( startTime time . Time , w http . ResponseWriter , query string , start , end , step int64 , r * http . Request , ct int64 ) error {
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
mayCache := ! searchutils . GetBool ( r , "nocache" )
2019-10-15 18:12:27 +02:00
lookbackDelta , err := getMaxLookback ( r )
if err != nil {
return err
}
2019-05-22 23:16:55 +02:00
// Validate input args.
2020-08-16 16:05:52 +02:00
if len ( query ) > maxQueryLen . N {
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 23:16:55 +02:00
}
if start > end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
if err := promql . ValidateMaxPointsPerTimeseries ( start , end , step ) ; err != nil {
return err
}
2019-09-04 12:09:20 +02:00
if mayCache {
start , end = promql . AdjustStartEnd ( start , end , step )
}
2019-05-22 23:16:55 +02:00
ec := promql . EvalConfig {
2020-07-31 17:00:21 +02:00
Start : start ,
End : end ,
Step : step ,
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
MayCache : mayCache ,
LookbackDelta : lookbackDelta ,
2019-05-22 23:16:55 +02:00
}
2019-07-01 16:14:49 +02:00
result , err := promql . Exec ( & ec , query , false )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot execute query: %w" , err )
2019-05-22 23:16:55 +02:00
}
2019-10-28 11:30:50 +01:00
queryOffset := getLatencyOffsetMilliseconds ( )
2020-07-05 17:17:02 +02:00
if ct - queryOffset < end {
2020-07-14 11:45:42 +02:00
result = adjustLastPoints ( result , ct - queryOffset , ct + step )
2019-05-22 23:16:55 +02:00
}
2019-08-20 21:52:49 +02:00
// Remove NaN values as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/153
2020-07-20 14:28:36 +02:00
result = removeEmptyValuesAndTimeseries ( result )
2019-08-20 21:52:49 +02:00
2019-05-22 23:16:55 +02:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteQueryRangeResponse ( bw , result )
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
return nil
}
2020-07-20 14:28:36 +02:00
func removeEmptyValuesAndTimeseries ( tss [ ] netstorage . Result ) [ ] netstorage . Result {
dst := tss [ : 0 ]
2019-08-20 21:52:49 +02:00
for i := range tss {
ts := & tss [ i ]
hasNaNs := false
for _ , v := range ts . Values {
if math . IsNaN ( v ) {
hasNaNs = true
break
}
}
if ! hasNaNs {
// Fast path: nothing to remove.
2020-07-20 14:28:36 +02:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 21:52:49 +02:00
continue
}
// Slow path: remove NaNs.
srcTimestamps := ts . Timestamps
dstValues := ts . Values [ : 0 ]
dstTimestamps := ts . Timestamps [ : 0 ]
for j , v := range ts . Values {
if math . IsNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , srcTimestamps [ j ] )
}
ts . Values = dstValues
ts . Timestamps = dstTimestamps
2020-07-20 14:28:36 +02:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 21:52:49 +02:00
}
2020-07-20 14:28:36 +02:00
return dst
2019-08-20 21:52:49 +02:00
}
2019-05-22 23:16:55 +02:00
var queryRangeDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query_range"} ` )
2020-07-14 11:45:42 +02:00
var nan = math . NaN ( )
2020-07-05 17:17:02 +02:00
// adjustLastPoints substitutes the last point values on the time range (start..end]
2020-07-14 11:45:42 +02:00
// with the previous point values, since these points may contain incomplete values.
2020-07-05 17:17:02 +02:00
func adjustLastPoints ( tss [ ] netstorage . Result , start , end int64 ) [ ] netstorage . Result {
2019-05-22 23:16:55 +02:00
for i := range tss {
2020-07-05 17:17:02 +02:00
ts := & tss [ i ]
values := ts . Values
timestamps := ts . Timestamps
j := len ( timestamps ) - 1
2020-07-14 11:45:42 +02:00
if j >= 0 && timestamps [ j ] > end {
// It looks like the `offset` is used in the query, which shifts time range beyond the `end`.
// Leave such a time series as is, since it is unclear which points may be incomplete in it.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/625
continue
}
2020-07-05 17:17:02 +02:00
for j >= 0 && timestamps [ j ] > start {
2019-05-22 23:16:55 +02:00
j --
}
2020-07-05 17:17:02 +02:00
j ++
2020-07-14 11:45:42 +02:00
lastValue := nan
if j > 0 {
lastValue = values [ j - 1 ]
2020-07-05 16:56:54 +02:00
}
2020-07-05 17:17:02 +02:00
for j < len ( timestamps ) && timestamps [ j ] <= end {
2020-07-14 11:45:42 +02:00
values [ j ] = lastValue
2020-07-05 17:17:02 +02:00
j ++
2019-05-22 23:16:55 +02:00
}
}
2019-07-04 08:14:15 +02:00
return tss
2019-05-22 23:16:55 +02:00
}
2019-10-15 18:12:27 +02:00
func getMaxLookback ( r * http . Request ) ( int64 , error ) {
2020-03-29 20:50:10 +02:00
d := maxLookback . Milliseconds ( )
2020-04-20 18:25:32 +02:00
if d == 0 {
d = maxStalenessInterval . Milliseconds ( )
}
2020-09-10 23:28:19 +02:00
return searchutils . GetDuration ( r , "max_lookback" , d )
2019-05-22 23:16:55 +02:00
}
func getTagFilterssFromMatches ( matches [ ] string ) ( [ ] [ ] storage . TagFilter , error ) {
tagFilterss := make ( [ ] [ ] storage . TagFilter , 0 , len ( matches ) )
for _ , match := range matches {
tagFilters , err := promql . ParseMetricSelector ( match )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot parse %q: %w" , match , err )
2019-05-22 23:16:55 +02:00
}
tagFilterss = append ( tagFilterss , tagFilters )
}
return tagFilterss , nil
}
2019-10-28 11:30:50 +01:00
func getLatencyOffsetMilliseconds ( ) int64 {
2020-03-29 20:50:10 +02:00
d := latencyOffset . Milliseconds ( )
2019-10-28 11:30:50 +01:00
if d <= 1000 {
d = 1000
}
return d
}