2020-02-23 12:35:47 +01:00
package promscrape
import (
2023-01-24 06:52:57 +01:00
"bytes"
2020-04-16 22:41:16 +02:00
"flag"
2020-04-16 22:34:37 +02:00
"fmt"
2022-08-21 23:13:44 +02:00
"io"
2020-08-10 11:31:59 +02:00
"math"
2020-08-16 21:27:26 +02:00
"math/bits"
2021-09-12 11:49:19 +02:00
"strings"
2020-08-13 22:12:22 +02:00
"sync"
2020-02-23 12:35:47 +01:00
"time"
2023-01-07 07:59:15 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2021-09-01 13:14:37 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2023-01-07 07:59:15 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2021-08-13 11:10:00 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
2021-10-16 11:58:34 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2022-05-25 21:58:30 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2021-10-14 11:29:12 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-08-13 22:12:22 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-04-13 11:59:05 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2022-11-30 06:22:12 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
2020-02-23 12:35:47 +01:00
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
2023-02-13 18:26:07 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
2020-12-24 09:56:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
2021-01-26 23:23:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/metrics"
2022-05-26 16:24:01 +02:00
"github.com/cespare/xxhash/v2"
2020-02-23 12:35:47 +01:00
)
2020-04-16 22:41:16 +02:00
var (
suppressScrapeErrors = flag . Bool ( "promscrape.suppressScrapeErrors" , false , "Whether to suppress scrape errors logging. " +
2022-05-25 21:58:30 +02:00
"The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed. " +
"See also -promscrape.suppressScrapeErrorsDelay" )
suppressScrapeErrorsDelay = flag . Duration ( "promscrape.suppressScrapeErrorsDelay" , 0 , "The delay for suppressing repeated scrape errors logging per each scrape targets. " +
"This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors" )
2021-10-14 11:29:12 +02:00
minResponseSizeForStreamParse = flagutil . NewBytes ( "promscrape.minResponseSizeForStreamParse" , 1e6 , "The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode" )
2020-04-16 22:41:16 +02:00
)
2020-02-23 12:35:47 +01:00
// ScrapeWork represents a unit of work for scraping Prometheus metrics.
2020-12-17 13:30:33 +01:00
//
// It must be immutable during its lifetime, since it is read from concurrently running goroutines.
2020-02-23 12:35:47 +01:00
type ScrapeWork struct {
// Full URL (including query args) for the scrape.
ScrapeURL string
// Interval for scraping the ScrapeURL.
ScrapeInterval time . Duration
// Timeout for scraping the ScrapeURL.
ScrapeTimeout time . Duration
// How to deal with conflicting labels.
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
HonorLabels bool
// How to deal with scraped timestamps.
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
HonorTimestamps bool
2021-04-02 18:56:38 +02:00
// Whether to deny redirects during requests to scrape config.
DenyRedirects bool
2023-07-07 00:59:56 +02:00
// Do not support enable_http2 option because of the following reasons:
//
// - http2 is used very rarely comparing to http for Prometheus metrics exposition and service discovery
// - http2 is much harder to debug than http
// - http2 has very bad security record because of its complexity - see https://portswigger.net/research/http2
//
// VictoriaMetrics components are compiled with nethttpomithttp2 tag because of these issues.
//
// EnableHTTP2 bool
2023-06-05 15:56:49 +02:00
2020-10-08 17:50:22 +02:00
// OriginalLabels contains original labels before relabeling.
//
// These labels are needed for relabeling troubleshooting at /targets page.
2022-10-09 13:51:14 +02:00
//
// OriginalLabels are sorted by name.
2022-11-30 06:22:12 +01:00
OriginalLabels * promutils . Labels
2020-10-08 17:50:22 +02:00
2020-02-23 12:35:47 +01:00
// Labels to add to the scraped metrics.
//
2022-10-07 21:39:28 +02:00
// The list contains at least the following labels according to https://www.robustperception.io/life-of-a-label/
2020-02-23 12:35:47 +01:00
//
// * job
2022-10-07 21:39:28 +02:00
// * instance
2020-02-23 12:35:47 +01:00
// * user-defined labels set via `relabel_configs` section in `scrape_config`
//
// See also https://prometheus.io/docs/concepts/jobs_instances/
2022-10-07 21:39:28 +02:00
//
2022-10-09 13:51:14 +02:00
// Labels are sorted by name.
2022-11-30 06:22:12 +01:00
Labels * promutils . Labels
2020-02-23 12:35:47 +01:00
2022-10-01 15:13:17 +02:00
// ExternalLabels contains labels from global->external_labels section of -promscrape.config
//
// These labels are added to scraped metrics after the relabeling.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3137
2022-10-09 13:51:14 +02:00
//
// ExternalLabels are sorted by name.
2022-11-30 06:22:12 +01:00
ExternalLabels * promutils . Labels
2022-10-01 15:13:17 +02:00
2020-12-24 09:52:37 +01:00
// ProxyURL HTTP proxy url
2021-10-26 20:21:08 +02:00
ProxyURL * proxy . URL
2020-12-24 09:52:37 +01:00
2021-03-12 02:35:49 +01:00
// Auth config for ProxyUR:
ProxyAuthConfig * promauth . Config
// Auth config
AuthConfig * promauth . Config
2022-12-10 11:09:21 +01:00
// Optional `relabel_configs`.
RelabelConfigs * promrelabel . ParsedConfigs
2020-02-23 12:35:47 +01:00
// Optional `metric_relabel_configs`.
2021-02-22 15:33:55 +01:00
MetricRelabelConfigs * promrelabel . ParsedConfigs
2020-02-23 12:35:47 +01:00
// The maximum number of metrics to scrape after relabeling.
2020-04-14 10:58:15 +02:00
SampleLimit int
2020-06-23 14:35:19 +02:00
2020-07-02 13:19:11 +02:00
// Whether to disable response compression when querying ScrapeURL.
DisableCompression bool
// Whether to disable HTTP keep-alive when querying ScrapeURL.
DisableKeepAlive bool
2020-11-01 22:12:13 +01:00
// Whether to parse target responses in a streaming manner.
StreamParse bool
2021-02-18 22:51:29 +01:00
// The interval for aligning the first scrape.
ScrapeAlignInterval time . Duration
2021-03-08 10:58:25 +01:00
// The offset for the first scrape.
ScrapeOffset time . Duration
2021-09-01 13:14:37 +02:00
// Optional limit on the number of unique series the scrape target can expose.
SeriesLimit int
2022-10-07 22:36:11 +02:00
// Whether to process stale markers for the given target.
// See https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers
NoStaleMarkers bool
2023-02-01 20:21:44 +01:00
// The Tenant Info
2022-08-08 13:10:18 +02:00
AuthToken * auth . Token
2020-06-23 14:35:19 +02:00
// The original 'job_name'
jobNameOriginal string
2020-02-23 12:35:47 +01:00
}
2021-10-15 14:26:22 +02:00
func ( sw * ScrapeWork ) canSwitchToStreamParseMode ( ) bool {
// Deny switching to stream parse mode if `sample_limit` or `series_limit` options are set,
// since these limits cannot be applied in stream parsing mode.
2021-10-14 11:29:12 +02:00
return sw . SampleLimit <= 0 && sw . SeriesLimit <= 0
}
2020-05-03 11:41:13 +02:00
// key returns unique identifier for the given sw.
//
2021-12-20 17:38:03 +01:00
// It can be used for comparing for equality for two ScrapeWork objects.
2020-05-03 11:41:13 +02:00
func ( sw * ScrapeWork ) key ( ) string {
2021-12-20 17:38:03 +01:00
// Do not take into account OriginalLabels, since they can be changed with relabeling.
2022-12-10 11:09:21 +01:00
// Do not take into account RelabelConfigs, since it is already applied to Labels.
2021-12-20 17:38:03 +01:00
// Take into account JobNameOriginal in order to capture the case when the original job_name is changed via relabeling.
key := fmt . Sprintf ( "JobNameOriginal=%s, ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, DenyRedirects=%v, Labels=%s, " +
2022-10-01 15:13:17 +02:00
"ExternalLabels=%s, " +
2022-12-10 11:09:21 +01:00
"ProxyURL=%s, ProxyAuthConfig=%s, AuthConfig=%s, MetricRelabelConfigs=%q, " +
"SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v, " +
2022-10-07 22:36:11 +02:00
"ScrapeAlignInterval=%s, ScrapeOffset=%s, SeriesLimit=%d, NoStaleMarkers=%v" ,
2022-12-10 11:09:21 +01:00
sw . jobNameOriginal , sw . ScrapeURL , sw . ScrapeInterval , sw . ScrapeTimeout , sw . HonorLabels , sw . HonorTimestamps , sw . DenyRedirects , sw . Labels . String ( ) ,
2022-11-30 06:22:12 +01:00
sw . ExternalLabels . String ( ) ,
2022-12-10 11:09:21 +01:00
sw . ProxyURL . String ( ) , sw . ProxyAuthConfig . String ( ) , sw . AuthConfig . String ( ) , sw . MetricRelabelConfigs . String ( ) ,
sw . SampleLimit , sw . DisableCompression , sw . DisableKeepAlive , sw . StreamParse ,
2022-10-07 22:36:11 +02:00
sw . ScrapeAlignInterval , sw . ScrapeOffset , sw . SeriesLimit , sw . NoStaleMarkers )
2020-05-03 11:41:13 +02:00
return key
}
2020-04-14 12:32:55 +02:00
// Job returns job for the ScrapeWork
func ( sw * ScrapeWork ) Job ( ) string {
2022-11-30 06:22:12 +01:00
return sw . Labels . Get ( "job" )
2020-04-14 12:32:55 +02:00
}
2020-02-23 12:35:47 +01:00
type scrapeWork struct {
// Config for the scrape.
2020-12-17 13:30:33 +01:00
Config * ScrapeWork
2020-02-23 12:35:47 +01:00
// ReadData is called for reading the data.
ReadData func ( dst [ ] byte ) ( [ ] byte , error )
2020-11-01 22:12:13 +01:00
// GetStreamReader is called if Config.StreamParse is set.
GetStreamReader func ( ) ( * streamReader , error )
2020-02-23 12:35:47 +01:00
// PushData is called for pushing collected data.
2022-08-08 13:10:18 +02:00
PushData func ( at * auth . Token , wr * prompbmarshal . WriteRequest )
2020-02-23 12:35:47 +01:00
2020-07-13 20:52:03 +02:00
// ScrapeGroup is name of ScrapeGroup that
// scrapeWork belongs to
ScrapeGroup string
2020-08-13 22:12:22 +02:00
tmpRow parser . Row
2020-08-09 11:44:49 +02:00
2021-09-12 11:49:19 +02:00
// This flag is set to true if series_limit is exceeded.
seriesLimitExceeded bool
// labelsHashBuf is used for calculating the hash on series labels
2020-08-10 18:47:43 +02:00
labelsHashBuf [ ] byte
2020-08-13 22:12:22 +02:00
2021-09-01 13:14:37 +02:00
// Optional limiter on the number of unique series per scrape target.
seriesLimiter * bloomfilter . Limiter
2020-08-14 00:16:18 +02:00
// prevBodyLen contains the previous response body length for the given scrape work.
2020-08-13 22:12:22 +02:00
// It is used as a hint in order to reduce memory usage for body buffers.
2020-08-14 00:16:18 +02:00
prevBodyLen int
2020-08-16 21:27:26 +02:00
2021-03-14 21:56:23 +01:00
// prevLabelsLen contains the number labels scraped during the previous scrape.
2020-08-16 21:27:26 +02:00
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
2021-03-14 21:56:23 +01:00
prevLabelsLen int
2021-08-13 11:10:00 +02:00
2021-08-21 20:16:50 +02:00
// lastScrape holds the last response from scrape target.
2021-10-22 12:10:26 +02:00
// It is used for staleness tracking and for populating scrape_series_added metric.
// The lastScrape isn't populated if -promscrape.noStaleMarkers is set. This reduces memory usage.
2021-08-21 20:16:50 +02:00
lastScrape [ ] byte
2021-10-16 11:58:34 +02:00
// lastScrapeCompressed is used for storing the compressed lastScrape between scrapes
// in stream parsing mode in order to reduce memory usage when the lastScrape size
// equals to or exceeds -promscrape.minResponseSizeForStreamParse
lastScrapeCompressed [ ] byte
2022-05-25 21:58:30 +02:00
2023-01-12 10:09:26 +01:00
// nextErrorLogTime is the timestamp in millisecond when the next scrape error should be logged.
nextErrorLogTime int64
2022-05-25 21:58:30 +02:00
2023-01-12 10:09:26 +01:00
// failureRequestsCount is the number of suppressed scrape errors during the last suppressScrapeErrorsDelay
failureRequestsCount int
// successRequestsCount is the number of success requests during the last suppressScrapeErrorsDelay
successRequestsCount int
2021-10-16 11:58:34 +02:00
}
2021-10-22 12:10:26 +02:00
func ( sw * scrapeWork ) loadLastScrape ( ) string {
if len ( sw . lastScrapeCompressed ) > 0 {
b , err := encoding . DecompressZSTD ( sw . lastScrape [ : 0 ] , sw . lastScrapeCompressed )
if err != nil {
logger . Panicf ( "BUG: cannot unpack compressed previous response: %s" , err )
}
sw . lastScrape = b
2021-10-16 11:58:34 +02:00
}
2021-10-22 12:10:26 +02:00
return bytesutil . ToUnsafeString ( sw . lastScrape )
2021-10-16 11:58:34 +02:00
}
func ( sw * scrapeWork ) storeLastScrape ( lastScrape [ ] byte ) {
2022-12-15 04:26:24 +01:00
mustCompress := minResponseSizeForStreamParse . N > 0 && len ( lastScrape ) >= minResponseSizeForStreamParse . IntN ( )
2021-10-16 11:58:34 +02:00
if mustCompress {
sw . lastScrapeCompressed = encoding . CompressZSTDLevel ( sw . lastScrapeCompressed [ : 0 ] , lastScrape , 1 )
sw . lastScrape = nil
} else {
sw . lastScrape = append ( sw . lastScrape [ : 0 ] , lastScrape ... )
sw . lastScrapeCompressed = nil
}
}
func ( sw * scrapeWork ) finalizeLastScrape ( ) {
if len ( sw . lastScrapeCompressed ) > 0 {
// The compressed lastScrape is available in sw.lastScrapeCompressed.
// Release the memory occupied by sw.lastScrape, so it won't be occupied between scrapes.
sw . lastScrape = nil
}
if len ( sw . lastScrape ) > 0 {
// Release the memory occupied by sw.lastScrapeCompressed, so it won't be occupied between scrapes.
sw . lastScrapeCompressed = nil
}
2020-02-23 12:35:47 +01:00
}
2022-01-07 00:17:55 +01:00
// run scrapes the target described by sw.Config once per ScrapeInterval until stopCh is signaled.
//
// The first scrape is delayed by randSleep, which is derived either from a hash of the target
// (when no alignment is configured) or from ScrapeAlignInterval / ScrapeOffset.
// If stopCh is signaled before the first scrape, run returns without scraping.
// If stopCh is signaled later, staleness markers are sent for the series from the last scrape,
// unless globalStopCh is ready too, and the series limiter is stopped.
func ( sw * scrapeWork ) run ( stopCh <- chan struct { } , globalStopCh <- chan struct { } ) {
2021-02-18 22:51:29 +01:00
var randSleep uint64
2021-03-08 10:58:25 +01:00
scrapeInterval := sw . Config . ScrapeInterval
scrapeAlignInterval := sw . Config . ScrapeAlignInterval
scrapeOffset := sw . Config . ScrapeOffset
// A positive scrape offset forces alignment to the scrape interval itself.
if scrapeOffset > 0 {
scrapeAlignInterval = scrapeInterval
}
if scrapeAlignInterval <= 0 {
2021-02-18 22:51:29 +01:00
// Calculate start time for the first scrape from ScrapeURL and labels.
// This should spread load when scraping many targets with different
// scrape urls and labels.
// This also makes consistent scrape times across restarts
// for a target with the same ScrapeURL and labels.
2021-12-22 23:20:34 +01:00
//
2022-06-03 23:35:51 +02:00
// Include clusterName to the key in order to guarantee that the same
// scrape target is scraped at different offsets per each cluster.
// This guarantees that the deduplication consistently leaves samples received from the same vmagent.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2679
//
// Include clusterMemberID to the key in order to guarantee that each member in vmagent cluster
2021-12-22 23:20:34 +01:00
// scrapes replicated targets at different time offsets. This guarantees that the deduplication consistently leaves samples
// received from the same vmagent replica.
// See https://docs.victoriametrics.com/vmagent.html#scraping-big-number-of-targets
2022-12-10 11:09:21 +01:00
key := fmt . Sprintf ( "clusterName=%s, clusterMemberID=%d, ScrapeURL=%s, Labels=%s" , * clusterName , clusterMemberID , sw . Config . ScrapeURL , sw . Config . Labels . String ( ) )
2021-10-27 18:59:13 +02:00
h := xxhash . Sum64 ( bytesutil . ToUnsafeBytes ( key ) )
// Scale the hash to a delay proportional to scrapeInterval.
randSleep = uint64 ( float64 ( scrapeInterval ) * ( float64 ( h ) / ( 1 << 64 ) ) )
2021-02-18 22:51:29 +01:00
// Subtract the current position inside the interval, wrapping around by one interval
// when needed, so the unsigned subtraction below cannot underflow.
sleepOffset := uint64 ( time . Now ( ) . UnixNano ( ) ) % uint64 ( scrapeInterval )
if randSleep < sleepOffset {
randSleep += uint64 ( scrapeInterval )
}
randSleep -= sleepOffset
} else {
2021-03-08 10:58:25 +01:00
// Sleep until the next multiple of the align interval, plus the optional scrape offset.
d := uint64 ( scrapeAlignInterval )
2021-02-18 22:51:29 +01:00
randSleep = d - uint64 ( time . Now ( ) . UnixNano ( ) ) % d
2021-03-08 10:58:25 +01:00
if scrapeOffset > 0 {
randSleep += uint64 ( scrapeOffset )
}
2021-02-18 22:51:29 +01:00
// Never sleep for a whole scrape interval or longer before the first scrape.
randSleep %= uint64 ( scrapeInterval )
2020-05-03 13:29:26 +02:00
}
2021-01-26 23:23:10 +01:00
timer := timerpool . Get ( time . Duration ( randSleep ) )
2020-04-01 15:10:35 +02:00
// timestamp is the nominal scrape time in milliseconds; it advances by scrapeInterval per iteration.
var timestamp int64
2020-02-23 12:35:47 +01:00
var ticker * time . Ticker
select {
case <- stopCh :
2021-01-26 23:23:10 +01:00
timerpool . Put ( timer )
2020-02-23 12:35:47 +01:00
return
2020-04-01 15:10:35 +02:00
case <- timer . C :
2021-01-26 23:23:10 +01:00
timerpool . Put ( timer )
2020-04-01 15:10:35 +02:00
// Start the periodic ticker and perform the first scrape right away.
ticker = time . NewTicker ( scrapeInterval )
timestamp = time . Now ( ) . UnixNano ( ) / 1e6
2020-08-10 11:31:59 +02:00
sw . scrapeAndLogError ( timestamp , timestamp )
2020-02-23 12:35:47 +01:00
}
defer ticker . Stop ( )
for {
2020-04-01 15:10:35 +02:00
timestamp += scrapeInterval . Milliseconds ( )
2020-02-23 12:35:47 +01:00
select {
case <- stopCh :
2021-08-19 13:18:02 +02:00
t := time . Now ( ) . UnixNano ( ) / 1e6
2021-10-22 12:10:26 +02:00
lastScrape := sw . loadLastScrape ( )
2022-01-07 00:17:55 +01:00
select {
case <- globalStopCh :
// Do not send staleness markers on graceful shutdown as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2013#issuecomment-1006994079
default :
2022-06-23 09:55:14 +02:00
// Send staleness markers to all the metrics scraped last time from the target
// when the given target disappears as Prometheus does.
// Use the current real timestamp for staleness markers, so queries
// stop returning data just after the time the target disappears.
2022-01-07 00:17:55 +01:00
sw . sendStaleSeries ( lastScrape , "" , t , true )
}
2021-09-01 13:14:37 +02:00
if sw . seriesLimiter != nil {
sw . seriesLimiter . MustStop ( )
2022-08-17 12:18:47 +02:00
sw . seriesLimiter = nil
2021-09-01 13:14:37 +02:00
}
2020-02-23 12:35:47 +01:00
return
2020-08-10 11:31:59 +02:00
case tt := <- ticker . C :
t := tt . UnixNano ( ) / 1e6
2023-10-02 20:59:48 +02:00
// Re-sync the nominal timestamp with the ticker time when they differ
// by more than 10% of the scrape interval.
if d := math . Abs ( float64 ( t - timestamp ) ) ; d > 0 && d / float64 ( scrapeInterval . Milliseconds ( ) ) > 0.1 {
// Too big jitter. Adjust timestamp
timestamp = t
2020-02-23 12:35:47 +01:00
}
2020-08-10 11:31:59 +02:00
sw . scrapeAndLogError ( timestamp , t )
2020-02-23 12:35:47 +01:00
}
}
}
func ( sw * scrapeWork ) logError ( s string ) {
2020-04-16 22:41:16 +02:00
if ! * suppressScrapeErrors {
2020-12-06 22:26:34 +01:00
logger . ErrorfSkipframes ( 1 , "error when scraping %q from job %q with labels %s: %s; " +
"scrape errors can be disabled by -promscrape.suppressScrapeErrors command-line flag" ,
2022-12-10 11:09:21 +01:00
sw . Config . ScrapeURL , sw . Config . Job ( ) , sw . Config . Labels . String ( ) , s )
2020-04-16 22:41:16 +02:00
}
2020-02-23 12:35:47 +01:00
}
2020-08-10 11:31:59 +02:00
func ( sw * scrapeWork ) scrapeAndLogError ( scrapeTimestamp , realTimestamp int64 ) {
2022-05-25 21:58:30 +02:00
err := sw . scrapeInternal ( scrapeTimestamp , realTimestamp )
2023-01-12 10:09:26 +01:00
if * suppressScrapeErrors {
2022-05-25 21:58:30 +02:00
return
}
2023-01-12 10:09:26 +01:00
if err == nil {
sw . successRequestsCount ++
2022-05-25 21:58:30 +02:00
return
}
2023-01-12 10:09:26 +01:00
sw . failureRequestsCount ++
if sw . nextErrorLogTime == 0 {
sw . nextErrorLogTime = realTimestamp + suppressScrapeErrorsDelay . Milliseconds ( )
}
if realTimestamp < sw . nextErrorLogTime {
return
2020-02-23 12:35:47 +01:00
}
2023-01-12 10:09:26 +01:00
totalRequests := sw . failureRequestsCount + sw . successRequestsCount
logger . Warnf ( "cannot scrape target %q (%s) %d out of %d times during -promscrape.suppressScrapeErrorsDelay=%s; the last error: %s" ,
sw . Config . ScrapeURL , sw . Config . Labels . String ( ) , sw . failureRequestsCount , totalRequests , * suppressScrapeErrorsDelay , err )
sw . nextErrorLogTime = realTimestamp + suppressScrapeErrorsDelay . Milliseconds ( )
sw . failureRequestsCount = 0
sw . successRequestsCount = 0
2020-02-23 12:35:47 +01:00
}
var (
scrapeDuration = metrics . NewHistogram ( "vm_promscrape_scrape_duration_seconds" )
scrapeResponseSize = metrics . NewHistogram ( "vm_promscrape_scrape_response_size_bytes" )
scrapedSamples = metrics . NewHistogram ( "vm_promscrape_scraped_samples" )
2020-04-14 10:58:15 +02:00
scrapesSkippedBySampleLimit = metrics . NewCounter ( "vm_promscrape_scrapes_skipped_by_sample_limit_total" )
2020-02-23 12:35:47 +01:00
scrapesFailed = metrics . NewCounter ( "vm_promscrape_scrapes_failed_total" )
pushDataDuration = metrics . NewHistogram ( "vm_promscrape_push_data_duration_seconds" )
)
2021-10-15 14:26:22 +02:00
func ( sw * scrapeWork ) mustSwitchToStreamParseMode ( responseSize int ) bool {
if minResponseSizeForStreamParse . N <= 0 {
return false
}
2022-12-15 04:26:24 +01:00
return sw . Config . canSwitchToStreamParseMode ( ) && responseSize >= minResponseSizeForStreamParse . IntN ( )
2021-10-15 14:26:22 +02:00
}
2022-02-03 17:57:36 +01:00
// getTargetResponse() fetches response from sw target in the same way as when scraping the target.
func ( sw * scrapeWork ) getTargetResponse ( ) ( [ ] byte , error ) {
2023-07-07 00:59:56 +02:00
// use stream reader when stream mode enabled
if * streamParse || sw . Config . StreamParse || sw . mustSwitchToStreamParseMode ( sw . prevBodyLen ) {
2022-02-03 17:57:36 +01:00
// Read the response in stream mode.
sr , err := sw . GetStreamReader ( )
if err != nil {
return nil , err
}
2022-08-21 23:13:44 +02:00
data , err := io . ReadAll ( sr )
2022-02-03 17:57:36 +01:00
sr . MustClose ( )
return data , err
}
// Read the response in usual mode.
return sw . ReadData ( nil )
}
2020-08-10 11:31:59 +02:00
func ( sw * scrapeWork ) scrapeInternal ( scrapeTimestamp , realTimestamp int64 ) error {
2021-10-16 11:58:34 +02:00
if * streamParse || sw . Config . StreamParse || sw . mustSwitchToStreamParseMode ( sw . prevBodyLen ) {
2020-11-01 22:12:13 +01:00
// Read data from scrape targets in streaming manner.
2021-10-14 11:29:12 +02:00
// This case is optimized for targets exposing more than ten thousand of metrics per target.
2020-11-01 22:12:13 +01:00
return sw . scrapeStream ( scrapeTimestamp , realTimestamp )
}
// Common case: read all the data from scrape target to memory (body) and then process it.
2021-08-21 20:16:50 +02:00
// This case should work more optimally than stream parse code for common case when scrape target exposes
2020-11-26 12:33:46 +01:00
// up to a few thousand metrics.
2020-08-14 00:16:18 +02:00
body := leveledbytebufferpool . Get ( sw . prevBodyLen )
2020-02-23 12:35:47 +01:00
var err error
2020-08-13 22:12:22 +02:00
body . B , err = sw . ReadData ( body . B [ : 0 ] )
2023-01-07 07:59:15 +01:00
releaseBody , err := sw . processScrapedData ( scrapeTimestamp , realTimestamp , body , err )
if releaseBody {
leveledbytebufferpool . Put ( body )
}
return err
}
2023-01-18 08:09:34 +01:00
// processScrapedDataConcurrencyLimitCh limits the number of concurrently executed processScrapedData calls
// to the value returned by cgroup.AvailableCPUs().
var processScrapedDataConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
2023-01-07 07:59:15 +01:00
// processScrapedData parses the scraped response in body, pushes the resulting samples together with
// auto-generated metrics via sw.pushData and updates the per-target state (last scrape, size hints).
//
// scrapeTimestamp is passed to addRowToTimeseries and addAutoMetrics, while realTimestamp is used
// for calculating the scrape duration and for staleness markers. Both are in milliseconds.
// err is the error passed by the caller (scrapeInternal passes the ReadData error); a non-nil err marks
// the scrape as failed and skips parsing.
//
// The returned bool tells the caller whether body may be released; it is false when the response size
// requires switching to stream parsing mode. The returned error is err or the sample_limit error.
func ( sw * scrapeWork ) processScrapedData ( scrapeTimestamp , realTimestamp int64 , body * bytesutil . ByteBuffer , err error ) ( bool , error ) {
// This function is CPU-bound, while it may allocate big amounts of memory.
// That's why it is a good idea to limit the number of concurrent calls to this function
// in order to limit memory usage under high load without sacrificing the performance.
2023-01-18 08:09:34 +01:00
processScrapedDataConcurrencyLimitCh <- struct { } { }
2023-01-07 07:59:15 +01:00
defer func ( ) {
2023-01-18 08:09:34 +01:00
<- processScrapedDataConcurrencyLimitCh
2023-01-07 07:59:15 +01:00
} ( )
2020-02-23 12:35:47 +01:00
// The duration is measured in seconds from realTimestamp (milliseconds) till now.
endTimestamp := time . Now ( ) . UnixNano ( ) / 1e6
2020-08-10 11:31:59 +02:00
duration := float64 ( endTimestamp - realTimestamp ) / 1e3
2020-02-23 12:35:47 +01:00
scrapeDuration . Update ( duration )
2020-08-13 22:12:22 +02:00
scrapeResponseSize . Update ( float64 ( len ( body . B ) ) )
2020-02-23 12:35:47 +01:00
up := 1
2021-03-14 21:56:23 +01:00
wc := writeRequestCtxPool . Get ( sw . prevLabelsLen )
2021-10-22 12:10:26 +02:00
lastScrape := sw . loadLastScrape ( )
2021-08-21 20:16:50 +02:00
// bodyString shares memory with body.B, so body must stay alive while bodyString is in use.
bodyString := bytesutil . ToUnsafeString ( body . B )
2023-01-17 19:14:46 +01:00
areIdenticalSeries := sw . areIdenticalSeries ( lastScrape , bodyString )
2020-02-23 12:35:47 +01:00
if err != nil {
up = 0
scrapesFailed . Inc ( )
} else {
2020-08-13 22:12:22 +02:00
wc . rows . UnmarshalWithErrLogger ( bodyString , sw . logError )
2020-02-23 12:35:47 +01:00
}
2020-08-13 22:12:22 +02:00
srcRows := wc . rows . Rows
2020-02-23 12:35:47 +01:00
samplesScraped := len ( srcRows )
scrapedSamples . Update ( float64 ( samplesScraped ) )
for i := range srcRows {
2020-08-13 22:12:22 +02:00
sw . addRowToTimeseries ( wc , & srcRows [ i ] , scrapeTimestamp , true )
2020-02-23 12:35:47 +01:00
}
2021-03-14 21:56:23 +01:00
samplesPostRelabeling := len ( wc . writeRequest . Timeseries )
2021-03-09 14:47:15 +01:00
// Drop all the collected samples and mark the scrape as failed if sample_limit is exceeded.
if sw . Config . SampleLimit > 0 && samplesPostRelabeling > sw . Config . SampleLimit {
wc . resetNoRows ( )
up = 0
scrapesSkippedBySampleLimit . Inc ( )
2021-05-27 13:52:44 +02:00
err = fmt . Errorf ( "the response from %q exceeds sample_limit=%d; " +
"either reduce the sample count for the target or increase sample_limit" , sw . Config . ScrapeURL , sw . Config . SampleLimit )
2021-03-09 14:47:15 +01:00
}
2021-09-12 11:49:19 +02:00
// A failed scrape is treated as an empty response in the steps below.
if up == 0 {
bodyString = ""
}
seriesAdded := 0
if ! areIdenticalSeries {
// The returned value for seriesAdded may be bigger than the real number of added series
// if some series were removed during relabeling.
// This is a trade-off between performance and accuracy.
2021-10-22 12:10:26 +02:00
seriesAdded = sw . getSeriesAdded ( lastScrape , bodyString )
2021-09-12 11:49:19 +02:00
}
2022-08-17 12:18:47 +02:00
samplesDropped := 0
2021-09-12 11:49:19 +02:00
if sw . seriesLimitExceeded || ! areIdenticalSeries {
2022-08-17 12:18:47 +02:00
samplesDropped = sw . applySeriesLimit ( wc )
2021-09-12 11:49:19 +02:00
}
2022-08-17 12:18:47 +02:00
am := & autoMetrics {
up : up ,
scrapeDurationSeconds : duration ,
samplesScraped : samplesScraped ,
samplesPostRelabeling : samplesPostRelabeling ,
seriesAdded : seriesAdded ,
seriesLimitSamplesDropped : samplesDropped ,
2022-07-06 11:37:13 +02:00
}
2022-08-17 12:18:47 +02:00
sw . addAutoMetrics ( am , wc , scrapeTimestamp )
2022-08-08 13:10:18 +02:00
sw . pushData ( sw . Config . AuthToken , & wc . writeRequest )
2021-03-14 21:56:23 +01:00
// Remember the sizes as hints for the next scrape.
sw . prevLabelsLen = len ( wc . labels )
2021-10-16 11:58:34 +02:00
sw . prevBodyLen = len ( bodyString )
2020-08-13 22:12:22 +02:00
wc . reset ( )
2021-10-16 11:58:34 +02:00
mustSwitchToStreamParse := sw . mustSwitchToStreamParseMode ( len ( bodyString ) )
if ! mustSwitchToStreamParse {
2021-10-14 11:29:12 +02:00
// Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse
// This should reduce memory usage when scraping targets with big responses.
writeRequestCtxPool . Put ( wc )
}
2020-08-13 22:12:22 +02:00
// body must be released only after wc is released, since wc refers to body.
2021-09-12 11:49:19 +02:00
if ! areIdenticalSeries {
2022-06-23 09:55:14 +02:00
// Send stale markers for disappeared metrics with the real scrape timestamp
// in order to guarantee that query doesn't return data after this time for the disappeared metrics.
sw . sendStaleSeries ( lastScrape , bodyString , realTimestamp , false )
2021-10-16 11:58:34 +02:00
sw . storeLastScrape ( body . B )
2021-09-12 11:49:19 +02:00
}
2021-10-16 11:58:34 +02:00
sw . finalizeLastScrape ( )
2022-06-06 23:57:05 +02:00
tsmGlobal . Update ( sw , up == 1 , realTimestamp , int64 ( duration * 1000 ) , samplesScraped , err )
2023-01-07 07:59:15 +01:00
// The caller may release body only if there is no need to switch to stream parsing mode.
return ! mustSwitchToStreamParse , err
2021-08-18 20:58:40 +02:00
}
2022-08-08 13:10:18 +02:00
func ( sw * scrapeWork ) pushData ( at * auth . Token , wr * prompbmarshal . WriteRequest ) {
2021-08-13 11:10:00 +02:00
startTime := time . Now ( )
2022-08-08 13:10:18 +02:00
sw . PushData ( at , wr )
2021-08-13 11:10:00 +02:00
pushDataDuration . UpdateDuration ( startTime )
}
2021-10-15 14:26:22 +02:00
// streamBodyReader serves a fully buffered response body via the Read method.
type streamBodyReader struct {
	// body is the whole response body read by Init.
	body []byte

	// bodyLen is the length of body as set by Init.
	bodyLen int

	// readOffset is the offset in body where the next Read call starts.
	readOffset int
}
func ( sbr * streamBodyReader ) Init ( sr * streamReader ) error {
sbr . body = nil
sbr . bodyLen = 0
sbr . readOffset = 0
// Read the whole response body in memory before parsing it in stream mode.
// This minimizes the time needed for reading response body from scrape target.
startTime := fasttime . UnixTimestamp ( )
body , err := io . ReadAll ( sr )
if err != nil {
d := fasttime . UnixTimestamp ( ) - startTime
return fmt . Errorf ( "cannot read stream body in %d seconds: %w" , d , err )
}
sbr . body = body
sbr . bodyLen = len ( body )
return nil
2021-10-15 14:26:22 +02:00
}
func ( sbr * streamBodyReader ) Read ( b [ ] byte ) ( int , error ) {
2022-09-14 12:14:04 +02:00
if sbr . readOffset >= len ( sbr . body ) {
return 0 , io . EOF
2021-10-22 15:44:41 +02:00
}
2022-09-14 12:14:04 +02:00
n := copy ( b , sbr . body [ sbr . readOffset : ] )
sbr . readOffset += n
return n , nil
2021-10-15 14:26:22 +02:00
}
2020-11-01 22:12:13 +01:00
func ( sw * scrapeWork ) scrapeStream ( scrapeTimestamp , realTimestamp int64 ) error {
samplesScraped := 0
samplesPostRelabeling := 0
2021-03-14 21:56:23 +01:00
wc := writeRequestCtxPool . Get ( sw . prevLabelsLen )
2021-10-16 11:58:34 +02:00
// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
2022-09-14 12:14:04 +02:00
var sbr streamBodyReader
2020-12-15 13:08:06 +01:00
2022-12-09 01:33:33 +01:00
lastScrape := sw . loadLastScrape ( )
bodyString := ""
areIdenticalSeries := true
samplesDropped := 0
2020-12-15 13:08:06 +01:00
sr , err := sw . GetStreamReader ( )
if err != nil {
2023-10-25 21:24:01 +02:00
err = fmt . Errorf ( "cannot read data: %w" , err )
2020-12-15 13:08:06 +01:00
} else {
var mu sync . Mutex
2022-09-14 12:14:04 +02:00
err = sbr . Init ( sr )
2022-09-14 14:06:31 +02:00
if err == nil {
2022-12-09 01:33:33 +01:00
bodyString = bytesutil . ToUnsafeString ( sbr . body )
2023-01-17 19:14:46 +01:00
areIdenticalSeries = sw . areIdenticalSeries ( lastScrape , bodyString )
2023-10-02 21:32:11 +02:00
err = stream . Parse ( & sbr , scrapeTimestamp , false , false , func ( rows [ ] parser . Row ) error {
2022-09-14 12:14:04 +02:00
mu . Lock ( )
defer mu . Unlock ( )
samplesScraped += len ( rows )
for i := range rows {
sw . addRowToTimeseries ( wc , & rows [ i ] , scrapeTimestamp , true )
}
samplesPostRelabeling += len ( wc . writeRequest . Timeseries )
if sw . Config . SampleLimit > 0 && samplesPostRelabeling > sw . Config . SampleLimit {
wc . resetNoRows ( )
scrapesSkippedBySampleLimit . Inc ( )
return fmt . Errorf ( "the response from %q exceeds sample_limit=%d; " +
"either reduce the sample count for the target or increase sample_limit" , sw . Config . ScrapeURL , sw . Config . SampleLimit )
}
2022-12-09 01:33:33 +01:00
if sw . seriesLimitExceeded || ! areIdenticalSeries {
samplesDropped += sw . applySeriesLimit ( wc )
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
2022-09-14 12:14:04 +02:00
sw . pushData ( sw . Config . AuthToken , & wc . writeRequest )
2021-05-27 13:52:44 +02:00
wc . resetNoRows ( )
2022-09-14 12:14:04 +02:00
return nil
} , sw . logError )
}
2020-12-15 13:08:06 +01:00
sr . MustClose ( )
}
2020-11-01 22:12:13 +01:00
scrapedSamples . Update ( float64 ( samplesScraped ) )
endTimestamp := time . Now ( ) . UnixNano ( ) / 1e6
duration := float64 ( endTimestamp - realTimestamp ) / 1e3
scrapeDuration . Update ( duration )
2021-10-22 15:44:41 +02:00
scrapeResponseSize . Update ( float64 ( sbr . bodyLen ) )
2020-11-01 22:12:13 +01:00
up := 1
if err != nil {
2022-07-22 12:28:57 +02:00
// Mark the scrape as failed even if it already read and pushed some samples
// to remote storage. This makes the logic compatible with Prometheus.
up = 0
2020-11-01 22:12:13 +01:00
scrapesFailed . Inc ( )
}
2021-10-15 14:26:22 +02:00
seriesAdded := 0
if ! areIdenticalSeries {
// The returned value for seriesAdded may be bigger than the real number of added series
// if some series were removed during relabeling.
// This is a trade-off between performance and accuracy.
2021-10-22 12:10:26 +02:00
seriesAdded = sw . getSeriesAdded ( lastScrape , bodyString )
2021-10-15 14:26:22 +02:00
}
2022-08-17 12:18:47 +02:00
am := & autoMetrics {
2022-12-09 01:33:33 +01:00
up : up ,
scrapeDurationSeconds : duration ,
samplesScraped : samplesScraped ,
samplesPostRelabeling : samplesPostRelabeling ,
seriesAdded : seriesAdded ,
seriesLimitSamplesDropped : samplesDropped ,
2022-08-17 12:18:47 +02:00
}
sw . addAutoMetrics ( am , wc , scrapeTimestamp )
2022-08-08 13:10:18 +02:00
sw . pushData ( sw . Config . AuthToken , & wc . writeRequest )
2021-03-14 21:56:23 +01:00
sw . prevLabelsLen = len ( wc . labels )
2021-10-22 15:44:41 +02:00
sw . prevBodyLen = sbr . bodyLen
2020-11-01 22:12:13 +01:00
wc . reset ( )
writeRequestCtxPool . Put ( wc )
2021-10-15 14:26:22 +02:00
if ! areIdenticalSeries {
2022-06-23 09:55:14 +02:00
// Send stale markers for disappeared metrics with the real scrape timestamp
// in order to guarantee that query doesn't return data after this time for the disappeared metrics.
sw . sendStaleSeries ( lastScrape , bodyString , realTimestamp , false )
2021-10-16 11:58:34 +02:00
sw . storeLastScrape ( sbr . body )
2021-10-15 14:26:22 +02:00
}
2021-10-16 11:58:34 +02:00
sw . finalizeLastScrape ( )
2022-06-06 23:57:05 +02:00
tsmGlobal . Update ( sw , up == 1 , realTimestamp , int64 ( duration * 1000 ) , samplesScraped , err )
2021-08-21 20:16:50 +02:00
// Do not track active series in streaming mode, since this may need too big amounts of memory
// when the target exports too big number of metrics.
2021-01-12 12:31:47 +01:00
return err
2020-11-01 22:12:13 +01:00
}
2023-01-17 19:14:46 +01:00
// areIdenticalSeries reports whether prevData and currData contain identical series.
//
// It always returns true without comparing the data when stale markers are disabled
// and no series limit is configured.
func (sw *scrapeWork) areIdenticalSeries(prevData, currData string) bool {
	// Tracking the changes in series is needed only for stale markers and for series_limit.
	// The check for series_limit is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
	needTracking := !sw.Config.NoStaleMarkers || sw.Config.SeriesLimit > 0
	if needTracking {
		return parser.AreIdenticalSeriesFast(prevData, currData)
	}
	// Do not spend CPU time on tracking the changes in series.
	return true
}
2020-09-01 09:55:21 +02:00
// leveledWriteRequestCtxPool is a pool of writeRequestCtx structs split into size classes.
//
// It allows reducing memory usage when writeRequestCtx structs
// contain mixed number of labels.
//
// Its logic has been copied from leveledbytebufferpool.
type leveledWriteRequestCtxPool struct {
	// pools contains a separate pool per size class.
	// The class for the given labels capacity is selected by getPoolIDAndCapacity.
	pools [13]sync.Pool
}
2021-03-14 21:56:23 +01:00
func ( lwp * leveledWriteRequestCtxPool ) Get ( labelsCapacity int ) * writeRequestCtx {
id , capacityNeeded := lwp . getPoolIDAndCapacity ( labelsCapacity )
2020-08-16 21:27:26 +02:00
for i := 0 ; i < 2 ; i ++ {
if id < 0 || id >= len ( lwp . pools ) {
break
}
if v := lwp . pools [ id ] . Get ( ) ; v != nil {
return v . ( * writeRequestCtx )
}
id ++
}
return & writeRequestCtx {
labels : make ( [ ] prompbmarshal . Label , 0 , capacityNeeded ) ,
}
}
func ( lwp * leveledWriteRequestCtxPool ) Put ( wc * writeRequestCtx ) {
2021-03-14 21:56:23 +01:00
capacity := cap ( wc . labels )
id , poolCapacity := lwp . getPoolIDAndCapacity ( capacity )
if capacity <= poolCapacity {
wc . reset ( )
lwp . pools [ id ] . Put ( wc )
}
2020-08-16 21:27:26 +02:00
}
2020-08-28 08:44:08 +02:00
func ( lwp * leveledWriteRequestCtxPool ) getPoolIDAndCapacity ( size int ) ( int , int ) {
2020-08-16 21:27:26 +02:00
size --
if size < 0 {
size = 0
}
size >>= 3
id := bits . Len ( uint ( size ) )
2021-03-14 21:56:23 +01:00
if id >= len ( lwp . pools ) {
2020-08-16 21:27:26 +02:00
id = len ( lwp . pools ) - 1
}
return id , ( 1 << ( id + 3 ) )
}
2020-08-13 22:12:22 +02:00
// writeRequestCtx is a reusable context for building prompbmarshal.WriteRequest.
type writeRequestCtx struct {
	// rows is reset only by reset(); resetNoRows() leaves it untouched.
	rows parser.Rows

	// writeRequest is the request under construction.
	// Its time series refer to the items stored in labels and samples.
	writeRequest prompbmarshal.WriteRequest

	// labels is the shared storage for labels of all the time series in writeRequest.
	labels []prompbmarshal.Label

	// samples is the shared storage for samples of all the time series in writeRequest.
	samples []prompbmarshal.Sample
}
func ( wc * writeRequestCtx ) reset ( ) {
wc . rows . Reset ( )
2020-09-11 22:36:24 +02:00
wc . resetNoRows ( )
}
func ( wc * writeRequestCtx ) resetNoRows ( ) {
2024-01-14 22:04:45 +01:00
wc . writeRequest . Reset ( )
2022-10-28 20:34:07 +02:00
labels := wc . labels
for i := range labels {
label := & labels [ i ]
label . Name = ""
label . Value = ""
}
2020-08-13 22:12:22 +02:00
wc . labels = wc . labels [ : 0 ]
2022-10-28 20:34:07 +02:00
2020-08-13 22:12:22 +02:00
wc . samples = wc . samples [ : 0 ]
}
2020-08-16 21:27:26 +02:00
// writeRequestCtxPool is the shared pool of writeRequestCtx structs
// used by scrapeStream and sendStaleSeries.
var writeRequestCtxPool leveledWriteRequestCtxPool
2020-08-13 22:12:22 +02:00
2021-10-22 12:10:26 +02:00
func ( sw * scrapeWork ) getSeriesAdded ( lastScrape , currScrape string ) int {
2021-09-12 11:49:19 +02:00
if currScrape == "" {
return 0
2020-09-11 22:36:24 +02:00
}
2021-09-12 11:49:19 +02:00
bodyString := parser . GetRowsDiff ( currScrape , lastScrape )
return strings . Count ( bodyString , "\n" )
}
2022-08-17 12:18:47 +02:00
func ( sw * scrapeWork ) applySeriesLimit ( wc * writeRequestCtx ) int {
2023-01-17 19:14:46 +01:00
if sw . Config . SeriesLimit <= 0 {
return 0
2021-09-01 13:14:37 +02:00
}
2023-01-17 19:14:46 +01:00
if sw . seriesLimiter == nil {
sw . seriesLimiter = bloomfilter . NewLimiter ( sw . Config . SeriesLimit , 24 * time . Hour )
2022-08-17 12:18:47 +02:00
}
sl := sw . seriesLimiter
2021-09-01 13:14:37 +02:00
dstSeries := wc . writeRequest . Timeseries [ : 0 ]
2022-08-17 12:18:47 +02:00
samplesDropped := 0
2020-08-13 22:12:22 +02:00
for _ , ts := range wc . writeRequest . Timeseries {
2020-08-10 18:47:43 +02:00
h := sw . getLabelsHash ( ts . Labels )
2022-08-17 12:18:47 +02:00
if ! sl . Add ( h ) {
samplesDropped ++
2021-09-01 13:14:37 +02:00
continue
}
dstSeries = append ( dstSeries , ts )
2020-08-09 11:44:49 +02:00
}
2022-10-28 20:34:07 +02:00
prompbmarshal . ResetTimeSeries ( wc . writeRequest . Timeseries [ len ( dstSeries ) : ] )
2021-09-01 13:14:37 +02:00
wc . writeRequest . Timeseries = dstSeries
2023-01-17 19:14:46 +01:00
if samplesDropped > 0 && ! sw . seriesLimitExceeded {
sw . seriesLimitExceeded = true
}
2022-08-17 12:18:47 +02:00
return samplesDropped
2020-09-11 22:36:24 +02:00
}
2020-08-09 11:44:49 +02:00
2023-01-18 08:09:34 +01:00
// sendStaleSeriesConcurrencyLimitCh limits the number of concurrently executed sendStaleSeries calls.
// Its capacity equals to cgroup.AvailableCPUs().
var sendStaleSeriesConcurrencyLimitCh = make ( chan struct { } , cgroup . AvailableCPUs ( ) )
2021-10-22 12:10:26 +02:00
func ( sw * scrapeWork ) sendStaleSeries ( lastScrape , currScrape string , timestamp int64 , addAutoSeries bool ) {
2023-01-18 08:09:34 +01:00
// This function is CPU-bound, while it may allocate big amounts of memory.
// That's why it is a good idea to limit the number of concurrent calls to this function
// in order to limit memory usage under high load without sacrificing the performance.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
sendStaleSeriesConcurrencyLimitCh <- struct { } { }
defer func ( ) {
<- sendStaleSeriesConcurrencyLimitCh
} ( )
2022-10-07 22:36:11 +02:00
if sw . Config . NoStaleMarkers {
2021-08-18 12:43:17 +02:00
return
}
2021-09-11 09:51:04 +02:00
bodyString := lastScrape
if currScrape != "" {
2021-09-12 11:49:19 +02:00
bodyString = parser . GetRowsDiff ( lastScrape , currScrape )
2021-09-11 09:51:04 +02:00
}
2023-01-18 08:09:34 +01:00
wc := writeRequestCtxPool . Get ( sw . prevLabelsLen )
defer func ( ) {
wc . reset ( )
writeRequestCtxPool . Put ( wc )
} ( )
2021-09-11 09:51:04 +02:00
if bodyString != "" {
2023-01-24 06:52:57 +01:00
// Send stale markers in streaming mode in order to reduce memory usage
// when stale markers for targets exposing big number of metrics must be generated.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675
var mu sync . Mutex
br := bytes . NewBufferString ( bodyString )
2023-10-02 21:32:11 +02:00
err := stream . Parse ( br , timestamp , false , false , func ( rows [ ] parser . Row ) error {
2023-01-24 06:52:57 +01:00
mu . Lock ( )
defer mu . Unlock ( )
for i := range rows {
sw . addRowToTimeseries ( wc , & rows [ i ] , timestamp , true )
}
// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
if sw . seriesLimitExceeded {
sw . applySeriesLimit ( wc )
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
setStaleMarkersForRows ( wc . writeRequest . Timeseries )
sw . pushData ( sw . Config . AuthToken , & wc . writeRequest )
wc . resetNoRows ( )
return nil
} , sw . logError )
if err != nil {
2023-10-25 21:24:01 +02:00
sw . logError ( fmt . Errorf ( "cannot send stale markers: %w" , err ) . Error ( ) )
2023-01-24 06:15:59 +01:00
}
2023-01-24 06:52:57 +01:00
}
if addAutoSeries {
am := & autoMetrics { }
sw . addAutoMetrics ( am , wc , timestamp )
}
setStaleMarkersForRows ( wc . writeRequest . Timeseries )
sw . pushData ( sw . Config . AuthToken , & wc . writeRequest )
}
2023-01-24 06:15:59 +01:00
2023-01-24 06:52:57 +01:00
func setStaleMarkersForRows ( series [ ] prompbmarshal . TimeSeries ) {
for _ , tss := range series {
samples := tss . Samples
for i := range samples {
samples [ i ] . Value = decimal . StaleNaN
2021-08-21 20:16:50 +02:00
}
2023-01-24 06:52:57 +01:00
staleSamplesCreated . Add ( len ( samples ) )
2021-08-13 11:10:00 +02:00
}
2020-08-09 11:44:49 +02:00
}
2022-05-06 14:33:13 +02:00
// staleSamplesCreated counts the samples converted into stale markers by setStaleMarkersForRows.
var staleSamplesCreated = metrics . NewCounter ( ` vm_promscrape_stale_samples_created_total ` )
2022-01-13 23:57:44 +01:00
2020-08-10 18:47:43 +02:00
func ( sw * scrapeWork ) getLabelsHash ( labels [ ] prompbmarshal . Label ) uint64 {
2020-08-09 11:44:49 +02:00
// It is OK if there will be hash collisions for distinct sets of labels,
// since the accuracy for `scrape_series_added` metric may be lower than 100%.
2020-08-10 18:47:43 +02:00
b := sw . labelsHashBuf [ : 0 ]
2020-08-09 11:44:49 +02:00
for _ , label := range labels {
2020-08-10 18:47:43 +02:00
b = append ( b , label . Name ... )
b = append ( b , label . Value ... )
2020-08-09 11:44:49 +02:00
}
2020-08-10 18:47:43 +02:00
sw . labelsHashBuf = b
return xxhash . Sum64 ( b )
2020-08-09 11:44:49 +02:00
}
2022-08-17 12:18:47 +02:00
// autoMetrics contains the values for the automatically generated metrics,
// which are added to every scrape by addAutoMetrics.
type autoMetrics struct {
	// up is the value for the `up` metric.
	up int

	// scrapeDurationSeconds is the value for the `scrape_duration_seconds` metric.
	scrapeDurationSeconds float64

	// samplesScraped is the value for the `scrape_samples_scraped` metric.
	samplesScraped int

	// samplesPostRelabeling is the value for the `scrape_samples_post_metric_relabeling` metric.
	samplesPostRelabeling int

	// seriesAdded is the value for the `scrape_series_added` metric.
	seriesAdded int

	// seriesLimitSamplesDropped is the value for the `scrape_series_limit_samples_dropped` metric.
	seriesLimitSamplesDropped int
}
2022-11-29 03:37:06 +01:00
// isAutoMetric returns true if s is the name of an automatically generated metric.
func isAutoMetric(s string) bool {
	switch s {
	case "up", "scrape_duration_seconds", "scrape_timeout_seconds":
		return true
	case "scrape_samples_scraped", "scrape_samples_post_metric_relabeling", "scrape_samples_limit":
		return true
	case "scrape_series_added", "scrape_series_limit_samples_dropped",
		"scrape_series_limit", "scrape_series_current":
		return true
	default:
		return false
	}
}
2022-08-17 12:18:47 +02:00
func ( sw * scrapeWork ) addAutoMetrics ( am * autoMetrics , wc * writeRequestCtx , timestamp int64 ) {
sw . addAutoTimeseries ( wc , "up" , float64 ( am . up ) , timestamp )
sw . addAutoTimeseries ( wc , "scrape_duration_seconds" , am . scrapeDurationSeconds , timestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_scraped" , float64 ( am . samplesScraped ) , timestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_post_metric_relabeling" , float64 ( am . samplesPostRelabeling ) , timestamp )
sw . addAutoTimeseries ( wc , "scrape_series_added" , float64 ( am . seriesAdded ) , timestamp )
sw . addAutoTimeseries ( wc , "scrape_timeout_seconds" , sw . Config . ScrapeTimeout . Seconds ( ) , timestamp )
if sampleLimit := sw . Config . SampleLimit ; sampleLimit > 0 {
2023-12-06 08:44:39 +01:00
// Expose scrape_samples_limit metric if sample_limit config is set for the target.
2022-08-17 12:18:47 +02:00
// See https://github.com/VictoriaMetrics/operator/issues/497
sw . addAutoTimeseries ( wc , "scrape_samples_limit" , float64 ( sampleLimit ) , timestamp )
}
if sl := sw . seriesLimiter ; sl != nil {
sw . addAutoTimeseries ( wc , "scrape_series_limit_samples_dropped" , float64 ( am . seriesLimitSamplesDropped ) , timestamp )
sw . addAutoTimeseries ( wc , "scrape_series_limit" , float64 ( sl . MaxItems ( ) ) , timestamp )
sw . addAutoTimeseries ( wc , "scrape_series_current" , float64 ( sl . CurrentItems ( ) ) , timestamp )
}
}
2020-02-23 12:35:47 +01:00
// addAutoTimeseries adds automatically generated time series with the given name, value and timestamp.
//
// See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
2020-08-13 22:12:22 +02:00
func ( sw * scrapeWork ) addAutoTimeseries ( wc * writeRequestCtx , name string , value float64 , timestamp int64 ) {
2020-02-23 12:35:47 +01:00
sw . tmpRow . Metric = name
sw . tmpRow . Tags = nil
sw . tmpRow . Value = value
sw . tmpRow . Timestamp = timestamp
2020-08-13 22:12:22 +02:00
sw . addRowToTimeseries ( wc , & sw . tmpRow , timestamp , false )
2020-02-23 12:35:47 +01:00
}
2020-08-13 22:12:22 +02:00
func ( sw * scrapeWork ) addRowToTimeseries ( wc * writeRequestCtx , r * parser . Row , timestamp int64 , needRelabel bool ) {
2022-11-29 03:37:06 +01:00
metric := r . Metric
2023-02-01 21:00:50 +01:00
// Add `exported_` prefix to metrics, which clash with the automatically generated
// metric names only if the following conditions are met:
//
// - The `honor_labels` option isn't set to true in the scrape_config.
// If `honor_labels: true`, then the scraped metric name must remain unchanged
// because the user explicitly asked about it in the config.
// - The metric has no labels (tags). If it has labels, then the metric value
// will be written into a separate time series comparing to automatically generated time series.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3557
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406
if needRelabel && ! sw . Config . HonorLabels && len ( r . Tags ) == 0 && isAutoMetric ( metric ) {
bb := bbPool . Get ( )
bb . B = append ( bb . B , "exported_" ... )
bb . B = append ( bb . B , metric ... )
metric = bytesutil . InternBytes ( bb . B )
bbPool . Put ( bb )
2022-11-29 03:37:06 +01:00
}
2020-08-13 22:12:22 +02:00
labelsLen := len ( wc . labels )
2022-11-30 06:22:12 +01:00
targetLabels := sw . Config . Labels . GetLabels ( )
wc . labels = appendLabels ( wc . labels , metric , r . Tags , targetLabels , sw . Config . HonorLabels )
2020-06-29 21:29:29 +02:00
if needRelabel {
2022-10-09 13:51:14 +02:00
wc . labels = sw . Config . MetricRelabelConfigs . Apply ( wc . labels , labelsLen )
2020-06-29 21:29:29 +02:00
}
2022-10-09 13:51:14 +02:00
wc . labels = promrelabel . FinalizeLabels ( wc . labels [ : labelsLen ] , wc . labels [ labelsLen : ] )
2020-08-13 22:12:22 +02:00
if len ( wc . labels ) == labelsLen {
2020-02-23 12:35:47 +01:00
// Skip row without labels.
return
}
2022-10-01 15:13:17 +02:00
// Add labels from `global->external_labels` section after the relabeling like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3137
2022-11-30 06:22:12 +01:00
externalLabels := sw . Config . ExternalLabels . GetLabels ( )
wc . labels = appendExtraLabels ( wc . labels , externalLabels , labelsLen , sw . Config . HonorLabels )
2020-09-11 22:36:24 +02:00
sampleTimestamp := r . Timestamp
if ! sw . Config . HonorTimestamps || sampleTimestamp == 0 {
sampleTimestamp = timestamp
2020-02-23 12:35:47 +01:00
}
2020-09-11 22:36:24 +02:00
wc . samples = append ( wc . samples , prompbmarshal . Sample {
Value : r . Value ,
Timestamp : sampleTimestamp ,
} )
2020-08-13 22:12:22 +02:00
wr := & wc . writeRequest
2020-09-11 22:36:24 +02:00
wr . Timeseries = append ( wr . Timeseries , prompbmarshal . TimeSeries {
Labels : wc . labels [ labelsLen : ] ,
Samples : wc . samples [ len ( wc . samples ) - 1 : ] ,
} )
2020-02-23 12:35:47 +01:00
}
2022-11-29 03:37:06 +01:00
// bbPool is a pool of byte buffers used by addRowToTimeseries
// for building metric names with the `exported_` prefix.
var bbPool bytesutil . ByteBufferPool
2020-02-23 12:35:47 +01:00
func appendLabels ( dst [ ] prompbmarshal . Label , metric string , src [ ] parser . Tag , extraLabels [ ] prompbmarshal . Label , honorLabels bool ) [ ] prompbmarshal . Label {
dstLen := len ( dst )
dst = append ( dst , prompbmarshal . Label {
Name : "__name__" ,
Value : metric ,
} )
for i := range src {
tag := & src [ i ]
dst = append ( dst , prompbmarshal . Label {
Name : tag . Key ,
Value : tag . Value ,
} )
}
2022-10-01 15:13:17 +02:00
return appendExtraLabels ( dst , extraLabels , dstLen , honorLabels )
}
func appendExtraLabels ( dst , extraLabels [ ] prompbmarshal . Label , offset int , honorLabels bool ) [ ] prompbmarshal . Label {
// Add extraLabels to labels.
// Handle duplicates in the same way as Prometheus does.
2022-10-28 20:34:07 +02:00
if len ( dst ) == offset {
2022-10-01 15:13:17 +02:00
// Fast path - add extraLabels to dst without the need to de-duplicate.
dst = append ( dst , extraLabels ... )
2020-02-23 12:35:47 +01:00
return dst
}
2022-10-28 20:34:07 +02:00
offsetEnd := len ( dst )
2022-10-01 15:13:17 +02:00
for _ , label := range extraLabels {
2022-10-28 20:34:07 +02:00
labels := dst [ offset : offsetEnd ]
2022-10-01 15:13:17 +02:00
prevLabel := promrelabel . GetLabelByName ( labels , label . Name )
2020-02-23 12:35:47 +01:00
if prevLabel == nil {
2022-10-01 15:13:17 +02:00
// Fast path - the label doesn't exist in labels, so just add it to dst.
dst = append ( dst , label )
2020-02-23 12:35:47 +01:00
continue
}
if honorLabels {
// Skip the extra label with the same name.
continue
}
2022-10-01 15:13:17 +02:00
// Rename the prevLabel to "exported_" + label.Name
2020-02-23 12:35:47 +01:00
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
exportedName := "exported_" + label . Name
2022-10-01 15:13:17 +02:00
exportedLabel := promrelabel . GetLabelByName ( labels , exportedName )
2022-10-28 20:34:07 +02:00
if exportedLabel != nil {
// The label with the name exported_<label.Name> already exists.
// Add yet another 'exported_' prefix to it.
exportedLabel . Name = "exported_" + exportedName
2020-02-23 12:35:47 +01:00
}
2022-10-28 20:34:07 +02:00
prevLabel . Name = exportedName
dst = append ( dst , label )
2020-02-23 12:35:47 +01:00
}
2022-10-01 15:13:17 +02:00
return dst
2020-02-23 12:35:47 +01:00
}