2019-05-22 23:23:23 +02:00
package netstorage
import (
2019-05-24 11:51:07 +02:00
"flag"
2019-05-22 23:23:23 +02:00
"fmt"
2020-04-27 08:32:08 +02:00
"io"
2019-05-22 23:23:23 +02:00
"sync"
2020-05-27 14:07:16 +02:00
"sync/atomic"
2019-05-22 23:23:23 +02:00
"time"
2020-05-27 14:07:16 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-06-01 13:33:29 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2020-09-28 20:35:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/metrics"
2019-06-08 21:29:25 +02:00
xxhash "github.com/cespare/xxhash/v2"
2019-05-22 23:23:23 +02:00
)
2020-05-28 18:57:05 +02:00
// Command-line flags controlling how data is sent to vmstorage nodes.
var (
	// disableRPCCompression selects compression level 0 instead of 1
	// for connections established by storageNode.dial.
	//
	// The flag name must not contain surrounding whitespace, otherwise it cannot be matched
	// by the usual `-rpc.disableCompression` command-line form.
	disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")

	// replicationFactor is the requested number of copies for every block of ingested data.
	// storageNode.run clamps it into the range [1, len(storageNodes)] before use.
	replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
		"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
		"Higher values for -dedup.minScrapeInterval at vmselect is OK")
)
2019-05-24 11:51:07 +02:00
2020-05-27 14:07:16 +02:00
// isBroken reports whether sn is currently marked as broken.
//
// The flag is read atomically, so the method may be called without holding any sn locks.
func (sn *storageNode) isBroken() bool {
	state := atomic.LoadUint32(&sn.broken)
	return state != 0
}
// push pushes buf to sn internal bufs.
2019-05-22 23:23:23 +02:00
//
2020-05-27 14:07:16 +02:00
// This function doesn't block on fast path.
// It may block only if all the storageNodes cannot handle the incoming ingestion rate.
// This blocking provides backpressure to the caller.
//
// The function falls back to sending data to other vmstorage nodes
// if sn is currently unavailable or overloaded.
2019-06-08 21:29:25 +02:00
//
// rows is the number of rows in the buf.
func ( sn * storageNode ) push ( buf [ ] byte , rows int ) error {
2020-05-25 00:39:24 +02:00
if len ( buf ) > maxBufSizePerStorageNode {
logger . Panicf ( "BUG: len(buf)=%d cannot exceed %d" , len ( buf ) , maxBufSizePerStorageNode )
2019-06-08 21:29:25 +02:00
}
sn . rowsPushed . Add ( rows )
2020-05-27 14:07:16 +02:00
if sn . isBroken ( ) {
// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes.
2020-06-25 15:38:42 +02:00
if err := addToReroutedBufMayBlock ( buf , rows ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "%d rows dropped because the current vsmtorage is unavailable and %w" , rows , err )
2019-06-08 21:29:25 +02:00
}
sn . rowsReroutedFromHere . Add ( rows )
2019-05-22 23:23:23 +02:00
return nil
}
2020-05-27 14:07:16 +02:00
sn . brLock . Lock ( )
if len ( sn . br . buf ) + len ( buf ) <= maxBufSizePerStorageNode {
2019-06-08 21:29:25 +02:00
// Fast path: the buf contents fits sn.buf.
2020-05-27 14:07:16 +02:00
sn . br . buf = append ( sn . br . buf , buf ... )
sn . br . rows += rows
sn . brLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return nil
2019-05-22 23:23:23 +02:00
}
2020-05-27 14:07:16 +02:00
sn . brLock . Unlock ( )
2019-06-08 21:29:25 +02:00
// Slow path: the buf contents doesn't fit sn.buf.
2020-05-27 14:07:16 +02:00
// This means that the current vmstorage is slow or will become broken soon.
// Re-route buf to healthy vmstorage nodes.
2020-06-25 15:38:42 +02:00
if err := addToReroutedBufMayBlock ( buf , rows ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "%d rows dropped because the current vmstorage buf is full and %w" , rows , err )
2019-06-08 21:29:25 +02:00
}
2020-05-27 14:07:16 +02:00
sn . rowsReroutedFromHere . Add ( rows )
2019-06-08 21:29:25 +02:00
return nil
}
2020-05-27 14:07:16 +02:00
// closedCh is an already closed channel.
//
// Receiving from it never blocks, so it can be put into a select statement
// in order to make the select return immediately.
var closedCh = newClosedCh()

// newClosedCh returns a receive-only channel that has been closed before being returned.
func newClosedCh() <-chan struct{} {
	done := make(chan struct{})
	close(done)
	return done
}
2019-06-08 21:29:25 +02:00
2020-05-28 18:57:05 +02:00
func ( sn * storageNode ) run ( stopCh <- chan struct { } , snIdx int ) {
replicas := * replicationFactor
if replicas <= 0 {
replicas = 1
}
if replicas > len ( storageNodes ) {
replicas = len ( storageNodes )
}
2020-09-28 20:35:40 +02:00
ticker := time . NewTicker ( 200 * time . Millisecond )
2020-05-27 14:07:16 +02:00
defer ticker . Stop ( )
var br bufRows
2020-06-01 13:33:29 +02:00
brLastResetTime := fasttime . UnixTimestamp ( )
2020-05-27 14:07:16 +02:00
var waitCh <- chan struct { }
mustStop := false
for ! mustStop {
sn . brLock . Lock ( )
bufLen := len ( sn . br . buf )
sn . brLock . Unlock ( )
waitCh = nil
2020-09-28 20:35:40 +02:00
if bufLen > 0 {
// Do not sleep if sn.br.buf isn't empty.
2020-05-27 14:07:16 +02:00
waitCh = closedCh
}
select {
case <- stopCh :
mustStop = true
// Make sure the sn.buf is flushed last time before returning
// in order to send the remaining bits of data.
case <- ticker . C :
case <- waitCh :
}
2020-09-28 20:35:40 +02:00
sn . brLock . Lock ( )
sn . br , br = br , sn . br
sn . brLock . Unlock ( )
2020-06-01 13:33:29 +02:00
currentTime := fasttime . UnixTimestamp ( )
if len ( br . buf ) < cap ( br . buf ) / 4 && currentTime - brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br . buf = append ( br . buf [ : 0 : 0 ] , br . buf ... )
brLastResetTime = currentTime
}
2020-09-28 20:35:40 +02:00
sn . checkHealth ( )
2020-05-28 18:57:05 +02:00
if len ( br . buf ) == 0 {
2020-09-28 20:35:40 +02:00
// Nothing to send.
2020-05-27 14:07:16 +02:00
continue
}
2020-05-28 18:57:05 +02:00
// Send br to replicas storageNodes starting from snIdx.
2020-09-28 20:35:40 +02:00
for ! sendBufToReplicasNonblocking ( & br , snIdx , replicas ) {
t := timerpool . Get ( 200 * time . Millisecond )
select {
case <- stopCh :
timerpool . Put ( t )
return
case <- t . C :
timerpool . Put ( t )
sn . checkHealth ( )
}
2020-05-27 14:07:16 +02:00
}
2020-05-28 18:57:05 +02:00
br . reset ( )
}
}
2020-09-28 20:35:40 +02:00
func sendBufToReplicasNonblocking ( br * bufRows , snIdx , replicas int ) bool {
2020-05-28 18:57:05 +02:00
usedStorageNodes := make ( map [ * storageNode ] bool , replicas )
for i := 0 ; i < replicas ; i ++ {
idx := snIdx + i
attempts := 0
for {
attempts ++
if attempts > len ( storageNodes ) {
if i == 0 {
// The data wasn't replicated at all.
logger . Warnf ( "cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; " +
"re-trying to send the data soon" , len ( br . buf ) , br . rows )
return false
}
// The data is partially replicated, so just emit a warning and return true.
// We could retry sending the data again, but this may result in uncontrolled duplicate data.
// So it is better returning true.
rowsIncompletelyReplicatedTotal . Add ( br . rows )
logger . Warnf ( "cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, " +
"since a part of storage nodes is temporarily unavailable" , i + 1 , replicas , * replicationFactor , len ( br . buf ) , br . rows )
return true
}
if idx >= len ( storageNodes ) {
2020-06-19 11:39:19 +02:00
idx %= len ( storageNodes )
2020-05-28 18:57:05 +02:00
}
sn := storageNodes [ idx ]
idx ++
if usedStorageNodes [ sn ] {
// The br has been already replicated to sn. Skip it.
continue
}
2020-09-28 20:35:40 +02:00
if ! sn . sendBufRowsNonblocking ( br ) {
2020-05-28 18:57:05 +02:00
// Cannot send data to sn. Go to the next sn.
continue
}
// Successfully sent data to sn.
usedStorageNodes [ sn ] = true
break
2020-05-27 14:07:16 +02:00
}
2019-05-22 23:23:23 +02:00
}
2020-05-28 18:57:05 +02:00
return true
}
2020-06-18 19:41:33 +02:00
func ( sn * storageNode ) checkHealth ( ) {
sn . bcLock . Lock ( )
defer sn . bcLock . Unlock ( )
if sn . bc != nil {
2020-09-28 20:35:40 +02:00
// The sn looks healthy.
return
2020-06-18 19:41:33 +02:00
}
bc , err := sn . dial ( )
if err != nil {
2020-11-13 23:43:32 +01:00
atomic . StoreUint32 ( & sn . broken , 1 )
2020-09-28 23:20:01 +02:00
if sn . lastDialErr == nil {
// Log the error only once.
sn . lastDialErr = err
logger . Warnf ( "cannot dial storageNode %q: %s" , sn . dialer . Addr ( ) , err )
}
2020-06-18 19:51:28 +02:00
return
2020-06-18 19:41:33 +02:00
}
2020-09-28 20:35:40 +02:00
logger . Infof ( "successfully dialed -storageNode=%q" , sn . dialer . Addr ( ) )
2020-09-28 23:20:01 +02:00
sn . lastDialErr = nil
2020-06-18 19:41:33 +02:00
sn . bc = bc
atomic . StoreUint32 ( & sn . broken , 0 )
}
2020-09-28 20:35:40 +02:00
func ( sn * storageNode ) sendBufRowsNonblocking ( br * bufRows ) bool {
if sn . isBroken ( ) {
return false
}
2020-05-28 18:57:05 +02:00
sn . bcLock . Lock ( )
defer sn . bcLock . Unlock ( )
if sn . bc == nil {
2020-09-28 20:35:40 +02:00
// Do not call sn.dial() here in order to prevent long blocking on sn.bcLock.Lock(),
// which can negatively impact data sending in sendBufToReplicasNonblocking().
2020-11-13 23:43:32 +01:00
// sn.dial() should be called by sn.checkHealth() on unsuccessful call to sendBufToReplicasNonblocking().
2020-09-28 20:35:40 +02:00
return false
2020-05-28 18:57:05 +02:00
}
err := sendToConn ( sn . bc , br . buf )
if err == nil {
2020-09-28 20:35:40 +02:00
// Successfully sent buf to bc.
2020-05-28 18:57:05 +02:00
sn . rowsSent . Add ( br . rows )
return true
}
// Couldn't flush buf to sn. Mark sn as broken.
2020-09-28 20:35:40 +02:00
logger . Warnf ( "cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and " +
"re-routing this data to healthy storage nodes" , len ( br . buf ) , br . rows , sn . dialer . Addr ( ) , err )
2020-05-28 18:57:05 +02:00
if err = sn . bc . Close ( ) ; err != nil {
logger . Warnf ( "cannot close connection to storageNode %q: %s" , sn . dialer . Addr ( ) , err )
}
sn . bc = nil
atomic . StoreUint32 ( & sn . broken , 1 )
2020-09-28 20:35:40 +02:00
sn . connectionErrors . Inc ( )
2020-05-28 18:57:05 +02:00
return false
2019-05-22 23:23:23 +02:00
}
2020-05-27 14:07:16 +02:00
func sendToConn ( bc * handshake . BufferedConn , buf [ ] byte ) error {
2019-09-11 13:25:53 +02:00
if len ( buf ) == 0 {
2020-05-27 14:07:16 +02:00
// Nothing to send
2019-09-11 13:25:53 +02:00
return nil
}
2020-01-21 17:20:14 +01:00
timeoutSeconds := len ( buf ) / 3e5
2019-09-13 21:25:15 +02:00
if timeoutSeconds < 60 {
timeoutSeconds = 60
2019-09-11 12:37:03 +02:00
}
timeout := time . Duration ( timeoutSeconds ) * time . Second
deadline := time . Now ( ) . Add ( timeout )
2020-04-27 08:32:08 +02:00
if err := bc . SetWriteDeadline ( deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot set write deadline to %s: %w" , deadline , err )
2019-05-22 23:23:23 +02:00
}
2019-09-11 12:37:03 +02:00
// sizeBuf guarantees that the rows batch will be either fully
// read or fully discarded on the vmstorage side.
// sizeBuf is used for read optimization in vmstorage.
2020-05-27 14:07:16 +02:00
sizeBuf := sizeBufPool . Get ( )
defer sizeBufPool . Put ( sizeBuf )
sizeBuf . B = encoding . MarshalUint64 ( sizeBuf . B [ : 0 ] , uint64 ( len ( buf ) ) )
if _ , err := bc . Write ( sizeBuf . B ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write data size %d: %w" , len ( buf ) , err )
2019-05-22 23:23:23 +02:00
}
2020-04-27 08:32:08 +02:00
if _ , err := bc . Write ( buf ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write data with size %d: %w" , len ( buf ) , err )
2019-05-22 23:23:23 +02:00
}
2020-04-27 08:32:08 +02:00
if err := bc . Flush ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot flush data with size %d: %w" , len ( buf ) , err )
2019-06-08 21:29:25 +02:00
}
2020-04-27 08:32:08 +02:00
// Wait for `ack` from vmstorage.
// This guarantees that the message has been fully received by vmstorage.
2020-04-28 10:18:41 +02:00
deadline = time . Now ( ) . Add ( timeout )
2020-04-27 08:32:08 +02:00
if err := bc . SetReadDeadline ( deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot set read deadline for reading `ack` to vmstorage: %w" , err )
2020-04-27 08:32:08 +02:00
}
2020-05-27 14:07:16 +02:00
if _ , err := io . ReadFull ( bc , sizeBuf . B [ : 1 ] ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read `ack` from vmstorage: %w" , err )
2020-04-27 08:32:08 +02:00
}
2020-05-27 14:07:16 +02:00
if sizeBuf . B [ 0 ] != 1 {
return fmt . Errorf ( "unexpected `ack` received from vmstorage; got %d; want %d" , sizeBuf . B [ 0 ] , 1 )
2020-04-27 08:32:08 +02:00
}
2019-05-22 23:23:23 +02:00
return nil
}
2020-05-27 14:07:16 +02:00
// sizeBufPool is a pool of byte buffers used by sendToConn
// for the marshaled data size and for reading the `ack` byte.
var sizeBufPool bytesutil . ByteBufferPool
func ( sn * storageNode ) dial ( ) ( * handshake . BufferedConn , error ) {
2019-05-22 23:23:23 +02:00
c , err := sn . dialer . Dial ( )
if err != nil {
sn . dialErrors . Inc ( )
2020-05-27 14:07:16 +02:00
return nil , err
2019-05-22 23:23:23 +02:00
}
compressionLevel := 1
2019-05-24 11:51:07 +02:00
if * disableRPCCompression {
compressionLevel = 0
}
2019-05-22 23:23:23 +02:00
bc , err := handshake . VMInsertClient ( c , compressionLevel )
if err != nil {
_ = c . Close ( )
sn . handshakeErrors . Inc ( )
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "handshake error: %w" , err )
2019-05-22 23:23:23 +02:00
}
2020-05-27 14:07:16 +02:00
return bc , nil
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
func rerouteWorker ( stopCh <- chan struct { } ) {
2020-09-28 20:35:40 +02:00
ticker := time . NewTicker ( 200 * time . Millisecond )
2020-02-13 11:55:58 +01:00
defer ticker . Stop ( )
2020-05-27 14:07:16 +02:00
var br bufRows
2020-06-01 13:33:29 +02:00
brLastResetTime := fasttime . UnixTimestamp ( )
2020-05-27 14:07:16 +02:00
var waitCh <- chan struct { }
2019-06-08 21:29:25 +02:00
mustStop := false
for ! mustStop {
2020-05-27 14:07:16 +02:00
reroutedBRLock . Lock ( )
bufLen := len ( reroutedBR . buf )
reroutedBRLock . Unlock ( )
waitCh = nil
2020-09-28 20:35:40 +02:00
if bufLen > 0 {
// Do not sleep if reroutedBR contains data to process.
2020-05-27 14:07:16 +02:00
waitCh = closedCh
}
2019-06-08 21:29:25 +02:00
select {
case <- stopCh :
mustStop = true
2020-05-27 14:07:16 +02:00
// Make sure reroutedBR is re-routed last time before returning
2019-06-08 21:29:25 +02:00
// in order to reroute the remaining data to healthy vmstorage nodes.
2020-02-13 11:55:58 +01:00
case <- ticker . C :
2020-05-27 14:07:16 +02:00
case <- waitCh :
2019-06-08 21:29:25 +02:00
}
2020-09-28 20:35:40 +02:00
reroutedBRLock . Lock ( )
reroutedBR , br = br , reroutedBR
reroutedBRLock . Unlock ( )
2020-05-27 14:07:16 +02:00
reroutedBRCond . Broadcast ( )
2020-06-01 13:33:29 +02:00
currentTime := fasttime . UnixTimestamp ( )
if len ( br . buf ) < cap ( br . buf ) / 4 && currentTime - brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br . buf = append ( br . buf [ : 0 : 0 ] , br . buf ... )
brLastResetTime = currentTime
}
2020-05-27 14:07:16 +02:00
if len ( br . buf ) == 0 {
// Nothing to re-route.
continue
}
2020-09-28 20:35:40 +02:00
spreadReroutedBufToStorageNodesBlocking ( stopCh , & br )
br . reset ( )
2019-05-22 23:23:23 +02:00
}
2020-06-25 15:38:42 +02:00
// Notify all the blocked addToReroutedBufMayBlock callers, so they may finish the work.
2020-05-27 14:07:16 +02:00
reroutedBRCond . Broadcast ( )
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
// storageNode is a client sending data to vmstorage node.
2019-05-22 23:23:23 +02:00
type storageNode struct {
2020-05-27 14:07:16 +02:00
// broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken uint32
2019-06-08 21:29:25 +02:00
2020-05-27 14:07:16 +02:00
// brLock protects br.
brLock sync . Mutex
2019-05-22 23:23:23 +02:00
2020-05-27 14:07:16 +02:00
// Buffer with data that needs to be written to the storage node.
2020-05-28 18:57:05 +02:00
// It must be accessed under brLock.
2020-05-27 14:07:16 +02:00
br bufRows
2019-05-22 23:23:23 +02:00
2020-05-28 18:57:05 +02:00
// bcLock protects bc.
bcLock sync . Mutex
// bc is a single connection to vmstorage for data transfer.
// It must be accessed under bcLock.
bc * handshake . BufferedConn
2019-06-08 21:29:25 +02:00
dialer * netutil . TCPDialer
2020-09-28 23:20:01 +02:00
// last error during dial.
lastDialErr error
2019-06-08 21:29:25 +02:00
// The number of dial errors to vmstorage node.
2019-05-22 23:23:23 +02:00
dialErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of handshake errors to vmstorage node.
2019-05-22 23:23:23 +02:00
handshakeErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of connection errors to vmstorage node.
2019-05-22 23:23:23 +02:00
connectionErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of rows pushed to storageNode with push method.
rowsPushed * metrics . Counter
// The number of rows sent to vmstorage node.
rowsSent * metrics . Counter
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
// The number of rows rerouted from the given vmstorage node
// to healthy nodes when the given node was unhealthy.
rowsReroutedFromHere * metrics . Counter
// The number of rows rerouted to the given vmstorage node
// from other nodes when they were unhealthy.
rowsReroutedToHere * metrics . Counter
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
// storageNodes contains a list of vmstorage node clients.
2019-05-22 23:23:23 +02:00
var storageNodes [ ] * storageNode
2019-06-08 21:29:25 +02:00
var (
storageNodesWG sync . WaitGroup
rerouteWorkerWG sync . WaitGroup
)
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
var (
storageNodesStopCh = make ( chan struct { } )
rerouteWorkerStopCh = make ( chan struct { } )
)
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
2019-05-22 23:23:23 +02:00
func InitStorageNodes ( addrs [ ] string ) {
if len ( addrs ) == 0 {
logger . Panicf ( "BUG: addrs must be non-empty" )
}
if len ( addrs ) > 255 {
logger . Panicf ( "BUG: too much addresses: %d; max supported %d addresses" , len ( addrs ) , 255 )
}
2020-05-28 18:57:05 +02:00
storageNodes = storageNodes [ : 0 ]
2019-05-22 23:23:23 +02:00
for _ , addr := range addrs {
sn := & storageNode {
dialer : netutil . NewTCPDialer ( "vminsert" , addr ) ,
2019-06-08 21:29:25 +02:00
dialErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_dial_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
handshakeErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_handshake_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
connectionErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_connection_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsPushed : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_pushed_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsSent : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_sent_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsReroutedFromHere : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_rerouted_from_here_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsReroutedToHere : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_rerouted_to_here_total { name="vminsert", addr=%q} ` , addr ) ) ,
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_rows_pending { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
2020-05-27 14:07:16 +02:00
sn . brLock . Lock ( )
n := sn . br . rows
sn . brLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return float64 ( n )
} )
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_buf_pending_bytes { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
2020-05-27 14:07:16 +02:00
sn . brLock . Lock ( )
n := len ( sn . br . buf )
sn . brLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return float64 ( n )
} )
2020-11-17 21:13:20 +01:00
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_vmstorage_is_reachable { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
if sn . isBroken ( ) {
return 0
}
return 1
} )
2019-05-22 23:23:23 +02:00
storageNodes = append ( storageNodes , sn )
}
2019-06-08 21:29:25 +02:00
2020-05-27 14:07:16 +02:00
maxBufSizePerStorageNode = memory . Allowed ( ) / 8 / len ( storageNodes )
2020-05-25 00:39:24 +02:00
if maxBufSizePerStorageNode > consts . MaxInsertPacketSize {
maxBufSizePerStorageNode = consts . MaxInsertPacketSize
}
2019-09-11 13:25:53 +02:00
reroutedBufMaxSize = memory . Allowed ( ) / 16
2020-05-27 14:07:16 +02:00
if reroutedBufMaxSize < maxBufSizePerStorageNode {
reroutedBufMaxSize = maxBufSizePerStorageNode
}
if reroutedBufMaxSize > maxBufSizePerStorageNode * len ( storageNodes ) {
reroutedBufMaxSize = maxBufSizePerStorageNode * len ( storageNodes )
}
2020-05-28 18:57:05 +02:00
for idx , sn := range storageNodes {
storageNodesWG . Add ( 1 )
go func ( sn * storageNode , idx int ) {
sn . run ( storageNodesStopCh , idx )
storageNodesWG . Done ( )
} ( sn , idx )
}
2019-06-08 21:29:25 +02:00
rerouteWorkerWG . Add ( 1 )
go func ( ) {
rerouteWorker ( rerouteWorkerStopCh )
rerouteWorkerWG . Done ( )
} ( )
2019-05-22 23:23:23 +02:00
}
// Stop gracefully stops netstorage.
func Stop ( ) {
2019-06-08 21:29:25 +02:00
close ( rerouteWorkerStopCh )
rerouteWorkerWG . Wait ( )
close ( storageNodesStopCh )
2019-05-22 23:23:23 +02:00
storageNodesWG . Wait ( )
}
2019-06-08 21:29:25 +02:00
2020-06-25 15:38:42 +02:00
// addToReroutedBufMayBlock adds buf to reroutedBR.
2020-05-27 14:07:16 +02:00
//
// It waits until the reroutedBR has enough space for buf or if Stop is called.
// This guarantees backpressure if the ingestion rate exceeds vmstorage nodes'
// ingestion rate capacity.
//
// It returns non-nil error only in the following cases:
//
// - if all the storage nodes are unhealthy.
// - if Stop is called.
2020-06-25 15:38:42 +02:00
func addToReroutedBufMayBlock ( buf [ ] byte , rows int ) error {
2020-05-27 14:07:16 +02:00
if len ( buf ) > reroutedBufMaxSize {
logger . Panicf ( "BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d" , len ( buf ) , reroutedBufMaxSize )
2019-06-08 21:29:25 +02:00
}
2020-05-27 14:07:16 +02:00
reroutedBRLock . Lock ( )
defer reroutedBRLock . Unlock ( )
for len ( reroutedBR . buf ) + len ( buf ) > reroutedBufMaxSize {
if getHealthyStorageNodesCount ( ) == 0 {
rowsLostTotal . Add ( rows )
2020-08-31 16:51:40 +02:00
return fmt . Errorf ( "all the vmstorage nodes are unavailable and reroutedBR has no enough space for storing %d bytes; " +
2020-09-28 20:35:40 +02:00
"only %d free bytes left out of %d bytes in reroutedBR" ,
2020-08-31 16:51:40 +02:00
len ( buf ) , reroutedBufMaxSize - len ( reroutedBR . buf ) , reroutedBufMaxSize )
2020-05-27 14:07:16 +02:00
}
select {
case <- rerouteWorkerStopCh :
rowsLostTotal . Add ( rows )
return fmt . Errorf ( "rerouteWorker cannot send the data since it is stopped" )
default :
}
2020-09-28 20:35:40 +02:00
// The reroutedBR.buf has no enough space for len(buf). Wait while the reroutedBR.buf is sent by rerouteWorker.
2020-05-27 14:07:16 +02:00
reroutedBufWaits . Inc ( )
reroutedBRCond . Wait ( )
}
reroutedBR . buf = append ( reroutedBR . buf , buf ... )
reroutedBR . rows += rows
2019-06-08 21:29:25 +02:00
reroutesTotal . Inc ( )
2020-05-27 14:07:16 +02:00
return nil
2019-06-08 21:29:25 +02:00
}
2020-05-27 14:07:16 +02:00
// getHealthyStorageNodesCount returns the number of storage nodes that aren't marked as broken.
func getHealthyStorageNodesCount() int {
	healthy := len(storageNodes)
	for _, sn := range storageNodes {
		if sn.isBroken() {
			healthy--
		}
	}
	return healthy
}
2019-06-08 21:29:25 +02:00
2020-05-27 14:07:16 +02:00
func getHealthyStorageNodes ( ) [ ] * storageNode {
sns := make ( [ ] * storageNode , 0 , len ( storageNodes ) - 1 )
for _ , sn := range storageNodes {
if ! sn . isBroken ( ) {
sns = append ( sns , sn )
}
2019-06-08 21:29:25 +02:00
}
2020-05-27 14:07:16 +02:00
return sns
}
2019-06-08 21:29:25 +02:00
2020-09-28 20:35:40 +02:00
func getHealthyStorageNodesBlocking ( stopCh <- chan struct { } ) [ ] * storageNode {
// Wait for at least a single healthy storage node.
for {
2020-11-13 23:43:32 +01:00
// Wake up goroutines blocked at addToReroutedBufMayBlock, so they can return error to the caller if reroutedBR is full.
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
reroutedBRCond . Broadcast ( )
2020-09-28 20:35:40 +02:00
sns := getHealthyStorageNodes ( )
if len ( sns ) > 0 {
return sns
}
// There is no healthy storage nodes.
// Wait for a while until such nodes appear.
t := timerpool . Get ( time . Second )
select {
case <- stopCh :
timerpool . Put ( t )
return nil
case <- t . C :
timerpool . Put ( t )
}
}
}
func spreadReroutedBufToStorageNodesBlocking ( stopCh <- chan struct { } , br * bufRows ) {
2019-06-08 21:29:25 +02:00
var mr storage . MetricRow
rowsProcessed := 0
2020-06-25 15:38:42 +02:00
defer reroutedRowsProcessed . Add ( rowsProcessed )
2020-09-29 00:27:36 +02:00
sns := getHealthyStorageNodesBlocking ( stopCh )
if len ( sns ) == 0 {
// stopCh is notified to stop.
return
}
2020-05-27 14:07:16 +02:00
src := br . buf
2019-06-08 21:29:25 +02:00
for len ( src ) > 0 {
tail , err := mr . Unmarshal ( src )
if err != nil {
2020-05-27 14:07:16 +02:00
logger . Panicf ( "BUG: cannot unmarshal MetricRow from reroutedBR.buf: %s" , err )
2019-06-08 21:29:25 +02:00
}
rowBuf := src [ : len ( src ) - len ( tail ) ]
src = tail
2020-06-25 15:38:42 +02:00
rowsProcessed ++
2020-09-28 20:35:40 +02:00
var h uint64
if len ( storageNodes ) > 1 {
2020-05-30 12:51:07 +02:00
// Do not use jump.Hash(h, int32(len(sns))) here,
// since this leads to uneven distribution of rerouted rows among sns -
// they all go to the original or to the next sn.
2020-09-28 20:35:40 +02:00
h = xxhash . Sum64 ( mr . MetricNameRaw )
2020-05-27 14:07:16 +02:00
}
2019-06-08 21:29:25 +02:00
for {
2020-09-28 20:35:40 +02:00
idx := h % uint64 ( len ( sns ) )
sn := sns [ idx ]
if sn . sendReroutedRow ( rowBuf ) {
// The row has been successfully re-routed to sn.
break
2019-09-11 13:25:53 +02:00
}
2020-09-28 20:35:40 +02:00
// The row cannot be re-routed to sn. Wait for a while and try again.
// Do not re-route the row to the remaining storage nodes,
// since this may result in increased resource usage (CPU, memory, disk IO) on these nodes,
// because they'll have to accept and register new time series (this is resource-intensive operation).
//
// Do not skip rowBuf in the hope it may be sent later, since this wastes CPU time for no reason.
rerouteErrors . Inc ( )
t := timerpool . Get ( 200 * time . Millisecond )
select {
case <- stopCh :
// stopCh is notified to stop.
timerpool . Put ( t )
return
case <- t . C :
timerpool . Put ( t )
2020-06-25 15:38:42 +02:00
}
2020-09-29 00:27:36 +02:00
// Obtain fresh list of healthy storage nodes after the delay, since it may be already updated.
sns = getHealthyStorageNodesBlocking ( stopCh )
if len ( sns ) == 0 {
// stopCh is notified to stop.
return
}
2019-06-08 21:29:25 +02:00
}
}
2020-05-27 14:07:16 +02:00
}
func ( sn * storageNode ) sendReroutedRow ( buf [ ] byte ) bool {
sn . brLock . Lock ( )
ok := len ( sn . br . buf ) + len ( buf ) <= maxBufSizePerStorageNode
if ok {
sn . br . buf = append ( sn . br . buf , buf ... )
sn . br . rows ++
sn . rowsReroutedToHere . Inc ( )
}
sn . brLock . Unlock ( )
return ok
2019-06-08 21:29:25 +02:00
}
var (
2020-05-25 00:39:24 +02:00
maxBufSizePerStorageNode int
2020-05-27 14:07:16 +02:00
reroutedBR bufRows
reroutedBRLock sync . Mutex
reroutedBRCond = sync . NewCond ( & reroutedBRLock )
2019-08-23 09:29:40 +02:00
reroutedBufMaxSize int
2019-06-08 21:29:25 +02:00
reroutedRowsProcessed = metrics . NewCounter ( ` vm_rpc_rerouted_rows_processed_total { name="vminsert"} ` )
2020-05-27 14:07:16 +02:00
reroutedBufWaits = metrics . NewCounter ( ` vm_rpc_rerouted_buf_waits_total { name="vminsert"} ` )
2019-06-08 21:29:25 +02:00
reroutesTotal = metrics . NewCounter ( ` vm_rpc_reroutes_total { name="vminsert"} ` )
_ = metrics . NewGauge ( ` vm_rpc_rerouted_rows_pending { name="vminsert"} ` , func ( ) float64 {
2020-05-27 14:07:16 +02:00
reroutedBRLock . Lock ( )
n := reroutedBR . rows
reroutedBRLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return float64 ( n )
} )
_ = metrics . NewGauge ( ` vm_rpc_rerouted_buf_pending_bytes { name="vminsert"} ` , func ( ) float64 {
2020-05-27 14:07:16 +02:00
reroutedBRLock . Lock ( )
n := len ( reroutedBR . buf )
reroutedBRLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return float64 ( n )
} )
2020-05-28 18:57:05 +02:00
rerouteErrors = metrics . NewCounter ( ` vm_rpc_reroute_errors_total { name="vminsert"} ` )
rowsLostTotal = metrics . NewCounter ( ` vm_rpc_rows_lost_total { name="vminsert"} ` )
rowsIncompletelyReplicatedTotal = metrics . NewCounter ( ` vm_rpc_rows_incompletely_replicated_total { name="vminsert"} ` )
2019-06-08 21:29:25 +02:00
)