2019-05-22 23:23:23 +02:00
package main
2019-05-22 23:16:55 +02:00
import (
"flag"
"net/http"
"runtime"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/metrics"
)
var (
2019-05-22 23:23:23 +02:00
httpListenAddr = flag . String ( "httpListenAddr" , ":8481" , "Address to listen for http connections" )
cacheDataPath = flag . String ( "cacheDataPath" , "" , "Path to directory for cache files. Cache isn't saved if empty" )
2019-05-22 23:16:55 +02:00
maxConcurrentRequests = flag . Int ( "search.maxConcurrentRequests" , runtime . GOMAXPROCS ( - 1 ) * 2 , "The maximum number of concurrent search requests. It shouldn't exceed 2*vCPUs for better performance. See also -search.maxQueueDuration" )
maxQueueDuration = flag . Duration ( "search.maxQueueDuration" , 10 * time . Second , "The maximum time the request waits for execution when -search.maxConcurrentRequests limit is reached" )
2019-05-22 23:23:23 +02:00
storageNodes flagutil . Array
2019-05-22 23:16:55 +02:00
)
2019-05-22 23:23:23 +02:00
func main ( ) {
flag . Var ( & storageNodes , "storageNode" , "Vmstorage address, usage -storageNode=vmstorage-host1:8401 -storageNode=vmstorage-host2:8401" )
flag . Parse ( )
buildinfo . Init ( )
logger . Init ( )
logger . Infof ( "starting netstorage at storageNodes=%v" , storageNodes )
startTime := time . Now ( )
if len ( storageNodes ) == 0 {
logger . Fatalf ( "storageNodes cannot be empty" )
}
netstorage . InitStorageNodes ( storageNodes )
logger . Infof ( "started netstorage in %s" , time . Since ( startTime ) )
if len ( * cacheDataPath ) > 0 {
tmpDataPath := * cacheDataPath + "/tmp"
fs . RemoveDirContents ( tmpDataPath )
netstorage . InitTmpBlocksDir ( tmpDataPath )
promql . InitRollupResultCache ( * cacheDataPath + "/rollupResult" )
} else {
netstorage . InitTmpBlocksDir ( "" )
promql . InitRollupResultCache ( "" )
}
2019-05-22 23:16:55 +02:00
concurrencyCh = make ( chan struct { } , * maxConcurrentRequests )
2019-05-22 23:23:23 +02:00
go func ( ) {
httpserver . Serve ( * httpListenAddr , requestHandler )
} ( )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
sig := procutil . WaitForSigterm ( )
logger . Infof ( "service received signal %s" , sig )
logger . Infof ( "gracefully shutting down the service at %q" , * httpListenAddr )
startTime = time . Now ( )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
logger . Fatalf ( "cannot stop the service: %s" , err )
}
logger . Infof ( "successfully shut down the service in %s" , time . Since ( startTime ) )
logger . Infof ( "shutting down neststorage..." )
startTime = time . Now ( )
netstorage . Stop ( )
if len ( * cacheDataPath ) > 0 {
promql . StopRollupResultCache ( )
}
logger . Infof ( "successfully stopped netstorage in %s" , time . Since ( startTime ) )
logger . Infof ( "the vmselect has been stopped" )
2019-05-22 23:16:55 +02:00
}
2019-05-22 23:23:23 +02:00
var concurrencyCh chan struct { }
func requestHandler ( w http . ResponseWriter , r * http . Request ) bool {
2019-05-22 23:16:55 +02:00
// Limit the number of concurrent queries.
// Sleep for a second until giving up. This should resolve short bursts in requests.
t := time . NewTimer ( * maxQueueDuration )
select {
case concurrencyCh <- struct { } { } :
t . Stop ( )
defer func ( ) { <- concurrencyCh } ( )
case <- t . C :
httpserver . Errorf ( w , "cannot handle more than %d concurrent requests" , cap ( concurrencyCh ) )
return true
}
2019-05-22 23:23:23 +02:00
path := r . URL . Path
if path == "/internal/resetRollupResultCache" {
promql . ResetRollupResultCache ( )
return true
}
p , err := httpserver . ParsePath ( path )
if err != nil {
httpserver . Errorf ( w , "cannot parse path %q: %s" , path , err )
return true
}
at , err := auth . NewToken ( p . AuthToken )
if err != nil {
httpserver . Errorf ( w , "auth error: %s" , err )
return true
}
switch p . Prefix {
case "select" :
return selectHandler ( w , r , p , at )
case "delete" :
return deleteHandler ( w , r , p , at )
default :
// This is not our link
return false
}
}
func selectHandler ( w http . ResponseWriter , r * http . Request , p * httpserver . Path , at * auth . Token ) bool {
if strings . HasPrefix ( p . Suffix , "prometheus/api/v1/label/" ) {
s := p . Suffix [ len ( "prometheus/api/v1/label/" ) : ]
2019-05-22 23:16:55 +02:00
if strings . HasSuffix ( s , "/values" ) {
labelValuesRequests . Inc ( )
labelName := s [ : len ( s ) - len ( "/values" ) ]
httpserver . EnableCORS ( w , r )
2019-05-22 23:23:23 +02:00
if err := prometheus . LabelValuesHandler ( at , labelName , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
labelValuesErrors . Inc ( )
sendPrometheusError ( w , r , err )
return true
}
return true
}
}
2019-05-22 23:23:23 +02:00
switch p . Suffix {
case "prometheus/api/v1/query" :
2019-05-22 23:16:55 +02:00
queryRequests . Inc ( )
httpserver . EnableCORS ( w , r )
2019-05-22 23:23:23 +02:00
if err := prometheus . QueryHandler ( at , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
queryErrors . Inc ( )
sendPrometheusError ( w , r , err )
return true
}
return true
2019-05-22 23:23:23 +02:00
case "prometheus/api/v1/query_range" :
2019-05-22 23:16:55 +02:00
queryRangeRequests . Inc ( )
httpserver . EnableCORS ( w , r )
2019-05-22 23:23:23 +02:00
if err := prometheus . QueryRangeHandler ( at , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
queryRangeErrors . Inc ( )
sendPrometheusError ( w , r , err )
return true
}
return true
2019-05-22 23:23:23 +02:00
case "prometheus/api/v1/series" :
2019-05-22 23:16:55 +02:00
seriesRequests . Inc ( )
httpserver . EnableCORS ( w , r )
2019-05-22 23:23:23 +02:00
if err := prometheus . SeriesHandler ( at , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
seriesErrors . Inc ( )
sendPrometheusError ( w , r , err )
return true
}
return true
2019-05-22 23:23:23 +02:00
case "prometheus/api/v1/series/count" :
2019-05-22 23:16:55 +02:00
seriesCountRequests . Inc ( )
httpserver . EnableCORS ( w , r )
2019-05-22 23:23:23 +02:00
if err := prometheus . SeriesCountHandler ( at , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
seriesCountErrors . Inc ( )
sendPrometheusError ( w , r , err )
return true
}
return true
2019-05-22 23:23:23 +02:00
case "prometheus/api/v1/labels" :
2019-05-22 23:16:55 +02:00
labelsRequests . Inc ( )
httpserver . EnableCORS ( w , r )
2019-05-22 23:23:23 +02:00
if err := prometheus . LabelsHandler ( at , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
labelsErrors . Inc ( )
sendPrometheusError ( w , r , err )
return true
}
return true
2019-05-22 23:23:23 +02:00
case "prometheus/api/v1/export" :
2019-05-22 23:16:55 +02:00
exportRequests . Inc ( )
2019-05-22 23:23:23 +02:00
if err := prometheus . ExportHandler ( at , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
exportErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
return true
2019-05-22 23:23:23 +02:00
case "prometheus/federate" :
2019-05-22 23:16:55 +02:00
federateRequests . Inc ( )
2019-05-22 23:23:23 +02:00
if err := prometheus . FederateHandler ( at , w , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
federateErrors . Inc ( )
2019-05-22 23:23:23 +02:00
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
2019-05-22 23:16:55 +02:00
return true
}
return true
2019-05-22 23:23:23 +02:00
default :
return false
}
}
func deleteHandler ( w http . ResponseWriter , r * http . Request , p * httpserver . Path , at * auth . Token ) bool {
switch p . Suffix {
case "prometheus/api/v1/admin/tsdb/delete_series" :
2019-05-22 23:16:55 +02:00
deleteRequests . Inc ( )
2019-05-22 23:23:23 +02:00
if err := prometheus . DeleteHandler ( at , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
deleteErrors . Inc ( )
httpserver . Errorf ( w , "error in %q: %s" , r . URL . Path , err )
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
default :
return false
}
}
// sendPrometheusError logs err together with the request path and replies
// with HTTP 422 (Unprocessable Entity) and a JSON content type, delegating
// the response body to prometheus.WriteErrorResponse.
func sendPrometheusError(w http.ResponseWriter, r *http.Request, err error) {
	logger.Errorf("error in %q: %s", r.URL.Path, err)

	// Headers must be set before WriteHeader sends the status line.
	w.Header().Set("Content-Type", "application/json")
	code := http.StatusUnprocessableEntity // 422
	w.WriteHeader(code)
	prometheus.WriteErrorResponse(w, code, err)
}
var (
2019-05-22 23:23:23 +02:00
labelValuesRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/ { }/prometheus/api/v1/label/ { }/values"} ` )
labelValuesErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="select/ { }/prometheus/api/v1/label/ { }/values"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
queryRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/ { }/prometheus/api/v1/query"} ` )
queryErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/select/ { }/prometheus/api/v1/query"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
queryRangeRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/prometheus/api/v1/query_range"} ` )
queryRangeErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/select/ { }/prometheus/api/v1/query_range"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
seriesRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/ { }/prometheus/api/v1/series"} ` )
seriesErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/select/ { }/prometheus/api/v1/series"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
seriesCountRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/ { }/prometheus/api/v1/series/count"} ` )
seriesCountErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/select/ { }/prometheus/api/v1/series/count"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
labelsRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/ { }/prometheus/api/v1/labels"} ` )
labelsErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/select/ { }/prometheus/api/v1/labels"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
deleteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/delete/ { }/prometheus/api/v1/admin/tsdb/delete_series"} ` )
deleteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/delete/ { }/prometheus/api/v1/admin/tsdb/delete_series"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
exportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/ { }/prometheus/api/v1/export"} ` )
exportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/select/ { }/prometheus/api/v1/export"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
federateRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/select/ { }/prometheus/federate"} ` )
federateErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/select/ { }/prometheus/federate"} ` )
2019-05-22 23:16:55 +02:00
)