mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-23 12:31:07 +01:00
lib/persistentqueue: add vm_persistentqueue_read_duration_seconds_total and vm_persistentqueue_write_duration_seconds_total metrics for determining disk usage saturation at vmagent
This commit is contained in:
parent
b900560b83
commit
b688960db0
@ -8,6 +8,7 @@ sort: 15
|
|||||||
|
|
||||||
* FEATURE: add `now()` function to MetricsQL. This function returns the current timestamp in seconds. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#now).
|
* FEATURE: add `now()` function to MetricsQL. This function returns the current timestamp in seconds. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#now).
|
||||||
* FEATURE: vmauth: allow using optional `name` field in configs. This field is then used as `username` label value for `vmauth_user_requests_total` metric. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1805).
|
* FEATURE: vmauth: allow using optional `name` field in configs. This field is then used as `username` label value for `vmauth_user_requests_total` metric. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1805).
|
||||||
|
* FEATURE: vmagent: export `vm_persistentqueue_read_duration_seconds_total` and `vm_persistentqueue_write_duration_seconds_total` metrics, which can be used for detecting persistent queue saturation with `rate(vm_persistentqueue_write_duration_seconds_total) > 0.9s` alerting rule.
|
||||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html), [vmrestore](https://docs.victoriametrics.com/vmrestore.html): add `-s3ForcePathStyle` command-line flag, which can be used for making backups to [Aliyun OSS](https://www.aliyun.com/product/oss). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1802).
|
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html), [vmrestore](https://docs.victoriametrics.com/vmrestore.html): add `-s3ForcePathStyle` command-line flag, which can be used for making backups to [Aliyun OSS](https://www.aliyun.com/product/oss). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1802).
|
||||||
|
|
||||||
* BUGFIX: vmstorage [enterprise](https://victoriametrics.com/enterprise.html): added missing `vm_tenant_used_tenant_bytes` metric, which shows the approximate per-tenant disk usage. See [these docs](https://docs.victoriametrics.com/PerTenantStatistic.html) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1605).
|
* BUGFIX: vmstorage [enterprise](https://victoriametrics.com/enterprise.html): added missing `vm_tenant_used_tenant_bytes` metric, which shows the approximate per-tenant disk usage. See [these docs](https://docs.victoriametrics.com/PerTenantStatistic.html) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1605).
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
@ -363,8 +364,6 @@ func (q *queue) metainfoPath() string {
|
|||||||
// MustWriteBlock writes block to q.
|
// MustWriteBlock writes block to q.
|
||||||
//
|
//
|
||||||
// The block size cannot exceed MaxBlockSize.
|
// The block size cannot exceed MaxBlockSize.
|
||||||
//
|
|
||||||
// It is safe calling this function from concurrent goroutines.
|
|
||||||
func (q *queue) MustWriteBlock(block []byte) {
|
func (q *queue) MustWriteBlock(block []byte) {
|
||||||
if uint64(len(block)) > q.maxBlockSize {
|
if uint64(len(block)) > q.maxBlockSize {
|
||||||
logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize)
|
logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize)
|
||||||
@ -408,6 +407,10 @@ func (q *queue) MustWriteBlock(block []byte) {
|
|||||||
var blockBufPool bytesutil.ByteBufferPool
|
var blockBufPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
func (q *queue) writeBlock(block []byte) error {
|
func (q *queue) writeBlock(block []byte) error {
|
||||||
|
startTime := time.Now()
|
||||||
|
defer func() {
|
||||||
|
writeDurationSeconds.Add(time.Since(startTime).Seconds())
|
||||||
|
}()
|
||||||
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
if q.writerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
||||||
if err := q.nextChunkFileForWrite(); err != nil {
|
if err := q.nextChunkFileForWrite(); err != nil {
|
||||||
return fmt.Errorf("cannot create next chunk file: %w", err)
|
return fmt.Errorf("cannot create next chunk file: %w", err)
|
||||||
@ -433,6 +436,8 @@ func (q *queue) writeBlock(block []byte) error {
|
|||||||
return q.flushWriterMetainfoIfNeeded()
|
return q.flushWriterMetainfoIfNeeded()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var writeDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_write_duration_seconds_total`)
|
||||||
|
|
||||||
func (q *queue) nextChunkFileForWrite() error {
|
func (q *queue) nextChunkFileForWrite() error {
|
||||||
// Finalize the current chunk and start new one.
|
// Finalize the current chunk and start new one.
|
||||||
q.writer.MustClose()
|
q.writer.MustClose()
|
||||||
@ -478,6 +483,10 @@ func (q *queue) MustReadBlockNonblocking(dst []byte) ([]byte, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q *queue) readBlock(dst []byte) ([]byte, error) {
|
func (q *queue) readBlock(dst []byte) ([]byte, error) {
|
||||||
|
startTime := time.Now()
|
||||||
|
defer func() {
|
||||||
|
readDurationSeconds.Add(time.Since(startTime).Seconds())
|
||||||
|
}()
|
||||||
if q.readerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
if q.readerLocalOffset+q.maxBlockSize+8 > q.chunkFileSize {
|
||||||
if err := q.nextChunkFileForRead(); err != nil {
|
if err := q.nextChunkFileForRead(); err != nil {
|
||||||
return dst, fmt.Errorf("cannot open next chunk file: %w", err)
|
return dst, fmt.Errorf("cannot open next chunk file: %w", err)
|
||||||
@ -524,6 +533,8 @@ again:
|
|||||||
return dst, nil
|
return dst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var readDurationSeconds = metrics.NewFloatCounter(`vm_persistentqueue_read_duration_seconds_total`)
|
||||||
|
|
||||||
func (q *queue) skipBrokenChunkFile() error {
|
func (q *queue) skipBrokenChunkFile() error {
|
||||||
// Try to recover from broken chunk file by skipping it.
|
// Try to recover from broken chunk file by skipping it.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1030
|
||||||
|
Loading…
Reference in New Issue
Block a user