diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index 1a7728d3aa..54639ad096 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -1,7 +1,6 @@ package netstorage import ( - "flag" "fmt" "net/http" @@ -15,10 +14,6 @@ import ( jump "github.com/lithammer/go-jump-consistent-hash" ) -var replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+ - "Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+ - "Higher values for -dedup.minScrapeInterval at vmselect is OK") - // InsertCtx is a generic context for inserting data. // // InsertCtx.Reset must be called before the first usage. @@ -120,38 +115,20 @@ func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, time // WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx. func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error { - idx := storageNodeIdx - replicas := *replicationFactor - if replicas <= 0 { - replicas = 1 - } - if replicas > len(storageNodes) { - replicas = len(storageNodes) - } - for { - br := &ctx.bufRowss[idx] - sn := storageNodes[idx] - bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) - if len(bufNew) >= maxBufSizePerStorageNode { - // Send buf to storageNode, since it is too big. - if err := br.pushTo(sn); err != nil { - return err - } - br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value) - } else { - br.buf = bufNew - } - br.rows++ - - replicas-- - if replicas == 0 { - return nil - } - idx++ - if idx >= len(storageNodes) { - idx = 0 + br := &ctx.bufRowss[storageNodeIdx] + sn := storageNodes[storageNodeIdx] + bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) + if len(bufNew) >= maxBufSizePerStorageNode { + // Send buf to storageNode, since it is too big. + if err := br.pushTo(sn); err != nil { + return err } + br.buf = storage.MarshalMetricRow(bufNew[:0], metricNameRaw, timestamp, value) + } else { + br.buf = bufNew } + br.rows++ + return nil } // FlushBufs flushes ctx bufs to remote storage nodes. diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index a33e8adf7c..2aa64098c8 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -21,7 +21,12 @@ import ( jump "github.com/lithammer/go-jump-consistent-hash" ) -var disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage") +var ( + disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage") + replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+ + "Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+ + "Higher values for -dedup.minScrapeInterval at vmselect is OK") +) func (sn *storageNode) isBroken() bool { return atomic.LoadUint32(&sn.broken) != 0 @@ -78,12 +83,18 @@ var closedCh = func() <-chan struct{} { return ch }() -func (sn *storageNode) run(stopCh <-chan struct{}) { +func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) { + replicas := *replicationFactor + if replicas <= 0 { + replicas = 1 + } + if replicas > len(storageNodes) { + replicas = len(storageNodes) + } + ticker := time.NewTicker(time.Second) defer ticker.Stop() var br bufRows - var bc *handshake.BufferedConn - var err error var waitCh <-chan struct{} mustStop := false for !mustStop { @@ -108,44 +119,94 @@ func (sn *storageNode) run(stopCh <-chan struct{}) { sn.br, br = br, sn.br sn.brLock.Unlock() } - if bc == nil { - bc, err = sn.dial() - if err != nil { - // Mark sn as broken in order to prevent sending additional data to it until it is recovered. - atomic.StoreUint32(&sn.broken, 1) - if len(br.buf) == 0 { - continue - } - logger.Warnf("re-routing %d bytes with %d rows to other storage nodes because cannot dial storageNode %q: %s", - len(br.buf), br.rows, sn.dialer.Addr(), err) - if addToReroutedBufNonblock(br.buf, br.rows) { - sn.rowsReroutedFromHere.Add(br.rows) - br.reset() - } - continue - } - } - if err = sendToConn(bc, br.buf); err == nil { - // Successfully sent buf to bc. Remove broken flag from sn. - atomic.StoreUint32(&sn.broken, 0) - sn.rowsSent.Add(br.rows) - br.reset() + if len(br.buf) == 0 { + // Nothing to send. continue } - // Couldn't flush buf to sn. Mark sn as broken - // and try re-routing buf to healthy vmstorage nodes. - if err = bc.Close(); err != nil { - logger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err) - // continue executing the code below. + + // Send br to replicas storageNodes starting from snIdx. + if !sendBufToReplicas(&br, snIdx, replicas) { + // do not reset br in the hope it will be sent next time. + continue } - bc = nil - sn.connectionErrors.Inc() - atomic.StoreUint32(&sn.broken, 1) - if addToReroutedBufNonblock(br.buf, br.rows) { - sn.rowsReroutedFromHere.Add(br.rows) - br.reset() + br.reset() + } +} + +func sendBufToReplicas(br *bufRows, snIdx, replicas int) bool { + usedStorageNodes := make(map[*storageNode]bool, replicas) + for i := 0; i < replicas; i++ { + idx := snIdx + i + attempts := 0 + for { + attempts++ + if attempts > len(storageNodes) { + if i == 0 { + // The data wasn't replicated at all. + rowsLostTotal.Add(br.rows) + logger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+ + "re-trying to send the data soon", len(br.buf), br.rows) + return false + } + // The data is partially replicated, so just emit a warning and return true. + // We could retry sending the data again, but this may result in uncontrolled duplicate data. + // So it is better returning true. + rowsIncompletelyReplicatedTotal.Add(br.rows) + logger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+ + "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows) + return true + } + if idx >= len(storageNodes) { + idx = 0 + } + sn := storageNodes[idx] + idx++ + if usedStorageNodes[sn] { + // The br has been already replicated to sn. Skip it. + continue + } + if !sn.sendBufRows(br) { + // Cannot send data to sn. Go to the next sn. + continue + } + // Successfully sent data to sn. + usedStorageNodes[sn] = true + break } } + return true +} + +func (sn *storageNode) sendBufRows(br *bufRows) bool { + sn.bcLock.Lock() + defer sn.bcLock.Unlock() + + if sn.bc == nil { + bc, err := sn.dial() + if err != nil { + // Mark sn as broken in order to prevent sending additional data to it until it is recovered. + atomic.StoreUint32(&sn.broken, 1) + logger.Warnf("cannot dial storageNode %q: %s", sn.dialer.Addr(), err) + return false + } + sn.bc = bc + } + err := sendToConn(sn.bc, br.buf) + if err == nil { + // Successfully sent buf to bc. Remove broken flag from sn. + atomic.StoreUint32(&sn.broken, 0) + sn.rowsSent.Add(br.rows) + return true + } + // Couldn't flush buf to sn. Mark sn as broken. + logger.Warnf("cannot send %d bytes with %d rows to %q: %s; re-routing this data to healthy storage nodes", len(br.buf), br.rows, sn.dialer.Addr(), err) + if err = sn.bc.Close(); err != nil { + logger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err) + } + sn.bc = nil + sn.connectionErrors.Inc() + atomic.StoreUint32(&sn.broken, 1) + return false } func sendToConn(bc *handshake.BufferedConn, buf []byte) error { @@ -272,8 +333,16 @@ type storageNode struct { brLock sync.Mutex // Buffer with data that needs to be written to the storage node. + // It must be accessed under brLock. br bufRows + // bcLock protects bc. + bcLock sync.Mutex + + // bc is a single connection to vmstorage for data transfer. + // It must be accessed under bcLock. + bc *handshake.BufferedConn + dialer *netutil.TCPDialer // The number of dial errors to vmstorage node. @@ -322,6 +391,7 @@ func InitStorageNodes(addrs []string) { logger.Panicf("BUG: too much addresses: %d; max supported %d addresses", len(addrs), 255) } + storageNodes = storageNodes[:0] for _, addr := range addrs { sn := &storageNode{ dialer: netutil.NewTCPDialer("vminsert", addr), @@ -347,11 +417,6 @@ func InitStorageNodes(addrs []string) { return float64(n) }) storageNodes = append(storageNodes, sn) - storageNodesWG.Add(1) - go func(addr string) { - sn.run(storageNodesStopCh) - storageNodesWG.Done() - }(addr) } maxBufSizePerStorageNode = memory.Allowed() / 8 / len(storageNodes) @@ -365,6 +430,15 @@ func InitStorageNodes(addrs []string) { if reroutedBufMaxSize > maxBufSizePerStorageNode*len(storageNodes) { reroutedBufMaxSize = maxBufSizePerStorageNode * len(storageNodes) } + + for idx, sn := range storageNodes { + storageNodesWG.Add(1) + go func(sn *storageNode, idx int) { + sn.run(storageNodesStopCh, idx) + storageNodesWG.Done() + }(sn, idx) + } + rerouteWorkerWG.Add(1) go func() { rerouteWorker(rerouteWorkerStopCh) @@ -422,24 +496,6 @@ func addToReroutedBuf(buf []byte, rows int) error { return nil } -// addToReroutedBufNonblock adds buf to reroutedBR. -// -// It returns true if buf has been successfully added to reroutedBR. -func addToReroutedBufNonblock(buf []byte, rows int) bool { - if len(buf) > reroutedBufMaxSize { - logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize) - } - reroutedBRLock.Lock() - ok := len(reroutedBR.buf)+len(buf) <= reroutedBufMaxSize - if ok { - reroutedBR.buf = append(reroutedBR.buf, buf...) - reroutedBR.rows += rows - reroutesTotal.Inc() - } - reroutedBRLock.Unlock() - return ok -} - func getHealthyStorageNodesCount() int { n := 0 for _, sn := range storageNodes { @@ -550,6 +606,7 @@ var ( return float64(n) }) - rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`) - rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`) + rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`) + rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`) + rowsIncompletelyReplicatedTotal = metrics.NewCounter(`vm_rpc_rows_incompletely_replicated_total{name="vminsert"}`) )