lib/promscrape: make concurrency control optional (#5073)

* lib/promscrape: make concurrency control optional

Before, `-maxConcurrentInserts` was limiting all calls to the `promscrape.Parse`
function: during ingestion and during scraping. This behavior is incorrect.
The cmd-line flag `-maxConcurrentInserts` should have effect only on ingestion.

Since both pipelines use the same `promscrape.Parse` function, we extend it
to make the concurrency limiter optional, so the caller can decide whether
concurrency should be limited or not.
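
For illustration, a minimal usage sketch of the extended `stream.Parse` (signature as shown in the lib/protoparser diff below). The import paths are assumed to be the upstream `github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus` and `.../prometheus/stream` packages, and the sample data and callbacks are illustrative only: ingestion-style callers pass limitConcurrency=true so the `-maxConcurrentInserts` limiter applies, while the scrape path passes false.

package main

import (
	"fmt"
	"strings"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
)

func main() {
	data := `foo{bar="baz"} 1`

	// Ingestion-style call: limitConcurrency=true, so the call is throttled
	// by the -maxConcurrentInserts concurrency limiter.
	err := stream.Parse(strings.NewReader(data), 0, false, true, func(rows []prometheus.Row) error {
		fmt.Printf("ingested %d rows\n", len(rows))
		return nil
	}, func(s string) {
		fmt.Println("parse error:", s)
	})
	if err != nil {
		fmt.Println("ingestion parse failed:", err)
	}

	// Scrape-style call: limitConcurrency=false, so the limiter is bypassed
	// and only the scrape-level concurrency applies.
	err = stream.Parse(strings.NewReader(data), 0, false, false, func(rows []prometheus.Row) error {
		fmt.Printf("scraped %d rows\n", len(rows))
		return nil
	}, func(s string) {
		fmt.Println("parse error:", s)
	})
	if err != nil {
		fmt.Println("scrape parse failed:", err)
	}
}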

This commit makes c53b5788b4
obsolete.

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* Revert "dashboards: move `Concurrent inserts` panel to Troubleshooting section"

This reverts commit c53b5788b4.

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Roman Khavronenko 2023-10-02 21:32:11 +02:00 committed by Aliaksandr Valialkin
parent 40973d37da
commit 1f2cb594d9
8 changed files with 300 additions and 310 deletions

View File

@ -32,7 +32,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return stream.Parse(req.Body, defaultTimestamp, isGzipped, true, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
}, func(s string) {
httpserver.LogError(req, s)

View File

@ -32,7 +32,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err
}
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return stream.Parse(req.Body, defaultTimestamp, isGzipped, true, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
}, func(s string) {
httpserver.LogError(req, s)

View File

@ -2373,8 +2373,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2390,7 +2389,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 36
"y": 4
},
"id": 92,
"options": {
@ -2476,8 +2475,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2493,7 +2491,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 36
"y": 4
},
"id": 95,
"options": {
@ -2582,8 +2580,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
},
{
"color": "red",
@ -2599,7 +2596,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 44
"y": 12
},
"id": 98,
"options": {
@ -2688,8 +2685,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
},
{
"color": "red",
@ -2705,7 +2701,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 44
"y": 12
},
"id": 99,
"options": {
@ -2793,8 +2789,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2810,7 +2805,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 52
"y": 20
},
"id": 79,
"links": [],
@ -2899,8 +2894,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2916,7 +2910,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 52
"y": 20
},
"id": 18,
"links": [
@ -3010,8 +3004,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -3027,7 +3020,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 60
"y": 28
},
"id": 127,
"links": [],
@ -3114,8 +3107,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -3131,7 +3123,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 60
"y": 28
},
"id": 50,
"options": {
@ -3169,123 +3161,6 @@
"title": "Invalid datapoints rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"description": "Shows how many concurrent inserts (parsing and processing of scraped or ingested data) are taking place.\n\nIf the number of concurrent inserts hits the `limit` or is close to the `limit` constantly - it might be a sign of a resource shortage.\n\nIf vmagent's CPU usage and remote write connection saturation are at normal level, it might be that `-maxConcurrentInserts` cmd-line flag needs to be increased.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 68
},
"id": 130,
"links": [],
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"pluginVersion": "9.2.6",
"targets": [
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "max_over_time(vm_concurrent_insert_current{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])",
"interval": "",
"legendFormat": "{{instance}} ({{job}})",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "min(vm_concurrent_insert_capacity{job=~\"$job\", instance=~\"$instance\"}) by(job)",
"interval": "",
"legendFormat": "limit ({{job}})",
"range": true,
"refId": "B"
}
],
"title": "Concurrent inserts ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",
@ -3306,8 +3181,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -3347,7 +3221,7 @@
"h": 7,
"w": 24,
"x": 0,
"y": 76
"y": 36
},
"id": 129,
"options": {
@ -3366,7 +3240,7 @@
}
]
},
"pluginVersion": "9.2.7",
"pluginVersion": "9.2.6",
"targets": [
{
"datasource": {
@ -4189,7 +4063,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 85
"y": 38
},
"id": 73,
"links": [],
@ -4306,7 +4180,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 85
"y": 38
},
"id": 131,
"links": [],
@ -4345,6 +4219,123 @@
"title": "Rows rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"description": "Shows how many concurrent inserts are taking place.\n\nIf the number of concurrent inserts hitting the `limit` or is close to the `limit` constantly - it might be a sign of a resource shortage.\n\n If vmagent's CPU usage and remote write connection saturation are at normal level, it might be that `-maxConcurrentInserts` cmd-line flag need to be increased.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 46
},
"id": 130,
"links": [],
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"pluginVersion": "9.2.6",
"targets": [
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "max_over_time(vm_concurrent_insert_current{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])",
"interval": "",
"legendFormat": "{{instance}} ({{job}})",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "min(vm_concurrent_insert_capacity{job=~\"$job\", instance=~\"$instance\"}) by(job)",
"interval": "",
"legendFormat": "limit ({{job}})",
"range": true,
"refId": "B"
}
],
"title": "Concurrent inserts ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",
@ -4409,8 +4400,8 @@
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 93
"x": 12,
"y": 46
},
"id": 77,
"links": [],

View File

@ -2372,8 +2372,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2389,7 +2388,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 36
"y": 4
},
"id": 92,
"options": {
@ -2475,8 +2474,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2492,7 +2490,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 36
"y": 4
},
"id": 95,
"options": {
@ -2581,8 +2579,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
},
{
"color": "red",
@ -2598,7 +2595,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 44
"y": 12
},
"id": 98,
"options": {
@ -2687,8 +2684,7 @@
"mode": "absolute",
"steps": [
{
"color": "transparent",
"value": null
"color": "transparent"
},
{
"color": "red",
@ -2704,7 +2700,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 44
"y": 12
},
"id": 99,
"options": {
@ -2792,8 +2788,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2809,7 +2804,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 52
"y": 20
},
"id": 79,
"links": [],
@ -2898,8 +2893,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -2915,7 +2909,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 52
"y": 20
},
"id": 18,
"links": [
@ -3009,8 +3003,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -3026,7 +3019,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 60
"y": 28
},
"id": 127,
"links": [],
@ -3113,8 +3106,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -3130,7 +3122,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 60
"y": 28
},
"id": 50,
"options": {
@ -3168,123 +3160,6 @@
"title": "Invalid datapoints rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"description": "Shows how many concurrent inserts (parsing and processing of scraped or ingested data) are taking place.\n\nIf the number of concurrent inserts hits the `limit` or is close to the `limit` constantly - it might be a sign of a resource shortage.\n\nIf vmagent's CPU usage and remote write connection saturation are at normal level, it might be that `-maxConcurrentInserts` cmd-line flag needs to be increased.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 68
},
"id": 130,
"links": [],
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"pluginVersion": "9.2.6",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "max_over_time(vm_concurrent_insert_current{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])",
"interval": "",
"legendFormat": "{{instance}} ({{job}})",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "min(vm_concurrent_insert_capacity{job=~\"$job\", instance=~\"$instance\"}) by(job)",
"interval": "",
"legendFormat": "limit ({{job}})",
"range": true,
"refId": "B"
}
],
"title": "Concurrent inserts ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
@ -3305,8 +3180,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
},
{
"color": "red",
@ -3346,7 +3220,7 @@
"h": 7,
"w": 24,
"x": 0,
"y": 76
"y": 36
},
"id": 129,
"options": {
@ -3365,7 +3239,7 @@
}
]
},
"pluginVersion": "9.2.7",
"pluginVersion": "9.2.6",
"targets": [
{
"datasource": {
@ -4188,7 +4062,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 85
"y": 38
},
"id": 73,
"links": [],
@ -4305,7 +4179,7 @@
"h": 8,
"w": 12,
"x": 12,
"y": 85
"y": 38
},
"id": 131,
"links": [],
@ -4344,6 +4218,123 @@
"title": "Rows rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"description": "Shows how many concurrent inserts are taking place.\n\nIf the number of concurrent inserts hitting the `limit` or is close to the `limit` constantly - it might be a sign of a resource shortage.\n\n If vmagent's CPU usage and remote write connection saturation are at normal level, it might be that `-maxConcurrentInserts` cmd-line flag need to be increased.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 46
},
"id": 130,
"links": [],
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"pluginVersion": "9.2.6",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "max_over_time(vm_concurrent_insert_current{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])",
"interval": "",
"legendFormat": "{{instance}} ({{job}})",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "min(vm_concurrent_insert_capacity{job=~\"$job\", instance=~\"$instance\"}) by(job)",
"interval": "",
"legendFormat": "limit ({{job}})",
"range": true,
"refId": "B"
}
],
"title": "Concurrent inserts ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
@ -4408,8 +4399,8 @@
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 93
"x": 12,
"y": 46
},
"id": 77,
"links": [],

View File

@ -56,11 +56,11 @@ The sandbox cluster installation is running under the constant load generated by
* FEATURE: stop exposing `vm_merge_need_free_disk_space` metric, since it turned out that it confuses users while not bringing any useful information. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686#issuecomment-1733844128).
* BUGFIX: [Official Grafana dashboards for VictoriaMetrics](https://grafana.com/orgs/victoriametrics): fix display of ingested rows rate for `Samples ingested/s` and `Samples rate` panels for vmagent's dashboard. Previously, not all ingested protocols were accounted for in these panels. An extra panel `Rows rate` was added to the `Ingestion` section to display the split of the ingested rows rate by protocol.
* BUGFIX: [Official Grafana dashboards for VictoriaMetrics](https://grafana.com/orgs/victoriametrics): move vmagent's `Concurrent inserts` panel from the `Ingestion` section to the Troubleshooting section, because this panel is related to both scraped and ingested data. Before, it could have given a misleading impression that it is related to ingested metrics only.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix the bug causing render looping when switching to heatmap.
* BUGFIX: [VictoriaMetrics enterprise](https://docs.victoriametrics.com/enterprise.html) validate `-dedup.minScrapeInterval` value and `-downsampling.period` intervals are multiples of each other. See [these docs](https://docs.victoriametrics.com/#downsampling).
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): properly copy `appliedRetention.txt` files inside `<-storageDataPath>/{data}` folders during [incremental backups](https://docs.victoriametrics.com/vmbackup.html#incremental-backups). Previously the new `appliedRetention.txt` could be skipped during incremental backups, which could lead to increased load on storage after restoring from backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5005).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress `context canceled` error messages in logs when `vmagent` is reloading service discovery config. This error could appear starting from [v1.93.5](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.5). See [this PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5048).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): remove the concurrency limit that was mistakenly applied during parsing of scraped metrics. With this change, the cmd-line flag `-maxConcurrentInserts` no longer has any effect on scraping.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): allow passing [median_over_time](https://docs.victoriametrics.com/MetricsQL.html#median_over_time) to [aggr_over_time](https://docs.victoriametrics.com/MetricsQL.html#aggr_over_time). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5034).
* BUGFIX: [vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): fix ingestion via [multitenant url](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) for opentsdbhttp. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5061). The bug has been introduced in [v1.93.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.2).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix support of legacy DataDog agent, which adds trailing slashes to urls. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5078). Thanks to @maxb for spotting the issue.

View File

@ -587,7 +587,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
if err == nil {
bodyString = bytesutil.ToUnsafeString(sbr.body)
areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString)
err = stream.Parse(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
err = stream.Parse(&sbr, scrapeTimestamp, false, false, func(rows []parser.Row) error {
mu.Lock()
defer mu.Unlock()
samplesScraped += len(rows)
@ -808,7 +808,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675
var mu sync.Mutex
br := bytes.NewBufferString(bodyString)
err := stream.Parse(br, timestamp, false, func(rows []parser.Row) error {
err := stream.Parse(br, timestamp, false, false, func(rows []parser.Row) error {
mu.Lock()
defer mu.Unlock()
for i := range rows {

View File

@ -20,10 +20,16 @@ import (
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []prometheus.Row) error, errLogger func(string)) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
//
// limitConcurrency defines whether to control the number of concurrent calls to this function.
// It is recommended setting limitConcurrency=true if the caller doesn't have concurrency limits set,
// like /api/v1/write calls.
func Parse(r io.Reader, defaultTimestamp int64, isGzipped, limitConcurrency bool, callback func(rows []prometheus.Row) error, errLogger func(string)) error {
if limitConcurrency {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
}
if isGzipped {
zr, err := common.GetGzipReader(r)
@ -44,7 +50,9 @@ func Parse(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(ro
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
if wcr, ok := r.(*writeconcurrencylimiter.Reader); ok {
wcr.DecConcurrency()
}
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {
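
The guarded DecConcurrency call above follows from making the limiter optional: the writeconcurrencylimiter.Reader wrapper is only installed when limitConcurrency is true, so the type assertion releases the concurrency slot only when the wrapper is actually present. Below is a self-contained sketch of this optional-wrapper pattern, with a hypothetical limitedReader standing in for writeconcurrencylimiter.Reader; it is an illustration of the pattern, not the real limiter.

package main

import (
	"fmt"
	"io"
	"strings"
)

// limitedReader is a hypothetical stand-in for writeconcurrencylimiter.Reader:
// a wrapper that is installed only when concurrency limiting is requested.
type limitedReader struct {
	io.Reader
}

// DecConcurrency mimics releasing the concurrency slot early, once the data
// has been handed off for unmarshalling.
func (lr *limitedReader) DecConcurrency() {
	fmt.Println("concurrency slot released")
}

func parse(r io.Reader, limitConcurrency bool) error {
	if limitConcurrency {
		r = &limitedReader{Reader: r}
	}
	// ... read the payload and schedule unmarshal work ...
	if _, err := io.ReadAll(r); err != nil {
		return err
	}
	// Release the slot only if the wrapper was actually installed.
	if lr, ok := r.(*limitedReader); ok {
		lr.DecConcurrency()
	}
	return nil
}

func main() {
	_ = parse(strings.NewReader("foo 1\n"), true)  // limiter wrapper installed
	_ = parse(strings.NewReader("foo 1\n"), false) // limiter skipped
}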

View File

@ -24,7 +24,7 @@ func TestParse(t *testing.T) {
var result []prometheus.Row
var lock sync.Mutex
doneCh := make(chan struct{})
err := Parse(bb, defaultTimestamp, false, func(rows []prometheus.Row) error {
err := Parse(bb, defaultTimestamp, false, true, func(rows []prometheus.Row) error {
lock.Lock()
result = appendRowCopies(result, rows)
if len(result) == len(rowsExpected) {
@ -57,7 +57,7 @@ func TestParse(t *testing.T) {
}
result = nil
doneCh = make(chan struct{})
err = Parse(bb, defaultTimestamp, true, func(rows []prometheus.Row) error {
err = Parse(bb, defaultTimestamp, true, false, func(rows []prometheus.Row) error {
lock.Lock()
result = appendRowCopies(result, rows)
if len(result) == len(rowsExpected) {