app/vmselect: add ability to export data in CSV format via /api/v1/export/csv

This commit is contained in:
Aliaksandr Valialkin 2020-10-12 20:01:51 +03:00
parent 6105756b26
commit 0867dea5fc
7 changed files with 647 additions and 258 deletions

View File

@ -8,6 +8,7 @@
```
* FEATURE: vmagent: add Docker Swarm service discovery (aka [dockerswarm_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config)).
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/656
* FEATURE: add ability to export data in CSV format. See [these docs](https://victoriametrics.github.io/#how-to-export-csv-data) for details.
* FEATURE: vmagent: add `-promscrape.suppressDuplicateScrapeTargetErrors` command-line flag for suppressing `duplicate scrape target` errors.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/651 and https://victoriametrics.github.io/vmagent.html#troubleshooting .
* FEATURE: vmagent: show original labels before relabeling is applied on `duplicate scrape target` errors. This should simplify debugging for incorrect relabeling.

View File

@ -192,6 +192,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `federate` - returns [federated metrics](https://prometheus.io/docs/prometheus/latest/federation/).
- `api/v1/export` - exports raw data in JSON line format. See [this article](https://medium.com/@valyala/analyzing-prometheus-data-with-external-tools-5f3e5e147639) for details.
- `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above).
- `api/v1/export/csv` - exports data in CSV. It may be imported into another VictoriaMetrics via `api/v1/import/csv` (see above).
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.

View File

@ -293,6 +293,14 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
return true
}
return true
case "prometheus/api/v1/export/csv":
exportCSVRequests.Inc()
if err := prometheus.ExportCSVHandler(startTime, at, w, r); err != nil {
exportCSVErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "prometheus/federate":
federateRequests.Inc()
if err := prometheus.FederateHandler(startTime, at, w, r); err != nil {
@ -416,6 +424,9 @@ var (
exportNativeRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/export/native"}`)
exportNativeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/api/v1/export/native"}`)
exportCSVRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/export/csv"}`)
exportCSVErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/api/v1/export/csv"}`)
federateRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/federate"}`)
federateErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/federate"}`)

View File

@ -1,10 +1,83 @@
{% import (
"bytes"
"strings"
"time"
"github.com/valyala/quicktemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) %}
{% stripspace %}
{% func ExportCSVLine(xb *exportBlock, fieldNames []string) %}
{% if len(xb.timestamps) == 0 || len(fieldNames) == 0 %}{% return %}{% endif %}
{% for i, timestamp := range xb.timestamps %}
{% code value := xb.values[i] %}
{%= exportCSVField(xb.mn, fieldNames[0], timestamp, value) %}
{% for _, fieldName := range fieldNames[1:] %}
,
{%= exportCSVField(xb.mn, fieldName, timestamp, value) %}
{% endfor %}
{% newline %}
{% endfor %}
{% endfunc %}
{% func exportCSVField(mn *storage.MetricName, fieldName string, timestamp int64, value float64) %}
{% if fieldName == "__value__" %}
{%f= value %}
{% return %}
{% endif %}
{% if fieldName == "__timestamp__" %}
{%dl timestamp %}
{% return %}
{% endif %}
{% if strings.HasPrefix(fieldName, "__timestamp__:") %}
{% code timeFormat := fieldName[len("__timestamp__:"):] %}
{% switch timeFormat %}
{% case "unix_s" %}
{%dl= timestamp/1000 %}
{% case "unix_ms" %}
{%dl= timestamp %}
{% case "unix_ns" %}
{%dl= timestamp*1e6 %}
{% case "rfc3339" %}
{% code
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], time.RFC3339)
%}
{%z= bb.B %}
{% code
quicktemplate.ReleaseByteBuffer(bb)
%}
{% default %}
{% if strings.HasPrefix(timeFormat, "custom:") %}
{% code
layout := timeFormat[len("custom:"):]
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], layout)
%}
{% if bytes.ContainsAny(bb.B, `"`+",\n") %}
{%qz bb.B %}
{% else %}
{%z= bb.B %}
{% endif %}
{% code
quicktemplate.ReleaseByteBuffer(bb)
%}
{% else %}
Unsupported timeFormat={%s= timeFormat %}
{% endif %}
{% endswitch %}
{% return %}
{% endif %}
{% code v := mn.GetTagValue(fieldName) %}
{% if bytes.ContainsAny(v, `"`+",\n") %}
{%qz= v %}
{% else %}
{%z= v %}
{% endif %}
{% endfunc %}
{% func ExportPrometheusLine(xb *exportBlock) %}
{% if len(xb.timestamps) == 0 %}{% return %}{% endif %}
{% code bb := quicktemplate.AcquireByteBuffer() %}

View File

@ -6,306 +6,201 @@ package prometheus
//line app/vmselect/prometheus/export.qtpl:1
import (
"bytes"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/export.qtpl:8
//line app/vmselect/prometheus/export.qtpl:12
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/export.qtpl:8
//line app/vmselect/prometheus/export.qtpl:12
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/export.qtpl:8
func StreamExportPrometheusLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:9
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:9
return
//line app/vmselect/prometheus/export.qtpl:9
}
//line app/vmselect/prometheus/export.qtpl:10
bb := quicktemplate.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:11
writeprometheusMetricName(bb, xb.mn)
//line app/vmselect/prometheus/export.qtpl:12
for i, ts := range xb.timestamps {
func StreamExportCSVLine(qw422016 *qt422016.Writer, xb *exportBlock, fieldNames []string) {
//line app/vmselect/prometheus/export.qtpl:13
qw422016.N().Z(bb.B)
if len(xb.timestamps) == 0 || len(fieldNames) == 0 {
//line app/vmselect/prometheus/export.qtpl:13
qw422016.N().S(` `)
return
//line app/vmselect/prometheus/export.qtpl:13
}
//line app/vmselect/prometheus/export.qtpl:14
qw422016.N().F(xb.values[i])
//line app/vmselect/prometheus/export.qtpl:14
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:15
qw422016.N().DL(ts)
for i, timestamp := range xb.timestamps {
//line app/vmselect/prometheus/export.qtpl:15
value := xb.values[i]
//line app/vmselect/prometheus/export.qtpl:16
streamexportCSVField(qw422016, xb.mn, fieldNames[0], timestamp, value)
//line app/vmselect/prometheus/export.qtpl:17
for _, fieldName := range fieldNames[1:] {
//line app/vmselect/prometheus/export.qtpl:17
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:19
streamexportCSVField(qw422016, xb.mn, fieldName, timestamp, value)
//line app/vmselect/prometheus/export.qtpl:20
}
//line app/vmselect/prometheus/export.qtpl:21
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:16
//line app/vmselect/prometheus/export.qtpl:22
}
//line app/vmselect/prometheus/export.qtpl:17
//line app/vmselect/prometheus/export.qtpl:23
}
//line app/vmselect/prometheus/export.qtpl:23
func WriteExportCSVLine(qq422016 qtio422016.Writer, xb *exportBlock, fieldNames []string) {
//line app/vmselect/prometheus/export.qtpl:23
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:23
StreamExportCSVLine(qw422016, xb, fieldNames)
//line app/vmselect/prometheus/export.qtpl:23
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:23
}
//line app/vmselect/prometheus/export.qtpl:23
func ExportCSVLine(xb *exportBlock, fieldNames []string) string {
//line app/vmselect/prometheus/export.qtpl:23
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:23
WriteExportCSVLine(qb422016, xb, fieldNames)
//line app/vmselect/prometheus/export.qtpl:23
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:23
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:23
return qs422016
//line app/vmselect/prometheus/export.qtpl:23
}
//line app/vmselect/prometheus/export.qtpl:25
func streamexportCSVField(qw422016 *qt422016.Writer, mn *storage.MetricName, fieldName string, timestamp int64, value float64) {
//line app/vmselect/prometheus/export.qtpl:26
if fieldName == "__value__" {
//line app/vmselect/prometheus/export.qtpl:27
qw422016.N().F(value)
//line app/vmselect/prometheus/export.qtpl:28
return
//line app/vmselect/prometheus/export.qtpl:29
}
//line app/vmselect/prometheus/export.qtpl:30
if fieldName == "__timestamp__" {
//line app/vmselect/prometheus/export.qtpl:31
qw422016.N().DL(timestamp)
//line app/vmselect/prometheus/export.qtpl:32
return
//line app/vmselect/prometheus/export.qtpl:33
}
//line app/vmselect/prometheus/export.qtpl:34
if strings.HasPrefix(fieldName, "__timestamp__:") {
//line app/vmselect/prometheus/export.qtpl:35
timeFormat := fieldName[len("__timestamp__:"):]
//line app/vmselect/prometheus/export.qtpl:36
switch timeFormat {
//line app/vmselect/prometheus/export.qtpl:37
case "unix_s":
//line app/vmselect/prometheus/export.qtpl:38
qw422016.N().DL(timestamp / 1000)
//line app/vmselect/prometheus/export.qtpl:39
case "unix_ms":
//line app/vmselect/prometheus/export.qtpl:40
qw422016.N().DL(timestamp)
//line app/vmselect/prometheus/export.qtpl:41
case "unix_ns":
//line app/vmselect/prometheus/export.qtpl:42
qw422016.N().DL(timestamp * 1e6)
//line app/vmselect/prometheus/export.qtpl:43
case "rfc3339":
//line app/vmselect/prometheus/export.qtpl:45
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], time.RFC3339)
//line app/vmselect/prometheus/export.qtpl:48
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:50
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:18
}
//line app/vmselect/prometheus/export.qtpl:18
func WriteExportPrometheusLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:18
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:18
StreamExportPrometheusLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:18
}
//line app/vmselect/prometheus/export.qtpl:18
func ExportPrometheusLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:18
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:18
WriteExportPrometheusLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:18
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:18
return qs422016
//line app/vmselect/prometheus/export.qtpl:18
}
//line app/vmselect/prometheus/export.qtpl:20
func StreamExportJSONLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:21
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:21
return
//line app/vmselect/prometheus/export.qtpl:21
}
//line app/vmselect/prometheus/export.qtpl:21
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:23
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:23
qw422016.N().S(`,"values":[`)
//line app/vmselect/prometheus/export.qtpl:25
if len(xb.values) > 0 {
//line app/vmselect/prometheus/export.qtpl:26
values := xb.values
//line app/vmselect/prometheus/export.qtpl:27
qw422016.N().F(values[0])
//line app/vmselect/prometheus/export.qtpl:28
values = values[1:]
//line app/vmselect/prometheus/export.qtpl:29
for _, v := range values {
//line app/vmselect/prometheus/export.qtpl:29
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:30
qw422016.N().F(v)
//line app/vmselect/prometheus/export.qtpl:31
}
//line app/vmselect/prometheus/export.qtpl:32
}
//line app/vmselect/prometheus/export.qtpl:32
qw422016.N().S(`],"timestamps":[`)
//line app/vmselect/prometheus/export.qtpl:35
if len(xb.timestamps) > 0 {
//line app/vmselect/prometheus/export.qtpl:36
timestamps := xb.timestamps
//line app/vmselect/prometheus/export.qtpl:37
qw422016.N().DL(timestamps[0])
//line app/vmselect/prometheus/export.qtpl:38
timestamps = timestamps[1:]
//line app/vmselect/prometheus/export.qtpl:39
for _, ts := range timestamps {
//line app/vmselect/prometheus/export.qtpl:39
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:40
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:41
}
//line app/vmselect/prometheus/export.qtpl:42
}
//line app/vmselect/prometheus/export.qtpl:42
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/export.qtpl:44
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:45
func WriteExportJSONLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:45
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:45
StreamExportJSONLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:45
func ExportJSONLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:45
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:45
WriteExportJSONLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:45
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:45
return qs422016
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:47
func StreamExportPromAPILine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:47
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:49
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:49
qw422016.N().S(`,"values":`)
//line app/vmselect/prometheus/export.qtpl:50
streamvaluesWithTimestamps(qw422016, xb.values, xb.timestamps)
//line app/vmselect/prometheus/export.qtpl:50
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:52
}
default:
//line app/vmselect/prometheus/export.qtpl:53
if strings.HasPrefix(timeFormat, "custom:") {
//line app/vmselect/prometheus/export.qtpl:55
layout := timeFormat[len("custom:"):]
bb := quicktemplate.AcquireByteBuffer()
bb.B = time.Unix(timestamp/1000, (timestamp%1000)*1e6).AppendFormat(bb.B[:0], layout)
//line app/vmselect/prometheus/export.qtpl:52
func WriteExportPromAPILine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:52
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:52
StreamExportPromAPILine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:52
func ExportPromAPILine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:52
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:52
WriteExportPromAPILine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:52
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:52
return qs422016
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:54
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:54
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
//line app/vmselect/prometheus/export.qtpl:59
if bytes.ContainsAny(bb.B, `"`+",\n") {
//line app/vmselect/prometheus/export.qtpl:60
bb, ok := <-resultsCh
qw422016.E().QZ(bb.B)
//line app/vmselect/prometheus/export.qtpl:61
if ok {
} else {
//line app/vmselect/prometheus/export.qtpl:62
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:63
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:64
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:64
qw422016.N().S(`,`)
}
//line app/vmselect/prometheus/export.qtpl:65
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:66
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:67
}
} else {
//line app/vmselect/prometheus/export.qtpl:67
qw422016.N().S(`Unsupported timeFormat=`)
//line app/vmselect/prometheus/export.qtpl:68
qw422016.N().S(timeFormat)
//line app/vmselect/prometheus/export.qtpl:69
}
//line app/vmselect/prometheus/export.qtpl:68
qw422016.N().S(`]}}`)
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:72
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:72
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:72
StreamExportPromAPIResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:72
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:72
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:72
WriteExportPromAPIResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:72
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:72
return qs422016
//line app/vmselect/prometheus/export.qtpl:70
}
//line app/vmselect/prometheus/export.qtpl:71
return
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:73
v := mn.GetTagValue(fieldName)
//line app/vmselect/prometheus/export.qtpl:74
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
if bytes.ContainsAny(v, `"`+",\n") {
//line app/vmselect/prometheus/export.qtpl:75
for bb := range resultsCh {
qw422016.N().QZ(v)
//line app/vmselect/prometheus/export.qtpl:76
qw422016.N().Z(bb.B)
} else {
//line app/vmselect/prometheus/export.qtpl:77
quicktemplate.ReleaseByteBuffer(bb)
qw422016.N().Z(v)
//line app/vmselect/prometheus/export.qtpl:78
}
//line app/vmselect/prometheus/export.qtpl:79
}
//line app/vmselect/prometheus/export.qtpl:79
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
func writeexportCSVField(qq422016 qtio422016.Writer, mn *storage.MetricName, fieldName string, timestamp int64, value float64) {
//line app/vmselect/prometheus/export.qtpl:79
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:79
StreamExportStdResponse(qw422016, resultsCh)
streamexportCSVField(qw422016, mn, fieldName, timestamp, value)
//line app/vmselect/prometheus/export.qtpl:79
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:79
}
//line app/vmselect/prometheus/export.qtpl:79
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
func exportCSVField(mn *storage.MetricName, fieldName string, timestamp int64, value float64) string {
//line app/vmselect/prometheus/export.qtpl:79
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:79
WriteExportStdResponse(qb422016, resultsCh)
writeexportCSVField(qb422016, mn, fieldName, timestamp, value)
//line app/vmselect/prometheus/export.qtpl:79
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:79
@ -316,69 +211,361 @@ func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
}
//line app/vmselect/prometheus/export.qtpl:81
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
func StreamExportPrometheusLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:82
qw422016.N().Z(mn.MetricGroup)
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:82
return
//line app/vmselect/prometheus/export.qtpl:82
}
//line app/vmselect/prometheus/export.qtpl:83
if len(mn.Tags) > 0 {
//line app/vmselect/prometheus/export.qtpl:83
qw422016.N().S(`{`)
bb := quicktemplate.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:84
writeprometheusMetricName(bb, xb.mn)
//line app/vmselect/prometheus/export.qtpl:85
for i, ts := range xb.timestamps {
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:87
qw422016.N().F(xb.values[i])
//line app/vmselect/prometheus/export.qtpl:87
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:88
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:88
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:89
}
//line app/vmselect/prometheus/export.qtpl:90
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:91
}
//line app/vmselect/prometheus/export.qtpl:91
func WriteExportPrometheusLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:91
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:91
StreamExportPrometheusLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:91
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:91
}
//line app/vmselect/prometheus/export.qtpl:91
func ExportPrometheusLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:91
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:91
WriteExportPrometheusLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:91
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:91
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:91
return qs422016
//line app/vmselect/prometheus/export.qtpl:91
}
//line app/vmselect/prometheus/export.qtpl:93
func StreamExportJSONLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:94
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:94
return
//line app/vmselect/prometheus/export.qtpl:94
}
//line app/vmselect/prometheus/export.qtpl:94
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:96
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:96
qw422016.N().S(`,"values":[`)
//line app/vmselect/prometheus/export.qtpl:98
if len(xb.values) > 0 {
//line app/vmselect/prometheus/export.qtpl:99
values := xb.values
//line app/vmselect/prometheus/export.qtpl:100
qw422016.N().F(values[0])
//line app/vmselect/prometheus/export.qtpl:101
values = values[1:]
//line app/vmselect/prometheus/export.qtpl:102
for _, v := range values {
//line app/vmselect/prometheus/export.qtpl:102
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:103
qw422016.N().F(v)
//line app/vmselect/prometheus/export.qtpl:104
}
//line app/vmselect/prometheus/export.qtpl:105
}
//line app/vmselect/prometheus/export.qtpl:105
qw422016.N().S(`],"timestamps":[`)
//line app/vmselect/prometheus/export.qtpl:108
if len(xb.timestamps) > 0 {
//line app/vmselect/prometheus/export.qtpl:109
timestamps := xb.timestamps
//line app/vmselect/prometheus/export.qtpl:110
qw422016.N().DL(timestamps[0])
//line app/vmselect/prometheus/export.qtpl:111
timestamps = timestamps[1:]
//line app/vmselect/prometheus/export.qtpl:112
for _, ts := range timestamps {
//line app/vmselect/prometheus/export.qtpl:112
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:113
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:114
}
//line app/vmselect/prometheus/export.qtpl:115
}
//line app/vmselect/prometheus/export.qtpl:115
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/export.qtpl:117
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:118
}
//line app/vmselect/prometheus/export.qtpl:118
func WriteExportJSONLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:118
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:118
StreamExportJSONLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:118
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:118
}
//line app/vmselect/prometheus/export.qtpl:118
func ExportJSONLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:118
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:118
WriteExportJSONLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:118
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:118
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:118
return qs422016
//line app/vmselect/prometheus/export.qtpl:118
}
//line app/vmselect/prometheus/export.qtpl:120
func StreamExportPromAPILine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:120
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:122
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:122
qw422016.N().S(`,"values":`)
//line app/vmselect/prometheus/export.qtpl:123
streamvaluesWithTimestamps(qw422016, xb.values, xb.timestamps)
//line app/vmselect/prometheus/export.qtpl:123
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:125
}
//line app/vmselect/prometheus/export.qtpl:125
func WriteExportPromAPILine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:125
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:125
StreamExportPromAPILine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:125
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:125
}
//line app/vmselect/prometheus/export.qtpl:125
func ExportPromAPILine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:125
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:125
WriteExportPromAPILine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:125
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:125
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:125
return qs422016
//line app/vmselect/prometheus/export.qtpl:125
}
//line app/vmselect/prometheus/export.qtpl:127
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:127
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
//line app/vmselect/prometheus/export.qtpl:133
bb, ok := <-resultsCh
//line app/vmselect/prometheus/export.qtpl:134
if ok {
//line app/vmselect/prometheus/export.qtpl:135
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:136
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:137
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:137
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:138
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:139
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:140
}
//line app/vmselect/prometheus/export.qtpl:141
}
//line app/vmselect/prometheus/export.qtpl:141
qw422016.N().S(`]}}`)
//line app/vmselect/prometheus/export.qtpl:145
}
//line app/vmselect/prometheus/export.qtpl:145
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:145
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:145
StreamExportPromAPIResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:145
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:145
}
//line app/vmselect/prometheus/export.qtpl:145
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:145
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:145
WriteExportPromAPIResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:145
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:145
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:145
return qs422016
//line app/vmselect/prometheus/export.qtpl:145
}
//line app/vmselect/prometheus/export.qtpl:147
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:148
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:149
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:150
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:151
}
//line app/vmselect/prometheus/export.qtpl:152
}
//line app/vmselect/prometheus/export.qtpl:152
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:152
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:152
StreamExportStdResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:152
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:152
}
//line app/vmselect/prometheus/export.qtpl:152
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:152
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:152
WriteExportStdResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:152
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:152
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:152
return qs422016
//line app/vmselect/prometheus/export.qtpl:152
}
//line app/vmselect/prometheus/export.qtpl:154
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:155
qw422016.N().Z(mn.MetricGroup)
//line app/vmselect/prometheus/export.qtpl:156
if len(mn.Tags) > 0 {
//line app/vmselect/prometheus/export.qtpl:156
qw422016.N().S(`{`)
//line app/vmselect/prometheus/export.qtpl:158
tags := mn.Tags
//line app/vmselect/prometheus/export.qtpl:86
//line app/vmselect/prometheus/export.qtpl:159
qw422016.N().Z(tags[0].Key)
//line app/vmselect/prometheus/export.qtpl:86
//line app/vmselect/prometheus/export.qtpl:159
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:86
//line app/vmselect/prometheus/export.qtpl:159
qw422016.N().QZ(tags[0].Value)
//line app/vmselect/prometheus/export.qtpl:87
//line app/vmselect/prometheus/export.qtpl:160
tags = tags[1:]
//line app/vmselect/prometheus/export.qtpl:88
//line app/vmselect/prometheus/export.qtpl:161
for i := range tags {
//line app/vmselect/prometheus/export.qtpl:89
//line app/vmselect/prometheus/export.qtpl:162
tag := &tags[i]
//line app/vmselect/prometheus/export.qtpl:89
//line app/vmselect/prometheus/export.qtpl:162
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:90
//line app/vmselect/prometheus/export.qtpl:163
qw422016.N().Z(tag.Key)
//line app/vmselect/prometheus/export.qtpl:90
//line app/vmselect/prometheus/export.qtpl:163
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:90
//line app/vmselect/prometheus/export.qtpl:163
qw422016.N().QZ(tag.Value)
//line app/vmselect/prometheus/export.qtpl:91
//line app/vmselect/prometheus/export.qtpl:164
}
//line app/vmselect/prometheus/export.qtpl:91
//line app/vmselect/prometheus/export.qtpl:164
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:93
//line app/vmselect/prometheus/export.qtpl:166
}
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
}
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
streamprometheusMetricName(qw422016, mn)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
}
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
func prometheusMetricName(mn *storage.MetricName) string {
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
writeprometheusMetricName(qb422016, mn)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
return qs422016
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:167
}

View File

@ -8,6 +8,7 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -120,6 +121,95 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
// ExportCSVHandler exports data in CSV format from /api/v1/export/csv
func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
format := r.FormValue("format")
if len(format) == 0 {
return fmt.Errorf("missing `format` arg; see https://victoriametrics.github.io/#how-to-export-csv-data")
}
fieldNames := strings.Split(format, ",")
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
if len(match) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
matches = []string{match}
}
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
}
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
deadline := searchutils.GetDeadlineForExport(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
sq := &storage.SearchQuery{
AccountID: at.AccountID,
ProjectID: at.ProjectID,
MinTimestamp: start,
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
w.Header().Set("Content-Type", "text/csv")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
doneCh := make(chan error)
go func() {
isPartial, err := netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := bw.Error(); err != nil {
return err
}
if err := b.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block during export: %s", err)
}
xb := exportBlockPool.Get().(*exportBlock)
xb.mn = mn
xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
if len(xb.timestamps) > 0 {
bb := quicktemplate.AcquireByteBuffer()
WriteExportCSVLine(bb, xb, fieldNames)
resultsCh <- bb
}
xb.reset()
exportBlockPool.Put(xb)
return nil
})
if err == nil && isPartial && searchutils.GetDenyPartialResponse(r) {
err = fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
close(resultsCh)
doneCh <- err
}()
for bb := range resultsCh {
bw.Write(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
}
if err := bw.Flush(); err != nil {
return err
}
err = <-doneCh
if err != nil {
return fmt.Errorf("error during exporting data to csv: %w", err)
}
exportCSVDuration.UpdateDuration(startTime)
return nil
}
var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`)
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6

View File

@ -118,6 +118,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to export time series](#how-to-export-time-series)
* [How to export data in native format](#how-to-export-data-in-native-format)
* [How to export data in JSON line format](#how-to-export-data-in-json-line-format)
* [How to export CSV data](#how-to-export-csv-data)
* [How to import time series data](#how-to-import-time-series-data)
* [How to import data in native format](#how-to-import-data-in-native-format)
* [How to import data in json line format](#how-to-import-data-in-json-line-format)
@ -683,6 +684,7 @@ VictoriaMetrics provides the following handlers for exporting data:
* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export.
See [these docs](#how-to-export-data-in-native-format) for details.
* `/api/v1/export` for exporing data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details.
* `/api/v1/export/csv` for exporting data in CSV. See [these docs](#how-to-export-csv-data) for details.
#### How to export data in native format
@ -732,6 +734,30 @@ The maximum duration for each request to `/api/v1/export` is limited by `-search
Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format).
#### How to export CSV data
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/csv?format=<format>&match=<timeseries_selector_for_export>`,
where:
* `<format>` must contain comma-delimited label names for the exported CSV. The following special label names are supported:
* `__name__` - metric name
* `__value__` - sample value
* `__timestamp__:<ts_format>` - sample timestamp. `<ts_format>` can have the following values:
* `unix_s` - unix seconds
* `unix_ms` - unix milliseconds
* `unix_ns` - unix nanoseconds
* `rfc3339` - [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) time
* `custom:<layout>` - custom layout for time that is supported by [time.Format](https://golang.org/pkg/time/#Time.Format) function from Go.
* `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export.
Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
The exported CSV data can be imported to VictoriaMetrics via [/api/v1/import/csv](#how-to-import-csv-data).
### How to import time series data
Time series data can be imported via any supported ingestion protocol: