diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 29f26a030a..7f38689a4c 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/consts" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -272,7 +273,15 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error { sizeBuf := make([]byte, 8) var buf []byte var mrs []storage.MetricRow + lastMRsResetTime := fasttime.UnixTimestamp() for { + if fasttime.UnixTimestamp()-lastMRsResetTime > 10 { + // Periodically reset mrs in order to prevent from gradual memory usage growth + // when ceratin entries in mr contain too long labels. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details. + mrs = nil + lastMRsResetTime = fasttime.UnixTimestamp() + } if _, err := io.ReadFull(bc, sizeBuf); err != nil { if err == io.EOF { // Remote end gracefully closed the connection. @@ -317,6 +326,16 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error { if err != nil { return fmt.Errorf("cannot unmarshal MetricRow: %s", err) } + if len(mrs) >= 10000 { + // Store the collected mrs in order to reduce memory usage + // when too big number of mrs are sent in each packet. + // This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 + vminsertMetricsRead.Add(len(mrs)) + if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil { + return fmt.Errorf("cannot store metrics: %s", err) + } + mrs = mrs[:0] + } } vminsertMetricsRead.Add(len(mrs)) if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil {