mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-15 00:13:30 +01:00
all: properly split vm_deduplicated_samples_total
among cluster components
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/345
This commit is contained in:
parent
5e7b4795bd
commit
cf9aee4ec3
@ -286,9 +286,15 @@ func mergeSortBlocks(dst *Result, sbh sortBlocksHeap) {
|
||||
}
|
||||
}
|
||||
|
||||
dst.Timestamps, dst.Values = storage.DeduplicateSamples(dst.Timestamps, dst.Values)
|
||||
timestamps, values := storage.DeduplicateSamples(dst.Timestamps, dst.Values)
|
||||
dedups := len(dst.Timestamps) - len(timestamps)
|
||||
dedupsDuringSelect.Add(dedups)
|
||||
dst.Timestamps = timestamps
|
||||
dst.Values = values
|
||||
}
|
||||
|
||||
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
|
||||
|
||||
type sortBlock struct {
|
||||
// b is used as a temporary storage for unpacked rows before they
|
||||
// go to Timestamps and Values.
|
||||
|
@ -324,6 +324,10 @@ func registerStorageMetrics(strg *storage.Storage) {
|
||||
return float64(idbm().SizeBytes)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_deduplicated_samples_total{type="merge"}`, func() float64 {
|
||||
return float64(m().DedupsDuringMerge)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_rows_ignored_total{reason="big_timestamp"}`, func() float64 {
|
||||
return float64(m().TooBigTimestampRows)
|
||||
})
|
||||
|
@ -2,6 +2,7 @@ package storage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@ -166,10 +167,14 @@ func (b *Block) deduplicateSamplesDuringMerge() {
|
||||
srcTimestamps := b.timestamps[b.nextIdx:]
|
||||
srcValues := b.values[b.nextIdx:]
|
||||
timestamps, values := deduplicateSamplesDuringMerge(srcTimestamps, srcValues)
|
||||
dedups := len(srcTimestamps) - len(timestamps)
|
||||
atomic.AddUint64(&dedupsDuringMerge, uint64(dedups))
|
||||
b.timestamps = b.timestamps[:b.nextIdx+len(timestamps)]
|
||||
b.values = b.values[:b.nextIdx+len(values)]
|
||||
}
|
||||
|
||||
var dedupsDuringMerge uint64
|
||||
|
||||
func (b *Block) rowsCount() int {
|
||||
if len(b.values) == 0 {
|
||||
return int(b.bh.RowsCount)
|
||||
|
@ -2,8 +2,6 @@ package storage
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// SetMinScrapeIntervalForDeduplication sets the minimum interval for data points during de-duplication.
|
||||
@ -43,23 +41,18 @@ func DeduplicateSamples(srcTimestamps []int64, srcValues []float64) ([]int64, []
|
||||
prevTimestamp := srcTimestamps[0]
|
||||
dstTimestamps := srcTimestamps[:1]
|
||||
dstValues := srcValues[:1]
|
||||
dedups := 0
|
||||
for i := 1; i < len(srcTimestamps); i++ {
|
||||
ts := srcTimestamps[i]
|
||||
if ts-prevTimestamp < minDelta {
|
||||
dedups++
|
||||
continue
|
||||
}
|
||||
dstTimestamps = append(dstTimestamps, ts)
|
||||
dstValues = append(dstValues, srcValues[i])
|
||||
prevTimestamp = ts
|
||||
}
|
||||
dedupsDuringSelect.Add(dedups)
|
||||
return dstTimestamps, dstValues
|
||||
}
|
||||
|
||||
var dedupsDuringSelect = metrics.NewCounter(`vm_deduplicated_samples_total{type="select"}`)
|
||||
|
||||
func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]int64, []int64) {
|
||||
if minScrapeInterval <= 0 {
|
||||
return srcTimestamps, srcValues
|
||||
@ -79,23 +72,18 @@ func deduplicateSamplesDuringMerge(srcTimestamps []int64, srcValues []int64) ([]
|
||||
prevTimestamp := srcTimestamps[0]
|
||||
dstTimestamps := srcTimestamps[:1]
|
||||
dstValues := srcValues[:1]
|
||||
dedups := 0
|
||||
for i := 1; i < len(srcTimestamps); i++ {
|
||||
ts := srcTimestamps[i]
|
||||
if ts-prevTimestamp < minDelta {
|
||||
dedups++
|
||||
continue
|
||||
}
|
||||
dstTimestamps = append(dstTimestamps, ts)
|
||||
dstValues = append(dstValues, srcValues[i])
|
||||
prevTimestamp = ts
|
||||
}
|
||||
dedupsDuringMerge.Add(dedups)
|
||||
return dstTimestamps, dstValues
|
||||
}
|
||||
|
||||
var dedupsDuringMerge = metrics.NewCounter(`vm_deduplicated_samples_total{type="merge"}`)
|
||||
|
||||
func needsDedup(timestamps []int64, minDelta int64) bool {
|
||||
if len(timestamps) == 0 {
|
||||
return false
|
||||
|
@ -296,6 +296,8 @@ func (s *Storage) idb() *indexDB {
|
||||
|
||||
// Metrics contains essential metrics for the Storage.
|
||||
type Metrics struct {
|
||||
DedupsDuringMerge uint64
|
||||
|
||||
TooSmallTimestampRows uint64
|
||||
TooBigTimestampRows uint64
|
||||
|
||||
@ -345,6 +347,8 @@ func (m *Metrics) Reset() {
|
||||
|
||||
// UpdateMetrics updates m with metrics from s.
|
||||
func (s *Storage) UpdateMetrics(m *Metrics) {
|
||||
m.DedupsDuringMerge = atomic.LoadUint64(&dedupsDuringMerge)
|
||||
|
||||
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
|
||||
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user