mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
app/vminsert: allows parsing tenant id from labels (#3009)
* app/vminsert: allows parsing tenant id from labels it should help mitigate issues with vmagent's multiTenant mode, which works incorrectly at heavy load and it cannot handle more then 100 different tenants. This functional hidden with flag and do not change vminsert default behaviour https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970 * Update docs/Cluster-VictoriaMetrics.md Co-authored-by: Roman Khavronenko <roman@victoriametrics.com> * wip * app/vminsert/netstorage: clean remaining labels in order to free up GC * docs/Cluster-VictoriaMetrics.md: typo fix * wip * wip Co-authored-by: Roman Khavronenko <roman@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
39ba55dbb3
commit
505d359b39
35
README.md
35
README.md
@ -39,7 +39,9 @@ It increases cluster availability, and simplifies cluster maintenance as well as
|
||||
|
||||
VictoriaMetrics cluster supports multiple isolated tenants (aka namespaces).
|
||||
Tenants are identified by `accountID` or `accountID:projectID`, which are put inside request urls.
|
||||
See [these docs](#url-format) for details. Some facts about tenants in VictoriaMetrics:
|
||||
See [these docs](#url-format) for details.
|
||||
|
||||
Some facts about tenants in VictoriaMetrics:
|
||||
|
||||
- Each `accountID` and `projectID` is identified by an arbitrary 32-bit integer in the range `[0 .. 2^32)`.
|
||||
If `projectID` is missing, then it is automatically assigned to `0`. It is expected that other information about tenants
|
||||
@ -55,6 +57,33 @@ when different tenants have different amounts of data and different query load.
|
||||
|
||||
- VictoriaMetrics doesn't support querying multiple tenants in a single request.
|
||||
|
||||
See also [multitenancy via labels](#multitenancy-via-labels).
|
||||
|
||||
|
||||
## Multitenancy via labels
|
||||
|
||||
`vminsert` can accept data from multiple [tenants](#multitenancy) via a special `multitenant` endpoints `http://vminsert:8480/insert/multitenant/<suffix>`,
|
||||
where `<suffix>` can be replaced with any supported suffix for data ingestion from [this list](#url-format).
|
||||
In this case the account id and project id are obtained from optional `vm_account_id` and `vm_project_id` labels of the incoming samples.
|
||||
If `vm_account_id` or `vm_project_id` labels are missing or invalid, then the corresponding `accountID` or `projectID` is set to 0.
|
||||
These labels are automatically removed from samples before forwarding them to `vmstorage`.
|
||||
For example, if the following samples are written into `http://vminsert:8480/insert/multitenant/prometheus/api/v1/write`:
|
||||
|
||||
```
|
||||
http_requests_total{path="/foo",vm_account_id="42"} 12
|
||||
http_requests_total{path="/bar",vm_account_id="7",vm_project_id="9"} 34
|
||||
```
|
||||
|
||||
Then the `http_requests_total{path="/foo"} 12` would be stored in the tenant `accountID=42, projectID=0`,
|
||||
while the `http_requests_total{path="/bar"} 34` would be stored in the tenant `accountID=7, projectID=9`.
|
||||
|
||||
The `vm_account_id` and `vm_project_id` labels are extracted after applying the [relabeling](https://docs.victoriametrics.com/relabeling.html)
|
||||
set via `-relabelConfig` command-line flag, so these labels can be set at this stage.
|
||||
|
||||
**Security considerations:** it is recommended restricting access to `multitenant` endpoints only to trusted sources,
|
||||
since untrusted source may break per-tenant data by writing unwanted samples to aribtrary tenants.
|
||||
|
||||
|
||||
## Binaries
|
||||
|
||||
Compiled binaries for the cluster version are available in the `assets` section of the [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
@ -214,7 +243,9 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html
|
||||
|
||||
- URLs for data ingestion: `http://<vminsert>:8480/insert/<accountID>/<suffix>`, where:
|
||||
- `<accountID>` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`,
|
||||
where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`.
|
||||
where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. See [multitenancy docs](#multitenancy) for more details.
|
||||
The `<accountID>` can be set to `multitenant` string, e.g. `http://<vminsert>:8480/insert/multitenant/<suffix>`. Such urls accept data from multiple tenants
|
||||
specified via `vm_account_id` and `vm_project_id` labels. See [multitenancy via labels](#multitenancy-via-labels) for more details.
|
||||
- `<suffix>` may have the following values:
|
||||
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
|
||||
- `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
|
||||
|
@ -142,6 +142,10 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx
|
||||
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`.
|
||||
|
||||
[VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) supports writing data to multiple tenants
|
||||
specified via special labels - see [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels).
|
||||
This allows specifying tenant ids via [relabeling](#relabeling) and writing multitenant data to a single `-remoteWrite.url=http://<vminsert-addr>/insert/multitenant/api/v1/write`.
|
||||
|
||||
[Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
|
||||
If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set)
|
||||
|
@ -38,6 +38,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
@ -59,12 +60,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
perTenantRows[*atLocal]++
|
||||
}
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars
|
||||
|
||||
ctx.Reset()
|
||||
rowsTotal := 0
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range series {
|
||||
ss := &series[i]
|
||||
@ -74,18 +75,20 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
|
||||
for _, pt := range ss.Points {
|
||||
timestamp := pt.Timestamp()
|
||||
value := pt.Value()
|
||||
if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
|
||||
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
perTenantRows[*atLocal] += len(ss.Points)
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -35,7 +35,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
atCopy := *at
|
||||
var atCopy auth.Token
|
||||
if at != nil {
|
||||
atCopy = *at
|
||||
}
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
@ -63,13 +67,14 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
atLocal := ctx.GetLocalAuthToken(&atCopy)
|
||||
if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
perTenantRows[*atLocal]++
|
||||
}
|
||||
// Assume that all the rows for a single connection belong to the same (AccountID, ProjectID).
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsTenantInserted.Get(&atCopy).Add(len(rows))
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -73,7 +73,11 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
|
||||
ic := &ctx.Common
|
||||
ic.Reset() // This line is required for initializing ic internals.
|
||||
rowsTotal := 0
|
||||
atCopy := *at
|
||||
var atCopy auth.Token
|
||||
if at != nil {
|
||||
atCopy = *at
|
||||
}
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
@ -115,8 +119,6 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
|
||||
metricGroupPrefixLen := len(ctx.metricGroupBuf)
|
||||
if hasRelabeling {
|
||||
ctx.originLabels = append(ctx.originLabels[:0], ic.Labels...)
|
||||
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, nil)
|
||||
metricNameBufLen := len(ic.MetricNameBuf)
|
||||
for j := range r.Fields {
|
||||
f := &r.Fields[j]
|
||||
if !skipFieldKey {
|
||||
@ -130,19 +132,22 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
|
||||
// Skip metric without labels.
|
||||
continue
|
||||
}
|
||||
ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen]
|
||||
ic.SortLabelsIfNeeded()
|
||||
atLocal := ic.GetLocalAuthToken(&atCopy)
|
||||
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, nil)
|
||||
for i := range ic.Labels {
|
||||
ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[i])
|
||||
}
|
||||
storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels)
|
||||
if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
|
||||
storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels)
|
||||
if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
perTenantRows[*atLocal]++
|
||||
}
|
||||
} else {
|
||||
ic.SortLabelsIfNeeded()
|
||||
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels)
|
||||
atLocal := ic.GetLocalAuthToken(&atCopy)
|
||||
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ic.Labels)
|
||||
metricNameBufLen := len(ic.MetricNameBuf)
|
||||
labelsLen := len(ic.Labels)
|
||||
for j := range r.Fields {
|
||||
@ -159,15 +164,16 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
|
||||
}
|
||||
ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen]
|
||||
ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[len(ic.Labels)-1])
|
||||
storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels)
|
||||
if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
|
||||
storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels)
|
||||
if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
perTenantRows[*atLocal]++
|
||||
}
|
||||
}
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsTenantInserted.Get(&atCopy).Add(rowsTotal)
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return ic.FlushBufs()
|
||||
}
|
||||
|
@ -108,20 +108,17 @@ func main() {
|
||||
}
|
||||
if len(*graphiteListenAddr) > 0 {
|
||||
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error {
|
||||
var at auth.Token // TODO: properly initialize auth token
|
||||
return graphite.InsertHandler(&at, r)
|
||||
return graphite.InsertHandler(nil, r)
|
||||
})
|
||||
}
|
||||
if len(*influxListenAddr) > 0 {
|
||||
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
|
||||
var at auth.Token // TODO: properly initialize auth token
|
||||
return influx.InsertHandlerForReader(&at, r)
|
||||
return influx.InsertHandlerForReader(nil, r)
|
||||
})
|
||||
}
|
||||
if len(*opentsdbListenAddr) > 0 {
|
||||
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error {
|
||||
var at auth.Token // TODO: properly initialize auth token
|
||||
return opentsdb.InsertHandler(&at, r)
|
||||
return opentsdb.InsertHandler(nil, r)
|
||||
}, opentsdbhttp.InsertHandler)
|
||||
}
|
||||
if len(*opentsdbHTTPListenAddr) > 0 {
|
||||
|
@ -44,7 +44,9 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal
|
||||
// since relabeling can prevent from inserting the rows.
|
||||
rowsLen := len(block.Values)
|
||||
rowsInserted.Add(rowsLen)
|
||||
rowsTenantInserted.Get(at).Add(rowsLen)
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsLen)
|
||||
}
|
||||
rowsPerInsert.Update(float64(rowsLen))
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
@ -68,8 +70,9 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal
|
||||
return nil
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
|
||||
values := block.Values
|
||||
timestamps := block.Timestamps
|
||||
if len(timestamps) != len(values) {
|
||||
@ -77,9 +80,15 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal
|
||||
}
|
||||
for j, value := range values {
|
||||
timestamp := timestamps[j]
|
||||
if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
|
||||
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return ctx.FlushBufs()
|
||||
if err := ctx.FlushBufs(); err != nil {
|
||||
return err
|
||||
}
|
||||
if at == nil {
|
||||
rowsTenantInserted.Get(atLocal).Add(rowsLen)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package netstorage
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
@ -25,6 +26,8 @@ type InsertCtx struct {
|
||||
labelsBuf []byte
|
||||
|
||||
relabelCtx relabel.Ctx
|
||||
|
||||
at auth.Token
|
||||
}
|
||||
|
||||
type bufRows struct {
|
||||
@ -68,6 +71,7 @@ func (ctx *InsertCtx) Reset() {
|
||||
}
|
||||
ctx.labelsBuf = ctx.labelsBuf[:0]
|
||||
ctx.relabelCtx.Reset()
|
||||
ctx.at.Set(0, 0)
|
||||
}
|
||||
|
||||
// AddLabelBytes adds (name, value) label to ctx.Labels.
|
||||
@ -115,11 +119,11 @@ func (ctx *InsertCtx) ApplyRelabeling() {
|
||||
func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error {
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(at, labels)
|
||||
return ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
|
||||
return ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value)
|
||||
}
|
||||
|
||||
// WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
|
||||
func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
|
||||
func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
|
||||
br := &ctx.bufRowss[storageNodeIdx]
|
||||
sn := storageNodes[storageNodeIdx]
|
||||
bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
|
||||
@ -181,3 +185,46 @@ func marshalBytesFast(dst []byte, s []byte) []byte {
|
||||
dst = append(dst, s...)
|
||||
return dst
|
||||
}
|
||||
|
||||
// GetLocalAuthToken obtains auth.Token from context labels vm_account_id and vm_project_id if at is nil.
|
||||
//
|
||||
// At is returned as is if it isn't nil.
|
||||
//
|
||||
// The vm_account_id and vm_project_id labels are automatically removed from the ctx.
|
||||
func (ctx *InsertCtx) GetLocalAuthToken(at *auth.Token) *auth.Token {
|
||||
if at != nil {
|
||||
return at
|
||||
}
|
||||
accountID := uint32(0)
|
||||
projectID := uint32(0)
|
||||
tmpLabels := ctx.Labels[:0]
|
||||
for _, label := range ctx.Labels {
|
||||
if string(label.Name) == "vm_account_id" {
|
||||
accountID = parseUint32(label.Value)
|
||||
continue
|
||||
}
|
||||
if string(label.Name) == "vm_project_id" {
|
||||
projectID = parseUint32(label.Value)
|
||||
continue
|
||||
}
|
||||
tmpLabels = append(tmpLabels, label)
|
||||
}
|
||||
cleanLabels := ctx.Labels[len(tmpLabels):]
|
||||
for i := range cleanLabels {
|
||||
label := &cleanLabels[i]
|
||||
label.Name = nil
|
||||
label.Value = nil
|
||||
}
|
||||
ctx.Labels = tmpLabels
|
||||
ctx.at.Set(accountID, projectID)
|
||||
return &ctx.at
|
||||
}
|
||||
|
||||
func parseUint32(b []byte) uint32 {
|
||||
s := bytesutil.ToUnsafeString(b)
|
||||
n, err := strconv.ParseUint(s, 10, 32)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return uint32(n)
|
||||
}
|
||||
|
@ -35,7 +35,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
atCopy := *at
|
||||
var atCopy auth.Token
|
||||
if at != nil {
|
||||
atCopy = *at
|
||||
}
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
@ -63,13 +67,14 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
atLocal := ctx.GetLocalAuthToken(&atCopy)
|
||||
if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
perTenantRows[*atLocal]++
|
||||
}
|
||||
// Assume that all the rows for a single connection belong to the same (AccountID, ProjectID).
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsTenantInserted.Get(&atCopy).Add(len(rows))
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -57,8 +57,8 @@ func InsertHandler(req *http.Request) error {
|
||||
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := netstorage.GetInsertCtx()
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
@ -80,12 +80,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
perTenantRows[*atLocal]++
|
||||
}
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -43,6 +43,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
defer netstorage.PutInsertCtx(ctx)
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
@ -64,12 +65,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
perTenantRows[*atLocal]++
|
||||
}
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
rowsTotal := 0
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range timeseries {
|
||||
ts := ×eries[i]
|
||||
@ -62,21 +63,23 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
|
||||
ctx.MetricNameBuf = ctx.MetricNameBuf[:0]
|
||||
samples := ts.Samples
|
||||
for i := range samples {
|
||||
r := &samples[i]
|
||||
if len(ctx.MetricNameBuf) == 0 {
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
|
||||
}
|
||||
if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil {
|
||||
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
perTenantRows[*atLocal] += len(ts.Samples)
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -44,6 +44,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
|
||||
ctx.Reset() // This line is required for initializing ctx internals.
|
||||
rowsTotal := 0
|
||||
perTenantRows := make(map[auth.Token]int)
|
||||
hasRelabeling := relabel.HasRelabeling()
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
@ -65,8 +66,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
continue
|
||||
}
|
||||
ctx.SortLabelsIfNeeded()
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels)
|
||||
atLocal := ctx.GetLocalAuthToken(at)
|
||||
ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
|
||||
storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
|
||||
values := r.Values
|
||||
timestamps := r.Timestamps
|
||||
if len(timestamps) != len(values) {
|
||||
@ -74,13 +76,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
||||
}
|
||||
for j, value := range values {
|
||||
timestamp := timestamps[j]
|
||||
if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
|
||||
if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
perTenantRows[*atLocal] += len(r.Values)
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
rowsTenantInserted.MultiAdd(perTenantRows)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return ctx.FlushBufs()
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||
|
||||
**Update note 2:** [vmalert](https://docs.victoriametrics.com/vmalert.html) changes default value for command-line flag `-datasource.queryStep` from `0s` to `5m`. The change supposed to improve reliability of the rules evaluation when evaluation interval is lower than scraping interval.
|
||||
|
||||
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970).
|
||||
* FEATURE: improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x if non-trivial `regex` values are used.
|
||||
* FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105).
|
||||
* FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113).
|
||||
|
@ -43,7 +43,9 @@ It increases cluster availability, and simplifies cluster maintenance as well as
|
||||
|
||||
VictoriaMetrics cluster supports multiple isolated tenants (aka namespaces).
|
||||
Tenants are identified by `accountID` or `accountID:projectID`, which are put inside request urls.
|
||||
See [these docs](#url-format) for details. Some facts about tenants in VictoriaMetrics:
|
||||
See [these docs](#url-format) for details.
|
||||
|
||||
Some facts about tenants in VictoriaMetrics:
|
||||
|
||||
- Each `accountID` and `projectID` is identified by an arbitrary 32-bit integer in the range `[0 .. 2^32)`.
|
||||
If `projectID` is missing, then it is automatically assigned to `0`. It is expected that other information about tenants
|
||||
@ -59,6 +61,30 @@ when different tenants have different amounts of data and different query load.
|
||||
|
||||
- VictoriaMetrics doesn't support querying multiple tenants in a single request.
|
||||
|
||||
See also [multitenancy via labels](#multitenancy-via-labels).
|
||||
|
||||
|
||||
## Multitenancy via labels
|
||||
|
||||
`vminsert` can accept data from multiple [tenants](#multitenancy) via a special `multitenant` endpoints `http://vminsert:8480/insert/multitenant/<suffix>`,
|
||||
where `<suffix>` can be replaced with any supported suffix for data ingestion from [this list](#url-format).
|
||||
In this case the account id and project id are obtained from optional `vm_account_id` and `vm_project_id` labels of the incoming samples.
|
||||
If `vm_account_id` or `vm_project_id` labels are missing or invalid, then the corresponding `accountID` or `projectID` is set to 0.
|
||||
These labels are automatically removed from samples before forwarding them to `vmstorage`.
|
||||
For example, if the following samples are written into `http://vminsert:8480/insert/multitenant/prometheus/api/v1/write`:
|
||||
|
||||
```
|
||||
http_requests_total{path="/foo",vm_account_id="42"} 12
|
||||
http_requests_total{path="/bar",vm_account_id="7",vm_project_id="9"} 34
|
||||
```
|
||||
|
||||
Then the `http_requests_total{path="/foo"} 12` would be stored in the tenant `accountID=42, projectID=0`,
|
||||
while the `http_requests_total{path="/bar"} 34` would be stored in the tenant `accountID=7, projectID=9`.
|
||||
|
||||
The `vm_account_id` and `vm_project_id` labels are extracted after applying the [relabeling](https://docs.victoriametrics.com/relabeling.html)
|
||||
set via `-relabelConfig` command-line flag, so these labels can be set at this stage.
|
||||
|
||||
|
||||
## Binaries
|
||||
|
||||
Compiled binaries for the cluster version are available in the `assets` section of the [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
@ -218,7 +244,9 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html
|
||||
|
||||
- URLs for data ingestion: `http://<vminsert>:8480/insert/<accountID>/<suffix>`, where:
|
||||
- `<accountID>` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`,
|
||||
where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`.
|
||||
where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. See [multitenancy docs](#multitenancy) for more details.
|
||||
The `<accountID>` can be set to `multitenant` string, e.g. `http://<vminsert>:8480/insert/multitenant/<suffix>`. Such urls accept data from multiple tenants
|
||||
specified via `vm_account_id` and `vm_project_id` labels. See [multitenancy via labels](#multitenancy-via-labels) for more details.
|
||||
- `<suffix>` may have the following values:
|
||||
- `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
|
||||
- `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details.
|
||||
|
@ -146,6 +146,10 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx
|
||||
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`.
|
||||
|
||||
[VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) supports writing data to multiple tenants
|
||||
specified via special labels - see [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels).
|
||||
This allows specifying tenant ids via [relabeling](#relabeling) and writing multitenant data to a single `-remoteWrite.url=http://<vminsert-addr>/insert/multitenant/api/v1/write`.
|
||||
|
||||
[Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
|
||||
If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set)
|
||||
|
@ -14,30 +14,54 @@ type Token struct {
|
||||
|
||||
// String returns string representation of t.
|
||||
func (t *Token) String() string {
|
||||
if t == nil {
|
||||
return "multitenant"
|
||||
}
|
||||
if t.ProjectID == 0 {
|
||||
return fmt.Sprintf("%d", t.AccountID)
|
||||
}
|
||||
return fmt.Sprintf("%d:%d", t.AccountID, t.ProjectID)
|
||||
}
|
||||
|
||||
// NewToken returns new Token for the given authToken
|
||||
// NewToken returns new Token for the given authToken.
|
||||
//
|
||||
// If authToken == "multitenant", then nil Token is returned.
|
||||
func NewToken(authToken string) (*Token, error) {
|
||||
if authToken == "multitenant" {
|
||||
return nil, nil
|
||||
}
|
||||
var t Token
|
||||
if err := t.Init(authToken); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &t, nil
|
||||
}
|
||||
|
||||
// Init initializes t from authToken.
|
||||
func (t *Token) Init(authToken string) error {
|
||||
tmp := strings.Split(authToken, ":")
|
||||
if len(tmp) > 2 {
|
||||
return nil, fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp))
|
||||
return fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp))
|
||||
}
|
||||
var at Token
|
||||
accountID, err := strconv.ParseUint(tmp[0], 10, 32)
|
||||
n, err := strconv.ParseUint(tmp[0], 10, 32)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err)
|
||||
return fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err)
|
||||
}
|
||||
at.AccountID = uint32(accountID)
|
||||
accountID := uint32(n)
|
||||
projectID := uint32(0)
|
||||
if len(tmp) > 1 {
|
||||
projectID, err := strconv.ParseUint(tmp[1], 10, 32)
|
||||
n, err := strconv.ParseUint(tmp[1], 10, 32)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err)
|
||||
return fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err)
|
||||
}
|
||||
at.ProjectID = uint32(projectID)
|
||||
projectID = uint32(n)
|
||||
}
|
||||
return &at, nil
|
||||
t.Set(accountID, projectID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set sets accountID and projectID for the t.
|
||||
func (t *Token) Set(accountID, projectID uint32) {
|
||||
t.AccountID = accountID
|
||||
t.ProjectID = projectID
|
||||
}
|
||||
|
@ -26,6 +26,8 @@ func TestNewTokenSuccess(t *testing.T) {
|
||||
f("1:4294967295", "1:4294967295")
|
||||
// max uint32 accountID and projectID
|
||||
f("4294967295:4294967295", "4294967295:4294967295")
|
||||
// multitenant
|
||||
f("multitenant", "multitenant")
|
||||
}
|
||||
|
||||
func TestNewTokenFailure(t *testing.T) {
|
||||
|
@ -22,6 +22,8 @@ func ParsePath(path string) (*Path, error) {
|
||||
//
|
||||
// - prefix must contain `select`, `insert` or `delete`.
|
||||
// - authToken contains `accountID[:projectID]`, where projectID is optional.
|
||||
// authToken may also contain `multitenant` string. In this case the accountID and projectID
|
||||
// are obtained from vm_account_id and vm_project_id labels of the ingested samples.
|
||||
// - suffix contains arbitrary suffix.
|
||||
//
|
||||
// prefix must be used for the routing to the appropriate service
|
||||
@ -29,14 +31,14 @@ func ParsePath(path string) (*Path, error) {
|
||||
s := skipPrefixSlashes(path)
|
||||
n := strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("cannot find {prefix}")
|
||||
return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
|
||||
}
|
||||
prefix := s[:n]
|
||||
|
||||
s = skipPrefixSlashes(s[n+1:])
|
||||
n = strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("cannot find {authToken}")
|
||||
return nil, fmt.Errorf("cannot find {authToken} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
|
||||
}
|
||||
authToken := s[:n]
|
||||
|
||||
|
@ -39,6 +39,13 @@ func (cm *CounterMap) Get(at *auth.Token) *metrics.Counter {
|
||||
return cm.GetByTenant(key)
|
||||
}
|
||||
|
||||
// MultiAdd adds multiple values grouped by auth.Token
|
||||
func (cm *CounterMap) MultiAdd(perTenantValues map[auth.Token]int) {
|
||||
for token, value := range perTenantValues {
|
||||
cm.Get(&token).Add(value)
|
||||
}
|
||||
}
|
||||
|
||||
// GetByTenant returns counter for the given key.
|
||||
func (cm *CounterMap) GetByTenant(key TenantID) *metrics.Counter {
|
||||
m := cm.m.Load().(map[TenantID]*metrics.Counter)
|
||||
|
Loading…
Reference in New Issue
Block a user