VictoriaMetrics/app/vminsert/opentsdbhttp/request_handler.go
Aliaksandr Valialkin 512addc608 app/{vminsert,vmagent}: add -sortLabels command-line option for sorting time series labels before ingesting them in the storage
This option can be useful when samples for the same time series are ingested with distinct order of labels.
For example, metric{k1="v1",k2="v2"} and metric{k2="v2",k1="v1"}.
2021-03-31 23:27:21 +03:00

92 lines
2.9 KiB
Go

package opentsdbhttp
import (
"fmt"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentsdbhttp"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="opentsdbhttp"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="opentsdbhttp"}`)
)
// InsertHandler processes HTTP OpenTSDB put requests.
// See http://opentsdb.net/docs/build/html/api_http/put.html
func InsertHandler(req *http.Request) error {
path := req.URL.Path
p, err := httpserver.ParsePath(path)
if err != nil {
return fmt.Errorf("cannot parse path %q: %w", path, err)
}
if p.Prefix != "insert" {
// This is not our link.
return fmt.Errorf("unexpected path requested on HTTP OpenTSDB server: %q", path)
}
at, err := auth.NewToken(p.AuthToken)
if err != nil {
return fmt.Errorf("auth error: %w", err)
}
switch p.Suffix {
case "api/put", "opentsdb/api/put":
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
})
default:
return fmt.Errorf("unexpected path requested on HTTP OpenTSDB server: %q", path)
}
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := netstorage.GetInsertCtx()
defer netstorage.PutInsertCtx(ctx)
ctx.Reset() // This line is required for initializing ctx internals.
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", r.Metric)
for j := range r.Tags {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
for j := range extraLabels {
label := &extraLabels[j]
ctx.AddLabel(label.Name, label.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}
}
rowsInserted.Add(len(rows))
rowsTenantInserted.Get(at).Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return ctx.FlushBufs()
}