2019-05-22 23:23:23 +02:00
package netstorage
import (
2019-05-24 11:51:07 +02:00
"flag"
2019-05-22 23:23:23 +02:00
"fmt"
"sync"
"time"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2019-06-08 21:29:25 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
2019-05-22 23:23:23 +02:00
"github.com/VictoriaMetrics/metrics"
2019-06-08 21:29:25 +02:00
xxhash "github.com/cespare/xxhash/v2"
2019-05-22 23:23:23 +02:00
)
2019-05-24 11:51:07 +02:00
// disableRPCCompression is the -rpc.disableCompression command-line flag.
//
// When set, dial negotiates compression level 0 instead of 1 during the handshake.
// The flag name must not contain surrounding whitespace, otherwise it cannot be
// passed as -rpc.disableCompression on the command line.
var disableRPCCompression = flag.Bool("rpc.disableCompression", false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
2019-06-08 21:29:25 +02:00
// push pushes buf to sn.
2019-05-22 23:23:23 +02:00
//
2019-06-08 21:29:25 +02:00
// It falls back to sending data to another vmstorage node if sn is currently
2019-05-22 23:23:23 +02:00
// unavailable.
2019-06-08 21:29:25 +02:00
//
// rows is the number of rows in the buf.
func ( sn * storageNode ) push ( buf [ ] byte , rows int ) error {
if len ( buf ) > consts . MaxInsertPacketSize {
logger . Panicf ( "BUG: len(buf)=%d cannot exceed %d" , len ( buf ) , consts . MaxInsertPacketSize )
}
sn . rowsPushed . Add ( rows )
sn . mu . Lock ( )
defer sn . mu . Unlock ( )
if sn . broken {
// The vmstorage node is broken. Re-route buf to healthy vmstorage nodes.
if err := addToReroutedBuf ( buf , rows ) ; err != nil {
rowsLostTotal . Add ( rows )
return err
}
sn . rowsReroutedFromHere . Add ( rows )
2019-05-22 23:23:23 +02:00
return nil
}
2019-06-08 21:29:25 +02:00
if len ( sn . buf ) + len ( buf ) <= consts . MaxInsertPacketSize {
// Fast path: the buf contents fits sn.buf.
sn . buf = append ( sn . buf , buf ... )
sn . rows += rows
return nil
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
// Slow path: the buf contents doesn't fit sn.buf.
// Flush sn.buf to vmstorage and then add buf to sn.buf.
if err := sn . flushBufLocked ( ) ; err != nil {
// Failed to flush or re-route sn.buf to vmstorage nodes.
// The sn.buf is already dropped by flushBufLocked.
// Drop buf too, since there is litte sense in trying to rescue it.
rowsLostTotal . Add ( rows )
2019-05-22 23:23:23 +02:00
return err
}
2019-06-08 21:29:25 +02:00
// Successful flush.
sn . buf = append ( sn . buf , buf ... )
sn . rows += rows
return nil
}
// errBrokenStorageNode is returned by sendReroutedRow when the target
// vmstorage node is marked as broken.
var errBrokenStorageNode = fmt.Errorf("the vmstorage node is temporarily broken")

// sendReroutedRow appends a single marshaled row from buf to sn.buf.
//
// It returns errBrokenStorageNode if sn is marked as broken, and an error
// if the row doesn't fit the remaining space in sn.buf. The row is never
// flushed from here; it is only queued.
func (sn *storageNode) sendReroutedRow(buf []byte) error {
	sn.mu.Lock()
	defer sn.mu.Unlock()

	if sn.broken {
		return errBrokenStorageNode
	}
	newSize := len(sn.buf) + len(buf)
	if newSize > consts.MaxInsertPacketSize {
		return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", newSize, consts.MaxInsertPacketSize)
	}
	sn.buf = append(sn.buf, buf...)
	sn.rows++
	return nil
}
func ( sn * storageNode ) flushBufLocked ( ) error {
if err := sn . sendBufLocked ( sn . buf ) ; err == nil {
// Successful flush. Remove broken flag.
sn . broken = false
sn . rowsSent . Add ( sn . rows )
sn . buf = sn . buf [ : 0 ]
sn . rows = 0
return nil
}
// Couldn't flush sn.buf to vmstorage. Mark sn as broken
// and try re-routing sn.buf to healthy vmstorage nodes.
sn . broken = true
err := addToReroutedBuf ( sn . buf , sn . rows )
if err != nil {
rowsLostTotal . Add ( sn . rows )
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
sn . buf = sn . buf [ : 0 ]
sn . rows = 0
2019-05-22 23:23:23 +02:00
return err
}
2019-06-08 21:29:25 +02:00
func ( sn * storageNode ) sendBufLocked ( buf [ ] byte ) error {
2019-05-22 23:23:23 +02:00
// sizeBuf guarantees that the rows batch will be either fully
2019-06-08 21:29:25 +02:00
// read or fully discarded on the vmstorage side.
2019-05-22 23:23:23 +02:00
// sizeBuf is used for read optimization in vmstorage.
if sn . bc == nil {
if err := sn . dial ( ) ; err != nil {
return fmt . Errorf ( "cannot dial %q: %s" , sn . dialer . Addr ( ) , err )
}
}
2019-06-08 21:29:25 +02:00
if len ( buf ) == 0 {
return nil
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
deadline := time . Now ( ) . Add ( 30 * time . Second )
2019-05-22 23:23:23 +02:00
if err := sn . bc . SetWriteDeadline ( deadline ) ; err != nil {
2019-06-08 21:29:25 +02:00
sn . closeBrokenConn ( )
2019-05-22 23:23:23 +02:00
return fmt . Errorf ( "cannot set write deadline to %s: %s" , deadline , err )
}
2019-06-08 21:29:25 +02:00
sn . sizeBuf = encoding . MarshalUint64 ( sn . sizeBuf [ : 0 ] , uint64 ( len ( buf ) ) )
if _ , err := sn . bc . Write ( sn . sizeBuf ) ; err != nil {
sn . closeBrokenConn ( )
2019-05-22 23:23:23 +02:00
return fmt . Errorf ( "cannot write data size %d: %s" , len ( buf ) , err )
}
if _ , err := sn . bc . Write ( buf ) ; err != nil {
2019-06-08 21:29:25 +02:00
sn . closeBrokenConn ( )
2019-05-22 23:23:23 +02:00
return fmt . Errorf ( "cannot write data: %s" , err )
}
2019-06-08 21:29:25 +02:00
if err := sn . bc . Flush ( ) ; err != nil {
sn . closeBrokenConn ( )
return fmt . Errorf ( "cannot flush data: %s" , err )
}
2019-05-22 23:23:23 +02:00
return nil
}
func ( sn * storageNode ) dial ( ) error {
c , err := sn . dialer . Dial ( )
if err != nil {
sn . dialErrors . Inc ( )
return err
}
compressionLevel := 1
2019-05-24 11:51:07 +02:00
if * disableRPCCompression {
compressionLevel = 0
}
2019-05-22 23:23:23 +02:00
bc , err := handshake . VMInsertClient ( c , compressionLevel )
if err != nil {
_ = c . Close ( )
sn . handshakeErrors . Inc ( )
return fmt . Errorf ( "handshake error: %s" , err )
}
sn . bc = bc
return nil
}
2019-06-08 21:29:25 +02:00
func ( sn * storageNode ) closeBrokenConn ( ) {
2019-05-22 23:23:23 +02:00
_ = sn . bc . Close ( )
sn . bc = nil
sn . connectionErrors . Inc ( )
}
2019-06-08 21:29:25 +02:00
func ( sn * storageNode ) run ( stopCh <- chan struct { } ) {
t := time . NewTimer ( time . Second )
2019-05-22 23:23:23 +02:00
mustStop := false
for ! mustStop {
select {
case <- stopCh :
mustStop = true
2019-06-08 21:29:25 +02:00
// Make sure flushBufLocked is called last time before returning
// in order to send the remaining bits of data.
case <- t . C :
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
sn . mu . Lock ( )
if err := sn . flushBufLocked ( ) ; err != nil {
sn . closeBrokenConn ( )
2019-05-22 23:23:23 +02:00
logger . Errorf ( "cannot flush data to storageNode %q: %s" , sn . dialer . Addr ( ) , err )
}
2019-06-08 21:29:25 +02:00
sn . mu . Unlock ( )
t . Reset ( time . Second )
2019-05-22 23:23:23 +02:00
}
}
2019-06-08 21:29:25 +02:00
func rerouteWorker ( stopCh <- chan struct { } ) {
t := time . NewTimer ( time . Second )
var buf [ ] byte
mustStop := false
for ! mustStop {
select {
case <- stopCh :
mustStop = true
// Make sure spreadReroutedBufToStorageNodes is called last time before returning
// in order to reroute the remaining data to healthy vmstorage nodes.
case <- t . C :
}
var err error
buf , err = spreadReroutedBufToStorageNodes ( buf [ : 0 ] )
if err != nil {
rerouteErrors . Inc ( )
logger . Errorf ( "cannot reroute data among healthy vmstorage nodes: %s" , err )
}
t . Reset ( time . Second )
2019-05-22 23:23:23 +02:00
}
}
2019-06-08 21:29:25 +02:00
// storageNode is a client sending data to vmstorage node.
2019-05-22 23:23:23 +02:00
type storageNode struct {
2019-06-08 21:29:25 +02:00
mu sync . Mutex
// Buffer with data that needs to be written to vmstorage node.
buf [ ] byte
// The number of rows buf contains at the moment.
rows int
// Temporary buffer for encoding marshaled buf size.
sizeBuf [ ] byte
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
// broken is set to true if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken bool
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
dialer * netutil . TCPDialer
bc * handshake . BufferedConn
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
// The number of dial errors to vmstorage node.
2019-05-22 23:23:23 +02:00
dialErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of handshake errors to vmstorage node.
2019-05-22 23:23:23 +02:00
handshakeErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of connection errors to vmstorage node.
2019-05-22 23:23:23 +02:00
connectionErrors * metrics . Counter
2019-06-08 21:29:25 +02:00
// The number of rows pushed to storageNode with push method.
rowsPushed * metrics . Counter
// The number of rows sent to vmstorage node.
rowsSent * metrics . Counter
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
// The number of rows rerouted from the given vmstorage node
// to healthy nodes when the given node was unhealthy.
rowsReroutedFromHere * metrics . Counter
// The number of rows rerouted to the given vmstorage node
// from other nodes when they were unhealthy.
rowsReroutedToHere * metrics . Counter
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
// storageNodes contains a list of vmstorage node clients.
2019-05-22 23:23:23 +02:00
var storageNodes [ ] * storageNode
2019-06-08 21:29:25 +02:00
var (
storageNodesWG sync . WaitGroup
rerouteWorkerWG sync . WaitGroup
)
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
var (
storageNodesStopCh = make ( chan struct { } )
rerouteWorkerStopCh = make ( chan struct { } )
)
2019-05-22 23:23:23 +02:00
2019-06-08 21:29:25 +02:00
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
2019-05-22 23:23:23 +02:00
func InitStorageNodes ( addrs [ ] string ) {
if len ( addrs ) == 0 {
logger . Panicf ( "BUG: addrs must be non-empty" )
}
if len ( addrs ) > 255 {
logger . Panicf ( "BUG: too much addresses: %d; max supported %d addresses" , len ( addrs ) , 255 )
}
for _ , addr := range addrs {
sn := & storageNode {
dialer : netutil . NewTCPDialer ( "vminsert" , addr ) ,
2019-06-08 21:29:25 +02:00
dialErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_dial_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
handshakeErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_handshake_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
connectionErrors : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_connection_errors_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsPushed : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_pushed_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsSent : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_sent_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsReroutedFromHere : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_rerouted_from_here_total { name="vminsert", addr=%q} ` , addr ) ) ,
rowsReroutedToHere : metrics . NewCounter ( fmt . Sprintf ( ` vm_rpc_rows_rerouted_to_here_total { name="vminsert", addr=%q} ` , addr ) ) ,
2019-05-22 23:23:23 +02:00
}
2019-06-08 21:29:25 +02:00
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_rows_pending { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
sn . mu . Lock ( )
n := sn . rows
sn . mu . Unlock ( )
return float64 ( n )
} )
_ = metrics . NewGauge ( fmt . Sprintf ( ` vm_rpc_buf_pending_bytes { name="vminsert", addr=%q} ` , addr ) , func ( ) float64 {
sn . mu . Lock ( )
n := len ( sn . buf )
sn . mu . Unlock ( )
return float64 ( n )
} )
2019-05-22 23:23:23 +02:00
storageNodes = append ( storageNodes , sn )
storageNodesWG . Add ( 1 )
go func ( addr string ) {
2019-06-08 21:29:25 +02:00
sn . run ( storageNodesStopCh )
2019-05-22 23:23:23 +02:00
storageNodesWG . Done ( )
} ( addr )
}
2019-06-08 21:29:25 +02:00
rerouteWorkerWG . Add ( 1 )
go func ( ) {
rerouteWorker ( rerouteWorkerStopCh )
rerouteWorkerWG . Done ( )
} ( )
2019-05-22 23:23:23 +02:00
}
// Stop gracefully stops netstorage.
func Stop ( ) {
2019-06-08 21:29:25 +02:00
close ( rerouteWorkerStopCh )
rerouteWorkerWG . Wait ( )
close ( storageNodesStopCh )
2019-05-22 23:23:23 +02:00
storageNodesWG . Wait ( )
}
2019-06-08 21:29:25 +02:00
// addToReroutedBuf appends buf containing the given number of rows to reroutedBuf.
//
// The size of reroutedBuf is limited by 1/8 of memory.Allowed(). If buf doesn't
// fit this limit, nothing is appended, the overflow is counted and an error is returned.
func addToReroutedBuf(buf []byte, rows int) error {
	maxSize := memory.Allowed() / 8

	reroutedLock.Lock()
	defer reroutedLock.Unlock()

	if len(reroutedBuf)+len(buf) <= maxSize {
		reroutedBuf = append(reroutedBuf, buf...)
		reroutedRows += rows
		reroutesTotal.Inc()
		return nil
	}

	reroutedBufOverflows.Inc()
	return fmt.Errorf("%d rows dropped because of reroutedBuf overflows %d bytes", rows, maxSize)
}
// spreadReroutedBufToStorageNodes distributes the rows accumulated in reroutedBuf
// among healthy vmstorage nodes.
//
// swapBuf is a scratch buffer owned by the caller; it is swapped with reroutedBuf,
// so the pending data can be processed without holding reroutedLock. The returned
// buffer must be passed (truncated) to the next call.
//
// If the rows cannot be delivered to any node, the unprocessed part is returned
// to reroutedBuf when the result fits consts.MaxInsertPacketSize. Otherwise
// the unprocessed rows are counted in rowsLostTotal and an error is returned.
func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
	// Take ownership of the pending data and leave the empty scratch buffer in its place.
	reroutedLock.Lock()
	reroutedBuf, swapBuf = swapBuf[:0], reroutedBuf
	rows := reroutedRows
	reroutedRows = 0
	reroutedLock.Unlock()

	if len(swapBuf) == 0 {
		// Nothing to re-route.
		return swapBuf, nil
	}

	healthyStorageNodes := getHealthyStorageNodes()
	if len(healthyStorageNodes) == 0 {
		// No more vmstorage nodes to write data to.
		// Try returning the data to reroutedBuf if it has enough free space.
		var recovered bool
		swapBuf, recovered = returnToReroutedBuf(swapBuf, swapBuf, rows)
		if recovered {
			return swapBuf, nil
		}
		rowsLostTotal.Add(rows)
		return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unhealthy; lost %d rows", len(storageNodes), rows)
	}

	var mr storage.MetricRow
	src := swapBuf
	rowsProcessed := 0
	for len(src) > 0 {
		tail, err := mr.Unmarshal(src)
		if err != nil {
			logger.Panicf("BUG: cannot unmarshal recently marshaled MetricRow: %s", err)
		}
		rowBuf := src[:len(src)-len(tail)]

		// Use non-consistent hashing instead of jump hash in order to re-route rows
		// equally among healthy vmstorage nodes.
		// This should spread the increased load among healthy vmstorage nodes.
		h := xxhash.Sum64(mr.MetricNameRaw)
		idx := h % uint64(len(healthyStorageNodes))
		attempts := 0
		for {
			sn := healthyStorageNodes[idx]
			err := sn.sendReroutedRow(rowBuf)
			if err == nil {
				sn.rowsReroutedToHere.Inc()
				break
			}

			// Cannot send data to sn. Try sending to the next vmstorage node.
			idx++
			if idx >= uint64(len(healthyStorageNodes)) {
				idx = 0
			}
			attempts++
			if attempts < len(healthyStorageNodes) {
				continue
			}

			// Every node refused the row.
			// Account the rows already handed over to vmstorage nodes,
			// so they aren't missing from the processed rows counter.
			reroutedRowsProcessed.Add(rowsProcessed)

			// Try returning the remaining data to reroutedBuf if it has enough free space.
			// src still starts at the current row, i.e. it equals rowBuf followed by tail.
			rowsRemaining := rows - rowsProcessed
			var recovered bool
			swapBuf, recovered = returnToReroutedBuf(swapBuf, src, rowsRemaining)
			if recovered {
				return swapBuf, nil
			}
			rowsLostTotal.Add(rowsRemaining)
			return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err)
		}

		src = tail
		rowsProcessed++
	}
	if rowsProcessed != rows {
		logger.Panicf("BUG: unexpected number of rows processed; got %d; want %d", rowsProcessed, rows)
	}
	reroutedRowsProcessed.Add(rowsProcessed)
	return swapBuf, nil
}

// returnToReroutedBuf tries to put data holding the given number of rows back
// in front of reroutedBuf.
//
// data must be either swapBuf itself or a suffix of it. On success the combined
// data becomes the new reroutedBuf, the previous reroutedBuf storage is returned
// as the new scratch buffer and true is returned. If the combined size would
// exceed consts.MaxInsertPacketSize, nothing is modified and swapBuf is returned
// together with false.
func returnToReroutedBuf(swapBuf, data []byte, rows int) ([]byte, bool) {
	reroutedLock.Lock()
	defer reroutedLock.Unlock()

	if len(data)+len(reroutedBuf) > consts.MaxInsertPacketSize {
		return swapBuf, false
	}
	// data may overlap swapBuf; append copies it to the beginning of swapBuf
	// in a way that is safe for overlapping slices.
	swapBuf = append(swapBuf[:0], data...)
	swapBuf = append(swapBuf, reroutedBuf...)
	reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
	reroutedRows += rows
	return swapBuf, true
}
// State of the re-routing buffer shared by all the storage nodes,
// together with the metrics describing it.
//
// Metric names must not contain whitespace around the identifier or the label
// braces, otherwise they aren't valid metric names.
var (
	// reroutedLock must be held while accessing reroutedBuf and reroutedRows.
	reroutedLock sync.Mutex

	// reroutedBuf holds marshaled rows waiting to be spread among healthy vmstorage nodes.
	reroutedBuf []byte

	// reroutedRows is the number of rows stored in reroutedBuf.
	reroutedRows int

	reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
	reroutedBufOverflows  = metrics.NewCounter(`vm_rpc_rerouted_buf_overflows_total{name="vminsert"}`)
	reroutesTotal         = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)

	_ = metrics.NewGauge(`vm_rpc_rerouted_rows_pending{name="vminsert"}`, func() float64 {
		reroutedLock.Lock()
		n := reroutedRows
		reroutedLock.Unlock()
		return float64(n)
	})
	_ = metrics.NewGauge(`vm_rpc_rerouted_buf_pending_bytes{name="vminsert"}`, func() float64 {
		reroutedLock.Lock()
		n := len(reroutedBuf)
		reroutedLock.Unlock()
		return float64(n)
	})

	rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
	rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
)
// getHealthyStorageNodes returns the storage nodes that aren't marked as broken
// at the moment of the call, in the same order as they appear in storageNodes.
//
// The broken flag of every node is read under its own mutex. The result is
// a snapshot: a returned node may become broken right after the call.
func getHealthyStorageNodes() []*storageNode {
	// Reserve room for all the nodes. This cannot produce a negative capacity
	// when storageNodes is empty and avoids a re-allocation when every node is healthy.
	sns := make([]*storageNode, 0, len(storageNodes))
	for _, sn := range storageNodes {
		sn.mu.Lock()
		if !sn.broken {
			sns = append(sns, sn)
		}
		sn.mu.Unlock()
	}
	return sns
}