2020-02-23 12:35:47 +01:00
package promscrape
import (
2020-04-16 22:41:16 +02:00
"flag"
2020-04-16 22:34:37 +02:00
"fmt"
2020-08-10 11:31:59 +02:00
"math"
2020-08-16 21:27:26 +02:00
"math/bits"
2020-11-04 09:38:09 +01:00
"strconv"
2021-09-12 11:49:19 +02:00
"strings"
2020-08-13 22:12:22 +02:00
"sync"
2020-02-23 12:35:47 +01:00
"time"
2021-09-01 13:14:37 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2021-08-13 11:10:00 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
2021-10-16 11:58:34 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2021-10-14 11:29:12 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-08-13 22:12:22 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-04-13 11:59:05 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
2020-12-24 09:56:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
2021-01-26 23:23:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/metrics"
2020-05-03 13:29:26 +02:00
xxhash "github.com/cespare/xxhash/v2"
2020-02-23 12:35:47 +01:00
)
2020-04-16 22:41:16 +02:00
var (
suppressScrapeErrors = flag . Bool ( "promscrape.suppressScrapeErrors" , false , "Whether to suppress scrape errors logging. " +
"The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed" )
2021-10-14 11:29:12 +02:00
noStaleMarkers = flag . Bool ( "promscrape.noStaleMarkers" , false , "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. See also https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode" )
seriesLimitPerTarget = flag . Int ( "promscrape.seriesLimitPerTarget" , 0 , "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info" )
minResponseSizeForStreamParse = flagutil . NewBytes ( "promscrape.minResponseSizeForStreamParse" , 1e6 , "The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode" )
2020-04-16 22:41:16 +02:00
)
2020-02-23 12:35:47 +01:00
// ScrapeWork represents a unit of work for scraping Prometheus metrics.
2020-12-17 13:30:33 +01:00
//
// It must be immutable during its lifetime, since it is read from concurrently running goroutines.
2020-02-23 12:35:47 +01:00
type ScrapeWork struct {
// Full URL (including query args) for the scrape.
ScrapeURL string
// Interval for scraping the ScrapeURL.
ScrapeInterval time . Duration
// Timeout for scraping the ScrapeURL.
ScrapeTimeout time . Duration
// How to deal with conflicting labels.
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
HonorLabels bool
// How to deal with scraped timestamps.
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
HonorTimestamps bool
2021-04-02 18:56:38 +02:00
// Whether to deny redirects during requests to scrape config.
DenyRedirects bool
2020-10-08 17:50:22 +02:00
// OriginalLabels contains original labels before relabeling.
//
// These labels are needed for relabeling troubleshooting at /targets page.
OriginalLabels [ ] prompbmarshal . Label
2020-02-23 12:35:47 +01:00
// Labels to add to the scraped metrics.
//
// The list contains at least the following labels according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
//
// * job
// * __address__
// * __scheme__
// * __metrics_path__
2021-09-12 12:33:39 +02:00
// * __scrape_interval__
// * __scrape_timeout__
2020-02-23 12:35:47 +01:00
// * __param_<name>
// * __meta_*
// * user-defined labels set via `relabel_configs` section in `scrape_config`
//
// See also https://prometheus.io/docs/concepts/jobs_instances/
Labels [ ] prompbmarshal . Label
2020-12-24 09:52:37 +01:00
// ProxyURL HTTP proxy url
2020-12-24 09:56:10 +01:00
ProxyURL proxy . URL
2020-12-24 09:52:37 +01:00
2021-03-12 02:35:49 +01:00
// Auth config for ProxyUR:
ProxyAuthConfig * promauth . Config
// Auth config
AuthConfig * promauth . Config
2020-02-23 12:35:47 +01:00
// Optional `metric_relabel_configs`.
2021-02-22 15:33:55 +01:00
MetricRelabelConfigs * promrelabel . ParsedConfigs
2020-02-23 12:35:47 +01:00
// The maximum number of metrics to scrape after relabeling.
2020-04-14 10:58:15 +02:00
SampleLimit int
2020-06-23 14:35:19 +02:00
2020-07-02 13:19:11 +02:00
// Whether to disable response compression when querying ScrapeURL.
DisableCompression bool
// Whether to disable HTTP keep-alive when querying ScrapeURL.
DisableKeepAlive bool
2020-11-01 22:12:13 +01:00
// Whether to parse target responses in a streaming manner.
StreamParse bool
2021-02-18 22:51:29 +01:00
// The interval for aligning the first scrape.
ScrapeAlignInterval time . Duration
2021-03-08 10:58:25 +01:00
// The offset for the first scrape.
ScrapeOffset time . Duration
2021-09-01 13:14:37 +02:00
// Optional limit on the number of unique series the scrape target can expose.
SeriesLimit int
2020-06-23 14:35:19 +02:00
// The original 'job_name'
jobNameOriginal string
2020-02-23 12:35:47 +01:00
}
2021-10-15 14:26:22 +02:00
func ( sw * ScrapeWork ) canSwitchToStreamParseMode ( ) bool {
// Deny switching to stream parse mode if `sample_limit` or `series_limit` options are set,
// since these limits cannot be applied in stream parsing mode.
2021-10-14 11:29:12 +02:00
return sw . SampleLimit <= 0 && sw . SeriesLimit <= 0
}
2020-05-03 11:41:13 +02:00
// key returns unique identifier for the given sw.
//
2020-11-01 22:12:13 +01:00
// it can be used for comparing for equality for two ScrapeWork objects.
2020-05-03 11:41:13 +02:00
func ( sw * ScrapeWork ) key ( ) string {
2020-10-08 17:50:22 +02:00
// Do not take into account OriginalLabels.
2021-04-02 18:56:38 +02:00
key := fmt . Sprintf ( "ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, DenyRedirects=%v, Labels=%s, " +
2021-03-12 02:35:49 +01:00
"ProxyURL=%s, ProxyAuthConfig=%s, AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v, " +
2021-09-01 13:14:37 +02:00
"ScrapeAlignInterval=%s, ScrapeOffset=%s, SeriesLimit=%d" ,
2021-04-02 18:56:38 +02:00
sw . ScrapeURL , sw . ScrapeInterval , sw . ScrapeTimeout , sw . HonorLabels , sw . HonorTimestamps , sw . DenyRedirects , sw . LabelsString ( ) ,
2021-03-12 02:35:49 +01:00
sw . ProxyURL . String ( ) , sw . ProxyAuthConfig . String ( ) ,
2021-03-08 10:58:25 +01:00
sw . AuthConfig . String ( ) , sw . MetricRelabelConfigs . String ( ) , sw . SampleLimit , sw . DisableCompression , sw . DisableKeepAlive , sw . StreamParse ,
2021-09-01 13:14:37 +02:00
sw . ScrapeAlignInterval , sw . ScrapeOffset , sw . SeriesLimit )
2020-05-03 11:41:13 +02:00
return key
}
2020-04-14 12:32:55 +02:00
// Job returns job for the ScrapeWork
func ( sw * ScrapeWork ) Job ( ) string {
2020-04-14 13:11:54 +02:00
return promrelabel . GetLabelValueByName ( sw . Labels , "job" )
2020-04-14 12:32:55 +02:00
}
2020-04-16 22:34:37 +02:00
// LabelsString returns labels in Prometheus format for the given sw.
func ( sw * ScrapeWork ) LabelsString ( ) string {
2020-10-08 17:50:22 +02:00
labelsFinalized := promrelabel . FinalizeLabels ( nil , sw . Labels )
return promLabelsString ( labelsFinalized )
}
func promLabelsString ( labels [ ] prompbmarshal . Label ) string {
2020-11-04 09:38:09 +01:00
// Calculate the required memory for storing serialized labels.
n := 2 // for `{...}`
2020-10-08 17:50:22 +02:00
for _ , label := range labels {
2020-11-04 09:38:09 +01:00
n += len ( label . Name ) + len ( label . Value )
n += 4 // for `="...",`
2020-04-16 22:34:37 +02:00
}
2020-11-04 09:38:09 +01:00
b := make ( [ ] byte , 0 , n )
b = append ( b , '{' )
for i , label := range labels {
b = append ( b , label . Name ... )
b = append ( b , '=' )
b = strconv . AppendQuote ( b , label . Value )
if i + 1 < len ( labels ) {
b = append ( b , ',' )
}
}
b = append ( b , '}' )
return bytesutil . ToUnsafeString ( b )
2020-04-16 22:34:37 +02:00
}
2020-02-23 12:35:47 +01:00
type scrapeWork struct {
// Config for the scrape.
2020-12-17 13:30:33 +01:00
Config * ScrapeWork
2020-02-23 12:35:47 +01:00
// ReadData is called for reading the data.
ReadData func ( dst [ ] byte ) ( [ ] byte , error )
2020-11-01 22:12:13 +01:00
// GetStreamReader is called if Config.StreamParse is set.
GetStreamReader func ( ) ( * streamReader , error )
2020-02-23 12:35:47 +01:00
// PushData is called for pushing collected data.
2021-08-05 08:46:19 +02:00
PushData func ( wr * prompbmarshal . WriteRequest )
2020-02-23 12:35:47 +01:00
2020-07-13 20:52:03 +02:00
// ScrapeGroup is name of ScrapeGroup that
// scrapeWork belongs to
ScrapeGroup string
2020-08-13 22:12:22 +02:00
tmpRow parser . Row
2020-08-09 11:44:49 +02:00
2021-09-12 11:49:19 +02:00
// This flag is set to true if series_limit is exceeded.
seriesLimitExceeded bool
// labelsHashBuf is used for calculating the hash on series labels
2020-08-10 18:47:43 +02:00
labelsHashBuf [ ] byte
2020-08-13 22:12:22 +02:00
2021-09-01 13:14:37 +02:00
// Optional limiter on the number of unique series per scrape target.
seriesLimiter * bloomfilter . Limiter
2020-08-14 00:16:18 +02:00
// prevBodyLen contains the previous response body length for the given scrape work.
2020-08-13 22:12:22 +02:00
// It is used as a hint in order to reduce memory usage for body buffers.
2020-08-14 00:16:18 +02:00
prevBodyLen int
2020-08-16 21:27:26 +02:00
2021-03-14 21:56:23 +01:00
// prevLabelsLen contains the number labels scraped during the previous scrape.
2020-08-16 21:27:26 +02:00
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
2021-03-14 21:56:23 +01:00
prevLabelsLen int
2021-08-13 11:10:00 +02:00
2021-08-21 20:16:50 +02:00
// lastScrape holds the last response from scrape target.
lastScrape [ ] byte
2021-10-16 11:58:34 +02:00
// lastScrapeCompressed is used for storing the compressed lastScrape between scrapes
// in stream parsing mode in order to reduce memory usage when the lastScrape size
// equals to or exceeds -promscrape.minResponseSizeForStreamParse
lastScrapeCompressed [ ] byte
}
// loadLastScrape makes sure sw.lastScrape holds the uncompressed previous response.
//
// If a compressed copy exists in sw.lastScrapeCompressed, it is unpacked into
// sw.lastScrape, reusing its backing storage. A decompression failure is
// reported via logger.Panicf as a bug.
func (sw *scrapeWork) loadLastScrape() {
	compressed := sw.lastScrapeCompressed
	if len(compressed) == 0 {
		// Nothing to unpack - the previous response is already in sw.lastScrape.
		return
	}
	unpacked, err := encoding.DecompressZSTD(sw.lastScrape[:0], compressed)
	if err != nil {
		logger.Panicf("BUG: cannot unpack compressed previous response: %s", err)
	}
	sw.lastScrape = unpacked
}
// storeLastScrape remembers lastScrape as the previous response for the next scrape.
//
// Responses with size of at least -promscrape.minResponseSizeForStreamParse
// (when the flag is positive) are stored in compressed form only;
// smaller responses are copied into sw.lastScrape as is.
func (sw *scrapeWork) storeLastScrape(lastScrape []byte) {
	threshold := minResponseSizeForStreamParse.N
	if threshold <= 0 || len(lastScrape) < threshold {
		// Small response: keep an uncompressed copy.
		sw.lastScrape = append(sw.lastScrape[:0], lastScrape...)
		sw.lastScrapeCompressed = nil
		return
	}
	// Big response: keep only the compressed copy.
	sw.lastScrapeCompressed = encoding.CompressZSTDLevel(sw.lastScrapeCompressed[:0], lastScrape, 1)
	sw.lastScrape = nil
}
func ( sw * scrapeWork ) finalizeLastScrape ( ) {
if len ( sw . lastScrapeCompressed ) > 0 {
// The compressed lastScrape is available in sw.lastScrapeCompressed.
// Release the memory occupied by sw.lastScrape, so it won't be occupied between scrapes.
sw . lastScrape = nil
}
if len ( sw . lastScrape ) > 0 {
// Release the memory occupied by sw.lastScrapeCompressed, so it won't be occupied between scrapes.
sw . lastScrapeCompressed = nil
}
2020-02-23 12:35:47 +01:00
}
func ( sw * scrapeWork ) run ( stopCh <- chan struct { } ) {
2021-02-18 22:51:29 +01:00
var randSleep uint64
2021-03-08 10:58:25 +01:00
scrapeInterval := sw . Config . ScrapeInterval
scrapeAlignInterval := sw . Config . ScrapeAlignInterval
scrapeOffset := sw . Config . ScrapeOffset
if scrapeOffset > 0 {
scrapeAlignInterval = scrapeInterval
}
if scrapeAlignInterval <= 0 {
2021-02-18 22:51:29 +01:00
// Calculate start time for the first scrape from ScrapeURL and labels.
// This should spread load when scraping many targets with different
// scrape urls and labels.
// This also makes consistent scrape times across restarts
// for a target with the same ScrapeURL and labels.
key := fmt . Sprintf ( "ScrapeURL=%s, Labels=%s" , sw . Config . ScrapeURL , sw . Config . LabelsString ( ) )
2021-02-28 17:39:57 +01:00
h := uint32 ( xxhash . Sum64 ( bytesutil . ToUnsafeBytes ( key ) ) )
2021-02-18 23:33:37 +01:00
randSleep = uint64 ( float64 ( scrapeInterval ) * ( float64 ( h ) / ( 1 << 32 ) ) )
2021-02-18 22:51:29 +01:00
sleepOffset := uint64 ( time . Now ( ) . UnixNano ( ) ) % uint64 ( scrapeInterval )
if randSleep < sleepOffset {
randSleep += uint64 ( scrapeInterval )
}
randSleep -= sleepOffset
} else {
2021-03-08 10:58:25 +01:00
d := uint64 ( scrapeAlignInterval )
2021-02-18 22:51:29 +01:00
randSleep = d - uint64 ( time . Now ( ) . UnixNano ( ) ) % d
2021-03-08 10:58:25 +01:00
if scrapeOffset > 0 {
randSleep += uint64 ( scrapeOffset )
}
2021-02-18 22:51:29 +01:00
randSleep %= uint64 ( scrapeInterval )
2020-05-03 13:29:26 +02:00
}
2021-01-26 23:23:10 +01:00
timer := timerpool . Get ( time . Duration ( randSleep ) )
2020-04-01 15:10:35 +02:00
var timestamp int64
2020-02-23 12:35:47 +01:00
var ticker * time . Ticker
select {
case <- stopCh :
2021-01-26 23:23:10 +01:00
timerpool . Put ( timer )
2020-02-23 12:35:47 +01:00
return
2020-04-01 15:10:35 +02:00
case <- timer . C :
2021-01-26 23:23:10 +01:00
timerpool . Put ( timer )
2020-04-01 15:10:35 +02:00
ticker = time . NewTicker ( scrapeInterval )
timestamp = time . Now ( ) . UnixNano ( ) / 1e6
2020-08-10 11:31:59 +02:00
sw . scrapeAndLogError ( timestamp , timestamp )
2020-02-23 12:35:47 +01:00
}
defer ticker . Stop ( )
for {
2020-04-01 15:10:35 +02:00
timestamp += scrapeInterval . Milliseconds ( )
2020-02-23 12:35:47 +01:00
select {
case <- stopCh :
2021-08-19 13:18:02 +02:00
t := time . Now ( ) . UnixNano ( ) / 1e6
2021-09-11 09:51:04 +02:00
sw . sendStaleSeries ( "" , t , true )
2021-09-01 13:14:37 +02:00
if sw . seriesLimiter != nil {
sw . seriesLimiter . MustStop ( )
}
2020-02-23 12:35:47 +01:00
return
2020-08-10 11:31:59 +02:00
case tt := <- ticker . C :
t := tt . UnixNano ( ) / 1e6
if d := math . Abs ( float64 ( t - timestamp ) ) ; d > 0 && d / float64 ( scrapeInterval . Milliseconds ( ) ) > 0.1 {
2020-04-01 15:10:35 +02:00
// Too big jitter. Adjust timestamp
timestamp = t
2020-02-23 12:35:47 +01:00
}
2020-08-10 11:31:59 +02:00
sw . scrapeAndLogError ( timestamp , t )
2020-02-23 12:35:47 +01:00
}
}
}
func ( sw * scrapeWork ) logError ( s string ) {
2020-04-16 22:41:16 +02:00
if ! * suppressScrapeErrors {
2020-12-06 22:26:34 +01:00
logger . ErrorfSkipframes ( 1 , "error when scraping %q from job %q with labels %s: %s; " +
"scrape errors can be disabled by -promscrape.suppressScrapeErrors command-line flag" ,
sw . Config . ScrapeURL , sw . Config . Job ( ) , sw . Config . LabelsString ( ) , s )
2020-04-16 22:41:16 +02:00
}
2020-02-23 12:35:47 +01:00
}
2020-08-10 11:31:59 +02:00
func ( sw * scrapeWork ) scrapeAndLogError ( scrapeTimestamp , realTimestamp int64 ) {
if err := sw . scrapeInternal ( scrapeTimestamp , realTimestamp ) ; err != nil && ! * suppressScrapeErrors {
2020-04-16 22:34:37 +02:00
logger . Errorf ( "error when scraping %q from job %q with labels %s: %s" , sw . Config . ScrapeURL , sw . Config . Job ( ) , sw . Config . LabelsString ( ) , err )
2020-02-23 12:35:47 +01:00
}
}
var (
scrapeDuration = metrics . NewHistogram ( "vm_promscrape_scrape_duration_seconds" )
scrapeResponseSize = metrics . NewHistogram ( "vm_promscrape_scrape_response_size_bytes" )
scrapedSamples = metrics . NewHistogram ( "vm_promscrape_scraped_samples" )
2020-04-14 10:58:15 +02:00
scrapesSkippedBySampleLimit = metrics . NewCounter ( "vm_promscrape_scrapes_skipped_by_sample_limit_total" )
2020-02-23 12:35:47 +01:00
scrapesFailed = metrics . NewCounter ( "vm_promscrape_scrapes_failed_total" )
pushDataDuration = metrics . NewHistogram ( "vm_promscrape_push_data_duration_seconds" )
)
2021-10-15 14:26:22 +02:00
// mustSwitchToStreamParseMode reports whether a response of the given size
// must be processed in stream parsing mode.
//
// This is the case when -promscrape.minResponseSizeForStreamParse is positive,
// responseSize reaches it and the scrape config allows stream parsing.
func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
	minSize := minResponseSizeForStreamParse.N
	if minSize <= 0 || responseSize < minSize {
		return false
	}
	return sw.Config.canSwitchToStreamParseMode()
}
2020-08-10 11:31:59 +02:00
func ( sw * scrapeWork ) scrapeInternal ( scrapeTimestamp , realTimestamp int64 ) error {
2021-10-16 11:58:34 +02:00
if * streamParse || sw . Config . StreamParse || sw . mustSwitchToStreamParseMode ( sw . prevBodyLen ) {
2020-11-01 22:12:13 +01:00
// Read data from scrape targets in streaming manner.
2021-10-14 11:29:12 +02:00
// This case is optimized for targets exposing more than ten thousand of metrics per target.
2020-11-01 22:12:13 +01:00
return sw . scrapeStream ( scrapeTimestamp , realTimestamp )
}
// Common case: read all the data from scrape target to memory (body) and then process it.
2021-08-21 20:16:50 +02:00
// This case should work more optimally than stream parse code for common case when scrape target exposes
2020-11-26 12:33:46 +01:00
// up to a few thousand metrics.
2020-08-14 00:16:18 +02:00
body := leveledbytebufferpool . Get ( sw . prevBodyLen )
2020-02-23 12:35:47 +01:00
var err error
2020-08-13 22:12:22 +02:00
body . B , err = sw . ReadData ( body . B [ : 0 ] )
2020-02-23 12:35:47 +01:00
endTimestamp := time . Now ( ) . UnixNano ( ) / 1e6
2020-08-10 11:31:59 +02:00
duration := float64 ( endTimestamp - realTimestamp ) / 1e3
2020-02-23 12:35:47 +01:00
scrapeDuration . Update ( duration )
2020-08-13 22:12:22 +02:00
scrapeResponseSize . Update ( float64 ( len ( body . B ) ) )
2020-02-23 12:35:47 +01:00
up := 1
2021-03-14 21:56:23 +01:00
wc := writeRequestCtxPool . Get ( sw . prevLabelsLen )
2021-10-16 11:58:34 +02:00
sw . loadLastScrape ( )
2021-08-21 20:16:50 +02:00
bodyString := bytesutil . ToUnsafeString ( body . B )
2021-09-12 11:49:19 +02:00
lastScrape := bytesutil . ToUnsafeString ( sw . lastScrape )
areIdenticalSeries := parser . AreIdenticalSeriesFast ( lastScrape , bodyString )
2020-02-23 12:35:47 +01:00
if err != nil {
up = 0
scrapesFailed . Inc ( )
} else {
2020-08-13 22:12:22 +02:00
wc . rows . UnmarshalWithErrLogger ( bodyString , sw . logError )
2020-02-23 12:35:47 +01:00
}
2020-08-13 22:12:22 +02:00
srcRows := wc . rows . Rows
2020-02-23 12:35:47 +01:00
samplesScraped := len ( srcRows )
scrapedSamples . Update ( float64 ( samplesScraped ) )
for i := range srcRows {
2020-08-13 22:12:22 +02:00
sw . addRowToTimeseries ( wc , & srcRows [ i ] , scrapeTimestamp , true )
2020-02-23 12:35:47 +01:00
}
2021-03-14 21:56:23 +01:00
samplesPostRelabeling := len ( wc . writeRequest . Timeseries )
2021-03-09 14:47:15 +01:00
if sw . Config . SampleLimit > 0 && samplesPostRelabeling > sw . Config . SampleLimit {
wc . resetNoRows ( )
up = 0
scrapesSkippedBySampleLimit . Inc ( )
2021-05-27 13:52:44 +02:00
err = fmt . Errorf ( "the response from %q exceeds sample_limit=%d; " +
"either reduce the sample count for the target or increase sample_limit" , sw . Config . ScrapeURL , sw . Config . SampleLimit )
2021-03-09 14:47:15 +01:00
}
2021-09-12 11:49:19 +02:00
if up == 0 {
bodyString = ""
}
seriesAdded := 0
if ! areIdenticalSeries {
// The returned value for seriesAdded may be bigger than the real number of added series
// if some series were removed during relabeling.
// This is a trade-off between performance and accuracy.
seriesAdded = sw . getSeriesAdded ( bodyString )
}
if sw . seriesLimitExceeded || ! areIdenticalSeries {
if sw . applySeriesLimit ( wc ) {
sw . seriesLimitExceeded = true
}
}
2020-08-13 22:12:22 +02:00
sw . addAutoTimeseries ( wc , "up" , float64 ( up ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_duration_seconds" , duration , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_scraped" , float64 ( samplesScraped ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_post_metric_relabeling" , float64 ( samplesPostRelabeling ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_series_added" , float64 ( seriesAdded ) , scrapeTimestamp )
2021-09-12 14:20:42 +02:00
sw . addAutoTimeseries ( wc , "scrape_timeout_seconds" , sw . Config . ScrapeTimeout . Seconds ( ) , scrapeTimestamp )
2021-08-13 11:10:00 +02:00
sw . pushData ( & wc . writeRequest )
2021-03-14 21:56:23 +01:00
sw . prevLabelsLen = len ( wc . labels )
2021-10-16 11:58:34 +02:00
sw . prevBodyLen = len ( bodyString )
2020-08-13 22:12:22 +02:00
wc . reset ( )
2021-10-16 11:58:34 +02:00
mustSwitchToStreamParse := sw . mustSwitchToStreamParseMode ( len ( bodyString ) )
if ! mustSwitchToStreamParse {
2021-10-14 11:29:12 +02:00
// Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse
// This should reduce memory usage when scraping targets with big responses.
writeRequestCtxPool . Put ( wc )
}
2020-08-13 22:12:22 +02:00
// body must be released only after wc is released, since wc refers to body.
2021-09-12 11:49:19 +02:00
if ! areIdenticalSeries {
sw . sendStaleSeries ( bodyString , scrapeTimestamp , false )
2021-10-16 11:58:34 +02:00
sw . storeLastScrape ( body . B )
2021-09-12 11:49:19 +02:00
}
2021-10-16 11:58:34 +02:00
sw . finalizeLastScrape ( )
if ! mustSwitchToStreamParse {
2021-10-15 14:26:22 +02:00
// Return wc to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
2021-10-14 11:29:12 +02:00
// This should reduce memory usage when scraping targets which return big responses.
leveledbytebufferpool . Put ( body )
}
2021-06-14 13:01:13 +02:00
tsmGlobal . Update ( sw . Config , sw . ScrapeGroup , up == 1 , realTimestamp , int64 ( duration * 1000 ) , samplesScraped , err )
2021-08-21 20:16:50 +02:00
return err
2021-08-18 20:58:40 +02:00
}
2021-08-13 11:10:00 +02:00
// pushData passes wr to the sw.PushData callback and registers
// the time spent in the callback in pushDataDuration.
func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
	pushStart := time.Now()
	sw.PushData(wr)
	pushDataDuration.UpdateDuration(pushStart)
}
2021-10-15 14:26:22 +02:00
// streamBodyReader wraps a streamReader and keeps a copy of all the data read through it.
type streamBodyReader struct {
	// sr is the underlying reader.
	sr *streamReader

	// body accumulates the bytes returned by Read calls.
	body []byte
}
// Read reads data from the underlying stream reader into b
// and appends the bytes read to sbr.body.
//
// It returns the results of the underlying Read call as is.
func (sbr *streamBodyReader) Read(b []byte) (int, error) {
	readLen, readErr := sbr.sr.Read(b)
	chunk := b[:readLen]
	sbr.body = append(sbr.body, chunk...)
	return readLen, readErr
}
2020-11-01 22:12:13 +01:00
func ( sw * scrapeWork ) scrapeStream ( scrapeTimestamp , realTimestamp int64 ) error {
samplesScraped := 0
samplesPostRelabeling := 0
2021-03-14 21:56:23 +01:00
wc := writeRequestCtxPool . Get ( sw . prevLabelsLen )
2021-10-16 11:58:34 +02:00
// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
sbr := & streamBodyReader { }
2020-12-15 13:08:06 +01:00
sr , err := sw . GetStreamReader ( )
if err != nil {
err = fmt . Errorf ( "cannot read data: %s" , err )
} else {
var mu sync . Mutex
2021-10-15 14:26:22 +02:00
sbr . sr = sr
err = parser . ParseStream ( sbr , scrapeTimestamp , false , func ( rows [ ] parser . Row ) error {
2020-12-15 13:08:06 +01:00
mu . Lock ( )
defer mu . Unlock ( )
samplesScraped += len ( rows )
for i := range rows {
sw . addRowToTimeseries ( wc , & rows [ i ] , scrapeTimestamp , true )
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
samplesPostRelabeling += len ( wc . writeRequest . Timeseries )
2021-05-27 13:52:44 +02:00
if sw . Config . SampleLimit > 0 && samplesPostRelabeling > sw . Config . SampleLimit {
wc . resetNoRows ( )
scrapesSkippedBySampleLimit . Inc ( )
return fmt . Errorf ( "the response from %q exceeds sample_limit=%d; " +
"either reduce the sample count for the target or increase sample_limit" , sw . Config . ScrapeURL , sw . Config . SampleLimit )
}
2021-08-13 11:10:00 +02:00
sw . pushData ( & wc . writeRequest )
2020-12-15 13:08:06 +01:00
wc . resetNoRows ( )
return nil
2021-01-12 12:31:47 +01:00
} , sw . logError )
2020-12-15 13:08:06 +01:00
sr . MustClose ( )
}
2021-10-16 11:58:34 +02:00
sw . loadLastScrape ( )
2021-10-15 14:26:22 +02:00
bodyString := bytesutil . ToUnsafeString ( sbr . body )
lastScrape := bytesutil . ToUnsafeString ( sw . lastScrape )
areIdenticalSeries := parser . AreIdenticalSeriesFast ( lastScrape , bodyString )
2020-12-15 13:08:06 +01:00
2020-11-01 22:12:13 +01:00
scrapedSamples . Update ( float64 ( samplesScraped ) )
endTimestamp := time . Now ( ) . UnixNano ( ) / 1e6
duration := float64 ( endTimestamp - realTimestamp ) / 1e3
scrapeDuration . Update ( duration )
2021-10-15 14:26:22 +02:00
scrapeResponseSize . Update ( float64 ( len ( bodyString ) ) )
2020-11-01 22:12:13 +01:00
up := 1
if err != nil {
if samplesScraped == 0 {
up = 0
}
scrapesFailed . Inc ( )
}
2021-10-15 14:26:22 +02:00
seriesAdded := 0
if ! areIdenticalSeries {
// The returned value for seriesAdded may be bigger than the real number of added series
// if some series were removed during relabeling.
// This is a trade-off between performance and accuracy.
seriesAdded = sw . getSeriesAdded ( bodyString )
}
2020-11-01 22:12:13 +01:00
sw . addAutoTimeseries ( wc , "up" , float64 ( up ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_duration_seconds" , duration , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_scraped" , float64 ( samplesScraped ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_post_metric_relabeling" , float64 ( samplesPostRelabeling ) , scrapeTimestamp )
2021-10-15 14:26:22 +02:00
sw . addAutoTimeseries ( wc , "scrape_series_added" , float64 ( seriesAdded ) , scrapeTimestamp )
2021-09-12 14:20:42 +02:00
sw . addAutoTimeseries ( wc , "scrape_timeout_seconds" , sw . Config . ScrapeTimeout . Seconds ( ) , scrapeTimestamp )
2021-08-13 11:10:00 +02:00
sw . pushData ( & wc . writeRequest )
2021-03-14 21:56:23 +01:00
sw . prevLabelsLen = len ( wc . labels )
2021-10-15 14:26:22 +02:00
sw . prevBodyLen = len ( bodyString )
2020-11-01 22:12:13 +01:00
wc . reset ( )
writeRequestCtxPool . Put ( wc )
2021-10-15 14:26:22 +02:00
if ! areIdenticalSeries {
sw . sendStaleSeries ( bodyString , scrapeTimestamp , false )
2021-10-16 11:58:34 +02:00
sw . storeLastScrape ( sbr . body )
2021-10-15 14:26:22 +02:00
}
2021-10-16 11:58:34 +02:00
sw . finalizeLastScrape ( )
2021-06-14 13:01:13 +02:00
tsmGlobal . Update ( sw . Config , sw . ScrapeGroup , up == 1 , realTimestamp , int64 ( duration * 1000 ) , samplesScraped , err )
2021-08-21 20:16:50 +02:00
// Do not track active series in streaming mode, since this may need too big amounts of memory
// when the target exports too big number of metrics.
2021-01-12 12:31:47 +01:00
return err
2020-11-01 22:12:13 +01:00
}
2020-09-01 09:55:21 +02:00
// leveledWriteRequestCtxPool is a set of pools for writeRequestCtx structs
// grouped by the capacity of their labels.
//
// It allows reducing memory usage when writeRequestCtx structs
// contain mixed number of labels.
//
// Its logic has been copied from leveledbytebufferpool.
type leveledWriteRequestCtxPool struct {
	// pools[id] holds items with labels capacity up to 1<<(id+3); see getPoolIDAndCapacity.
	pools [13]sync.Pool
}
2021-03-14 21:56:23 +01:00
func ( lwp * leveledWriteRequestCtxPool ) Get ( labelsCapacity int ) * writeRequestCtx {
id , capacityNeeded := lwp . getPoolIDAndCapacity ( labelsCapacity )
2020-08-16 21:27:26 +02:00
for i := 0 ; i < 2 ; i ++ {
if id < 0 || id >= len ( lwp . pools ) {
break
}
if v := lwp . pools [ id ] . Get ( ) ; v != nil {
return v . ( * writeRequestCtx )
}
id ++
}
return & writeRequestCtx {
labels : make ( [ ] prompbmarshal . Label , 0 , capacityNeeded ) ,
}
}
func ( lwp * leveledWriteRequestCtxPool ) Put ( wc * writeRequestCtx ) {
2021-03-14 21:56:23 +01:00
capacity := cap ( wc . labels )
id , poolCapacity := lwp . getPoolIDAndCapacity ( capacity )
if capacity <= poolCapacity {
wc . reset ( )
lwp . pools [ id ] . Put ( wc )
}
2020-08-16 21:27:26 +02:00
}
2020-08-28 08:44:08 +02:00
func ( lwp * leveledWriteRequestCtxPool ) getPoolIDAndCapacity ( size int ) ( int , int ) {
2020-08-16 21:27:26 +02:00
size --
if size < 0 {
size = 0
}
size >>= 3
id := bits . Len ( uint ( size ) )
2021-03-14 21:56:23 +01:00
if id >= len ( lwp . pools ) {
2020-08-16 21:27:26 +02:00
id = len ( lwp . pools ) - 1
}
return id , ( 1 << ( id + 3 ) )
}
2020-08-13 22:12:22 +02:00
// writeRequestCtx bundles the buffers used while converting
// a scrape response into a write request.
type writeRequestCtx struct {
	// rows holds the parsed rows of the response.
	rows parser.Rows

	// writeRequest is the request passed to the PushData callback.
	writeRequest prompbmarshal.WriteRequest

	// labels and samples are reusable buffers; they are truncated by resetNoRows.
	labels  []prompbmarshal.Label
	samples []prompbmarshal.Sample
}
func ( wc * writeRequestCtx ) reset ( ) {
wc . rows . Reset ( )
2020-09-11 22:36:24 +02:00
wc . resetNoRows ( )
}
func ( wc * writeRequestCtx ) resetNoRows ( ) {
2020-08-13 22:12:22 +02:00
prompbmarshal . ResetWriteRequest ( & wc . writeRequest )
wc . labels = wc . labels [ : 0 ]
wc . samples = wc . samples [ : 0 ]
}
2020-08-16 21:27:26 +02:00
// writeRequestCtxPool is the package-level pool of writeRequestCtx structs used by scrape workers.
var writeRequestCtxPool leveledWriteRequestCtxPool
2020-08-13 22:12:22 +02:00
2021-09-12 11:49:19 +02:00
func ( sw * scrapeWork ) getSeriesAdded ( currScrape string ) int {
if currScrape == "" {
return 0
2020-09-11 22:36:24 +02:00
}
2021-09-12 11:49:19 +02:00
lastScrape := bytesutil . ToUnsafeString ( sw . lastScrape )
bodyString := parser . GetRowsDiff ( currScrape , lastScrape )
return strings . Count ( bodyString , "\n" )
}
func ( sw * scrapeWork ) applySeriesLimit ( wc * writeRequestCtx ) bool {
2021-09-01 13:14:37 +02:00
seriesLimit := * seriesLimitPerTarget
if sw . Config . SeriesLimit > 0 {
seriesLimit = sw . Config . SeriesLimit
}
if sw . seriesLimiter == nil && seriesLimit > 0 {
sw . seriesLimiter = bloomfilter . NewLimiter ( seriesLimit , 24 * time . Hour )
}
hsl := sw . seriesLimiter
2021-09-12 11:49:19 +02:00
if hsl == nil {
return false
}
2021-09-01 13:14:37 +02:00
dstSeries := wc . writeRequest . Timeseries [ : 0 ]
2021-09-09 16:37:34 +02:00
job := sw . Config . Job ( )
2021-09-12 11:49:19 +02:00
limitExceeded := false
2020-08-13 22:12:22 +02:00
for _ , ts := range wc . writeRequest . Timeseries {
2020-08-10 18:47:43 +02:00
h := sw . getLabelsHash ( ts . Labels )
2021-09-12 11:49:19 +02:00
if ! hsl . Add ( h ) {
2021-09-01 13:14:37 +02:00
// The limit on the number of hourly unique series per scrape target has been exceeded.
// Drop the metric.
2021-09-09 16:37:34 +02:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` promscrape_series_limit_rows_dropped_total { scrape_job_original=%q,scrape_job=%q,scrape_target=%q} ` ,
sw . Config . jobNameOriginal , job , sw . Config . ScrapeURL ) ) . Inc ( )
2021-09-12 11:49:19 +02:00
limitExceeded = true
2021-09-01 13:14:37 +02:00
continue
}
dstSeries = append ( dstSeries , ts )
2020-08-09 11:44:49 +02:00
}
2021-09-01 13:14:37 +02:00
wc . writeRequest . Timeseries = dstSeries
2021-09-12 11:49:19 +02:00
return limitExceeded
2020-09-11 22:36:24 +02:00
}
2020-08-09 11:44:49 +02:00
2021-09-11 09:51:04 +02:00
func ( sw * scrapeWork ) sendStaleSeries ( currScrape string , timestamp int64 , addAutoSeries bool ) {
2021-08-18 12:54:12 +02:00
if * noStaleMarkers {
2021-08-18 12:43:17 +02:00
return
}
2021-09-11 09:51:04 +02:00
lastScrape := bytesutil . ToUnsafeString ( sw . lastScrape )
bodyString := lastScrape
if currScrape != "" {
2021-09-12 11:49:19 +02:00
bodyString = parser . GetRowsDiff ( lastScrape , currScrape )
2021-09-11 09:51:04 +02:00
}
2021-09-12 11:49:19 +02:00
wc := & writeRequestCtx { }
2021-09-11 09:51:04 +02:00
if bodyString != "" {
wc . rows . Unmarshal ( bodyString )
srcRows := wc . rows . Rows
for i := range srcRows {
sw . addRowToTimeseries ( wc , & srcRows [ i ] , timestamp , true )
}
2021-08-21 20:16:50 +02:00
}
if addAutoSeries {
sw . addAutoTimeseries ( wc , "up" , 0 , timestamp )
sw . addAutoTimeseries ( wc , "scrape_duration_seconds" , 0 , timestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_scraped" , 0 , timestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_post_metric_relabeling" , 0 , timestamp )
sw . addAutoTimeseries ( wc , "scrape_series_added" , 0 , timestamp )
2021-08-13 11:10:00 +02:00
}
2021-08-21 20:16:50 +02:00
series := wc . writeRequest . Timeseries
2021-08-18 20:58:40 +02:00
if len ( series ) == 0 {
return
}
2021-08-21 20:16:50 +02:00
// Substitute all the values with Prometheus stale markers.
for _ , tss := range series {
samples := tss . Samples
for i := range samples {
samples [ i ] . Value = decimal . StaleNaN
}
2021-08-13 11:10:00 +02:00
}
2021-08-21 20:16:50 +02:00
sw . pushData ( & wc . writeRequest )
2020-08-09 11:44:49 +02:00
}
2020-08-10 18:47:43 +02:00
func ( sw * scrapeWork ) getLabelsHash ( labels [ ] prompbmarshal . Label ) uint64 {
2020-08-09 11:44:49 +02:00
// It is OK if there will be hash collisions for distinct sets of labels,
// since the accuracy for `scrape_series_added` metric may be lower than 100%.
2020-08-10 18:47:43 +02:00
b := sw . labelsHashBuf [ : 0 ]
2020-08-09 11:44:49 +02:00
for _ , label := range labels {
2020-08-10 18:47:43 +02:00
b = append ( b , label . Name ... )
b = append ( b , label . Value ... )
2020-08-09 11:44:49 +02:00
}
2020-08-10 18:47:43 +02:00
sw . labelsHashBuf = b
return xxhash . Sum64 ( b )
2020-08-09 11:44:49 +02:00
}
2020-02-23 12:35:47 +01:00
// addAutoTimeseries adds automatically generated time series with the given name, value and timestamp.
//
// See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
2020-08-13 22:12:22 +02:00
func ( sw * scrapeWork ) addAutoTimeseries ( wc * writeRequestCtx , name string , value float64 , timestamp int64 ) {
2020-02-23 12:35:47 +01:00
sw . tmpRow . Metric = name
sw . tmpRow . Tags = nil
sw . tmpRow . Value = value
sw . tmpRow . Timestamp = timestamp
2020-08-13 22:12:22 +02:00
sw . addRowToTimeseries ( wc , & sw . tmpRow , timestamp , false )
2020-02-23 12:35:47 +01:00
}
2020-08-13 22:12:22 +02:00
func ( sw * scrapeWork ) addRowToTimeseries ( wc * writeRequestCtx , r * parser . Row , timestamp int64 , needRelabel bool ) {
labelsLen := len ( wc . labels )
wc . labels = appendLabels ( wc . labels , r . Metric , r . Tags , sw . Config . Labels , sw . Config . HonorLabels )
2020-06-29 21:29:29 +02:00
if needRelabel {
2021-06-04 19:27:55 +02:00
wc . labels = sw . Config . MetricRelabelConfigs . Apply ( wc . labels , labelsLen , true )
2020-06-29 21:29:29 +02:00
} else {
2020-08-13 22:12:22 +02:00
wc . labels = promrelabel . FinalizeLabels ( wc . labels [ : labelsLen ] , wc . labels [ labelsLen : ] )
promrelabel . SortLabels ( wc . labels [ labelsLen : ] )
2020-06-29 21:29:29 +02:00
}
2020-08-13 22:12:22 +02:00
if len ( wc . labels ) == labelsLen {
2020-02-23 12:35:47 +01:00
// Skip row without labels.
return
}
2020-09-11 22:36:24 +02:00
sampleTimestamp := r . Timestamp
if ! sw . Config . HonorTimestamps || sampleTimestamp == 0 {
sampleTimestamp = timestamp
2020-02-23 12:35:47 +01:00
}
2020-09-11 22:36:24 +02:00
wc . samples = append ( wc . samples , prompbmarshal . Sample {
Value : r . Value ,
Timestamp : sampleTimestamp ,
} )
2020-08-13 22:12:22 +02:00
wr := & wc . writeRequest
2020-09-11 22:36:24 +02:00
wr . Timeseries = append ( wr . Timeseries , prompbmarshal . TimeSeries {
Labels : wc . labels [ labelsLen : ] ,
Samples : wc . samples [ len ( wc . samples ) - 1 : ] ,
} )
2020-02-23 12:35:47 +01:00
}
// appendLabels appends to dst the `__name__` label for the given metric, the labels built from src
// and extraLabels, and returns the extended dst.
//
// The appended labels are de-duplicated by name in place:
//
//   - if honorLabels is set, the first label with the given name wins and later duplicates are dropped;
//   - otherwise the earlier label is renamed to "exported_<name>" and the later one is kept.
//     If a label named "exported_<name>" already exists, the earlier label is overwritten
//     with the later one instead.
//
// Labels that were in dst before the call are left untouched.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
func appendLabels(dst []prompbmarshal.Label, metric string, src []parser.Tag, extraLabels []prompbmarshal.Label, honorLabels bool) []prompbmarshal.Label {
	offset := len(dst)
	dst = append(dst, prompbmarshal.Label{Name: "__name__", Value: metric})
	for i := range src {
		dst = append(dst, prompbmarshal.Label{Name: src[i].Key, Value: src[i].Value})
	}
	dst = append(dst, extraLabels...)

	added := dst[offset:]
	if len(added) < 2 {
		// A single label cannot contain duplicates.
		return dst
	}

	// Filter duplicates in place: uniq shares the backing array with added
	// and never grows beyond the label being inspected.
	uniq := added[:0]
	for i := range added {
		cur := &added[i]
		dup := promrelabel.GetLabelByName(uniq, cur.Name)
		switch {
		case dup == nil:
			uniq = append(uniq, *cur)
		case honorLabels:
			// The earlier label wins - drop the current one.
		case promrelabel.GetLabelByName(uniq, "exported_"+cur.Name) != nil:
			// The "exported_" name is already taken - replace the earlier label with the current one.
			*dup = *cur
		default:
			// Move the earlier label to "exported_" + name and keep the current label under the original name.
			dup.Name = "exported_" + cur.Name
			uniq = append(uniq, *cur)
		}
	}
	return dst[:offset+len(uniq)]
}