2020-02-23 12:35:47 +01:00
package promscrape
import (
2020-04-16 22:41:16 +02:00
"flag"
2020-04-16 22:34:37 +02:00
"fmt"
2020-08-10 11:31:59 +02:00
"math"
2020-08-16 21:27:26 +02:00
"math/bits"
2020-11-04 09:38:09 +01:00
"strconv"
2020-08-13 22:12:22 +02:00
"sync"
2020-02-23 12:35:47 +01:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2021-08-13 11:10:00 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-08-13 22:12:22 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-04-13 11:59:05 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
2020-12-24 09:56:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
2021-01-26 23:23:10 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/metrics"
2020-05-03 13:29:26 +02:00
xxhash "github.com/cespare/xxhash/v2"
2020-02-23 12:35:47 +01:00
)
2020-04-16 22:41:16 +02:00
// suppressScrapeErrors holds the value of the -promscrape.suppressScrapeErrors
// command-line flag. Scrape errors are not logged when it is set.
var suppressScrapeErrors = flag.Bool(
	"promscrape.suppressScrapeErrors",
	false,
	"Whether to suppress scrape errors logging. "+
		"The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed",
)
2020-02-23 12:35:47 +01:00
// ScrapeWork represents a unit of work for scraping Prometheus metrics.
2020-12-17 13:30:33 +01:00
//
// It must be immutable during its lifetime, since it is read from concurrently running goroutines.
2020-02-23 12:35:47 +01:00
type ScrapeWork struct {
// Full URL (including query args) for the scrape.
ScrapeURL string
// Interval for scraping the ScrapeURL.
ScrapeInterval time . Duration
// Timeout for scraping the ScrapeURL.
ScrapeTimeout time . Duration
// How to deal with conflicting labels.
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
HonorLabels bool
// How to deal with scraped timestamps.
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
HonorTimestamps bool
2021-04-02 18:56:38 +02:00
// Whether to deny redirects during requests to scrape config.
DenyRedirects bool
2020-10-08 17:50:22 +02:00
// OriginalLabels contains original labels before relabeling.
//
// These labels are needed for relabeling troubleshooting at /targets page.
OriginalLabels [ ] prompbmarshal . Label
2020-02-23 12:35:47 +01:00
// Labels to add to the scraped metrics.
//
// The list contains at least the following labels according to https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
//
// * job
// * __address__
// * __scheme__
// * __metrics_path__
// * __param_<name>
// * __meta_*
// * user-defined labels set via `relabel_configs` section in `scrape_config`
//
// See also https://prometheus.io/docs/concepts/jobs_instances/
Labels [ ] prompbmarshal . Label
2020-12-24 09:52:37 +01:00
// ProxyURL HTTP proxy url
2020-12-24 09:56:10 +01:00
ProxyURL proxy . URL
2020-12-24 09:52:37 +01:00
2021-03-12 02:35:49 +01:00
// Auth config for ProxyUR:
ProxyAuthConfig * promauth . Config
// Auth config
AuthConfig * promauth . Config
2020-02-23 12:35:47 +01:00
// Optional `metric_relabel_configs`.
2021-02-22 15:33:55 +01:00
MetricRelabelConfigs * promrelabel . ParsedConfigs
2020-02-23 12:35:47 +01:00
// The maximum number of metrics to scrape after relabeling.
2020-04-14 10:58:15 +02:00
SampleLimit int
2020-06-23 14:35:19 +02:00
2020-07-02 13:19:11 +02:00
// Whether to disable response compression when querying ScrapeURL.
DisableCompression bool
// Whether to disable HTTP keep-alive when querying ScrapeURL.
DisableKeepAlive bool
2020-11-01 22:12:13 +01:00
// Whether to parse target responses in a streaming manner.
StreamParse bool
2021-02-18 22:51:29 +01:00
// The interval for aligning the first scrape.
ScrapeAlignInterval time . Duration
2021-03-08 10:58:25 +01:00
// The offset for the first scrape.
ScrapeOffset time . Duration
2020-06-23 14:35:19 +02:00
// The original 'job_name'
jobNameOriginal string
2020-02-23 12:35:47 +01:00
}
2020-05-03 11:41:13 +02:00
// key returns unique identifier for the given sw.
//
2020-11-01 22:12:13 +01:00
// it can be used for comparing for equality for two ScrapeWork objects.
2020-05-03 11:41:13 +02:00
func ( sw * ScrapeWork ) key ( ) string {
2020-10-08 17:50:22 +02:00
// Do not take into account OriginalLabels.
2021-04-02 18:56:38 +02:00
key := fmt . Sprintf ( "ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, DenyRedirects=%v, Labels=%s, " +
2021-03-12 02:35:49 +01:00
"ProxyURL=%s, ProxyAuthConfig=%s, AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v, " +
2021-03-08 10:58:25 +01:00
"ScrapeAlignInterval=%s, ScrapeOffset=%s" ,
2021-04-02 18:56:38 +02:00
sw . ScrapeURL , sw . ScrapeInterval , sw . ScrapeTimeout , sw . HonorLabels , sw . HonorTimestamps , sw . DenyRedirects , sw . LabelsString ( ) ,
2021-03-12 02:35:49 +01:00
sw . ProxyURL . String ( ) , sw . ProxyAuthConfig . String ( ) ,
2021-03-08 10:58:25 +01:00
sw . AuthConfig . String ( ) , sw . MetricRelabelConfigs . String ( ) , sw . SampleLimit , sw . DisableCompression , sw . DisableKeepAlive , sw . StreamParse ,
sw . ScrapeAlignInterval , sw . ScrapeOffset )
2020-05-03 11:41:13 +02:00
return key
}
2020-04-14 12:32:55 +02:00
// Job returns job for the ScrapeWork
func ( sw * ScrapeWork ) Job ( ) string {
2020-04-14 13:11:54 +02:00
return promrelabel . GetLabelValueByName ( sw . Labels , "job" )
2020-04-14 12:32:55 +02:00
}
2020-04-16 22:34:37 +02:00
// LabelsString returns labels in Prometheus format for the given sw.
func ( sw * ScrapeWork ) LabelsString ( ) string {
2020-10-08 17:50:22 +02:00
labelsFinalized := promrelabel . FinalizeLabels ( nil , sw . Labels )
return promLabelsString ( labelsFinalized )
}
func promLabelsString ( labels [ ] prompbmarshal . Label ) string {
2020-11-04 09:38:09 +01:00
// Calculate the required memory for storing serialized labels.
n := 2 // for `{...}`
2020-10-08 17:50:22 +02:00
for _ , label := range labels {
2020-11-04 09:38:09 +01:00
n += len ( label . Name ) + len ( label . Value )
n += 4 // for `="...",`
2020-04-16 22:34:37 +02:00
}
2020-11-04 09:38:09 +01:00
b := make ( [ ] byte , 0 , n )
b = append ( b , '{' )
for i , label := range labels {
b = append ( b , label . Name ... )
b = append ( b , '=' )
b = strconv . AppendQuote ( b , label . Value )
if i + 1 < len ( labels ) {
b = append ( b , ',' )
}
}
b = append ( b , '}' )
return bytesutil . ToUnsafeString ( b )
2020-04-16 22:34:37 +02:00
}
2020-02-23 12:35:47 +01:00
type scrapeWork struct {
// Config for the scrape.
2020-12-17 13:30:33 +01:00
Config * ScrapeWork
2020-02-23 12:35:47 +01:00
// ReadData is called for reading the data.
ReadData func ( dst [ ] byte ) ( [ ] byte , error )
2020-11-01 22:12:13 +01:00
// GetStreamReader is called if Config.StreamParse is set.
GetStreamReader func ( ) ( * streamReader , error )
2020-02-23 12:35:47 +01:00
// PushData is called for pushing collected data.
2021-08-05 08:46:19 +02:00
PushData func ( wr * prompbmarshal . WriteRequest )
2020-02-23 12:35:47 +01:00
2020-07-13 20:52:03 +02:00
// ScrapeGroup is name of ScrapeGroup that
// scrapeWork belongs to
ScrapeGroup string
2020-08-13 22:12:22 +02:00
tmpRow parser . Row
2020-08-09 11:44:49 +02:00
2020-09-11 22:36:24 +02:00
// the seriesMap, seriesAdded and labelsHashBuf are used for fast calculation of `scrape_series_added` metric.
seriesMap map [ uint64 ] struct { }
seriesAdded int
2020-08-10 18:47:43 +02:00
labelsHashBuf [ ] byte
2020-08-13 22:12:22 +02:00
2020-08-14 00:16:18 +02:00
// prevBodyLen contains the previous response body length for the given scrape work.
2020-08-13 22:12:22 +02:00
// It is used as a hint in order to reduce memory usage for body buffers.
2020-08-14 00:16:18 +02:00
prevBodyLen int
2020-08-16 21:27:26 +02:00
2021-03-14 21:56:23 +01:00
// prevLabelsLen contains the number labels scraped during the previous scrape.
2020-08-16 21:27:26 +02:00
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
2021-03-14 21:56:23 +01:00
prevLabelsLen int
2021-08-13 11:10:00 +02:00
activeSeriesBuf [ ] byte
activeSeries [ ] [ ] byte
2020-02-23 12:35:47 +01:00
}
func ( sw * scrapeWork ) run ( stopCh <- chan struct { } ) {
2021-02-18 22:51:29 +01:00
var randSleep uint64
2021-03-08 10:58:25 +01:00
scrapeInterval := sw . Config . ScrapeInterval
scrapeAlignInterval := sw . Config . ScrapeAlignInterval
scrapeOffset := sw . Config . ScrapeOffset
if scrapeOffset > 0 {
scrapeAlignInterval = scrapeInterval
}
if scrapeAlignInterval <= 0 {
2021-02-18 22:51:29 +01:00
// Calculate start time for the first scrape from ScrapeURL and labels.
// This should spread load when scraping many targets with different
// scrape urls and labels.
// This also makes consistent scrape times across restarts
// for a target with the same ScrapeURL and labels.
key := fmt . Sprintf ( "ScrapeURL=%s, Labels=%s" , sw . Config . ScrapeURL , sw . Config . LabelsString ( ) )
2021-02-28 17:39:57 +01:00
h := uint32 ( xxhash . Sum64 ( bytesutil . ToUnsafeBytes ( key ) ) )
2021-02-18 23:33:37 +01:00
randSleep = uint64 ( float64 ( scrapeInterval ) * ( float64 ( h ) / ( 1 << 32 ) ) )
2021-02-18 22:51:29 +01:00
sleepOffset := uint64 ( time . Now ( ) . UnixNano ( ) ) % uint64 ( scrapeInterval )
if randSleep < sleepOffset {
randSleep += uint64 ( scrapeInterval )
}
randSleep -= sleepOffset
} else {
2021-03-08 10:58:25 +01:00
d := uint64 ( scrapeAlignInterval )
2021-02-18 22:51:29 +01:00
randSleep = d - uint64 ( time . Now ( ) . UnixNano ( ) ) % d
2021-03-08 10:58:25 +01:00
if scrapeOffset > 0 {
randSleep += uint64 ( scrapeOffset )
}
2021-02-18 22:51:29 +01:00
randSleep %= uint64 ( scrapeInterval )
2020-05-03 13:29:26 +02:00
}
2021-01-26 23:23:10 +01:00
timer := timerpool . Get ( time . Duration ( randSleep ) )
2020-04-01 15:10:35 +02:00
var timestamp int64
2020-02-23 12:35:47 +01:00
var ticker * time . Ticker
select {
case <- stopCh :
2021-01-26 23:23:10 +01:00
timerpool . Put ( timer )
2020-02-23 12:35:47 +01:00
return
2020-04-01 15:10:35 +02:00
case <- timer . C :
2021-01-26 23:23:10 +01:00
timerpool . Put ( timer )
2020-04-01 15:10:35 +02:00
ticker = time . NewTicker ( scrapeInterval )
timestamp = time . Now ( ) . UnixNano ( ) / 1e6
2020-08-10 11:31:59 +02:00
sw . scrapeAndLogError ( timestamp , timestamp )
2020-02-23 12:35:47 +01:00
}
defer ticker . Stop ( )
for {
2020-04-01 15:10:35 +02:00
timestamp += scrapeInterval . Milliseconds ( )
2020-02-23 12:35:47 +01:00
select {
case <- stopCh :
2021-08-13 11:10:00 +02:00
sw . sendStaleMarkers ( )
2020-02-23 12:35:47 +01:00
return
2020-08-10 11:31:59 +02:00
case tt := <- ticker . C :
t := tt . UnixNano ( ) / 1e6
if d := math . Abs ( float64 ( t - timestamp ) ) ; d > 0 && d / float64 ( scrapeInterval . Milliseconds ( ) ) > 0.1 {
2020-04-01 15:10:35 +02:00
// Too big jitter. Adjust timestamp
timestamp = t
2020-02-23 12:35:47 +01:00
}
2020-08-10 11:31:59 +02:00
sw . scrapeAndLogError ( timestamp , t )
2020-02-23 12:35:47 +01:00
}
}
}
func ( sw * scrapeWork ) logError ( s string ) {
2020-04-16 22:41:16 +02:00
if ! * suppressScrapeErrors {
2020-12-06 22:26:34 +01:00
logger . ErrorfSkipframes ( 1 , "error when scraping %q from job %q with labels %s: %s; " +
"scrape errors can be disabled by -promscrape.suppressScrapeErrors command-line flag" ,
sw . Config . ScrapeURL , sw . Config . Job ( ) , sw . Config . LabelsString ( ) , s )
2020-04-16 22:41:16 +02:00
}
2020-02-23 12:35:47 +01:00
}
2020-08-10 11:31:59 +02:00
func ( sw * scrapeWork ) scrapeAndLogError ( scrapeTimestamp , realTimestamp int64 ) {
if err := sw . scrapeInternal ( scrapeTimestamp , realTimestamp ) ; err != nil && ! * suppressScrapeErrors {
2020-04-16 22:34:37 +02:00
logger . Errorf ( "error when scraping %q from job %q with labels %s: %s" , sw . Config . ScrapeURL , sw . Config . Job ( ) , sw . Config . LabelsString ( ) , err )
2020-02-23 12:35:47 +01:00
}
}
var (
scrapeDuration = metrics . NewHistogram ( "vm_promscrape_scrape_duration_seconds" )
scrapeResponseSize = metrics . NewHistogram ( "vm_promscrape_scrape_response_size_bytes" )
scrapedSamples = metrics . NewHistogram ( "vm_promscrape_scraped_samples" )
2020-04-14 10:58:15 +02:00
scrapesSkippedBySampleLimit = metrics . NewCounter ( "vm_promscrape_scrapes_skipped_by_sample_limit_total" )
2020-02-23 12:35:47 +01:00
scrapesFailed = metrics . NewCounter ( "vm_promscrape_scrapes_failed_total" )
pushDataDuration = metrics . NewHistogram ( "vm_promscrape_push_data_duration_seconds" )
)
2020-08-10 11:31:59 +02:00
func ( sw * scrapeWork ) scrapeInternal ( scrapeTimestamp , realTimestamp int64 ) error {
2020-11-01 22:12:13 +01:00
if * streamParse || sw . Config . StreamParse {
// Read data from scrape targets in streaming manner.
// This case is optimized for targets exposing millions and more of metrics per target.
return sw . scrapeStream ( scrapeTimestamp , realTimestamp )
}
// Common case: read all the data from scrape target to memory (body) and then process it.
// This case should work more optimally for than stream parse code above for common case when scrape target exposes
2020-11-26 12:33:46 +01:00
// up to a few thousand metrics.
2020-08-14 00:16:18 +02:00
body := leveledbytebufferpool . Get ( sw . prevBodyLen )
2020-02-23 12:35:47 +01:00
var err error
2020-08-13 22:12:22 +02:00
body . B , err = sw . ReadData ( body . B [ : 0 ] )
2020-02-23 12:35:47 +01:00
endTimestamp := time . Now ( ) . UnixNano ( ) / 1e6
2020-08-10 11:31:59 +02:00
duration := float64 ( endTimestamp - realTimestamp ) / 1e3
2020-02-23 12:35:47 +01:00
scrapeDuration . Update ( duration )
2020-08-13 22:12:22 +02:00
scrapeResponseSize . Update ( float64 ( len ( body . B ) ) )
2020-02-23 12:35:47 +01:00
up := 1
2021-03-14 21:56:23 +01:00
wc := writeRequestCtxPool . Get ( sw . prevLabelsLen )
2020-02-23 12:35:47 +01:00
if err != nil {
up = 0
scrapesFailed . Inc ( )
} else {
2020-08-13 22:12:22 +02:00
bodyString := bytesutil . ToUnsafeString ( body . B )
wc . rows . UnmarshalWithErrLogger ( bodyString , sw . logError )
2020-02-23 12:35:47 +01:00
}
2020-08-13 22:12:22 +02:00
srcRows := wc . rows . Rows
2020-02-23 12:35:47 +01:00
samplesScraped := len ( srcRows )
scrapedSamples . Update ( float64 ( samplesScraped ) )
for i := range srcRows {
2020-08-13 22:12:22 +02:00
sw . addRowToTimeseries ( wc , & srcRows [ i ] , scrapeTimestamp , true )
2020-02-23 12:35:47 +01:00
}
2021-03-14 21:56:23 +01:00
samplesPostRelabeling := len ( wc . writeRequest . Timeseries )
2021-03-09 14:47:15 +01:00
if sw . Config . SampleLimit > 0 && samplesPostRelabeling > sw . Config . SampleLimit {
wc . resetNoRows ( )
up = 0
scrapesSkippedBySampleLimit . Inc ( )
2021-05-27 13:52:44 +02:00
err = fmt . Errorf ( "the response from %q exceeds sample_limit=%d; " +
"either reduce the sample count for the target or increase sample_limit" , sw . Config . ScrapeURL , sw . Config . SampleLimit )
2021-03-09 14:47:15 +01:00
}
2020-09-11 22:36:24 +02:00
sw . updateSeriesAdded ( wc )
seriesAdded := sw . finalizeSeriesAdded ( samplesPostRelabeling )
2020-08-13 22:12:22 +02:00
sw . addAutoTimeseries ( wc , "up" , float64 ( up ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_duration_seconds" , duration , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_scraped" , float64 ( samplesScraped ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_post_metric_relabeling" , float64 ( samplesPostRelabeling ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_series_added" , float64 ( seriesAdded ) , scrapeTimestamp )
2021-08-13 11:10:00 +02:00
sw . updateActiveSeries ( wc )
sw . pushData ( & wc . writeRequest )
2021-03-14 21:56:23 +01:00
sw . prevLabelsLen = len ( wc . labels )
2020-08-13 22:12:22 +02:00
wc . reset ( )
writeRequestCtxPool . Put ( wc )
// body must be released only after wc is released, since wc refers to body.
2020-08-14 00:16:18 +02:00
sw . prevBodyLen = len ( body . B )
2020-08-13 22:12:22 +02:00
leveledbytebufferpool . Put ( body )
2021-06-14 13:01:13 +02:00
tsmGlobal . Update ( sw . Config , sw . ScrapeGroup , up == 1 , realTimestamp , int64 ( duration * 1000 ) , samplesScraped , err )
2020-02-23 12:35:47 +01:00
return err
}
2021-08-13 11:10:00 +02:00
// pushData passes wr to sw.PushData and records the time spent there
// in the pushDataDuration histogram.
func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
	pushStart := time.Now()
	sw.PushData(wr)
	pushDataDuration.UpdateDuration(pushStart)
}
2020-11-01 22:12:13 +01:00
func ( sw * scrapeWork ) scrapeStream ( scrapeTimestamp , realTimestamp int64 ) error {
samplesScraped := 0
samplesPostRelabeling := 0
2020-12-15 13:08:06 +01:00
responseSize := int64 ( 0 )
2021-03-14 21:56:23 +01:00
wc := writeRequestCtxPool . Get ( sw . prevLabelsLen )
2020-12-15 13:08:06 +01:00
sr , err := sw . GetStreamReader ( )
if err != nil {
err = fmt . Errorf ( "cannot read data: %s" , err )
} else {
var mu sync . Mutex
err = parser . ParseStream ( sr , scrapeTimestamp , false , func ( rows [ ] parser . Row ) error {
mu . Lock ( )
defer mu . Unlock ( )
samplesScraped += len ( rows )
for i := range rows {
sw . addRowToTimeseries ( wc , & rows [ i ] , scrapeTimestamp , true )
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
samplesPostRelabeling += len ( wc . writeRequest . Timeseries )
2021-05-27 13:52:44 +02:00
if sw . Config . SampleLimit > 0 && samplesPostRelabeling > sw . Config . SampleLimit {
wc . resetNoRows ( )
scrapesSkippedBySampleLimit . Inc ( )
return fmt . Errorf ( "the response from %q exceeds sample_limit=%d; " +
"either reduce the sample count for the target or increase sample_limit" , sw . Config . ScrapeURL , sw . Config . SampleLimit )
}
2020-12-15 13:08:06 +01:00
sw . updateSeriesAdded ( wc )
2021-08-13 11:10:00 +02:00
sw . pushData ( & wc . writeRequest )
2020-12-15 13:08:06 +01:00
wc . resetNoRows ( )
return nil
2021-01-12 12:31:47 +01:00
} , sw . logError )
2020-12-15 13:08:06 +01:00
responseSize = sr . bytesRead
sr . MustClose ( )
}
2020-11-01 22:12:13 +01:00
scrapedSamples . Update ( float64 ( samplesScraped ) )
endTimestamp := time . Now ( ) . UnixNano ( ) / 1e6
duration := float64 ( endTimestamp - realTimestamp ) / 1e3
scrapeDuration . Update ( duration )
2020-12-15 13:08:06 +01:00
scrapeResponseSize . Update ( float64 ( responseSize ) )
2020-11-01 22:12:13 +01:00
up := 1
if err != nil {
if samplesScraped == 0 {
up = 0
}
scrapesFailed . Inc ( )
}
seriesAdded := sw . finalizeSeriesAdded ( samplesPostRelabeling )
sw . addAutoTimeseries ( wc , "up" , float64 ( up ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_duration_seconds" , duration , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_scraped" , float64 ( samplesScraped ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_samples_post_metric_relabeling" , float64 ( samplesPostRelabeling ) , scrapeTimestamp )
sw . addAutoTimeseries ( wc , "scrape_series_added" , float64 ( seriesAdded ) , scrapeTimestamp )
2021-08-13 11:10:00 +02:00
// Do not call sw.updateActiveSeries(wc), since wc doesn't contain the full list of scraped metrics.
// Do not track active series in streaming mode, since this may need too big amounts of memory
// when the target exports too big number of metrics.
sw . pushData ( & wc . writeRequest )
2021-03-14 21:56:23 +01:00
sw . prevLabelsLen = len ( wc . labels )
2020-11-01 22:12:13 +01:00
wc . reset ( )
writeRequestCtxPool . Put ( wc )
2021-06-14 13:01:13 +02:00
tsmGlobal . Update ( sw . Config , sw . ScrapeGroup , up == 1 , realTimestamp , int64 ( duration * 1000 ) , samplesScraped , err )
2021-01-12 12:31:47 +01:00
return err
2020-11-01 22:12:13 +01:00
}
2020-09-01 09:55:21 +02:00
// leveledWriteRequestCtxPool is a set of pools of writeRequestCtx objects
// grouped by the capacity of their labels slice.
//
// It allows reducing memory usage when writeRequestCtx structs
// contain mixed number of labels.
//
// Its logic has been copied from leveledbytebufferpool.
type leveledWriteRequestCtxPool struct {
	// pools[i] holds objects with labels capacity up to 1<<(i+3).
	pools [13]sync.Pool
}
2021-03-14 21:56:23 +01:00
func ( lwp * leveledWriteRequestCtxPool ) Get ( labelsCapacity int ) * writeRequestCtx {
id , capacityNeeded := lwp . getPoolIDAndCapacity ( labelsCapacity )
2020-08-16 21:27:26 +02:00
for i := 0 ; i < 2 ; i ++ {
if id < 0 || id >= len ( lwp . pools ) {
break
}
if v := lwp . pools [ id ] . Get ( ) ; v != nil {
return v . ( * writeRequestCtx )
}
id ++
}
return & writeRequestCtx {
labels : make ( [ ] prompbmarshal . Label , 0 , capacityNeeded ) ,
}
}
func ( lwp * leveledWriteRequestCtxPool ) Put ( wc * writeRequestCtx ) {
2021-03-14 21:56:23 +01:00
capacity := cap ( wc . labels )
id , poolCapacity := lwp . getPoolIDAndCapacity ( capacity )
if capacity <= poolCapacity {
wc . reset ( )
lwp . pools [ id ] . Put ( wc )
}
2020-08-16 21:27:26 +02:00
}
2020-08-28 08:44:08 +02:00
func ( lwp * leveledWriteRequestCtxPool ) getPoolIDAndCapacity ( size int ) ( int , int ) {
2020-08-16 21:27:26 +02:00
size --
if size < 0 {
size = 0
}
size >>= 3
id := bits . Len ( uint ( size ) )
2021-03-14 21:56:23 +01:00
if id >= len ( lwp . pools ) {
2020-08-16 21:27:26 +02:00
id = len ( lwp . pools ) - 1
}
return id , ( 1 << ( id + 3 ) )
}
2020-08-13 22:12:22 +02:00
// writeRequestCtx holds the parsed rows of a scrape response together with
// the write request built from them.
type writeRequestCtx struct {
	// rows contains the rows parsed from the scrape response.
	rows parser.Rows

	// writeRequest is the request to push; its time series refer to labels and samples.
	writeRequest prompbmarshal.WriteRequest

	// labels is the shared storage for the labels of all the time series in writeRequest.
	labels []prompbmarshal.Label

	// samples is the shared storage for the samples of all the time series in writeRequest.
	samples []prompbmarshal.Sample
}
func ( wc * writeRequestCtx ) reset ( ) {
wc . rows . Reset ( )
2020-09-11 22:36:24 +02:00
wc . resetNoRows ( )
}
func ( wc * writeRequestCtx ) resetNoRows ( ) {
2020-08-13 22:12:22 +02:00
prompbmarshal . ResetWriteRequest ( & wc . writeRequest )
wc . labels = wc . labels [ : 0 ]
wc . samples = wc . samples [ : 0 ]
}
2020-08-16 21:27:26 +02:00
// writeRequestCtxPool is the package-level pool of writeRequestCtx objects.
// Its zero value is ready for use.
var writeRequestCtxPool leveledWriteRequestCtxPool
2020-08-13 22:12:22 +02:00
2020-09-11 22:36:24 +02:00
func ( sw * scrapeWork ) updateSeriesAdded ( wc * writeRequestCtx ) {
if sw . seriesMap == nil {
sw . seriesMap = make ( map [ uint64 ] struct { } , len ( wc . writeRequest . Timeseries ) )
}
m := sw . seriesMap
2020-08-13 22:12:22 +02:00
for _ , ts := range wc . writeRequest . Timeseries {
2020-08-10 18:47:43 +02:00
h := sw . getLabelsHash ( ts . Labels )
2020-09-11 22:36:24 +02:00
if _ , ok := m [ h ] ; ! ok {
m [ h ] = struct { } { }
sw . seriesAdded ++
2020-08-09 11:44:49 +02:00
}
}
2020-09-11 22:36:24 +02:00
}
2020-08-09 11:44:49 +02:00
2021-08-13 11:10:00 +02:00
// updateActiveSeries replaces sw.activeSeries with the marshaled label sets
// of the time series from wc.
//
// Every label set is stored as a sequence of (name, value) pairs written
// via encoding.MarshalBytes. The previously used buffers are reused.
func (sw *scrapeWork) updateActiveSeries(wc *writeRequestCtx) {
	buf := sw.activeSeriesBuf[:0]
	series := sw.activeSeries[:0]
	tss := wc.writeRequest.Timeseries
	for i := range tss {
		start := len(buf)
		labels := tss[i].Labels
		for j := range labels {
			buf = encoding.MarshalBytes(buf, bytesutil.ToUnsafeBytes(labels[j].Name))
			buf = encoding.MarshalBytes(buf, bytesutil.ToUnsafeBytes(labels[j].Value))
		}
		series = append(series, buf[start:])
	}
	sw.activeSeriesBuf = buf
	sw.activeSeries = series
}
// sendStaleMarkers pushes a sample with decimal.StaleNaN value and the current
// timestamp for every series stored in sw.activeSeries.
//
// It panics via logger.Panicf if sw.activeSeries contains improperly marshaled labels.
func (sw *scrapeWork) sendStaleMarkers() {
	// All the series share the same single-sample slice.
	staleSamples := []prompbmarshal.Sample{{
		Value:     decimal.StaleNaN,
		Timestamp: time.Now().UnixNano() / 1e6,
	}}

	tss := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries))
	for _, data := range sw.activeSeries {
		// Unmarshal the (name, value) pairs written by updateActiveSeries.
		var labels []prompbmarshal.Label
		for len(data) > 0 {
			rest, name, err := encoding.UnmarshalBytes(data)
			if err != nil {
				logger.Panicf("BUG: cannot unmarshal label name from activeSeries: %s", err)
			}
			data = rest
			rest, value, err := encoding.UnmarshalBytes(data)
			if err != nil {
				logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err)
			}
			data = rest
			labels = append(labels, prompbmarshal.Label{
				Name:  bytesutil.ToUnsafeString(name),
				Value: bytesutil.ToUnsafeString(value),
			})
		}
		tss = append(tss, prompbmarshal.TimeSeries{
			Labels:  labels,
			Samples: staleSamples,
		})
	}
	sw.pushData(&prompbmarshal.WriteRequest{
		Timeseries: tss,
	})
}
2020-09-11 22:36:24 +02:00
func ( sw * scrapeWork ) finalizeSeriesAdded ( lastScrapeSize int ) int {
seriesAdded := sw . seriesAdded
sw . seriesAdded = 0
2020-09-11 23:14:04 +02:00
if len ( sw . seriesMap ) > 4 * lastScrapeSize {
2020-09-11 22:36:24 +02:00
// Reset seriesMap, since it occupies more than 4x metrics collected during the last scrape.
sw . seriesMap = make ( map [ uint64 ] struct { } , lastScrapeSize )
2020-08-09 11:44:49 +02:00
}
return seriesAdded
}
2020-08-10 18:47:43 +02:00
func ( sw * scrapeWork ) getLabelsHash ( labels [ ] prompbmarshal . Label ) uint64 {
2020-08-09 11:44:49 +02:00
// It is OK if there will be hash collisions for distinct sets of labels,
// since the accuracy for `scrape_series_added` metric may be lower than 100%.
2020-08-10 18:47:43 +02:00
b := sw . labelsHashBuf [ : 0 ]
2020-08-09 11:44:49 +02:00
for _ , label := range labels {
2020-08-10 18:47:43 +02:00
b = append ( b , label . Name ... )
b = append ( b , label . Value ... )
2020-08-09 11:44:49 +02:00
}
2020-08-10 18:47:43 +02:00
sw . labelsHashBuf = b
return xxhash . Sum64 ( b )
2020-08-09 11:44:49 +02:00
}
2020-02-23 12:35:47 +01:00
// addAutoTimeseries adds automatically generated time series with the given name, value and timestamp.
//
// See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
2020-08-13 22:12:22 +02:00
func ( sw * scrapeWork ) addAutoTimeseries ( wc * writeRequestCtx , name string , value float64 , timestamp int64 ) {
2020-02-23 12:35:47 +01:00
sw . tmpRow . Metric = name
sw . tmpRow . Tags = nil
sw . tmpRow . Value = value
sw . tmpRow . Timestamp = timestamp
2020-08-13 22:12:22 +02:00
sw . addRowToTimeseries ( wc , & sw . tmpRow , timestamp , false )
2020-02-23 12:35:47 +01:00
}
2020-08-13 22:12:22 +02:00
func ( sw * scrapeWork ) addRowToTimeseries ( wc * writeRequestCtx , r * parser . Row , timestamp int64 , needRelabel bool ) {
labelsLen := len ( wc . labels )
wc . labels = appendLabels ( wc . labels , r . Metric , r . Tags , sw . Config . Labels , sw . Config . HonorLabels )
2020-06-29 21:29:29 +02:00
if needRelabel {
2021-06-04 19:27:55 +02:00
wc . labels = sw . Config . MetricRelabelConfigs . Apply ( wc . labels , labelsLen , true )
2020-06-29 21:29:29 +02:00
} else {
2020-08-13 22:12:22 +02:00
wc . labels = promrelabel . FinalizeLabels ( wc . labels [ : labelsLen ] , wc . labels [ labelsLen : ] )
promrelabel . SortLabels ( wc . labels [ labelsLen : ] )
2020-06-29 21:29:29 +02:00
}
2020-08-13 22:12:22 +02:00
if len ( wc . labels ) == labelsLen {
2020-02-23 12:35:47 +01:00
// Skip row without labels.
return
}
2020-09-11 22:36:24 +02:00
sampleTimestamp := r . Timestamp
if ! sw . Config . HonorTimestamps || sampleTimestamp == 0 {
sampleTimestamp = timestamp
2020-02-23 12:35:47 +01:00
}
2020-09-11 22:36:24 +02:00
wc . samples = append ( wc . samples , prompbmarshal . Sample {
Value : r . Value ,
Timestamp : sampleTimestamp ,
} )
2020-08-13 22:12:22 +02:00
wr := & wc . writeRequest
2020-09-11 22:36:24 +02:00
wr . Timeseries = append ( wr . Timeseries , prompbmarshal . TimeSeries {
Labels : wc . labels [ labelsLen : ] ,
Samples : wc . samples [ len ( wc . samples ) - 1 : ] ,
} )
2020-02-23 12:35:47 +01:00
}
// appendLabels appends the `__name__` label set to metric, the labels built from src
// and extraLabels to dst, de-duplicates the appended labels by name and returns the result.
//
// When a label name is met again:
//   - if honorLabels is set, the earlier label is kept and the later one is dropped;
//   - otherwise the earlier label is renamed to "exported_<name>" and the later one is added,
//     unless the "exported_<name>" label already exists - then the earlier label
//     is overwritten with the later one.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
func appendLabels(dst []prompbmarshal.Label, metric string, src []parser.Tag, extraLabels []prompbmarshal.Label, honorLabels bool) []prompbmarshal.Label {
	offset := len(dst)
	dst = append(dst, prompbmarshal.Label{
		Name:  "__name__",
		Value: metric,
	})
	for i := range src {
		dst = append(dst, prompbmarshal.Label{
			Name:  src[i].Key,
			Value: src[i].Value,
		})
	}
	dst = append(dst, extraLabels...)

	added := dst[offset:]
	if len(added) <= 1 {
		// Fast path - only a single label, so there is nothing to de-duplicate.
		return dst
	}

	// De-duplicate the added labels in place.
	kept := added[:0]
	for i := range added {
		cur := added[i]
		dup := promrelabel.GetLabelByName(kept, cur.Name)
		switch {
		case dup == nil:
			// The label name is met for the first time.
			kept = append(kept, cur)
		case honorLabels:
			// Skip the extra label with the same name.
		case promrelabel.GetLabelByName(kept, "exported_"+cur.Name) != nil:
			// The exported label already exists. Override the duplicate with the current label.
			*dup = cur
		default:
			// Rename the earlier label to "exported_" + name and add the current one.
			dup.Name = "exported_" + cur.Name
			kept = append(kept, cur)
		}
	}
	return dst[:offset+len(kept)]
}