package netstorage

import (
	"errors"
	"flag"
	"fmt"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
	"github.com/VictoriaMetrics/metrics"
	"github.com/cespare/xxhash/v2"
)

var (
	disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Whether to disable compression for the data sent from vminsert to vmstorage. This reduces CPU usage at the cost of higher network bandwidth usage")
	replicationFactor     = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
		"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
		"Higher values for -dedup.minScrapeInterval at vmselect is OK")
	disableRerouting      = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate. See also -dropSamplesOnOverload")
	dropSamplesOnOverload = flag.Bool("dropSamplesOnOverload", false, "Whether to drop incoming samples if the destination vmstorage node is overloaded and/or unavailable. This prioritizes cluster availability over consistency, e.g. the cluster continues accepting all the ingested samples, but some of them may be dropped if vmstorage nodes are temporarily unavailable and/or overloaded. The drop of samples happens before the replication, so it's not recommended to use this flag with -replicationFactor enabled.")
	vmstorageDialTimeout  = flag.Duration("vmstorageDialTimeout", 3*time.Second, "Timeout for establishing RPC connections from vminsert to vmstorage. "+
		"See also -vmstorageUserTimeout")
	vmstorageUserTimeout = flag.Duration("vmstorageUserTimeout", 3*time.Second, "Network timeout for RPC connections from vminsert to vmstorage (Linux only). "+
		"Lower values speed up re-rerouting recovery when some of vmstorage nodes become unavailable because of networking issues. "+
		"Read more about TCP_USER_TIMEOUT at https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/ . "+
		"See also -vmstorageDialTimeout")
)

var errStorageReadOnly = errors.New("storage node is read only")

func (sn *storageNode) isReady() bool {
	return atomic.LoadUint32(&sn.broken) == 0 && atomic.LoadUint32(&sn.isReadOnly) == 0
}

// push pushes buf to sn internal bufs.
//
// This function doesn't block on fast path.
// It may block only if storage nodes cannot handle the incoming ingestion rate.
// This blocking provides backpressure to the caller.
//
// The function falls back to sending data to other vmstorage nodes
// if sn is currently unavailable or overloaded.
//
// rows must match the number of rows in the buf.
func (sn *storageNode) push(snb *storageNodesBucket, buf []byte, rows int) error {
	if len(buf) > maxBufSizePerStorageNode {
		logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode)
	}
	sn.rowsPushed.Add(rows)
	if sn.trySendBuf(buf, rows) {
		// Fast path - the buffer is successfully sent to sn.
		return nil
	}
	if *dropSamplesOnOverload && atomic.LoadUint32(&sn.isReadOnly) == 0 {
		sn.rowsDroppedOnOverload.Add(rows)
		dropSamplesOnOverloadLogger.Warnf("some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+
			"See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr())
		return nil
	}
	// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
	if err := sn.rerouteBufToOtherStorageNodes(snb, buf, rows); err != nil {
		return fmt.Errorf("error when re-routing rows from %s: %w", sn.dialer.Addr(), err)
	}
	return nil
}

var dropSamplesOnOverloadLogger = logger.WithThrottler("droppedSamplesOnOverload", 5*time.Second)

func (sn *storageNode) rerouteBufToOtherStorageNodes(snb *storageNodesBucket, buf []byte, rows int) error {
	sns := snb.sns
	sn.brLock.Lock()
again:
	select {
	case <-sn.stopCh:
		sn.brLock.Unlock()
		return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
	default:
	}
	if !sn.isReady() {
		if len(sns) == 1 {
			// There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
			sn.brCond.Wait()
			goto again
		}
		sn.brLock.Unlock()
		// The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
		rowsProcessed, err := rerouteRowsToReadyStorageNodes(snb, sn, buf)
		rows -= rowsProcessed
		if err != nil {
			return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
		}
		return nil
	}
	if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
		// Fast path: the buf contents fits sn.buf.
		sn.br.buf = append(sn.br.buf, buf...)
		sn.br.rows += rows
		sn.brLock.Unlock()
		return nil
	}
	// Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes.
	if *disableRerouting || len(sns) == 1 {
		sn.brCond.Wait()
		goto again
	}
	sn.brLock.Unlock()
	rowsProcessed, err := rerouteRowsToFreeStorageNodes(snb, sn, buf)
	rows -= rowsProcessed
	if err != nil {
		return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
	}
	return nil
}

var closedCh = func() <-chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}()

func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
	replicas := *replicationFactor
	if replicas <= 0 {
		replicas = 1
	}
	sns := snb.sns
	if replicas > len(sns) {
		replicas = len(sns)
	}

	sn.readOnlyCheckerWG.Add(1)
	go func() {
		defer sn.readOnlyCheckerWG.Done()
		sn.readOnlyChecker()
	}()
	defer sn.readOnlyCheckerWG.Wait()

	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	var br bufRows
	brLastResetTime := fasttime.UnixTimestamp()
	var waitCh <-chan struct{}
	mustStop := false
	for !mustStop {
		sn.brLock.Lock()
		bufLen := len(sn.br.buf)
		sn.brLock.Unlock()
		waitCh = nil
		if bufLen > 0 {
			// Do not sleep if sn.br.buf isn't empty.
			waitCh = closedCh
		}
		select {
		case <-sn.stopCh:
			mustStop = true
			// Make sure the sn.buf is flushed last time before returning
			// in order to send the remaining bits of data.
		case <-ticker.C:
		case <-waitCh:
		}
		sn.brLock.Lock()
		sn.br, br = br, sn.br
		sn.brCond.Broadcast()
		sn.brLock.Unlock()
		currentTime := fasttime.UnixTimestamp()
		if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
			// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
			br.buf = append(br.buf[:0:0], br.buf...)
			brLastResetTime = currentTime
		}
		sn.checkHealth()
		if len(br.buf) == 0 {
			// Nothing to send.
			continue
		}
		// Send br to replicas storage nodes starting from snIdx.
		for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
			t := timerpool.Get(200 * time.Millisecond)
			select {
			case <-sn.stopCh:
				timerpool.Put(t)
				return
			case <-t.C:
				timerpool.Put(t)
				sn.checkHealth()
			}
		}
		br.reset()
	}
}

func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
	usedStorageNodes := make(map[*storageNode]struct{}, replicas)
	sns := snb.sns
	for i := 0; i < replicas; i++ {
		idx := snIdx + i
		attempts := 0
		for {
			attempts++
			if attempts > len(sns) {
				if i == 0 {
					// The data wasn't replicated at all.
					cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
						"re-trying to send the data soon", len(br.buf), br.rows)
					return false
				}
				// The data is partially replicated, so just emit a warning and return true.
				// We could retry sending the data again, but this may result in uncontrolled duplicate data.
				// So it is better returning true.
				rowsIncompletelyReplicatedTotal.Add(br.rows)
				incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
					"since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
				return true
			}
			if idx >= len(sns) {
				idx %= len(sns)
			}
			sn := sns[idx]
			idx++
			if _, ok := usedStorageNodes[sn]; ok {
				// The br has been already replicated to sn. Skip it.
				continue
			}
			if !sn.sendBufRowsNonblocking(br) {
				// Cannot send data to sn. Go to the next sn.
				continue
			}
			// Successfully sent data to sn.
			usedStorageNodes[sn] = struct{}{}
			break
		}
	}
	return true
}

var (
	cannotReplicateLogger       = logger.WithThrottler("cannotReplicateDataBecauseNoStorageNodes", 5*time.Second)
	incompleteReplicationLogger = logger.WithThrottler("incompleteReplication", 5*time.Second)
)

func (sn *storageNode) checkHealth() {
	sn.bcLock.Lock()
	defer sn.bcLock.Unlock()

	if sn.bc != nil {
		// The sn looks healthy.
		return
	}
	bc, err := sn.dial()
	if err != nil {
		atomic.StoreUint32(&sn.broken, 1)
		sn.brCond.Broadcast()
		if sn.lastDialErr == nil {
			// Log the error only once.
			sn.lastDialErr = err
			logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err)
		}
		return
	}
	logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr())
	sn.lastDialErr = nil
	sn.bc = bc
	atomic.StoreUint32(&sn.broken, 0)
	sn.brCond.Broadcast()
}

func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
	if !sn.isReady() {
		return false
	}
	sn.bcLock.Lock()
	defer sn.bcLock.Unlock()

	if sn.bc == nil {
		// Do not call sn.dial() here in order to prevent long blocking on sn.bcLock.Lock(),
		// which can negatively impact data sending in sendBufToReplicasNonblocking().
		// sn.dial() should be called by sn.checkHealth() on unsuccessful call to sendBufToReplicasNonblocking().
		return false
	}
	startTime := time.Now()
	err := sendToConn(sn.bc, br.buf)
	duration := time.Since(startTime)
	sn.sendDurationSeconds.Add(duration.Seconds())
	if err == nil {
		// Successfully sent buf to bc.
		sn.rowsSent.Add(br.rows)
		return true
	}
	if errors.Is(err, errStorageReadOnly) {
		// The vmstorage is transitioned to readonly mode.
		atomic.StoreUint32(&sn.isReadOnly, 1)
		sn.brCond.Broadcast()
		// Signal the caller that the data wasn't accepted by the vmstorage,
		// so it will be re-routed to the remaining vmstorage nodes.
		return false
	}
	// Couldn't flush buf to sn. Mark sn as broken.
	cannotSendBufsLogger.Warnf("cannot send %d bytes with %d rows to -storageNode=%q: %s; closing the connection to storageNode and "+
		"re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err)
	if err = sn.bc.Close(); err != nil {
		cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
	}
	sn.bc = nil
	atomic.StoreUint32(&sn.broken, 1)
	sn.brCond.Broadcast()
	sn.connectionErrors.Inc()
	return false
}

var cannotCloseStorageNodeConnLogger = logger.WithThrottler("cannotCloseStorageNodeConn", 5*time.Second)

var cannotSendBufsLogger = logger.WithThrottler("cannotSendBufRows", 5*time.Second)

func sendToConn(bc *handshake.BufferedConn, buf []byte) error {
	// if len(buf) == 0, it must be sent to the vmstorage too in order to check for vmstorage health
	// See checkReadOnlyMode() and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4870

	timeoutSeconds := len(buf) / 3e5
	if timeoutSeconds < 60 {
		timeoutSeconds = 60
	}
	timeout := time.Duration(timeoutSeconds) * time.Second
	deadline := time.Now().Add(timeout)
	if err := bc.SetWriteDeadline(deadline); err != nil {
		return fmt.Errorf("cannot set write deadline to %s: %w", deadline, err)
	}
	// sizeBuf guarantees that the rows batch will be either fully
	// read or fully discarded on the vmstorage side.
	// sizeBuf is used for read optimization in vmstorage.
	sizeBuf := sizeBufPool.Get()
	defer sizeBufPool.Put(sizeBuf)
	sizeBuf.B = encoding.MarshalUint64(sizeBuf.B[:0], uint64(len(buf)))
	if _, err := bc.Write(sizeBuf.B); err != nil {
		return fmt.Errorf("cannot write data size %d: %w", len(buf), err)
	}
	if _, err := bc.Write(buf); err != nil {
		return fmt.Errorf("cannot write data with size %d: %w", len(buf), err)
	}
	if err := bc.Flush(); err != nil {
		return fmt.Errorf("cannot flush data with size %d: %w", len(buf), err)
	}

	// Wait for `ack` from vmstorage.
	// This guarantees that the message has been fully received by vmstorage.
	deadline = time.Now().Add(timeout)
	if err := bc.SetReadDeadline(deadline); err != nil {
		return fmt.Errorf("cannot set read deadline for reading `ack` to vmstorage: %w", err)
	}
	if _, err := io.ReadFull(bc, sizeBuf.B[:1]); err != nil {
		return fmt.Errorf("cannot read `ack` from vmstorage: %w", err)
	}

	ackResp := sizeBuf.B[0]
	switch ackResp {
	case 1:
		// ok response, data successfully accepted by vmstorage
	case 2:
		// vmstorage is in readonly mode
		return errStorageReadOnly
	default:
		return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want 1 or 2", sizeBuf.B[0])
	}

	return nil
}

var sizeBufPool bytesutil.ByteBufferPool

func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
	c, err := sn.dialer.Dial()
	if err != nil {
		sn.dialErrors.Inc()
		return nil, err
	}
	compressionLevel := 1
	if *disableRPCCompression {
		compressionLevel = 0
	}
	bc, err := handshake.VMInsertClient(c, compressionLevel)
	if err != nil {
		_ = c.Close()
		sn.handshakeErrors.Inc()
		return nil, fmt.Errorf("handshake error: %w", err)
	}
	return bc, nil
}

// storageNode is a client sending data to vmstorage node.
type storageNode struct {
	// broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
	// In this case the data is re-routed to the remaining healthy vmstorage nodes.
	broken uint32

	// isReadOnly is set to non-zero if the given vmstorage node is read only
	// In this case the data is re-routed to the remaining healthy vmstorage nodes.
	isReadOnly uint32

	// brLock protects br.
	brLock sync.Mutex

	// brCond is used for waiting for free space in br.
	brCond *sync.Cond

	// Buffer with data that needs to be written to the storage node.
	// It must be accessed under brLock.
	br bufRows

	// bcLock protects bc.
	bcLock sync.Mutex

	// waitGroup for readOnlyChecker
	readOnlyCheckerWG sync.WaitGroup

	// bc is a single connection to vmstorage for data transfer.
	// It must be accessed under bcLock.
	bc *handshake.BufferedConn

	dialer *netutil.TCPDialer

	stopCh chan struct{}

	// last error during dial.
	lastDialErr error

	// The number of dial errors to vmstorage node.
	dialErrors *metrics.Counter

	// The number of handshake errors to vmstorage node.
	handshakeErrors *metrics.Counter

	// The number of connection errors to vmstorage node.
	connectionErrors *metrics.Counter

	// The number of rows pushed to storageNode with push method.
	rowsPushed *metrics.Counter

	// The number of rows sent to vmstorage node.
	rowsSent *metrics.Counter

	// The number of rows dropped on overload if -dropSamplesOnOverload is set.
	rowsDroppedOnOverload *metrics.Counter

	// The number of rows rerouted from the given vmstorage node
	// to healthy nodes when the given node was unhealthy.
	rowsReroutedFromHere *metrics.Counter

	// The number of rows rerouted to the given vmstorage node
	// from other nodes when they were unhealthy.
	rowsReroutedToHere *metrics.Counter

	// The total duration spent for sending data to vmstorage node.
	// This metric is useful for determining the saturation of vminsert->vmstorage link.
	sendDurationSeconds *metrics.FloatCounter
}

type storageNodesBucket struct {
	ms *metrics.Set

	// nodesHash is used for consistently selecting a storage node by key.
	nodesHash *consistentHash

	// sns is a list of storage nodes.
	sns []*storageNode

	stopCh chan struct{}
	wg     *sync.WaitGroup
}

// storageNodes contains a list of vmstorage node clients.
var storageNodes atomic.Pointer[storageNodesBucket]

func getStorageNodesBucket() *storageNodesBucket {
	return storageNodes.Load()
}

func setStorageNodesBucket(snb *storageNodesBucket) {
	storageNodes.Store(snb)
}

// Init initializes vmstorage nodes' connections to the given addrs.
//
// hashSeed is used for changing the distribution of input time series among addrs.
//
// Call MustStop when the initialized vmstorage connections are no longer needed.
func Init(addrs []string, hashSeed uint64) {
	snb := initStorageNodes(addrs, hashSeed)
	setStorageNodesBucket(snb)
}

// MustStop stops netstorage.
func MustStop() {
	snb := getStorageNodesBucket()
	mustStopStorageNodes(snb)
}

func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket {
	if len(addrs) == 0 {
		logger.Panicf("BUG: addrs must be non-empty")
	}
	ms := metrics.NewSet()
	nodesHash := newConsistentHash(addrs, hashSeed)
	sns := make([]*storageNode, 0, len(addrs))
	stopCh := make(chan struct{})
	for _, addr := range addrs {
		if _, _, err := net.SplitHostPort(addr); err != nil {
			// Automatically add missing port.
			addr += ":8400"
		}
		sn := &storageNode{
			dialer: netutil.NewTCPDialer(ms, "vminsert", addr, *vmstorageDialTimeout, *vmstorageUserTimeout),

			stopCh: stopCh,

			dialErrors:            ms.NewCounter(fmt.Sprintf(`vm_rpc_dial_errors_total{name="vminsert", addr=%q}`, addr)),
			handshakeErrors:       ms.NewCounter(fmt.Sprintf(`vm_rpc_handshake_errors_total{name="vminsert", addr=%q}`, addr)),
			connectionErrors:      ms.NewCounter(fmt.Sprintf(`vm_rpc_connection_errors_total{name="vminsert", addr=%q}`, addr)),
			rowsPushed:            ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_pushed_total{name="vminsert", addr=%q}`, addr)),
			rowsSent:              ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_sent_total{name="vminsert", addr=%q}`, addr)),
			rowsDroppedOnOverload: ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_dropped_on_overload_total{name="vminsert", addr=%q}`, addr)),
			rowsReroutedFromHere:  ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_from_here_total{name="vminsert", addr=%q}`, addr)),
			rowsReroutedToHere:    ms.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
			sendDurationSeconds:   ms.NewFloatCounter(fmt.Sprintf(`vm_rpc_send_duration_seconds_total{name="vminsert", addr=%q}`, addr)),
		}
		sn.brCond = sync.NewCond(&sn.brLock)
		_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
			sn.brLock.Lock()
			n := sn.br.rows
			sn.brLock.Unlock()
			return float64(n)
		})
		_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 {
			sn.brLock.Lock()
			n := len(sn.br.buf)
			sn.brLock.Unlock()
			return float64(n)
		})
		_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_reachable{name="vminsert", addr=%q}`, addr), func() float64 {
			if atomic.LoadUint32(&sn.broken) != 0 {
				return 0
			}
			return 1
		})
		_ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_read_only{name="vminsert", addr=%q}`, addr), func() float64 {
			return float64(atomic.LoadUint32(&sn.isReadOnly))
		})
		sns = append(sns, sn)
	}

	maxBufSizePerStorageNode = memory.Allowed() / 8 / len(sns)
	if maxBufSizePerStorageNode > consts.MaxInsertPacketSizeForVMInsert {
		maxBufSizePerStorageNode = consts.MaxInsertPacketSizeForVMInsert
	}

	metrics.RegisterSet(ms)
	var wg sync.WaitGroup
	snb := &storageNodesBucket{
		ms:        ms,
		nodesHash: nodesHash,
		sns:       sns,
		stopCh:    stopCh,
		wg:        &wg,
	}

	for idx, sn := range sns {
		wg.Add(1)
		go func(sn *storageNode, idx int) {
			sn.run(snb, idx)
			wg.Done()
		}(sn, idx)
	}

	return snb
}

func mustStopStorageNodes(snb *storageNodesBucket) {
	close(snb.stopCh)
	for _, sn := range snb.sns {
		sn.brCond.Broadcast()
	}
	snb.wg.Wait()
	metrics.UnregisterSet(snb.ms)
	snb.ms.UnregisterAllMetrics()
}

// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
//
// The function blocks until src is fully re-routed.
func rerouteRowsToReadyStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
	reroutesTotal.Inc()
	rowsProcessed := 0
	var idxsExclude, idxsExcludeNew []int
	nodesHash := snb.nodesHash
	sns := snb.sns
	idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
	var mr storage.MetricRow
	for len(src) > 0 {
		tail, err := mr.UnmarshalX(src)
		if err != nil {
			logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err)
		}
		rowBuf := src[:len(src)-len(tail)]
		src = tail
		reroutedRowsProcessed.Inc()
		h := xxhash.Sum64(mr.MetricNameRaw)
		mr.ResetX()
		var sn *storageNode
		for {
			idx := nodesHash.getNodeIdx(h, idxsExclude)
			sn = sns[idx]
			if sn.isReady() {
				break
			}
			// re-generate idxsExclude list, since sn must be put there.
			idxsExclude = getNotReadyStorageNodeIdxsBlocking(snb, idxsExclude[:0], nil)
		}
		if *disableRerouting {
			if !sn.sendBufMayBlock(rowBuf) {
				return rowsProcessed, fmt.Errorf("graceful shutdown started")
			}
			rowsProcessed++
			if sn != snSource {
				snSource.rowsReroutedFromHere.Inc()
				sn.rowsReroutedToHere.Inc()
			}
			continue
		}
	again:
		if sn.trySendBuf(rowBuf, 1) {
			rowsProcessed++
			if sn != snSource {
				snSource.rowsReroutedFromHere.Inc()
				sn.rowsReroutedToHere.Inc()
			}
			continue
		}
		// If the re-routing is enabled, then try sending the row to another storage node.
		idxsExcludeNew = getNotReadyStorageNodeIdxs(snb, idxsExcludeNew[:0], sn)
		idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
		snNew := sns[idx]
		if snNew.trySendBuf(rowBuf, 1) {
			rowsProcessed++
			if snNew != snSource {
				snSource.rowsReroutedFromHere.Inc()
				snNew.rowsReroutedToHere.Inc()
			}
			continue
		}
		// The row cannot be sent to both snSource and the re-routed sn without blocking.
		// Sleep for a while and try sending the row to snSource again.
		time.Sleep(100 * time.Millisecond)
		goto again
	}
	return rowsProcessed, nil
}

// reouteRowsToFreeStorageNodes re-routes src from snSource to other storage nodes.
//
// It is expected that snSource has no enough buffer for sending src.
// It is expected than *dsableRerouting isn't set when calling this function.
func rerouteRowsToFreeStorageNodes(snb *storageNodesBucket, snSource *storageNode, src []byte) (int, error) {
	if *disableRerouting {
		logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
	}
	reroutesTotal.Inc()
	rowsProcessed := 0
	var idxsExclude []int
	nodesHash := snb.nodesHash
	sns := snb.sns
	idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
	var mr storage.MetricRow
	for len(src) > 0 {
		tail, err := mr.UnmarshalX(src)
		if err != nil {
			logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err)
		}
		rowBuf := src[:len(src)-len(tail)]
		src = tail
		reroutedRowsProcessed.Inc()
		h := xxhash.Sum64(mr.MetricNameRaw)
		mr.ResetX()
		// Try sending the row to snSource in order to minimize re-routing.
	again:
		if snSource.trySendBuf(rowBuf, 1) {
			rowsProcessed++
			continue
		}
		// The row couldn't be sent to snSrouce. Try re-routing it to other nodes.
		var sn *storageNode
		for {
			idx := nodesHash.getNodeIdx(h, idxsExclude)
			sn = sns[idx]
			if sn.isReady() {
				break
			}
			// re-generate idxsExclude list, since sn must be put there.
			idxsExclude = getNotReadyStorageNodeIdxs(snb, idxsExclude[:0], snSource)
		}
		if sn.trySendBuf(rowBuf, 1) {
			rowsProcessed++
			snSource.rowsReroutedFromHere.Inc()
			sn.rowsReroutedToHere.Inc()
			continue
		}
		// The row cannot be sent to both snSource and the re-routed sn without blocking.
		// Sleep for a while and try sending the row to snSource again.
		time.Sleep(100 * time.Millisecond)
		goto again
	}
	return rowsProcessed, nil
}

func getNotReadyStorageNodeIdxsBlocking(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
	dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
	sns := snb.sns
	if len(dst) < len(sns) {
		return dst
	}
	noStorageNodesLogger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available")
	for {
		time.Sleep(time.Second)
		dst = getNotReadyStorageNodeIdxs(snb, dst[:0], snExtra)
		if availableNodes := len(sns) - len(dst); availableNodes > 0 {
			storageNodesBecameAvailableLogger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
			return dst
		}
	}
}

var storageNodesBecameAvailableLogger = logger.WithThrottler("storageNodesBecameAvailable", 5*time.Second)

var noStorageNodesLogger = logger.WithThrottler("storageNodesUnavailable", 5*time.Second)

func getNotReadyStorageNodeIdxs(snb *storageNodesBucket, dst []int, snExtra *storageNode) []int {
	dst = dst[:0]
	for i, sn := range snb.sns {
		if sn == snExtra || !sn.isReady() {
			dst = append(dst, i)
		}
	}
	return dst
}

func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
	sent := false
	sn.brLock.Lock()
	if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
		sn.br.buf = append(sn.br.buf, buf...)
		sn.br.rows += rows
		sent = true
	}
	sn.brLock.Unlock()
	return sent
}

func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
	sn.brLock.Lock()
	for len(sn.br.buf)+len(buf) > maxBufSizePerStorageNode {
		select {
		case <-sn.stopCh:
			sn.brLock.Unlock()
			return false
		default:
		}
		sn.brCond.Wait()
	}
	sn.br.buf = append(sn.br.buf, buf...)
	sn.br.rows++
	sn.brLock.Unlock()
	return true
}

func (sn *storageNode) readOnlyChecker() {
	ticker := time.NewTicker(time.Second * 30)
	defer ticker.Stop()
	for {
		select {
		case <-sn.stopCh:
			return
		case <-ticker.C:
			sn.checkReadOnlyMode()
		}
	}
}

func (sn *storageNode) checkReadOnlyMode() {
	if atomic.LoadUint32(&sn.isReadOnly) == 0 {
		// fast path - the sn isn't in readonly mode
		return
	}
	// Check whether the storage remains in readonly mode
	sn.bcLock.Lock()
	defer sn.bcLock.Unlock()
	if sn.bc == nil {
		return
	}
	// send nil buff to check ack response from storage
	err := sendToConn(sn.bc, nil)
	if err == nil {
		// The storage switched from readonly to non-readonly mode
		atomic.StoreUint32(&sn.isReadOnly, 0)
		return
	}
	if errors.Is(err, errStorageReadOnly) {
		// The storage remains in read-only mode
		return
	}

	// There was an error when sending nil buf to the storage.
	logger.Errorf("cannot check storage readonly mode for -storageNode=%q: %s", sn.dialer.Addr(), err)

	// Mark the connection to the storage as broken.
	if err = sn.bc.Close(); err != nil {
		cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
	}
	sn.bc = nil
	atomic.StoreUint32(&sn.broken, 1)
	sn.brCond.Broadcast()
	sn.connectionErrors.Inc()
}

var (
	maxBufSizePerStorageNode int

	reroutedRowsProcessed           = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
	reroutesTotal                   = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
	rowsIncompletelyReplicatedTotal = metrics.NewCounter(`vm_rpc_rows_incompletely_replicated_total{name="vminsert"}`)
)