VictoriaMetrics/app/vminsert/prometheus/request_handler.go

157 lines
4.3 KiB
Go
Raw Normal View History

2019-05-22 23:16:55 +02:00
package prometheus
import (
"flag"
2019-05-22 23:16:55 +02:00
"fmt"
"io"
2019-05-22 23:16:55 +02:00
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy"
2019-05-22 23:16:55 +02:00
)
var (
	// maxInsertRequestSize is the -maxInsertRequestSize command-line flag.
	// It defaults to 32 MiB.
	maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")

	// rowsInserted is the counter map behind vm_rows_inserted_total for
	// type="prometheus"; the concrete counter is looked up by auth token.
	rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="prometheus"}`)

	// rowsPerInsert is a summary of the number of rows in each insert request.
	rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="prometheus"}`)
)
2019-05-22 23:16:55 +02:00
// InsertHandler processes remote write for prometheus.
func InsertHandler(at *auth.Token, r *http.Request) error {
2019-05-22 23:16:55 +02:00
return concurrencylimiter.Do(func() error {
return insertHandlerInternal(at, r)
2019-05-22 23:16:55 +02:00
})
}
func insertHandlerInternal(at *auth.Token, r *http.Request) error {
2019-05-22 23:16:55 +02:00
ctx := getPushCtx()
defer putPushCtx(ctx)
if err := ctx.Read(r); err != nil {
2019-05-22 23:16:55 +02:00
return err
}
2019-05-22 23:23:23 +02:00
2019-05-22 23:16:55 +02:00
ic := &ctx.Common
2019-05-22 23:23:23 +02:00
ic.Reset()
timeseries := ctx.req.Timeseries
rowsTotal := 0
2019-05-22 23:16:55 +02:00
for i := range timeseries {
ts := &timeseries[i]
2019-05-22 23:23:23 +02:00
storageNodeIdx := ic.GetStorageNodeIdx(at, ts.Labels)
ic.MetricNameBuf = ic.MetricNameBuf[:0]
2019-05-22 23:16:55 +02:00
for i := range ts.Samples {
r := &ts.Samples[i]
2019-05-22 23:23:23 +02:00
if len(ic.MetricNameBuf) == 0 {
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], at.AccountID, at.ProjectID, ts.Labels)
}
if err := ic.WriteDataPointExt(at, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, r.Value); err != nil {
return err
}
2019-05-22 23:16:55 +02:00
}
rowsTotal += len(ts.Samples)
2019-05-22 23:16:55 +02:00
}
rowsInserted.Get(at).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
2019-05-22 23:16:55 +02:00
return ic.FlushBufs()
}
type pushCtx struct {
2019-05-22 23:23:23 +02:00
Common netstorage.InsertCtx
2019-05-22 23:16:55 +02:00
req prompb.WriteRequest
reqBuf []byte
}
func (ctx *pushCtx) reset() {
2019-05-22 23:23:23 +02:00
ctx.Common.Reset()
2019-05-22 23:16:55 +02:00
ctx.req.Reset()
ctx.reqBuf = ctx.reqBuf[:0]
}
func (ctx *pushCtx) Read(r *http.Request) error {
2019-05-22 23:16:55 +02:00
prometheusReadCalls.Inc()
var err error
ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], r.Body)
2019-05-22 23:16:55 +02:00
if err != nil {
prometheusReadErrors.Inc()
return fmt.Errorf("cannot read prompb.WriteRequest: %s", err)
}
if err = ctx.req.Unmarshal(ctx.reqBuf); err != nil {
prometheusUnmarshalErrors.Inc()
return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %s", len(ctx.reqBuf), err)
}
return nil
}
var (
	// prometheusReadCalls counts calls to pushCtx.Read.
	prometheusReadCalls = metrics.NewCounter(`vm_read_calls_total{name="prometheus"}`)

	// prometheusReadErrors counts failures to read or decompress a request body.
	prometheusReadErrors = metrics.NewCounter(`vm_read_errors_total{name="prometheus"}`)

	// prometheusUnmarshalErrors counts failures to unmarshal a decompressed request.
	prometheusUnmarshalErrors = metrics.NewCounter(`vm_unmarshal_errors_total{name="prometheus"}`)
)
// getPushCtx returns a pushCtx for handling a request.
//
// It first tries a non-blocking receive from pushCtxPoolCh, then falls back
// to pushCtxPool, and allocates a fresh pushCtx when both are empty.
func getPushCtx() *pushCtx {
	select {
	case ctx := <-pushCtxPoolCh:
		return ctx
	default:
	}

	v := pushCtxPool.Get()
	if v == nil {
		return &pushCtx{}
	}
	return v.(*pushCtx)
}
// putPushCtx resets ctx and makes it available for reuse by getPushCtx.
//
// ctx is sent to pushCtxPoolCh when the channel has room; otherwise it is
// put into pushCtxPool. The caller must not use ctx after this call.
func putPushCtx(ctx *pushCtx) {
	ctx.reset()

	select {
	case pushCtxPoolCh <- ctx:
		return
	default:
	}
	pushCtxPool.Put(ctx)
}
var (
	// pushCtxPool is the overflow pool used when pushCtxPoolCh is empty
	// (on get) or full (on put).
	pushCtxPool sync.Pool

	// pushCtxPoolCh is the primary store of reusable pushCtx objects.
	// Its capacity is the GOMAXPROCS value at package initialization.
	pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
)
// readSnappy reads a snappy-compressed block from r, decompresses it and
// appends the decompressed bytes to dst, returning the extended slice.
//
// Both the compressed and the decompressed size are limited to
// -maxInsertRequestSize bytes. On error dst is returned with its original
// length; bytes in its spare capacity may have been overwritten.
func readSnappy(dst []byte, r io.Reader) ([]byte, error) {
	maxSize := *maxInsertRequestSize

	// Read one byte past the limit, so a body that is too big can be told
	// apart from a body of exactly maxSize bytes.
	lr := io.LimitReader(r, int64(maxSize)+1)

	bb := bodyBufferPool.Get()
	// Return the buffer to the pool on every path, including the size-limit
	// errors below.
	defer bodyBufferPool.Put(bb)

	reqLen, err := bb.ReadFrom(lr)
	if err != nil {
		return dst, fmt.Errorf("cannot read compressed request: %s", err)
	}
	if reqLen > int64(maxSize) {
		return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxSize)
	}

	// Check the decoded length declared in the snappy header before decoding,
	// so a small body claiming a huge decoded size is rejected without
	// allocating memory for it. A corrupt header is left for Decode to report.
	if n, lenErr := snappy.DecodedLen(bb.B); lenErr == nil && n > maxSize {
		return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxSize)
	}

	// Let snappy decode directly into the spare capacity of dst when it fits.
	buf := dst[len(dst):cap(dst)]
	buf, err = snappy.Decode(buf, bb.B)
	if err != nil {
		return dst, fmt.Errorf("cannot decompress request with length %d: %s", reqLen, err)
	}
	if len(buf) > maxSize {
		return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", maxSize)
	}

	// If the decoded data already sits in the spare capacity of dst, just
	// extend dst over it; otherwise snappy allocated a new slice, so copy it.
	if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] {
		dst = dst[:len(dst)+len(buf)]
	} else {
		dst = append(dst, buf...)
	}
	return dst, nil
}
// bodyBufferPool is a pool of byte buffers used by readSnappy for holding
// the compressed request body.
var bodyBufferPool bytesutil.ByteBufferPool