all: add native format for data export and import

The data can be exported via [/api/v1/export/native](https://victoriametrics.github.io/#how-to-export-data-in-native-format) handler
and imported via [/api/v1/import/native](https://victoriametrics.github.io/#how-to-import-data-in-native-format) handler.
This commit is contained in:
Aliaksandr Valialkin 2020-09-26 04:29:45 +03:00
parent 00ec2b7189
commit aadbd014ff
23 changed files with 1340 additions and 449 deletions

View File

@ -177,6 +177,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag.
See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-opentsdb-data-via-http-apiput-requests) for details.
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below).
- `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below).
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
@ -189,7 +190,8 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `api/v1/labels` - returns a [list of label names](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names).
- `api/v1/label/<label_name>/values` - returns values for the given `<label_name>` according [to API](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values).
- `federate` - returns [federated metrics](https://prometheus.io/docs/prometheus/latest/federation/).
- `api/v1/export` - exports raw data. See [this article](https://medium.com/@valyala/analyzing-prometheus-data-with-external-tools-5f3e5e147639) for details.
- `api/v1/export` - exports raw data in JSON line format. See [this article](https://medium.com/@valyala/analyzing-prometheus-data-with-external-tools-5f3e5e147639) for details.
- `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above).
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.

View File

@ -25,7 +25,8 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-json-line-format).
* Native data import protocol via `http://<vmagent>:8429/api/v1/import/native`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-native-format).
* Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
* Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
* Can replicate collected metrics simultaneously to multiple remote storage systems.

View File

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
@ -171,6 +172,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusNoContent)
return true
case "/api/v1/import/native":
nativeimportRequests.Inc()
if err := native.InsertHandler(r); err != nil {
nativeimportErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "/write", "/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(r); err != nil {
@ -213,6 +223,9 @@ var (
prometheusimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
prometheusimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/prometheus", protocol="prometheusimport"}`)
nativeimportRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/import/native", protocol="nativeimport"}`)
nativeimportErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/import/native", protocol="nativeimport"}`)
influxWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/write", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/write", protocol="influx"}`)

View File

@ -0,0 +1,82 @@
package native
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`)
)
// InsertHandler processes `/api/v1/import` request.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(block *parser.Block) error {
return insertRows(block, extraLabels)
})
})
}
func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
mn := &block.MetricName
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: bytesutil.ToUnsafeString(mn.MetricGroup),
})
for j := range mn.Tags {
tag := &mn.Tags[j]
labels = append(labels, prompbmarshal.Label{
Name: bytesutil.ToUnsafeString(tag.Key),
Value: bytesutil.ToUnsafeString(tag.Value),
})
}
labels = append(labels, extraLabels...)
values := block.Values
timestamps := block.Timestamps
if len(timestamps) != len(values) {
logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
}
samplesLen := len(samples)
for j, value := range values {
samples = append(samples, prompbmarshal.Sample{
Value: value,
Timestamp: timestamps[j],
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
rowsTotal := len(values)
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View File

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
@ -54,7 +55,9 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
labels = append(labels, extraLabels...)
values := r.Values
timestamps := r.Timestamps
_ = timestamps[len(values)-1]
if len(timestamps) != len(values) {
logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
}
samplesLen := len(samples)
for j, value := range values {
samples = append(samples, prompbmarshal.Sample{

View File

@ -1 +1 @@
`vminsert` routes the ingested data to `vmstorage` nodes.
`vminsert` routes the ingested data to `vmstorage`.

View File

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
@ -189,6 +190,15 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import/native":
nativeimportRequests.Inc()
if err := native.InsertHandler(at, r); err != nil {
nativeimportErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "influx/write", "influx/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(at, r); err != nil {
@ -223,6 +233,9 @@ var (
prometheusimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import/prometheus", protocol="prometheusimport"}`)
prometheusimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import/prometheus", protocol="prometheusimport"}`)
nativeimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import/native", protocol="nativeimport"}`)
nativeimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import/native", protocol="nativeimport"}`)
influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/", protocol="influx"}`)
influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/influx/", protocol="influx"}`)

View File

@ -0,0 +1,78 @@
package native
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="native"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="native"}`)
)
// InsertHandler processes `/api/v1/import/native` request.
func InsertHandler(at *auth.Token, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(block *parser.Block) error {
return insertRows(at, block, extraLabels)
})
})
}
func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset() // This line is required for initializing ctx internals.
hasRelabeling := relabel.HasRelabeling()
mn := &block.MetricName
ctx.Labels = ctx.Labels[:0]
ctx.AddLabelBytes(nil, mn.MetricGroup)
for j := range mn.Tags {
tag := &mn.Tags[j]
ctx.AddLabelBytes(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
return nil
}
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
values := block.Values
timestamps := block.Timestamps
if len(timestamps) != len(values) {
logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values))
}
for j, value := range values {
timestamp := timestamps[j]
if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
return err
}
}
rowsTotal := len(values)
rowsInserted.Get(at).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}

View File

@ -281,6 +281,14 @@ func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request,
return true
}
return true
case "prometheus/api/v1/export/native":
exportNativeRequests.Inc()
if err := prometheus.ExportNativeHandler(startTime, at, w, r); err != nil {
exportNativeErrors.Inc()
httpserver.Errorf(w, r, "error in %q: %s", r.URL.Path, err)
return true
}
return true
case "prometheus/federate":
federateRequests.Inc()
if err := prometheus.FederateHandler(startTime, at, w, r); err != nil {
@ -401,6 +409,9 @@ var (
exportRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/export"}`)
exportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/api/v1/export"}`)
exportNativeRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/api/v1/export/native"}`)
exportNativeErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/api/v1/export/native"}`)
federateRequests = metrics.NewCounter(`vm_http_requests_total{path="/select/{}/prometheus/federate"}`)
federateErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/select/{}/prometheus/federate"}`)

View File

@ -15,7 +15,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
@ -399,29 +398,9 @@ func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, add
if err := tmpBlock.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block: %w", err)
}
timestamps := tmpBlock.Timestamps()
// Skip timestamps smaller than tr.MinTimestamp.
i := 0
for i < len(timestamps) && timestamps[i] < tr.MinTimestamp {
i++
}
// Skip timestamps bigger than tr.MaxTimestamp.
j := len(timestamps)
for j > i && timestamps[j-1] > tr.MaxTimestamp {
j--
}
skippedRows := tmpBlock.RowsCount() - (j - i)
sb.Timestamps, sb.Values = tmpBlock.AppendRowsWithTimeRangeFilter(sb.Timestamps[:0], sb.Values[:0], tr)
skippedRows := tmpBlock.RowsCount() - len(sb.Timestamps)
metricRowsSkipped.Add(skippedRows)
// Copy the remaining values.
if i == j {
return nil
}
values := tmpBlock.Values()
sb.Timestamps = append(sb.Timestamps, timestamps[i:j]...)
sb.Values = decimal.AppendDecimalToFloat(sb.Values, values[i:j], tmpBlock.Scale())
return nil
}
@ -1015,17 +994,68 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock)
return err
}
// ProcessSearchQuery performs sq on storage nodes until the given deadline.
var metricNamePool = &sync.Pool{
New: func() interface{} {
return &storage.MetricName{}
},
}
// ExportBlocks searches for time series matching sq and calls f for each found block.
//
// f is called in parallel from multiple goroutines.
// the process is stopped if f return non-nil error.
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr.
func ExportBlocks(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline, f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) (bool, error) {
if deadline.Exceeded() {
return false, fmt.Errorf("timeout exceeded before starting data export: %s", deadline.String())
}
tr := storage.TimeRange{
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
processBlock := func(mb *storage.MetricBlock) error {
mn := metricNamePool.Get().(*storage.MetricName)
if err := mn.Unmarshal(mb.MetricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName: %w", err)
}
if err := f(mn, &mb.Block, tr); err != nil {
return err
}
mn.Reset()
metricNamePool.Put(mn)
return nil
}
isPartialResult, err := processSearchQuery(at, sq, true, processBlock, deadline)
if err != nil {
return true, fmt.Errorf("error occured during export: %w", err)
}
return isPartialResult, nil
}
type exportWork struct {
mn storage.MetricName
b storage.Block
}
func (xw *exportWork) reset() {
xw.mn.Reset()
xw.b.Reset()
}
var exportWorkPool = &sync.Pool{
New: func() interface{} {
return &exportWork{}
},
}
// ProcessSearchQuery performs sq until the given deadline.
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline searchutils.Deadline) (*Results, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel.
resultsCh := make(chan error, len(storageNodes))
tr := storage.TimeRange{
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
@ -1034,10 +1064,52 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool,
tbf: getTmpBlocksFile(),
m: make(map[string][]tmpBlockAddr),
}
processBlock := func(mb *storage.MetricBlock) error {
if !fetchData {
tbfw.RegisterEmptyBlock(mb)
return nil
}
if err := tbfw.RegisterAndWriteBlock(mb); err != nil {
return fmt.Errorf("cannot write MetricBlock to temporary blocks file: %w", err)
}
return nil
}
isPartialResult, err := processSearchQuery(at, sq, fetchData, processBlock, deadline)
if err != nil {
putTmpBlocksFile(tbfw.tbf)
return nil, true, fmt.Errorf("error occured during search: %w", err)
}
if err := tbfw.tbf.Finalize(); err != nil {
putTmpBlocksFile(tbfw.tbf)
return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d time series: %w", len(tbfw.m), err)
}
var rss Results
rss.at = at
rss.tr = tr
rss.fetchData = fetchData
rss.deadline = deadline
rss.tbf = tbfw.tbf
pts := make([]packedTimeseries, len(tbfw.orderedMetricNames))
for i, metricName := range tbfw.orderedMetricNames {
pts[i] = packedTimeseries{
metricName: metricName,
addrs: tbfw.m[metricName],
}
}
rss.packedTimeseries = pts
return &rss, isPartialResult, nil
}
func processSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) (bool, error) {
requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel.
resultsCh := make(chan error, len(storageNodes))
for _, sn := range storageNodes {
go func(sn *storageNode) {
sn.searchRequests.Inc()
err := sn.processSearchQuery(tbfw, requestData, tr, fetchData, deadline)
err := sn.processSearchQuery(requestData, fetchData, processBlock, deadline)
if err != nil {
sn.searchRequestErrors.Inc()
err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
@ -1061,38 +1133,18 @@ func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool,
if len(errors) > 0 {
if len(errors) == len(storageNodes) {
// Return only the first error, since it has no sense in returning all errors.
putTmpBlocksFile(tbfw.tbf)
return nil, true, fmt.Errorf("error occured during search: %w", errors[0])
return true, errors[0]
}
// Just return partial results.
// This allows gracefully degrade vmselect in the case
// if certain storageNodes are temporarily unavailable.
// Do not log the error, since it may spam logs on busy vmselect
// Do not return the error, since it may spam logs on busy vmselect
// serving high amount of requests.
partialSearchResults.Inc()
isPartialResult = true
}
if err := tbfw.tbf.Finalize(); err != nil {
putTmpBlocksFile(tbfw.tbf)
return nil, false, fmt.Errorf("cannot finalize temporary blocks file with %d time series: %w", len(tbfw.m), err)
}
var rss Results
rss.at = at
rss.tr = tr
rss.fetchData = fetchData
rss.deadline = deadline
rss.tbf = tbfw.tbf
pts := make([]packedTimeseries, len(tbfw.orderedMetricNames))
for i, metricName := range tbfw.orderedMetricNames {
pts[i] = packedTimeseries{
metricName: metricName,
addrs: tbfw.m[metricName],
}
}
rss.packedTimeseries = pts
return &rss, isPartialResult, nil
return isPartialResult, nil
}
type storageNode struct {
@ -1297,10 +1349,10 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline sear
return n, nil
}
func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline searchutils.Deadline) error {
func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error {
var blocksRead int
f := func(bc *handshake.BufferedConn) error {
n, err := sn.processSearchQueryOnConn(tbfw, bc, requestData, tr, fetchData)
n, err := sn.processSearchQueryOnConn(bc, requestData, fetchData, processBlock)
if err != nil {
return err
}
@ -1717,7 +1769,7 @@ const maxMetricBlockSize = 1024 * 1024
// from vmstorage.
const maxErrorMessageSize = 64 * 1024
func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *handshake.BufferedConn, requestData []byte, tr storage.TimeRange, fetchData bool) (int, error) {
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, fetchData bool, processBlock func(mb *storage.MetricBlock) error) (int, error) {
// Send the request to sn.
if err := writeBytes(bc, requestData); err != nil {
return 0, fmt.Errorf("cannot write requestData: %w", err)
@ -1763,12 +1815,8 @@ func (sn *storageNode) processSearchQueryOnConn(tbfw *tmpBlocksFileWrapper, bc *
blocksRead++
sn.metricBlocksRead.Inc()
sn.metricRowsRead.Add(mb.Block.RowsCount())
if !fetchData {
tbfw.RegisterEmptyBlock(&mb)
continue
}
if err := tbfw.RegisterAndWriteBlock(&mb); err != nil {
return blocksRead, fmt.Errorf("cannot write MetricBlock #%d to temporary blocks file: %w", blocksRead, err)
if err := processBlock(&mb); err != nil {
return blocksRead, fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err)
}
}
}

View File

@ -69,6 +69,10 @@ func testTmpBlocksFile() error {
_, _, _ = b.MarshalData(0, 0)
return &b
}
tr := storage.TimeRange{
MinTimestamp: 0,
MaxTimestamp: 1<<63 - 1,
}
for _, size := range []int{1024, 16 * 1024, maxInmemoryTmpBlocksFile() / 2, 2 * maxInmemoryTmpBlocksFile()} {
err := func() error {
tbf := getTmpBlocksFile()
@ -118,11 +122,13 @@ func testTmpBlocksFile() error {
if b1.RowsCount() != b.RowsCount() {
return fmt.Errorf("unexpected number of rows in tbf block; got %d; want %d", b1.RowsCount(), b.RowsCount())
}
if !reflect.DeepEqual(b1.Timestamps(), b.Timestamps()) {
return fmt.Errorf("unexpected timestamps; got\n%v\nwant\n%v", b1.Timestamps(), b.Timestamps())
timestamps1, values1 := b1.AppendRowsWithTimeRangeFilter(nil, nil, tr)
timestamps, values := b.AppendRowsWithTimeRangeFilter(nil, nil, tr)
if !reflect.DeepEqual(timestamps1, timestamps) {
return fmt.Errorf("unexpected timestamps; got\n%v\nwant\n%v", timestamps1, timestamps)
}
if !reflect.DeepEqual(b1.Values(), b.Values()) {
return fmt.Errorf("unexpected values; got\n%v\nwant\n%v", b1.Values(), b.Values())
if !reflect.DeepEqual(values1, values) {
return fmt.Errorf("unexpected values; got\n%v\nwant\n%v", values1, values)
}
}
return nil

View File

@ -1,30 +1,29 @@
{% import (
"github.com/valyala/quicktemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
) %}
{% stripspace %}
{% func ExportPrometheusLine(rs *netstorage.Result) %}
{% if len(rs.Timestamps) == 0 %}{% return %}{% endif %}
{% func ExportPrometheusLine(xb *exportBlock) %}
{% if len(xb.timestamps) == 0 %}{% return %}{% endif %}
{% code bb := quicktemplate.AcquireByteBuffer() %}
{% code writeprometheusMetricName(bb, &rs.MetricName) %}
{% for i, ts := range rs.Timestamps %}
{% code writeprometheusMetricName(bb, xb.mn) %}
{% for i, ts := range xb.timestamps %}
{%z= bb.B %}{% space %}
{%f= rs.Values[i] %}{% space %}
{%f= xb.values[i] %}{% space %}
{%dl= ts %}{% newline %}
{% endfor %}
{% code quicktemplate.ReleaseByteBuffer(bb) %}
{% endfunc %}
{% func ExportJSONLine(rs *netstorage.Result) %}
{% if len(rs.Timestamps) == 0 %}{% return %}{% endif %}
{% func ExportJSONLine(xb *exportBlock) %}
{% if len(xb.timestamps) == 0 %}{% return %}{% endif %}
{
"metric":{%= metricNameObject(&rs.MetricName) %},
"metric":{%= metricNameObject(xb.mn) %},
"values":[
{% if len(rs.Values) > 0 %}
{% code values := rs.Values %}
{% if len(xb.values) > 0 %}
{% code values := xb.values %}
{%f= values[0] %}
{% code values = values[1:] %}
{% for _, v := range values %}
@ -33,8 +32,8 @@
{% endif %}
],
"timestamps":[
{% if len(rs.Timestamps) > 0 %}
{% code timestamps := rs.Timestamps %}
{% if len(xb.timestamps) > 0 %}
{% code timestamps := xb.timestamps %}
{%dl= timestamps[0] %}
{% code timestamps = timestamps[1:] %}
{% for _, ts := range timestamps %}
@ -45,10 +44,10 @@
}{% newline %}
{% endfunc %}
{% func ExportPromAPILine(rs *netstorage.Result) %}
{% func ExportPromAPILine(xb *exportBlock) %}
{
"metric": {%= metricNameObject(&rs.MetricName) %},
"values": {%= valuesWithTimestamps(rs.Values, rs.Timestamps) %}
"metric": {%= metricNameObject(xb.mn) %},
"values": {%= valuesWithTimestamps(xb.values, xb.timestamps) %}
}
{% endfunc %}

View File

@ -6,380 +6,379 @@ package prometheus
//line app/vmselect/prometheus/export.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/export.qtpl:9
//line app/vmselect/prometheus/export.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/export.qtpl:9
//line app/vmselect/prometheus/export.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/export.qtpl:8
func StreamExportPrometheusLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:9
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:9
func StreamExportPrometheusLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/export.qtpl:10
if len(rs.Timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:10
return
//line app/vmselect/prometheus/export.qtpl:10
//line app/vmselect/prometheus/export.qtpl:9
}
//line app/vmselect/prometheus/export.qtpl:11
//line app/vmselect/prometheus/export.qtpl:10
bb := quicktemplate.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:12
writeprometheusMetricName(bb, &rs.MetricName)
//line app/vmselect/prometheus/export.qtpl:11
writeprometheusMetricName(bb, xb.mn)
//line app/vmselect/prometheus/export.qtpl:12
for i, ts := range xb.timestamps {
//line app/vmselect/prometheus/export.qtpl:13
for i, ts := range rs.Timestamps {
//line app/vmselect/prometheus/export.qtpl:14
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:13
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:14
qw422016.N().F(xb.values[i])
//line app/vmselect/prometheus/export.qtpl:14
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:15
qw422016.N().F(rs.Values[i])
//line app/vmselect/prometheus/export.qtpl:15
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:16
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:16
//line app/vmselect/prometheus/export.qtpl:15
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:17
//line app/vmselect/prometheus/export.qtpl:16
}
//line app/vmselect/prometheus/export.qtpl:18
//line app/vmselect/prometheus/export.qtpl:17
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
}
//line app/vmselect/prometheus/export.qtpl:19
func WriteExportPrometheusLine(qq422016 qtio422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
func WriteExportPrometheusLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:18
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:19
StreamExportPrometheusLine(qw422016, rs)
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
StreamExportPrometheusLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
}
//line app/vmselect/prometheus/export.qtpl:19
func ExportPrometheusLine(rs *netstorage.Result) string {
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
func ExportPrometheusLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:18
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:19
WriteExportPrometheusLine(qb422016, rs)
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
WriteExportPrometheusLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:18
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
return qs422016
//line app/vmselect/prometheus/export.qtpl:19
//line app/vmselect/prometheus/export.qtpl:18
}
//line app/vmselect/prometheus/export.qtpl:20
func StreamExportJSONLine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:21
if len(xb.timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:21
func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/export.qtpl:22
if len(rs.Timestamps) == 0 {
//line app/vmselect/prometheus/export.qtpl:22
return
//line app/vmselect/prometheus/export.qtpl:22
//line app/vmselect/prometheus/export.qtpl:21
}
//line app/vmselect/prometheus/export.qtpl:22
//line app/vmselect/prometheus/export.qtpl:21
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:24
streammetricNameObject(qw422016, &rs.MetricName)
//line app/vmselect/prometheus/export.qtpl:24
//line app/vmselect/prometheus/export.qtpl:23
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:23
qw422016.N().S(`,"values":[`)
//line app/vmselect/prometheus/export.qtpl:25
if len(xb.values) > 0 {
//line app/vmselect/prometheus/export.qtpl:26
if len(rs.Values) > 0 {
//line app/vmselect/prometheus/export.qtpl:27
values := rs.Values
values := xb.values
//line app/vmselect/prometheus/export.qtpl:28
//line app/vmselect/prometheus/export.qtpl:27
qw422016.N().F(values[0])
//line app/vmselect/prometheus/export.qtpl:29
//line app/vmselect/prometheus/export.qtpl:28
values = values[1:]
//line app/vmselect/prometheus/export.qtpl:30
//line app/vmselect/prometheus/export.qtpl:29
for _, v := range values {
//line app/vmselect/prometheus/export.qtpl:30
//line app/vmselect/prometheus/export.qtpl:29
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:31
//line app/vmselect/prometheus/export.qtpl:30
qw422016.N().F(v)
//line app/vmselect/prometheus/export.qtpl:32
//line app/vmselect/prometheus/export.qtpl:31
}
//line app/vmselect/prometheus/export.qtpl:33
//line app/vmselect/prometheus/export.qtpl:32
}
//line app/vmselect/prometheus/export.qtpl:33
//line app/vmselect/prometheus/export.qtpl:32
qw422016.N().S(`],"timestamps":[`)
//line app/vmselect/prometheus/export.qtpl:35
if len(xb.timestamps) > 0 {
//line app/vmselect/prometheus/export.qtpl:36
if len(rs.Timestamps) > 0 {
//line app/vmselect/prometheus/export.qtpl:37
timestamps := rs.Timestamps
timestamps := xb.timestamps
//line app/vmselect/prometheus/export.qtpl:38
//line app/vmselect/prometheus/export.qtpl:37
qw422016.N().DL(timestamps[0])
//line app/vmselect/prometheus/export.qtpl:39
//line app/vmselect/prometheus/export.qtpl:38
timestamps = timestamps[1:]
//line app/vmselect/prometheus/export.qtpl:40
//line app/vmselect/prometheus/export.qtpl:39
for _, ts := range timestamps {
//line app/vmselect/prometheus/export.qtpl:40
//line app/vmselect/prometheus/export.qtpl:39
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:41
//line app/vmselect/prometheus/export.qtpl:40
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:42
//line app/vmselect/prometheus/export.qtpl:41
}
//line app/vmselect/prometheus/export.qtpl:43
//line app/vmselect/prometheus/export.qtpl:42
}
//line app/vmselect/prometheus/export.qtpl:43
//line app/vmselect/prometheus/export.qtpl:42
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/export.qtpl:45
//line app/vmselect/prometheus/export.qtpl:44
qw422016.N().S(`
`)
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:46
func WriteExportJSONLine(qq422016 qtio422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
func WriteExportJSONLine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:45
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:46
StreamExportJSONLine(qw422016, rs)
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
StreamExportJSONLine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:46
func ExportJSONLine(rs *netstorage.Result) string {
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
func ExportJSONLine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:45
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:46
WriteExportJSONLine(qb422016, rs)
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
WriteExportJSONLine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:45
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
return qs422016
//line app/vmselect/prometheus/export.qtpl:46
//line app/vmselect/prometheus/export.qtpl:45
}
//line app/vmselect/prometheus/export.qtpl:48
func StreamExportPromAPILine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/export.qtpl:48
//line app/vmselect/prometheus/export.qtpl:47
func StreamExportPromAPILine(qw422016 *qt422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:47
qw422016.N().S(`{"metric":`)
//line app/vmselect/prometheus/export.qtpl:50
streammetricNameObject(qw422016, &rs.MetricName)
//line app/vmselect/prometheus/export.qtpl:50
//line app/vmselect/prometheus/export.qtpl:49
streammetricNameObject(qw422016, xb.mn)
//line app/vmselect/prometheus/export.qtpl:49
qw422016.N().S(`,"values":`)
//line app/vmselect/prometheus/export.qtpl:51
streamvaluesWithTimestamps(qw422016, rs.Values, rs.Timestamps)
//line app/vmselect/prometheus/export.qtpl:51
//line app/vmselect/prometheus/export.qtpl:50
streamvaluesWithTimestamps(qw422016, xb.values, xb.timestamps)
//line app/vmselect/prometheus/export.qtpl:50
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:53
func WriteExportPromAPILine(qq422016 qtio422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
func WriteExportPromAPILine(qq422016 qtio422016.Writer, xb *exportBlock) {
//line app/vmselect/prometheus/export.qtpl:52
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:53
StreamExportPromAPILine(qw422016, rs)
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
StreamExportPromAPILine(qw422016, xb)
//line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:53
func ExportPromAPILine(rs *netstorage.Result) string {
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
func ExportPromAPILine(xb *exportBlock) string {
//line app/vmselect/prometheus/export.qtpl:52
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:53
WriteExportPromAPILine(qb422016, rs)
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
WriteExportPromAPILine(qb422016, xb)
//line app/vmselect/prometheus/export.qtpl:52
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
return qs422016
//line app/vmselect/prometheus/export.qtpl:53
//line app/vmselect/prometheus/export.qtpl:52
}
//line app/vmselect/prometheus/export.qtpl:55
//line app/vmselect/prometheus/export.qtpl:54
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:55
//line app/vmselect/prometheus/export.qtpl:54
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
//line app/vmselect/prometheus/export.qtpl:61
//line app/vmselect/prometheus/export.qtpl:60
bb, ok := <-resultsCh
//line app/vmselect/prometheus/export.qtpl:62
//line app/vmselect/prometheus/export.qtpl:61
if ok {
//line app/vmselect/prometheus/export.qtpl:63
//line app/vmselect/prometheus/export.qtpl:62
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:64
//line app/vmselect/prometheus/export.qtpl:63
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:65
//line app/vmselect/prometheus/export.qtpl:64
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:65
//line app/vmselect/prometheus/export.qtpl:64
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:66
//line app/vmselect/prometheus/export.qtpl:65
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:67
//line app/vmselect/prometheus/export.qtpl:66
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:68
//line app/vmselect/prometheus/export.qtpl:67
}
//line app/vmselect/prometheus/export.qtpl:69
//line app/vmselect/prometheus/export.qtpl:68
}
//line app/vmselect/prometheus/export.qtpl:69
//line app/vmselect/prometheus/export.qtpl:68
qw422016.N().S(`]}}`)
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
StreamExportPromAPIResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
WriteExportPromAPIResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
return qs422016
//line app/vmselect/prometheus/export.qtpl:73
//line app/vmselect/prometheus/export.qtpl:72
}
//line app/vmselect/prometheus/export.qtpl:75
//line app/vmselect/prometheus/export.qtpl:74
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:76
//line app/vmselect/prometheus/export.qtpl:75
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:77
//line app/vmselect/prometheus/export.qtpl:76
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:78
//line app/vmselect/prometheus/export.qtpl:77
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:79
//line app/vmselect/prometheus/export.qtpl:78
}
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
}
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer) {
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
StreamExportStdResponse(qw422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
}
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer) string {
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
WriteExportStdResponse(qb422016, resultsCh)
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
return qs422016
//line app/vmselect/prometheus/export.qtpl:80
//line app/vmselect/prometheus/export.qtpl:79
}
//line app/vmselect/prometheus/export.qtpl:82
//line app/vmselect/prometheus/export.qtpl:81
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:83
//line app/vmselect/prometheus/export.qtpl:82
qw422016.N().Z(mn.MetricGroup)
//line app/vmselect/prometheus/export.qtpl:84
//line app/vmselect/prometheus/export.qtpl:83
if len(mn.Tags) > 0 {
//line app/vmselect/prometheus/export.qtpl:84
//line app/vmselect/prometheus/export.qtpl:83
qw422016.N().S(`{`)
//line app/vmselect/prometheus/export.qtpl:86
//line app/vmselect/prometheus/export.qtpl:85
tags := mn.Tags
//line app/vmselect/prometheus/export.qtpl:87
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().Z(tags[0].Key)
//line app/vmselect/prometheus/export.qtpl:87
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:87
//line app/vmselect/prometheus/export.qtpl:86
qw422016.N().QZ(tags[0].Value)
//line app/vmselect/prometheus/export.qtpl:88
//line app/vmselect/prometheus/export.qtpl:87
tags = tags[1:]
//line app/vmselect/prometheus/export.qtpl:89
//line app/vmselect/prometheus/export.qtpl:88
for i := range tags {
//line app/vmselect/prometheus/export.qtpl:90
//line app/vmselect/prometheus/export.qtpl:89
tag := &tags[i]
//line app/vmselect/prometheus/export.qtpl:90
//line app/vmselect/prometheus/export.qtpl:89
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:91
//line app/vmselect/prometheus/export.qtpl:90
qw422016.N().Z(tag.Key)
//line app/vmselect/prometheus/export.qtpl:91
//line app/vmselect/prometheus/export.qtpl:90
qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:91
//line app/vmselect/prometheus/export.qtpl:90
qw422016.N().QZ(tag.Value)
//line app/vmselect/prometheus/export.qtpl:92
//line app/vmselect/prometheus/export.qtpl:91
}
//line app/vmselect/prometheus/export.qtpl:92
//line app/vmselect/prometheus/export.qtpl:91
qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:94
//line app/vmselect/prometheus/export.qtpl:93
}
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
}
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
streamprometheusMetricName(qw422016, mn)
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
}
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
func prometheusMetricName(mn *storage.MetricName) string {
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
writeprometheusMetricName(qb422016, mn)
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
return qs422016
//line app/vmselect/prometheus/export.qtpl:95
//line app/vmselect/prometheus/export.qtpl:94
}

View File

@ -1,7 +1,7 @@
{% import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" %}
{% stripspace %}
LabelsCountResponse generates response for /api/v1/label_entries .
LabelsCountResponse generates response for /api/v1/labels/count .
{% func LabelsCountResponse(labelEntries []storage.TagEntry) %}
{
"status":"success",

View File

@ -7,7 +7,7 @@ package prometheus
//line app/vmselect/prometheus/labels_count_response.qtpl:1
import "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
// LabelsCountResponse generates response for /api/v1/label_entries .
// LabelsCountResponse generates response for /api/v1/labels/count .
//line app/vmselect/prometheus/labels_count_response.qtpl:5
import (

View File

@ -1,6 +1,7 @@
package prometheus
import (
"bufio"
"flag"
"fmt"
"math"
@ -15,6 +16,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -43,6 +46,11 @@ var (
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// Buffer size for big responses (i.e. /federate and /api/v1/export/* )
// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses.
// These buffers may result in visible overhead for responses exceeding tens of megabytes.
const bigResponseBufferSize = 128 * 1024
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6
@ -105,10 +113,12 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
}()
w.Header().Set("Content-Type", "text/plain")
bw := bufio.NewWriterSize(w, bigResponseBufferSize)
for bb := range resultsCh {
w.Write(bb.B)
bw.Write(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
}
_ = bw.Flush()
err = <-doneCh
if err != nil {
@ -120,6 +130,90 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
if len(match) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
matches = []string{match}
}
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
}
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
}
deadline := searchutils.GetDeadlineForExport(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return err
}
sq := &storage.SearchQuery{
AccountID: at.AccountID,
ProjectID: at.ProjectID,
MinTimestamp: start,
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
w.Header().Set("Content-Type", "VictoriaMetrics/native")
bw := bufio.NewWriterSize(w, bigResponseBufferSize)
// Marshal tr
trBuf := make([]byte, 0, 16)
trBuf = encoding.MarshalInt64(trBuf, start)
trBuf = encoding.MarshalInt64(trBuf, end)
bw.Write(trBuf)
var bwLock sync.Mutex
isPartial, err := netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
dstBuf := bbPool.Get()
tmpBuf := bbPool.Get()
dst := dstBuf.B
tmp := tmpBuf.B
// Marshal mn
tmp = mn.Marshal(tmp[:0])
dst = encoding.MarshalUint32(dst, uint32(len(tmp)))
dst = append(dst, tmp...)
// Marshal b
tmp = b.MarshalPortable(tmp[:0])
dst = encoding.MarshalUint32(dst, uint32(len(tmp)))
dst = append(dst, tmp...)
tmpBuf.B = tmp
bbPool.Put(tmpBuf)
bwLock.Lock()
_, err := bw.Write(dst)
bwLock.Unlock()
if err != nil {
return fmt.Errorf("cannot write data to client: %w", err)
}
dstBuf.B = dst
bbPool.Put(dstBuf)
return nil
})
_ = bw.Flush()
if err == nil && isPartial && searchutils.GetDenyPartialResponse(r) {
err = fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
return err
}
var bbPool bytesutil.ByteBufferPool
// ExportHandler exports data in raw format from /api/v1/export.
func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
ct := startTime.UnixNano() / 1e6
@ -145,11 +239,12 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
}
format := r.FormValue("format")
maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage")
deadline := searchutils.GetDeadlineForExport(r, startTime)
if start >= end {
end = start + defaultStep
}
if err := exportHandler(at, w, r, matches, start, end, format, maxRowsPerLine, deadline); err != nil {
if err := exportHandler(at, w, r, matches, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil {
return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err)
}
exportDuration.UpdateDuration(startTime)
@ -158,17 +253,35 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline searchutils.Deadline) error {
func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64,
format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error {
writeResponseFunc := WriteExportStdResponse
writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportJSONLine(bb, rs)
WriteExportJSONLine(bb, xb)
resultsCh <- bb
}
contentType := "application/stream+json"
if format == "prometheus" {
contentType = "text/plain"
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportPrometheusLine(bb, xb)
resultsCh <- bb
}
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportPromAPILine(bb, xb)
resultsCh <- bb
}
}
if maxRowsPerLine > 0 {
writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
valuesOrig := rs.Values
timestampsOrig := rs.Timestamps
writeLineFuncOrig := writeLineFunc
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
valuesOrig := xb.values
timestampsOrig := xb.timestamps
values := valuesOrig
timestamps := timestampsOrig
for len(values) > 0 {
@ -185,30 +298,12 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
values = nil
timestamps = nil
}
rs.Values = valuesChunk
rs.Timestamps = timestampsChunk
bb := quicktemplate.AcquireByteBuffer()
WriteExportJSONLine(bb, rs)
resultsCh <- bb
xb.values = valuesChunk
xb.timestamps = timestampsChunk
writeLineFuncOrig(xb, resultsCh)
}
rs.Values = valuesOrig
rs.Timestamps = timestampsOrig
}
}
contentType := "application/stream+json"
if format == "prometheus" {
contentType = "text/plain"
writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportPrometheusLine(bb, rs)
resultsCh <- bb
}
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
WriteExportPromAPILine(bb, rs)
resultsCh <- bb
xb.values = valuesOrig
xb.timestamps = timestampsOrig
}
}
@ -223,27 +318,58 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
MaxTimestamp: end,
TagFilterss: tagFilterss,
}
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, true, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
if isPartial && searchutils.GetDenyPartialResponse(r) {
rss.Cancel()
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
doneCh := make(chan error)
go func() {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
writeLineFunc(rs, resultsCh)
})
close(resultsCh)
doneCh <- err
}()
if !reduceMemUsage {
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, true, deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
if isPartial && searchutils.GetDenyPartialResponse(r) {
rss.Cancel()
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
go func() {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
xb := exportBlockPool.Get().(*exportBlock)
xb.mn = &rs.MetricName
xb.timestamps = rs.Timestamps
xb.values = rs.Values
writeLineFunc(xb, resultsCh)
xb.reset()
exportBlockPool.Put(xb)
})
close(resultsCh)
doneCh <- err
}()
} else {
go func() {
isPartial, err := netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
if err := b.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block during export: %s", err)
}
xb := exportBlockPool.Get().(*exportBlock)
xb.mn = mn
xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
if len(xb.timestamps) > 0 {
writeLineFunc(xb, resultsCh)
}
xb.reset()
exportBlockPool.Put(xb)
return nil
})
if err == nil && isPartial && searchutils.GetDenyPartialResponse(r) {
err = fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
}
close(resultsCh)
doneCh <- err
}()
}
w.Header().Set("Content-Type", contentType)
writeResponseFunc(w, resultsCh)
bw := bufio.NewWriterSize(w, bigResponseBufferSize)
writeResponseFunc(bw, resultsCh)
_ = bw.Flush()
// Consume all the data from resultsCh in the event writeResponseFunc
// fails to consume all the data.
@ -257,6 +383,24 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
return nil
}
type exportBlock struct {
mn *storage.MetricName
timestamps []int64
values []float64
}
func (xb *exportBlock) reset() {
xb.mn = nil
xb.timestamps = xb.timestamps[:0]
xb.values = xb.values[:0]
}
var exportBlockPool = &sync.Pool{
New: func() interface{} {
return &exportBlock{}
},
}
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
@ -742,7 +886,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
start -= offset
end := start
start = end - window
if err := exportHandler(at, w, r, []string{childQuery}, start, end, "promapi", 0, deadline); err != nil {
if err := exportHandler(at, w, r, []string{childQuery}, start, end, "promapi", 0, false, deadline); err != nil {
return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err)
}
queryDuration.UpdateDuration(startTime)

View File

@ -177,6 +177,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
This handler is disabled by default. It is exposed on a distinct TCP address set via `-opentsdbHTTPListenAddr` command-line flag.
See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#sending-opentsdb-data-via-http-apiput-requests) for details.
- `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below).
- `prometheus/api/v1/import/native` - for importing data obtained via `api/v1/export/native` on `vmselect` (see below).
- `prometheus/api/v1/import/csv` - for importing arbitrary CSV data. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data) for details.
- `prometheus/api/v1/import/prometheus` - for importing data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
@ -189,7 +190,8 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
- `api/v1/labels` - returns a [list of label names](https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names).
- `api/v1/label/<label_name>/values` - returns values for the given `<label_name>` according [to API](https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values).
- `federate` - returns [federated metrics](https://prometheus.io/docs/prometheus/latest/federation/).
- `api/v1/export` - exports raw data. See [this article](https://medium.com/@valyala/analyzing-prometheus-data-with-external-tools-5f3e5e147639) for details.
- `api/v1/export` - exports raw data in JSON line format. See [this article](https://medium.com/@valyala/analyzing-prometheus-data-with-external-tools-5f3e5e147639) for details.
- `api/v1/export/native` - exports raw data in native binary format. It may be imported into another VictoriaMetrics via `api/v1/import/native` (see above).
- `api/v1/status/tsdb` - for time series stats. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats) for details.
- `api/v1/status/active_queries` - for currently executed active queries. Note that every `vmselect` maintains an independent list of active queries,
which is returned in the response.

View File

@ -91,7 +91,7 @@ The main differences between Cortex and VictoriaMetrics:
VictoriaMetrics may lose only a few seconds of recent data, which isn't synced to persistent storage yet.
See [this article for details](https://medium.com/@valyala/wal-usage-looks-broken-in-modern-time-series-databases-b62a627ab704).
- Cortex is usually slower and requires more CPU and RAM than VictoriaMetrics. See [this talk from Adidas at PromCon 2019](https://promcon.io/2019-munich/talks/remote-write-storage-wars/) and [other case studies](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/CaseStudies).
- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV.
- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV, JSON, native binary.
See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details.
@ -112,7 +112,7 @@ The main differences between Cortex and VictoriaMetrics:
- Thanos may be harder to setup and operate comparing to VictoriaMetrics, since it has more moving parts, which can be connected with less reliable networks.
See [this article for details](https://medium.com/faun/comparing-thanos-to-victoriametrics-cluster-b193bea1683).
- Thanos is usually slower and requires more CPU and RAM than VictoriaMetrics. See [this talk from Adidas at PromCon 2019](https://promcon.io/2019-munich/talks/remote-write-storage-wars/).
- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV.
- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to Prometheus remote_write protocol - InfluxDB, OpenTSDB, Graphite, CSV, JSON, native binary.
See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details.
@ -120,7 +120,7 @@ The main differences between Cortex and VictoriaMetrics:
- VictoriaMetrics requires [10x less RAM](https://medium.com/@valyala/insert-benchmarks-with-inch-influxdb-vs-victoriametrics-e31a41ae2893) and it [works faster](https://medium.com/@valyala/measuring-vertical-scalability-for-time-series-databases-in-google-cloud-92550d78d8ae).
- VictoriaMetrics provides [better query language](https://medium.com/@valyala/promql-tutorial-for-beginners-9ab455142085) than InfluxQL or Flux.
- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to InfluxDB - Prometheus remote_write, OpenTSDB, Graphite, CSV.
- VictoriaMetrics accepts data in multiple popular data ingestion protocols additionally to InfluxDB - Prometheus remote_write, OpenTSDB, Graphite, CSV, JSON, native binary.
See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data) for details.

View File

@ -77,7 +77,8 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
if `-graphiteListenAddr` is set.
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol) if `-opentsdbListenAddr` is set.
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests) if `-opentsdbHTTPListenAddr` is set.
* [/api/v1/import](#how-to-import-time-series-data).
* [JSON line format](#how-to-import-data-in-json-line-format).
* [Native binary format](#how-to-import-data-in-native-format).
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [Arbitrary CSV data](#how-to-import-csv-data).
* Supports metrics' relabeling. See [these docs](#relabeling) for details.
@ -100,8 +101,6 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to send data from Graphite-compatible agents such as StatsD](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
* [Querying Graphite data](#querying-graphite-data)
* [How to send data from OpenTSDB-compatible agents](#how-to-send-data-from-opentsdb-compatible-agents)
* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
* [How to import CSV data](#how-to-import-csv-data)
* [Prometheus querying API usage](#prometheus-querying-api-usage)
* [Prometheus querying API enhancements](#prometheus-querying-api-enhancements)
* [Graphite Metrics API usage](#graphite-metrics-api-usage)
@ -117,7 +116,13 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
* [How to delete time series](#how-to-delete-time-series)
* [Forced merge](#forced-merge)
* [How to export time series](#how-to-export-time-series)
* [How to export data in native format](#how-to-export-data-in-native-format)
* [How to export data in JSON line format](#how-to-export-data-in-json-line-format)
* [How to import time series data](#how-to-import-time-series-data)
* [How to import data in native format](#how-to-import-data-in-native-format)
* [How to import data in json line format](#how-to-import-data-in-json-line-format)
* [How to import CSV data](#how-to-import-csv-data)
* [How to import data in Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format)
* [Relabeling](#relabeling)
* [Federation](#federation)
* [Capacity planning](#capacity-planning)
@ -345,7 +350,7 @@ curl -d 'measurement,tag1=value1,tag2=value2 field1=123,field2=1.23' -X POST 'ht
```
An arbitrary number of lines delimited by '\n' (aka newline char) may be sent in a single request.
After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"measurement_.*"}'
@ -381,7 +386,7 @@ echo "foo.bar.baz;tag1=value1;tag2=value2 123 `date +%s`" | nc -N localhost 2003
VictoriaMetrics sets the current time if the timestamp is omitted.
An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
@ -425,7 +430,7 @@ echo "put foo.bar.baz `date +%s` 123 tag1=value1 tag2=value2" | nc -N localhost
```
An arbitrary number of lines delimited by `\n` (aka newline char) may be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
@ -460,7 +465,7 @@ Example for writing multiple data points in a single request:
curl -H 'Content-Type: application/json' -d '[{"metric":"foo","value":45.34},{"metric":"bar","value":43}]' http://localhost:4242/api/put
```
After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]=x.y.z' -d 'match[]=foo' -d 'match[]=bar'
@ -475,91 +480,6 @@ The `/api/v1/export` endpoint should return the following response:
```
### How to import CSV data
Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:
```
<column_pos>:<type>:<context>
```
* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
* `<type>` describes the column type. Supported types are:
* `metric` - the corresponding CSV column at `<column_pos>` contains metric value, which must be integer or floating-point number.
The metric name is read from the `<context>`. CSV line must have at least a single metric field. Multiple metric fields per CSV line is OK.
* `label` - the corresponding CSV column at `<column_pos>` contains label value. The label name is read from the `<context>`.
CSV line may have arbitrary number of label fields. All these labels are attached to all the configured metrics.
* `time` - the corresponding CSV column at `<column_pos>` contains metric time. CSV line may contain either one or zero columns with time.
If CSV line has no time, then the current time is used. The time is applied to all the configured metrics.
The format of the time is configured via `<context>`. Supported time formats are:
* `unix_s` - unix timestamp in seconds.
* `unix_ms` - unix timestamp in milliseconds.
* `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
* `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
* `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).
Each request to `/api/v1/import/csv` may contain arbitrary number of CSV lines.
Example for importing CSV data via `/api/v1/import/csv`:
```bash
curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
```
After that the data may be read via [/api/v1/export](#how-to-export-time-series) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
```
The following response should be returned:
```bash
{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```
Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
### How to import data in Prometheus exposition format
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
```bash
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
```
The following command may be used for verifying the imported data:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
```
It should return something like the following:
```
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
```
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
It can be overriden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
### Prometheus querying API usage
VictoriaMetrics supports the following handlers from [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/):
@ -756,11 +676,35 @@ when new data is ingested into it.
### How to export time series
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
VictoriaMetrics provides the following handlers for exporting data:
* `/api/v1/export/native` for exporting data in native binary format. This is the most efficient format for data export.
See [these docs](#how-to-export-data-in-native-format) for details.
* `/api/v1/export` for exporing data in JSON line format. See [these docs](#how-to-export-data-in-json-line-format) for details.
#### How to export data in native format
Send a request to `http://<victoriametrics-addr>:8428/api/v1/export/native?match[]=<timeseries_selector_for_export>`,
where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export. Use `{__name__!=""}` selector for fetching all the time series.
Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
The exported data can be imported to VictoriaMetrics via [/api/v1/import/native](#how-to-import-data-in-native-format).
#### How to export data in JSON line format
Consider [exporting data in native format](#how-to-export-data-in-native-format) if big amounts of data must be migrated between VictoriaMetrics instances,
since exporting in native format usually consumes lower amounts of CPU and memory resources, while the resulting exported data occupies lower amounts of disk space.
In order to export data in JSON line format, send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
where `<timeseries_selector_for_export>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
for metrics to export. Use `{__name__!=""}` selector for fetching all the time series.
The response would contain all the data for the selected time series in [JSON streaming format](https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON).
Each JSON line would contain data for a single time series. An example output:
Each JSON line contains samples for a single time series. An example output:
```jsonl
{"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]}
@ -770,8 +714,9 @@ Each JSON line would contain data for a single time series. An example output:
Optional `start` and `end` args may be added to the request in order to limit the time frame for the exported data. These args may contain either
unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values.
Optional `max_rows_per_line` arg may be added to the request in order to limit the maximum number of rows exported per each JSON line.
By default each JSON line contains all the rows for a single time series.
Optional `max_rows_per_line` arg may be added to the request for limiting the maximum number of rows exported per each JSON line.
Optional `reduce_mem_usage=1` arg may be added to the request for reducing memory usage when exporting big number of time series.
In this case the output may contain multiple lines with distinct samples for the same time series.
Pass `Accept-Encoding: gzip` HTTP header in the request to `/api/v1/export` in order to reduce network bandwidth during exporing big amounts
of time series data. This enables gzip compression for the exported data. Example for exporting gzipped data:
@ -782,22 +727,58 @@ curl -H 'Accept-Encoding: gzip' http://localhost:8428/api/v1/export -d 'match[]=
The maximum duration for each request to `/api/v1/export` is limited by `-search.maxExportDuration` command-line flag.
Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-time-series-data).
Exported data can be imported via POST'ing it to [/api/v1/import](#how-to-import-data-in-json-line-format).
### How to import time series data
Time series data can be imported via any supported ingestion protocol:
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write)
* [Influx line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf)
* [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd)
* [OpenTSDB telnet put protocol](#sending-data-via-telnet-put-protocol)
* [OpenTSDB http /api/put](#sending-opentsdb-data-via-http-apiput-requests)
* `/api/v1/import` http POST handler, which accepts data from [/api/v1/export](#how-to-export-time-series).
* `/api/v1/import/csv` http POST handler, which accepts CSV data. See [these docs](#how-to-import-csv-data) for details.
* `/api/v1/import/prometheus` http POST handler, which accepts data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
* [Prometheus remote_write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
* Influx line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See[these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.
* `/api/v1/import` for importing data obtained from [/api/v1/export](#how-to-export-data-in-json-line-format).
See [these docs](##how-to-import-data-in-json-line-format) for details.
* `/api/v1/import/native` for importing data obtained from [/api/v1/export/native](#how-to-export-data-in-native-format).
See [these docs](#how-to-import-data-in-native-format) for details.
* `/api/v1/import/csv` for importing arbitrary CSV data. See [these docs](#how-to-import-csv-data) for details.
* `/api/v1/import/prometheus` for importing data in Prometheus exposition format. See [these docs](#how-to-import-data-in-prometheus-exposition-format) for details.
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import`. Example for importing data obtained via `/api/v1/export`:
#### How to import data in native format
The most efficient protocol for importing data into VictoriaMetrics is `/api/v1/import/native`.
Example for importing data obtained via [/api/v1/export/native](#how-to-export-data-in-native-format):
```bash
# Export the data from <source-victoriametrics>:
curl http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin
# Import the data to <destination-victoriametrics>:
curl -X POST http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin
```
Pass `Content-Encoding: gzip` HTTP request header to `/api/v1/import/native` for importing gzipped data:
```bash
# Export gzipped data from <source-victoriametrics>:
curl -H 'Accept-Encoding: gzip' http://source-victoriametrics:8428/api/v1/export/native -d 'match={__name__!=""}' > exported_data.bin.gz
# Import gzipped data to <destination-victoriametrics>:
curl -X POST -H 'Content-Encoding: gzip' http://destination-victoriametrics:8428/api/v1/import/native -T exported_data.bin.gz
```
Extra labels may be added to all the imported time series by passing `extra_label=name=value` query args.
For example, `/api/v1/import/native?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported time series.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
#### How to import data in JSON line format
Example for importing data obtained via [/api/v1/export](#how-to-export-data-in-json-line-format):
```bash
# Export the data from <source-victoriametrics>:
@ -823,6 +804,94 @@ For example, `/api/v1/import?extra_label=foo=bar` would add `"foo":"bar"` label
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
#### How to import CSV data
Arbitrary CSV data can be imported via `/api/v1/import/csv`. The CSV data is imported according to the provided `format` query arg.
The `format` query arg must contain comma-separated list of parsing rules for CSV fields. Each rule consists of three parts delimited by a colon:
```
<column_pos>:<type>:<context>
```
* `<column_pos>` is the position of the CSV column (field). Column numbering starts from 1. The order of parsing rules may be arbitrary.
* `<type>` describes the column type. Supported types are:
* `metric` - the corresponding CSV column at `<column_pos>` contains metric value, which must be integer or floating-point number.
The metric name is read from the `<context>`. CSV line must have at least a single metric field. Multiple metric fields per CSV line is OK.
* `label` - the corresponding CSV column at `<column_pos>` contains label value. The label name is read from the `<context>`.
CSV line may have arbitrary number of label fields. All these labels are attached to all the configured metrics.
* `time` - the corresponding CSV column at `<column_pos>` contains metric time. CSV line may contain either one or zero columns with time.
If CSV line has no time, then the current time is used. The time is applied to all the configured metrics.
The format of the time is configured via `<context>`. Supported time formats are:
* `unix_s` - unix timestamp in seconds.
* `unix_ms` - unix timestamp in milliseconds.
* `unix_ns` - unix timestamp in nanoseconds. Note that VictoriaMetrics rounds the timestamp to milliseconds.
* `rfc3339` - timestamp in [RFC3339](https://tools.ietf.org/html/rfc3339) format, i.e. `2006-01-02T15:04:05Z`.
* `custom:<layout>` - custom layout for the timestamp. The `<layout>` may contain arbitrary time layout according to [time.Parse rules in Go](https://golang.org/pkg/time/#Parse).
Each request to `/api/v1/import/csv` may contain arbitrary number of CSV lines.
Example for importing CSV data via `/api/v1/import/csv`:
```bash
curl -d "GOOG,1.23,4.56,NYSE" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
curl -d "MSFT,3.21,1.67,NASDAQ" 'http://localhost:8428/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'
```
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match[]={ticker!=""}'
```
The following response should be returned:
```bash
{"metric":{"__name__":"bid","market":"NASDAQ","ticker":"MSFT"},"values":[1.67],"timestamps":[1583865146520]}
{"metric":{"__name__":"bid","market":"NYSE","ticker":"GOOG"},"values":[4.56],"timestamps":[1583865146495]}
{"metric":{"__name__":"ask","market":"NASDAQ","ticker":"MSFT"},"values":[3.21],"timestamps":[1583865146520]}
{"metric":{"__name__":"ask","market":"NYSE","ticker":"GOOG"},"values":[1.23],"timestamps":[1583865146495]}
```
Extra labels may be added to all the imported lines by passing `extra_label=name=value` query args.
For example, `/api/v1/import/csv?extra_label=foo=bar` would add `"foo":"bar"` label to all the imported lines.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
#### How to import data in Prometheus exposition format
VictoriaMetrics accepts data in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#text-based-format)
via `/api/v1/import/prometheus` path. For example, the following line imports a single line in Prometheus exposition format into VictoriaMetrics:
```bash
curl -d 'foo{bar="baz"} 123' -X POST 'http://localhost:8428/api/v1/import/prometheus'
```
The following command may be used for verifying the imported data:
```bash
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo"}'
```
It should return something like the following:
```
{"metric":{"__name__":"foo","bar":"baz"},"values":[123],"timestamps":[1594370496905]}
```
Extra labels may be added to all the imported metrics by passing `extra_label=name=value` query args.
For example, `/api/v1/import/prometheus?extra_label=foo=bar` would add `{foo="bar"}` label to all the imported metrics.
If timestamp is missing in `<metric> <value> <timestamp>` Prometheus exposition format line, then the current timestamp is used during data ingestion.
It can be overriden by passing unix timestamp in *milliseconds* via `timestamp` query arg. For example, `/api/v1/import/prometheus?timestamp=1594370496905`.
VictoriaMetrics accepts arbitrary number of lines in a single request to `/api/v1/import/prometheus`, i.e. it supports data streaming.
Note that it could be required to flush response cache after importing historical data. See [these docs](#backfilling) for detail.
VictoriaMetrics also may scrape Prometheus targets - see [these docs](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
### Relabeling
VictoriaMetrics supports Prometheus-compatible relabeling for all the ingested metrics if `-relabelConfig` command-line flag points

View File

@ -25,7 +25,8 @@ to `vmagent` (like the ability to push metrics instead of pulling them). We did
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-send-data-from-opentsdb-compatible-agents).
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-time-series-data).
* JSON lines import protocol via `http://<vmagent>:8429/api/v1/import`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-json-line-format).
* Native data import protocol via `http://<vmagent>:8429/api/v1/import/native`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-native-format).
* Data in Prometheus exposition format. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-data-in-prometheus-exposition-format) for details.
* Arbitrary CSV data via `http://<vmagent>:8429/api/v1/import/csv`. See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/README.md#how-to-import-csv-data).
* Can replicate collected metrics simultaneously to multiple remote storage systems.

View File

@ -0,0 +1,197 @@
package native
import (
"bufio"
"fmt"
"io"
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks.
//
// The callback can be called multiple times for streamed data from req.
//
// callback shouldn't hold block after returning.
// callback can be called in parallel from multiple concurrent goroutines.
func ParseStream(req *http.Request, callback func(block *Block) error) error {
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped vmimport data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
// By default req.Body uses 4Kb buffer. This size is too small for typical request to /api/v1/import/native,
// so use slightly bigger buffer in order to reduce read syscall overhead.
br := bufio.NewReaderSize(r, 1024*1024)
// Read time range (tr)
trBuf := make([]byte, 16)
var tr storage.TimeRange
if _, err := io.ReadFull(br, trBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read time range: %w", err)
}
tr.MinTimestamp = encoding.UnmarshalInt64(trBuf)
tr.MaxTimestamp = encoding.UnmarshalInt64(trBuf[8:])
// Start GOMAXPROC workers in order to process ingested data in parallel.
gomaxprocs := runtime.GOMAXPROCS(-1)
workCh := make(chan *unmarshalWork, 8*gomaxprocs)
var wg sync.WaitGroup
defer func() {
close(workCh)
wg.Wait()
}()
wg.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {
go func() {
defer wg.Done()
var tmpBlock storage.Block
for uw := range workCh {
if err := uw.unmarshal(&tmpBlock, tr); err != nil {
parseErrors.Inc()
logger.Errorf("error when unmarshaling native block: %s", err)
putUnmarshalWork(uw)
continue
}
if err := callback(&uw.block); err != nil {
processErrors.Inc()
logger.Errorf("error when processing native block: %s", err)
putUnmarshalWork(uw)
continue
}
putUnmarshalWork(uw)
}
}()
}
// Read native blocks and feed workers with work.
sizeBuf := make([]byte, 4)
for {
uw := getUnmarshalWork()
// Read uw.metricNameBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil {
if err == io.EOF {
// End of stream
return nil
}
readErrors.Inc()
return fmt.Errorf("cannot read metricName size: %w", err)
}
readCalls.Inc()
bufSize := encoding.UnmarshalUint32(sizeBuf)
if bufSize > 1024*1024 {
parseErrors.Inc()
return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
}
uw.metricNameBuf = bytesutil.Resize(uw.metricNameBuf, int(bufSize))
if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read metricName with size %d bytes: %w", bufSize, err)
}
readCalls.Inc()
// Read uw.blockBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read native block size: %w", err)
}
readCalls.Inc()
bufSize = encoding.UnmarshalUint32(sizeBuf)
if bufSize > 1024*1024 {
parseErrors.Inc()
return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
}
uw.blockBuf = bytesutil.Resize(uw.blockBuf, int(bufSize))
if _, err := io.ReadFull(br, uw.blockBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read native block with size %d bytes: %w", bufSize, err)
}
readCalls.Inc()
blocksRead.Inc()
// Feed workers with work.
workCh <- uw
}
}
// Block is a single block from `/api/v1/import/native` request.
type Block struct {
MetricName storage.MetricName
Values []float64
Timestamps []int64
}
func (b *Block) reset() {
b.MetricName.Reset()
b.Values = b.Values[:0]
b.Timestamps = b.Timestamps[:0]
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="native"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="native"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="native"}`)
blocksRead = metrics.NewCounter(`vm_protoparser_blocks_read_total{type="native"}`)
parseErrors = metrics.NewCounter(`vm_protoparser_parse_errors_total{type="native"}`)
processErrors = metrics.NewCounter(`vm_protoparser_process_errors_total{type="native"}`)
)
type unmarshalWork struct {
tr storage.TimeRange
metricNameBuf []byte
blockBuf []byte
block Block
}
func (uw *unmarshalWork) reset() {
uw.metricNameBuf = uw.metricNameBuf[:0]
uw.blockBuf = uw.blockBuf[:0]
uw.block.reset()
}
func (uw *unmarshalWork) unmarshal(tmpBlock *storage.Block, tr storage.TimeRange) error {
block := &uw.block
if err := block.MetricName.Unmarshal(uw.metricNameBuf); err != nil {
return fmt.Errorf("cannot unmarshal metricName from %d bytes: %w", len(uw.metricNameBuf), err)
}
tail, err := tmpBlock.UnmarshalPortable(uw.blockBuf)
if err != nil {
return fmt.Errorf("cannot unmarshal native block from %d bytes: %w", len(uw.blockBuf), err)
}
if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling native block from %d bytes; len(tail)=%d bytes", len(uw.blockBuf), len(tail))
}
block.Timestamps, block.Values = tmpBlock.AppendRowsWithTimeRangeFilter(block.Timestamps[:0], block.Values[:0], tr)
rowsRead.Add(len(block.Timestamps))
return nil
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View File

@ -2,9 +2,11 @@ package storage
import (
"fmt"
"math"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@ -86,21 +88,6 @@ func (b *Block) RowsCount() int {
return int(b.bh.RowsCount)
}
// Values returns b values.
func (b *Block) Values() []int64 {
return b.values
}
// Timestamps returns b timestamps.
func (b *Block) Timestamps() []int64 {
return b.timestamps
}
// Scale returns the decimal scale used for encoding values in the block.
func (b *Block) Scale() int16 {
return b.bh.Scale
}
// Init initializes b with the given tsid, timestamps, values and scale.
func (b *Block) Init(tsid *TSID, timestamps, values []int64, scale int16, precisionBits uint8) {
b.Reset()
@ -289,3 +276,123 @@ func (b *Block) UnmarshalData() error {
return nil
}
// AppendRowsWithTimeRangeFilter filters samples from b according to tr and appends them to dst*.
//
// It is expected that UnmarshalData has been already called on b.
func (b *Block) AppendRowsWithTimeRangeFilter(dstTimestamps []int64, dstValues []float64, tr TimeRange) ([]int64, []float64) {
timestamps, values := b.filterTimestamps(tr)
dstTimestamps = append(dstTimestamps, timestamps...)
dstValues = decimal.AppendDecimalToFloat(dstValues, values, b.bh.Scale)
return dstTimestamps, dstValues
}
func (b *Block) filterTimestamps(tr TimeRange) ([]int64, []int64) {
timestamps := b.timestamps
// Skip timestamps smaller than tr.MinTimestamp.
i := 0
for i < len(timestamps) && timestamps[i] < tr.MinTimestamp {
i++
}
// Skip timestamps bigger than tr.MaxTimestamp.
j := len(timestamps)
for j > i && timestamps[j-1] > tr.MaxTimestamp {
j--
}
if i == j {
return nil, nil
}
return timestamps[i:j], b.values[i:j]
}
// MarshalPortable marshals b to dst, so it could be portably migrated to other VictoriaMetrics instance.
//
// The marshaled value must be unmarshaled with UnmarshalPortable function.
func (b *Block) MarshalPortable(dst []byte) []byte {
b.MarshalData(0, 0)
dst = encoding.MarshalVarInt64(dst, b.bh.MinTimestamp)
dst = encoding.MarshalVarInt64(dst, b.bh.FirstValue)
dst = encoding.MarshalVarUint64(dst, uint64(b.bh.RowsCount))
dst = encoding.MarshalVarInt64(dst, int64(b.bh.Scale))
dst = append(dst, byte(b.bh.TimestampsMarshalType))
dst = append(dst, byte(b.bh.ValuesMarshalType))
dst = encoding.MarshalBytes(dst, b.timestampsData)
dst = encoding.MarshalBytes(dst, b.valuesData)
return dst
}
// UnmarshalPortable unmarshals block from src to b and returns the remaining tail.
//
// It is assumed that the block has been marshaled with MarshalPortable.
func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
b.Reset()
// Read header
src, firstTimestamp, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
}
b.bh.MinTimestamp = firstTimestamp
src, firstValue, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
}
b.bh.FirstValue = firstValue
src, rowsCount, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
}
if rowsCount > math.MaxUint32 {
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, math.MaxUint32)
}
b.bh.RowsCount = uint32(rowsCount)
src, scale, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal scale: %w", err)
}
if scale < math.MinInt16 {
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
}
if scale > math.MaxInt16 {
return src, fmt.Errorf("got too big scale=%d; it mustn't exceeed %d", scale, math.MaxInt16)
}
b.bh.Scale = int16(scale)
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
}
b.bh.TimestampsMarshalType = encoding.MarshalType(src[0])
src = src[1:]
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
}
b.bh.ValuesMarshalType = encoding.MarshalType(src[0])
src = src[1:]
b.bh.PrecisionBits = 64
// Read data
src, timestampsData, err := encoding.UnmarshalBytes(src)
if err != nil {
return src, fmt.Errorf("cannot read timestampsData: %w", err)
}
b.timestampsData = append(b.timestampsData[:0], timestampsData...)
src, valuesData, err := encoding.UnmarshalBytes(src)
if err != nil {
return src, fmt.Errorf("cannot read valuesData: %w", err)
}
b.valuesData = append(b.valuesData[:0], valuesData...)
// Validate
if err := b.bh.validate(); err != nil {
return src, fmt.Errorf("invalid blockHeader: %w", err)
}
if err := b.UnmarshalData(); err != nil {
return src, fmt.Errorf("invalid data: %w", err)
}
return src, nil
}

116
lib/storage/block_test.go Normal file
View File

@ -0,0 +1,116 @@
package storage
import (
"math/rand"
"reflect"
"strings"
"testing"
)
func TestBlockMarshalUnmarshalPortable(t *testing.T) {
var b Block
for i := 0; i < 1000; i++ {
b.Reset()
rowsCount := rand.Intn(maxRowsPerBlock) + 1
b.timestamps = getRandTimestamps(rowsCount)
b.values = getRandValues(rowsCount)
b.bh.Scale = int16(rand.Intn(30) - 15)
b.bh.PrecisionBits = 64
testBlockMarshalUnmarshalPortable(t, &b)
}
}
func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
var b1, b2 Block
b1.CopyFrom(b)
rowsCount := len(b.values)
data := b1.MarshalPortable(nil)
if b1.bh.RowsCount != uint32(rowsCount) {
t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount)
}
tail, err := b2.UnmarshalPortable(data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if len(tail) > 0 {
t.Fatalf("unexpected non-empty tail: %X", tail)
}
compareBlocksPortable(t, &b2, b, &b1.bh)
// Verify non-empty prefix and suffix
prefix := "prefix"
suffix := "suffix"
data = append(data[:0], prefix...)
data = b1.MarshalPortable(data)
if b1.bh.RowsCount != uint32(rowsCount) {
t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount)
}
if !strings.HasPrefix(string(data), prefix) {
t.Fatalf("unexpected prefix in %X; want %X", data, prefix)
}
data = data[len(prefix):]
data = append(data, suffix...)
tail, err = b2.UnmarshalPortable(data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if string(tail) != suffix {
t.Fatalf("unexpected tail; got %X; want %X", tail, suffix)
}
compareBlocksPortable(t, &b2, b, &b1.bh)
}
func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader) {
t.Helper()
if b1.bh.MinTimestamp != bhExpected.MinTimestamp {
t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MinTimestamp, bhExpected.MinTimestamp)
}
if b1.bh.FirstValue != bhExpected.FirstValue {
t.Fatalf("unexpected FirstValue; got %d; want %d", b1.bh.FirstValue, bhExpected.FirstValue)
}
if b1.bh.RowsCount != bhExpected.RowsCount {
t.Fatalf("unexpected RowsCount; got %d; want %d", b1.bh.RowsCount, bhExpected.RowsCount)
}
if b1.bh.Scale != bhExpected.Scale {
t.Fatalf("unexpected Scale; got %d; want %d", b1.bh.Scale, bhExpected.Scale)
}
if b1.bh.TimestampsMarshalType != bhExpected.TimestampsMarshalType {
t.Fatalf("unexpected TimestampsMarshalType; got %d; want %d", b1.bh.TimestampsMarshalType, bhExpected.TimestampsMarshalType)
}
if b1.bh.ValuesMarshalType != bhExpected.ValuesMarshalType {
t.Fatalf("unexpected ValuesMarshalType; got %d; want %d", b1.bh.ValuesMarshalType, bhExpected.ValuesMarshalType)
}
if b1.bh.PrecisionBits != bhExpected.PrecisionBits {
t.Fatalf("unexpected PrecisionBits; got %d; want %d", b1.bh.PrecisionBits, bhExpected.PrecisionBits)
}
if !reflect.DeepEqual(b1.values, b2.values) {
t.Fatalf("unexpected values; got %d; want %d", b1.values, b2.values)
}
if !reflect.DeepEqual(b1.timestamps, b2.timestamps) {
t.Fatalf("unexpected timestamps; got %d; want %d", b1.timestamps, b2.timestamps)
}
if len(b1.values) != int(bhExpected.RowsCount) {
t.Fatalf("unexpected number of values; got %d; want %d", len(b1.values), bhExpected.RowsCount)
}
if len(b1.timestamps) != int(bhExpected.RowsCount) {
t.Fatalf("unexpected number of timestamps; got %d; want %d", len(b1.timestamps), bhExpected.RowsCount)
}
}
func getRandValues(rowsCount int) []int64 {
a := make([]int64, rowsCount)
for i := 0; i < rowsCount; i++ {
a[i] = int64(rand.Intn(1e5) - 0.5e5)
}
return a
}
func getRandTimestamps(rowsCount int) []int64 {
a := make([]int64, rowsCount)
ts := int64(rand.Intn(1e12))
for i := 0; i < rowsCount; i++ {
a[i] = ts
ts += int64(rand.Intn(1e5))
}
return a
}