2019-05-22 23:16:55 +02:00
package prometheus
import (
"flag"
"fmt"
"math"
"net/http"
2019-08-04 22:09:18 +02:00
"sort"
2019-05-22 23:16:55 +02:00
"strconv"
2020-10-12 19:01:51 +02:00
"strings"
2019-08-04 22:09:18 +02:00
"sync"
2019-05-22 23:16:55 +02:00
"time"
2020-09-27 22:17:14 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
2020-12-25 15:44:26 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
2020-09-10 23:28:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
2020-09-26 03:29:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-09-26 03:29:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-05-14 21:01:51 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-08-16 16:05:52 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-07-31 17:00:21 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-08-04 22:09:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2022-06-01 01:29:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
2020-03-10 20:45:15 +01:00
"github.com/valyala/fastjson/fastfloat"
2019-05-22 23:16:55 +02:00
"github.com/valyala/quicktemplate"
)
var (
2020-09-11 13:16:40 +02:00
latencyOffset = flag . Duration ( "search.latencyOffset" , time . Second * 30 , "The time when data points become visible in query results after the collection. " +
2019-10-28 11:30:50 +01:00
"Too small value can result in incomplete last points for query results" )
2020-09-10 23:28:19 +02:00
maxQueryLen = flagutil . NewBytes ( "search.maxQueryLen" , 16 * 1024 , "The maximum search query length in bytes" )
2021-06-10 07:32:52 +02:00
maxLookback = flag . Duration ( "search.maxLookback" , 0 , "Synonym to -search.lookback-delta from Prometheus. " +
2020-04-20 18:25:32 +02:00
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. " +
"See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons" )
maxStalenessInterval = flag . Duration ( "search.maxStalenessInterval" , 0 , "The maximum interval for staleness calculations. " +
2020-04-20 18:41:59 +02:00
"By default it is automatically calculated from the median interval between samples. This flag could be useful for tuning " +
2020-04-20 18:25:32 +02:00
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. " +
2020-09-15 13:24:27 +02:00
"See also '-search.maxLookback' flag, which has the same meaning due to historical reasons" )
2021-01-19 21:55:46 +01:00
maxStepForPointsAdjustment = flag . Duration ( "search.maxStepForPointsAdjustment" , time . Minute , "The maximum step when /api/v1/query_range handler adjusts " +
"points with timestamps closer than -search.latencyOffset to the current time. The adjustment is needed because such points may contain incomplete data" )
2022-03-26 09:17:37 +01:00
maxUniqueTimeseries = flag . Int ( "search.maxUniqueTimeseries" , 300e3 , "The maximum number of unique time series, which can be selected during /api/v1/query and /api/v1/query_range queries. This option allows limiting memory usage" )
maxFederateSeries = flag . Int ( "search.maxFederateSeries" , 300e3 , "The maximum number of time series, which can be returned from /federate. This option allows limiting memory usage" )
maxExportSeries = flag . Int ( "search.maxExportSeries" , 1e6 , "The maximum number of time series, which can be returned from /api/v1/export* APIs. This option allows limiting memory usage" )
maxTSDBStatusSeries = flag . Int ( "search.maxTSDBStatusSeries" , 1e6 , "The maximum number of time series, which can be processed during the call to /api/v1/status/tsdb. This option allows limiting memory usage" )
maxSeriesLimit = flag . Int ( "search.maxSeries" , 10e3 , "The maximum number of time series, which can be returned from /api/v1/series. This option allows limiting memory usage" )
2019-05-22 23:16:55 +02:00
)
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
2020-02-04 15:13:59 +01:00
func FederateHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer federateDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
2019-10-15 18:12:27 +02:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2019-10-15 18:12:27 +02:00
if lookbackDelta <= 0 {
lookbackDelta = defaultStep
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , ct - lookbackDelta )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
if start >= end {
start = end - defaultStep
}
2021-02-01 17:04:14 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2019-05-22 23:16:55 +02:00
if err != nil {
return err
}
2022-03-26 09:17:37 +01:00
sq := storage . NewSearchQuery ( start , end , tagFilterss , * maxFederateSeries )
2022-06-01 01:29:19 +02:00
rss , err := netstorage . ProcessSearchQuery ( nil , sq , true , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-05-22 23:16:55 +02:00
}
2020-11-13 09:25:39 +01:00
w . Header ( ) . Set ( "Content-Type" , "text/plain; charset=utf-8" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-01 01:29:19 +02:00
err = rss . RunParallel ( nil , func ( rs * netstorage . Result , workerID uint ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
bb := quicktemplate . AcquireByteBuffer ( )
WriteFederate ( bb , rs )
2020-09-29 10:36:12 +02:00
_ , err := bw . Write ( bb . B )
2019-05-22 23:16:55 +02:00
quicktemplate . ReleaseByteBuffer ( bb )
2020-09-29 10:36:12 +02:00
return err
2020-09-27 22:17:14 +02:00
} )
2019-05-22 23:16:55 +02:00
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending data to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
return nil
}
var federateDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/federate"} ` )
2020-10-12 19:01:51 +02:00
// ExportCSVHandler exports data in CSV format from /api/v1/export/csv
func ExportCSVHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer exportCSVDuration . UpdateDuration ( startTime )
2020-10-12 19:01:51 +02:00
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %w" , err )
}
format := r . FormValue ( "format" )
if len ( format ) == 0 {
2021-04-20 19:16:17 +02:00
return fmt . Errorf ( "missing `format` arg; see https://docs.victoriametrics.com/#how-to-export-csv-data" )
2020-10-12 19:01:51 +02:00
}
fieldNames := strings . Split ( format , "," )
2021-12-17 09:56:03 +01:00
reduceMemUsage := searchutils . GetBool ( r , "reduce_mem_usage" )
2022-05-03 14:52:50 +02:00
ep , err := getExportParams ( r , startTime )
2020-10-12 19:01:51 +02:00
if err != nil {
return err
}
2022-05-03 14:52:50 +02:00
sq := storage . NewSearchQuery ( ep . start , ep . end , ep . filterss , * maxExportSeries )
2020-11-13 09:25:39 +01:00
w . Header ( ) . Set ( "Content-Type" , "text/csv; charset=utf-8" )
2020-10-12 19:01:51 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-12-08 19:49:32 +01:00
resultsCh := make ( chan * quicktemplate . ByteBuffer , cgroup . AvailableCPUs ( ) )
2021-12-17 09:56:03 +01:00
writeCSVLine := func ( xb * exportBlock ) {
if len ( xb . timestamps ) == 0 {
return
}
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportCSVLine ( bb , xb , fieldNames )
resultsCh <- bb
}
doneCh := make ( chan error , 1 )
if ! reduceMemUsage {
2022-06-01 01:29:19 +02:00
rss , err := netstorage . ProcessSearchQuery ( nil , sq , true , ep . deadline )
2021-12-17 09:56:03 +01:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
go func ( ) {
2022-06-01 01:29:19 +02:00
err := rss . RunParallel ( nil , func ( rs * netstorage . Result , workerID uint ) error {
2021-12-17 09:56:03 +01:00
if err := bw . Error ( ) ; err != nil {
return err
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = & rs . MetricName
xb . timestamps = rs . Timestamps
xb . values = rs . Values
writeCSVLine ( xb )
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
close ( resultsCh )
doneCh <- err
} ( )
} else {
go func ( ) {
2022-06-01 01:29:19 +02:00
err := netstorage . ExportBlocks ( nil , sq , ep . deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
2021-12-17 09:56:03 +01:00
if err := bw . Error ( ) ; err != nil {
return err
}
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
writeCSVLine ( xb )
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
close ( resultsCh )
doneCh <- err
} ( )
}
2020-10-13 08:35:13 +02:00
// Consume all the data from resultsCh.
2020-10-12 19:01:51 +02:00
for bb := range resultsCh {
2020-10-13 08:35:13 +02:00
// Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above.
_ , _ = bw . Write ( bb . B )
2020-10-12 19:01:51 +02:00
quicktemplate . ReleaseByteBuffer ( bb )
}
if err := bw . Flush ( ) ; err != nil {
return err
}
err = <- doneCh
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending the exported csv data to remote client: %w" , err )
2020-10-12 19:01:51 +02:00
}
return nil
}
var exportCSVDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export/csv"} ` )
2020-09-26 03:29:45 +02:00
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer exportNativeDuration . UpdateDuration ( startTime )
2020-09-26 03:29:45 +02:00
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %w" , err )
}
2022-05-03 14:52:50 +02:00
ep , err := getExportParams ( r , startTime )
2020-09-26 03:29:45 +02:00
if err != nil {
return err
}
2022-05-03 14:52:50 +02:00
sq := storage . NewSearchQuery ( ep . start , ep . end , ep . filterss , * maxExportSeries )
2020-09-26 03:29:45 +02:00
w . Header ( ) . Set ( "Content-Type" , "VictoriaMetrics/native" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-09-26 03:29:45 +02:00
// Marshal tr
trBuf := make ( [ ] byte , 0 , 16 )
2022-05-03 14:52:50 +02:00
trBuf = encoding . MarshalInt64 ( trBuf , ep . start )
trBuf = encoding . MarshalInt64 ( trBuf , ep . end )
2020-09-29 10:36:12 +02:00
_ , _ = bw . Write ( trBuf )
2020-09-26 03:29:45 +02:00
2020-09-27 22:17:14 +02:00
// Marshal native blocks.
2022-06-01 01:29:19 +02:00
err = netstorage . ExportBlocks ( nil , sq , ep . deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
dstBuf := bbPool . Get ( )
tmpBuf := bbPool . Get ( )
dst := dstBuf . B
tmp := tmpBuf . B
// Marshal mn
tmp = mn . Marshal ( tmp [ : 0 ] )
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
// Marshal b
tmp = b . MarshalPortable ( tmp [ : 0 ] )
dst = encoding . MarshalUint32 ( dst , uint32 ( len ( tmp ) ) )
dst = append ( dst , tmp ... )
tmpBuf . B = tmp
bbPool . Put ( tmpBuf )
2020-09-29 10:36:12 +02:00
_ , err := bw . Write ( dst )
2020-09-26 03:29:45 +02:00
dstBuf . B = dst
bbPool . Put ( dstBuf )
2020-09-29 10:36:12 +02:00
return err
2020-09-26 03:29:45 +02:00
} )
2020-09-27 22:17:14 +02:00
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending native data to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during flushing native data to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
return nil
2020-09-26 03:29:45 +02:00
}
2020-09-27 22:17:14 +02:00
var exportNativeDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export/native"} ` )
2020-09-26 03:29:45 +02:00
var bbPool bytesutil . ByteBufferPool
2019-05-22 23:16:55 +02:00
// ExportHandler exports data in raw format from /api/v1/export.
2020-02-04 15:13:59 +01:00
func ExportHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer exportDuration . UpdateDuration ( startTime )
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
2022-05-03 14:52:50 +02:00
ep , err := getExportParams ( r , startTime )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2019-05-22 23:16:55 +02:00
format := r . FormValue ( "format" )
2020-03-10 20:45:15 +01:00
maxRowsPerLine := int ( fastfloat . ParseInt64BestEffort ( r . FormValue ( "max_rows_per_line" ) ) )
2020-09-26 03:29:45 +02:00
reduceMemUsage := searchutils . GetBool ( r , "reduce_mem_usage" )
2022-06-01 01:29:19 +02:00
if err := exportHandler ( nil , w , ep , format , maxRowsPerLine , reduceMemUsage ) ; err != nil {
2022-05-03 14:52:50 +02:00
return fmt . Errorf ( "error when exporting data on the time range (start=%d, end=%d): %w" , ep . start , ep . end , err )
2019-05-22 23:16:55 +02:00
}
return nil
}
var exportDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export"} ` )
2022-06-01 01:29:19 +02:00
func exportHandler ( qt * querytracer . Tracer , w http . ResponseWriter , ep * exportParams , format string , maxRowsPerLine int , reduceMemUsage bool ) error {
2019-05-22 23:16:55 +02:00
writeResponseFunc := WriteExportStdResponse
2020-09-26 03:29:45 +02:00
writeLineFunc := func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
2020-08-10 19:57:18 +02:00
bb := quicktemplate . AcquireByteBuffer ( )
2020-09-26 03:29:45 +02:00
WriteExportJSONLine ( bb , xb )
2020-08-10 19:57:18 +02:00
resultsCh <- bb
}
2020-11-13 09:25:39 +01:00
contentType := "application/stream+json; charset=utf-8"
2020-09-26 03:29:45 +02:00
if format == "prometheus" {
2020-11-13 09:25:39 +01:00
contentType = "text/plain; charset=utf-8"
2020-09-26 03:29:45 +02:00
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportPrometheusLine ( bb , xb )
resultsCh <- bb
}
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
bb := quicktemplate . AcquireByteBuffer ( )
WriteExportPromAPILine ( bb , xb )
resultsCh <- bb
}
}
2020-03-10 20:45:15 +01:00
if maxRowsPerLine > 0 {
2020-09-26 03:29:45 +02:00
writeLineFuncOrig := writeLineFunc
writeLineFunc = func ( xb * exportBlock , resultsCh chan <- * quicktemplate . ByteBuffer ) {
valuesOrig := xb . values
timestampsOrig := xb . timestamps
2020-03-10 20:45:15 +01:00
values := valuesOrig
timestamps := timestampsOrig
for len ( values ) > 0 {
var valuesChunk [ ] float64
var timestampsChunk [ ] int64
if len ( values ) > maxRowsPerLine {
valuesChunk = values [ : maxRowsPerLine ]
timestampsChunk = timestamps [ : maxRowsPerLine ]
values = values [ maxRowsPerLine : ]
timestamps = timestamps [ maxRowsPerLine : ]
} else {
valuesChunk = values
timestampsChunk = timestamps
values = nil
timestamps = nil
}
2020-09-26 03:29:45 +02:00
xb . values = valuesChunk
xb . timestamps = timestampsChunk
writeLineFuncOrig ( xb , resultsCh )
2020-03-10 20:45:15 +01:00
}
2020-09-26 03:29:45 +02:00
xb . values = valuesOrig
xb . timestamps = timestampsOrig
2020-08-10 19:57:18 +02:00
}
2019-05-22 23:16:55 +02:00
}
2022-05-03 14:52:50 +02:00
sq := storage . NewSearchQuery ( ep . start , ep . end , ep . filterss , * maxExportSeries )
2020-09-27 22:17:14 +02:00
w . Header ( ) . Set ( "Content-Type" , contentType )
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-12-08 19:49:32 +01:00
resultsCh := make ( chan * quicktemplate . ByteBuffer , cgroup . AvailableCPUs ( ) )
2021-12-17 09:56:03 +01:00
doneCh := make ( chan error , 1 )
2020-09-26 03:29:45 +02:00
if ! reduceMemUsage {
2022-06-01 01:29:19 +02:00
rss , err := netstorage . ProcessSearchQuery ( qt , sq , true , ep . deadline )
2020-09-26 03:29:45 +02:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
2022-06-01 01:29:19 +02:00
qtChild := qt . NewChild ( )
2020-09-26 03:29:45 +02:00
go func ( ) {
2022-06-01 01:29:19 +02:00
err := rss . RunParallel ( qtChild , func ( rs * netstorage . Result , workerID uint ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = & rs . MetricName
xb . timestamps = rs . Timestamps
xb . values = rs . Values
writeLineFunc ( xb , resultsCh )
xb . reset ( )
exportBlockPool . Put ( xb )
2020-09-27 22:17:14 +02:00
return nil
2020-09-26 03:29:45 +02:00
} )
2022-06-01 01:29:19 +02:00
qtChild . Donef ( "background export format=%s" , format )
2020-09-26 03:29:45 +02:00
close ( resultsCh )
doneCh <- err
} ( )
} else {
2022-06-01 01:29:19 +02:00
qtChild := qt . NewChild ( )
2020-09-26 03:29:45 +02:00
go func ( ) {
2022-06-01 01:29:19 +02:00
err := netstorage . ExportBlocks ( qtChild , sq , ep . deadline , func ( mn * storage . MetricName , b * storage . Block , tr storage . TimeRange ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2020-09-26 03:29:45 +02:00
if err := b . UnmarshalData ( ) ; err != nil {
return fmt . Errorf ( "cannot unmarshal block during export: %s" , err )
}
xb := exportBlockPool . Get ( ) . ( * exportBlock )
xb . mn = mn
xb . timestamps , xb . values = b . AppendRowsWithTimeRangeFilter ( xb . timestamps [ : 0 ] , xb . values [ : 0 ] , tr )
if len ( xb . timestamps ) > 0 {
writeLineFunc ( xb , resultsCh )
}
xb . reset ( )
exportBlockPool . Put ( xb )
return nil
} )
2022-06-01 01:29:19 +02:00
qtChild . Donef ( "background export format=%s" , format )
2020-09-26 03:29:45 +02:00
close ( resultsCh )
doneCh <- err
} ( )
}
2019-05-22 23:16:55 +02:00
2020-09-27 22:17:14 +02:00
// writeResponseFunc must consume all the data from resultsCh.
2022-06-01 01:29:19 +02:00
writeResponseFunc ( bw , resultsCh , qt )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
return err
2019-05-24 13:54:31 +02:00
}
2022-05-03 14:52:50 +02:00
err := <- doneCh
2019-05-22 23:16:55 +02:00
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "error during sending the data to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
return nil
}
2020-09-26 03:29:45 +02:00
type exportBlock struct {
mn * storage . MetricName
timestamps [ ] int64
values [ ] float64
}
func ( xb * exportBlock ) reset ( ) {
xb . mn = nil
xb . timestamps = xb . timestamps [ : 0 ]
xb . values = xb . values [ : 0 ]
}
var exportBlockPool = & sync . Pool {
New : func ( ) interface { } {
return & exportBlock { }
} ,
}
2019-05-22 23:16:55 +02:00
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
2020-02-04 15:13:59 +01:00
func DeleteHandler ( startTime time . Time , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer deleteDuration . UpdateDuration ( startTime )
2021-02-02 23:24:05 +01:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse request form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
if r . FormValue ( "start" ) != "" || r . FormValue ( "end" ) != "" {
return fmt . Errorf ( "start and end aren't supported. Remove these args from the query in order to delete all the matching metrics" )
}
2021-02-01 17:04:14 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2019-05-22 23:16:55 +02:00
if err != nil {
return err
}
2021-02-02 23:24:05 +01:00
ct := startTime . UnixNano ( ) / 1e6
2022-03-26 09:17:37 +01:00
sq := storage . NewSearchQuery ( 0 , ct , tagFilterss , 0 )
2022-06-01 01:29:19 +02:00
deletedCount , err := netstorage . DeleteSeries ( nil , sq , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2021-02-01 17:04:14 +01:00
return fmt . Errorf ( "cannot delete time series: %w" , err )
2019-05-22 23:16:55 +02:00
}
if deletedCount > 0 {
promql . ResetRollupResultCache ( )
}
return nil
}
var deleteDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/admin/tsdb/delete_series"} ` )
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
2022-06-01 01:29:19 +02:00
func LabelValuesHandler ( qt * querytracer . Tracer , startTime time . Time , labelName string , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer labelValuesDuration . UpdateDuration ( startTime )
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-08-04 22:09:18 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-08-04 22:09:18 +02:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 17:04:14 +01:00
if err != nil {
return err
}
2021-02-11 14:00:12 +01:00
matches := getMatchesFromRequest ( r )
2019-08-04 22:09:18 +02:00
var labelValues [ ] string
2021-12-06 16:07:06 +01:00
if len ( matches ) == 0 && len ( etfs ) == 0 {
2020-11-04 23:15:43 +01:00
if len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
2020-11-05 00:36:13 +01:00
var err error
2022-06-01 01:29:19 +02:00
labelValues , err = netstorage . GetLabelValues ( qt , labelName , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( ` cannot obtain label values for %q: %w ` , labelName , err )
}
2020-11-04 23:15:43 +01:00
} else {
ct := startTime . UnixNano ( ) / 1e6
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
if err != nil {
return err
}
tr := storage . TimeRange {
MinTimestamp : start ,
MaxTimestamp : end ,
}
2022-06-01 01:29:19 +02:00
labelValues , err = netstorage . GetLabelValuesOnTimeRange ( qt , labelName , tr , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( ` cannot obtain label values on time range for %q: %w ` , labelName , err )
}
2019-08-04 22:09:18 +02:00
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/label/foo/values?match[]=foobar{baz="abc"}&start=...&end=...
// is equivalent to `label_values(foobar{baz="abc"}, foo)` call on the selected
// time range in Grafana templating.
if len ( matches ) == 0 {
matches = [ ] string { fmt . Sprintf ( "{%s!=''}" , labelName ) }
}
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-08-04 22:09:18 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-08-04 22:09:18 +02:00
if err != nil {
return err
}
2022-06-01 01:29:19 +02:00
labelValues , err = labelValuesWithMatches ( qt , labelName , matches , etfs , start , end , deadline )
2019-08-04 22:09:18 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain label values for %q, match[]=%q, start=%d, end=%d: %w" , labelName , matches , start , end , err )
2019-08-04 22:09:18 +02:00
}
2019-05-22 23:16:55 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-01 01:29:19 +02:00
qtDone := func ( ) {
qt . Donef ( "/api/v1/labels" )
}
WriteLabelValuesResponse ( bw , labelValues , qt , qtDone )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "canot flush label values to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
2022-06-01 01:29:19 +02:00
func labelValuesWithMatches ( qt * querytracer . Tracer , labelName string , matches [ ] string , etfs [ ] [ ] storage . TagFilter ,
start , end int64 , deadline searchutils . Deadline ) ( [ ] string , error ) {
2019-08-04 22:09:18 +02:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
2020-02-28 22:35:47 +01:00
2020-02-28 11:17:27 +01:00
// Add `labelName!=''` tag filter in order to filter out series without the labelName.
2020-02-28 22:35:47 +01:00
// There is no need in adding `__name__!=''` filter, since all the time series should
// already have non-empty name.
if labelName != "__name__" {
key := [ ] byte ( labelName )
for i , tfs := range tagFilterss {
tagFilterss [ i ] = append ( tfs , storage . TagFilter {
Key : key ,
IsNegative : true ,
} )
}
2019-12-14 23:07:09 +01:00
}
2019-08-04 22:09:18 +02:00
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-08-04 22:09:18 +02:00
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-02-01 17:04:14 +01:00
if len ( tagFilterss ) == 0 {
logger . Panicf ( "BUG: tagFilterss must be non-empty" )
}
2022-03-26 09:17:37 +01:00
sq := storage . NewSearchQuery ( start , end , tagFilterss , * maxSeriesLimit )
2019-08-04 22:09:18 +02:00
m := make ( map [ string ] struct { } )
2020-11-16 12:45:50 +01:00
if end - start > 24 * 3600 * 1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
2022-06-01 01:29:19 +02:00
mns , err := netstorage . SearchMetricNames ( qt , sq , deadline )
2020-11-16 12:45:50 +01:00
if err != nil {
return nil , fmt . Errorf ( "cannot fetch time series for %q: %w" , sq , err )
}
for _ , mn := range mns {
labelValue := mn . GetTagValue ( labelName )
if len ( labelValue ) == 0 {
continue
}
m [ string ( labelValue ) ] = struct { } { }
}
} else {
2022-06-01 01:29:19 +02:00
rss , err := netstorage . ProcessSearchQuery ( qt , sq , false , deadline )
2020-11-16 12:45:50 +01:00
if err != nil {
return nil , fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
var mLock sync . Mutex
2022-06-01 01:29:19 +02:00
err = rss . RunParallel ( qt , func ( rs * netstorage . Result , workerID uint ) error {
2020-11-16 12:45:50 +01:00
labelValue := rs . MetricName . GetTagValue ( labelName )
if len ( labelValue ) == 0 {
return nil
}
mLock . Lock ( )
m [ string ( labelValue ) ] = struct { } { }
mLock . Unlock ( )
2020-09-27 22:17:14 +02:00
return nil
2020-11-16 12:45:50 +01:00
} )
if err != nil {
2021-09-16 11:56:58 +02:00
return nil , fmt . Errorf ( "cannot fetch label values from storage: %w" , err )
2019-08-04 22:09:18 +02:00
}
}
labelValues := make ( [ ] string , 0 , len ( m ) )
for labelValue := range m {
labelValues = append ( labelValues , labelValue )
}
sort . Strings ( labelValues )
2022-06-01 01:29:19 +02:00
qt . Printf ( "sort %d label values" , len ( labelValues ) )
2019-08-04 22:09:18 +02:00
return labelValues , nil
}
2019-05-22 23:16:55 +02:00
var labelValuesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/label/ { }/values"} ` )
2019-06-10 17:55:20 +02:00
// LabelsCountHandler processes /api/v1/labels/count request.
2020-02-04 15:13:59 +01:00
func LabelsCountHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer labelsCountDuration . UpdateDuration ( startTime )
2021-03-30 20:38:59 +02:00
deadline := searchutils . GetDeadlineForStatusRequest ( r , startTime )
2022-06-01 01:29:19 +02:00
labelEntries , err := netstorage . GetLabelEntries ( nil , deadline )
2019-06-10 17:55:20 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( ` cannot obtain label entries: %w ` , err )
2019-06-10 17:55:20 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteLabelsCountResponse ( bw , labelEntries )
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send labels count response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-06-10 17:55:20 +02:00
return nil
}
var labelsCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels/count"} ` )
2020-04-22 18:57:36 +02:00
const secsPerDay = 3600 * 24
// TSDBStatusHandler processes /api/v1/status/tsdb request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2021-05-12 15:32:48 +02:00
//
// It can accept `match[]` filters in order to narrow down the search.
2020-04-22 18:57:36 +02:00
func TSDBStatusHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer tsdbStatusDuration . UpdateDuration ( startTime )
2021-03-30 20:38:59 +02:00
deadline := searchutils . GetDeadlineForStatusRequest ( r , startTime )
2020-04-22 18:57:36 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2020-04-22 18:57:36 +02:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-05-12 14:18:45 +02:00
if err != nil {
return err
}
matches := getMatchesFromRequest ( r )
2020-05-14 21:01:51 +02:00
date := fasttime . UnixDate ( )
2020-04-22 18:57:36 +02:00
dateStr := r . FormValue ( "date" )
if len ( dateStr ) > 0 {
t , err := time . Parse ( "2006-01-02" , dateStr )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse `date` arg %q: %w" , dateStr , err )
2020-04-22 18:57:36 +02:00
}
2020-05-14 21:01:51 +02:00
date = uint64 ( t . Unix ( ) ) / secsPerDay
2020-04-22 18:57:36 +02:00
}
topN := 10
topNStr := r . FormValue ( "topN" )
if len ( topNStr ) > 0 {
n , err := strconv . Atoi ( topNStr )
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse `topN` arg %q: %w" , topNStr , err )
2020-04-22 18:57:36 +02:00
}
if n <= 0 {
n = 1
}
if n > 1000 {
n = 1000
}
topN = n
}
2021-05-12 14:18:45 +02:00
var status * storage . TSDBStatus
2021-12-06 16:07:06 +01:00
if len ( matches ) == 0 && len ( etfs ) == 0 {
2022-06-01 01:29:19 +02:00
status , err = netstorage . GetTSDBStatusForDate ( nil , deadline , date , topN , * maxTSDBStatusSeries )
2021-05-12 14:18:45 +02:00
if err != nil {
return fmt . Errorf ( ` cannot obtain tsdb status for date=%d, topN=%d: %w ` , date , topN , err )
}
} else {
2022-03-26 09:17:37 +01:00
status , err = tsdbStatusWithMatches ( matches , etfs , date , topN , * maxTSDBStatusSeries , deadline )
2021-05-12 14:18:45 +02:00
if err != nil {
2021-05-12 15:32:48 +02:00
return fmt . Errorf ( "cannot obtain tsdb status with matches for date=%d, topN=%d: %w" , date , topN , err )
2021-05-12 14:18:45 +02:00
}
2020-04-22 18:57:36 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteTSDBStatusResponse ( bw , status )
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send tsdb status response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2020-04-22 18:57:36 +02:00
return nil
}
2022-03-26 09:17:37 +01:00
func tsdbStatusWithMatches ( matches [ ] string , etfs [ ] [ ] storage . TagFilter , date uint64 , topN , maxMetrics int , deadline searchutils . Deadline ) ( * storage . TSDBStatus , error ) {
2021-05-12 14:18:45 +02:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-05-12 14:18:45 +02:00
if len ( tagFilterss ) == 0 {
logger . Panicf ( "BUG: tagFilterss must be non-empty" )
}
start := int64 ( date * secsPerDay ) * 1000
end := int64 ( date * secsPerDay + secsPerDay ) * 1000
2022-03-26 09:17:37 +01:00
sq := storage . NewSearchQuery ( start , end , tagFilterss , maxMetrics )
2022-06-01 01:29:19 +02:00
status , err := netstorage . GetTSDBStatusWithFilters ( nil , deadline , sq , topN )
2021-05-12 14:18:45 +02:00
if err != nil {
return nil , err
}
return status , nil
}
2020-04-22 18:57:36 +02:00
var tsdbStatusDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/status/tsdb"} ` )
2019-05-22 23:16:55 +02:00
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
2022-06-01 01:29:19 +02:00
func LabelsHandler ( qt * querytracer . Tracer , startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer labelsDuration . UpdateDuration ( startTime )
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-12-14 23:07:09 +01:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-12-14 23:07:09 +01:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 17:04:14 +01:00
if err != nil {
return err
}
2021-02-11 14:00:12 +01:00
matches := getMatchesFromRequest ( r )
2019-12-14 23:07:09 +01:00
var labels [ ] string
2021-12-06 16:07:06 +01:00
if len ( matches ) == 0 && len ( etfs ) == 0 {
2020-11-04 23:15:43 +01:00
if len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
2020-11-05 00:36:13 +01:00
var err error
2022-06-01 01:29:19 +02:00
labels , err = netstorage . GetLabels ( qt , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( "cannot obtain labels: %w" , err )
}
2020-11-04 23:15:43 +01:00
} else {
ct := startTime . UnixNano ( ) / 1e6
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return err
}
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
if err != nil {
return err
}
tr := storage . TimeRange {
MinTimestamp : start ,
MaxTimestamp : end ,
}
2022-06-01 01:29:19 +02:00
labels , err = netstorage . GetLabelsOnTimeRange ( qt , tr , deadline )
2020-11-05 00:36:13 +01:00
if err != nil {
return fmt . Errorf ( "cannot obtain labels on time range: %w" , err )
}
2019-12-14 23:07:09 +01:00
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/labels?match[]=foobar{baz="abc"}&start=...&end=...
if len ( matches ) == 0 {
matches = [ ] string { "{__name__!=''}" }
}
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-12-14 23:07:09 +01:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-12-14 23:07:09 +01:00
if err != nil {
return err
}
2022-06-01 01:29:19 +02:00
labels , err = labelsWithMatches ( qt , matches , etfs , start , end , deadline )
2019-12-14 23:07:09 +01:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain labels for match[]=%q, start=%d, end=%d: %w" , matches , start , end , err )
2019-12-14 23:07:09 +01:00
}
2019-05-22 23:16:55 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-01 01:29:19 +02:00
qtDone := func ( ) {
qt . Donef ( "/api/v1/labels" )
}
WriteLabelsResponse ( bw , labels , qt , qtDone )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send labels response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
2022-06-01 01:29:19 +02:00
func labelsWithMatches ( qt * querytracer . Tracer , matches [ ] string , etfs [ ] [ ] storage . TagFilter , start , end int64 , deadline searchutils . Deadline ) ( [ ] string , error ) {
2019-12-14 23:07:09 +01:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
if start >= end {
end = start + defaultStep
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-02-01 17:04:14 +01:00
if len ( tagFilterss ) == 0 {
logger . Panicf ( "BUG: tagFilterss must be non-empty" )
}
2022-03-26 09:17:37 +01:00
sq := storage . NewSearchQuery ( start , end , tagFilterss , * maxSeriesLimit )
2019-12-14 23:07:09 +01:00
m := make ( map [ string ] struct { } )
2020-11-16 12:45:50 +01:00
if end - start > 24 * 3600 * 1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
2022-06-01 01:29:19 +02:00
mns , err := netstorage . SearchMetricNames ( qt , sq , deadline )
2020-11-16 12:45:50 +01:00
if err != nil {
return nil , fmt . Errorf ( "cannot fetch time series for %q: %w" , sq , err )
}
for _ , mn := range mns {
for _ , tag := range mn . Tags {
m [ string ( tag . Key ) ] = struct { } { }
}
}
2020-11-16 12:54:34 +01:00
if len ( mns ) > 0 {
m [ "__name__" ] = struct { } { }
}
2020-11-16 12:45:50 +01:00
} else {
2022-06-01 01:29:19 +02:00
rss , err := netstorage . ProcessSearchQuery ( qt , sq , false , deadline )
2020-11-16 12:45:50 +01:00
if err != nil {
return nil , fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
}
var mLock sync . Mutex
2022-06-01 01:29:19 +02:00
err = rss . RunParallel ( qt , func ( rs * netstorage . Result , workerID uint ) error {
2020-11-16 12:45:50 +01:00
mLock . Lock ( )
for _ , tag := range rs . MetricName . Tags {
m [ string ( tag . Key ) ] = struct { } { }
}
m [ "__name__" ] = struct { } { }
mLock . Unlock ( )
return nil
} )
if err != nil {
2021-09-16 11:56:58 +02:00
return nil , fmt . Errorf ( "cannot fetch labels from storage: %w" , err )
2020-11-16 12:45:50 +01:00
}
2019-12-14 23:07:09 +01:00
}
labels := make ( [ ] string , 0 , len ( m ) )
for label := range m {
labels = append ( labels , label )
}
sort . Strings ( labels )
2022-06-01 01:29:19 +02:00
qt . Printf ( "sort %d labels" , len ( labels ) )
2019-12-14 23:07:09 +01:00
return labels , nil
}
2019-05-22 23:16:55 +02:00
var labelsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels"} ` )
// SeriesCountHandler processes /api/v1/series/count request.
2020-02-04 15:13:59 +01:00
func SeriesCountHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer seriesCountDuration . UpdateDuration ( startTime )
2021-03-30 20:38:59 +02:00
deadline := searchutils . GetDeadlineForStatusRequest ( r , startTime )
2022-06-01 01:29:19 +02:00
n , err := netstorage . GetSeriesCount ( nil , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot obtain series count: %w" , err )
2019-05-22 23:16:55 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
WriteSeriesCountResponse ( bw , n )
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send series count response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
var seriesCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series/count"} ` )
// SeriesHandler processes /api/v1/series request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
2022-06-01 01:29:19 +02:00
func SeriesHandler ( qt * querytracer . Tracer , startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer seriesDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
if err := r . ParseForm ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot parse form values: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
// Do not set start to searchutils.minTimeMsecs by default as Prometheus does,
2019-08-04 18:42:36 +02:00
// since this leads to fetching and scanning all the data from the storage,
// which can take a lot of time for big storages.
// It is better setting start as end-defaultStep by default.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , end - defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
2021-02-01 17:04:14 +01:00
tagFilterss , err := getTagFilterssFromRequest ( r )
2019-05-22 23:16:55 +02:00
if err != nil {
return err
}
if start >= end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
2022-03-26 09:17:37 +01:00
sq := storage . NewSearchQuery ( start , end , tagFilterss , * maxSeriesLimit )
2022-06-01 01:29:19 +02:00
qtDone := func ( ) {
qt . Donef ( "/api/v1/series: start=%d, end=%d" , start , end )
}
2020-11-16 09:55:55 +01:00
if end - start > 24 * 3600 * 1000 {
// It is cheaper to call SearchMetricNames on time ranges exceeding a day.
2022-06-01 01:29:19 +02:00
mns , err := netstorage . SearchMetricNames ( qt , sq , deadline )
2020-11-16 09:55:55 +01:00
if err != nil {
return fmt . Errorf ( "cannot fetch time series for %q: %w" , sq , err )
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-11-16 09:55:55 +01:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
resultsCh := make ( chan * quicktemplate . ByteBuffer )
go func ( ) {
for i := range mns {
bb := quicktemplate . AcquireByteBuffer ( )
writemetricNameObject ( bb , & mns [ i ] )
resultsCh <- bb
}
2020-11-16 12:30:41 +01:00
close ( resultsCh )
2020-11-16 09:55:55 +01:00
} ( )
// WriteSeriesResponse must consume all the data from resultsCh.
2022-06-01 01:29:19 +02:00
WriteSeriesResponse ( bw , resultsCh , qt , qtDone )
2020-11-16 09:55:55 +01:00
if err := bw . Flush ( ) ; err != nil {
return err
}
seriesDuration . UpdateDuration ( startTime )
return nil
}
2022-06-01 01:29:19 +02:00
rss , err := netstorage . ProcessSearchQuery ( qt , sq , false , deadline )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot fetch data for %q: %w" , sq , err )
2019-05-22 23:16:55 +02:00
}
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2019-05-22 23:16:55 +02:00
resultsCh := make ( chan * quicktemplate . ByteBuffer )
doneCh := make ( chan error )
go func ( ) {
2022-06-01 01:29:19 +02:00
err := rss . RunParallel ( qt , func ( rs * netstorage . Result , workerID uint ) error {
2020-09-27 22:17:14 +02:00
if err := bw . Error ( ) ; err != nil {
return err
}
2019-05-22 23:16:55 +02:00
bb := quicktemplate . AcquireByteBuffer ( )
writemetricNameObject ( bb , & rs . MetricName )
resultsCh <- bb
2020-09-27 22:17:14 +02:00
return nil
2019-05-22 23:16:55 +02:00
} )
close ( resultsCh )
doneCh <- err
} ( )
2020-09-27 22:17:14 +02:00
// WriteSeriesResponse must consume all the data from resultsCh.
2022-06-01 01:29:19 +02:00
WriteSeriesResponse ( bw , resultsCh , qt , qtDone )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot flush series response to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
err = <- doneCh
if err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send series response to remote client: %w" , err )
2019-05-22 23:16:55 +02:00
}
return nil
}
var seriesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series"} ` )
// QueryHandler processes /api/v1/query request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
2022-06-01 01:29:19 +02:00
func QueryHandler ( qt * querytracer . Tracer , startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer queryDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2022-06-01 01:29:19 +02:00
mayCache := ! searchutils . GetBool ( r , "nocache" )
2019-05-22 23:16:55 +02:00
query := r . FormValue ( "query" )
2019-06-20 13:05:07 +02:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "time" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-04-17 11:24:10 +02:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
step , err := searchutils . GetDuration ( r , "step" , lookbackDelta )
2019-10-15 18:12:27 +02:00
if err != nil {
return err
}
2020-04-17 11:24:10 +02:00
if step <= 0 {
step = defaultStep
}
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
2019-05-22 23:16:55 +02:00
2020-08-16 16:05:52 +02:00
if len ( query ) > maxQueryLen . N {
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 23:16:55 +02:00
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 17:04:14 +01:00
if err != nil {
return err
}
2021-07-12 16:16:38 +02:00
if childQuery , windowExpr , offsetExpr := promql . IsMetricSelectorWithRollup ( query ) ; childQuery != "" {
window := windowExpr . Duration ( step )
offset := offsetExpr . Duration ( step )
2019-05-22 23:16:55 +02:00
start -= offset
end := start
start = end - window
2021-03-12 11:19:05 +01:00
// Do not include data point with a timestamp matching the lower boundary of the window as Prometheus does.
2021-03-12 11:16:50 +01:00
start ++
if end < start {
end = start
}
2022-05-03 14:52:50 +02:00
tagFilterss , err := getTagFilterssFromMatches ( [ ] string { childQuery } )
if err != nil {
return err
}
filterss := searchutils . JoinTagFilterss ( tagFilterss , etfs )
ep := & exportParams {
deadline : deadline ,
start : start ,
end : end ,
filterss : filterss ,
}
2022-06-01 01:29:19 +02:00
if err := exportHandler ( qt , w , ep , "promapi" , 0 , false ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when exporting data for query=%q on the time range (start=%d, end=%d): %w" , childQuery , start , end , err )
2019-05-22 23:16:55 +02:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2021-07-12 16:16:38 +02:00
if childQuery , windowExpr , stepExpr , offsetExpr := promql . IsRollup ( query ) ; childQuery != "" {
newStep := stepExpr . Duration ( step )
2019-12-10 23:40:36 +01:00
if newStep > 0 {
step = newStep
}
2021-07-12 16:16:38 +02:00
window := windowExpr . Duration ( step )
offset := offsetExpr . Duration ( step )
2019-12-10 23:40:36 +01:00
start -= offset
end := start
start = end - window
2022-06-01 01:29:19 +02:00
if err := queryRangeHandler ( qt , startTime , w , childQuery , start , end , step , r , ct , etfs ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , childQuery , start , end , step , err )
2019-12-10 23:40:36 +01:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2019-05-22 23:16:55 +02:00
2020-09-23 11:58:46 +02:00
queryOffset := getLatencyOffsetMilliseconds ( )
if ! searchutils . GetBool ( r , "nocache" ) && ct - start < queryOffset && start - ct < queryOffset {
// Adjust start time only if `nocache` arg isn't set.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/241
startPrev := start
start = ct - queryOffset
queryOffset = startPrev - start
} else {
queryOffset = 0
}
2019-05-22 23:16:55 +02:00
ec := promql . EvalConfig {
2021-12-06 16:07:06 +01:00
Start : start ,
End : start ,
Step : step ,
2022-03-26 09:17:37 +01:00
MaxSeries : * maxUniqueTimeseries ,
2021-12-06 16:07:06 +01:00
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
2022-06-01 01:29:19 +02:00
MayCache : mayCache ,
2021-12-06 16:07:06 +01:00
LookbackDelta : lookbackDelta ,
RoundDigits : getRoundDigits ( r ) ,
EnforcedTagFilterss : etfs ,
2019-05-22 23:16:55 +02:00
}
2022-06-01 01:29:19 +02:00
result , err := promql . Exec ( qt , & ec , query , true )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q for (time=%d, step=%d): %w" , query , start , step , err )
2019-05-22 23:16:55 +02:00
}
2020-09-23 11:58:46 +02:00
if queryOffset > 0 {
for i := range result {
2020-09-23 12:04:17 +02:00
timestamps := result [ i ] . Timestamps
2020-09-23 11:58:46 +02:00
for j := range timestamps {
timestamps [ j ] += queryOffset
}
}
}
2019-05-22 23:16:55 +02:00
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-01 01:29:19 +02:00
qtDone := func ( ) {
qt . Donef ( "/api/v1/query: query=%s, time=%d: series=%d" , query , start , len ( result ) )
}
WriteQueryResponse ( bw , result , qt , qtDone )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot flush query response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
var queryDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query"} ` )
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
2022-06-01 01:29:19 +02:00
func QueryRangeHandler ( qt * querytracer . Tracer , startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer queryRangeDuration . UpdateDuration ( startTime )
2020-07-21 17:34:59 +02:00
ct := startTime . UnixNano ( ) / 1e6
2019-05-22 23:16:55 +02:00
query := r . FormValue ( "query" )
2019-06-20 13:05:07 +02:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2020-09-10 23:28:19 +02:00
start , err := searchutils . GetTime ( r , "start" , ct - defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
end , err := searchutils . GetTime ( r , "end" , ct )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2020-09-10 23:28:19 +02:00
step , err := searchutils . GetDuration ( r , "step" , defaultStep )
2019-06-06 21:17:13 +02:00
if err != nil {
return err
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 17:04:14 +01:00
if err != nil {
return err
}
2022-06-01 01:29:19 +02:00
if err := queryRangeHandler ( qt , startTime , w , query , start , end , step , r , ct , etfs ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w" , query , start , end , step , err )
2019-12-10 23:40:36 +01:00
}
return nil
}
2022-06-01 01:29:19 +02:00
func queryRangeHandler ( qt * querytracer . Tracer , startTime time . Time , w http . ResponseWriter , query string ,
start , end , step int64 , r * http . Request , ct int64 , etfs [ ] [ ] storage . TagFilter ) error {
2020-09-10 23:28:19 +02:00
deadline := searchutils . GetDeadlineForQuery ( r , startTime )
mayCache := ! searchutils . GetBool ( r , "nocache" )
2019-10-15 18:12:27 +02:00
lookbackDelta , err := getMaxLookback ( r )
if err != nil {
return err
}
2019-05-22 23:16:55 +02:00
// Validate input args.
2020-08-16 16:05:52 +02:00
if len ( query ) > maxQueryLen . N {
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , maxQueryLen . N )
2019-05-22 23:16:55 +02:00
}
if start > end {
2019-11-22 15:10:33 +01:00
end = start + defaultStep
2019-05-22 23:16:55 +02:00
}
if err := promql . ValidateMaxPointsPerTimeseries ( start , end , step ) ; err != nil {
return err
}
2019-09-04 12:09:20 +02:00
if mayCache {
start , end = promql . AdjustStartEnd ( start , end , step )
}
2019-05-22 23:16:55 +02:00
ec := promql . EvalConfig {
2021-12-06 16:07:06 +01:00
Start : start ,
End : end ,
Step : step ,
2022-03-26 09:17:37 +01:00
MaxSeries : * maxUniqueTimeseries ,
2021-12-06 16:07:06 +01:00
QuotedRemoteAddr : httpserver . GetQuotedRemoteAddr ( r ) ,
Deadline : deadline ,
MayCache : mayCache ,
LookbackDelta : lookbackDelta ,
RoundDigits : getRoundDigits ( r ) ,
EnforcedTagFilterss : etfs ,
2019-05-22 23:16:55 +02:00
}
2022-06-01 01:29:19 +02:00
result , err := promql . Exec ( qt , & ec , query , false )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot execute query: %w" , err )
2019-05-22 23:16:55 +02:00
}
2021-01-19 21:55:46 +01:00
if step < maxStepForPointsAdjustment . Milliseconds ( ) {
queryOffset := getLatencyOffsetMilliseconds ( )
if ct - queryOffset < end {
result = adjustLastPoints ( result , ct - queryOffset , ct + step )
}
2019-05-22 23:16:55 +02:00
}
2019-08-20 21:52:49 +02:00
// Remove NaN values as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/153
2020-07-20 14:28:36 +02:00
result = removeEmptyValuesAndTimeseries ( result )
2019-08-20 21:52:49 +02:00
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-09-27 22:17:14 +02:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2022-06-01 01:29:19 +02:00
qtDone := func ( ) {
qt . Donef ( "/api/v1/query_range: start=%d, end=%d, step=%d, query=%q: series=%d" , start , end , step , query , len ( result ) )
}
WriteQueryRangeResponse ( bw , result , qt , qtDone )
2020-09-27 22:17:14 +02:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send query range response to remote client: %w" , err )
2020-09-27 22:17:14 +02:00
}
2019-05-22 23:16:55 +02:00
return nil
}
2020-07-20 14:28:36 +02:00
func removeEmptyValuesAndTimeseries ( tss [ ] netstorage . Result ) [ ] netstorage . Result {
dst := tss [ : 0 ]
2019-08-20 21:52:49 +02:00
for i := range tss {
ts := & tss [ i ]
hasNaNs := false
for _ , v := range ts . Values {
if math . IsNaN ( v ) {
hasNaNs = true
break
}
}
if ! hasNaNs {
// Fast path: nothing to remove.
2020-07-20 14:28:36 +02:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 21:52:49 +02:00
continue
}
// Slow path: remove NaNs.
srcTimestamps := ts . Timestamps
dstValues := ts . Values [ : 0 ]
dstTimestamps := ts . Timestamps [ : 0 ]
for j , v := range ts . Values {
if math . IsNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , srcTimestamps [ j ] )
}
ts . Values = dstValues
ts . Timestamps = dstTimestamps
2020-07-20 14:28:36 +02:00
if len ( ts . Values ) > 0 {
dst = append ( dst , * ts )
}
2019-08-20 21:52:49 +02:00
}
2020-07-20 14:28:36 +02:00
return dst
2019-08-20 21:52:49 +02:00
}
2019-05-22 23:16:55 +02:00
var queryRangeDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query_range"} ` )
2020-07-14 11:45:42 +02:00
var nan = math . NaN ( )
2020-07-05 17:17:02 +02:00
// adjustLastPoints substitutes the last point values on the time range (start..end]
2020-07-14 11:45:42 +02:00
// with the previous point values, since these points may contain incomplete values.
2020-07-05 17:17:02 +02:00
func adjustLastPoints ( tss [ ] netstorage . Result , start , end int64 ) [ ] netstorage . Result {
2019-05-22 23:16:55 +02:00
for i := range tss {
2020-07-05 17:17:02 +02:00
ts := & tss [ i ]
values := ts . Values
timestamps := ts . Timestamps
j := len ( timestamps ) - 1
2020-07-14 11:45:42 +02:00
if j >= 0 && timestamps [ j ] > end {
// It looks like the `offset` is used in the query, which shifts time range beyond the `end`.
// Leave such a time series as is, since it is unclear which points may be incomplete in it.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/625
continue
}
2020-07-05 17:17:02 +02:00
for j >= 0 && timestamps [ j ] > start {
2019-05-22 23:16:55 +02:00
j --
}
2020-07-05 17:17:02 +02:00
j ++
2020-07-14 11:45:42 +02:00
lastValue := nan
if j > 0 {
lastValue = values [ j - 1 ]
2020-07-05 16:56:54 +02:00
}
2020-07-05 17:17:02 +02:00
for j < len ( timestamps ) && timestamps [ j ] <= end {
2020-07-14 11:45:42 +02:00
values [ j ] = lastValue
2020-07-05 17:17:02 +02:00
j ++
2019-05-22 23:16:55 +02:00
}
}
2019-07-04 08:14:15 +02:00
return tss
2019-05-22 23:16:55 +02:00
}
2019-10-15 18:12:27 +02:00
func getMaxLookback ( r * http . Request ) ( int64 , error ) {
2020-03-29 20:50:10 +02:00
d := maxLookback . Milliseconds ( )
2020-04-20 18:25:32 +02:00
if d == 0 {
d = maxStalenessInterval . Milliseconds ( )
}
2020-09-10 23:28:19 +02:00
return searchutils . GetDuration ( r , "max_lookback" , d )
2019-05-22 23:16:55 +02:00
}
func getTagFilterssFromMatches ( matches [ ] string ) ( [ ] [ ] storage . TagFilter , error ) {
tagFilterss := make ( [ ] [ ] storage . TagFilter , 0 , len ( matches ) )
for _ , match := range matches {
2021-12-06 16:07:06 +01:00
tagFilters , err := searchutils . ParseMetricSelector ( match )
2019-05-22 23:16:55 +02:00
if err != nil {
2021-12-06 16:07:06 +01:00
return nil , fmt . Errorf ( "cannot parse matches[]=%s: %w" , match , err )
2019-05-22 23:16:55 +02:00
}
tagFilterss = append ( tagFilterss , tagFilters )
}
return tagFilterss , nil
}
2019-10-28 11:30:50 +01:00
2021-02-01 17:04:14 +01:00
func getTagFilterssFromRequest ( r * http . Request ) ( [ ] [ ] storage . TagFilter , error ) {
2021-02-11 14:00:12 +01:00
matches := getMatchesFromRequest ( r )
if len ( matches ) == 0 {
return nil , fmt . Errorf ( "missing `match[]` query arg" )
2021-02-01 17:04:14 +01:00
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
2021-12-06 16:07:06 +01:00
etfs , err := searchutils . GetExtraTagFilters ( r )
2021-02-01 17:04:14 +01:00
if err != nil {
return nil , err
}
2021-12-06 16:07:06 +01:00
tagFilterss = searchutils . JoinTagFilterss ( tagFilterss , etfs )
2021-02-01 17:04:14 +01:00
return tagFilterss , nil
}
2021-02-11 14:00:12 +01:00
func getMatchesFromRequest ( r * http . Request ) [ ] string {
2021-02-01 17:04:14 +01:00
matches := r . Form [ "match[]" ]
2021-02-11 14:00:12 +01:00
// This is needed for backwards compatibility
matches = append ( matches , r . Form [ "match" ] ... )
return matches
2021-02-01 17:04:14 +01:00
}
2021-03-15 11:35:44 +01:00
func getRoundDigits ( r * http . Request ) int {
s := r . FormValue ( "round_digits" )
if len ( s ) == 0 {
return 100
}
n , err := strconv . Atoi ( s )
if err != nil {
return 100
}
return n
}
2019-10-28 11:30:50 +01:00
func getLatencyOffsetMilliseconds ( ) int64 {
2020-03-29 20:50:10 +02:00
d := latencyOffset . Milliseconds ( )
2019-10-28 11:30:50 +01:00
if d <= 1000 {
d = 1000
}
return d
}
2020-12-25 15:42:05 +01:00
2020-12-25 15:44:26 +01:00
// QueryStatsHandler returns query stats at `/api/v1/status/top_queries`
func QueryStatsHandler ( startTime time . Time , w http . ResponseWriter , r * http . Request ) error {
2021-08-19 12:58:54 +02:00
defer queryStatsDuration . UpdateDuration ( startTime )
2020-12-25 15:42:05 +01:00
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse form values: %w" , err )
}
2020-12-25 15:44:26 +01:00
topN := 20
2020-12-25 15:42:05 +01:00
topNStr := r . FormValue ( "topN" )
if len ( topNStr ) > 0 {
n , err := strconv . Atoi ( topNStr )
if err != nil {
return fmt . Errorf ( "cannot parse `topN` arg %q: %w" , topNStr , err )
}
topN = n
}
2020-12-25 15:44:26 +01:00
maxLifetimeMsecs , err := searchutils . GetDuration ( r , "maxLifetime" , 10 * 60 * 1000 )
if err != nil {
return fmt . Errorf ( "cannot parse `maxLifetime` arg: %w" , err )
}
2020-12-27 11:53:50 +01:00
maxLifetime := time . Duration ( maxLifetimeMsecs ) * time . Millisecond
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2020-12-25 15:42:05 +01:00
bw := bufferedwriter . Get ( w )
defer bufferedwriter . Put ( bw )
2020-12-27 11:53:50 +01:00
querystats . WriteJSONQueryStats ( bw , topN , maxLifetime )
2020-12-25 15:42:05 +01:00
if err := bw . Flush ( ) ; err != nil {
2021-09-16 11:56:58 +02:00
return fmt . Errorf ( "cannot send query stats response to client: %w" , err )
2020-12-25 15:42:05 +01:00
}
return nil
}
2020-12-25 15:44:26 +01:00
var queryStatsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/status/top_queries"} ` )
2022-05-03 14:52:50 +02:00
// exportParams contains common parameters for all /api/v1/export* handlers
//
// deadline, start, end, match[], extra_label, extra_filters
type exportParams struct {
deadline searchutils . Deadline
start int64
end int64
filterss [ ] [ ] storage . TagFilter
}
// getExportParams obtains common params from r, which are used for /api/v1/export* handlers
func getExportParams ( r * http . Request , startTime time . Time ) ( * exportParams , error ) {
deadline := searchutils . GetDeadlineForExport ( r , startTime )
start , err := searchutils . GetTime ( r , "start" , 0 )
if err != nil {
return nil , err
}
ct := startTime . UnixNano ( ) / 1e6
end , err := searchutils . GetTime ( r , "end" , ct )
if err != nil {
return nil , err
}
if end < start {
end = start
}
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
// Maintain backwards compatibility
match := r . FormValue ( "match" )
if len ( match ) == 0 {
return nil , fmt . Errorf ( "missing `match[]` arg" )
}
matches = [ ] string { match }
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
etfs , err := searchutils . GetExtraTagFilters ( r )
if err != nil {
return nil , err
}
filterss := searchutils . JoinTagFilterss ( tagFilterss , etfs )
return & exportParams {
deadline : deadline ,
start : start ,
end : end ,
filterss : filterss ,
} , nil
}