diff --git a/CHANGELOG.md b/CHANGELOG.md index 48612790c1..310f90ba1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ * `num_or_scalar op foo{filters} op bar` -> `num_or_scalar op foo{filters} op bar{filters}` * FEATURE: improve time series search for queries with multiple label filters. I.e. `foo{label1="value", label2=~"regexp"}`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781 +* FEATURE: vmagent: add `stream parse` mode. This mode allows reducing memory usage when individual scrape targets expose tens of millions of metrics. + For example, during scraping Prometheus in [federation](https://prometheus.io/docs/prometheus/latest/federation/) mode. + See `-promscrape.streamParse` command-line option and `stream_parse: true` config option for `scrape_config` section in `-promscrape.config`. + See also [troubleshooting docs for vmagent](https://victoriametrics.github.io/vmagent.html#troubleshooting). * FEATURE: vmalert: add `-dryRun` command-line option for validating the provided config files without the need to start `vmalert` service. * FEATURE: accept optional third argument of string type at `topk_*` and `bottomk_*` functions. This is label name for additional time series to return with the sum of time series outside top/bottom K. See [MetricsQL docs](https://victoriametrics.github.io/MetricsQL.html) for more details. * FEATURE: vmagent: expose `/api/v1/targets` page according to [the corresponding Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/#targets). diff --git a/app/vmagent/README.md b/app/vmagent/README.md index ed1c90b124..22d2c64030 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -231,6 +231,24 @@ This information may be useful for debugging target relabeling. by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` and `http://vmagent-host:8429/api/v1/targets`. +* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)), + then it is recommended enabling `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets + by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example: + + ```yml + scrape_configs: + - job_name: 'big-federate' + stream_parse: true + static_configs: + - targets: + - big-prometeus1 + - big-prometeus2 + honor_labels: true + metrics_path: /federate + params: + 'match[]': ['{__name__!=""}'] + ``` + * It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported at `http://vmagent-host:8429/metrics` page constantly grows. * If you see gaps on the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set, then try increasing `-remoteWrite.queues`. diff --git a/docs/vmagent.md b/docs/vmagent.md index ed1c90b124..22d2c64030 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -231,6 +231,24 @@ This information may be useful for debugging target relabeling. by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets` and `http://vmagent-host:8429/api/v1/targets`. +* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)), + then it is recommended enabling `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets + by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example: + + ```yml + scrape_configs: + - job_name: 'big-federate' + stream_parse: true + static_configs: + - targets: + - big-prometeus1 + - big-prometeus2 + honor_labels: true + metrics_path: /federate + params: + 'match[]': ['{__name__!=""}'] + ``` + * It is recommended to increase `-remoteWrite.queues` if `vmagent_remotewrite_pending_data_bytes` metric exported at `http://vmagent-host:8429/metrics` page constantly grows. * If you see gaps on the data pushed by `vmagent` to remote storage when `-remoteWrite.maxDiskUsagePerURL` is set, then try increasing `-remoteWrite.queues`. diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index e7f4d0f5e6..1a0e068277 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -1,13 +1,18 @@ package promscrape import ( + "context" "crypto/tls" "flag" "fmt" + "io" + "io/ioutil" + "net/http" "strings" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) @@ -22,11 +27,19 @@ var ( "This may be useful when targets has no support for HTTP keep-alive connection. "+ "It is possible to set `disable_keepalive: true` individually per each 'scrape_config` section in '-promscrape.config' for fine grained control. "+ "Note that disabling HTTP keep-alive may increase load on both vmagent and scrape targets") + streamParse = flag.Bool("promscrape.streamParse", false, "Whether to enable stream parsing for metrics obtained from scrape targets. This may be useful "+ + "for reducing memory usage when millions of metrics are exposed per each scrape target. "+ + "It is posible to set `stream_parse: true` individually per each `scrape_config` section in `-promscrape.config` for fine grained control") ) type client struct { + // hc is the default client optimized for common case of scraping targets with moderate number of metrics. hc *fasthttp.HostClient + // sc (aka `stream client`) is used instead of hc if ScrapeWork.ParseStream is set. + // It may be useful for scraping targets with millions of metrics per target. + sc *http.Client + scrapeURL string host string requestURI string @@ -64,8 +77,23 @@ func newClient(sw *ScrapeWork) *client { MaxResponseBodySize: maxScrapeSize.N, MaxIdempotentRequestAttempts: 1, } + var sc *http.Client + if *streamParse || sw.StreamParse { + sc = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 2 * sw.ScrapeInterval, + DisableCompression: *disableCompression || sw.DisableCompression, + DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive, + DialContext: statStdDial, + }, + Timeout: sw.ScrapeTimeout, + } + } return &client{ hc: hc, + sc: sc, scrapeURL: sw.ScrapeURL, host: host, @@ -76,6 +104,43 @@ func newClient(sw *ScrapeWork) *client { } } +func (c *client) GetStreamReader() (*streamReader, error) { + deadline := time.Now().Add(c.hc.ReadTimeout) + ctx, cancel := context.WithDeadline(context.Background(), deadline) + req, err := http.NewRequestWithContext(ctx, "GET", c.scrapeURL, nil) + if err != nil { + cancel() + return nil, fmt.Errorf("cannot create request for %q: %w", c.scrapeURL, err) + } + // The following `Accept` header has been copied from Prometheus sources. + // See https://github.com/prometheus/prometheus/blob/f9d21f10ecd2a343a381044f131ea4e46381ce09/scrape/scrape.go#L532 . + // This is needed as a workaround for scraping stupid Java-based servers such as Spring Boot. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details. + // Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now. + req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") + if c.authHeader != "" { + req.Header.Set("Authorization", c.authHeader) + } + resp, err := c.sc.Do(req) + if err != nil { + cancel() + return nil, fmt.Errorf("cannot scrape %q: %w", c.scrapeURL, err) + } + if resp.StatusCode != http.StatusOK { + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_scrapes_total{status_code="%d"}`, resp.StatusCode)).Inc() + respBody, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + cancel() + return nil, fmt.Errorf("unexpected status code returned when scraping %q: %d; expecting %d; response body: %q", + c.scrapeURL, resp.StatusCode, http.StatusOK, respBody) + } + scrapesOK.Inc() + return &streamReader{ + r: resp.Body, + cancel: cancel, + }, nil +} + func (c *client) ReadData(dst []byte) ([]byte, error) { deadline := time.Now().Add(c.hc.ReadTimeout) req := fasthttp.AcquireRequest() @@ -87,7 +152,7 @@ func (c *client) ReadData(dst []byte) ([]byte, error) { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/608 for details. // Do not bloat the `Accept` header with OpenMetrics shit, since it looks like dead standard now. req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") - if !*disableCompression || c.disableCompression { + if !*disableCompression && !c.disableCompression { req.Header.Set("Accept-Encoding", "gzip") } if *disableKeepAlive || c.disableKeepAlive { @@ -185,3 +250,22 @@ func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, } } } + +type streamReader struct { + r io.ReadCloser + cancel context.CancelFunc + bytesRead int64 +} + +func (sr *streamReader) Read(p []byte) (int, error) { + n, err := sr.r.Read(p) + sr.bytesRead += int64(n) + return n, err +} + +func (sr *streamReader) MustClose() { + sr.cancel() + if err := sr.r.Close(); err != nil { + logger.Errorf("cannot close reader: %s", err) + } +} diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 64287eadea..458a8dd318 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -84,6 +84,7 @@ type ScrapeConfig struct { // These options are supported only by lib/promscrape. DisableCompression bool `yaml:"disable_compression"` DisableKeepAlive bool `yaml:"disable_keepalive"` + StreamParse bool `yaml:"stream_parse"` // This is set in loadConfig swc *scrapeWorkConfig @@ -473,6 +474,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf sampleLimit: sc.SampleLimit, disableCompression: sc.DisableCompression, disableKeepAlive: sc.DisableKeepAlive, + streamParse: sc.StreamParse, } return swc, nil } @@ -493,6 +495,7 @@ type scrapeWorkConfig struct { sampleLimit int disableCompression bool disableKeepAlive bool + streamParse bool } func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) { @@ -699,6 +702,7 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex SampleLimit: swc.sampleLimit, DisableCompression: swc.disableCompression, DisableKeepAlive: swc.disableKeepAlive, + StreamParse: swc.streamParse, jobNameOriginal: swc.jobName, }) diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go index e84567b404..f1dd2727bb 100644 --- a/lib/promscrape/config_test.go +++ b/lib/promscrape/config_test.go @@ -1276,6 +1276,7 @@ scrape_configs: sample_limit: 100 disable_keepalive: true disable_compression: true + stream_parse: true static_configs: - targets: - 192.168.1.2 # SNMP device. @@ -1328,6 +1329,7 @@ scrape_configs: SampleLimit: 100, DisableKeepAlive: true, DisableCompression: true, + StreamParse: true, jobNameOriginal: "snmp", }, }) diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 4451820cd6..341e66462a 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -334,6 +334,7 @@ func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.Wr sc.sw.Config = *sw sc.sw.ScrapeGroup = group sc.sw.ReadData = c.ReadData + sc.sw.GetStreamReader = c.GetStreamReader sc.sw.PushData = pushData return sc } diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 939fff88da..81515c1571 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -82,19 +82,22 @@ type ScrapeWork struct { // Whether to disable HTTP keep-alive when querying ScrapeURL. DisableKeepAlive bool + // Whether to parse target responses in a streaming manner. + StreamParse bool + // The original 'job_name' jobNameOriginal string } // key returns unique identifier for the given sw. // -// it can be used for comparing for equality two ScrapeWork objects. +// it can be used for comparing for equality for two ScrapeWork objects. func (sw *ScrapeWork) key() string { // Do not take into account OriginalLabels. key := fmt.Sprintf("ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, Labels=%s, "+ - "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v", + "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v", sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.LabelsString(), - sw.AuthConfig.String(), sw.metricRelabelConfigsString(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive) + sw.AuthConfig.String(), sw.metricRelabelConfigsString(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive, sw.StreamParse) return key } @@ -132,6 +135,9 @@ type scrapeWork struct { // ReadData is called for reading the data. ReadData func(dst []byte) ([]byte, error) + // GetStreamReader is called if Config.StreamParse is set. + GetStreamReader func() (*streamReader, error) + // PushData is called for pushing collected data. PushData func(wr *prompbmarshal.WriteRequest) @@ -221,6 +227,15 @@ var ( ) func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error { + if *streamParse || sw.Config.StreamParse { + // Read data from scrape targets in streaming manner. + // This case is optimized for targets exposing millions and more of metrics per target. + return sw.scrapeStream(scrapeTimestamp, realTimestamp) + } + + // Common case: read all the data from scrape target to memory (body) and then process it. + // This case should work more optimally for than stream parse code above for common case when scrape target exposes + // up to a few thouthand metrics. body := leveledbytebufferpool.Get(sw.prevBodyLen) var err error body.B, err = sw.ReadData(body.B[:0]) @@ -281,6 +296,66 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error return err } +func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { + sr, err := sw.GetStreamReader() + if err != nil { + return fmt.Errorf("cannot read data: %s", err) + } + samplesScraped := 0 + samplesPostRelabeling := 0 + wc := writeRequestCtxPool.Get(sw.prevRowsLen) + var mu sync.Mutex + err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error { + mu.Lock() + defer mu.Unlock() + samplesScraped += len(rows) + for i := range rows { + sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) + if len(wc.labels) > 40000 { + // Limit the maximum size of wc.writeRequest. + // This should reduce memory usage when scraping targets with millions of metrics and/or labels. + // For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/ + samplesPostRelabeling += len(wc.writeRequest.Timeseries) + sw.updateSeriesAdded(wc) + startTime := time.Now() + sw.PushData(&wc.writeRequest) + pushDataDuration.UpdateDuration(startTime) + wc.resetNoRows() + } + } + return nil + }) + scrapedSamples.Update(float64(samplesScraped)) + endTimestamp := time.Now().UnixNano() / 1e6 + duration := float64(endTimestamp-realTimestamp) / 1e3 + scrapeDuration.Update(duration) + scrapeResponseSize.Update(float64(sr.bytesRead)) + sr.MustClose() + up := 1 + if err != nil { + if samplesScraped == 0 { + up = 0 + } + scrapesFailed.Inc() + } + samplesPostRelabeling += len(wc.writeRequest.Timeseries) + sw.updateSeriesAdded(wc) + seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling) + sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) + startTime := time.Now() + sw.PushData(&wc.writeRequest) + pushDataDuration.UpdateDuration(startTime) + sw.prevRowsLen = len(wc.rows.Rows) + wc.reset() + writeRequestCtxPool.Put(wc) + tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err) + return nil +} + // leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx // structs contain mixed number of labels. // diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index 10d45f4aa0..420b8b624f 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -1,14 +1,48 @@ package promscrape import ( + "context" "net" + "sync" "sync/atomic" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) +func statStdDial(ctx context.Context, network, addr string) (net.Conn, error) { + d := getStdDialer() + conn, err := d.DialContext(ctx, network, addr) + dialsTotal.Inc() + if err != nil { + dialErrors.Inc() + return nil, err + } + conns.Inc() + sc := &statConn{ + Conn: conn, + } + return sc, nil +} + +func getStdDialer() *net.Dialer { + stdDialerOnce.Do(func() { + stdDialer = &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: netutil.TCP6Enabled(), + } + }) + return stdDialer +} + +var ( + stdDialer *net.Dialer + stdDialerOnce sync.Once +) + func statDial(addr string) (conn net.Conn, err error) { if netutil.TCP6Enabled() { conn, err = fasthttp.DialDualStack(addr) diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 247526b8bd..027ecc4589 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -23,7 +23,8 @@ var ( // ParseStream parses csv from req and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(req *http.Request, callback func(rows []Row) error) error { diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 2eea19bd5a..7fa734f9f1 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -23,7 +23,8 @@ var ( // ParseStream parses Graphite lines from r and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index 408d0a3445..d302e3ef01 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -24,7 +24,8 @@ var ( // ParseStream parses r with the given args and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go index 2f362183f3..f8fca92683 100644 --- a/lib/protoparser/native/streamparser.go +++ b/lib/protoparser/native/streamparser.go @@ -17,7 +17,8 @@ import ( // ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold block after returning. // callback can be called in parallel from multiple concurrent goroutines. diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index cf2b4f89eb..752f1cd5e3 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -23,7 +23,8 @@ var ( // ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index 9a7ffaa5f3..5a1f9012e7 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -26,7 +26,8 @@ var ( // ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(req *http.Request, callback func(rows []Row) error) error { diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index cfee948cbd..8c915c9fd3 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -16,7 +16,8 @@ import ( // ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from r. +// The callback can be called concurrently multiple times for streamed data from r. +// It is guaranteed that the callback isn't called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error { @@ -32,11 +33,17 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = callback + uw.callback = func(rows []Row) error { + err := callback(rows) + ctx.wg.Done() + return err + } uw.defaultTimestamp = defaultTimestamp uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf + ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } + ctx.wg.Wait() // wait for all the outstanding callback calls before returning return ctx.Error() } @@ -61,6 +68,8 @@ type streamContext struct { reqBuf []byte tailBuf []byte err error + + wg sync.WaitGroup } func (ctx *streamContext) Error() error { diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index 4312310d62..0ce5632ce5 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -21,6 +21,9 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102 // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries. // +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. +// // callback shouldn't hold tss after returning. func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error { ctx := getPushCtx(req.Body) diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 915ac18e8e..66ad6c3221 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -20,10 +20,10 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // -// The callback can be called multiple times for streamed data from req. +// The callback can be called concurrently multiple times for streamed data from req. +// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. -// callback is called from multiple concurrent goroutines. func ParseStream(req *http.Request, callback func(rows []Row) error) error { r := req.Body if req.Header.Get("Content-Encoding") == "gzip" {