2019-05-22 23:23:23 +02:00
package main
2019-05-22 23:16:55 +02:00
import (
"flag"
"fmt"
"net/http"
2019-05-22 23:23:23 +02:00
"time"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheus"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/metrics"
)
var (
graphiteListenAddr = flag . String ( "graphiteListenAddr" , "" , "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty" )
opentsdbListenAddr = flag . String ( "opentsdbListenAddr" , "" , "TCP and UDP address to listen for OpentTSDB put messages. Usually :4242 must be set. Doesn't work if empty" )
2019-05-22 23:23:23 +02:00
httpListenAddr = flag . String ( "httpListenAddr" , ":8480" , "Address to listen for http connections" )
2019-05-22 23:16:55 +02:00
maxInsertRequestSize = flag . Int ( "maxInsertRequestSize" , 32 * 1024 * 1024 , "The maximum size of a single insert request in bytes" )
2019-05-22 23:23:23 +02:00
storageNodes flagutil . Array
2019-05-22 23:16:55 +02:00
)
2019-05-22 23:23:23 +02:00
func main ( ) {
flag . Var ( & storageNodes , "storageNode" , "Vmstorage address, usage -storageNode=vmstorage-host1:8400 -storageNode=vmstorage-host2:8400" )
flag . Parse ( )
buildinfo . Init ( )
logger . Init ( )
logger . Infof ( "initializing netstorage for storageNodes=%v..." , storageNodes )
startTime := time . Now ( )
if len ( storageNodes ) == 0 {
logger . Fatalf ( "storageNodes cannot be empty" )
}
netstorage . InitStorageNodes ( storageNodes )
logger . Infof ( "successfully initialized netstorage in %s" , time . Since ( startTime ) )
2019-05-22 23:16:55 +02:00
if len ( * graphiteListenAddr ) > 0 {
go graphite . Serve ( * graphiteListenAddr )
}
if len ( * opentsdbListenAddr ) > 0 {
go opentsdb . Serve ( * opentsdbListenAddr )
}
2019-05-22 23:23:23 +02:00
go func ( ) {
httpserver . Serve ( * httpListenAddr , requestHandler )
} ( )
sig := procutil . WaitForSigterm ( )
logger . Infof ( "service received signal %s" , sig )
logger . Infof ( "gracefully shutting down the service at %q" , * httpListenAddr )
startTime = time . Now ( )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
logger . Fatalf ( "cannot stop the service: %s" , err )
}
logger . Infof ( "successfully shut down the service in %s" , time . Since ( startTime ) )
2019-05-22 23:16:55 +02:00
if len ( * graphiteListenAddr ) > 0 {
graphite . Stop ( )
}
if len ( * opentsdbListenAddr ) > 0 {
opentsdb . Stop ( )
}
2019-05-22 23:23:23 +02:00
logger . Infof ( "shutting down neststorage..." )
startTime = time . Now ( )
netstorage . Stop ( )
logger . Infof ( "successfully stopped netstorage in %s" , time . Since ( startTime ) )
logger . Infof ( "the vminsert has been stopped" )
2019-05-22 23:16:55 +02:00
}
2019-05-22 23:23:23 +02:00
func requestHandler ( w http . ResponseWriter , r * http . Request ) bool {
p , err := httpserver . ParsePath ( r . URL . Path )
if err != nil {
httpserver . Errorf ( w , "cannot parse path %q: %s" , r . URL . Path , err )
return true
}
if p . Prefix != "insert" {
// This is not our link.
return false
}
at , err := auth . NewToken ( p . AuthToken )
if err != nil {
httpserver . Errorf ( w , "auth error: %s" , err )
return true
}
switch p . Suffix {
case "prometheus/" , "prometheus" :
2019-05-22 23:16:55 +02:00
prometheusWriteRequests . Inc ( )
2019-05-22 23:23:23 +02:00
if err := prometheus . InsertHandler ( at , r , int64 ( * maxInsertRequestSize ) ) ; err != nil {
2019-05-22 23:16:55 +02:00
prometheusWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 23:23:23 +02:00
case "influx/write" , "influx/api/v2/write" :
2019-05-22 23:16:55 +02:00
influxWriteRequests . Inc ( )
2019-05-22 23:23:23 +02:00
if err := influx . InsertHandler ( at , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
influxWriteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 23:23:23 +02:00
case "influx/query" :
2019-05-22 23:16:55 +02:00
// Emulate fake response for influx query
influxQueryRequests . Inc ( )
fmt . Fprintf ( w , ` { "results":[ { "series":[ { "values":[]}]}]} ` )
return true
default :
// This is not our link
return false
}
}
var (
2019-05-22 23:23:23 +02:00
prometheusWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/", protocol="prometheus"} ` )
prometheusWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/", protocol="prometheus"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
influxWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/influx/", protocol="influx"} ` )
influxWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/influx/", protocol="influx"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
influxQueryRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/influx/query", protocol="influx"} ` )
2019-05-22 23:16:55 +02:00
)