diff --git a/lib/leveledbytebufferpool/pool.go b/lib/leveledbytebufferpool/pool.go index a3a18bd54..14416122e 100644 --- a/lib/leveledbytebufferpool/pool.go +++ b/lib/leveledbytebufferpool/pool.go @@ -19,9 +19,6 @@ var pools [30]sync.Pool // Get returns byte buffer with the given capacity. func Get(capacity int) *bytesutil.ByteBuffer { - if capacity <= 0 { - capacity = 1 - } id, capacityNeeded := getPoolIdAndCapacity(capacity) for i := 0; i < 2; i++ { if id < 0 || id >= len(pools) { diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 485193fa6..1ccfee6ae 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "math" + "math/bits" "strings" "sync" "time" @@ -136,6 +137,10 @@ type scrapeWork struct { // prevBodyLen contains the previous response body length for the given scrape work. // It is used as a hint in order to reduce memory usage for body buffers. prevBodyLen int + + // prevLabelsLen contains the number of all the labels generated during the previous scrape. + // It is used as a hint in order to reduce memory usage when parsing scrape responses. + prevLabelsLen int } func (sw *scrapeWork) run(stopCh <-chan struct{}) { @@ -212,7 +217,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error scrapeDuration.Update(duration) scrapeResponseSize.Update(float64(len(body.B))) up := 1 - wc := writeRequestCtxPool.Get().(*writeRequestCtx) + wc := writeRequestCtxPool.Get(sw.prevLabelsLen) if err != nil { up = 0 scrapesFailed.Inc() @@ -241,6 +246,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error startTime := time.Now() sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) + sw.prevLabelsLen = len(wc.labels) wc.reset() writeRequestCtxPool.Put(wc) // body must be released only after wc is released, since wc refers to body. @@ -250,6 +256,50 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error return err } +// leveldWriteRequestCtxPool allows reducing memory usage when writeRequesCtx +// structs contain mixed number of labels. +// +// Its logic has been copied from leveledbytebufferpool. +type leveledWriteRequestCtxPool struct { + pools [30]sync.Pool +} + +func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx { + id, capacityNeeded := lwp.getPoolIdAndCapacity(labelsCapacity) + for i := 0; i < 2; i++ { + if id < 0 || id >= len(lwp.pools) { + break + } + if v := lwp.pools[id].Get(); v != nil { + return v.(*writeRequestCtx) + } + id++ + } + return &writeRequestCtx{ + labels: make([]prompbmarshal.Label, 0, capacityNeeded), + } +} + +func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) { + capacity := cap(wc.labels) + id, _ := lwp.getPoolIdAndCapacity(capacity) + wc.reset() + lwp.pools[id].Put(wc) +} + +func (lwp *leveledWriteRequestCtxPool) getPoolIdAndCapacity(size int) (int, int) { + size-- + if size < 0 { + size = 0 + } + size >>= 3 + id := bits.Len(uint(size)) + if id > len(lwp.pools) { + id = len(lwp.pools) - 1 + } + return id, (1 << (id + 3)) +} + type writeRequestCtx struct { rows parser.Rows writeRequest prompbmarshal.WriteRequest @@ -264,11 +314,7 @@ func (wc *writeRequestCtx) reset() { wc.samples = wc.samples[:0] } -var writeRequestCtxPool = &sync.Pool{ - New: func() interface{} { - return &writeRequestCtx{} - }, -} +var writeRequestCtxPool leveledWriteRequestCtxPool func (sw *scrapeWork) getSeriesAdded(wc *writeRequestCtx) int { mPrev := sw.prevSeriesMap