diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 30d7ffaaff..251a8b430e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -16,6 +16,7 @@ sort: 15 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): limit the number of samples per each imported JSON line. This should limit the memory usage at VictoriaMetrics side when importing time series with big number of samples. * FEATURE: vmselect: log slow queries across all the `/api/v1/*` handlers (aka [Prometheus query API](https://prometheus.io/docs/prometheus/latest/querying/api)) if their execution duration exceeds `-search.logSlowQueryDuration`. This should simplify debugging slow requests to such handlers as `/api/v1/labels` or `/api/v1/series` additionally to `/api/v1/query` and `/api/v1/query_range`, which were logged in the previous releases. * FEATURE: vminsert: sort the `-storageNode` list in order to guarantee the identical `series -> vmstorage` mapping across all the `vminsert` nodes. This should reduce resource usage (RAM, CPU and disk IO) at `vmstorage` nodes if `vmstorage` addresses are passed in random order to `vminsert` nodes. +* FEATURE: vmstorage: reduce memory usage on a system with many CPU cores under high ingestion rate. * BUGFIX: prevent from adding new samples to deleted time series after the rotation of the inverted index (the rotation is performed once per `-retentionPeriod`). See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347#issuecomment-861232136) for details. * BUGFIX: vmstorage: reduce high disk write IO usage on systems with big number of CPU cores. The issue has been introduced in the release [v1.59.0](#v1590). See [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/aa9b56a046b6ae8083fa659df35dd5e994bf9115) and [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1338#issuecomment-863046999) for details. 
diff --git a/lib/protoparser/clusternative/streamparser.go b/lib/protoparser/clusternative/streamparser.go index 79e73c373f..218270e108 100644 --- a/lib/protoparser/clusternative/streamparser.go +++ b/lib/protoparser/clusternative/streamparser.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/consts" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -29,7 +28,10 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric callbackErr error ) for { - uw := getUnmarshalWork() + // Do not use unmarshalWork pool, since every unmarshalWork structure usually occupies + // big amounts of memory (more than consts.MaxInsertPacketSize bytes). + // The pool would result in increased memory usage. + uw := &unmarshalWork{} uw.callback = func(rows []storage.MetricRow) { if err := callback(rows); err != nil { processErrors.Inc() @@ -47,7 +49,6 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric wg.Wait() if err == io.EOF { // Remote end gracefully closed the connection. - putUnmarshalWork(uw) return nil } return err @@ -119,68 +120,25 @@ type unmarshalWork struct { lastResetTime uint64 } -func (uw *unmarshalWork) reset() { - if len(uw.reqBuf)*4 < cap(uw.reqBuf) && fasttime.UnixTimestamp()-uw.lastResetTime > 10 { - // Periodically reset reqBuf and mrs in order to prevent from gradual memory usage growth - // when ceratin entries in mr contain too long labels. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details. 
- uw.reqBuf = nil - uw.mrs = nil - uw.lastResetTime = fasttime.UnixTimestamp() - } - uw.wg = nil - uw.callback = nil - uw.reqBuf = uw.reqBuf[:0] - mrs := uw.mrs - for i := range mrs { - mrs[i].ResetX() - } - uw.mrs = uw.mrs[:0] -} - // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { - defer uw.wg.Done() - if err := uw.unmarshal(); err != nil { - parseErrors.Inc() - logger.Errorf("error when unmarshaling clusternative block: %s", err) - putUnmarshalWork(uw) - return - } - mrs := uw.mrs - for len(mrs) > maxRowsPerCallback { + reqBuf := uw.reqBuf + for len(reqBuf) > 0 { // Limit the number of rows passed to callback in order to reduce memory usage // when processing big packets of rows. - uw.callback(mrs[:maxRowsPerCallback]) - mrs = mrs[maxRowsPerCallback:] + mrs, tail, err := storage.UnmarshalMetricRows(uw.mrs[:0], reqBuf, maxRowsPerCallback) + uw.mrs = mrs + if err != nil { + parseErrors.Inc() + logger.Errorf("cannot unmarshal MetricRow from clusternative block with size %d (remaining %d bytes): %s", len(reqBuf), len(tail), err) + break + } + rowsRead.Add(len(mrs)) + uw.callback(mrs) + reqBuf = tail } - uw.callback(mrs) - putUnmarshalWork(uw) + wg := uw.wg + wg.Done() } const maxRowsPerCallback = 10000 - -func (uw *unmarshalWork) unmarshal() error { - var err error - uw.mrs, err = storage.UnmarshalMetricRows(uw.mrs[:0], uw.reqBuf) - if err != nil { - return fmt.Errorf("cannot unmarshal MetricRow from clusternative block: %s", err) - } - rowsRead.Add(len(uw.mrs)) - return nil -} - -func getUnmarshalWork() *unmarshalWork { - v := unmarshalWorkPool.Get() - if v == nil { - return &unmarshalWork{} - } - return v.(*unmarshalWork) -} - -func putUnmarshalWork(uw *unmarshalWork) { - uw.reset() - unmarshalWorkPool.Put(uw) -} - -var unmarshalWorkPool sync.Pool diff --git a/lib/storage/storage.go b/lib/storage/storage.go index fc5810c50e..318b9b4b60 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1438,9 +1438,11 @@ func 
MarshalMetricRow(dst []byte, metricNameRaw []byte, timestamp int64, value f // UnmarshalMetricRows appends unmarshaled MetricRow items from src to dst and returns the result. // +// Up to maxRows rows are unmarshaled at once. The remaining byte slice is returned to the caller. +// // The returned MetricRow items refer to src, so they become invalid as soon as src changes. -func UnmarshalMetricRows(dst []MetricRow, src []byte) ([]MetricRow, error) { - for len(src) > 0 { +func UnmarshalMetricRows(dst []MetricRow, src []byte, maxRows int) ([]MetricRow, []byte, error) { + for len(src) > 0 && maxRows > 0 { if len(dst) < cap(dst) { dst = dst[:len(dst)+1] } else { @@ -1449,11 +1451,12 @@ func UnmarshalMetricRows(dst []MetricRow, src []byte) ([]MetricRow, error) { mr := &dst[len(dst)-1] tail, err := mr.UnmarshalX(src) if err != nil { - return dst, err + return dst, tail, err } src = tail + maxRows-- } - return dst, nil + return dst, src, nil } // UnmarshalX unmarshals mr from src and returns the remaining tail from src.