2019-05-22 23:16:55 +02:00
package influx
import (
2019-06-14 08:57:13 +02:00
"flag"
2020-02-25 18:09:46 +01:00
"io"
2019-05-22 23:16:55 +02:00
"net/http"
"sync"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
2020-07-02 18:42:12 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-07-23 11:50:41 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
2021-01-12 23:52:50 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2020-02-23 12:35:47 +01:00
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2021-03-29 23:06:56 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/metrics"
2020-05-12 12:13:00 +02:00
"github.com/valyala/fastjson/fastfloat"
2019-05-22 23:16:55 +02:00
)
2019-06-14 08:57:13 +02:00
var (
2021-09-13 16:04:28 +02:00
measurementFieldSeparator = flag . String ( "influxMeasurementFieldSeparator" , "_" , "Separator for '{measurement}{separator}{field_name}' metric name when inserted via InfluxDB line protocol" )
skipSingleField = flag . Bool ( "influxSkipSingleField" , false , "Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metic name if InfluxDB line contains only a single field" )
2020-07-14 13:17:22 +02:00
skipMeasurement = flag . Bool ( "influxSkipMeasurement" , false , "Uses '{field_name}' as a metric name while ignoring '{measurement}' and '-influxMeasurementFieldSeparator'" )
2019-06-14 08:57:13 +02:00
)
2019-07-27 12:20:47 +02:00
var (
2021-03-29 23:06:56 +02:00
rowsInserted = metrics . NewCounter ( ` vm_rows_inserted_total { type="influx"} ` )
rowsTenantInserted = tenantmetrics . NewCounterMap ( ` vm_tenant_inserted_rows_total { type="influx"} ` )
rowsPerInsert = metrics . NewHistogram ( ` vm_rows_per_insert { type="influx"} ` )
2019-07-27 12:20:47 +02:00
)
2019-05-22 23:16:55 +02:00
2020-02-25 18:09:46 +01:00
// InsertHandlerForReader processes remote write for influx line protocol.
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader ( at * auth . Token , r io . Reader ) error {
return writeconcurrencylimiter . Do ( func ( ) error {
return parser . ParseStream ( r , false , "" , "" , func ( db string , rows [ ] parser . Row ) error {
2021-01-12 23:52:50 +01:00
return insertRows ( at , db , rows , nil , true )
2020-02-25 18:09:46 +01:00
} )
} )
}
// InsertHandlerForHTTP processes remote write for influx line protocol.
2019-05-22 23:16:55 +02:00
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
2020-02-25 18:09:46 +01:00
func InsertHandlerForHTTP ( at * auth . Token , req * http . Request ) error {
2021-01-12 23:52:50 +01:00
extraLabels , err := parserCommon . GetExtraLabels ( req )
if err != nil {
return err
}
2020-02-23 12:35:47 +01:00
return writeconcurrencylimiter . Do ( func ( ) error {
2020-02-25 18:09:46 +01:00
isGzipped := req . Header . Get ( "Content-Encoding" ) == "gzip"
q := req . URL . Query ( )
precision := q . Get ( "precision" )
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q . Get ( "db" )
return parser . ParseStream ( req . Body , isGzipped , precision , db , func ( db string , rows [ ] parser . Row ) error {
2021-01-12 23:52:50 +01:00
return insertRows ( at , db , rows , extraLabels , false )
2020-02-23 12:35:47 +01:00
} )
2019-05-22 23:16:55 +02:00
} )
}
2021-01-12 23:52:50 +01:00
func insertRows ( at * auth . Token , db string , rows [ ] parser . Row , extraLabels [ ] prompbmarshal . Label , mayOverrideAccountProjectID bool ) error {
2019-05-22 23:16:55 +02:00
ctx := getPushCtx ( )
defer putPushCtx ( ctx )
ic := & ctx . Common
2020-02-26 20:17:35 +01:00
ic . Reset ( ) // This line is required for initializing ic internals.
2019-07-27 12:20:47 +02:00
rowsTotal := 0
2020-05-12 12:13:00 +02:00
atCopy := * at
2020-07-02 18:42:12 +02:00
hasRelabeling := relabel . HasRelabeling ( )
2019-05-22 23:16:55 +02:00
for i := range rows {
r := & rows [ i ]
2020-10-09 12:29:27 +02:00
rowsTotal += len ( r . Fields )
2019-05-22 23:16:55 +02:00
ic . Labels = ic . Labels [ : 0 ]
2020-07-28 20:23:01 +02:00
hasDBKey := false
2019-05-22 23:16:55 +02:00
for j := range r . Tags {
tag := & r . Tags [ j ]
2020-05-12 12:13:00 +02:00
if mayOverrideAccountProjectID {
// Multi-tenancy support via custom tags.
if tag . Key == "VictoriaMetrics_AccountID" {
atCopy . AccountID = uint32 ( fastfloat . ParseUint64BestEffort ( tag . Value ) )
}
if tag . Key == "VictoriaMetrics_ProjectID" {
atCopy . ProjectID = uint32 ( fastfloat . ParseUint64BestEffort ( tag . Value ) )
}
}
2019-08-24 12:51:51 +02:00
if tag . Key == "db" {
2020-07-28 20:23:01 +02:00
hasDBKey = true
2019-08-24 12:51:51 +02:00
}
2019-05-22 23:16:55 +02:00
ic . AddLabel ( tag . Key , tag . Value )
}
2020-07-28 20:23:01 +02:00
if ! hasDBKey {
ic . AddLabel ( "db" , db )
}
2021-01-12 23:52:50 +01:00
for j := range extraLabels {
label := & extraLabels [ j ]
ic . AddLabel ( label . Name , label . Value )
}
2020-07-14 13:17:22 +02:00
ctx . metricGroupBuf = ctx . metricGroupBuf [ : 0 ]
if ! * skipMeasurement {
ctx . metricGroupBuf = append ( ctx . metricGroupBuf , r . Measurement ... )
}
2021-03-22 12:53:42 +01:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139
skipFieldKey := len ( r . Measurement ) > 0 && len ( r . Fields ) == 1 && * skipSingleField
2019-11-30 20:54:34 +01:00
if len ( ctx . metricGroupBuf ) > 0 && ! skipFieldKey {
2019-06-14 09:51:57 +02:00
ctx . metricGroupBuf = append ( ctx . metricGroupBuf , * measurementFieldSeparator ... )
}
2019-05-22 23:16:55 +02:00
metricGroupPrefixLen := len ( ctx . metricGroupBuf )
2020-07-02 22:13:13 +02:00
if hasRelabeling {
2020-07-23 11:50:41 +02:00
ctx . originLabels = append ( ctx . originLabels [ : 0 ] , ic . Labels ... )
ic . MetricNameBuf = storage . MarshalMetricNameRaw ( ic . MetricNameBuf [ : 0 ] , atCopy . AccountID , atCopy . ProjectID , nil )
metricNameBufLen := len ( ic . MetricNameBuf )
for j := range r . Fields {
f := & r . Fields [ j ]
if ! skipFieldKey {
ctx . metricGroupBuf = append ( ctx . metricGroupBuf [ : metricGroupPrefixLen ] , f . Key ... )
}
metricGroup := bytesutil . ToUnsafeString ( ctx . metricGroupBuf )
ic . Labels = append ( ic . Labels [ : 0 ] , ctx . originLabels ... )
ic . AddLabel ( "" , metricGroup )
ic . ApplyRelabeling ( )
if len ( ic . Labels ) == 0 {
// Skip metric without labels.
continue
}
ic . MetricNameBuf = ic . MetricNameBuf [ : metricNameBufLen ]
2021-03-31 22:12:56 +02:00
ic . SortLabelsIfNeeded ( )
2020-07-23 11:50:41 +02:00
for i := range ic . Labels {
ic . MetricNameBuf = storage . MarshalMetricLabelRaw ( ic . MetricNameBuf , & ic . Labels [ i ] )
}
storageNodeIdx := ic . GetStorageNodeIdx ( & atCopy , ic . Labels )
if err := ic . WriteDataPointExt ( & atCopy , storageNodeIdx , ic . MetricNameBuf , r . Timestamp , f . Value ) ; err != nil {
return err
}
2020-07-02 18:42:12 +02:00
}
2020-07-23 11:50:41 +02:00
} else {
2021-03-31 22:12:56 +02:00
ic . SortLabelsIfNeeded ( )
2020-07-23 11:50:41 +02:00
ic . MetricNameBuf = storage . MarshalMetricNameRaw ( ic . MetricNameBuf [ : 0 ] , atCopy . AccountID , atCopy . ProjectID , ic . Labels )
metricNameBufLen := len ( ic . MetricNameBuf )
labelsLen := len ( ic . Labels )
for j := range r . Fields {
f := & r . Fields [ j ]
if ! skipFieldKey {
ctx . metricGroupBuf = append ( ctx . metricGroupBuf [ : metricGroupPrefixLen ] , f . Key ... )
}
metricGroup := bytesutil . ToUnsafeString ( ctx . metricGroupBuf )
ic . Labels = ic . Labels [ : labelsLen ]
ic . AddLabel ( "" , metricGroup )
if len ( ic . Labels ) == 0 {
// Skip metric without labels.
continue
}
ic . MetricNameBuf = ic . MetricNameBuf [ : metricNameBufLen ]
ic . MetricNameBuf = storage . MarshalMetricLabelRaw ( ic . MetricNameBuf , & ic . Labels [ len ( ic . Labels ) - 1 ] )
storageNodeIdx := ic . GetStorageNodeIdx ( & atCopy , ic . Labels )
if err := ic . WriteDataPointExt ( & atCopy , storageNodeIdx , ic . MetricNameBuf , r . Timestamp , f . Value ) ; err != nil {
return err
}
2019-05-22 23:23:23 +02:00
}
2019-05-22 23:16:55 +02:00
}
}
2021-03-29 10:59:04 +02:00
rowsInserted . Add ( rowsTotal )
2021-03-29 23:06:56 +02:00
rowsTenantInserted . Get ( & atCopy ) . Add ( rowsTotal )
2019-07-27 12:20:47 +02:00
rowsPerInsert . Update ( float64 ( rowsTotal ) )
2019-05-22 23:16:55 +02:00
return ic . FlushBufs ( )
}
type pushCtx struct {
2020-02-23 12:35:47 +01:00
Common netstorage . InsertCtx
2019-05-22 23:16:55 +02:00
metricGroupBuf [ ] byte
2020-07-23 11:50:41 +02:00
originLabels [ ] prompb . Label
2019-05-22 23:16:55 +02:00
}
func ( ctx * pushCtx ) reset ( ) {
2019-05-22 23:23:23 +02:00
ctx . Common . Reset ( )
2019-05-22 23:16:55 +02:00
ctx . metricGroupBuf = ctx . metricGroupBuf [ : 0 ]
2020-07-23 11:50:41 +02:00
originLabels := ctx . originLabels
for i := range originLabels {
label := & originLabels [ i ]
label . Name = nil
label . Value = nil
}
ctx . originLabels = ctx . originLabels [ : 0 ]
2019-05-22 23:16:55 +02:00
}
func getPushCtx ( ) * pushCtx {
select {
case ctx := <- pushCtxPoolCh :
return ctx
default :
if v := pushCtxPool . Get ( ) ; v != nil {
return v . ( * pushCtx )
}
return & pushCtx { }
}
}
func putPushCtx ( ctx * pushCtx ) {
ctx . reset ( )
select {
case pushCtxPoolCh <- ctx :
default :
pushCtxPool . Put ( ctx )
}
}
var pushCtxPool sync . Pool
2020-12-08 19:49:32 +01:00
var pushCtxPoolCh = make ( chan * pushCtx , cgroup . AvailableCPUs ( ) )