From b84aea1e6ef1fe1786443501752d156ee9bc7bfc Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 23 Jun 2021 15:45:05 +0300 Subject: [PATCH] lib/protoparser/clusternative: do not pool unmarshalWork structs, since they can occupy big amounts of memory (more than 100MB per struct) This should reduce memory usage for vmstorage under high ingestion rate when the vmstorage runs on a system with a big number of CPU cores --- docs/CHANGELOG.md | 1 + lib/protoparser/clusternative/streamparser.go | 78 +++++-------------- lib/storage/storage.go | 11 ++- 3 files changed, 26 insertions(+), 64 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 30d7ffaaff..251a8b430e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -16,6 +16,7 @@ sort: 15 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): limit the number of samples per each imported JSON line. This should limit the memory usage at VictoriaMetrics side when importing time series with big number of samples. * FEATURE: vmselect: log slow queries across all the `/api/v1/*` handlers (aka [Prometheus query API](https://prometheus.io/docs/prometheus/latest/querying/api)) if their execution duration exceeds `-search.logSlowQueryDuration`. This should simplify debugging slow requests to such handlers as `/api/v1/labels` or `/api/v1/series` additionally to `/api/v1/query` and `/api/v1/query_range`, which were logged in the previous releases. * FEATURE: vminsert: sort the `-storageNode` list in order to guarantee the identical `series -> vmstorage` mapping across all the `vminsert` nodes. This should reduce resource usage (RAM, CPU and disk IO) at `vmstorage` nodes if `vmstorage` addresses are passed in random order to `vminsert` nodes. +* FEATURE: vmstorage: reduce memory usage on a system with many CPU cores under high ingestion rate. 
* BUGFIX: prevent from adding new samples to deleted time series after the rotation of the inverted index (the rotation is performed once per `-retentionPeriod`). See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347#issuecomment-861232136) for details. * BUGFIX: vmstorage: reduce high disk write IO usage on systems with big number of CPU cores. The issue has been introduced in the release [v1.59.0](#v1590). See [this commit](aa9b56a046b6ae8083fa659df35dd5e994bf9115) and [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1338#issuecomment-863046999) for details. diff --git a/lib/protoparser/clusternative/streamparser.go b/lib/protoparser/clusternative/streamparser.go index 79e73c373f..218270e108 100644 --- a/lib/protoparser/clusternative/streamparser.go +++ b/lib/protoparser/clusternative/streamparser.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/consts" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -29,7 +28,10 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric callbackErr error ) for { - uw := getUnmarshalWork() + // Do not use unmarshalWork pool, since every unmarshalWork structure usually occupies + // big amounts of memory (more than consts.MaxInsertPacketSize bytes). + // The pool would result in increased memory usage. + uw := &unmarshalWork{} uw.callback = func(rows []storage.MetricRow) { if err := callback(rows); err != nil { processErrors.Inc() @@ -47,7 +49,6 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric wg.Wait() if err == io.EOF { // Remote end gracefully closed the connection. 
- putUnmarshalWork(uw) return nil } return err @@ -119,68 +120,25 @@ type unmarshalWork struct { lastResetTime uint64 } -func (uw *unmarshalWork) reset() { - if len(uw.reqBuf)*4 < cap(uw.reqBuf) && fasttime.UnixTimestamp()-uw.lastResetTime > 10 { - // Periodically reset reqBuf and mrs in order to prevent from gradual memory usage growth - // when ceratin entries in mr contain too long labels. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details. - uw.reqBuf = nil - uw.mrs = nil - uw.lastResetTime = fasttime.UnixTimestamp() - } - uw.wg = nil - uw.callback = nil - uw.reqBuf = uw.reqBuf[:0] - mrs := uw.mrs - for i := range mrs { - mrs[i].ResetX() - } - uw.mrs = uw.mrs[:0] -} - // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { - defer uw.wg.Done() - if err := uw.unmarshal(); err != nil { - parseErrors.Inc() - logger.Errorf("error when unmarshaling clusternative block: %s", err) - putUnmarshalWork(uw) - return - } - mrs := uw.mrs - for len(mrs) > maxRowsPerCallback { + reqBuf := uw.reqBuf + for len(reqBuf) > 0 { // Limit the number of rows passed to callback in order to reduce memory usage // when processing big packets of rows. 
- uw.callback(mrs[:maxRowsPerCallback]) - mrs = mrs[maxRowsPerCallback:] + mrs, tail, err := storage.UnmarshalMetricRows(uw.mrs[:0], reqBuf, maxRowsPerCallback) + uw.mrs = mrs + if err != nil { + parseErrors.Inc() + logger.Errorf("cannot unmarshal MetricRow from clusternative block with size %d (remaining %d bytes): %s", len(reqBuf), len(tail), err) + break + } + rowsRead.Add(len(mrs)) + uw.callback(mrs) + reqBuf = tail } - uw.callback(mrs) - putUnmarshalWork(uw) + wg := uw.wg + wg.Done() } const maxRowsPerCallback = 10000 - -func (uw *unmarshalWork) unmarshal() error { - var err error - uw.mrs, err = storage.UnmarshalMetricRows(uw.mrs[:0], uw.reqBuf) - if err != nil { - return fmt.Errorf("cannot unmarshal MetricRow from clusternative block: %s", err) - } - rowsRead.Add(len(uw.mrs)) - return nil -} - -func getUnmarshalWork() *unmarshalWork { - v := unmarshalWorkPool.Get() - if v == nil { - return &unmarshalWork{} - } - return v.(*unmarshalWork) -} - -func putUnmarshalWork(uw *unmarshalWork) { - uw.reset() - unmarshalWorkPool.Put(uw) -} - -var unmarshalWorkPool sync.Pool diff --git a/lib/storage/storage.go b/lib/storage/storage.go index fc5810c50e..318b9b4b60 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1438,9 +1438,11 @@ func MarshalMetricRow(dst []byte, metricNameRaw []byte, timestamp int64, value f // UnmarshalMetricRows appends unmarshaled MetricRow items from src to dst and returns the result. // +// Up to maxRows rows are unmarshaled at once. The remaining byte slice is returned to the caller. +// // The returned MetricRow items refer to src, so they become invalid as soon as src changes. 
-func UnmarshalMetricRows(dst []MetricRow, src []byte) ([]MetricRow, error) { - for len(src) > 0 { +func UnmarshalMetricRows(dst []MetricRow, src []byte, maxRows int) ([]MetricRow, []byte, error) { + for len(src) > 0 && maxRows > 0 { if len(dst) < cap(dst) { dst = dst[:len(dst)+1] } else { @@ -1449,11 +1451,12 @@ func UnmarshalMetricRows(dst []MetricRow, src []byte) ([]MetricRow, error) { mr := &dst[len(dst)-1] tail, err := mr.UnmarshalX(src) if err != nil { - return dst, err + return dst, tail, err } src = tail + maxRows-- } - return dst, nil + return dst, src, nil } // UnmarshalX unmarshals mr from src and returns the remaining tail from src.