2019-05-22 23:16:55 +02:00
package prometheus
import (
"flag"
"fmt"
"math"
2021-09-15 17:04:28 +02:00
"net"
2019-05-22 23:16:55 +02:00
"net/http"
2019-08-04 22:06:55 +02:00
"sort"
2019-05-22 23:16:55 +02:00
"strconv"
2020-10-12 19:01:51 +02:00
"strings"
2019-08-04 22:06:55 +02:00
"sync"
2019-05-22 23:16:55 +02:00
"time"
2020-09-27 22:17:14 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
2020-12-25 15:44:26 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
2020-09-10 23:29:26 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2020-09-26 03:29:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-09-26 03:29:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-05-14 21:01:51 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-07-31 17:00:21 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2020-03-10 20:45:15 +01:00
"github.com/valyala/fastjson/fastfloat"
2019-05-22 23:16:55 +02:00
"github.com/valyala/quicktemplate"
)
var (
2020-02-12 13:00:38 +01:00
latencyOffset = flag . Duration ( "search.latencyOffset" , time . Second * 30 , "The time when data points become visible in query results after the collection. " +
2019-10-28 11:30:50 +01:00
"Too small value can result in incomplete last points for query results" )
2020-09-10 23:29:26 +02:00
maxQueryLen = flagutil . NewBytes ( "search.maxQueryLen" , 16 * 1024 , "The maximum search query length in bytes" )
2021-06-10 07:32:52 +02:00
maxLookback = flag . Duration ( "search.maxLookback" , 0 , "Synonym to -search.lookback-delta from Prometheus. " +
2020-04-20 18:25:32 +02:00
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. " +
"See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons" )
maxStalenessInterval = flag . Duration ( "search.maxStalenessInterval" , 0 , "The maximum interval for staleness calculations. " +
2020-04-20 18:41:59 +02:00
"By default it is automatically calculated from the median interval between samples. This flag could be useful for tuning " +
2020-04-20 18:25:32 +02:00
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. " +
2020-09-15 13:24:27 +02:00
"See also '-search.maxLookback' flag, which has the same meaning due to historical reasons" )
2021-01-19 21:55:46 +01:00
maxStepForPointsAdjustment = flag . Duration ( "search.maxStepForPointsAdjustment" , time . Minute , "The maximum step when /api/v1/query_range handler adjusts " +
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data" )
2021-09-15 17:04:28 +02:00
selectNodes = flagutil . NewArray ( "selectNode" , "Comma-serparated addresses of vmselect nodes; usage: -selectNode=vmselect-host1,...,vmselect-hostN" )
2019-05-22 23:16:55 +02:00
)
// defaultStep is the fallback time window (5 minutes, expressed in milliseconds)
// used by the handlers in this file when the lookback delta is not set
// or when the requested time range is empty or inverted.
//
// It is combined with timestamps obtained as startTime.UnixNano() / 1e6,
// i.e. with millisecond timestamps.
const defaultStep = 5 * 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
2020-02-04 15:13:59 +01:00
func FederateHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer federateDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
2019-10-15 18:12:27 +02:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2019-10-15 18:12:27 +02:00
if lookbackDelta <= 0 {
lookbackDelta = defaultStep
}
2020-09-10 23:29:26 +02:00
start , err := searchutils . GetTime ( r , "start" , ct - lookbackDelta )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
if start >= end {
start = end - defaultStep
}
2021-02-01 16:42:35 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2019-05-22 23:16:55 +02:00
if err != nil {
return err
}
2020-11-16 17:00:50 +01:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
2020-11-14 11:36:21 +01:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
rss , isPartial , err := netstorage . ProcessSearchQuery ( at , denyPartialResponse , sq , true , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-05-22 23:16:55 +02:00
}
2020-11-14 11:36:21 +01:00
if isPartial {
return fmt . Errorf ( "cannot export federated metrics, because some of vmstorage nodes are unavailable" )
2019-06-30 00:27:03 +02:00
}
2019-05-22 23:16:55 +02:00
2020-11-13 09:25:39 +01:00
w . Header ( ) . Set ( "Content-Type" , "text/plain; charset=utf-8" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
bb := quicktemplate . AcquireByteBuffer ( )
WriteFederate ( bb , rs )
2020-09-29 10:36:12 +02:00
_ , err := bw . Write ( bb . B )
2019-05-22 23:16:55 +02:00
quicktemplate . ReleaseByteBuffer ( bb )
2020-09-29 10:36:12 +02:00
return err
2020-09-27 22:17:14 +02:00
} )
2019-05-22 23:16:55 +02:00
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending data to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
return nil
}
// federateDuration tracks the duration of FederateHandler calls.
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
2020-10-12 19:01:51 +02:00
// ExportCSVHandler exports data in CSV format from /api/v1/export/csv
func ExportCSVHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer exportCSVDuration . UpdateDuration ( startTime )
2020-10-12 19:01:51 +02:00
ct := startTime . UnixNano ( ) / 1e6
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %w" , err )
}
format := r . FormValue ( "format" )
if len ( format ) == 0 {
2021-04-20 19:16:17 +02:00
return fmt . Errorf ( "missing `format` arg; see https://docs.victoriametrics.com/#how-to-export-csv-data" )
2020-10-12 19:01:51 +02:00
}
fieldNames := strings . Split ( format , "," )
start , err := searchutils . GetTime ( r , "start" , 0 )
if err != nil {
return err
}
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
2021-12-17 09:56:03 +01:00
reduceMemUsage := searchutils . GetBool ( r , "reduce_mem_usage" )
2020-10-12 19:01:51 +02:00
deadline := searchutils . GetDeadlineForExport ( r , startTime )
2021-02-01 16:42:35 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2020-10-12 19:01:51 +02:00
if err != nil {
return err
}
2020-11-16 17:00:50 +01:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
2020-11-13 09:25:39 +01:00
w . Header ( ) . Set ( "Content-Type" , "text/csv; charset=utf-8" )
2020-10-12 19:01:51 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-12-08 19:49:32 +01:00
resultsCh := make ( chan * quicktemplate . ByteBuffer , cgroup . AvailableCPUs ( ) )
2021-12-17 09:56:03 +01:00
writeCSVLine := func ( xb * exportBlock ) {
if len ( xb . timestamps ) == 0 {
return
}
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportCSVLine ( bb , xb , fieldNames )
resultsCh <- bb
}
doneCh := make ( chan error , 1 )
if ! reduceMemUsage {
// Unconditionally deny partial response for the exported data,
// since users usually expect that the exported data is full.
denyPartialResponse := true
2021-12-17 10:52:04 +01:00
rss , _ , err := netstorage . ProcessSearchQuery ( at , denyPartialResponse , sq , true , deadline )
2021-12-17 09:56:03 +01:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
go func ( ) {
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = & rs . MetricName
xb . timestamps = rs . Timestamps
xb . values = rs . Values
writeCSVLine ( xb )
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
close ( resultsCh )
doneCh <- err
} ( )
} else {
go func ( ) {
err := netstorage . ExportBlocks ( at , sq , deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
writeCSVLine ( xb )
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
close ( resultsCh )
doneCh <- err
} ( )
}
2020-10-13 08:35:13 +02:00
// Consume all the data from resultsCh.
2020-10-12 19:01:51 +02:00
for bb := range resultsCh {
2020-10-13 08:35:13 +02:00
// Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above.
_ , _ = bw . Write ( bb . B )
2020-10-12 19:01:51 +02:00
quicktemplate . ReleaseByteBuffer ( bb )
}
if err := bw . Flush ( ) ; err != nil {
return err
}
err = <- doneCh
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending the exported csv data to remote client: %w" , err )
2020-10-12 19:01:51 +02:00
}
return nil
}
// exportCSVDuration tracks the duration of ExportCSVHandler calls.
var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`)
2020-09-26 03:29:45 +02:00
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer exportNativeDuration . UpdateDuration ( startTime )
2020-09-26 03:29:45 +02:00
ct := startTime . UnixNano ( ) / 1e6
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %w" , err )
}
start , err := searchutils . GetTime ( r , "start" , 0 )
if err != nil {
return err
}
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
deadline := searchutils . GetDeadlineForExport ( r , startTime )
2021-02-01 16:42:35 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2020-09-26 03:29:45 +02:00
if err != nil {
return err
}
2020-11-16 17:00:50 +01:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
2020-09-26 03:29:45 +02:00
w . Header ( ) . Set ( "Content-Type" , "VictoriaMetrics/native" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-09-26 03:29:45 +02:00
// Marshal tr
trBuf := make ( [ ] byte , 0 , 16 )
trBuf = encoding . MarshalInt64 ( trBuf , start )
trBuf = encoding . MarshalInt64 ( trBuf , end )
2020-09-29 10:36:12 +02:00
_ , _ = bw . Write ( trBuf )
2020-09-26 03:29:45 +02:00
2020-09-27 22:17:14 +02:00
// Marshal native blocks.
2020-11-14 11:36:21 +01:00
err = netstorage . ExportBlocks ( at , sq , deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
dstBuf := bbPool . Get ( )
tmpBuf := bbPool . Get ( )
dst := dstBuf . B
tmp := tmpBuf . B
// Marshal mn
2020-09-27 23:56:30 +02:00
tmp = mn . MarshalNoAccountIDProjectID ( tmp [ : 0 ] )
2020-09-26 03:29:45 +02:00
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
// Marshal b
tmp = b . MarshalPortable ( tmp [ : 0 ] )
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
tmpBuf . B = tmp
bbPool . Put ( tmpBuf )
2020-09-29 10:36:12 +02:00
_ , err := bw . Write ( dst )
2020-09-26 03:29:45 +02:00
dstBuf . B = dst
bbPool . Put ( dstBuf )
2020-09-29 10:36:12 +02:00
return err
2020-09-26 03:29:45 +02:00
} )
2020-09-27 22:17:14 +02:00
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending native data to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during flushing native data to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
return nil
2020-09-26 03:29:45 +02:00
}
2020-09-27 22:17:14 +02:00
// exportNativeDuration tracks the duration of ExportNativeHandler calls.
var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`)
2020-09-26 03:29:45 +02:00
// bbPool is a pool of byte buffers used by ExportNativeHandler
// for marshaling metric names and blocks.
var bbPool bytesutil . ByteBufferPool
2019-05-22 23:16:55 +02:00
// ExportHandler exports data in raw format from /api/v1/export.
2020-02-04 15:13:59 +01:00
func ExportHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer exportDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
2021-02-11 14:00:12 +01:00
matches := getMatchesFromRequest ( r )
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` query arg" )
2019-05-22 23:16:55 +02:00
}
2020-09-10 23:29:26 +02:00
start , err := searchutils . GetTime ( r , "start" , 0 )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2019-05-22 23:16:55 +02:00
format := r . FormValue ( "format" )
2020-03-10 20:45:15 +01:00
maxRowsPerLine := int ( fastfloat . ParseInt64BestEffort ( r . FormValue ( "max_rows_per_line" ) ) )
2020-09-26 03:29:45 +02:00
reduceMemUsage := searchutils . GetBool ( r , "reduce_mem_usage" )
2020-09-10 23:29:26 +02:00
deadline := searchutils . GetDeadlineForExport ( r , startTime )
2019-05-22 23:16:55 +02:00
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 16:42:35 +01:00
if err != nil {
return err
}
2021-12-06 16:07:06 +01:00
if err := exportHandler ( at , w , r , matches , etfs , start , end , format , maxRowsPerLine , reduceMemUsage , deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when exporting data for queries=%q on the time range (start=%d, end=%d): %w" , matches , start , end , err )
2019-05-22 23:16:55 +02:00
}
return nil
}
// exportDuration tracks the duration of ExportHandler calls.
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
2021-12-06 16:07:06 +01:00
func exportHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request , matches [ ] string , etfs [ ] [ ] storage . TagFilter , start , end int64 ,
2020-09-26 03:29:45 +02:00
format string , maxRowsPerLine int , reduceMemUsage bool , deadline searchutils . Deadline ) error {
2019-05-22 23:16:55 +02:00
writeResponseFunc := WriteExportStdResponse
2020-09-26 03:29:45 +02:00
writeLineFunc := func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
2020-08-10 19:57:18 +02:00
bb := quicktemplate . AcquireByteBuffer ( )
2020-09-26 03:29:45 +02:00
WriteExportJSONLine ( bb , xb )
2020-08-10 19:57:18 +02:00
resultsCh <- bb
}
2020-11-13 09:25:39 +01:00
contentType := "application/stream+json; charset=utf-8"
2020-09-26 03:29:45 +02:00
if format == "prometheus" {
2020-11-13 09:25:39 +01:00
contentType = "text/plain; charset=utf-8"
2020-09-26 03:29:45 +02:00
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportPrometheusLine ( bb , xb )
resultsCh <- bb
}
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportPromAPILine ( bb , xb )
resultsCh <- bb
}
}
2020-03-10 20:45:15 +01:00
if maxRowsPerLine > 0 {
2020-09-26 03:29:45 +02:00
writeLineFuncOrig := writeLineFunc
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
valuesOrig := xb . values
timestampsOrig := xb . timestamps
2020-03-10 20:45:15 +01:00
values := valuesOrig
timestamps := timestampsOrig
for len ( values ) > 0 {
var valuesChunk [ ] float64
var timestampsChunk [ ] int64
if len ( values ) > maxRowsPerLine {
valuesChunk = values [ : maxRowsPerLine ]
timestampsChunk = timestamps [ : maxRowsPerLine ]
values = values [ maxRowsPerLine : ]
timestamps = timestamps [ maxRowsPerLine : ]
} else {
valuesChunk = values
timestampsChunk = timestamps
values = nil
timestamps = nil
}
2020-09-26 03:29:45 +02:00
xb . values = valuesChunk
xb . timestamps = timestampsChunk
writeLineFuncOrig ( xb , resultsCh )
2020-03-10 20:45:15 +01:00
}
2020-09-26 03:29:45 +02:00
xb . values = valuesOrig
xb . timestamps = timestampsOrig
2020-08-10 19:57:18 +02:00
}
2019-05-22 23:16:55 +02:00
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2020-11-16 17:00:50 +01:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
2020-09-27 22:17:14 +02:00
w . Header ( ) . Set ( "Content-Type" , contentType )
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-12-08 19:49:32 +01:00
resultsCh := make ( chan * quicktemplate . ByteBuffer , cgroup . AvailableCPUs ( ) )
2021-12-17 09:56:03 +01:00
doneCh := make ( chan error , 1 )
2020-09-26 03:29:45 +02:00
if ! reduceMemUsage {
2021-03-25 17:38:49 +01:00
// Unconditionally deny partial response for the exported data,
// since users usually expect that the exported data is full.
2021-01-27 13:38:26 +01:00
denyPartialResponse := true
rss , _ , err := netstorage . ProcessSearchQuery ( at , denyPartialResponse , sq , true , deadline )
2020-09-26 03:29:45 +02:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
go func ( ) {
2020-09-27 22:17:14 +02:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = & rs . MetricName
xb . timestamps = rs . Timestamps
xb . values = rs . Values
writeLineFunc ( xb , resultsCh )
xb . reset ( )
exportBlockPool . Put ( xb )
2020-09-27 22:17:14 +02:00
return nil
2020-09-26 03:29:45 +02:00
} )
close ( resultsCh )
doneCh <- err
} ( )
} else {
go func ( ) {
2020-11-14 11:36:21 +01:00
err := netstorage . ExportBlocks ( at , sq , deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
if len ( xb . timestamps ) > 0 {
writeLineFunc ( xb , resultsCh )
}
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
close ( resultsCh )
doneCh <- err
} ( )
}
2019-05-22 23:16:55 +02:00
2020-09-27 22:17:14 +02:00
// writeResponseFunc must consume all the data from resultsCh.
2020-09-26 03:29:45 +02:00
writeResponseFunc ( bw , resultsCh )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
return err
2019-05-24 13:54:31 +02:00
}
2019-05-22 23:16:55 +02:00
err = <- doneCh
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending the data to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
return nil
}
2020-09-26 03:29:45 +02:00
// exportBlock holds the samples of a single time series, which are rendered
// by the export line writers (CSV, JSON, Prometheus and PromAPI formats).
//
// Instances are reused via exportBlockPool.
type exportBlock struct {
// mn points to the metric name of the series. It is set to nil by reset.
mn * storage . MetricName
// timestamps holds the sample timestamps. It is chunked in lockstep with values
// by exportHandler when max_rows_per_line is set.
timestamps [ ] int64
// values holds the sample values.
values [ ] float64
}
// reset prepares xb for reuse: it truncates the timestamps and values slices
// to zero length while keeping their capacity, and drops the metric name reference.
func (xb *exportBlock) reset() {
	xb.timestamps, xb.values = xb.timestamps[:0], xb.values[:0]
	xb.mn = nil
}
// exportBlockPool is a pool of reusable exportBlock objects.
//
// Objects obtained from the pool are type-asserted to *exportBlock by the callers.
var exportBlockPool = &sync.Pool{
	New: func() interface{} { return new(exportBlock) },
}
2019-05-22 23:16:55 +02:00
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
2020-02-04 15:13:59 +01:00
func DeleteHandler ( startTime time . Time , at * auth . Token , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer deleteDuration . UpdateDuration ( startTime )
2021-02-02 23:24:05 +01:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
if r . FormValue ( "start" ) != "" || r . FormValue ( "end" ) != "" {
return fmt . Errorf ( "start and end aren't supported. Remove these args from the query in order to delete all the matching metrics" )
}
2021-02-01 16:42:35 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2019-05-22 23:16:55 +02:00
if err != nil {
return err
}
2021-02-02 23:24:05 +01:00
ct := startTime . UnixNano ( ) / 1e6
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , 0 , ct , tagFilterss )
2019-05-22 23:23:23 +02:00
deletedCount , err := netstorage . DeleteSeries ( at , sq , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2021-02-01 16:42:35 +01:00
return fmt . Errorf ( "cannot delete time series: %w" , err )
2019-05-22 23:16:55 +02:00
}
if deletedCount > 0 {
2019-05-22 23:23:23 +02:00
// Reset rollup result cache on all the vmselect nodes,
// since the cache may contain deleted data.
// TODO: reset only cache for (account, project)
resetRollupResultCaches ( )
2019-05-22 23:16:55 +02:00
}
return nil
}
// deleteDuration tracks the duration of DeleteHandler calls.
var deleteDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/admin/tsdb/delete_series"}`)
2019-05-22 23:23:23 +02:00
func resetRollupResultCaches ( ) {
2020-12-24 08:01:40 +01:00
resetRollupResultCacheCalls . Inc ( )
// Reset local cache before checking whether selectNodes list is empty.
// This guarantees that at least local cache is reset if selectNodes list is empty.
promql . ResetRollupResultCache ( )
2019-06-18 09:26:44 +02:00
if len ( * selectNodes ) == 0 {
2020-12-24 08:01:40 +01:00
logger . Warnf ( "missing -selectNode flag, cache reset request wont be propagated to the other vmselect nodes." +
"This can be fixed by enumerating all the vmselect node addresses in `-selectNode` command line flag. " +
" For example: -selectNode=select-addr-1:8481,select-addr-2:8481" )
return
2019-05-22 23:23:23 +02:00
}
2019-06-18 09:26:44 +02:00
for _ , selectNode := range * selectNodes {
2021-09-15 17:04:28 +02:00
if _ , _ , err := net . SplitHostPort ( selectNode ) ; err != nil {
// Add missing port
selectNode += ":8481"
}
2019-05-22 23:23:23 +02:00
callURL := fmt . Sprintf ( "http://%s/internal/resetRollupResultCache" , selectNode )
resp , err := httpClient . Get ( callURL )
if err != nil {
logger . Errorf ( "error when accessing %q: %s" , callURL , err )
resetRollupResultCacheErrors . Inc ( )
continue
}
if resp . StatusCode != http . StatusOK {
_ = resp . Body . Close ( )
logger . Errorf ( "unexpected status code at %q; got %d; want %d" , callURL , resp . StatusCode , http . StatusOK )
resetRollupResultCacheErrors . Inc ( )
continue
}
_ = resp . Body . Close ( )
}
}
var (
// resetRollupResultCacheErrors counts failed attempts to reset the rollup result cache
// at a remote vmselect node in resetRollupResultCaches.
resetRollupResultCacheErrors = metrics . NewCounter ( "vm_reset_rollup_result_cache_errors_total" )
// resetRollupResultCacheCalls counts resetRollupResultCaches calls.
resetRollupResultCacheCalls = metrics . NewCounter ( "vm_reset_rollup_result_cache_calls_total" )
)
// httpClient is used by resetRollupResultCaches for sending cache reset requests
// to other vmselect nodes. Every request is limited by a 5 second timeout.
var httpClient = &http.Client{Timeout: 5 * time.Second}
2019-05-22 23:16:55 +02:00
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
2020-02-04 15:13:59 +01:00
func LabelValuesHandler ( startTime time . Time , at * auth . Token , labelName string , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer labelValuesDuration . UpdateDuration ( startTime )
2020-09-10 23:29:26 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-08-04 22:06:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-08-04 22:06:55 +02:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 16:42:35 +01:00
if err != nil {
return err
}
2021-02-11 14:00:12 +01:00
matches := getMatchesFromRequest ( r )
2019-08-04 22:06:55 +02:00
var labelValues [ ] string
var isPartial bool
2020-11-14 11:36:21 +01:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2021-12-06 16:07:06 +01:00
if len ( matches ) == 0 && len ( etfs ) == 0 {
2020-11-04 23:15:43 +01:00
if len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
2020-11-05 00:36:13 +01:00
var err error
2020-11-14 11:36:21 +01:00
labelValues , isPartial , err = netstorage . GetLabelValues ( at , denyPartialResponse , labelName , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( ` cannot obtain label values for %q: %w ` , labelName , err )
}
2020-11-04 23:15:43 +01:00
} else {
ct := startTime . UnixNano ( ) / 1e6
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
if err != nil {
return err
}
tr := storage . TimeRange {
MinTimestamp : start ,
MaxTimestamp : end ,
}
2020-11-14 11:36:21 +01:00
labelValues , isPartial , err = netstorage . GetLabelValuesOnTimeRange ( at , denyPartialResponse , labelName , tr , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( ` cannot obtain label values on time range for %q: %w ` , labelName , err )
}
2019-08-04 22:06:55 +02:00
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/label/foo/values?match[]=foobar{baz="abc"}&start=...&end=...
// is equivalent to `label_values(foobar{baz="abc"}, foo)` call on the selected
// time range in Grafana templating.
if len ( matches ) == 0 {
matches = [ ] string { fmt . Sprintf ( "{%s!=''}" , labelName ) }
}
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2020-09-10 23:29:26 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-08-04 22:06:55 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-08-04 22:06:55 +02:00
if err != nil {
return err
}
2021-12-06 16:07:06 +01:00
labelValues , isPartial , err = labelValuesWithMatches ( at , denyPartialResponse , labelName , matches , etfs , start , end , deadline )
2019-08-04 22:06:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain label values for %q, match[]=%q, start=%d, end=%d: %w" , labelName , matches , start , end , err )
2019-08-04 22:06:55 +02:00
}
2019-05-22 23:16:55 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 11:36:21 +01:00
WriteLabelValuesResponse ( bw , isPartial , labelValues )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "canot flush label values to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
2021-12-06 16:07:06 +01:00
func labelValuesWithMatches ( at * auth . Token , denyPartialResponse bool , labelName string , matches [ ] string , etfs [ ] [ ] storage . TagFilter ,
2021-02-01 16:42:35 +01:00
start , end int64 , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2019-08-04 22:06:55 +02:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , false , err
}
2020-02-28 22:35:47 +01:00
2020-02-28 11:17:27 +01:00
// Add `labelName!=''` tag filter in order to filter out series without the labelName.
2020-02-28 22:35:47 +01:00
// There is no need in adding `__name__!=''` filter, since all the time series should
// already have non-empty name.
if labelName != "__name__" {
key := [ ] byte ( labelName )
for i , tfs := range tagFilterss {
tagFilterss [ i ] = append ( tfs , storage . TagFilter {
Key : key ,
IsNegative : true ,
} )
}
2019-12-14 23:07:09 +01:00
}
2019-08-04 22:06:55 +02:00
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-08-04 22:06:55 +02:00
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-02-01 16:42:35 +01:00
if len ( tagFilterss ) == 0 {
logger . Panicf ( "BUG: tagFilterss must be non-empty" )
}
2020-11-16 17:00:50 +01:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
2019-08-04 22:06:55 +02:00
m := make ( map [ string ] struct { } )
2020-11-16 12:45:50 +01:00
isPartial := false
if end - start > 24 * 3600 * 1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
mns , isPartialResponse , err := netstorage . SearchMetricNames ( at , denyPartialResponse , sq , deadline )
if err != nil {
return nil , false , fmt . Errorf ( "cannot fetch time series for %q: %w" , sq , err )
}
isPartial = isPartialResponse
for _ , mn := range mns {
labelValue := mn . GetTagValue ( labelName )
if len ( labelValue ) == 0 {
continue
}
m [ string ( labelValue ) ] = struct { } { }
}
} else {
rss , isPartialResponse , err := netstorage . ProcessSearchQuery ( at , denyPartialResponse , sq , false , deadline )
if err != nil {
return nil , false , fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
isPartial = isPartialResponse
var mLock sync . Mutex
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
labelValue := rs . MetricName . GetTagValue ( labelName )
if len ( labelValue ) == 0 {
return nil
}
mLock . Lock ( )
m [ string ( labelValue ) ] = struct { } { }
mLock . Unlock ( )
2020-09-27 22:17:14 +02:00
return nil
2020-11-16 12:45:50 +01:00
} )
if err != nil {
2021-09-16 11:56:58 +02:00
return nil , false , fmt . Errorf ( "cannot fetch label values from storage: %w" , err )
2019-08-04 22:06:55 +02:00
}
}
labelValues := make ( [ ] string , 0 , len ( m ) )
for labelValue := range m {
labelValues = append ( labelValues , labelValue )
}
sort . Strings ( labelValues )
return labelValues , isPartial , nil
}
2019-05-22 23:16:55 +02:00
// labelValuesDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/label/{}/values".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`)
2019-06-10 17:55:20 +02:00
// LabelsCountHandler processes /api/v1/labels/count request.
2020-02-04 15:13:59 +01:00
func LabelsCountHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer labelsCountDuration . UpdateDuration ( startTime )
2021-03-30 20:38:59 +02:00
deadline := searchutils . GetDeadlineForStatusRequest ( r , startTime )
2020-11-14 11:36:21 +01:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
labelEntries , isPartial , err := netstorage . GetLabelEntries ( at , denyPartialResponse , deadline )
2019-06-10 17:55:20 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( ` cannot obtain label entries: %w ` , err )
2019-06-10 17:55:20 +02:00
}
2020-11-14 11:36:21 +01:00
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 11:36:21 +01:00
WriteLabelsCountResponse ( bw , isPartial , labelEntries )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send labels count response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-06-10 17:55:20 +02:00
return nil
}
// labelsCountDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/labels/count".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels/count"}`)
2020-04-22 18:57:36 +02:00
// secsPerDay is the number of seconds in a day.
const secsPerDay = 24 * 60 * 60
// TSDBStatusHandler processes /api/v1/status/tsdb request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2021-05-12 14:18:45 +02:00
//
// It can accept `match[]` filters in order to narrow down the search.
2020-04-22 18:57:36 +02:00
func TSDBStatusHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer tsdbStatusDuration . UpdateDuration ( startTime )
2021-03-30 20:38:59 +02:00
deadline := searchutils . GetDeadlineForStatusRequest ( r , startTime )
2020-04-22 18:57:36 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2020-04-22 18:57:36 +02:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-05-12 14:18:45 +02:00
if err != nil {
return err
}
matches := getMatchesFromRequest ( r )
2020-05-14 21:01:51 +02:00
date := fasttime . UnixDate ( )
2020-04-22 18:57:36 +02:00
dateStr := r . FormValue ( "date" )
if len ( dateStr ) > 0 {
t , err := time . Parse ( "2006-01-02" , dateStr )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse `date` arg %q: %w" , dateStr , err )
2020-04-22 18:57:36 +02:00
}
2020-05-14 21:01:51 +02:00
date = uint64 ( t . Unix ( ) ) / secsPerDay
2020-04-22 18:57:36 +02:00
}
topN := 10
topNStr := r . FormValue ( "topN" )
if len ( topNStr ) > 0 {
n , err := strconv . Atoi ( topNStr )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse `topN` arg %q: %w" , topNStr , err )
2020-04-22 18:57:36 +02:00
}
if n <= 0 {
n = 1
}
if n > 1000 {
n = 1000
}
topN = n
}
2020-11-14 11:36:21 +01:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2021-05-12 14:18:45 +02:00
var status * storage . TSDBStatus
var isPartial bool
2021-12-06 16:07:06 +01:00
if len ( matches ) == 0 && len ( etfs ) == 0 {
2021-05-12 14:18:45 +02:00
status , isPartial , err = netstorage . GetTSDBStatusForDate ( at , denyPartialResponse , deadline , date , topN )
if err != nil {
return fmt . Errorf ( ` cannot obtain tsdb status for date=%d, topN=%d: %w ` , date , topN , err )
}
} else {
2021-12-06 16:07:06 +01:00
status , isPartial , err = tsdbStatusWithMatches ( at , denyPartialResponse , matches , etfs , date , topN , deadline )
2021-05-12 14:18:45 +02:00
if err != nil {
return fmt . Errorf ( "cannot obtain tsdb status with matches for date=%d, topN=%d: %w" , date , topN , err )
}
2020-04-22 18:57:36 +02:00
}
2020-11-14 11:36:21 +01:00
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 11:36:21 +01:00
WriteTSDBStatusResponse ( bw , isPartial , status )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send tsdb status response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2020-04-22 18:57:36 +02:00
return nil
}
2021-12-06 16:07:06 +01:00
func tsdbStatusWithMatches ( at * auth . Token , denyPartialResponse bool , matches [ ] string , etfs [ ] [ ] storage . TagFilter , date uint64 , topN int , deadline searchutils . Deadline ) ( * storage . TSDBStatus , bool , error ) {
2021-05-12 14:18:45 +02:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , false , err
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-05-12 14:18:45 +02:00
if len ( tagFilterss ) == 0 {
logger . Panicf ( "BUG: tagFilterss must be non-empty" )
}
start := int64 ( date * secsPerDay ) * 1000
end := int64 ( date * secsPerDay + secsPerDay ) * 1000
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
status , isPartial , err := netstorage . GetTSDBStatusWithFilters ( at , denyPartialResponse , deadline , sq , topN )
if err != nil {
return nil , false , err
}
return status , isPartial , nil
}
2020-04-22 18:57:36 +02:00
// tsdbStatusDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/status/tsdb".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
2019-05-22 23:16:55 +02:00
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
2020-02-04 15:13:59 +01:00
func LabelsHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer labelsDuration . UpdateDuration ( startTime )
2020-09-10 23:29:26 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-12-14 23:07:09 +01:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-12-14 23:07:09 +01:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 16:42:35 +01:00
if err != nil {
return err
}
2021-02-11 14:00:12 +01:00
matches := getMatchesFromRequest ( r )
2019-12-14 23:07:09 +01:00
var labels [ ] string
var isPartial bool
2020-11-14 11:36:21 +01:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2021-12-06 16:07:06 +01:00
if len ( matches ) == 0 && len ( etfs ) == 0 {
2020-11-04 23:15:43 +01:00
if len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
2020-11-05 00:36:13 +01:00
var err error
2020-11-14 11:36:21 +01:00
labels , isPartial , err = netstorage . GetLabels ( at , denyPartialResponse , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( "cannot obtain labels: %w" , err )
}
2020-11-04 23:15:43 +01:00
} else {
ct := startTime . UnixNano ( ) / 1e6
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
if err != nil {
return err
}
tr := storage . TimeRange {
MinTimestamp : start ,
MaxTimestamp : end ,
}
2020-11-14 11:36:21 +01:00
labels , isPartial , err = netstorage . GetLabelsOnTimeRange ( at , denyPartialResponse , tr , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( "cannot obtain labels on time range: %w" , err )
}
2019-12-14 23:07:09 +01:00
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/labels?match[]=foobar{baz="abc"}&start=...&end=...
if len ( matches ) == 0 {
matches = [ ] string { "{__name__!=''}" }
}
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2020-09-10 23:29:26 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-12-14 23:07:09 +01:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-12-14 23:07:09 +01:00
if err != nil {
return err
}
2021-12-06 16:07:06 +01:00
labels , isPartial , err = labelsWithMatches ( at , denyPartialResponse , matches , etfs , start , end , deadline )
2019-12-14 23:07:09 +01:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain labels for match[]=%q, start=%d, end=%d: %w" , matches , start , end , err )
2019-12-14 23:07:09 +01:00
}
2019-05-22 23:16:55 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 11:36:21 +01:00
WriteLabelsResponse ( bw , isPartial , labels )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send labels response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
2021-12-06 16:07:06 +01:00
func labelsWithMatches ( at * auth . Token , denyPartialResponse bool , matches [ ] string , etfs [ ] [ ] storage . TagFilter , start , end int64 , deadline searchutils . Deadline ) ( [ ] string , bool , error ) {
2019-12-14 23:07:09 +01:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , false , err
}
if start >= end {
end = start + defaultStep
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-02-01 16:42:35 +01:00
if len ( tagFilterss ) == 0 {
logger . Panicf ( "BUG: tagFilterss must be non-empty" )
}
2020-11-16 17:00:50 +01:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
2019-12-14 23:07:09 +01:00
m := make ( map [ string ] struct { } )
2020-11-16 12:45:50 +01:00
isPartial := false
if end - start > 24 * 3600 * 1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
mns , isPartialResponse , err := netstorage . SearchMetricNames ( at , denyPartialResponse , sq , deadline )
if err != nil {
return nil , false , fmt . Errorf ( "cannot fetch time series for %q: %w" , sq , err )
}
isPartial = isPartialResponse
for _ , mn := range mns {
for _ , tag := range mn . Tags {
m [ string ( tag . Key ) ] = struct { } { }
}
}
2020-11-16 12:54:34 +01:00
if len ( mns ) > 0 {
m [ "__name__" ] = struct { } { }
}
2020-11-16 12:45:50 +01:00
} else {
rss , isPartialResponse , err := netstorage . ProcessSearchQuery ( at , denyPartialResponse , sq , false , deadline )
if err != nil {
return nil , false , fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
isPartial = isPartialResponse
var mLock sync . Mutex
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
mLock . Lock ( )
for _ , tag := range rs . MetricName . Tags {
m [ string ( tag . Key ) ] = struct { } { }
}
m [ "__name__" ] = struct { } { }
mLock . Unlock ( )
return nil
} )
if err != nil {
2021-09-16 11:56:58 +02:00
return nil , false , fmt . Errorf ( "cannot fetch labels from storage: %w" , err )
2020-11-16 12:45:50 +01:00
}
2019-12-14 23:07:09 +01:00
}
labels := make ( [ ] string , 0 , len ( m ) )
for label := range m {
labels = append ( labels , label )
}
sort . Strings ( labels )
return labels , isPartial , nil
}
2019-05-22 23:16:55 +02:00
// labelsDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/labels".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`)
// SeriesCountHandler processes /api/v1/series/count request.
2020-02-04 15:13:59 +01:00
func SeriesCountHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer seriesCountDuration . UpdateDuration ( startTime )
2021-03-30 20:38:59 +02:00
deadline := searchutils . GetDeadlineForStatusRequest ( r , startTime )
2020-11-14 11:36:21 +01:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
n , isPartial , err := netstorage . GetSeriesCount ( at , denyPartialResponse , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain series count: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-11-14 11:36:21 +01:00
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 11:36:21 +01:00
WriteSeriesCountResponse ( bw , isPartial , n )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send series count response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
// seriesCountDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/series/count".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/series/count"}`)
// SeriesHandler processes /api/v1/series request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
2020-02-04 15:13:59 +01:00
func SeriesHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer seriesDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-10 23:29:26 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
// Do not set start to searchutils.minTimeMsecs by default as Prometheus does,
2019-08-04 18:42:36 +02:00
// since this leads to fetching and scanning all the data from the storage,
// which can take a lot of time for big storages.
// It is better setting start as end-defaultStep by default.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
2020-09-10 23:29:26 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
2021-02-01 16:42:35 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2019-05-22 23:16:55 +02:00
if err != nil {
return err
}
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
2020-11-16 17:00:50 +01:00
sq := storage . NewSearchQuery ( at . AccountID , at . ProjectID , start , end , tagFilterss )
2020-11-14 11:36:21 +01:00
denyPartialResponse := searchutils . GetDenyPartialResponse ( r )
2020-11-16 09:55:55 +01:00
if end - start > 24 * 3600 * 1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
mns , isPartial , err := netstorage . SearchMetricNames ( at , denyPartialResponse , sq , deadline )
if err != nil {
return fmt . Errorf ( "cannot fetch time series for %q: %w" , sq , err )
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-11-16 09:55:55 +01:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
resultsCh := make ( chan * quicktemplate . ByteBuffer )
go func ( ) {
for i := range mns {
bb := quicktemplate . AcquireByteBuffer ( )
writemetricNameObject ( bb , & mns [ i ] )
resultsCh <- bb
}
2020-11-16 12:30:41 +01:00
close ( resultsCh )
2020-11-16 09:55:55 +01:00
} ( )
// WriteSeriesResponse must consume all the data from resultsCh.
WriteSeriesResponse ( bw , isPartial , resultsCh )
if err := bw . Flush ( ) ; err != nil {
return err
}
seriesDuration . UpdateDuration ( startTime )
return nil
}
2020-11-14 11:36:21 +01:00
rss , isPartial , err := netstorage . ProcessSearchQuery ( at , denyPartialResponse , sq , false , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-05-22 23:16:55 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2019-05-22 23:16:55 +02:00
resultsCh := make ( chan * quicktemplate . ByteBuffer )
doneCh := make ( chan error )
go func ( ) {
2020-09-27 22:17:14 +02:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) error {
if err := bw . Error ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
bb := quicktemplate . AcquireByteBuffer ( )
writemetricNameObject ( bb , & rs . MetricName )
resultsCh <- bb
2020-09-27 22:17:14 +02:00
return nil
2019-05-22 23:16:55 +02:00
} )
close ( resultsCh )
doneCh <- err
} ( )
2020-09-27 22:17:14 +02:00
// WriteSeriesResponse must consume all the data from resultsCh.
2020-11-14 11:36:21 +01:00
WriteSeriesResponse ( bw , isPartial , resultsCh )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot flush series response to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
err = <- doneCh
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send series response to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
return nil
}
// seriesDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/series".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/series"}`)
// QueryHandler processes /api/v1/query request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
2020-02-04 15:13:59 +01:00
func QueryHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer queryDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
query := r . FormValue ( "query" )
2019-06-20 13:05:07 +02:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 23:29:26 +02:00
start , err := searchutils . GetTime ( r , "time" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-04-17 11:24:10 +02:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
step , err := searchutils . GetDuration ( r , "step" , lookbackDelta )
2019-10-15 18:12:27 +02:00
if err != nil {
return err
}
2020-04-17 11:24:10 +02:00
if step <= 0 {
step = defaultStep
}
2020-09-10 23:29:26 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
2020-08-16 16:05:52 +02:00
if len ( query ) > maxQueryLen . N {
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 23:16:55 +02:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 16:42:35 +01:00
if err != nil {
return err
}
2021-07-12 16:16:38 +02:00
if childQuery , windowExpr , offsetExpr := promql . IsMetricSelectorWithRollup ( query ) ; childQuery != "" {
window := windowExpr . Duration ( step )
offset := offsetExpr . Duration ( step )
2019-05-22 23:16:55 +02:00
start -= offset
end := start
start = end - window
2021-03-12 11:16:50 +01:00
// Do not include data point with a timestamp matching the lower boundary of the window as Prometheus does.
start ++
if end < start {
end = start
}
2021-12-06 16:07:06 +01:00
if err := exportHandler ( at , w , r , [ ] string { childQuery } , etfs , start , end , "promapi" , 0 , false , deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when exporting data for query=%q on the time range (start=%d, end=%d): %w" , childQuery , start , end , err )
2019-05-22 23:16:55 +02:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2021-07-12 16:16:38 +02:00
if childQuery , windowExpr , stepExpr , offsetExpr := promql . IsRollup ( query ) ; childQuery != "" {
newStep := stepExpr . Duration ( step )
2019-12-10 23:42:44 +01:00
if newStep > 0 {
step = newStep
}
2021-07-12 16:16:38 +02:00
window := windowExpr . Duration ( step )
offset := offsetExpr . Duration ( step )
2019-12-10 23:42:44 +01:00
start -= offset
end := start
start = end - window
2021-12-06 16:07:06 +01:00
if err := queryRangeHandler ( startTime , at , w , childQuery , start , end , step , r , ct , etfs ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , childQuery , start , end , step , err )
2019-12-10 23:42:44 +01:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2019-05-22 23:16:55 +02:00
2020-09-23 11:58:46 +02:00
queryOffset := getLatencyOffsetMilliseconds ( )
if ! searchutils . GetBool ( r , "nocache" ) && ct - start < queryOffset && start - ct < queryOffset {
// Adjust start time only if `nocache` arg isn't set.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/241
startPrev := start
start = ct - queryOffset
queryOffset = startPrev - start
} else {
queryOffset = 0
}
2019-05-22 23:16:55 +02:00
ec := promql . EvalConfig {
2021-12-06 16:07:06 +01:00
AuthToken : at ,
Start : start ,
End : start ,
Step : step ,
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
LookbackDelta : lookbackDelta ,
RoundDigits : getRoundDigits ( r ) ,
EnforcedTagFilterss : etfs ,
2019-06-30 00:27:03 +02:00
2020-09-10 23:29:26 +02:00
DenyPartialResponse : searchutils . GetDenyPartialResponse ( r ) ,
2019-05-22 23:16:55 +02:00
}
2019-07-01 16:14:49 +02:00
result , err := promql . Exec ( & ec , query , true )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q for (time=%d, step=%d): %w" , query , start , step , err )
2019-05-22 23:16:55 +02:00
}
2020-09-23 11:58:46 +02:00
if queryOffset > 0 {
for i := range result {
2020-09-23 12:04:17 +02:00
timestamps := result [ i ] . Timestamps
2020-09-23 11:58:46 +02:00
for j := range timestamps {
timestamps [ j ] += queryOffset
}
}
}
2019-05-22 23:16:55 +02:00
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 11:36:21 +01:00
WriteQueryResponse ( bw , ec . IsPartialResponse , result )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot flush query response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
// queryDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/query".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var queryDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query"}`)
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
2020-02-04 15:13:59 +01:00
func QueryRangeHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer queryRangeDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
query := r . FormValue ( "query" )
2019-06-20 13:05:07 +02:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 23:29:26 +02:00
start , err := searchutils . GetTime ( r , "start" , ct - defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:29:26 +02:00
step , err := searchutils . GetDuration ( r , "step" , defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 16:42:35 +01:00
if err != nil {
return err
}
2021-12-06 16:07:06 +01:00
if err := queryRangeHandler ( startTime , at , w , query , start , end , step , r , ct , etfs ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , query , start , end , step , err )
2019-12-10 23:42:44 +01:00
}
return nil
}
2021-12-06 16:07:06 +01:00
func queryRangeHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , query string , start , end , step int64 , r * http . Request , ct int64 , etfs [ ] [ ] storage . TagFilter ) error {
2020-09-10 23:29:26 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
mayCache := ! searchutils . GetBool ( r , "nocache" )
2019-10-15 18:12:27 +02:00
lookbackDelta , err := getMaxLookback ( r )
if err != nil {
return err
}
2019-05-22 23:16:55 +02:00
// Validate input args.
2020-08-16 16:05:52 +02:00
if len ( query ) > maxQueryLen . N {
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 23:16:55 +02:00
}
if start > end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
if err := promql . ValidateMaxPointsPerTimeseries ( start , end , step ) ; err != nil {
return err
}
2019-09-04 12:09:20 +02:00
if mayCache {
start , end = promql . AdjustStartEnd ( start , end , step )
}
2019-05-22 23:16:55 +02:00
ec := promql . EvalConfig {
2021-12-06 16:07:06 +01:00
AuthToken : at ,
Start : start ,
End : end ,
Step : step ,
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
MayCache : mayCache ,
LookbackDelta : lookbackDelta ,
RoundDigits : getRoundDigits ( r ) ,
EnforcedTagFilterss : etfs ,
2019-06-30 00:27:03 +02:00
2020-09-10 23:29:26 +02:00
DenyPartialResponse : searchutils . GetDenyPartialResponse ( r ) ,
2019-05-22 23:16:55 +02:00
}
2019-07-01 16:14:49 +02:00
result , err := promql . Exec ( & ec , query , false )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot execute query: %w" , err )
2019-05-22 23:16:55 +02:00
}
2021-01-19 21:55:46 +01:00
if step < maxStepForPointsAdjustment . Milliseconds ( ) {
queryOffset := getLatencyOffsetMilliseconds ( )
if ct - queryOffset < end {
result = adjustLastPoints ( result , ct - queryOffset , ct + step )
}
2019-05-22 23:16:55 +02:00
}
2019-08-20 21:52:49 +02:00
// Remove NaN values as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/153
2020-07-20 14:28:36 +02:00
result = removeEmptyValuesAndTimeseries ( result )
2019-08-20 21:52:49 +02:00
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-11-14 11:36:21 +01:00
WriteQueryRangeResponse ( bw , ec . IsPartialResponse , result )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send query range response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
2020-07-20 14:28:36 +02:00
func removeEmptyValuesAndTimeseries ( tss [ ] netstorage . Result ) [ ] netstorage . Result {
dst := tss [ : 0 ]
2019-08-20 21:52:49 +02:00
for i := range tss {
ts := & tss [ i ]
hasNaNs := false
for _ , v := range ts . Values {
if math . IsNaN ( v ) {
hasNaNs = true
break
}
}
if ! hasNaNs {
// Fast path: nothing to remove.
2020-07-20 14:28:36 +02:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 21:52:49 +02:00
continue
}
// Slow path: remove NaNs.
srcTimestamps := ts . Timestamps
dstValues := ts . Values [ : 0 ]
dstTimestamps := ts . Timestamps [ : 0 ]
for j , v := range ts . Values {
if math . IsNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , srcTimestamps [ j ] )
}
ts . Values = dstValues
ts . Timestamps = dstTimestamps
2020-07-20 14:28:36 +02:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 21:52:49 +02:00
}
2020-07-20 14:28:36 +02:00
return dst
2019-08-20 21:52:49 +02:00
}
2019-05-22 23:16:55 +02:00
// queryRangeDuration is the request duration summary exposed as
// vm_request_duration_seconds with path="/api/v1/query_range".
//
// The metric name must not contain whitespace around the name, braces or labels,
// otherwise it is not a valid Prometheus-compatible metric name.
var queryRangeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/query_range"}`)
2020-07-14 11:45:42 +02:00
// nan is a shared NaN value.
var nan float64 = math.NaN()
2020-07-05 17:17:02 +02:00
// adjustLastPoints substitutes the last point values on the time range (start..end]
2020-07-14 11:45:42 +02:00
// with the previous point values, since these points may contain incomplete values.
2020-07-05 17:17:02 +02:00
func adjustLastPoints ( tss [ ] netstorage . Result , start , end int64 ) [ ] netstorage . Result {
2019-05-22 23:16:55 +02:00
for i := range tss {
2020-07-05 17:17:02 +02:00
ts := & tss [ i ]
values := ts . Values
timestamps := ts . Timestamps
j := len ( timestamps ) - 1
2020-07-14 11:45:42 +02:00
if j >= 0 && timestamps [ j ] > end {
// It looks like the `offset` is used in the query, which shifts time range beyond the `end`.
// Leave such a time series as is, since it is unclear which points may be incomplete in it.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/625
continue
}
2020-07-05 17:17:02 +02:00
for j >= 0 && timestamps [ j ] > start {
2019-05-22 23:16:55 +02:00
j --
}
2020-07-05 17:17:02 +02:00
j ++
2020-07-14 11:45:42 +02:00
lastValue := nan
if j > 0 {
lastValue = values [ j - 1 ]
2020-07-05 16:56:54 +02:00
}
2020-07-05 17:17:02 +02:00
for j < len ( timestamps ) && timestamps [ j ] <= end {
2020-07-14 11:45:42 +02:00
values [ j ] = lastValue
2020-07-05 17:17:02 +02:00
j ++
2019-05-22 23:16:55 +02:00
}
}
2019-07-04 08:14:15 +02:00
return tss
2019-05-22 23:16:55 +02:00
}
2019-10-15 18:12:27 +02:00
func getMaxLookback ( r * http . Request ) ( int64 , error ) {
2020-03-29 20:50:10 +02:00
d := maxLookback . Milliseconds ( )
2020-04-20 18:25:32 +02:00
if d == 0 {
d = maxStalenessInterval . Milliseconds ( )
}
2020-09-10 23:29:26 +02:00
return searchutils . GetDuration ( r , "max_lookback" , d )
2019-05-22 23:16:55 +02:00
}
func getTagFilterssFromMatches ( matches [ ] string ) ( [ ] [ ] storage . TagFilter , error ) {
tagFilterss := make ( [ ] [ ] storage . TagFilter , 0 , len ( matches ) )
for _ , match := range matches {
2021-12-06 16:07:06 +01:00
tagFilters , err := searchutils . ParseMetricSelector ( match )
2019-05-22 23:16:55 +02:00
if err != nil {
2021-12-06 16:07:06 +01:00
return nil , fmt . Errorf ( "cannot parse matches[]=%s: %w" , match , err )
2019-05-22 23:16:55 +02:00
}
tagFilterss = append ( tagFilterss , tagFilters )
}
return tagFilterss , nil
}
2019-06-30 00:27:03 +02:00
2021-02-01 16:42:35 +01:00
func getTagFilterssFromRequest ( r * http . Request ) ( [ ] [ ] storage . TagFilter , error ) {
2021-02-11 14:00:12 +01:00
matches := getMatchesFromRequest ( r )
if len ( matches ) == 0 {
return nil , fmt . Errorf ( "missing `match[]` query arg" )
2021-02-01 16:42:35 +01:00
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 16:42:35 +01:00
if err != nil {
return nil , err
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-02-01 16:42:35 +01:00
return tagFilterss , nil
}
2021-02-11 14:00:12 +01:00
// getMatchesFromRequest returns the values of `match[]` args followed by
// the values of `match` args from the already parsed r.Form.
func getMatchesFromRequest(r *http.Request) []string {
	form := r.Form
	// `match` is accepted in addition to `match[]` for backwards compatibility.
	return append(form["match[]"], form["match"]...)
}
2021-03-15 11:35:44 +01:00
// getRoundDigits returns the integer value of the `round_digits` form value
// from r.
//
// 100 is returned if the value is missing or isn't a valid integer.
func getRoundDigits(r *http.Request) int {
	const defaultRoundDigits = 100
	// strconv.Atoi fails on an empty string, so a missing arg takes
	// the same path as an unparsable one.
	if digits, err := strconv.Atoi(r.FormValue("round_digits")); err == nil {
		return digits
	}
	return defaultRoundDigits
}
2019-10-28 11:30:50 +01:00
func getLatencyOffsetMilliseconds ( ) int64 {
2020-03-29 20:50:10 +02:00
d := latencyOffset . Milliseconds ( )
2019-10-28 11:30:50 +01:00
if d <= 1000 {
d = 1000
}
return d
}
2020-12-25 15:42:05 +01:00
2020-12-25 15:44:26 +01:00
// QueryStatsHandler returns query stats at `/api/v1/status/top_queries`
2020-12-27 11:53:50 +01:00
func QueryStatsHandler ( startTime time . Time , at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer queryStatsDuration . UpdateDuration ( startTime )
2020-12-25 15:42:05 +01:00
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse form values: %w" , err )
}
2020-12-25 15:44:26 +01:00
topN := 20
2020-12-25 15:42:05 +01:00
topNStr := r . FormValue ( "topN" )
if len ( topNStr ) > 0 {
n , err := strconv . Atoi ( topNStr )
if err != nil {
return fmt . Errorf ( "cannot parse `topN` arg %q: %w" , topNStr , err )
}
topN = n
}
2020-12-25 15:44:26 +01:00
maxLifetimeMsecs , err := searchutils . GetDuration ( r , "maxLifetime" , 10 * 60 * 1000 )
if err != nil {
return fmt . Errorf ( "cannot parse `maxLifetime` arg: %w" , err )
}
2020-12-27 11:53:50 +01:00
maxLifetime := time . Duration ( maxLifetimeMsecs ) * time . Millisecond
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-12-25 15:42:05 +01:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-12-27 11:53:50 +01:00
if at == nil {
querystats . WriteJSONQueryStats ( bw , topN , maxLifetime )
} else {
querystats . WriteJSONQueryStatsForAccountProject ( bw , topN , at . AccountID , at . ProjectID , maxLifetime )
}
2020-12-25 15:42:05 +01:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send query stats response to client: %w" , err )
2020-12-25 15:42:05 +01:00
}
return nil
}
2020-12-25 15:44:26 +01:00
// queryStatsDuration is the summary metric updated by QueryStatsHandler via
// a deferred UpdateDuration call with the request start time.
//
// NOTE(review): the metric name literal below contains padding spaces around
// the braces - confirm it matches the intended
// `vm_request_duration_seconds{path="..."}` form.
var queryStatsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/status/top_queries"} ` )