2019-05-22 23:23:23 +02:00
package main
2019-05-22 23:16:55 +02:00
import (
"flag"
"fmt"
2020-02-23 12:35:47 +01:00
"io"
2021-05-08 16:55:44 +02:00
"net"
2019-05-22 23:16:55 +02:00
"net/http"
2020-05-16 10:59:30 +02:00
"os"
2022-06-07 14:42:17 +02:00
"strings"
2020-06-19 00:10:18 +02:00
"sync/atomic"
2019-05-22 23:23:23 +02:00
"time"
2019-05-22 23:16:55 +02:00
2021-05-08 16:55:44 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/clusternative"
2020-03-10 18:35:58 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/csvimport"
2021-09-28 21:47:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/datadog"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/influx"
2020-09-26 03:29:45 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/native"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb"
2019-08-22 11:27:18 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp"
2020-07-10 11:00:35 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
2020-07-02 18:42:12 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
2019-12-09 19:58:19 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
2020-02-10 12:26:18 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2019-11-12 15:29:43 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
2021-03-15 20:37:13 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/influxutils"
2021-05-08 16:55:44 +02:00
clusternativeserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/clusternative"
2020-02-23 12:35:47 +01:00
graphiteserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/graphite"
2020-02-25 18:09:46 +01:00
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
2020-02-23 12:35:47 +01:00
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-09-28 03:11:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2022-07-21 18:49:52 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
2019-08-23 07:45:11 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2020-02-23 12:35:47 +01:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/metrics"
)
var (
2021-05-08 16:55:44 +02:00
clusternativeListenAddr = flag . String ( "clusternativeListenAddr" , "" , "TCP address to listen for data from other vminsert nodes in multi-level cluster setup. " +
"See https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multi-level-cluster-setup . Usually :8400 must be set. Doesn't work if empty" )
2019-12-13 23:29:14 +01:00
graphiteListenAddr = flag . String ( "graphiteListenAddr" , "" , "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty" )
2022-05-13 15:49:22 +02:00
influxListenAddr = flag . String ( "influxListenAddr" , "" , "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. " +
2021-09-13 16:04:28 +02:00
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write" )
2019-12-13 23:29:14 +01:00
opentsdbListenAddr = flag . String ( "opentsdbListenAddr" , "" , "TCP and UDP address to listen for OpentTSDB metrics. " +
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. " +
"Usually :4242 must be set. Doesn't work if empty" )
2019-08-22 11:27:18 +02:00
opentsdbHTTPListenAddr = flag . String ( "opentsdbHTTPListenAddr" , "" , "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty" )
httpListenAddr = flag . String ( "httpListenAddr" , ":8480" , "Address to listen for http connections" )
2021-12-06 10:39:19 +01:00
maxLabelsPerTimeseries = flag . Int ( "maxLabelsPerTimeseries" , 30 , "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented" )
maxLabelValueLen = flag . Int ( "maxLabelValueLen" , 16 * 1024 , "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented" )
2022-10-01 17:26:05 +02:00
storageNodes = flagutil . NewArrayString ( "storageNode" , "Comma-separated addresses of vmstorage nodes; usage: -storageNode=vmstorage-host1,...,vmstorage-hostN" )
2019-05-22 23:16:55 +02:00
)
2019-12-13 23:29:14 +01:00
var (
2021-05-08 16:55:44 +02:00
clusternativeServer * clusternativeserver . Server
graphiteServer * graphiteserver . Server
influxServer * influxserver . Server
opentsdbServer * opentsdbserver . Server
opentsdbhttpServer * opentsdbhttpserver . Server
2019-12-13 23:29:14 +01:00
)
2019-05-22 23:23:23 +02:00
func main ( ) {
2020-05-16 10:59:30 +02:00
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag . CommandLine . SetOutput ( os . Stdout )
2020-12-03 20:40:30 +01:00
flag . Usage = usage
2020-02-10 12:26:18 +01:00
envflag . Parse ( )
2019-05-22 23:23:23 +02:00
buildinfo . Init ( )
logger . Init ( )
2022-07-22 12:35:58 +02:00
pushmetrics . Init ( )
2019-05-22 23:23:23 +02:00
2019-07-20 09:21:59 +02:00
logger . Infof ( "initializing netstorage for storageNodes %s..." , * storageNodes )
2019-05-22 23:23:23 +02:00
startTime := time . Now ( )
2019-06-18 09:26:44 +02:00
if len ( * storageNodes ) == 0 {
2019-07-20 09:21:59 +02:00
logger . Fatalf ( "missing -storageNode arg" )
2019-05-22 23:23:23 +02:00
}
2022-09-08 20:17:58 +02:00
if duplicatedAddr := checkDuplicates ( * storageNodes ) ; duplicatedAddr != "" {
logger . Fatalf ( "found equal addresses of storage nodes in the -storageNodes flag: %q" , duplicatedAddr )
}
2022-02-06 19:20:02 +01:00
hashSeed := uint64 ( 0 )
2021-10-07 11:21:42 +02:00
if * clusternativeListenAddr != "" {
// Use different hash seed for the second level of vminsert nodes in multi-level cluster setup.
// This should fix uneven distribution of time series among storage nodes.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1672
2022-02-06 19:20:02 +01:00
hashSeed = 0xabcdef0123456789
2021-10-07 11:21:42 +02:00
}
netstorage . InitStorageNodes ( * storageNodes , hashSeed )
2020-01-22 17:27:44 +01:00
logger . Infof ( "successfully initialized netstorage in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:23:23 +02:00
2020-07-02 18:42:12 +02:00
relabel . Init ( )
2019-08-23 07:45:11 +02:00
storage . SetMaxLabelsPerTimeseries ( * maxLabelsPerTimeseries )
2021-12-06 10:39:19 +01:00
storage . SetMaxLabelValueLen ( * maxLabelValueLen )
2020-09-28 03:11:55 +02:00
common . StartUnmarshalWorkers ( )
2020-02-23 12:35:47 +01:00
writeconcurrencylimiter . Init ( )
2021-05-08 16:55:44 +02:00
if len ( * clusternativeListenAddr ) > 0 {
clusternativeServer = clusternativeserver . MustStart ( * clusternativeListenAddr , func ( c net . Conn ) error {
return clusternative . InsertHandler ( c )
2020-02-25 18:09:46 +01:00
} )
}
2019-05-22 23:16:55 +02:00
if len ( * graphiteListenAddr ) > 0 {
2020-02-23 12:35:47 +01:00
graphiteServer = graphiteserver . MustStart ( * graphiteListenAddr , func ( r io . Reader ) error {
2022-09-30 16:28:35 +02:00
return graphite . InsertHandler ( nil , r )
2020-02-23 12:35:47 +01:00
} )
2019-05-22 23:16:55 +02:00
}
2021-05-08 16:55:44 +02:00
if len ( * influxListenAddr ) > 0 {
influxServer = influxserver . MustStart ( * influxListenAddr , func ( r io . Reader ) error {
2022-09-30 16:28:35 +02:00
return influx . InsertHandlerForReader ( nil , r )
2021-05-08 16:55:44 +02:00
} )
}
2019-05-22 23:16:55 +02:00
if len ( * opentsdbListenAddr ) > 0 {
2020-02-23 12:35:47 +01:00
opentsdbServer = opentsdbserver . MustStart ( * opentsdbListenAddr , func ( r io . Reader ) error {
2022-09-30 16:28:35 +02:00
return opentsdb . InsertHandler ( nil , r )
2020-02-23 12:35:47 +01:00
} , opentsdbhttp . InsertHandler )
2019-05-22 23:16:55 +02:00
}
2019-08-22 11:27:18 +02:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2020-02-23 12:35:47 +01:00
opentsdbhttpServer = opentsdbhttpserver . MustStart ( * opentsdbHTTPListenAddr , opentsdbhttp . InsertHandler )
2019-08-22 11:27:18 +02:00
}
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
go func ( ) {
httpserver . Serve ( * httpListenAddr , requestHandler )
} ( )
sig := procutil . WaitForSigterm ( )
logger . Infof ( "service received signal %s" , sig )
app/vmstorage: add missing shutdown for http server on graceful shutdown
This could result in the following panic during graceful shutdown when `/metrics` page is requested:
http: panic serving 10.101.66.5:57366: runtime error: invalid memory address or nil pointer dereference
goroutine 2050 [running]:
net/http.(*conn).serve.func1(0xc00ef22000)
net/http/server.go:1772 +0x139
panic(0xa0fc00, 0xe91d80)
runtime/panic.go:973 +0x3e3
github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache.(*Cache).UpdateStats(0x0, 0xc0000516c8)
github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache/cache.go:224 +0x37
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage.(*indexDB).UpdateMetrics(0xc00b931d00, 0xc02c41acf8)
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/index_db.go:258 +0x9f
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage.(*Storage).UpdateMetrics(0xc0000bc7e0, 0xc02c41ac00)
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/storage.go:413 +0x4c5
main.registerStorageMetrics.func1(0x0)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:186 +0xd9
main.registerStorageMetrics.func3(0xc00008c380)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:196 +0x26
main.registerStorageMetrics.func7(0xc)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:211 +0x26
github.com/VictoriaMetrics/metrics.(*Gauge).marshalTo(0xc000010148, 0xaa407d, 0x20, 0xb50d60, 0xc005319890)
github.com/VictoriaMetrics/metrics@v1.11.2/gauge.go:38 +0x3f
github.com/VictoriaMetrics/metrics.(*Set).WritePrometheus(0xc000084300, 0x7fd56809c940, 0xc005319860)
github.com/VictoriaMetrics/metrics@v1.11.2/set.go:51 +0x1e1
github.com/VictoriaMetrics/metrics.WritePrometheus(0x7fd56809c940, 0xc005319860, 0xa16f01)
github.com/VictoriaMetrics/metrics@v1.11.2/metrics.go:42 +0x41
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.writePrometheusMetrics(0x7fd56809c940, 0xc005319860)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/metrics.go:16 +0x44
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.handlerWrapper(0xb5a120, 0xc005319860, 0xc005018f00, 0xc00002cc90)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/httpserver.go:154 +0x58d
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.gzipHandler.func1(0xb5a120, 0xc005319860, 0xc005018f00)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/httpserver.go:119 +0x8e
net/http.HandlerFunc.ServeHTTP(0xc00002d110, 0xb5a660, 0xc0044141c0, 0xc005018f00)
net/http/server.go:2012 +0x44
net/http.serverHandler.ServeHTTP(0xc004414000, 0xb5a660, 0xc0044141c0, 0xc005018f00)
net/http/server.go:2807 +0xa3
net/http.(*conn).serve(0xc00ef22000, 0xb5bf60, 0xc010532080)
net/http/server.go:1895 +0x86c
created by net/http.(*Server).Serve
net/http/server.go:2933 +0x35c
2020-04-02 20:07:59 +02:00
logger . Infof ( "gracefully shutting down http service at %q" , * httpListenAddr )
2019-05-22 23:23:23 +02:00
startTime = time . Now ( )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
app/vmstorage: add missing shutdown for http server on graceful shutdown
This could result in the following panic during graceful shutdown when `/metrics` page is requested:
http: panic serving 10.101.66.5:57366: runtime error: invalid memory address or nil pointer dereference
goroutine 2050 [running]:
net/http.(*conn).serve.func1(0xc00ef22000)
net/http/server.go:1772 +0x139
panic(0xa0fc00, 0xe91d80)
runtime/panic.go:973 +0x3e3
github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache.(*Cache).UpdateStats(0x0, 0xc0000516c8)
github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache/cache.go:224 +0x37
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage.(*indexDB).UpdateMetrics(0xc00b931d00, 0xc02c41acf8)
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/index_db.go:258 +0x9f
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage.(*Storage).UpdateMetrics(0xc0000bc7e0, 0xc02c41ac00)
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/storage.go:413 +0x4c5
main.registerStorageMetrics.func1(0x0)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:186 +0xd9
main.registerStorageMetrics.func3(0xc00008c380)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:196 +0x26
main.registerStorageMetrics.func7(0xc)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:211 +0x26
github.com/VictoriaMetrics/metrics.(*Gauge).marshalTo(0xc000010148, 0xaa407d, 0x20, 0xb50d60, 0xc005319890)
github.com/VictoriaMetrics/metrics@v1.11.2/gauge.go:38 +0x3f
github.com/VictoriaMetrics/metrics.(*Set).WritePrometheus(0xc000084300, 0x7fd56809c940, 0xc005319860)
github.com/VictoriaMetrics/metrics@v1.11.2/set.go:51 +0x1e1
github.com/VictoriaMetrics/metrics.WritePrometheus(0x7fd56809c940, 0xc005319860, 0xa16f01)
github.com/VictoriaMetrics/metrics@v1.11.2/metrics.go:42 +0x41
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.writePrometheusMetrics(0x7fd56809c940, 0xc005319860)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/metrics.go:16 +0x44
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.handlerWrapper(0xb5a120, 0xc005319860, 0xc005018f00, 0xc00002cc90)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/httpserver.go:154 +0x58d
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.gzipHandler.func1(0xb5a120, 0xc005319860, 0xc005018f00)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/httpserver.go:119 +0x8e
net/http.HandlerFunc.ServeHTTP(0xc00002d110, 0xb5a660, 0xc0044141c0, 0xc005018f00)
net/http/server.go:2012 +0x44
net/http.serverHandler.ServeHTTP(0xc004414000, 0xb5a660, 0xc0044141c0, 0xc005018f00)
net/http/server.go:2807 +0xa3
net/http.(*conn).serve(0xc00ef22000, 0xb5bf60, 0xc010532080)
net/http/server.go:1895 +0x86c
created by net/http.(*Server).Serve
net/http/server.go:2933 +0x35c
2020-04-02 20:07:59 +02:00
logger . Fatalf ( "cannot stop http service: %s" , err )
2019-05-22 23:23:23 +02:00
}
app/vmstorage: add missing shutdown for http server on graceful shutdown
This could result in the following panic during graceful shutdown when `/metrics` page is requested:
http: panic serving 10.101.66.5:57366: runtime error: invalid memory address or nil pointer dereference
goroutine 2050 [running]:
net/http.(*conn).serve.func1(0xc00ef22000)
net/http/server.go:1772 +0x139
panic(0xa0fc00, 0xe91d80)
runtime/panic.go:973 +0x3e3
github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache.(*Cache).UpdateStats(0x0, 0xc0000516c8)
github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache/cache.go:224 +0x37
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage.(*indexDB).UpdateMetrics(0xc00b931d00, 0xc02c41acf8)
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/index_db.go:258 +0x9f
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage.(*Storage).UpdateMetrics(0xc0000bc7e0, 0xc02c41ac00)
github.com/VictoriaMetrics/VictoriaMetrics/lib/storage/storage.go:413 +0x4c5
main.registerStorageMetrics.func1(0x0)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:186 +0xd9
main.registerStorageMetrics.func3(0xc00008c380)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:196 +0x26
main.registerStorageMetrics.func7(0xc)
github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage/main.go:211 +0x26
github.com/VictoriaMetrics/metrics.(*Gauge).marshalTo(0xc000010148, 0xaa407d, 0x20, 0xb50d60, 0xc005319890)
github.com/VictoriaMetrics/metrics@v1.11.2/gauge.go:38 +0x3f
github.com/VictoriaMetrics/metrics.(*Set).WritePrometheus(0xc000084300, 0x7fd56809c940, 0xc005319860)
github.com/VictoriaMetrics/metrics@v1.11.2/set.go:51 +0x1e1
github.com/VictoriaMetrics/metrics.WritePrometheus(0x7fd56809c940, 0xc005319860, 0xa16f01)
github.com/VictoriaMetrics/metrics@v1.11.2/metrics.go:42 +0x41
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.writePrometheusMetrics(0x7fd56809c940, 0xc005319860)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/metrics.go:16 +0x44
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.handlerWrapper(0xb5a120, 0xc005319860, 0xc005018f00, 0xc00002cc90)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/httpserver.go:154 +0x58d
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver.gzipHandler.func1(0xb5a120, 0xc005319860, 0xc005018f00)
github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver/httpserver.go:119 +0x8e
net/http.HandlerFunc.ServeHTTP(0xc00002d110, 0xb5a660, 0xc0044141c0, 0xc005018f00)
net/http/server.go:2012 +0x44
net/http.serverHandler.ServeHTTP(0xc004414000, 0xb5a660, 0xc0044141c0, 0xc005018f00)
net/http/server.go:2807 +0xa3
net/http.(*conn).serve(0xc00ef22000, 0xb5bf60, 0xc010532080)
net/http/server.go:1895 +0x86c
created by net/http.(*Server).Serve
net/http/server.go:2933 +0x35c
2020-04-02 20:07:59 +02:00
logger . Infof ( "successfully shut down http service in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:23:23 +02:00
2021-05-08 16:55:44 +02:00
if len ( * clusternativeListenAddr ) > 0 {
clusternativeServer . MustStop ( )
2020-02-25 18:09:46 +01:00
}
2019-05-22 23:16:55 +02:00
if len ( * graphiteListenAddr ) > 0 {
2019-12-13 23:29:14 +01:00
graphiteServer . MustStop ( )
2019-05-22 23:16:55 +02:00
}
2021-05-08 16:55:44 +02:00
if len ( * influxListenAddr ) > 0 {
influxServer . MustStop ( )
}
2019-05-22 23:16:55 +02:00
if len ( * opentsdbListenAddr ) > 0 {
2019-12-13 23:29:14 +01:00
opentsdbServer . MustStop ( )
2019-05-22 23:16:55 +02:00
}
2019-08-22 11:27:18 +02:00
if len ( * opentsdbHTTPListenAddr ) > 0 {
2019-12-13 23:29:14 +01:00
opentsdbhttpServer . MustStop ( )
2019-08-22 11:27:18 +02:00
}
2020-09-28 03:11:55 +02:00
common . StopUnmarshalWorkers ( )
2019-05-22 23:23:23 +02:00
logger . Infof ( "shutting down neststorage..." )
startTime = time . Now ( )
netstorage . Stop ( )
2020-01-22 17:27:44 +01:00
logger . Infof ( "successfully stopped netstorage in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:23:23 +02:00
2019-11-12 15:29:43 +01:00
fs . MustStopDirRemover ( )
2019-05-22 23:23:23 +02:00
logger . Infof ( "the vminsert has been stopped" )
2019-05-22 23:16:55 +02:00
}
2019-05-22 23:23:23 +02:00
func requestHandler ( w http . ResponseWriter , r * http . Request ) bool {
2021-07-07 12:25:16 +02:00
startTime := time . Now ( )
defer requestDuration . UpdateDuration ( startTime )
2020-12-14 13:02:57 +01:00
if r . URL . Path == "/" {
2021-04-02 21:54:06 +02:00
if r . Method != "GET" {
return false
}
2021-04-20 19:16:17 +02:00
fmt . Fprintf ( w , "vminsert - a component of VictoriaMetrics cluster. See docs at https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html" )
2020-10-06 14:00:38 +02:00
return true
}
2019-05-22 23:23:23 +02:00
p , err := httpserver . ParsePath ( r . URL . Path )
if err != nil {
2020-07-20 13:00:33 +02:00
httpserver . Errorf ( w , r , "cannot parse path %q: %s" , r . URL . Path , err )
2019-05-22 23:23:23 +02:00
return true
}
if p . Prefix != "insert" {
// This is not our link.
return false
}
at , err := auth . NewToken ( p . AuthToken )
if err != nil {
2020-07-20 13:00:33 +02:00
httpserver . Errorf ( w , r , "auth error: %s" , err )
2019-05-22 23:23:23 +02:00
return true
}
2022-06-07 14:06:18 +02:00
if strings . HasPrefix ( p . Suffix , "datadog/" ) {
2022-06-07 14:16:48 +02:00
// Trim suffix from paths starting from /datadog/ in order to support legacy DataDog agent.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2670
2022-06-07 14:06:18 +02:00
p . Suffix = strings . TrimSuffix ( p . Suffix , "/" )
}
2019-05-22 23:23:23 +02:00
switch p . Suffix {
2019-06-03 17:17:25 +02:00
case "prometheus/" , "prometheus" , "prometheus/api/v1/write" :
2019-05-22 23:16:55 +02:00
prometheusWriteRequests . Inc ( )
2020-02-23 12:35:47 +01:00
if err := promremotewrite . InsertHandler ( at , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
prometheusWriteErrors . Inc ( )
2021-07-07 11:59:03 +02:00
httpserver . Errorf ( w , r , "%s" , err )
2019-05-22 23:16:55 +02:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-12-09 19:58:19 +01:00
case "prometheus/api/v1/import" :
vmimportRequests . Inc ( )
if err := vmimport . InsertHandler ( at , r ) ; err != nil {
vmimportErrors . Inc ( )
2021-07-07 11:59:03 +02:00
httpserver . Errorf ( w , r , "%s" , err )
2019-12-09 19:58:19 +01:00
return true
}
2019-12-19 00:21:49 +01:00
w . WriteHeader ( http . StatusNoContent )
2019-12-09 19:58:19 +01:00
return true
2020-03-10 18:35:58 +01:00
case "prometheus/api/v1/import/csv" :
csvimportRequests . Inc ( )
if err := csvimport . InsertHandler ( at , r ) ; err != nil {
csvimportErrors . Inc ( )
2021-07-07 11:59:03 +02:00
httpserver . Errorf ( w , r , "%s" , err )
2020-03-10 18:35:58 +01:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2020-07-10 11:00:35 +02:00
case "prometheus/api/v1/import/prometheus" :
prometheusimportRequests . Inc ( )
if err := prometheusimport . InsertHandler ( at , r ) ; err != nil {
prometheusimportErrors . Inc ( )
2021-07-07 11:59:03 +02:00
httpserver . Errorf ( w , r , "%s" , err )
2020-07-10 11:00:35 +02:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2020-09-26 03:29:45 +02:00
case "prometheus/api/v1/import/native" :
nativeimportRequests . Inc ( )
if err := native . InsertHandler ( at , r ) ; err != nil {
nativeimportErrors . Inc ( )
2021-07-07 11:59:03 +02:00
httpserver . Errorf ( w , r , "%s" , err )
2020-09-26 03:29:45 +02:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 23:23:23 +02:00
case "influx/write" , "influx/api/v2/write" :
2019-05-22 23:16:55 +02:00
influxWriteRequests . Inc ( )
2022-02-17 11:45:20 +01:00
addInfluxResponseHeaders ( w )
2020-02-25 18:09:46 +01:00
if err := influx . InsertHandlerForHTTP ( at , r ) ; err != nil {
2019-05-22 23:16:55 +02:00
influxWriteErrors . Inc ( )
2021-07-07 11:59:03 +02:00
httpserver . Errorf ( w , r , "%s" , err )
2019-05-22 23:16:55 +02:00
return true
}
w . WriteHeader ( http . StatusNoContent )
return true
2019-05-22 23:23:23 +02:00
case "influx/query" :
2019-05-22 23:16:55 +02:00
influxQueryRequests . Inc ( )
2022-02-17 11:45:20 +01:00
addInfluxResponseHeaders ( w )
2021-03-15 20:37:13 +01:00
influxutils . WriteDatabaseNames ( w )
2019-05-22 23:16:55 +02:00
return true
2021-09-28 21:47:45 +02:00
case "datadog/api/v1/series" :
datadogWriteRequests . Inc ( )
if err := datadog . InsertHandlerForHTTP ( at , r ) ; err != nil {
datadogWriteErrors . Inc ( )
httpserver . Errorf ( w , r , "%s" , err )
return true
}
// See https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2021-09-28 21:47:45 +02:00
w . WriteHeader ( 202 )
fmt . Fprintf ( w , ` { "status":"ok"} ` )
return true
case "datadog/api/v1/validate" :
datadogValidateRequests . Inc ( )
// See https://docs.datadoghq.com/api/latest/authentication/#validate-api-key
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2021-09-28 21:47:45 +02:00
fmt . Fprintf ( w , ` { "valid":true} ` )
return true
case "datadog/api/v1/check_run" :
datadogCheckRunRequests . Inc ( )
// See https://docs.datadoghq.com/api/latest/service-checks/#submit-a-service-check
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2021-09-28 21:47:45 +02:00
w . WriteHeader ( 202 )
fmt . Fprintf ( w , ` { "status":"ok"} ` )
return true
2022-06-07 14:16:48 +02:00
case "datadog/intake" :
2021-09-28 21:47:45 +02:00
datadogIntakeRequests . Inc ( )
2021-11-09 17:03:50 +01:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
2021-09-28 21:47:45 +02:00
fmt . Fprintf ( w , ` { } ` )
return true
2022-06-13 08:52:13 +02:00
case "datadog/api/v1/metadata" :
datadogMetadataRequests . Inc ( )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
fmt . Fprintf ( w , ` { } ` )
return true
2019-05-22 23:16:55 +02:00
default :
// This is not our link
return false
}
}
2022-02-17 11:45:20 +01:00
// addInfluxResponseHeaders sets the X-Influxdb-Version response header on w.
//
// Some clients expect the InfluxDB version header to be present.
// See, for example, https://github.com/ntop/ntopng/issues/5449#issuecomment-1005347597
func addInfluxResponseHeaders(w http.ResponseWriter) {
	const (
		versionHeader = "X-Influxdb-Version"
		versionValue  = "1.8.0"
	)
	hdr := w.Header()
	hdr.Set(versionHeader, versionValue)
}
2019-05-22 23:16:55 +02:00
var (
2021-07-07 12:25:16 +02:00
requestDuration = metrics . NewHistogram ( ` vminsert_request_duration_seconds ` )
2020-03-10 18:35:58 +01:00
prometheusWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/", protocol="promremotewrite"} ` )
prometheusWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/", protocol="promremotewrite"} ` )
vmimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/api/v1/import", protocol="vmimport"} ` )
vmimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/api/v1/import", protocol="vmimport"} ` )
2019-05-22 23:16:55 +02:00
2020-03-10 18:35:58 +01:00
csvimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/api/v1/import/csv", protocol="csvimport"} ` )
csvimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/api/v1/import/csv", protocol="csvimport"} ` )
2019-12-09 19:58:19 +01:00
2020-07-10 11:00:35 +02:00
prometheusimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/api/v1/import/prometheus", protocol="prometheusimport"} ` )
prometheusimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/api/v1/import/prometheus", protocol="prometheusimport"} ` )
2020-09-26 03:29:45 +02:00
nativeimportRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/prometheus/api/v1/import/native", protocol="nativeimport"} ` )
nativeimportErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/prometheus/api/v1/import/native", protocol="nativeimport"} ` )
2021-09-28 21:47:45 +02:00
influxWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/influx/write", protocol="influx"} ` )
influxWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/influx/write", protocol="influx"} ` )
2019-05-22 23:16:55 +02:00
2019-05-22 23:23:23 +02:00
influxQueryRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/influx/query", protocol="influx"} ` )
2020-06-19 00:10:18 +02:00
2021-09-28 21:47:45 +02:00
datadogWriteRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/datadog/api/v1/series", protocol="datadog"} ` )
datadogWriteErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="/insert/ { }/datadog/api/v1/series", protocol="datadog"} ` )
datadogValidateRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/datadog/api/v1/validate", protocol="datadog"} ` )
datadogCheckRunRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/datadog/api/v1/check_run", protocol="datadog"} ` )
2022-06-07 14:16:48 +02:00
datadogIntakeRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/datadog/intake", protocol="datadog"} ` )
2022-06-13 08:52:13 +02:00
datadogMetadataRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/insert/ { }/datadog/api/v1/metadata", protocol="datadog"} ` )
2021-09-28 21:47:45 +02:00
2020-06-19 00:10:18 +02:00
_ = metrics . NewGauge ( ` vm_metrics_with_dropped_labels_total ` , func ( ) float64 {
return float64 ( atomic . LoadUint64 ( & storage . MetricsWithDroppedLabels ) )
} )
_ = metrics . NewGauge ( ` vm_too_long_label_names_total ` , func ( ) float64 {
return float64 ( atomic . LoadUint64 ( & storage . TooLongLabelNames ) )
} )
_ = metrics . NewGauge ( ` vm_too_long_label_values_total ` , func ( ) float64 {
return float64 ( atomic . LoadUint64 ( & storage . TooLongLabelValues ) )
} )
2019-05-22 23:16:55 +02:00
)
2020-12-03 20:40:30 +01:00
// usage prints the vminsert description followed by the list of flags.
//
// It is installed as flag.Usage by main; the actual output is produced
// by flagutil.Usage.
func usage() {
	const s = `
vminsert accepts data via popular data ingestion protocols and routes it to vmstorage nodes configured via -storageNode .

See the docs at https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html .
`
	flagutil.Usage(s)
}
2022-09-08 20:17:58 +02:00
// checkDuplicates returns the first entry of arr that repeats an earlier entry.
//
// It returns an empty string when all the entries are distinct. Note that
// a duplicated empty string is therefore indistinguishable from "no duplicates".
func checkDuplicates(arr []string) string {
	seen := make(map[string]bool)
	for i := range arr {
		addr := arr[i]
		if seen[addr] {
			return addr
		}
		seen[addr] = true
	}
	return ""
}