2019-05-22 23:16:55 +02:00
package vminsert
import (
"flag"
"fmt"
"net/http"
2020-10-08 17:50:22 +02:00
"strconv"
2019-05-22 23:16:55 +02:00
"strings"
2020-06-19 00:10:18 +02:00
"sync/atomic"
2019-05-22 23:16:55 +02:00
2020-03-10 18:35:58 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
2020-09-26 03:29:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
2019-08-22 11:27:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
2020-07-10 11:00:35 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prompush"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
2020-07-02 18:42:12 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
2019-12-09 19:58:19 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2020-02-23 12:35:47 +01:00
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
2020-02-25 18:09:46 +01:00
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
2020-02-23 12:35:47 +01:00
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
2020-04-30 01:15:39 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
2020-09-28 03:11:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2019-08-23 07:45:11 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/metrics"
)
// Command-line flags that configure the optional ingestion listeners
// and the per-series label limit.
var (
	// graphiteListenAddr is the listen address for Graphite plaintext data.
	// An empty value leaves the listener disabled.
	graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")

	// influxListenAddr is the listen address for Influx line protocol data.
	// An empty value leaves the listener disabled.
	influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for Influx line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
		"This flag isn't needed when ingesting data over HTTP - just send it to `http://<victoriametrics>:8428/write`")

	// opentsdbListenAddr is the listen address for OpenTSDB data.
	// An empty value leaves the listener disabled.
	opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpenTSDB metrics. "+
		"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
		"Usually :4242 must be set. Doesn't work if empty")

	// opentsdbHTTPListenAddr is the listen address for OpenTSDB HTTP put requests.
	// An empty value leaves the listener disabled.
	opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpenTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")

	// maxLabelsPerTimeseries is handed to storage.SetMaxLabelsPerTimeseries by Init.
	maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped")
)
2019-12-13 23:29:14 +01:00
var (
2020-02-25 18:09:46 +01:00
influxServer * influxserver . Server
2020-02-23 12:35:47 +01:00
graphiteServer * graphiteserver . Server
opentsdbServer * opentsdbserver . Server
opentsdbhttpServer * opentsdbhttpserver . Server
2019-12-13 23:29:14 +01:00
)
2019-05-22 23:16:55 +02:00
// Init initializes vminsert.
func Init ( ) {
2020-07-02 18:42:12 +02:00
relabel . Init ( )
2019-08-23 07:45:11 +02:00
storage . SetMaxLabelsPerTimeseries ( * maxLabelsPerTimeseries )
2020-09-28 03:11:55 +02:00
common . StartUnmarshalWorkers ( )
2020-02-23 12:35:47 +01:00
writeconcurrencylimiter . Init ( )
2020-02-25 18:09:46 +01:00
if len ( * influxListenAddr ) > 0 {
influxServer = influxserver . MustStart ( * influxListenAddr , influx . InsertHandlerForReader )
}
2019-05-22 23:16:55 +02:00
if len ( * graphiteListenAddr ) > 0 {
2020-02-23 12:35:47 +01:00
graphiteServer = graphiteserver . MustStart ( * graphiteListenAddr , graphite . InsertHandler )
2019-05-22 23:16:55 +02:00
}
if len ( * opentsdbListenAddr ) > 0 {
2020-02-23 12:35:47 +01:00
opentsdbServer = opentsdbserver . MustStart ( * opentsdbListenAddr , opentsdb . InsertHandler , opentsdbhttp . InsertHandler )
2019-05-22 23:16:55 +02:00
}
2019-08-22 11:27:18 +02:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2020-02-23 12:35:47 +01:00
opentsdbhttpServer = opentsdbhttpserver . MustStart ( * opentsdbHTTPListenAddr , opentsdbhttp . InsertHandler )
2019-08-22 11:27:18 +02:00
}
2020-02-23 12:35:47 +01:00
promscrape . Init ( prompush . Push )
2019-05-22 23:16:55 +02:00
}
// Stop stops vminsert.
func Stop ( ) {
2020-02-23 12:35:47 +01:00
promscrape . Stop ( )
2020-02-25 18:09:46 +01:00
if len ( * influxListenAddr ) > 0 {
influxServer . MustStop ( )
}
2019-05-22 23:16:55 +02:00
if len ( * graphiteListenAddr ) > 0 {
2019-12-13 23:29:14 +01:00
graphiteServer . MustStop ( )
2019-05-22 23:16:55 +02:00
}
if len ( * opentsdbListenAddr ) > 0 {
2019-12-13 23:29:14 +01:00
opentsdbServer . MustStop ( )
2019-05-22 23:16:55 +02:00
}
2019-08-22 11:27:18 +02:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2019-12-13 23:29:14 +01:00
opentsdbhttpServer . MustStop ( )
2019-08-22 11:27:18 +02:00
}
2020-09-28 03:11:55 +02:00
common . StopUnmarshalWorkers ( )
2019-05-22 23:16:55 +02:00
}
// RequestHandler is a handler for Prometheus remote storage write API
func RequestHandler ( w http . ResponseWriter , r * http . Request ) bool {
path := strings . Replace ( r . URL . Path , "//" , "/" , - 1 )
switch path {
case "/api/v1/write" :
prometheusWriteRequests . Inc ( )
2020-02-23 12:35:47 +01:00
if err := promremotewrite . InsertHandler ( r ) ; err != nil {
2019-05-22 23:16:55 +02:00
prometheusWriteErrors . Inc ( )
2020-07-20 13:00:33 +02:00
httpserver . Errorf ( w , r , "error in %q: %s" , r . URL . Path , err )
2019-05-22 23:16:55 +02:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-12-09 19:58:19 +01:00
case "/api/v1/import" :
vmimportRequests . Inc ( )
if err := vmimport . InsertHandler ( r ) ; err != nil {
vmimportErrors . Inc ( )
2020-07-20 13:00:33 +02:00
httpserver . Errorf ( w , r , "error in %q: %s" , r . URL . Path , err )
2019-12-09 19:58:19 +01:00
return true
}
2019-12-19 00:21:49 +01:00
w . WriteHeader ( http . StatusNoContent )
2019-12-09 19:58:19 +01:00
return true
2020-03-10 18:35:58 +01:00
case "/api/v1/import/csv" :
csvimportRequests . Inc ( )
if err := csvimport . InsertHandler ( r ) ; err != nil {
csvimportErrors . Inc ( )
2020-07-20 13:00:33 +02:00
httpserver . Errorf ( w , r , "error in %q: %s" , r . URL . Path , err )
2020-03-10 18:35:58 +01:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2020-07-10 11:00:35 +02:00
case "/api/v1/import/prometheus" :
prometheusimportRequests . Inc ( )
if err := prometheusimport . InsertHandler ( r ) ; err != nil {
prometheusimportErrors . Inc ( )
2020-07-20 13:00:33 +02:00
httpserver . Errorf ( w , r , "error in %q: %s" , r . URL . Path , err )
2020-07-10 11:00:35 +02:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2020-09-26 03:29:45 +02:00
case "/api/v1/import/native" :
nativeimportRequests . Inc ( )
if err := native . InsertHandler ( r ) ; err != nil {
nativeimportErrors . Inc ( )
httpserver . Errorf ( w , r , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 23:16:55 +02:00
case "/write" , "/api/v2/write" :
influxWriteRequests . Inc ( )
2020-02-25 18:09:46 +01:00
if err := influx . InsertHandlerForHTTP ( r ) ; err != nil {
2019-05-22 23:16:55 +02:00
influxWriteErrors . Inc ( )
2020-07-20 13:00:33 +02:00
httpserver . Errorf ( w , r , "error in %q: %s" , r . URL . Path , err )
2019-05-22 23:16:55 +02:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
case "/query" :
2019-06-03 17:37:59 +02:00
// Emulate fake response for influx query.
// This is required for TSBS benchmark.
2019-05-22 23:16:55 +02:00
influxQueryRequests . Inc ( )
fmt . Fprintf ( w , ` { "results":[ { "series":[ { "values":[]}]}]} ` )
return true
2020-02-25 17:13:11 +01:00
case "/targets" :
promscrapeTargetsRequests . Inc ( )
w . Header ( ) . Set ( "Content-Type" , "text/plain" )
2020-10-08 17:50:22 +02:00
showOriginalLabels , _ := strconv . ParseBool ( r . FormValue ( "show_original_labels" ) )
promscrape . WriteHumanReadableTargetsStatus ( w , showOriginalLabels )
2020-02-25 17:13:11 +01:00
return true
2020-04-30 01:15:39 +02:00
case "/-/reload" :
promscrapeConfigReloadRequests . Inc ( )
procutil . SelfSIGHUP ( )
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 23:16:55 +02:00
default :
// This is not our link
return false
}
}
var (
2020-03-10 18:35:58 +01:00
prometheusWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/api/v1/write", protocol="promremotewrite"} ` )
prometheusWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/api/v1/write", protocol="promremotewrite"} ` )
vmimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/api/v1/import", protocol="vmimport"} ` )
vmimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/api/v1/import", protocol="vmimport"} ` )
2019-05-22 23:16:55 +02:00
2020-03-10 18:35:58 +01:00
csvimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/api/v1/import/csv", protocol="csvimport"} ` )
csvimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/api/v1/import/csv", protocol="csvimport"} ` )
2019-12-09 19:58:19 +01:00
2020-07-10 11:00:35 +02:00
prometheusimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/api/v1/import/prometheus", protocol="prometheusimport"} ` )
prometheusimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/api/v1/import/prometheus", protocol="prometheusimport"} ` )
2020-09-26 03:29:45 +02:00
nativeimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/api/v1/import/native", protocol="nativeimport"} ` )
nativeimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/api/v1/import/native", protocol="nativeimport"} ` )
2019-05-22 23:16:55 +02:00
influxWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/write", protocol="influx"} ` )
influxWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/write", protocol="influx"} ` )
influxQueryRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/query", protocol="influx"} ` )
2020-02-25 17:13:11 +01:00
promscrapeTargetsRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/targets"} ` )
2020-04-30 01:15:39 +02:00
promscrapeConfigReloadRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/-/reload"} ` )
2020-06-19 00:10:18 +02:00
_ = metrics . NewGauge ( ` vm_metrics_with_dropped_labels_total ` , func ( ) float64 {
return float64 ( atomic . LoadUint64 ( & storage . MetricsWithDroppedLabels ) )
} )
_ = metrics . NewGauge ( ` vm_too_long_label_names_total ` , func ( ) float64 {
return float64 ( atomic . LoadUint64 ( & storage . TooLongLabelNames ) )
} )
_ = metrics . NewGauge ( ` vm_too_long_label_values_total ` , func ( ) float64 {
return float64 ( atomic . LoadUint64 ( & storage . TooLongLabelValues ) )
} )
2019-05-22 23:16:55 +02:00
)