lib/promscrape: add stream parse mode for efficient scraping of targets that expose millions of metrics

This commit is contained in:
Aliaksandr Valialkin 2020-11-01 23:12:13 +02:00
parent 901514be88
commit ed724d25ba
18 changed files with 272 additions and 14 deletions

View File

@ -9,6 +9,10 @@
* `num_or_scalar op foo{filters} op bar` -> `num_or_scalar op foo{filters} op bar{filters}` * `num_or_scalar op foo{filters} op bar` -> `num_or_scalar op foo{filters} op bar{filters}`
* FEATURE: improve time series search for queries with multiple label filters. I.e. `foo{label1="value", label2=~"regexp"}`. * FEATURE: improve time series search for queries with multiple label filters. I.e. `foo{label1="value", label2=~"regexp"}`.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781 See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781
* FEATURE: vmagent: add `stream parse` mode. This mode allows reducing memory usage when individual scrape targets expose tens of millions of metrics.
For example, during scraping Prometheus in [federation](https://prometheus.io/docs/prometheus/latest/federation/) mode.
See `-promscrape.streamParse` command-line option and `stream_parse: true` config option for `scrape_config` section in `-promscrape.config`.
See also [troubleshooting docs for vmagent](https://victoriametrics.github.io/vmagent.html#troubleshooting).
* FEATURE: vmalert: add `-dryRun` command-line option for validating the provided config files without the need to start `vmalert` service. * FEATURE: vmalert: add `-dryRun` command-line option for validating the provided config files without the need to start `vmalert` service.
* FEATURE: accept optional third argument of string type at `topk_*` and `bottomk_*` functions. This is label name for additional time series to return with the sum of time series outside top/bottom K. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for more details. * FEATURE: accept optional third argument of string type at `topk_*` and `bottomk_*` functions. This is label name for additional time series to return with the sum of time series outside top/bottom K. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for more details.
* FEATURE: vmagent: expose `/api/v1/targets` page according to [the corresponding Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets). * FEATURE: vmagent: expose `/api/v1/targets` page according to [the corresponding Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets).

View File

@ -231,6 +231,24 @@ This information may be useful for debugging target relabeling.
by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`
and `http://vmagent-host:8429/api/v1/targets`. and `http://vmagent-host:8429/api/v1/targets`.
* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)),
then it is recommended enabling `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets
by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example:
```yml
scrape_configs:
- job_name: 'big-federate'
stream_parse: true
static_configs:
- targets:
- big-prometeus1
- big-prometeus2
honor_labels: true
metrics_path: /federate
params:
'match[]': ['{__name__!=""}']
```
* It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported at `http://vmagent-host:8429/metrics` page constantly grows. * It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported at `http://vmagent-host:8429/metrics` page constantly grows.
* If you see gaps on the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set, then try increasing `-remoteWrite.queues`. * If you see gaps on the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set, then try increasing `-remoteWrite.queues`.

View File

@ -231,6 +231,24 @@ This information may be useful for debugging target relabeling.
by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`
and `http://vmagent-host:8429/api/v1/targets`. and `http://vmagent-host:8429/api/v1/targets`.
* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)),
then it is recommended enabling `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets
by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example:
```yml
scrape_configs:
- job_name: 'big-federate'
stream_parse: true
static_configs:
- targets:
- big-prometeus1
- big-prometeus2
honor_labels: true
metrics_path: /federate
params:
'match[]': ['{__name__!=""}']
```
* It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported at `http://vmagent-host:8429/metrics` page constantly grows. * It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported at `http://vmagent-host:8429/metrics` page constantly grows.
* If you see gaps on the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set, then try increasing `-remoteWrite.queues`. * If you see gaps on the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set, then try increasing `-remoteWrite.queues`.

View File

@ -1,13 +1,18 @@
package promscrape package promscrape
import ( import (
"context"
"crypto/tls" "crypto/tls"
"flag" "flag"
"fmt" "fmt"
"io"
"io/ioutil"
"net/http"
"strings" "strings"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -22,11 +27,19 @@ var (
"This may be useful when targets has no support for HTTP keep-alive connection. "+ "This may be useful when targets has no support for HTTP keep-alive connection. "+
"It is possible to set `disable_keepalive: true` individually per each 'scrape_config` section in '-promscrape.config' for fine grained control. "+ "It is possible to set `disable_keepalive: true` individually per each 'scrape_config` section in '-promscrape.config' for fine grained control. "+
"Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets") "Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets")
streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+
"for reducing memory usage when millions of metrics are exposed per each scrape target. "+
"It is posible to set `stream_parse: true` individually per each `scrape_config` section in `-promscrape.config` for fine grained control")
) )
type client struct { type client struct {
// hc is the default client optimized for common case of scraping targets with moderate number of metrics.
hc *fasthttp.HostClient hc *fasthttp.HostClient
// sc (aka `stream client`) is used instead of hc if ScrapeWork.ParseStream is set.
// It may be useful for scraping targets with millions of metrics per target.
sc *http.Client
scrapeURL string scrapeURL string
host string host string
requestURI string requestURI string
@ -64,8 +77,23 @@ func newClient(sw *ScrapeWork) *client {
MaxResponseBodySize: maxScrapeSize.N, MaxResponseBodySize: maxScrapeSize.N,
MaxIdempotentRequestAttempts: 1, MaxIdempotentRequestAttempts: 1,
} }
var sc *http.Client
if *streamParse || sw.StreamParse {
sc = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
TLSHandshakeTimeout: 10 * time.Second,
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
DialContext: statStdDial,
},
Timeout: sw.ScrapeTimeout,
}
}
return &client{ return &client{
hc: hc, hc: hc,
sc: sc,
scrapeURL: sw.ScrapeURL, scrapeURL: sw.ScrapeURL,
host: host, host: host,
@ -76,6 +104,43 @@ func newClient(sw *ScrapeWork) *client {
} }
} }
func (c *client) GetStreamReader() (*streamReader, error) {
deadline := time.Now().Add(c.hc.ReadTimeout)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
req, err := http.NewRequestWithContext(ctx, "GET", c.scrapeURL, nil)
if err != nil {
cancel()
return nil, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err)
}
// The following `Accept` header has been copied from Prometheus sources.
// See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 .
// This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader)
}
resp, err := c.sc.Do(req)
if err != nil {
cancel()
return nil, fmt.Errorf("cannot scrape %q: %w", c.scrapeURL, err)
}
if resp.StatusCode != http.StatusOK {
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc()
respBody, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
cancel()
return nil, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q",
c.scrapeURL, resp.StatusCode, http.StatusOK, respBody)
}
scrapesOK.Inc()
return &streamReader{
r: resp.Body,
cancel: cancel,
}, nil
}
func (c *client) ReadData(dst []byte) ([]byte, error) { func (c *client) ReadData(dst []byte) ([]byte, error) {
deadline := time.Now().Add(c.hc.ReadTimeout) deadline := time.Now().Add(c.hc.ReadTimeout)
req := fasthttp.AcquireRequest() req := fasthttp.AcquireRequest()
@ -87,7 +152,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details.
// Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now. // Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now.
req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1")
if !*disableCompression || c.disableCompression { if !*disableCompression && !c.disableCompression {
req.Header.Set("Accept-Encoding", "gzip") req.Header.Set("Accept-Encoding", "gzip")
} }
if *disableKeepAlive || c.disableKeepAlive { if *disableKeepAlive || c.disableKeepAlive {
@ -185,3 +250,22 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request,
} }
} }
} }
type streamReader struct {
r io.ReadCloser
cancel context.CancelFunc
bytesRead int64
}
func (sr *streamReader) Read(p []byte) (int, error) {
n, err := sr.r.Read(p)
sr.bytesRead += int64(n)
return n, err
}
func (sr *streamReader) MustClose() {
sr.cancel()
if err := sr.r.Close(); err != nil {
logger.Errorf("cannot close reader: %s", err)
}
}

View File

@ -84,6 +84,7 @@ type ScrapeConfig struct {
// These options are supported only by lib/promscrape. // These options are supported only by lib/promscrape.
DisableCompression bool `yaml:"disable_compression"` DisableCompression bool `yaml:"disable_compression"`
DisableKeepAlive bool `yaml:"disable_keepalive"` DisableKeepAlive bool `yaml:"disable_keepalive"`
StreamParse bool `yaml:"stream_parse"`
// This is set in loadConfig // This is set in loadConfig
swc *scrapeWorkConfig swc *scrapeWorkConfig
@ -473,6 +474,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
sampleLimit: sc.SampleLimit, sampleLimit: sc.SampleLimit,
disableCompression: sc.DisableCompression, disableCompression: sc.DisableCompression,
disableKeepAlive: sc.DisableKeepAlive, disableKeepAlive: sc.DisableKeepAlive,
streamParse: sc.StreamParse,
} }
return swc, nil return swc, nil
} }
@ -493,6 +495,7 @@ type scrapeWorkConfig struct {
sampleLimit int sampleLimit int
disableCompression bool disableCompression bool
disableKeepAlive bool disableKeepAlive bool
streamParse bool
} }
func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
@ -699,6 +702,7 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex
SampleLimit: swc.sampleLimit, SampleLimit: swc.sampleLimit,
DisableCompression: swc.disableCompression, DisableCompression: swc.disableCompression,
DisableKeepAlive: swc.disableKeepAlive, DisableKeepAlive: swc.disableKeepAlive,
StreamParse: swc.streamParse,
jobNameOriginal: swc.jobName, jobNameOriginal: swc.jobName,
}) })

View File

@ -1276,6 +1276,7 @@ scrape_configs:
sample_limit: 100 sample_limit: 100
disable_keepalive: true disable_keepalive: true
disable_compression: true disable_compression: true
stream_parse: true
static_configs: static_configs:
- targets: - targets:
- 192.168.1.2 # SNMP device. - 192.168.1.2 # SNMP device.
@ -1328,6 +1329,7 @@ scrape_configs:
SampleLimit: 100, SampleLimit: 100,
DisableKeepAlive: true, DisableKeepAlive: true,
DisableCompression: true, DisableCompression: true,
StreamParse: true,
jobNameOriginal: "snmp", jobNameOriginal: "snmp",
}, },
}) })

View File

@ -334,6 +334,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.Wr
sc.sw.Config = *sw sc.sw.Config = *sw
sc.sw.ScrapeGroup = group sc.sw.ScrapeGroup = group
sc.sw.ReadData = c.ReadData sc.sw.ReadData = c.ReadData
sc.sw.GetStreamReader = c.GetStreamReader
sc.sw.PushData = pushData sc.sw.PushData = pushData
return sc return sc
} }

View File

@ -82,19 +82,22 @@ type ScrapeWork struct {
// Whether to disable HTTP keep-alive when querying ScrapeURL. // Whether to disable HTTP keep-alive when querying ScrapeURL.
DisableKeepAlive bool DisableKeepAlive bool
// Whether to parse target responses in a streaming manner.
StreamParse bool
// The original 'job_name' // The original 'job_name'
jobNameOriginal string jobNameOriginal string
} }
// key returns unique identifier for the given sw. // key returns unique identifier for the given sw.
// //
// it can be used for comparing for equality two ScrapeWork objects. // it can be used for comparing for equality for two ScrapeWork objects.
func (sw *ScrapeWork) key() string { func (sw *ScrapeWork) key() string {
// Do not take into account OriginalLabels. // Do not take into account OriginalLabels.
key := fmt.Sprintf("ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, Labels=%s, "+ key := fmt.Sprintf("ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, Labels=%s, "+
"AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v", "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v",
sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.LabelsString(), sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.LabelsString(),
sw.AuthConfig.String(), sw.metricRelabelConfigsString(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive) sw.AuthConfig.String(), sw.metricRelabelConfigsString(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive, sw.StreamParse)
return key return key
} }
@ -132,6 +135,9 @@ type scrapeWork struct {
// ReadData is called for reading the data. // ReadData is called for reading the data.
ReadData func(dst []byte) ([]byte, error) ReadData func(dst []byte) ([]byte, error)
// GetStreamReader is called if Config.StreamParse is set.
GetStreamReader func() (*streamReader, error)
// PushData is called for pushing collected data. // PushData is called for pushing collected data.
PushData func(wr *prompbmarshal.WriteRequest) PushData func(wr *prompbmarshal.WriteRequest)
@ -221,6 +227,15 @@ var (
) )
func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error { func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
if *streamParse || sw.Config.StreamParse {
// Read data from scrape targets in streaming manner.
// This case is optimized for targets exposing millions and more of metrics per target.
return sw.scrapeStream(scrapeTimestamp, realTimestamp)
}
// Common case: read all the data from scrape target to memory (body) and then process it.
// This case should work more optimally for than stream parse code above for common case when scrape target exposes
// up to a few thouthand metrics.
body := leveledbytebufferpool.Get(sw.prevBodyLen) body := leveledbytebufferpool.Get(sw.prevBodyLen)
var err error var err error
body.B, err = sw.ReadData(body.B[:0]) body.B, err = sw.ReadData(body.B[:0])
@ -281,6 +296,66 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
return err return err
} }
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
sr, err := sw.GetStreamReader()
if err != nil {
return fmt.Errorf("cannot read data: %s", err)
}
samplesScraped := 0
samplesPostRelabeling := 0
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
var mu sync.Mutex
err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock()
defer mu.Unlock()
samplesScraped += len(rows)
for i := range rows {
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
if len(wc.labels) > 40000 {
// Limit the maximum size of wc.writeRequest.
// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
sw.updateSeriesAdded(wc)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
}
}
return nil
})
scrapedSamples.Update(float64(samplesScraped))
endTimestamp := time.Now().UnixNano() / 1e6
duration := float64(endTimestamp-realTimestamp) / 1e3
scrapeDuration.Update(duration)
scrapeResponseSize.Update(float64(sr.bytesRead))
sr.MustClose()
up := 1
if err != nil {
if samplesScraped == 0 {
up = 0
}
scrapesFailed.Inc()
}
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
sw.updateSeriesAdded(wc)
seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling)
sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
sw.prevRowsLen = len(wc.rows.Rows)
wc.reset()
writeRequestCtxPool.Put(wc)
tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
return nil
}
// leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx // leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx
// structs contain mixed number of labels. // structs contain mixed number of labels.
// //

View File

@ -1,14 +1,48 @@
package promscrape package promscrape
import ( import (
"context"
"net" "net"
"sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/fasthttp"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
func statStdDial(ctx context.Context, network, addr string) (net.Conn, error) {
d := getStdDialer()
conn, err := d.DialContext(ctx, network, addr)
dialsTotal.Inc()
if err != nil {
dialErrors.Inc()
return nil, err
}
conns.Inc()
sc := &statConn{
Conn: conn,
}
return sc, nil
}
func getStdDialer() *net.Dialer {
stdDialerOnce.Do(func() {
stdDialer = &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: netutil.TCP6Enabled(),
}
})
return stdDialer
}
var (
stdDialer *net.Dialer
stdDialerOnce sync.Once
)
func statDial(addr string) (conn net.Conn, err error) { func statDial(addr string) (conn net.Conn, err error) {
if netutil.TCP6Enabled() { if netutil.TCP6Enabled() {
conn, err = fasthttp.DialDualStack(addr) conn, err = fasthttp.DialDualStack(addr)

View File

@ -23,7 +23,8 @@ var (
// ParseStream parses csv from req and calls callback for the parsed rows. // ParseStream parses csv from req and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called after ParseStream returns.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func ParseStream(req *http.Request, callback func(rows []Row) error) error {

View File

@ -23,7 +23,8 @@ var (
// ParseStream parses Graphite lines from r and calls callback for the parsed rows. // ParseStream parses Graphite lines from r and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// The callback can be called after ParseStream returns.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error { func ParseStream(r io.Reader, callback func(rows []Row) error) error {

View File

@ -24,7 +24,8 @@ var (
// ParseStream parses r with the given args and calls callback for the parsed rows. // ParseStream parses r with the given args and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// The callback can be called after ParseStream returns.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error {

View File

@ -17,7 +17,8 @@ import (
// ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks. // ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks.
// //
// The callback can be called multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called after ParseStream returns.
// //
// callback shouldn't hold block after returning. // callback shouldn't hold block after returning.
// callback can be called in parallel from multiple concurrent goroutines. // callback can be called in parallel from multiple concurrent goroutines.

View File

@ -23,7 +23,8 @@ var (
// ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// The callback can be called after ParseStream returns.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error { func ParseStream(r io.Reader, callback func(rows []Row) error) error {

View File

@ -26,7 +26,8 @@ var (
// ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called after ParseStream returns.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func ParseStream(req *http.Request, callback func(rows []Row) error) error {

View File

@ -16,7 +16,8 @@ import (
// ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. // ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// It is guaranteed that the callback isn't called after ParseStream returns.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error { func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error {
@ -32,11 +33,17 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
defer putStreamContext(ctx) defer putStreamContext(ctx)
for ctx.Read() { for ctx.Read() {
uw := getUnmarshalWork() uw := getUnmarshalWork()
uw.callback = callback uw.callback = func(rows []Row) error {
err := callback(rows)
ctx.wg.Done()
return err
}
uw.defaultTimestamp = defaultTimestamp uw.defaultTimestamp = defaultTimestamp
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw) common.ScheduleUnmarshalWork(uw)
} }
ctx.wg.Wait() // wait for all the outstanding callback calls before returning
return ctx.Error() return ctx.Error()
} }
@ -61,6 +68,8 @@ type streamContext struct {
reqBuf []byte reqBuf []byte
tailBuf []byte tailBuf []byte
err error err error
wg sync.WaitGroup
} }
func (ctx *streamContext) Error() error { func (ctx *streamContext) Error() error {

View File

@ -21,6 +21,9 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
// ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries. // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
// //
// The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called after ParseStream returns.
//
// callback shouldn't hold tss after returning. // callback shouldn't hold tss after returning.
func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error { func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error {
ctx := getPushCtx(req.Body) ctx := getPushCtx(req.Body)

View File

@ -20,10 +20,10 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi
// ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called after ParseStream returns.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
// callback is called from multiple concurrent goroutines.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func ParseStream(req *http.Request, callback func(rows []Row) error) error {
r := req.Body r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" { if req.Header.Get("Content-Encoding") == "gzip" {