mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-06 08:02:17 +01:00
f92db26a93
This should increase block sizes and subsequently increase the maximum possible bandwidth per each connection to remote storage. This, in turn, should reduce the probability of storing the data in local buffers. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1235
234 lines
6.2 KiB
Go
234 lines
6.2 KiB
Go
package remotewrite
|
|
|
|
import (
|
|
"flag"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/golang/snappy"
|
|
)
|
|
|
|
var (
|
|
flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
|
|
"This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url")
|
|
maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 8*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
|
|
"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics")
|
|
)
|
|
|
|
// the maximum number of rows to send per each block.
|
|
const maxRowsPerBlock = 10000
|
|
|
|
// the maximum number of labels to send per each block.
|
|
const maxLabelsPerBlock = 10 * maxRowsPerBlock
|
|
|
|
type pendingSeries struct {
|
|
mu sync.Mutex
|
|
wr writeRequest
|
|
|
|
stopCh chan struct{}
|
|
periodicFlusherWG sync.WaitGroup
|
|
}
|
|
|
|
func newPendingSeries(pushBlock func(block []byte), significantFigures, roundDigits int) *pendingSeries {
|
|
var ps pendingSeries
|
|
ps.wr.pushBlock = pushBlock
|
|
ps.wr.significantFigures = significantFigures
|
|
ps.wr.roundDigits = roundDigits
|
|
ps.stopCh = make(chan struct{})
|
|
ps.periodicFlusherWG.Add(1)
|
|
go func() {
|
|
defer ps.periodicFlusherWG.Done()
|
|
ps.periodicFlusher()
|
|
}()
|
|
return &ps
|
|
}
|
|
|
|
func (ps *pendingSeries) MustStop() {
|
|
close(ps.stopCh)
|
|
ps.periodicFlusherWG.Wait()
|
|
}
|
|
|
|
func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) {
|
|
ps.mu.Lock()
|
|
ps.wr.push(tss)
|
|
ps.mu.Unlock()
|
|
}
|
|
|
|
func (ps *pendingSeries) periodicFlusher() {
|
|
flushSeconds := int64(flushInterval.Seconds())
|
|
if flushSeconds <= 0 {
|
|
flushSeconds = 1
|
|
}
|
|
ticker := time.NewTicker(*flushInterval)
|
|
defer ticker.Stop()
|
|
mustStop := false
|
|
for !mustStop {
|
|
select {
|
|
case <-ps.stopCh:
|
|
mustStop = true
|
|
case <-ticker.C:
|
|
if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) {
|
|
continue
|
|
}
|
|
}
|
|
ps.mu.Lock()
|
|
ps.wr.flush()
|
|
ps.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
type writeRequest struct {
|
|
// Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures.
|
|
lastFlushTime uint64
|
|
|
|
// pushBlock is called when whe write request is ready to be sent.
|
|
pushBlock func(block []byte)
|
|
|
|
// How many significant figures must be left before sending the writeRequest to pushBlock.
|
|
significantFigures int
|
|
|
|
// How many decimal digits after point must be left before sending the writeRequest to pushBlock.
|
|
roundDigits int
|
|
|
|
wr prompbmarshal.WriteRequest
|
|
|
|
tss []prompbmarshal.TimeSeries
|
|
|
|
labels []prompbmarshal.Label
|
|
samples []prompbmarshal.Sample
|
|
buf []byte
|
|
}
|
|
|
|
func (wr *writeRequest) reset() {
|
|
// Do not reset pushBlock, significantFigures and roundDigits, since they are re-used.
|
|
|
|
wr.wr.Timeseries = nil
|
|
|
|
for i := range wr.tss {
|
|
ts := &wr.tss[i]
|
|
ts.Labels = nil
|
|
ts.Samples = nil
|
|
}
|
|
wr.tss = wr.tss[:0]
|
|
|
|
promrelabel.CleanLabels(wr.labels)
|
|
wr.labels = wr.labels[:0]
|
|
|
|
wr.samples = wr.samples[:0]
|
|
wr.buf = wr.buf[:0]
|
|
}
|
|
|
|
func (wr *writeRequest) flush() {
|
|
sortLabelsIfNeeded(wr.tss)
|
|
wr.wr.Timeseries = wr.tss
|
|
wr.adjustSampleValues()
|
|
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
|
|
pushWriteRequest(&wr.wr, wr.pushBlock)
|
|
wr.reset()
|
|
}
|
|
|
|
func (wr *writeRequest) adjustSampleValues() {
|
|
samples := wr.samples
|
|
if n := wr.significantFigures; n > 0 {
|
|
for i := range samples {
|
|
s := &samples[i]
|
|
s.Value = decimal.RoundToSignificantFigures(s.Value, n)
|
|
}
|
|
}
|
|
if n := wr.roundDigits; n < 100 {
|
|
for i := range samples {
|
|
s := &samples[i]
|
|
s.Value = decimal.RoundToDecimalDigits(s.Value, n)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) {
|
|
tssDst := wr.tss
|
|
for i := range src {
|
|
tssDst = append(tssDst, prompbmarshal.TimeSeries{})
|
|
wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
|
|
if len(wr.samples) >= maxRowsPerBlock || len(wr.labels) >= maxLabelsPerBlock {
|
|
wr.tss = tssDst
|
|
wr.flush()
|
|
tssDst = wr.tss
|
|
}
|
|
}
|
|
wr.tss = tssDst
|
|
}
|
|
|
|
func (wr *writeRequest) copyTimeSeries(dst, src *prompbmarshal.TimeSeries) {
|
|
labelsDst := wr.labels
|
|
labelsLen := len(wr.labels)
|
|
samplesDst := wr.samples
|
|
buf := wr.buf
|
|
for i := range src.Labels {
|
|
labelsDst = append(labelsDst, prompbmarshal.Label{})
|
|
dstLabel := &labelsDst[len(labelsDst)-1]
|
|
srcLabel := &src.Labels[i]
|
|
|
|
buf = append(buf, srcLabel.Name...)
|
|
dstLabel.Name = bytesutil.ToUnsafeString(buf[len(buf)-len(srcLabel.Name):])
|
|
buf = append(buf, srcLabel.Value...)
|
|
dstLabel.Value = bytesutil.ToUnsafeString(buf[len(buf)-len(srcLabel.Value):])
|
|
}
|
|
dst.Labels = labelsDst[labelsLen:]
|
|
|
|
samplesDst = append(samplesDst, src.Samples...)
|
|
dst.Samples = samplesDst[len(samplesDst)-len(src.Samples):]
|
|
|
|
wr.samples = samplesDst
|
|
wr.labels = labelsDst
|
|
wr.buf = buf
|
|
}
|
|
|
|
func pushWriteRequest(wr *prompbmarshal.WriteRequest, pushBlock func(block []byte)) {
|
|
if len(wr.Timeseries) == 0 {
|
|
// Nothing to push
|
|
return
|
|
}
|
|
bb := writeRequestBufPool.Get()
|
|
bb.B = prompbmarshal.MarshalWriteRequest(bb.B[:0], wr)
|
|
if len(bb.B) <= maxUnpackedBlockSize.N {
|
|
zb := snappyBufPool.Get()
|
|
zb.B = snappy.Encode(zb.B[:cap(zb.B)], bb.B)
|
|
writeRequestBufPool.Put(bb)
|
|
if len(zb.B) <= persistentqueue.MaxBlockSize {
|
|
pushBlock(zb.B)
|
|
blockSizeRows.Update(float64(len(wr.Timeseries)))
|
|
blockSizeBytes.Update(float64(len(zb.B)))
|
|
snappyBufPool.Put(zb)
|
|
return
|
|
}
|
|
snappyBufPool.Put(zb)
|
|
} else {
|
|
writeRequestBufPool.Put(bb)
|
|
}
|
|
|
|
// Too big block. Recursively split it into smaller parts.
|
|
timeseries := wr.Timeseries
|
|
n := len(timeseries) / 2
|
|
wr.Timeseries = timeseries[:n]
|
|
pushWriteRequest(wr, pushBlock)
|
|
wr.Timeseries = timeseries[n:]
|
|
pushWriteRequest(wr, pushBlock)
|
|
wr.Timeseries = timeseries
|
|
}
|
|
|
|
var (
|
|
blockSizeBytes = metrics.NewHistogram(`vmagent_remotewrite_block_size_bytes`)
|
|
blockSizeRows = metrics.NewHistogram(`vmagent_remotewrite_block_size_rows`)
|
|
)
|
|
|
|
var writeRequestBufPool bytesutil.ByteBufferPool
|
|
var snappyBufPool bytesutil.ByteBufferPool
|