mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 07:19:17 +01:00
app/vminsert: improve handling of unhealthy vmstorage nodes
* Spread load evenly among remaining healthy nodes instead of hammering the next node after the unhealthy node. * Make sure that the packet is flushed to storage node before returning success. Previously packets could stay in local buffers and thus lost on connection errors. * Keep rows in the limited memory when all the storage nodes are unhealthy.
This commit is contained in:
parent
0f64673327
commit
5fcdb4a59a
@ -18,9 +18,24 @@ type InsertCtx struct {
|
||||
Labels []prompb.Label
|
||||
MetricNameBuf []byte
|
||||
|
||||
bufs [][]byte
|
||||
bufRowss []bufRows
|
||||
labelsBuf []byte
|
||||
sizeBuf [8]byte
|
||||
}
|
||||
|
||||
type bufRows struct {
|
||||
buf []byte
|
||||
rows int
|
||||
}
|
||||
|
||||
func (br *bufRows) pushTo(sn *storageNode) error {
|
||||
bufLen := len(br.buf)
|
||||
err := sn.push(br.buf, br.rows)
|
||||
br.buf = br.buf[:0]
|
||||
br.rows = 0
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot send %d bytes to storageNode %q: %s", bufLen, sn.dialer.Addr(), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reset resets ctx.
|
||||
@ -32,11 +47,13 @@ func (ctx *InsertCtx) Reset() {
|
||||
ctx.Labels = ctx.Labels[:0]
|
||||
ctx.MetricNameBuf = ctx.MetricNameBuf[:0]
|
||||
|
||||
if ctx.bufs == nil {
|
||||
ctx.bufs = make([][]byte, len(storageNodes))
|
||||
if ctx.bufRowss == nil {
|
||||
ctx.bufRowss = make([]bufRows, len(storageNodes))
|
||||
}
|
||||
for i := range ctx.bufs {
|
||||
ctx.bufs[i] = ctx.bufs[i][:0]
|
||||
for i := range ctx.bufRowss {
|
||||
br := &ctx.bufRowss[i]
|
||||
br.buf = br.buf[:0]
|
||||
br.rows = 0
|
||||
}
|
||||
ctx.labelsBuf = ctx.labelsBuf[:0]
|
||||
}
|
||||
@ -70,34 +87,41 @@ func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, time
|
||||
|
||||
// WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx.
|
||||
func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error {
|
||||
buf := ctx.bufs[storageNodeIdx]
|
||||
br := &ctx.bufRowss[storageNodeIdx]
|
||||
sn := storageNodes[storageNodeIdx]
|
||||
bufNew := storage.MarshalMetricRow(buf, metricNameRaw, timestamp, value)
|
||||
if len(bufNew) >= consts.MaxInsertPacketSize {
|
||||
bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
|
||||
if len(bufNew) >= maxStorageNodeBufSize {
|
||||
// Send buf to storageNode, since it is too big.
|
||||
if err := sn.sendWithFallback(buf, ctx.sizeBuf[:]); err != nil {
|
||||
return fmt.Errorf("cannot send %d bytes to storageNodes: %s", len(buf), err)
|
||||
if err := br.pushTo(sn); err != nil {
|
||||
return err
|
||||
}
|
||||
buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
|
||||
br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value)
|
||||
} else {
|
||||
buf = bufNew
|
||||
br.buf = bufNew
|
||||
}
|
||||
ctx.bufs[storageNodeIdx] = buf
|
||||
sn.RowsPushed.Inc()
|
||||
br.rows++
|
||||
return nil
|
||||
}
|
||||
|
||||
var maxStorageNodeBufSize = func() int {
|
||||
n := 1024 * 1024
|
||||
if n > consts.MaxInsertPacketSize {
|
||||
n = consts.MaxInsertPacketSize
|
||||
}
|
||||
return n
|
||||
}()
|
||||
|
||||
// FlushBufs flushes ctx bufs to remote storage nodes.
|
||||
func (ctx *InsertCtx) FlushBufs() error {
|
||||
// Send per-storageNode bufs.
|
||||
sizeBuf := ctx.sizeBuf[:]
|
||||
for i, buf := range ctx.bufs {
|
||||
if len(buf) == 0 {
|
||||
for i := range ctx.bufRowss {
|
||||
br := &ctx.bufRowss[i]
|
||||
if len(br.buf) == 0 {
|
||||
continue
|
||||
}
|
||||
sn := storageNodes[i]
|
||||
if err := sn.sendWithFallback(buf, sizeBuf); err != nil {
|
||||
return fmt.Errorf("cannot send data to storageNodes: %s", err)
|
||||
if err := br.pushTo(sn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -6,93 +6,136 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
xxhash "github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
var disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
|
||||
|
||||
// sendWithFallback sends buf to storage node sn.
|
||||
// push pushes buf to sn.
|
||||
//
|
||||
// It falls back to sending data to another storage node if sn is currently
|
||||
// It falls back to sending data to another vmstorage node if sn is currently
|
||||
// unavailable.
|
||||
func (sn *storageNode) sendWithFallback(buf []byte, sizeBuf []byte) error {
|
||||
deadline := time.Now().Add(30 * time.Second)
|
||||
err := sn.sendBuf(buf, deadline, sizeBuf)
|
||||
if err == nil {
|
||||
//
|
||||
// rows is the number of rows in the buf.
|
||||
func (sn *storageNode) push(buf []byte, rows int) error {
|
||||
if len(buf) > consts.MaxInsertPacketSize {
|
||||
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), consts.MaxInsertPacketSize)
|
||||
}
|
||||
sn.rowsPushed.Add(rows)
|
||||
|
||||
sn.mu.Lock()
|
||||
defer sn.mu.Unlock()
|
||||
|
||||
if sn.broken {
|
||||
// The vmstorage node is broken. Re-route buf to healthy vmstorage nodes.
|
||||
if err := addToReroutedBuf(buf, rows); err != nil {
|
||||
rowsLostTotal.Add(rows)
|
||||
return err
|
||||
}
|
||||
sn.rowsReroutedFromHere.Add(rows)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Failed to send the data to sn. Try sending it to another storageNodes.
|
||||
if time.Until(deadline) <= 0 {
|
||||
sn.timeouts.Inc()
|
||||
if len(sn.buf)+len(buf) <= consts.MaxInsertPacketSize {
|
||||
// Fast path: the buf contents fits sn.buf.
|
||||
sn.buf = append(sn.buf, buf...)
|
||||
sn.rows += rows
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slow path: the buf contents doesn't fit sn.buf.
|
||||
// Flush sn.buf to vmstorage and then add buf to sn.buf.
|
||||
if err := sn.flushBufLocked(); err != nil {
|
||||
// Failed to flush or re-route sn.buf to vmstorage nodes.
|
||||
// The sn.buf is already dropped by flushBufLocked.
|
||||
// Drop buf too, since there is litte sense in trying to rescue it.
|
||||
rowsLostTotal.Add(rows)
|
||||
return err
|
||||
}
|
||||
if len(storageNodes) == 1 {
|
||||
return err
|
||||
|
||||
// Successful flush.
|
||||
sn.buf = append(sn.buf, buf...)
|
||||
sn.rows += rows
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) sendReroutedRow(buf []byte) error {
|
||||
sn.mu.Lock()
|
||||
defer sn.mu.Unlock()
|
||||
|
||||
if sn.broken {
|
||||
return errBrokenStorageNode
|
||||
}
|
||||
idx := func() int {
|
||||
for i, snOther := range storageNodes {
|
||||
if sn == snOther {
|
||||
return i
|
||||
}
|
||||
}
|
||||
logger.Panicf("BUG: cannot find storageNode %p in storageNodes %p", sn, storageNodes)
|
||||
return -1
|
||||
}()
|
||||
for i := 0; i < len(storageNodes); i++ {
|
||||
idx++
|
||||
if idx >= len(storageNodes) {
|
||||
idx = 0
|
||||
}
|
||||
err = storageNodes[idx].sendBuf(buf, deadline, sizeBuf)
|
||||
if err == nil {
|
||||
storageNodes[idx].fallbacks.Inc()
|
||||
return nil
|
||||
}
|
||||
if time.Until(deadline) <= 0 {
|
||||
sn.timeouts.Inc()
|
||||
return err
|
||||
}
|
||||
if len(sn.buf)+len(buf) > consts.MaxInsertPacketSize {
|
||||
return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", len(sn.buf)+len(buf), consts.MaxInsertPacketSize)
|
||||
}
|
||||
sn.buf = append(sn.buf, buf...)
|
||||
sn.rows++
|
||||
return nil
|
||||
}
|
||||
|
||||
var errBrokenStorageNode = fmt.Errorf("the vmstorage node is temporarily broken")
|
||||
|
||||
func (sn *storageNode) flushBufLocked() error {
|
||||
if err := sn.sendBufLocked(sn.buf); err == nil {
|
||||
// Successful flush. Remove broken flag.
|
||||
sn.broken = false
|
||||
sn.rowsSent.Add(sn.rows)
|
||||
sn.buf = sn.buf[:0]
|
||||
sn.rows = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
// Couldn't flush sn.buf to vmstorage. Mark sn as broken
|
||||
// and try re-routing sn.buf to healthy vmstorage nodes.
|
||||
sn.broken = true
|
||||
err := addToReroutedBuf(sn.buf, sn.rows)
|
||||
if err != nil {
|
||||
rowsLostTotal.Add(sn.rows)
|
||||
}
|
||||
sn.buf = sn.buf[:0]
|
||||
sn.rows = 0
|
||||
return err
|
||||
}
|
||||
|
||||
func (sn *storageNode) sendBuf(buf []byte, deadline time.Time, sizeBuf []byte) error {
|
||||
func (sn *storageNode) sendBufLocked(buf []byte) error {
|
||||
// sizeBuf guarantees that the rows batch will be either fully
|
||||
// read or fully discarded on the vmstorage.
|
||||
// read or fully discarded on the vmstorage side.
|
||||
// sizeBuf is used for read optimization in vmstorage.
|
||||
encoding.MarshalUint64(sizeBuf[:0], uint64(len(buf)))
|
||||
|
||||
sn.bcLock.Lock()
|
||||
defer sn.bcLock.Unlock()
|
||||
|
||||
if sn.bc == nil {
|
||||
if err := sn.dial(); err != nil {
|
||||
return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := sn.sendBufNolock(buf, deadline, sizeBuf); err != nil {
|
||||
sn.closeConn()
|
||||
return err
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) sendBufNolock(buf []byte, deadline time.Time, sizeBuf []byte) error {
|
||||
deadline := time.Now().Add(30 * time.Second)
|
||||
if err := sn.bc.SetWriteDeadline(deadline); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
return fmt.Errorf("cannot set write deadline to %s: %s", deadline, err)
|
||||
}
|
||||
if _, err := sn.bc.Write(sizeBuf); err != nil {
|
||||
sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf)))
|
||||
if _, err := sn.bc.Write(sn.sizeBuf); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
return fmt.Errorf("cannot write data size %d: %s", len(buf), err)
|
||||
}
|
||||
if _, err := sn.bc.Write(buf); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
return fmt.Errorf("cannot write data: %s", err)
|
||||
}
|
||||
if err := sn.bc.Flush(); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
return fmt.Errorf("cannot flush data: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -102,7 +145,6 @@ func (sn *storageNode) dial() error {
|
||||
sn.dialErrors.Inc()
|
||||
return err
|
||||
}
|
||||
|
||||
compressionLevel := 1
|
||||
if *disableRPCCompression {
|
||||
compressionLevel = 0
|
||||
@ -113,79 +155,121 @@ func (sn *storageNode) dial() error {
|
||||
sn.handshakeErrors.Inc()
|
||||
return fmt.Errorf("handshake error: %s", err)
|
||||
}
|
||||
|
||||
sn.bc = bc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sn *storageNode) closeConn() {
|
||||
func (sn *storageNode) closeBrokenConn() {
|
||||
_ = sn.bc.Close()
|
||||
sn.bc = nil
|
||||
sn.connectionErrors.Inc()
|
||||
}
|
||||
|
||||
func (sn *storageNode) run() {
|
||||
func (sn *storageNode) run(stopCh <-chan struct{}) {
|
||||
t := time.NewTimer(time.Second)
|
||||
mustStop := false
|
||||
for !mustStop {
|
||||
select {
|
||||
case <-stopCh:
|
||||
mustStop = true
|
||||
case <-time.After(time.Second):
|
||||
// Make sure flushBufLocked is called last time before returning
|
||||
// in order to send the remaining bits of data.
|
||||
case <-t.C:
|
||||
}
|
||||
|
||||
sn.bcLock.Lock()
|
||||
if err := sn.flushNolock(); err != nil {
|
||||
sn.closeConn()
|
||||
sn.mu.Lock()
|
||||
if err := sn.flushBufLocked(); err != nil {
|
||||
sn.closeBrokenConn()
|
||||
logger.Errorf("cannot flush data to storageNode %q: %s", sn.dialer.Addr(), err)
|
||||
}
|
||||
sn.bcLock.Unlock()
|
||||
sn.mu.Unlock()
|
||||
|
||||
t.Reset(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (sn *storageNode) flushNolock() error {
|
||||
if sn.bc == nil {
|
||||
return nil
|
||||
func rerouteWorker(stopCh <-chan struct{}) {
|
||||
t := time.NewTimer(time.Second)
|
||||
var buf []byte
|
||||
mustStop := false
|
||||
for !mustStop {
|
||||
select {
|
||||
case <-stopCh:
|
||||
mustStop = true
|
||||
// Make sure spreadReroutedBufToStorageNodes is called last time before returning
|
||||
// in order to reroute the remaining data to healthy vmstorage nodes.
|
||||
case <-t.C:
|
||||
}
|
||||
|
||||
var err error
|
||||
buf, err = spreadReroutedBufToStorageNodes(buf[:0])
|
||||
if err != nil {
|
||||
rerouteErrors.Inc()
|
||||
logger.Errorf("cannot reroute data among healthy vmstorage nodes: %s", err)
|
||||
}
|
||||
t.Reset(time.Second)
|
||||
}
|
||||
if err := sn.bc.SetWriteDeadline(time.Now().Add(30 * time.Second)); err != nil {
|
||||
return fmt.Errorf("cannot set write deadline: %s", err)
|
||||
}
|
||||
return sn.bc.Flush()
|
||||
}
|
||||
|
||||
// storageNode is a client sending data to storage node.
|
||||
// storageNode is a client sending data to vmstorage node.
|
||||
type storageNode struct {
|
||||
mu sync.Mutex
|
||||
|
||||
// Buffer with data that needs to be written to vmstorage node.
|
||||
buf []byte
|
||||
|
||||
// The number of rows buf contains at the moment.
|
||||
rows int
|
||||
|
||||
// Temporary buffer for encoding marshaled buf size.
|
||||
sizeBuf []byte
|
||||
|
||||
// broken is set to true if the given vmstorage node is temporarily unhealthy.
|
||||
// In this case the data is re-routed to the remaining healthy vmstorage nodes.
|
||||
broken bool
|
||||
|
||||
dialer *netutil.TCPDialer
|
||||
|
||||
bc *handshake.BufferedConn
|
||||
bcLock sync.Mutex
|
||||
bc *handshake.BufferedConn
|
||||
|
||||
// The number of times the storage node was timed out (overflown).
|
||||
timeouts *metrics.Counter
|
||||
|
||||
// The number of dial errors to storage node.
|
||||
// The number of dial errors to vmstorage node.
|
||||
dialErrors *metrics.Counter
|
||||
|
||||
// The number of handshake errors to storage node.
|
||||
// The number of handshake errors to vmstorage node.
|
||||
handshakeErrors *metrics.Counter
|
||||
|
||||
// The number of connection errors to storage node.
|
||||
// The number of connection errors to vmstorage node.
|
||||
connectionErrors *metrics.Counter
|
||||
|
||||
// The number of fallbacks to this node.
|
||||
fallbacks *metrics.Counter
|
||||
// The number of rows pushed to storageNode with push method.
|
||||
rowsPushed *metrics.Counter
|
||||
|
||||
// The number of rows pushed to storage node.
|
||||
RowsPushed *metrics.Counter
|
||||
// The number of rows sent to vmstorage node.
|
||||
rowsSent *metrics.Counter
|
||||
|
||||
// The number of rows rerouted from the given vmstorage node
|
||||
// to healthy nodes when the given node was unhealthy.
|
||||
rowsReroutedFromHere *metrics.Counter
|
||||
|
||||
// The number of rows rerouted to the given vmstorage node
|
||||
// from other nodes when they were unhealthy.
|
||||
rowsReroutedToHere *metrics.Counter
|
||||
}
|
||||
|
||||
// storageNodes contains a list of storage node clients.
|
||||
// storageNodes contains a list of vmstorage node clients.
|
||||
var storageNodes []*storageNode
|
||||
|
||||
var storageNodesWG sync.WaitGroup
|
||||
var (
|
||||
storageNodesWG sync.WaitGroup
|
||||
rerouteWorkerWG sync.WaitGroup
|
||||
)
|
||||
|
||||
var stopCh = make(chan struct{})
|
||||
var (
|
||||
storageNodesStopCh = make(chan struct{})
|
||||
rerouteWorkerStopCh = make(chan struct{})
|
||||
)
|
||||
|
||||
// InitStorageNodes initializes storage nodes' connections to the given addrs.
|
||||
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
|
||||
func InitStorageNodes(addrs []string) {
|
||||
if len(addrs) == 0 {
|
||||
logger.Panicf("BUG: addrs must be non-empty")
|
||||
@ -198,24 +282,191 @@ func InitStorageNodes(addrs []string) {
|
||||
sn := &storageNode{
|
||||
dialer: netutil.NewTCPDialer("vminsert", addr),
|
||||
|
||||
timeouts: metrics.NewCounter(fmt.Sprintf(`vm_rpc_timeouts_total{name="vminsert", addr=%q}`, addr)),
|
||||
dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
connectionErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
fallbacks: metrics.NewCounter(fmt.Sprintf(`vm_rpc_fallbacks_total{name="vminsert", addr=%q}`, addr)),
|
||||
RowsPushed: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
|
||||
dialErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
handshakeErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
connectionErrors: metrics.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
|
||||
rowsPushed: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
|
||||
rowsSent: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
|
||||
rowsReroutedFromHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
|
||||
rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
|
||||
}
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
|
||||
sn.mu.Lock()
|
||||
n := sn.rows
|
||||
sn.mu.Unlock()
|
||||
return float64(n)
|
||||
})
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 {
|
||||
sn.mu.Lock()
|
||||
n := len(sn.buf)
|
||||
sn.mu.Unlock()
|
||||
return float64(n)
|
||||
})
|
||||
storageNodes = append(storageNodes, sn)
|
||||
storageNodesWG.Add(1)
|
||||
go func(addr string) {
|
||||
sn.run()
|
||||
sn.run(storageNodesStopCh)
|
||||
storageNodesWG.Done()
|
||||
}(addr)
|
||||
}
|
||||
|
||||
rerouteWorkerWG.Add(1)
|
||||
go func() {
|
||||
rerouteWorker(rerouteWorkerStopCh)
|
||||
rerouteWorkerWG.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop gracefully stops netstorage.
|
||||
func Stop() {
|
||||
close(stopCh)
|
||||
close(rerouteWorkerStopCh)
|
||||
rerouteWorkerWG.Wait()
|
||||
|
||||
close(storageNodesStopCh)
|
||||
storageNodesWG.Wait()
|
||||
}
|
||||
|
||||
func addToReroutedBuf(buf []byte, rows int) error {
|
||||
reroutedBufMaxSize := memory.Allowed() / 8
|
||||
reroutedLock.Lock()
|
||||
defer reroutedLock.Unlock()
|
||||
if len(reroutedBuf)+len(buf) > reroutedBufMaxSize {
|
||||
reroutedBufOverflows.Inc()
|
||||
return fmt.Errorf("%d rows dropped because of reroutedBuf overflows %d bytes", rows, reroutedBufMaxSize)
|
||||
}
|
||||
reroutedBuf = append(reroutedBuf, buf...)
|
||||
reroutedRows += rows
|
||||
reroutesTotal.Inc()
|
||||
return nil
|
||||
}
|
||||
|
||||
func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
|
||||
reroutedLock.Lock()
|
||||
reroutedBuf, swapBuf = swapBuf[:0], reroutedBuf
|
||||
rows := reroutedRows
|
||||
reroutedRows = 0
|
||||
reroutedLock.Unlock()
|
||||
|
||||
if len(swapBuf) == 0 {
|
||||
// Nothing to re-route.
|
||||
return swapBuf, nil
|
||||
}
|
||||
|
||||
healthyStorageNodes := getHealthyStorageNodes()
|
||||
if len(healthyStorageNodes) == 0 {
|
||||
// No more vmstorage nodes to write data to.
|
||||
// Try returning the the data to reroutedBuf if it has enough free space.
|
||||
recovered := false
|
||||
reroutedLock.Lock()
|
||||
if len(swapBuf)+len(reroutedBuf) <= consts.MaxInsertPacketSize {
|
||||
swapBuf = append(swapBuf, reroutedBuf...)
|
||||
reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
|
||||
reroutedRows += rows
|
||||
recovered = true
|
||||
}
|
||||
reroutedLock.Unlock()
|
||||
if recovered {
|
||||
return swapBuf, nil
|
||||
}
|
||||
rowsLostTotal.Add(rows)
|
||||
return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unealthy; lost %d rows", len(storageNodes), rows)
|
||||
}
|
||||
|
||||
var mr storage.MetricRow
|
||||
src := swapBuf
|
||||
rowsProcessed := 0
|
||||
for len(src) > 0 {
|
||||
tail, err := mr.Unmarshal(src)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot unmarshal recently marshaled MetricRow: %s", err)
|
||||
}
|
||||
rowBuf := src[:len(src)-len(tail)]
|
||||
src = tail
|
||||
|
||||
// Use non-consistent hashing instead of jump hash in order to re-route rows
|
||||
// equally among healthy vmstorage nodes.
|
||||
// This should spread the increased load among healthy vmstorage nodes.
|
||||
h := xxhash.Sum64(mr.MetricNameRaw)
|
||||
idx := h % uint64(len(healthyStorageNodes))
|
||||
attempts := 0
|
||||
for {
|
||||
sn := healthyStorageNodes[idx]
|
||||
err := sn.sendReroutedRow(rowBuf)
|
||||
if err == nil {
|
||||
sn.rowsReroutedToHere.Inc()
|
||||
break
|
||||
}
|
||||
|
||||
// Cannot send data to sn. Try sending to the next vmstorage node.
|
||||
idx++
|
||||
if idx >= uint64(len(healthyStorageNodes)) {
|
||||
idx = 0
|
||||
}
|
||||
attempts++
|
||||
if attempts == len(healthyStorageNodes) {
|
||||
// There are no healthy nodes.
|
||||
// Try returning the remaining data to reroutedBuf if it has enough free space.
|
||||
rowsRemaining := rows - rowsProcessed
|
||||
recovered := false
|
||||
reroutedLock.Lock()
|
||||
if len(rowBuf)+len(tail)+len(reroutedBuf) <= consts.MaxInsertPacketSize {
|
||||
swapBuf = append(swapBuf[:0], rowBuf...)
|
||||
swapBuf = append(swapBuf, tail...)
|
||||
swapBuf = append(swapBuf, reroutedBuf...)
|
||||
reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
|
||||
reroutedRows += rowsRemaining
|
||||
recovered = true
|
||||
}
|
||||
reroutedLock.Unlock()
|
||||
if recovered {
|
||||
return swapBuf, nil
|
||||
}
|
||||
rowsLostTotal.Add(rowsRemaining)
|
||||
return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err)
|
||||
}
|
||||
}
|
||||
rowsProcessed++
|
||||
}
|
||||
if rowsProcessed != rows {
|
||||
logger.Panicf("BUG: unexpected number of rows processed; got %d; want %d", rowsProcessed, rows)
|
||||
}
|
||||
reroutedRowsProcessed.Add(rowsProcessed)
|
||||
return swapBuf, nil
|
||||
}
|
||||
|
||||
var (
|
||||
reroutedLock sync.Mutex
|
||||
reroutedBuf []byte
|
||||
reroutedRows int
|
||||
|
||||
reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
|
||||
reroutedBufOverflows = metrics.NewCounter(`vm_rpc_rerouted_buf_overflows_total{name="vminsert"}`)
|
||||
reroutesTotal = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
|
||||
_ = metrics.NewGauge(`vm_rpc_rerouted_rows_pending{name="vminsert"}`, func() float64 {
|
||||
reroutedLock.Lock()
|
||||
n := reroutedRows
|
||||
reroutedLock.Unlock()
|
||||
return float64(n)
|
||||
})
|
||||
_ = metrics.NewGauge(`vm_rpc_rerouted_buf_pending_bytes{name="vminsert"}`, func() float64 {
|
||||
reroutedLock.Lock()
|
||||
n := len(reroutedBuf)
|
||||
reroutedLock.Unlock()
|
||||
return float64(n)
|
||||
})
|
||||
|
||||
rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
|
||||
rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
|
||||
)
|
||||
|
||||
func getHealthyStorageNodes() []*storageNode {
|
||||
sns := make([]*storageNode, 0, len(storageNodes)-1)
|
||||
for _, sn := range storageNodes {
|
||||
sn.mu.Lock()
|
||||
if !sn.broken {
|
||||
sns = append(sns, sn)
|
||||
}
|
||||
sn.mu.Unlock()
|
||||
}
|
||||
return sns
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user