VictoriaMetrics/app/vminsert/promremotewrite/request_handler.go

66 lines
1.8 KiB
Go

package promremotewrite
import (
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="promremotewrite"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="promremotewrite"}`)
)
// InsertHandler processes remote write for prometheus.
func InsertHandler(req *http.Request) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, insertRows)
})
}
func insertRows(timeseries []prompb.TimeSeries) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
rowsLen := 0
for i := range timeseries {
rowsLen += len(timeseries[i].Samples)
}
ctx.Reset(rowsLen)
rowsTotal := 0
hasRelabeling := relabel.HasRelabeling()
for i := range timeseries {
ts := &timeseries[i]
ctx.Labels = ctx.Labels[:0]
srcLabels := ts.Labels
for _, srcLabel := range srcLabels {
ctx.AddLabelBytes(srcLabel.Name, srcLabel.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
var metricNameRaw []byte
var err error
for i := range ts.Samples {
r := &ts.Samples[i]
metricNameRaw, err = ctx.WriteDataPointExt(metricNameRaw, ctx.Labels, r.Timestamp, r.Value)
if err != nil {
return err
}
}
rowsTotal += len(ts.Samples)
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ctx.FlushBufs()
}