From 75f2f3b09d8971dd3dab57e5fb5eddcb8004221b Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 27 May 2020 15:07:16 +0300
Subject: [PATCH] app/vminsert/netstorage: improve ingestion performance when
 a single vmstorage node is slower than other vmstorage nodes

Previously the ingestion performance was limited by the slowest vmstorage
node. Now vminsert re-routes data from the slowest vmstorage node to the
remaining nodes.
---
 README.md                             |   2 +
 app/vminsert/netstorage/insert_ctx.go |  37 +-
 app/vminsert/netstorage/netstorage.go | 525 ++++++++++++++------------
 docs/Cluster-VictoriaMetrics.md       |   2 +
 4 files changed, 306 insertions(+), 260 deletions(-)

diff --git a/README.md b/README.md
index 8135933df0..a5a93f33a4 100644
--- a/README.md
+++ b/README.md
@@ -250,6 +250,8 @@ Each instance type - `vminsert`, `vmselect` and `vmstorage` - can run on the mos
 * The recommended total number of vCPU cores for all the `vminsert` instances can be calculated from the ingestion rate: `vCPUs = ingestion_rate / 150K`.
 * The recommended number of vCPU cores per each `vminsert` instance should equal to the number of `vmstorage` instances in the cluster.
 * The amount of RAM per each `vminsert` instance should be 1GB or more. RAM is used as a buffer for spikes in ingestion rate.
+  The maximum amount of used RAM per `vminsert` node can be tuned with the `-memory.allowedPercent` command-line flag. For instance, `-memory.allowedPercent=20`
+  limits the maximum amount of used RAM to 20% of the available RAM on the host system.
 * Sometimes `-rpc.disableCompression` command-line flag on `vminsert` instances could increase ingestion capacity at the cost of higher network bandwidth usage between `vminsert` and `vmstorage`.
diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go
index 23053f4980..54639ad096 100644
--- a/app/vminsert/netstorage/insert_ctx.go
+++ b/app/vminsert/netstorage/insert_ctx.go
@@ -6,10 +6,8 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	xxhash "github.com/cespare/xxhash/v2"
@@ -25,8 +23,6 @@ type InsertCtx struct {
 	bufRowss []bufRows
 
 	labelsBuf []byte
-
-	resultCh chan error
 }
 
 type bufRows struct {
@@ -34,11 +30,15 @@ type bufRows struct {
 	rows int
 }
 
+func (br *bufRows) reset() {
+	br.buf = br.buf[:0]
+	br.rows = 0
+}
+
 func (br *bufRows) pushTo(sn *storageNode) error {
 	bufLen := len(br.buf)
 	err := sn.push(br.buf, br.rows)
-	br.buf = br.buf[:0]
-	br.rows = 0
+	br.reset()
 	if err != nil {
 		return &httpserver.ErrorWithStatusCode{
 			Err: fmt.Errorf("cannot send %d bytes to storageNode %q: %s", bufLen, sn.dialer.Addr(), err),
@@ -61,16 +61,9 @@ func (ctx *InsertCtx) Reset() {
 		ctx.bufRowss = make([]bufRows, len(storageNodes))
 	}
 	for i := range ctx.bufRowss {
-		br := &ctx.bufRowss[i]
-		br.buf = br.buf[:0]
-		br.rows = 0
+		ctx.bufRowss[i].reset()
 	}
 	ctx.labelsBuf = ctx.labelsBuf[:0]
-	if ctx.resultCh == nil {
-		ctx.resultCh = make(chan error, len(storageNodes))
-	} else if len(ctx.resultCh) > 0 {
-		logger.Panicf("BUG: ctx.resultCh must be empty on Reset; got %d items", len(ctx.resultCh))
-	}
 }
 
 // AddLabelBytes adds (name, value) label to ctx.Labels.
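The `bufRows` helper introduced above pairs the marshaled-rows buffer with its row count, and `reset()` re-slices the buffer instead of reallocating it, so the backing array is reused across flushes. A minimal, self-contained sketch of the pattern (the `main` demo is illustrative only, not part of the patch):

```go
package main

import "fmt"

type bufRows struct {
	buf  []byte // marshaled rows
	rows int    // number of rows in buf
}

// reset truncates buf in place; the backing array is kept for reuse,
// so steady-state flushing does not allocate.
func (br *bufRows) reset() {
	br.buf = br.buf[:0]
	br.rows = 0
}

func main() {
	var br bufRows
	br.buf = append(br.buf, "row1row2"...)
	br.rows = 2
	before := cap(br.buf)
	br.reset()
	fmt.Println(len(br.buf), br.rows, cap(br.buf) == before) // 0 0 true
}
```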
@@ -125,7 +118,7 @@ func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metr
 	br := &ctx.bufRowss[storageNodeIdx]
 	sn := storageNodes[storageNodeIdx]
 	bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value)
-	if len(bufNew) >= consts.MaxInsertPacketSize {
+	if len(bufNew) >= maxBufSizePerStorageNode {
 		// Send buf to storageNode, since it is too big.
 		if err := br.pushTo(sn); err != nil {
 			return err
@@ -140,23 +133,13 @@ func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metr
 
 // FlushBufs flushes ctx bufs to remote storage nodes.
 func (ctx *InsertCtx) FlushBufs() error {
-	// Send per-storageNode bufs in parallel.
-	resultCh := ctx.resultCh
-	resultChLen := 0
+	var firstErr error
 	for i := range ctx.bufRowss {
 		br := &ctx.bufRowss[i]
 		if len(br.buf) == 0 {
 			continue
 		}
-		resultChLen++
-		go func(br *bufRows, sn *storageNode) {
-			resultCh <- br.pushTo(sn)
-		}(br, storageNodes[i])
-	}
-	var firstErr error
-	for i := 0; i < resultChLen; i++ {
-		err := <-resultCh
-		if err != nil && firstErr == nil {
+		if err := br.pushTo(storageNodes[i]); err != nil && firstErr == nil {
 			firstErr = err
 		}
 	}
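With `resultCh` gone, `FlushBufs` pushes the per-node buffers sequentially: `push()` now only appends to an in-memory buffer, so the old goroutine fan-out no longer buys anything. A sketch of the first-error-wins loop, with a hypothetical `flushAll` standing in for the real method:

```go
package main

import (
	"errors"
	"fmt"
)

// flushAll is a stand-in for the new FlushBufs loop: flush each per-node
// buffer in turn, remember only the first failure, but keep going so the
// healthy nodes still receive their data.
func flushAll(flushers []func() error) error {
	var firstErr error
	for _, flush := range flushers {
		if err := flush(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	err := flushAll([]func() error{
		func() error { return nil },
		func() error { return errors.New("storage node 1 unreachable") },
		func() error { return errors.New("storage node 2 unreachable") },
	})
	fmt.Println(err) // storage node 1 unreachable
}
```

diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go
index b5f358680f..dbe8ea2557 100644
--- a/app/vminsert/netstorage/netstorage.go
+++ b/app/vminsert/netstorage/netstorage.go
@@ -5,8 +5,10 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"sync/atomic"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
@@ -16,14 +18,23 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
 	xxhash "github.com/cespare/xxhash/v2"
+	jump "github.com/lithammer/go-jump-consistent-hash"
 )
 
 var disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
 
-// push pushes buf to sn.
+func (sn *storageNode) isBroken() bool {
+	return atomic.LoadUint32(&sn.broken) != 0
+}
+
+// push pushes buf to sn internal bufs.
 //
-// It falls back to sending data to another vmstorage node if sn is currently
-// unavailable.
+// This function doesn't block on the fast path.
+// It may block only if all the storageNodes cannot handle the incoming ingestion rate.
+// This blocking provides backpressure to the caller.
+//
+// The function falls back to sending data to other vmstorage nodes
+// if sn is currently unavailable or overloaded.
 //
 // rows is the number of rows in the buf.
 func (sn *storageNode) push(buf []byte, rows int) error {
@@ -32,103 +43,116 @@ func (sn *storageNode) push(buf []byte, rows int) error {
 	}
 	sn.rowsPushed.Add(rows)
 
-	sn.mu.Lock()
-	defer sn.mu.Unlock()
-
-	if sn.broken {
-		// The vmstorage node is broken. Re-route buf to healthy vmstorage nodes.
-		if !addToReroutedBuf(buf, rows) {
-			rowsLostTotal.Add(rows)
-			return fmt.Errorf("%d rows dropped because of reroutedBuf overflows %d bytes", rows, reroutedBufMaxSize)
+	if sn.isBroken() {
+		// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes.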
+		if err := addToReroutedBuf(buf, rows); err != nil {
+			return fmt.Errorf("%d rows dropped because the current vmstorage is unavailable and %s", rows, err)
 		}
 		sn.rowsReroutedFromHere.Add(rows)
 		return nil
 	}
 
-	if len(sn.buf)+len(buf) <= maxBufSizePerStorageNode {
+	sn.brLock.Lock()
+	if len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
 		// Fast path: the buf contents fits sn.buf.
-		sn.buf = append(sn.buf, buf...)
-		sn.rows += rows
+		sn.br.buf = append(sn.br.buf, buf...)
+		sn.br.rows += rows
+		sn.brLock.Unlock()
 		return nil
 	}
+	sn.brLock.Unlock()
 
 	// Slow path: the buf contents doesn't fit sn.buf.
-	// Flush sn.buf to vmstorage and then add buf to sn.buf.
-	if err := sn.flushBufLocked(); err != nil {
-		// Failed to flush or re-route sn.buf to vmstorage nodes.
-		// The sn.buf is already dropped by flushBufLocked.
-		// Drop buf too, since there is little sense in trying to rescue it.
-		rowsLostTotal.Add(rows)
-		return err
+	// This means that the current vmstorage is slow or will become broken soon.
+	// Re-route buf to healthy vmstorage nodes.
+	if err := addToReroutedBuf(buf, rows); err != nil {
+		return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %s", rows, err)
 	}
-
-	// Successful flush.
-	sn.buf = append(sn.buf, buf...)
-	sn.rows += rows
+	sn.rowsReroutedFromHere.Add(rows)
 	return nil
 }
 
-func (sn *storageNode) sendReroutedRow(buf []byte) error {
-	sn.mu.Lock()
-	defer sn.mu.Unlock()
+var closedCh = func() <-chan struct{} {
+	ch := make(chan struct{})
+	close(ch)
+	return ch
+}()
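`closedCh` is a pre-closed channel: a receive from it never blocks, while a receive from a nil channel blocks forever, so assigning one or the other to `waitCh` switches a `select` case on or off without extra branching. A runnable sketch of the idiom, with `wait` as a hypothetical stand-in for the worker loop:

```go
package main

import (
	"fmt"
	"time"
)

var closedCh = func() <-chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}()

func wait(haveWork bool) string {
	var waitCh <-chan struct{} // nil: the case below can never fire
	if haveWork {
		waitCh = closedCh // closed: the case below fires immediately
	}
	select {
	case <-waitCh:
		return "processed immediately"
	case <-time.After(10 * time.Millisecond): // stands in for ticker.C
		return "woke up on timer"
	}
}

func main() {
	fmt.Println(wait(true))  // processed immediately
	fmt.Println(wait(false)) // woke up on timer
}
```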
 
-	if sn.broken {
-		return errBrokenStorageNode
-	}
-	if len(sn.buf)+len(buf) > maxBufSizePerStorageNode {
-		return fmt.Errorf("cannot put %d bytes into vmstorage buffer, since its size cannot exceed %d bytes", len(sn.buf)+len(buf), maxBufSizePerStorageNode)
-	}
-	sn.buf = append(sn.buf, buf...)
-	sn.rows++
-	return nil
-}
-
-var errBrokenStorageNode = fmt.Errorf("the vmstorage node is temporarily broken")
-
-func (sn *storageNode) flushBufLocked() error {
-	err := sn.sendBufLocked(sn.buf)
-	if err == nil {
-		// Successful flush. Remove broken flag.
-		sn.broken = false
-		sn.rowsSent.Add(sn.rows)
-		sn.buf = sn.buf[:0]
-		sn.rows = 0
-		return nil
-	}
-
-	// Couldn't flush sn.buf to vmstorage. Mark sn as broken
-	// and try re-routing sn.buf to healthy vmstorage nodes.
-	sn.broken = true
-	if addToReroutedBuf(sn.buf, sn.rows) {
-		// Successfully re-routed data to healthy nodes.
-		sn.buf = sn.buf[:0]
-		sn.rows = 0
-		return nil
-	}
-	// Preserve sn.buf when it cannot be sent to healthy nodes
-	// in the hope the error will disappear on the next call to flushBufLocked.
-	//
-	// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/294 .
-	return err
-}
-
-func (sn *storageNode) sendBufLocked(buf []byte) error {
-	if len(buf) == 0 {
-		return nil
-	}
-	if sn.bc == nil {
-		if err := sn.dial(); err != nil {
-			return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err)
+func (sn *storageNode) run(stopCh <-chan struct{}) {
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	var br bufRows
+	var bc *handshake.BufferedConn
+	var err error
+	var waitCh <-chan struct{}
+	mustStop := false
+	for !mustStop {
+		sn.brLock.Lock()
+		bufLen := len(sn.br.buf)
+		sn.brLock.Unlock()
+		waitCh = nil
+		if len(br.buf) == 0 && bufLen > maxBufSizePerStorageNode/4 {
+			// Do not sleep, since sn.br.buf contains enough data to process.
+			waitCh = closedCh
+		}
+		select {
+		case <-stopCh:
+			mustStop = true
+			// Make sure br is flushed one last time before returning
+			// in order to send the remaining bits of data.
+		case <-ticker.C:
+		case <-waitCh:
+		}
+		if len(br.buf) == 0 {
+			sn.brLock.Lock()
+			sn.br, br = br, sn.br
+			sn.brLock.Unlock()
+		}
+		if bc == nil {
+			bc, err = sn.dial()
+			if err != nil {
+				// Mark sn as broken in order to prevent sending additional data to it until it is recovered.
+				atomic.StoreUint32(&sn.broken, 1)
+				if len(br.buf) == 0 {
+					continue
+				}
+				logger.Errorf("re-routing %d bytes with %d rows to other storage nodes because cannot dial storageNode %q: %s",
+					len(br.buf), br.rows, sn.dialer.Addr(), err)
+				if addToReroutedBufNonblock(br.buf, br.rows) {
+					sn.rowsReroutedFromHere.Add(br.rows)
+					br.reset()
+				}
+				continue
+			}
+		}
+		if err = sendToConn(bc, br.buf); err == nil {
+			// Successfully sent buf to bc. Remove broken flag from sn.
+			atomic.StoreUint32(&sn.broken, 0)
+			sn.rowsSent.Add(br.rows)
+			br.reset()
+			continue
+		}
+		// Couldn't flush buf to sn. Mark sn as broken
+		// and try re-routing buf to healthy vmstorage nodes.
+		if err = bc.Close(); err != nil {
+			logger.Errorf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err)
+			// continue executing the code below.
+		}
+		bc = nil
+		sn.connectionErrors.Inc()
+		atomic.StoreUint32(&sn.broken, 1)
+		if addToReroutedBufNonblock(br.buf, br.rows) {
+			sn.rowsReroutedFromHere.Add(br.rows)
+			br.reset()
 		}
 	}
-	if err := sn.sendToConn(sn.bc, buf); err != nil {
-		sn.closeBrokenConn()
-		return err
-	}
-	return nil
 }
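The new `run()` worker drains `sn.br` by swapping it with a goroutine-local `bufRows` and holds `brLock` only for the swap, so producers calling `push()` are never blocked behind a network send. A reduced sketch of the swap (field names mirror the patch; `takeBuffered` is a hypothetical helper):

```go
package main

import (
	"fmt"
	"sync"
)

type bufRows struct {
	buf  []byte
	rows int
}

type node struct {
	brLock sync.Mutex
	br     bufRows // filled by producers calling push()
}

// takeBuffered swaps the shared buffer with an empty worker-local one.
// The lock is held only for the swap, so producers never wait for the
// network send that follows; afterwards the caller owns the data exclusively.
func (n *node) takeBuffered(local *bufRows) {
	n.brLock.Lock()
	n.br, *local = *local, n.br
	n.brLock.Unlock()
}

func main() {
	n := &node{br: bufRows{buf: []byte("pending rows"), rows: 2}}
	var local bufRows // empty; will receive the pending data
	n.takeBuffered(&local)
	fmt.Printf("%q rows=%d shared-empty=%v\n", local.buf, local.rows, len(n.br.buf) == 0)
}
```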
 
-func (sn *storageNode) sendToConn(bc *handshake.BufferedConn, buf []byte) error {
+func sendToConn(bc *handshake.BufferedConn, buf []byte) error {
+	if len(buf) == 0 {
+		// Nothing to send
+		return nil
+	}
 	timeoutSeconds := len(buf) / 3e5
 	if timeoutSeconds < 60 {
 		timeoutSeconds = 60
@@ -141,8 +165,10 @@ func (sn *storageNode) sendToConn(bc *handshake.BufferedConn, buf []byte) error
 	// sizeBuf guarantees that the rows batch will be either fully
 	// read or fully discarded on the vmstorage side.
 	// sizeBuf is used for read optimization in vmstorage.
-	sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf)))
-	if _, err := bc.Write(sn.sizeBuf); err != nil {
+	sizeBuf := sizeBufPool.Get()
+	defer sizeBufPool.Put(sizeBuf)
+	sizeBuf.B = encoding.MarshalUint64(sizeBuf.B[:0], uint64(len(buf)))
+	if _, err := bc.Write(sizeBuf.B); err != nil {
 		return fmt.Errorf("cannot write data size %d: %s", len(buf), err)
 	}
 	if _, err := bc.Write(buf); err != nil {
@@ -158,20 +184,22 @@ func (sn *storageNode) sendToConn(bc *handshake.BufferedConn, buf []byte) error
 	if err := bc.SetReadDeadline(deadline); err != nil {
 		return fmt.Errorf("cannot set read deadline for reading `ack` to vmstorage: %s", err)
 	}
-	if _, err := io.ReadFull(bc, sn.sizeBuf[:1]); err != nil {
+	if _, err := io.ReadFull(bc, sizeBuf.B[:1]); err != nil {
 		return fmt.Errorf("cannot read `ack` from vmstorage: %s", err)
 	}
-	if sn.sizeBuf[0] != 1 {
-		return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want %d", sn.sizeBuf[0], 1)
+	if sizeBuf.B[0] != 1 {
+		return fmt.Errorf("unexpected `ack` received from vmstorage; got %d; want %d", sizeBuf.B[0], 1)
 	}
 	return nil
 }
 
-func (sn *storageNode) dial() error {
+var sizeBufPool bytesutil.ByteBufferPool
+
+func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
 	c, err := sn.dialer.Dial()
 	if err != nil {
 		sn.dialErrors.Inc()
-		return err
+		return nil, err
 	}
 	compressionLevel := 1
 	if *disableRPCCompression {
@@ -181,87 +209,73 @@ func (sn *storageNode) dial() error {
 	if err != nil {
 		_ = c.Close()
 		sn.handshakeErrors.Inc()
-		return fmt.Errorf("handshake error: %s", err)
-	}
-	sn.bc = bc
-	return nil
-}
-
-func (sn *storageNode) closeBrokenConn() {
-	if sn.bc == nil {
-		return
-	}
-	_ = sn.bc.Close()
-	sn.bc = nil
-	sn.connectionErrors.Inc()
-}
-
-func (sn *storageNode) run(stopCh <-chan struct{}) {
-	ticker := time.NewTicker(time.Second)
-	defer ticker.Stop()
-	mustStop := false
-	for !mustStop {
-		select {
-		case <-stopCh:
-			mustStop = true
-			// Make sure flushBufLocked is called last time before returning
-			// in order to send the remaining bits of data.
-		case <-ticker.C:
-		}
-
-		sn.mu.Lock()
-		if err := sn.flushBufLocked(); err != nil {
-			sn.closeBrokenConn()
-			logger.Errorf("cannot flush data to storageNode %q: %s", sn.dialer.Addr(), err)
-		}
-		sn.mu.Unlock()
+		return nil, fmt.Errorf("handshake error: %s", err)
 	}
+	return bc, nil
 }
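`sendToConn` frames each batch as a size prefix plus payload and then waits for a one-byte ack, with a write timeout scaled to the batch size (`len(buf)/3e5` seconds, i.e. roughly 300KB/s worst case, but at least 60s). A sketch of the same framing over `net.Pipe`; the big-endian byte order and the toy receiver are assumptions for illustration, since the real wire format is defined by `lib/encoding.MarshalUint64` and vmstorage:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
	"time"
)

// sendFrame mirrors the framing used by sendToConn: an 8-byte length prefix,
// the payload, then a 1-byte ack from the receiver.
func sendFrame(c net.Conn, buf []byte) error {
	// Scale the write timeout with the payload size, but never below 60s.
	timeout := time.Duration(len(buf)/3e5) * time.Second
	if timeout < 60*time.Second {
		timeout = 60 * time.Second
	}
	if err := c.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	var sizeBuf [8]byte
	binary.BigEndian.PutUint64(sizeBuf[:], uint64(len(buf)))
	if _, err := c.Write(sizeBuf[:]); err != nil {
		return fmt.Errorf("cannot write data size %d: %w", len(buf), err)
	}
	if _, err := c.Write(buf); err != nil {
		return fmt.Errorf("cannot write data: %w", err)
	}
	var ack [1]byte
	if _, err := io.ReadFull(c, ack[:]); err != nil {
		return fmt.Errorf("cannot read ack: %w", err)
	}
	if ack[0] != 1 {
		return fmt.Errorf("unexpected ack: got %d; want 1", ack[0])
	}
	return nil
}

func main() {
	client, server := net.Pipe()
	go func() {
		// Toy receiver: read the size prefix and payload, then acknowledge.
		var sizeBuf [8]byte
		_, _ = io.ReadFull(server, sizeBuf[:])
		payload := make([]byte, binary.BigEndian.Uint64(sizeBuf[:]))
		_, _ = io.ReadFull(server, payload)
		_, _ = server.Write([]byte{1})
	}()
	fmt.Println(sendFrame(client, []byte("marshaled metric rows"))) // <nil>
}
```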
 
 func rerouteWorker(stopCh <-chan struct{}) {
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
-	var buf []byte
+	var br bufRows
+	var waitCh <-chan struct{}
 	mustStop := false
 	for !mustStop {
+		reroutedBRLock.Lock()
+		bufLen := len(reroutedBR.buf)
+		reroutedBRLock.Unlock()
+		waitCh = nil
+		if len(br.buf) == 0 && bufLen > reroutedBufMaxSize/4 {
+			// Do not sleep if reroutedBR contains enough data to process.
+			waitCh = closedCh
+		}
 		select {
 		case <-stopCh:
 			mustStop = true
-			// Make sure spreadReroutedBufToStorageNodes is called last time before returning
+			// Make sure reroutedBR is re-routed one last time before returning
 			// in order to reroute the remaining data to healthy vmstorage nodes.
 		case <-ticker.C:
+		case <-waitCh:
 		}
-
-		var err error
-		buf, err = spreadReroutedBufToStorageNodes(buf[:0])
-		if err != nil {
+		if len(br.buf) == 0 {
+			reroutedBRLock.Lock()
+			reroutedBR, br = br, reroutedBR
+			reroutedBRLock.Unlock()
+		}
+		reroutedBRCond.Broadcast()
+		if len(br.buf) == 0 {
+			// Nothing to re-route.
+			continue
+		}
+		sns := getHealthyStorageNodes()
+		if len(sns) == 0 {
+			// No more vmstorage nodes to write data to.
 			rerouteErrors.Inc()
-			logger.Errorf("cannot reroute data among healthy vmstorage nodes: %s", err)
+			logger.Errorf("cannot send rerouted rows because all the storage nodes are unhealthy")
+			// Do not reset br in the hope it could be sent next time.
+			continue
 		}
+		spreadReroutedBufToStorageNodes(sns, &br)
+		// There is no need for br.reset() here, since it is already done in spreadReroutedBufToStorageNodes.
 	}
+	// Notify all the blocked addToReroutedBuf callers, so they may finish the work.
+	reroutedBRCond.Broadcast()
 }
 
 // storageNode is a client sending data to vmstorage node.
 type storageNode struct {
-	mu sync.Mutex
-
-	// Buffer with data that needs to be written to vmstorage node.
-	buf []byte
-
-	// The number of rows buf contains at the moment.
-	rows int
-
-	// Temporary buffer for encoding marshaled buf size.
-	sizeBuf []byte
-
-	// broken is set to true if the given vmstorage node is temporarily unhealthy.
+	// broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
 	// In this case the data is re-routed to the remaining healthy vmstorage nodes.
-	broken bool
+	broken uint32
+
+	// brLock protects br.
+	brLock sync.Mutex
+
+	// Buffer with data that needs to be written to the storage node.
+	br bufRows
 
 	dialer *netutil.TCPDialer
 
-	bc *handshake.BufferedConn
-
 	// The number of dial errors to vmstorage node.
 	dialErrors *metrics.Counter
@@ -321,15 +335,15 @@ func InitStorageNodes(addrs []string) {
 			rowsReroutedToHere: metrics.NewCounter(fmt.Sprintf(`vm_rpc_rows_rerouted_to_here_total{name="vminsert", addr=%q}`, addr)),
 		}
 		_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_rows_pending{name="vminsert", addr=%q}`, addr), func() float64 {
-			sn.mu.Lock()
-			n := sn.rows
-			sn.mu.Unlock()
+			sn.brLock.Lock()
+			n := sn.br.rows
+			sn.brLock.Unlock()
 			return float64(n)
 		})
 		_ = metrics.NewGauge(fmt.Sprintf(`vm_rpc_buf_pending_bytes{name="vminsert", addr=%q}`, addr), func() float64 {
-			sn.mu.Lock()
-			n := len(sn.buf)
-			sn.mu.Unlock()
+			sn.brLock.Lock()
+			n := len(sn.br.buf)
+			sn.brLock.Unlock()
 			return float64(n)
 		})
 		storageNodes = append(storageNodes, sn)
@@ -340,11 +354,17 @@ func InitStorageNodes(addrs []string) {
 		}(addr)
 	}
 
-	maxBufSizePerStorageNode = memory.Allowed() / 4 / len(storageNodes)
+	maxBufSizePerStorageNode = memory.Allowed() / 8 / len(storageNodes)
 	if maxBufSizePerStorageNode > consts.MaxInsertPacketSize {
 		maxBufSizePerStorageNode = consts.MaxInsertPacketSize
 	}
 	reroutedBufMaxSize = memory.Allowed() / 16
+	if reroutedBufMaxSize < maxBufSizePerStorageNode {
+		reroutedBufMaxSize = maxBufSizePerStorageNode
+	}
+	if reroutedBufMaxSize > maxBufSizePerStorageNode*len(storageNodes) {
+		reroutedBufMaxSize = maxBufSizePerStorageNode * len(storageNodes)
+	}
 	rerouteWorkerWG.Add(1)
 	go func() {
 		rerouteWorker(rerouteWorkerStopCh)
@@ -361,136 +381,175 @@ func Stop() {
 	storageNodesWG.Wait()
 }
 
-func addToReroutedBuf(buf []byte, rows int) bool {
-	reroutedLock.Lock()
-	defer reroutedLock.Unlock()
-	if len(reroutedBuf)+len(buf) > reroutedBufMaxSize {
-		reroutedBufOverflows.Inc()
-		return false
+// addToReroutedBuf adds buf to reroutedBR.
+//
+// It waits until reroutedBR has enough space for buf or until Stop is called.
+// This guarantees backpressure if the ingestion rate exceeds vmstorage nodes'
+// ingestion rate capacity.
+//
+// It returns a non-nil error only in the following cases:
+//
+// - if all the storage nodes are unhealthy.
+// - if Stop is called.
+func addToReroutedBuf(buf []byte, rows int) error {
+	if len(buf) > reroutedBufMaxSize {
+		logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize)
 	}
-	reroutedBuf = append(reroutedBuf, buf...)
-	reroutedRows += rows
+
+	reroutedBRLock.Lock()
+	defer reroutedBRLock.Unlock()
+
+	for len(reroutedBR.buf)+len(buf) > reroutedBufMaxSize {
+		if getHealthyStorageNodesCount() == 0 {
+			rowsLostTotal.Add(rows)
+			return fmt.Errorf("all the vmstorage nodes are unavailable and reroutedBR has not enough space for storing %d bytes; only %d bytes left in reroutedBR",
+				len(buf), reroutedBufMaxSize-len(reroutedBR.buf))
+		}
+		select {
+		case <-rerouteWorkerStopCh:
+			rowsLostTotal.Add(rows)
+			return fmt.Errorf("rerouteWorker cannot send the data since it is stopped")
+		default:
+		}
+
+		// reroutedBR.buf has not enough space for len(buf) bytes. Wait until reroutedBR.buf is sent by rerouteWorker.
+		reroutedBufWaits.Inc()
+		reroutedBRCond.Wait()
+	}
+	reroutedBR.buf = append(reroutedBR.buf, buf...)
+	reroutedBR.rows += rows
 	reroutesTotal.Inc()
-	return true
+	return nil
 }
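`addToReroutedBuf` turns a full rerouted buffer into backpressure: producers park on `reroutedBRCond` until `rerouteWorker` drains the buffer and broadcasts. A reduced sketch of that wait/broadcast pairing with toy capacities (not the patch's actual types):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	mu       sync.Mutex
	cond     = sync.NewCond(&mu)
	buffered int
	capacity = 10
)

// add blocks until n more bytes fit into the shared buffer.
func add(n int) {
	mu.Lock()
	defer mu.Unlock()
	for buffered+n > capacity {
		// Wait releases mu while blocked and reacquires it before returning,
		// so the condition is always re-checked under the lock.
		cond.Wait()
	}
	buffered += n
}

// drain empties the buffer and wakes all blocked producers,
// like rerouteWorker's reroutedBRCond.Broadcast().
func drain() {
	mu.Lock()
	buffered = 0
	mu.Unlock()
	cond.Broadcast()
}

func main() {
	add(8)
	go func() {
		time.Sleep(50 * time.Millisecond)
		drain()
	}()
	add(5) // blocks until drain() runs
	fmt.Println("second add completed after drain")
}
```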
 
-func spreadReroutedBufToStorageNodes(swapBuf []byte) ([]byte, error) {
-	healthyStorageNodes := getHealthyStorageNodes()
-	if len(healthyStorageNodes) == 0 {
-		// No more vmstorage nodes to write data to.
-		return swapBuf, fmt.Errorf("all the storage nodes are unhealthy")
+// addToReroutedBufNonblock adds buf to reroutedBR.
+//
+// It returns true if buf has been successfully added to reroutedBR.
+func addToReroutedBufNonblock(buf []byte, rows int) bool {
+	if len(buf) > reroutedBufMaxSize {
+		logger.Panicf("BUG: len(buf)=%d cannot exceed reroutedBufMaxSize=%d", len(buf), reroutedBufMaxSize)
 	}
-
-	reroutedLock.Lock()
-	reroutedBuf, swapBuf = swapBuf[:0], reroutedBuf
-	rows := reroutedRows
-	reroutedRows = 0
-	reroutedLock.Unlock()
-
-	if len(swapBuf) == 0 {
-		// Nothing to re-route.
-		return swapBuf, nil
+	reroutedBRLock.Lock()
+	ok := len(reroutedBR.buf)+len(buf) <= reroutedBufMaxSize
+	if ok {
+		reroutedBR.buf = append(reroutedBR.buf, buf...)
+		reroutedBR.rows += rows
+		reroutesTotal.Inc()
 	}
+	reroutedBRLock.Unlock()
+	return ok
+}
 
+func getHealthyStorageNodesCount() int {
+	n := 0
+	for _, sn := range storageNodes {
+		if !sn.isBroken() {
+			n++
+		}
+	}
+	return n
+}
+
+func getHealthyStorageNodes() []*storageNode {
+	sns := make([]*storageNode, 0, len(storageNodes)-1)
+	for _, sn := range storageNodes {
+		if !sn.isBroken() {
+			sns = append(sns, sn)
+		}
+	}
+	return sns
+}
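`addToReroutedBufNonblock` is the try-variant used from the per-node connection goroutines, which drain buffers themselves and therefore must never park on the condition variable. A generic sketch of such a bounded try-append (`boundedBuf` and `tryAdd` are hypothetical names):

```go
package main

import (
	"fmt"
	"sync"
)

type boundedBuf struct {
	mu  sync.Mutex
	buf []byte
	max int
}

// tryAdd appends b only if it fits, and reports whether it did.
// Unlike a blocking add, it is safe to call from goroutines that
// are themselves responsible for draining the buffer.
func (bb *boundedBuf) tryAdd(b []byte) bool {
	bb.mu.Lock()
	defer bb.mu.Unlock()
	if len(bb.buf)+len(b) > bb.max {
		return false
	}
	bb.buf = append(bb.buf, b...)
	return true
}

func main() {
	bb := &boundedBuf{max: 8}
	fmt.Println(bb.tryAdd([]byte("12345"))) // true
	fmt.Println(bb.tryAdd([]byte("67890"))) // false: would exceed max
}
```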
+
+func spreadReroutedBufToStorageNodes(sns []*storageNode, br *bufRows) {
 	var mr storage.MetricRow
-	src := swapBuf
 	rowsProcessed := 0
+	src := br.buf
 	for len(src) > 0 {
 		tail, err := mr.Unmarshal(src)
 		if err != nil {
-			logger.Panicf("BUG: cannot unmarshal recently marshaled MetricRow: %s", err)
+			logger.Panicf("BUG: cannot unmarshal MetricRow from reroutedBR.buf: %s", err)
 		}
 		rowBuf := src[:len(src)-len(tail)]
 		src = tail
-		// Use non-consistent hashing instead of jump hash in order to re-route rows
-		// equally among healthy vmstorage nodes.
-		// This should spread the increased load among healthy vmstorage nodes.
-		h := xxhash.Sum64(mr.MetricNameRaw)
-		idx := h % uint64(len(healthyStorageNodes))
+		idx := uint64(0)
+		if len(sns) > 1 {
+			h := xxhash.Sum64(mr.MetricNameRaw)
+			idx = uint64(jump.Hash(h, int32(len(sns))))
+		}
 		attempts := 0
 		for {
-			sn := healthyStorageNodes[idx]
-			err := sn.sendReroutedRow(rowBuf)
-			if err == nil {
-				sn.rowsReroutedToHere.Inc()
+			sn := sns[idx]
+			if sn.sendReroutedRow(rowBuf) {
+				// The row has been successfully re-routed to sn.
 				break
 			}
-			// Cannot send data to sn. Try sending to the next vmstorage node.
+			// Cannot re-route data to sn. Try sending to the next vmstorage node.
 			idx++
-			if idx >= uint64(len(healthyStorageNodes)) {
+			if idx >= uint64(len(sns)) {
 				idx = 0
 			}
 			attempts++
-			if attempts < len(healthyStorageNodes) {
+			if attempts < len(sns) {
 				continue
 			}
-			// There are no healthy nodes.
-			// Try returning the remaining data to reroutedBuf if it has enough free space.
-			rowsRemaining := rows - rowsProcessed
-			recovered := false
-			reroutedLock.Lock()
-			if len(rowBuf)+len(tail)+len(reroutedBuf) <= reroutedBufMaxSize {
-				swapBuf = append(swapBuf[:0], rowBuf...)
-				swapBuf = append(swapBuf, tail...)
-				swapBuf = append(swapBuf, reroutedBuf...)
-				reroutedBuf, swapBuf = swapBuf, reroutedBuf[:0]
-				reroutedRows += rowsRemaining
-				recovered = true
-			}
-			reroutedLock.Unlock()
-			if recovered {
-				return swapBuf, nil
-			}
-			rowsLostTotal.Add(rowsRemaining)
-			return swapBuf, fmt.Errorf("all the %d vmstorage nodes are unavailable; lost %d rows; last error: %s", len(storageNodes), rowsRemaining, err)
+
+			// There isn't enough buffer space in any of the vmstorage nodes.
+			// Return the remaining data to br.buf, so it may be processed later.
+			br.buf = append(br.buf[:0], rowBuf...)
+			br.buf = append(br.buf, src...)
+			br.rows -= rowsProcessed
+			return
 		}
 		rowsProcessed++
 	}
-	if rowsProcessed != rows {
-		logger.Panicf("BUG: unexpected number of rows processed; got %d; want %d", rowsProcessed, rows)
+	if rowsProcessed != br.rows {
+		logger.Panicf("BUG: unexpected number of rows processed; got %d; want %d", rowsProcessed, br.rows)
 	}
 	reroutedRowsProcessed.Add(rowsProcessed)
-	return swapBuf, nil
+	br.reset()
+}
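Re-routed rows are now placed with jump consistent hashing (`github.com/lithammer/go-jump-consistent-hash`), so a given series keeps landing on the same healthy node for a given node count instead of being spread by plain modulo. A small sketch of that stability property (the series names are made up):

```go
package main

import (
	"fmt"

	xxhash "github.com/cespare/xxhash/v2"
	jump "github.com/lithammer/go-jump-consistent-hash"
)

func main() {
	series := []string{`cpu_usage{host="a"}`, `cpu_usage{host="b"}`, `mem_free{host="a"}`}
	for _, s := range series {
		h := xxhash.Sum64([]byte(s))
		// For a fixed bucket count the same key always lands in the same
		// bucket, and growing the count from n to n+1 only moves ~1/(n+1)
		// of the keys, unlike plain modulo, which reshuffles most of them.
		fmt.Printf("%-22s 3 nodes -> %d, 4 nodes -> %d\n", s, jump.Hash(h, 3), jump.Hash(h, 4))
	}
}
```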
+
+func (sn *storageNode) sendReroutedRow(buf []byte) bool {
+	if sn.isBroken() {
+		return false
+	}
+	sn.brLock.Lock()
+	ok := len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode
+	if ok {
+		sn.br.buf = append(sn.br.buf, buf...)
+		sn.br.rows++
+		sn.rowsReroutedToHere.Inc()
+	}
+	sn.brLock.Unlock()
+	return ok
 }
 
 var (
 	maxBufSizePerStorageNode int
 
-	reroutedLock sync.Mutex
-	reroutedBuf  []byte
-	reroutedRows int
+	reroutedBR     bufRows
+	reroutedBRLock sync.Mutex
+	reroutedBRCond = sync.NewCond(&reroutedBRLock)
 
 	reroutedBufMaxSize int
 
 	reroutedRowsProcessed = metrics.NewCounter(`vm_rpc_rerouted_rows_processed_total{name="vminsert"}`)
-	reroutedBufOverflows  = metrics.NewCounter(`vm_rpc_rerouted_buf_overflows_total{name="vminsert"}`)
+	reroutedBufWaits      = metrics.NewCounter(`vm_rpc_rerouted_buf_waits_total{name="vminsert"}`)
 	reroutesTotal         = metrics.NewCounter(`vm_rpc_reroutes_total{name="vminsert"}`)
 	_                     = metrics.NewGauge(`vm_rpc_rerouted_rows_pending{name="vminsert"}`, func() float64 {
-		reroutedLock.Lock()
-		n := reroutedRows
-		reroutedLock.Unlock()
+		reroutedBRLock.Lock()
+		n := reroutedBR.rows
+		reroutedBRLock.Unlock()
 		return float64(n)
 	})
 	_ = metrics.NewGauge(`vm_rpc_rerouted_buf_pending_bytes{name="vminsert"}`, func() float64 {
-		reroutedLock.Lock()
-		n := len(reroutedBuf)
-		reroutedLock.Unlock()
+		reroutedBRLock.Lock()
+		n := len(reroutedBR.buf)
+		reroutedBRLock.Unlock()
 		return float64(n)
 	})
 
 	rerouteErrors = metrics.NewCounter(`vm_rpc_reroute_errors_total{name="vminsert"}`)
 
 	rowsLostTotal = metrics.NewCounter(`vm_rpc_rows_lost_total{name="vminsert"}`)
 )
-
-func getHealthyStorageNodes() []*storageNode {
-	sns := make([]*storageNode, 0, len(storageNodes)-1)
-	for _, sn := range storageNodes {
-		sn.mu.Lock()
-		if !sn.broken {
-			sns = append(sns, sn)
-		}
-		sn.mu.Unlock()
-	}
-	return sns
-}
diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md
index 8135933df0..a5a93f33a4 100644
--- a/docs/Cluster-VictoriaMetrics.md
+++ b/docs/Cluster-VictoriaMetrics.md
@@ -250,6 +250,8 @@ Each instance type - `vminsert`, `vmselect` and `vmstorage` - can run on the mos
 * The recommended total number of vCPU cores for all the `vminsert` instances can be calculated from the ingestion rate: `vCPUs = ingestion_rate / 150K`.
 * The recommended number of vCPU cores per each `vminsert` instance should equal to the number of `vmstorage` instances in the cluster.
 * The amount of RAM per each `vminsert` instance should be 1GB or more. RAM is used as a buffer for spikes in ingestion rate.
+  The maximum amount of used RAM per `vminsert` node can be tuned with the `-memory.allowedPercent` command-line flag. For instance, `-memory.allowedPercent=20`
+  limits the maximum amount of used RAM to 20% of the available RAM on the host system.
 * Sometimes `-rpc.disableCompression` command-line flag on `vminsert` instances could increase ingestion capacity at the cost of higher network bandwidth usage between `vminsert` and `vmstorage`.
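Finally, the per-node health flag became a `uint32` read with `sync/atomic`, so hot-path callers such as `push` and `sendReroutedRow` can check node health without taking `brLock`. A minimal sketch of the flag; `markBroken` is a hypothetical helper, while the patch stores 0/1 inline:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type storageNode struct {
	broken uint32 // 0: healthy, non-zero: temporarily broken
}

func (sn *storageNode) isBroken() bool {
	return atomic.LoadUint32(&sn.broken) != 0
}

func (sn *storageNode) markBroken(broken bool) {
	v := uint32(0)
	if broken {
		v = 1
	}
	atomic.StoreUint32(&sn.broken, v)
}

func main() {
	var sn storageNode
	sn.markBroken(true)
	fmt.Println(sn.isBroken()) // true
	sn.markBroken(false)
	fmt.Println(sn.isBroken()) // false
}
```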