mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 23:39:48 +01:00
61e483a01c
This is a follow-up for b84aea1e6e
144 lines
4.6 KiB
Go
144 lines
4.6 KiB
Go
package clusternative
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// ParseStream parses data sent from vminsert to bc and calls callback for parsed rows.
|
|
//
|
|
// The callback can be called concurrently multiple times for streamed data from req.
|
|
//
|
|
// callback shouldn't hold block after returning.
|
|
func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.MetricRow) error) error {
|
|
var wg sync.WaitGroup
|
|
var (
|
|
callbackErrLock sync.Mutex
|
|
callbackErr error
|
|
)
|
|
for {
|
|
// Do not use unmarshalWork pool, since every unmarshalWork structure usually occupies
|
|
// big amounts of memory (more than consts.MaxInsertPacketSize bytes).
|
|
// The pool would result in increased memory usage.
|
|
uw := &unmarshalWork{}
|
|
uw.callback = func(rows []storage.MetricRow) {
|
|
if err := callback(rows); err != nil {
|
|
processErrors.Inc()
|
|
callbackErrLock.Lock()
|
|
if callbackErr == nil {
|
|
callbackErr = fmt.Errorf("error when processing native block: %w", err)
|
|
}
|
|
callbackErrLock.Unlock()
|
|
}
|
|
}
|
|
uw.wg = &wg
|
|
var err error
|
|
uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc)
|
|
if err != nil {
|
|
wg.Wait()
|
|
if err == io.EOF {
|
|
// Remote end gracefully closed the connection.
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
blocksRead.Inc()
|
|
wg.Add(1)
|
|
common.ScheduleUnmarshalWork(uw)
|
|
}
|
|
}
|
|
|
|
// readBlock reads the next data block from vminsert-initiated bc, appends it to dst and returns the result.
|
|
func readBlock(dst []byte, bc *handshake.BufferedConn) ([]byte, error) {
|
|
sizeBuf := sizeBufPool.Get()
|
|
defer sizeBufPool.Put(sizeBuf)
|
|
sizeBuf.B = bytesutil.Resize(sizeBuf.B, 8)
|
|
if _, err := io.ReadFull(bc, sizeBuf.B); err != nil {
|
|
if err != io.EOF {
|
|
readErrors.Inc()
|
|
err = fmt.Errorf("cannot read packet size: %w", err)
|
|
}
|
|
return dst, err
|
|
}
|
|
packetSize := encoding.UnmarshalUint64(sizeBuf.B)
|
|
if packetSize > consts.MaxInsertPacketSize {
|
|
parseErrors.Inc()
|
|
return dst, fmt.Errorf("too big packet size: %d; shouldn't exceed %d", packetSize, consts.MaxInsertPacketSize)
|
|
}
|
|
dstLen := len(dst)
|
|
dst = bytesutil.Resize(dst, dstLen+int(packetSize))
|
|
if n, err := io.ReadFull(bc, dst[dstLen:]); err != nil {
|
|
readErrors.Inc()
|
|
return dst, fmt.Errorf("cannot read packet with size %d bytes: %w; read only %d bytes", packetSize, err, n)
|
|
}
|
|
// Send `ack` to vminsert that the packet has been received.
|
|
deadline := time.Now().Add(5 * time.Second)
|
|
if err := bc.SetWriteDeadline(deadline); err != nil {
|
|
writeErrors.Inc()
|
|
return dst, fmt.Errorf("cannot set write deadline for sending `ack` to vminsert: %w", err)
|
|
}
|
|
sizeBuf.B[0] = 1
|
|
if _, err := bc.Write(sizeBuf.B[:1]); err != nil {
|
|
writeErrors.Inc()
|
|
return dst, fmt.Errorf("cannot send `ack` to vminsert: %w", err)
|
|
}
|
|
if err := bc.Flush(); err != nil {
|
|
writeErrors.Inc()
|
|
return dst, fmt.Errorf("cannot flush `ack` to vminsert: %w", err)
|
|
}
|
|
return dst, nil
|
|
}
|
|
|
|
var sizeBufPool bytesutil.ByteBufferPool
|
|
|
|
var (
|
|
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="clusternative"}`)
|
|
writeErrors = metrics.NewCounter(`vm_protoparser_write_errors_total{type="clusternative"}`)
|
|
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="clusternative"}`)
|
|
blocksRead = metrics.NewCounter(`vm_protoparser_blocks_read_total{type="clusternative"}`)
|
|
|
|
parseErrors = metrics.NewCounter(`vm_protoparser_parse_errors_total{type="clusternative"}`)
|
|
processErrors = metrics.NewCounter(`vm_protoparser_process_errors_total{type="clusternative"}`)
|
|
)
|
|
|
|
type unmarshalWork struct {
|
|
wg *sync.WaitGroup
|
|
callback func(rows []storage.MetricRow)
|
|
reqBuf []byte
|
|
mrs []storage.MetricRow
|
|
}
|
|
|
|
// Unmarshal implements common.UnmarshalWork
|
|
func (uw *unmarshalWork) Unmarshal() {
|
|
reqBuf := uw.reqBuf
|
|
for len(reqBuf) > 0 {
|
|
// Limit the number of rows passed to callback in order to reduce memory usage
|
|
// when processing big packets of rows.
|
|
mrs, tail, err := storage.UnmarshalMetricRows(uw.mrs[:0], reqBuf, maxRowsPerCallback)
|
|
uw.mrs = mrs
|
|
if err != nil {
|
|
parseErrors.Inc()
|
|
logger.Errorf("cannot unmarshal MetricRow from clusternative block with size %d (remaining %d bytes): %s", len(reqBuf), len(tail), err)
|
|
break
|
|
}
|
|
rowsRead.Add(len(mrs))
|
|
uw.callback(mrs)
|
|
reqBuf = tail
|
|
}
|
|
wg := uw.wg
|
|
wg.Done()
|
|
}
|
|
|
|
// maxRowsPerCallback is the maximum number of rows unmarshaled in one step
// and passed to a single callback call.
const maxRowsPerCallback = 10_000
|