2020-02-23 12:35:47 +01:00
package influx
import (
"flag"
2020-02-25 18:09:46 +01:00
"io"
2020-02-23 12:35:47 +01:00
"net/http"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
2021-08-05 08:44:29 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-12-08 19:49:32 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2020-11-07 15:16:56 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2021-01-12 23:52:50 +01:00
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2020-02-23 12:35:47 +01:00
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
2021-08-05 08:44:29 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags controlling how metric names are built from InfluxDB lines.
var (
	// measurementFieldSeparator is inserted between the measurement and the field name
	// when both of them take part in the metric name.
	measurementFieldSeparator = flag.String("influxMeasurementFieldSeparator", "_", "Separator for '{measurement}{separator}{field_name}' metric name when inserted via InfluxDB line protocol")

	// skipSingleField makes single-field lines use the measurement alone as the metric name.
	skipSingleField = flag.Bool("influxSkipSingleField", false, "Uses '{measurement}' instead of '{measurement}{separator}{field_name}' for metric name if InfluxDB line contains only a single field")

	// skipMeasurement drops the measurement (and the separator) from the metric name.
	skipMeasurement = flag.Bool("influxSkipMeasurement", false, "Uses '{field_name}' as a metric name while ignoring '{measurement}' and '-influxMeasurementFieldSeparator'")
)
var (
2021-08-05 08:44:29 +02:00
rowsInserted = metrics . NewCounter ( ` vmagent_rows_inserted_total { type="influx"} ` )
2021-08-05 08:46:19 +02:00
rowsTenantInserted = tenantmetrics . NewCounterMap ( ` vmagent_tenant_inserted_rows_total { type="influx"} ` )
2021-08-05 08:44:29 +02:00
rowsPerInsert = metrics . NewHistogram ( ` vmagent_rows_per_insert { type="influx"} ` )
2020-02-23 12:35:47 +01:00
)
2020-02-25 18:09:46 +01:00
// InsertHandlerForReader processes remote write for influx line protocol.
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
2021-09-20 13:49:28 +02:00
func InsertHandlerForReader ( r io . Reader , isGzipped bool ) error {
2020-02-25 18:09:46 +01:00
return writeconcurrencylimiter . Do ( func ( ) error {
2021-09-20 13:49:28 +02:00
return parser . ParseStream ( r , isGzipped , "" , "" , func ( db string , rows [ ] parser . Row ) error {
2021-08-05 08:44:29 +02:00
return insertRows ( nil , db , rows , nil )
2021-01-12 23:52:50 +01:00
} )
2020-02-25 18:09:46 +01:00
} )
}
// InsertHandlerForHTTP processes remote write for influx line protocol.
2020-02-23 12:35:47 +01:00
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
2021-08-05 08:46:19 +02:00
func InsertHandlerForHTTP ( at * auth . Token , req * http . Request ) error {
2021-01-12 23:52:50 +01:00
extraLabels , err := parserCommon . GetExtraLabels ( req )
if err != nil {
return err
}
2020-02-23 12:35:47 +01:00
return writeconcurrencylimiter . Do ( func ( ) error {
2020-02-25 18:09:46 +01:00
isGzipped := req . Header . Get ( "Content-Encoding" ) == "gzip"
q := req . URL . Query ( )
precision := q . Get ( "precision" )
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q . Get ( "db" )
2021-01-12 23:52:50 +01:00
return parser . ParseStream ( req . Body , isGzipped , precision , db , func ( db string , rows [ ] parser . Row ) error {
2021-08-05 08:46:19 +02:00
return insertRows ( at , db , rows , extraLabels )
2021-01-12 23:52:50 +01:00
} )
2020-02-23 12:35:47 +01:00
} )
}
2021-08-05 08:46:19 +02:00
func insertRows ( at * auth . Token , db string , rows [ ] parser . Row , extraLabels [ ] prompbmarshal . Label ) error {
2020-02-23 12:35:47 +01:00
ctx := getPushCtx ( )
defer putPushCtx ( ctx )
rowsTotal := 0
tssDst := ctx . ctx . WriteRequest . Timeseries [ : 0 ]
labels := ctx . ctx . Labels [ : 0 ]
samples := ctx . ctx . Samples [ : 0 ]
commonLabels := ctx . commonLabels [ : 0 ]
buf := ctx . buf [ : 0 ]
for i := range rows {
r := & rows [ i ]
2020-10-09 12:29:27 +02:00
rowsTotal += len ( r . Fields )
2020-02-23 12:35:47 +01:00
commonLabels = commonLabels [ : 0 ]
2020-07-28 20:23:01 +02:00
hasDBKey := false
2020-02-23 12:35:47 +01:00
for j := range r . Tags {
tag := & r . Tags [ j ]
if tag . Key == "db" {
2020-07-28 20:23:01 +02:00
hasDBKey = true
2020-02-23 12:35:47 +01:00
}
commonLabels = append ( commonLabels , prompbmarshal . Label {
Name : tag . Key ,
Value : tag . Value ,
} )
}
2020-07-28 20:23:01 +02:00
if len ( db ) > 0 && ! hasDBKey {
2020-02-23 12:35:47 +01:00
commonLabels = append ( commonLabels , prompbmarshal . Label {
Name : "db" ,
Value : db ,
} )
}
2021-01-12 23:52:50 +01:00
commonLabels = append ( commonLabels , extraLabels ... )
2020-07-14 13:17:22 +02:00
ctx . metricGroupBuf = ctx . metricGroupBuf [ : 0 ]
if ! * skipMeasurement {
ctx . metricGroupBuf = append ( ctx . metricGroupBuf , r . Measurement ... )
}
2021-03-22 12:53:42 +01:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139
skipFieldKey := len ( r . Measurement ) > 0 && len ( r . Fields ) == 1 && * skipSingleField
2020-02-23 12:35:47 +01:00
if len ( ctx . metricGroupBuf ) > 0 && ! skipFieldKey {
ctx . metricGroupBuf = append ( ctx . metricGroupBuf , * measurementFieldSeparator ... )
}
for j := range r . Fields {
f := & r . Fields [ j ]
bufLen := len ( buf )
buf = append ( buf , ctx . metricGroupBuf ... )
if ! skipFieldKey {
buf = append ( buf , f . Key ... )
}
metricGroup := bytesutil . ToUnsafeString ( buf [ bufLen : ] )
labelsLen := len ( labels )
labels = append ( labels , prompbmarshal . Label {
Name : "__name__" ,
Value : metricGroup ,
} )
labels = append ( labels , commonLabels ... )
samples = append ( samples , prompbmarshal . Sample {
Timestamp : r . Timestamp ,
Value : f . Value ,
} )
tssDst = append ( tssDst , prompbmarshal . TimeSeries {
Labels : labels [ labelsLen : ] ,
Samples : samples [ len ( samples ) - 1 : ] ,
} )
}
}
ctx . buf = buf
ctx . ctx . WriteRequest . Timeseries = tssDst
ctx . ctx . Labels = labels
ctx . ctx . Samples = samples
ctx . commonLabels = commonLabels
2021-08-05 08:46:19 +02:00
remotewrite . PushWithAuthToken ( at , & ctx . ctx . WriteRequest )
2020-02-23 12:35:47 +01:00
rowsInserted . Add ( rowsTotal )
2021-08-05 08:46:19 +02:00
if at != nil {
rowsTenantInserted . Get ( at ) . Add ( rowsTotal )
2021-08-05 08:44:29 +02:00
}
2020-02-23 12:35:47 +01:00
rowsPerInsert . Update ( float64 ( rowsTotal ) )
return nil
}
// pushCtx bundles the reusable buffers needed by insertRows.
//
// Instances are obtained via getPushCtx and returned via putPushCtx.
type pushCtx struct {
	// ctx holds the write request together with the labels and samples backing it.
	ctx common.PushCtx

	// commonLabels holds the labels shared by all the fields of the row being processed.
	commonLabels []prompbmarshal.Label

	// buf stores the bytes of the generated metric names.
	buf []byte

	// metricGroupBuf holds the metric name prefix for the row being processed.
	metricGroupBuf []byte
}
func ( ctx * pushCtx ) reset ( ) {
ctx . ctx . Reset ( )
2020-11-07 15:16:56 +01:00
promrelabel . CleanLabels ( ctx . commonLabels )
ctx . commonLabels = ctx . commonLabels [ : 0 ]
2020-02-23 12:35:47 +01:00
ctx . metricGroupBuf = ctx . metricGroupBuf [ : 0 ]
ctx . buf = ctx . buf [ : 0 ]
}
// getPushCtx returns a pushCtx for use in insertRows.
//
// It tries pushCtxPoolCh first without blocking, then falls back to pushCtxPool,
// and allocates a fresh pushCtx only if both are empty.
// Return the result via putPushCtx when it is no longer needed.
func getPushCtx() *pushCtx {
	select {
	case pc := <-pushCtxPoolCh:
		return pc
	default:
	}
	if pooled := pushCtxPool.Get(); pooled != nil {
		return pooled.(*pushCtx)
	}
	return &pushCtx{}
}
// putPushCtx resets ctx and makes it available for subsequent getPushCtx calls.
//
// ctx goes to pushCtxPoolCh if there is room in it; otherwise it goes to pushCtxPool.
// ctx must not be used after the call.
func putPushCtx(ctx *pushCtx) {
	ctx.reset()
	select {
	case pushCtxPoolCh <- ctx:
		return
	default:
	}
	pushCtxPool.Put(ctx)
}
var pushCtxPool sync . Pool
2020-12-08 19:49:32 +01:00
var pushCtxPoolCh = make ( chan * pushCtx , cgroup . AvailableCPUs ( ) )