2020-02-23 12:35:47 +01:00
package main
import (
"flag"
"fmt"
"net/http"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
2020-02-25 18:09:46 +01:00
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
2020-02-23 12:35:47 +01:00
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
httpListenAddr = flag . String ( "httpListenAddr" , ":8429" , "TCP address to listen for http connections" )
2020-02-25 18:09:46 +01:00
influxListenAddr = flag . String ( "influxListenAddr" , "" , "TCP and UDP address to listen for Influx line protocol data. Usually :8189 must be set. Doesn't work if empty" )
2020-02-23 12:35:47 +01:00
graphiteListenAddr = flag . String ( "graphiteListenAddr" , "" , "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty" )
opentsdbListenAddr = flag . String ( "opentsdbListenAddr" , "" , "TCP and UDP address to listen for OpentTSDB metrics. " +
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. " +
"Usually :4242 must be set. Doesn't work if empty" )
opentsdbHTTPListenAddr = flag . String ( "opentsdbHTTPListenAddr" , "" , "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty" )
)
var (
2020-02-25 18:09:46 +01:00
influxServer * influxserver . Server
2020-02-23 12:35:47 +01:00
graphiteServer * graphiteserver . Server
opentsdbServer * opentsdbserver . Server
opentsdbhttpServer * opentsdbhttpserver . Server
)
func main ( ) {
envflag . Parse ( )
buildinfo . Init ( )
logger . Init ( )
logger . Infof ( "starting vmagent at %q..." , * httpListenAddr )
startTime := time . Now ( )
remotewrite . Init ( )
writeconcurrencylimiter . Init ( )
2020-02-25 18:09:46 +01:00
if len ( * influxListenAddr ) > 0 {
influxServer = influxserver . MustStart ( * influxListenAddr , influx . InsertHandlerForReader )
}
2020-02-23 12:35:47 +01:00
if len ( * graphiteListenAddr ) > 0 {
graphiteServer = graphiteserver . MustStart ( * graphiteListenAddr , graphite . InsertHandler )
}
if len ( * opentsdbListenAddr ) > 0 {
opentsdbServer = opentsdbserver . MustStart ( * opentsdbListenAddr , opentsdb . InsertHandler , opentsdbhttp . InsertHandler )
}
if len ( * opentsdbHTTPListenAddr ) > 0 {
opentsdbhttpServer = opentsdbhttpserver . MustStart ( * opentsdbHTTPListenAddr , opentsdbhttp . InsertHandler )
}
promscrape . Init ( remotewrite . Push )
go httpserver . Serve ( * httpListenAddr , requestHandler )
logger . Infof ( "started vmagent in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
sig := procutil . WaitForSigterm ( )
logger . Infof ( "received signal %s" , sig )
logger . Infof ( "gracefully shutting down webservice at %q" , * httpListenAddr )
startTime = time . Now ( )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
logger . Fatalf ( "cannot stop the webservice: %s" , err )
}
logger . Infof ( "successfully shut down the webservice in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
promscrape . Stop ( )
2020-02-25 18:09:46 +01:00
if len ( * influxListenAddr ) > 0 {
influxServer . MustStop ( )
}
2020-02-23 12:35:47 +01:00
if len ( * graphiteListenAddr ) > 0 {
graphiteServer . MustStop ( )
}
if len ( * opentsdbListenAddr ) > 0 {
opentsdbServer . MustStop ( )
}
if len ( * opentsdbHTTPListenAddr ) > 0 {
opentsdbhttpServer . MustStop ( )
}
remotewrite . Stop ( )
logger . Infof ( "successfully stopped vmagent in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
}
func requestHandler ( w http . ResponseWriter , r * http . Request ) bool {
path := strings . Replace ( r . URL . Path , "//" , "/" , - 1 )
switch path {
case "/api/v1/write" :
prometheusWriteRequests . Inc ( )
if err := promremotewrite . InsertHandler ( r ) ; err != nil {
prometheusWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
case "/api/v1/import" :
vmimportRequests . Inc ( )
if err := vmimport . InsertHandler ( r ) ; err != nil {
vmimportErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
case "/write" , "/api/v2/write" :
influxWriteRequests . Inc ( )
2020-02-25 18:09:46 +01:00
if err := influx . InsertHandlerForHTTP ( r ) ; err != nil {
2020-02-23 12:35:47 +01:00
influxWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
case "/query" :
// Emulate fake response for influx query.
// This is required for TSBS benchmark.
influxQueryRequests . Inc ( )
fmt . Fprintf ( w , ` { "results":[ { "series":[ { "values":[]}]}]} ` )
return true
2020-02-25 17:13:11 +01:00
case "/targets" :
promscrapeTargetsRequests . Inc ( )
w . Header ( ) . Set ( "Content-Type" , "text/plain" )
promscrape . WriteHumanReadableTargetsStatus ( w )
return true
2020-02-23 12:35:47 +01:00
}
return false
}
var (
prometheusWriteRequests = metrics . NewCounter ( ` vmagent_http_requests_total { path="/api/v1/write", protocol="prometheus"} ` )
prometheusWriteErrors = metrics . NewCounter ( ` vmagent_http_request_errors_total { path="/api/v1/write", protocol="prometheus"} ` )
vmimportRequests = metrics . NewCounter ( ` vmagent_http_requests_total { path="/api/v1/import", protocol="vm"} ` )
vmimportErrors = metrics . NewCounter ( ` vmagent_http_request_errors_total { path="/api/v1/import", protocol="vm"} ` )
influxWriteRequests = metrics . NewCounter ( ` vmagent_http_requests_total { path="/write", protocol="influx"} ` )
influxWriteErrors = metrics . NewCounter ( ` vmagent_http_request_errors_total { path="/write", protocol="influx"} ` )
influxQueryRequests = metrics . NewCounter ( ` vmagent_http_requests_total { path="/query", protocol="influx"} ` )
2020-02-25 17:13:11 +01:00
promscrapeTargetsRequests = metrics . NewCounter ( ` vmagent_http_requests_total { path="/targets"} ` )
2020-02-23 12:35:47 +01:00
)