2019-05-22 23:23:23 +02:00
package netstorage
import (
2021-10-08 11:52:56 +02:00
"errors"
2019-05-24 11:51:07 +02:00
"flag"
2019-05-22 23:23:23 +02:00
"fmt"
2020-04-27 08:32:08 +02:00
"io"
2021-09-15 17:04:28 +02:00
"net"
2019-05-22 23:23:23 +02:00
"sync"
2020-05-27 14:07:16 +02:00
"sync/atomic"
2019-05-22 23:23:23 +02:00
"time"
2020-05-27 14:07:16 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-06-01 13:33:29 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2020-09-28 20:35:40 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/metrics"
2019-06-08 21:29:25 +02:00
xxhash "github.com/cespare/xxhash/v2"
2019-05-22 23:23:23 +02:00
)
2020-05-28 18:57:05 +02:00
var (
	// disableRPCCompression makes dial() negotiate compression level 0 instead of 1.
	disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")

	// replicationFactor is the number of distinct storage nodes each batch is sent to.
	// run() clamps it to the range [1, len(storageNodes)].
	replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
		"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
		"Higher values for -dedup.minScrapeInterval at vmselect is OK")

	// disableRerouting makes writers wait for free buffer space at the target node
	// instead of spilling rows to other nodes when that node is merely overloaded.
	disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -dropSamplesOnOverload")

	// dropSamplesOnOverload makes push() drop rows which cannot be buffered for the target node right away.
	dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded")
)
// errStorageReadOnly is the sentinel returned by sendToConn when vmstorage
// acknowledges a packet with the read-only marker. Callers match it with errors.Is.
var errStorageReadOnly = errors.New("storage node is read only")
func ( sn * storageNode ) isReady ( ) bool {
return atomic . LoadUint32 ( & sn . broken ) == 0 && atomic . LoadUint32 ( & sn . isReadOnly ) == 0
2020-05-27 14:07:16 +02:00
}
// push pushes buf to sn internal bufs.
2019-05-22 23:23:23 +02:00
//
2020-05-27 14:07:16 +02:00
// This function doesn't block on fast path.
2022-02-06 19:20:02 +01:00
// It may block only if storageNodes cannot handle the incoming ingestion rate.
2020-05-27 14:07:16 +02:00
// This blocking provides backpressure to the caller.
//
// The function falls back to sending data to other vmstorage nodes
// if sn is currently unavailable or overloaded.
2019-06-08 21:29:25 +02:00
//
2022-02-06 19:20:02 +01:00
// rows must match the number of rows in the buf.
2019-06-08 21:29:25 +02:00
func ( sn * storageNode ) push ( buf [ ] byte , rows int ) error {
2020-05-25 00:39:24 +02:00
if len ( buf ) > maxBufSizePerStorageNode {
logger . Panicf ( "BUG: len(buf)=%d cannot exceed %d" , len ( buf ) , maxBufSizePerStorageNode )
2019-06-08 21:29:25 +02:00
}
sn . rowsPushed . Add ( rows )
2022-02-06 19:20:02 +01:00
if sn . trySendBuf ( buf , rows ) {
// Fast path - the buffer is successfully sent to sn.
return nil
}
2022-02-06 19:21:40 +01:00
if * dropSamplesOnOverload {
sn . rowsDroppedOnOverload . Add ( rows )
2022-02-07 14:38:54 +01:00
logger . WithThrottler ( "droppedSamplesOnOverload" , 5 * time . Second ) . Warnf (
"some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. " +
"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page" , sn . dialer . Addr ( ) )
2022-02-06 19:21:40 +01:00
return nil
}
2022-02-06 19:20:02 +01:00
// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
if err := sn . rerouteBufToOtherStorageNodes ( buf , rows ) ; err != nil {
return fmt . Errorf ( "error when re-routing rows from %s: %w" , sn . dialer . Addr ( ) , err )
}
return nil
}
2019-06-08 21:29:25 +02:00
2022-02-06 19:20:02 +01:00
func ( sn * storageNode ) rerouteBufToOtherStorageNodes ( buf [ ] byte , rows int ) error {
2021-06-04 03:33:49 +02:00
sn . brLock . Lock ( )
again :
select {
case <- storageNodesStopCh :
sn . brLock . Unlock ( )
return fmt . Errorf ( "cannot send %d rows because of graceful shutdown" , rows )
default :
}
2022-02-06 19:20:02 +01:00
if ! sn . isReady ( ) {
2021-06-04 03:33:49 +02:00
if len ( storageNodes ) == 1 {
// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
sn . brCond . Wait ( )
goto again
}
sn . brLock . Unlock ( )
2022-02-06 19:20:02 +01:00
// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
rowsProcessed , err := rerouteRowsToReadyStorageNodes ( sn , buf )
rows -= rowsProcessed
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "%d rows dropped because the current vsmtorage is unavailable and %w" , rows , err )
2019-06-08 21:29:25 +02:00
}
2019-05-22 23:23:23 +02:00
return nil
}
2020-05-27 14:07:16 +02:00
if len ( sn . br . buf ) + len ( buf ) <= maxBufSizePerStorageNode {
2019-06-08 21:29:25 +02:00
// Fast path: the buf contents fits sn.buf.
2020-05-27 14:07:16 +02:00
sn . br . buf = append ( sn . br . buf , buf ... )
sn . br . rows += rows
sn . brLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return nil
2019-05-22 23:23:23 +02:00
}
2022-02-06 19:20:02 +01:00
// Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes.
2021-06-04 03:33:49 +02:00
if * disableRerouting || len ( storageNodes ) == 1 {
sn . brCond . Wait ( )
goto again
}
2020-05-27 14:07:16 +02:00
sn . brLock . Unlock ( )
2022-02-06 19:20:02 +01:00
rowsProcessed , err := rerouteRowsToFreeStorageNodes ( sn , buf )
rows -= rowsProcessed
if err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "%d rows dropped because the current vmstorage buf is full and %w" , rows , err )
2019-06-08 21:29:25 +02:00
}
return nil
}
2020-05-27 14:07:16 +02:00
// closedCh is a channel that is already closed when the package is initialized,
// so receiving from it never blocks. run() selects on it in place of a timer
// whenever data is already pending.
var closedCh = func() <-chan struct{} {
	done := make(chan struct{})
	// The deferred close runs before the caller observes the returned channel.
	defer close(done)
	return done
}()
func ( sn * storageNode ) run ( stopCh <- chan struct { } , snIdx int ) {
replicas := * replicationFactor
if replicas <= 0 {
replicas = 1
}
if replicas > len ( storageNodes ) {
replicas = len ( storageNodes )
}
2021-10-08 12:52:56 +02:00
sn . readOnlyCheckerWG . Add ( 1 )
go func ( ) {
defer sn . readOnlyCheckerWG . Done ( )
sn . readOnlyChecker ( stopCh )
} ( )
defer sn . readOnlyCheckerWG . Wait ( )
2020-09-28 20:35:40 +02:00
ticker := time . NewTicker ( 200 * time . Millisecond )
2020-05-27 14:07:16 +02:00
defer ticker . Stop ( )
var br bufRows
2020-06-01 13:33:29 +02:00
brLastResetTime := fasttime . UnixTimestamp ( )
2020-05-27 14:07:16 +02:00
var waitCh <- chan struct { }
mustStop := false
for ! mustStop {
sn . brLock . Lock ( )
bufLen := len ( sn . br . buf )
sn . brLock . Unlock ( )
waitCh = nil
2020-09-28 20:35:40 +02:00
if bufLen > 0 {
// Do not sleep if sn.br.buf isn't empty.
2020-05-27 14:07:16 +02:00
waitCh = closedCh
}
select {
case <- stopCh :
mustStop = true
// Make sure the sn.buf is flushed last time before returning
// in order to send the remaining bits of data.
case <- ticker . C :
case <- waitCh :
}
2020-09-28 20:35:40 +02:00
sn . brLock . Lock ( )
sn . br , br = br , sn . br
2021-06-04 03:33:49 +02:00
sn . brCond . Broadcast ( )
2020-09-28 20:35:40 +02:00
sn . brLock . Unlock ( )
2020-06-01 13:33:29 +02:00
currentTime := fasttime . UnixTimestamp ( )
if len ( br . buf ) < cap ( br . buf ) / 4 && currentTime - brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br . buf = append ( br . buf [ : 0 : 0 ] , br . buf ... )
brLastResetTime = currentTime
}
2020-09-28 20:35:40 +02:00
sn . checkHealth ( )
2020-05-28 18:57:05 +02:00
if len ( br . buf ) == 0 {
2020-09-28 20:35:40 +02:00
// Nothing to send.
2020-05-27 14:07:16 +02:00
continue
}
2020-05-28 18:57:05 +02:00
// Send br to replicas storageNodes starting from snIdx.
2020-09-28 20:35:40 +02:00
for ! sendBufToReplicasNonblocking ( & br , snIdx , replicas ) {
t := timerpool . Get ( 200 * time . Millisecond )
select {
case <- stopCh :
timerpool . Put ( t )
return
case <- t . C :
timerpool . Put ( t )
sn . checkHealth ( )
}
2020-05-27 14:07:16 +02:00
}
2020-05-28 18:57:05 +02:00
br . reset ( )
}
}
2020-09-28 20:35:40 +02:00
func sendBufToReplicasNonblocking ( br * bufRows , snIdx , replicas int ) bool {
2022-02-07 14:38:54 +01:00
usedStorageNodes := make ( map [ * storageNode ] struct { } , replicas )
2020-05-28 18:57:05 +02:00
for i := 0 ; i < replicas ; i ++ {
idx := snIdx + i
attempts := 0
for {
attempts ++
if attempts > len ( storageNodes ) {
if i == 0 {
// The data wasn't replicated at all.
2022-02-07 14:38:54 +01:00
logger . WithThrottler ( "cannotReplicateDataBecauseNoStorageNodes" , 5 * time . Second ) . Warnf (
"cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; " +
"re-trying to send the data soon" , len ( br . buf ) , br . rows )
2020-05-28 18:57:05 +02:00
return false
}
// The data is partially replicated, so just emit a warning and return true.
// We could retry sending the data again, but this may result in uncontrolled duplicate data.
// So it is better returning true.
rowsIncompletelyReplicatedTotal . Add ( br . rows )
2022-02-07 14:38:54 +01:00
logger . WithThrottler ( "incompleteReplication" , 5 * time . Second ) . Warnf (
"cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, " +
"since a part of storage nodes is temporarily unavailable" , i + 1 , replicas , * replicationFactor , len ( br . buf ) , br . rows )
2020-05-28 18:57:05 +02:00
return true
}
if idx >= len ( storageNodes ) {
2020-06-19 11:39:19 +02:00
idx %= len ( storageNodes )
2020-05-28 18:57:05 +02:00
}
sn := storageNodes [ idx ]
idx ++
2022-02-07 14:38:54 +01:00
if _ , ok := usedStorageNodes [ sn ] ; ok {
2020-05-28 18:57:05 +02:00
// The br has been already replicated to sn. Skip it.
continue
}
2020-09-28 20:35:40 +02:00
if ! sn . sendBufRowsNonblocking ( br ) {
2020-05-28 18:57:05 +02:00
// Cannot send data to sn. Go to the next sn.
continue
}
// Successfully sent data to sn.
2022-02-07 14:38:54 +01:00
usedStorageNodes [ sn ] = struct { } { }
2020-05-28 18:57:05 +02:00
break
2020-05-27 14:07:16 +02:00
}
2019-05-22 23:23:23 +02:00
}
2020-05-28 18:57:05 +02:00
return true
}
2020-06-18 19:41:33 +02:00
func ( sn * storageNode ) checkHealth ( ) {
sn . bcLock . Lock ( )
defer sn . bcLock . Unlock ( )
if sn . bc != nil {
2020-09-28 20:35:40 +02:00
// The sn looks healthy.
return
2020-06-18 19:41:33 +02:00
}
bc , err := sn . dial ( )
if err != nil {
2020-11-13 23:43:32 +01:00
atomic . StoreUint32 ( & sn . broken , 1 )
2021-06-04 03:33:49 +02:00
sn . brCond . Broadcast ( )
2020-09-28 23:20:01 +02:00
if sn . lastDialErr == nil {
// Log the error only once.
sn . lastDialErr = err
logger . Warnf ( "cannot dial storageNode %q: %s" , sn . dialer . Addr ( ) , err )
}
2020-06-18 19:51:28 +02:00
return
2020-06-18 19:41:33 +02:00
}
2020-09-28 20:35:40 +02:00
logger . Infof ( "successfully dialed -storageNode=%q" , sn . dialer . Addr ( ) )
2020-09-28 23:20:01 +02:00
sn . lastDialErr = nil
2020-06-18 19:41:33 +02:00
sn . bc = bc
atomic . StoreUint32 ( & sn . broken , 0 )
2021-06-04 03:33:49 +02:00
sn . brCond . Broadcast ( )
2020-06-18 19:41:33 +02:00
}
2020-09-28 20:35:40 +02:00
func ( sn * storageNode ) sendBufRowsNonblocking ( br * bufRows ) bool {
2022-02-06 19:20:02 +01:00
if ! sn . isReady ( ) {
2020-09-28 20:35:40 +02:00
return false
}
2020-05-28 18:57:05 +02:00
sn . bcLock . Lock ( )
defer sn . bcLock . Unlock ( )
if sn . bc == nil {
2020-09-28 20:35:40 +02:00
// Do not call sn.dial() here in order to prevent long blocking on sn.bcLock.Lock(),
// which can negatively impact data sending in sendBufToReplicasNonblocking().
2020-11-13 23:43:32 +01:00
// sn.dial() should be called by sn.checkHealth() on unsuccessful call to sendBufToReplicasNonblocking().
2020-09-28 20:35:40 +02:00
return false
2020-05-28 18:57:05 +02:00
}
2021-08-11 10:40:52 +02:00
startTime := time . Now ( )
2020-05-28 18:57:05 +02:00
err := sendToConn ( sn . bc , br . buf )
2021-08-11 10:40:52 +02:00
duration := time . Since ( startTime )
sn . sendDurationSeconds . Add ( duration . Seconds ( ) )
2020-05-28 18:57:05 +02:00
if err == nil {
2020-09-28 20:35:40 +02:00
// Successfully sent buf to bc.
2020-05-28 18:57:05 +02:00
sn . rowsSent . Add ( br . rows )
return true
}
2021-10-08 12:52:56 +02:00
if errors . Is ( err , errStorageReadOnly ) {
// The vmstorage is transitioned to readonly mode.
2021-10-08 11:52:56 +02:00
atomic . StoreUint32 ( & sn . isReadOnly , 1 )
sn . brCond . Broadcast ( )
2021-10-08 12:52:56 +02:00
// Signal the caller that the data wasn't accepted by the vmstorage,
// so it will be re-routed to the remaining vmstorage nodes.
2021-10-08 11:52:56 +02:00
return false
}
2020-05-28 18:57:05 +02:00
// Couldn't flush buf to sn. Mark sn as broken.
2022-02-07 14:38:54 +01:00
logger . WithThrottler ( "cannotSendBufRows" , 5 * time . Second ) . Warnf (
"cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and " +
"re-routing this data to healthy storage nodes" , len ( br . buf ) , br . rows , sn . dialer . Addr ( ) , err )
2020-05-28 18:57:05 +02:00
if err = sn . bc . Close ( ) ; err != nil {
2022-02-07 14:38:54 +01:00
logger . WithThrottler ( "cannotCloseStorageNodeConn" , 5 * time . Second ) . Warnf ( "cannot close connection to storageNode %q: %s" , sn . dialer . Addr ( ) , err )
2020-05-28 18:57:05 +02:00
}
sn . bc = nil
atomic . StoreUint32 ( & sn . broken , 1 )
2021-06-04 03:33:49 +02:00
sn . brCond . Broadcast ( )
2020-09-28 20:35:40 +02:00
sn . connectionErrors . Inc ( )
2020-05-28 18:57:05 +02:00
return false
2019-05-22 23:23:23 +02:00
}
2020-05-27 14:07:16 +02:00
func sendToConn ( bc * handshake . BufferedConn , buf [ ] byte ) error {
2019-09-11 13:25:53 +02:00
if len ( buf ) == 0 {
2020-05-27 14:07:16 +02:00
// Nothing to send
2019-09-11 13:25:53 +02:00
return nil
}
2020-01-21 17:20:14 +01:00
timeoutSeconds := len ( buf ) / 3e5
2019-09-13 21:25:15 +02:00
if timeoutSeconds < 60 {
timeoutSeconds = 60
2019-09-11 12:37:03 +02:00
}
timeout := time . Duration ( timeoutSeconds ) * time . Second
deadline := time . Now ( ) . Add ( timeout )
2020-04-27 08:32:08 +02:00
if err := bc . SetWriteDeadline ( deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot set write deadline to %s: %w" , deadline , err )
2019-05-22 23:23:23 +02:00
}
2019-09-11 12:37:03 +02:00
// sizeBuf guarantees that the rows batch will be either fully
// read or fully discarded on the vmstorage side.
// sizeBuf is used for read optimization in vmstorage.
2020-05-27 14:07:16 +02:00
sizeBuf := sizeBufPool . Get ( )
defer sizeBufPool . Put ( sizeBuf )
sizeBuf . B = encoding . MarshalUint64 ( sizeBuf . B [ : 0 ] , uint64 ( len ( buf ) ) )
if _ , err := bc . Write ( sizeBuf . B ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write data size %d: %w" , len ( buf ) , err )
2019-05-22 23:23:23 +02:00
}
2020-04-27 08:32:08 +02:00
if _ , err := bc . Write ( buf ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot write data with size %d: %w" , len ( buf ) , err )
2019-05-22 23:23:23 +02:00
}
2020-04-27 08:32:08 +02:00
if err := bc . Flush ( ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot flush data with size %d: %w" , len ( buf ) , err )
2019-06-08 21:29:25 +02:00
}
2020-04-27 08:32:08 +02:00
// Wait for `ack` from vmstorage.
// This guarantees that the message has been fully received by vmstorage.
2020-04-28 10:18:41 +02:00
deadline = time . Now ( ) . Add ( timeout )
2020-04-27 08:32:08 +02:00
if err := bc . SetReadDeadline ( deadline ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot set read deadline for reading `ack` to vmstorage: %w" , err )
2020-04-27 08:32:08 +02:00
}
2020-05-27 14:07:16 +02:00
if _ , err := io . ReadFull ( bc , sizeBuf . B [ : 1 ] ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot read `ack` from vmstorage: %w" , err )
2020-04-27 08:32:08 +02:00
}
2021-10-08 11:52:56 +02:00
ackResp := sizeBuf . B [ 0 ]
switch ackResp {
case 1 :
2021-10-08 12:52:56 +02:00
// ok response, data successfully accepted by vmstorage
2021-10-08 11:52:56 +02:00
case 2 :
2021-10-08 12:52:56 +02:00
// vmstorage is in readonly mode
return errStorageReadOnly
2021-10-08 11:52:56 +02:00
default :
return fmt . Errorf ( "unexpected `ack` received from vmstorage; got %d; want 1 or 2" , sizeBuf . B [ 0 ] )
2020-04-27 08:32:08 +02:00
}
2021-10-08 11:52:56 +02:00
2019-05-22 23:23:23 +02:00
return nil
}
2020-05-27 14:07:16 +02:00
// sizeBufPool supplies scratch buffers to sendToConn. The buffers hold the packet
// length prefix and receive the 1-byte ack.
var sizeBufPool bytesutil.ByteBufferPool
func ( sn * storageNode ) dial ( ) ( * handshake . BufferedConn , error ) {
2019-05-22 23:23:23 +02:00
c , err := sn . dialer . Dial ( )
if err != nil {
sn . dialErrors . Inc ( )
2020-05-27 14:07:16 +02:00
return nil , err
2019-05-22 23:23:23 +02:00
}
compressionLevel := 1
2019-05-24 11:51:07 +02:00
if * disableRPCCompression {
compressionLevel = 0
}
2019-05-22 23:23:23 +02:00
bc , err := handshake . VMInsertClient ( c , compressionLevel )
if err != nil {
_ = c . Close ( )
sn . handshakeErrors . Inc ( )
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "handshake error: %w" , err )
2019-05-22 23:23:23 +02:00
}
2020-05-27 14:07:16 +02:00
return bc , nil
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
// storageNode is a client sending data to vmstorage node.
2019-05-22 23:23:23 +02:00
type storageNode struct {
2020-05-27 14:07:16 +02:00
// broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken uint32
2019-06-08 21:29:25 +02:00
2021-10-08 11:52:56 +02:00
// isReadOnly is set to non-zero if the given vmstorage node is read only
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
isReadOnly uint32
2020-05-27 14:07:16 +02:00
// brLock protects br.
brLock sync . Mutex
2019-05-22 23:23:23 +02:00
2021-06-04 03:33:49 +02:00
// brCond is used for waiting for free space in br.
brCond * sync . Cond
2020-05-27 14:07:16 +02:00
// Buffer with data that needs to be written to the storage node.
2020-05-28 18:57:05 +02:00
// It must be accessed under brLock.
2020-05-27 14:07:16 +02:00
br bufRows
2019-05-22 23:23:23 +02:00
2020-05-28 18:57:05 +02:00
// bcLock protects bc.
bcLock sync . Mutex
2021-10-08 12:52:56 +02:00
// waitGroup for readOnlyChecker
readOnlyCheckerWG sync . WaitGroup
2020-05-28 18:57:05 +02:00
// bc is a single connection to vmstorage for data transfer.
// It must be accessed under bcLock.
bc * handshake . BufferedConn
2019-06-08 21:29:25 +02:00
dialer * netutil . TCPDialer
2020-09-28 23:20:01 +02:00
// last error during dial.
lastDialErr error
2019-06-08 21:29:25 +02:00
// The number of dial errors to vmstorage node.
2019-05-22 23:23:23 +02:00
dialErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of handshake errors to vmstorage node.
2019-05-22 23:23:23 +02:00
handshakeErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of connection errors to vmstorage node.
2019-05-22 23:23:23 +02:00
connectionErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of rows pushed to storageNode with push method.
rowsPushed * metrics . Counter
// The number of rows sent to vmstorage node.
rowsSent * metrics . Counter
2019-05-22 23:23:23 +02:00
2022-02-06 19:21:40 +01:00
// The number of rows dropped on overload if -dropSamplesOnOverload is set.
rowsDroppedOnOverload * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of rows rerouted from the given vmstorage node
// to healthy nodes when the given node was unhealthy.
rowsReroutedFromHere * metrics . Counter
// The number of rows rerouted to the given vmstorage node
// from other nodes when they were unhealthy.
rowsReroutedToHere * metrics . Counter
2021-08-11 10:40:52 +02:00
// The total duration spent for sending data to vmstorage node.
// This metric is useful for determining the saturation of vminsert->vmstorage link.
sendDurationSeconds * metrics . FloatCounter
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
// storageNodes contains a list of vmstorage node clients.
2019-05-22 23:23:23 +02:00
var storageNodes [ ] * storageNode
2021-06-04 03:33:49 +02:00
var storageNodesWG sync . WaitGroup
2019-05-22 23:23:23 +02:00
2021-06-04 03:33:49 +02:00
var storageNodesStopCh = make ( chan struct { } )
2019-05-22 23:23:23 +02:00
2022-02-06 19:20:02 +01:00
// nodesHash is used for consistently selecting a storage node by key.
var nodesHash * consistentHash
2021-10-07 11:21:42 +02:00
2019-06-08 21:29:25 +02:00
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
2021-10-07 11:21:42 +02:00
//
2022-02-06 19:20:02 +01:00
// hashSeed is used for changing the distribution of input time series among addrs.
func InitStorageNodes ( addrs [ ] string , hashSeed uint64 ) {
2019-05-22 23:23:23 +02:00
if len ( addrs ) == 0 {
logger . Panicf ( "BUG: addrs must be non-empty" )
}
2022-02-06 19:20:02 +01:00
nodesHash = newConsistentHash ( addrs , hashSeed )
2020-05-28 18:57:05 +02:00
storageNodes = storageNodes [ : 0 ]
2019-05-22 23:23:23 +02:00
for _ , addr := range addrs {
2021-09-15 17:04:28 +02:00
if _ , _ , err := net . SplitHostPort ( addr ) ; err != nil {
// Automatically add missing port.
addr += ":8400"
}
2019-05-22 23:23:23 +02:00
sn := & storageNode {
dialer : netutil . NewTCPDialer ( "vminsert" , addr ) ,
2022-02-06 19:21:40 +01:00
dialErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_dial_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
handshakeErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_handshake_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
connectionErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_connection_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsPushed : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_pushed_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsSent : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_sent_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsDroppedOnOverload : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_dropped_on_overload_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsReroutedFromHere : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_rerouted_from_here_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsReroutedToHere : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_rerouted_to_here_total { name="vminsert", addr=%q} ` , addr ) ) ,
sendDurationSeconds : metrics . NewFloatCounter ( fmt . Sprintf ( ` vm_rpc_send_duration_seconds_total { name="vminsert", addr=%q} ` , addr ) ) ,
2019-05-22 23:23:23 +02:00
}
2021-06-04 03:33:49 +02:00
sn . brCond = sync . NewCond ( & sn . brLock )
2019-06-08 21:29:25 +02:00
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_rows_pending { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
2020-05-27 14:07:16 +02:00
sn . brLock . Lock ( )
n := sn . br . rows
sn . brLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return float64 ( n )
} )
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_buf_pending_bytes { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
2020-05-27 14:07:16 +02:00
sn . brLock . Lock ( )
n := len ( sn . br . buf )
sn . brLock . Unlock ( )
2019-06-08 21:29:25 +02:00
return float64 ( n )
} )
2020-11-17 21:13:20 +01:00
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_vmstorage_is_reachable { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
2021-10-08 11:52:56 +02:00
if atomic . LoadUint32 ( & sn . broken ) != 0 {
2020-11-17 21:13:20 +01:00
return 0
}
return 1
} )
2021-10-08 11:52:56 +02:00
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_vmstorage_is_read_only { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
return float64 ( atomic . LoadUint32 ( & sn . isReadOnly ) )
} )
2019-05-22 23:23:23 +02:00
storageNodes = append ( storageNodes , sn )
}
2019-06-08 21:29:25 +02:00
2020-05-27 14:07:16 +02:00
maxBufSizePerStorageNode = memory . Allowed ( ) / 8 / len ( storageNodes )
2020-05-25 00:39:24 +02:00
if maxBufSizePerStorageNode > consts . MaxInsertPacketSize {
maxBufSizePerStorageNode = consts . MaxInsertPacketSize
}
2020-05-28 18:57:05 +02:00
for idx , sn := range storageNodes {
storageNodesWG . Add ( 1 )
go func ( sn * storageNode , idx int ) {
sn . run ( storageNodesStopCh , idx )
storageNodesWG . Done ( )
} ( sn , idx )
}
2019-05-22 23:23:23 +02:00
}
// Stop gracefully stops netstorage.
func Stop ( ) {
2019-06-08 21:29:25 +02:00
close ( storageNodesStopCh )
2021-06-04 03:33:49 +02:00
for _ , sn := range storageNodes {
sn . brCond . Broadcast ( )
}
2019-05-22 23:23:23 +02:00
storageNodesWG . Wait ( )
}
2019-06-08 21:29:25 +02:00
2022-02-06 19:20:02 +01:00
// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
2020-05-27 14:07:16 +02:00
//
2022-02-06 19:20:02 +01:00
// The function blocks until src is fully re-routed.
func rerouteRowsToReadyStorageNodes ( snSource * storageNode , src [ ] byte ) ( int , error ) {
2022-02-07 13:35:39 +01:00
reroutesTotal . Inc ( )
2022-02-06 19:20:02 +01:00
rowsProcessed := 0
var idxsExclude , idxsExcludeNew [ ] int
idxsExclude = getNotReadyStorageNodeIdxsBlocking ( idxsExclude [ : 0 ] , nil )
var mr storage . MetricRow
for len ( src ) > 0 {
tail , err := mr . UnmarshalX ( src )
if err != nil {
logger . Panicf ( "BUG: cannot unmarshal MetricRow: %s" , err )
}
rowBuf := src [ : len ( src ) - len ( tail ) ]
src = tail
reroutedRowsProcessed . Inc ( )
h := xxhash . Sum64 ( mr . MetricNameRaw )
mr . ResetX ( )
var sn * storageNode
for {
idx := nodesHash . getNodeIdx ( h , idxsExclude )
sn = storageNodes [ idx ]
if sn . isReady ( ) {
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking ( idxsExclude [ : 0 ] , nil )
}
if * disableRerouting {
if ! sn . sendBufMayBlock ( rowBuf ) {
return rowsProcessed , fmt . Errorf ( "graceful shutdown started" )
}
rowsProcessed ++
if sn != snSource {
snSource . rowsReroutedFromHere . Inc ( )
sn . rowsReroutedToHere . Inc ( )
}
continue
}
2022-02-07 13:35:39 +01:00
again :
2022-02-06 19:20:02 +01:00
if sn . trySendBuf ( rowBuf , 1 ) {
rowsProcessed ++
if sn != snSource {
snSource . rowsReroutedFromHere . Inc ( )
sn . rowsReroutedToHere . Inc ( )
}
continue
}
// If the re-routing is enabled, then try sending the row to another storage node.
2022-02-07 13:35:39 +01:00
idxsExcludeNew = getNotReadyStorageNodeIdxs ( idxsExcludeNew [ : 0 ] , sn )
2022-02-06 19:20:02 +01:00
idx := nodesHash . getNodeIdx ( h , idxsExcludeNew )
snNew := storageNodes [ idx ]
if snNew . trySendBuf ( rowBuf , 1 ) {
rowsProcessed ++
if snNew != snSource {
snSource . rowsReroutedFromHere . Inc ( )
snNew . rowsReroutedToHere . Inc ( )
}
continue
2021-06-05 15:16:16 +02:00
}
2022-02-07 13:35:39 +01:00
// The row cannot be sent to both snSource and the re-routed sn without blocking.
// Sleep for a while and try sending the row to snSource again.
time . Sleep ( 100 * time . Millisecond )
goto again
2020-09-28 20:35:40 +02:00
}
2022-02-06 19:20:02 +01:00
return rowsProcessed , nil
}
// reouteRowsToFreeStorageNodes re-routes src from snSource to other storage nodes.
//
// It is expected that snSource has no enough buffer for sending src.
// It is expected than *dsableRerouting isn't set when calling this function.
func rerouteRowsToFreeStorageNodes ( snSource * storageNode , src [ ] byte ) ( int , error ) {
if * disableRerouting {
logger . Panicf ( "BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes" )
}
2022-02-07 13:35:39 +01:00
reroutesTotal . Inc ( )
2022-02-06 19:20:02 +01:00
rowsProcessed := 0
var idxsExclude [ ] int
2022-02-07 13:35:39 +01:00
idxsExclude = getNotReadyStorageNodeIdxs ( idxsExclude [ : 0 ] , snSource )
2021-06-05 15:16:16 +02:00
var mr storage . MetricRow
2019-06-08 21:29:25 +02:00
for len ( src ) > 0 {
2021-05-08 16:55:44 +02:00
tail , err := mr . UnmarshalX ( src )
2019-06-08 21:29:25 +02:00
if err != nil {
2021-06-04 03:33:49 +02:00
logger . Panicf ( "BUG: cannot unmarshal MetricRow: %s" , err )
2019-06-08 21:29:25 +02:00
}
rowBuf := src [ : len ( src ) - len ( tail ) ]
src = tail
2021-06-04 03:33:49 +02:00
reroutedRowsProcessed . Inc ( )
h := xxhash . Sum64 ( mr . MetricNameRaw )
2021-05-08 16:55:44 +02:00
mr . ResetX ( )
2022-02-06 19:20:02 +01:00
// Try sending the row to snSource in order to minimize re-routing.
2022-02-07 13:35:39 +01:00
again :
2022-02-06 19:20:02 +01:00
if snSource . trySendBuf ( rowBuf , 1 ) {
rowsProcessed ++
continue
2021-06-04 03:33:49 +02:00
}
2022-02-06 19:20:02 +01:00
// The row couldn't be sent to snSrouce. Try re-routing it to other nodes.
var sn * storageNode
for {
idx := nodesHash . getNodeIdx ( h , idxsExclude )
sn = storageNodes [ idx ]
if sn . isReady ( ) {
break
}
// re-generate idxsExclude list, since sn must be put there.
2022-02-07 13:35:39 +01:00
idxsExclude = getNotReadyStorageNodeIdxs ( idxsExclude [ : 0 ] , snSource )
2022-02-06 19:20:02 +01:00
}
if sn . trySendBuf ( rowBuf , 1 ) {
rowsProcessed ++
2021-06-05 15:16:16 +02:00
snSource . rowsReroutedFromHere . Inc ( )
sn . rowsReroutedToHere . Inc ( )
2022-02-06 19:20:02 +01:00
continue
}
2022-02-07 13:35:39 +01:00
// The row cannot be sent to both snSource and the re-routed sn without blocking.
// Sleep for a while and try sending the row to snSource again.
time . Sleep ( 100 * time . Millisecond )
goto again
2019-06-08 21:29:25 +02:00
}
2022-02-06 19:20:02 +01:00
return rowsProcessed , nil
}
func getNotReadyStorageNodeIdxsBlocking ( dst [ ] int , snExtra * storageNode ) [ ] int {
dst = getNotReadyStorageNodeIdxs ( dst [ : 0 ] , snExtra )
if len ( dst ) < len ( storageNodes ) {
return dst
}
2022-02-07 14:38:54 +01:00
logger . WithThrottler ( "storageNodesUnavailable" , 5 * time . Second ) . Warnf (
"all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available" )
2022-02-06 19:20:02 +01:00
for {
time . Sleep ( time . Second )
dst = getNotReadyStorageNodeIdxs ( dst [ : 0 ] , snExtra )
if availableNodes := len ( storageNodes ) - len ( dst ) ; availableNodes > 0 {
2022-02-07 14:38:54 +01:00
logger . WithThrottler ( "storageNodesBecameAvailable" , 5 * time . Second ) . Warnf ( "%d vmstorage nodes became available, so continue data processing" , availableNodes )
2022-02-06 19:20:02 +01:00
return dst
}
}
}
func getNotReadyStorageNodeIdxs ( dst [ ] int , snExtra * storageNode ) [ ] int {
dst = dst [ : 0 ]
for i , sn := range storageNodes {
2022-02-07 12:20:43 +01:00
if sn == snExtra || ! sn . isReady ( ) {
2022-02-06 19:20:02 +01:00
dst = append ( dst , i )
}
}
return dst
}
func ( sn * storageNode ) trySendBuf ( buf [ ] byte , rows int ) bool {
sent := false
sn . brLock . Lock ( )
if sn . isReady ( ) && len ( sn . br . buf ) + len ( buf ) <= maxBufSizePerStorageNode {
sn . br . buf = append ( sn . br . buf , buf ... )
sn . br . rows += rows
sent = true
}
sn . brLock . Unlock ( )
return sent
2020-05-27 14:07:16 +02:00
}
2021-06-04 03:33:49 +02:00
func ( sn * storageNode ) sendBufMayBlock ( buf [ ] byte ) bool {
2020-05-27 14:07:16 +02:00
sn . brLock . Lock ( )
2021-06-04 03:33:49 +02:00
for len ( sn . br . buf ) + len ( buf ) > maxBufSizePerStorageNode {
select {
case <- storageNodesStopCh :
sn . brLock . Unlock ( )
return false
default :
}
sn . brCond . Wait ( )
2020-05-27 14:07:16 +02:00
}
2021-06-04 03:33:49 +02:00
sn . br . buf = append ( sn . br . buf , buf ... )
sn . br . rows ++
2020-05-27 14:07:16 +02:00
sn . brLock . Unlock ( )
2021-06-04 03:33:49 +02:00
return true
2019-06-08 21:29:25 +02:00
}
2021-10-08 12:52:56 +02:00
func ( sn * storageNode ) readOnlyChecker ( stop <- chan struct { } ) {
ticker := time . NewTicker ( time . Second * 30 )
defer ticker . Stop ( )
for {
select {
case <- stop :
2021-10-08 11:52:56 +02:00
return
2021-10-08 12:52:56 +02:00
case <- ticker . C :
sn . checkReadOnlyMode ( )
2021-10-08 11:52:56 +02:00
}
}
2021-10-08 12:52:56 +02:00
}
func ( sn * storageNode ) checkReadOnlyMode ( ) {
if atomic . LoadUint32 ( & sn . isReadOnly ) == 0 {
// fast path - the sn isn't in readonly mode
return
}
// Check whether the storage remains in readonly mode
sn . bcLock . Lock ( )
defer sn . bcLock . Unlock ( )
if sn . bc == nil {
return
}
// send nil buff to check ack response from storage
err := sendToConn ( sn . bc , nil )
if err == nil {
// The storage switched from readonly to non-readonly mode
atomic . StoreUint32 ( & sn . isReadOnly , 0 )
return
}
if ! errors . Is ( err , errStorageReadOnly ) {
logger . Errorf ( "cannot check storage readonly mode for -storageNode=%q: %s" , sn . dialer . Addr ( ) , err )
}
2021-10-08 11:52:56 +02:00
}
2019-06-08 21:29:25 +02:00
var (
2020-05-25 00:39:24 +02:00
maxBufSizePerStorageNode int
2021-06-04 03:33:49 +02:00
reroutedRowsProcessed = metrics . NewCounter ( ` vm_rpc_rerouted_rows_processed_total { name="vminsert"} ` )
reroutesTotal = metrics . NewCounter ( ` vm_rpc_reroutes_total { name="vminsert"} ` )
2020-05-28 18:57:05 +02:00
rowsIncompletelyReplicatedTotal = metrics . NewCounter ( ` vm_rpc_rows_incompletely_replicated_total { name="vminsert"} ` )
2019-06-08 21:29:25 +02:00
)