VictoriaMetrics/lib/storage/table.go

548 lines
14 KiB
Go
Raw Normal View History

2019-05-22 23:16:55 +02:00
package storage
import (
"fmt"
"path/filepath"
"strings"
2019-05-22 23:16:55 +02:00
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
2019-05-22 23:16:55 +02:00
)
// table represents a single table with time series data.
type table struct {
path string
smallPartitionsPath string
bigPartitionsPath string
s *Storage
2019-05-22 23:16:55 +02:00
ptws []*partitionWrapper
ptwsLock sync.Mutex
stop chan struct{}
retentionWatcherWG sync.WaitGroup
finalDedupWatcherWG sync.WaitGroup
2019-05-22 23:16:55 +02:00
}
// partitionWrapper provides refcounting mechanism for the partition.
type partitionWrapper struct {
// Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
2019-05-22 23:16:55 +02:00
refCount uint64
// The partition must be dropped if mustDrop > 0
mustDrop uint64
2019-10-17 19:04:26 +02:00
pt *partition
2019-05-22 23:16:55 +02:00
}
// incRef atomically increments the reference counter of ptw.
//
// Every incRef call must be paired with a decRef call.
func (ptw *partitionWrapper) incRef() {
atomic.AddUint64(&ptw.refCount, 1)
}
// decRef atomically decrements the reference counter of ptw.
//
// When the counter reaches zero, the wrapped partition is closed.
// It is additionally dropped if scheduleToDrop has been called on ptw.
func (ptw *partitionWrapper) decRef() {
	// Adding ^uint64(0) is the atomic way of subtracting 1 from an unsigned counter.
	remaining := atomic.AddUint64(&ptw.refCount, ^uint64(0))
	if int64(remaining) < 0 {
		logger.Panicf("BUG: pts.refCount must be positive; got %d", int64(remaining))
	}
	if remaining > 0 {
		// There are other references to the partition.
		return
	}

	// The last reference is gone. Close the partition.
	pt := ptw.pt
	pt.MustClose()

	if atomic.LoadUint64(&ptw.mustDrop) != 0 {
		// The partition has been scheduled to drop.
		pt.Drop()
	}
	ptw.pt = nil
}
// scheduleToDrop marks ptw, so decRef drops the underlying partition
// after closing it when the last reference is released.
func (ptw *partitionWrapper) scheduleToDrop() {
atomic.AddUint64(&ptw.mustDrop, 1)
}
// mustOpenTable opens a table on the given path.
2019-05-22 23:16:55 +02:00
//
// The table is created if it doesn't exist.
func mustOpenTable(path string, s *Storage) *table {
2019-05-22 23:16:55 +02:00
path = filepath.Clean(path)
// Create a directory for the table if it doesn't exist yet.
fs.MustMkdirIfNotExist(path)
2019-05-22 23:16:55 +02:00
// Create directories for small and big partitions if they don't exist yet.
smallPartitionsPath := filepath.Join(path, smallDirname)
fs.MustMkdirIfNotExist(smallPartitionsPath)
fs.MustRemoveTemporaryDirs(smallPartitionsPath)
smallSnapshotsPath := filepath.Join(smallPartitionsPath, snapshotsDirname)
fs.MustMkdirIfNotExist(smallSnapshotsPath)
fs.MustRemoveTemporaryDirs(smallSnapshotsPath)
bigPartitionsPath := filepath.Join(path, bigDirname)
fs.MustMkdirIfNotExist(bigPartitionsPath)
fs.MustRemoveTemporaryDirs(bigPartitionsPath)
bigSnapshotsPath := filepath.Join(bigPartitionsPath, snapshotsDirname)
fs.MustMkdirIfNotExist(bigSnapshotsPath)
fs.MustRemoveTemporaryDirs(bigSnapshotsPath)
2019-05-22 23:16:55 +02:00
// Open partitions.
pts := mustOpenPartitions(smallPartitionsPath, bigPartitionsPath, s)
2019-05-22 23:16:55 +02:00
tb := &table{
path: path,
smallPartitionsPath: smallPartitionsPath,
bigPartitionsPath: bigPartitionsPath,
s: s,
2019-05-22 23:16:55 +02:00
stop: make(chan struct{}),
}
for _, pt := range pts {
tb.addPartitionNolock(pt)
}
tb.startRetentionWatcher()
tb.startFinalDedupWatcher()
return tb
2019-05-22 23:16:55 +02:00
}
// CreateSnapshot creates tb snapshot and returns paths to small and big parts of it.
// If deadline is reached before snapshot is created error is returned.
// If any error occurs during snapshot created data is not removed.
func (tb *table) CreateSnapshot(snapshotName string, deadline uint64) (string, string, error) {
2019-05-22 23:16:55 +02:00
logger.Infof("creating table snapshot of %q...", tb.path)
startTime := time.Now()
ptws := tb.GetPartitions(nil)
defer tb.PutPartitions(ptws)
dstSmallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName)
fs.MustMkdirFailIfExist(dstSmallDir)
dstBigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName)
fs.MustMkdirFailIfExist(dstBigDir)
2019-05-22 23:16:55 +02:00
for _, ptw := range ptws {
if deadline > 0 && fasttime.UnixTimestamp() > deadline {
fs.MustRemoveAll(dstSmallDir)
fs.MustRemoveAll(dstBigDir)
return "", "", fmt.Errorf("cannot create snapshot for %q: timeout exceeded", tb.path)
}
smallPath := filepath.Join(dstSmallDir, ptw.pt.name)
bigPath := filepath.Join(dstBigDir, ptw.pt.name)
ptw.pt.MustCreateSnapshotAt(smallPath, bigPath)
2019-05-22 23:16:55 +02:00
}
fs.MustSyncPath(dstSmallDir)
fs.MustSyncPath(dstBigDir)
fs.MustSyncPath(filepath.Dir(dstSmallDir))
fs.MustSyncPath(filepath.Dir(dstBigDir))
2019-05-22 23:16:55 +02:00
logger.Infof("created table snapshot for %q at (%q, %q) in %.3f seconds", tb.path, dstSmallDir, dstBigDir, time.Since(startTime).Seconds())
2019-05-22 23:16:55 +02:00
return dstSmallDir, dstBigDir, nil
}
// MustDeleteSnapshot deletes snapshot with the given snapshotName.
func (tb *table) MustDeleteSnapshot(snapshotName string) {
smallDir := filepath.Join(tb.path, smallDirname, snapshotsDirname, snapshotName)
fs.MustRemoveDirAtomic(smallDir)
bigDir := filepath.Join(tb.path, bigDirname, snapshotsDirname, snapshotName)
fs.MustRemoveDirAtomic(bigDir)
2019-05-22 23:16:55 +02:00
}
// addPartitionNolock registers pt in tb.
//
// The created wrapper starts with refCount=1, which is the reference held by the table itself.
// The function doesn't take tb.ptwsLock, so the caller is responsible for synchronization.
func (tb *table) addPartitionNolock(pt *partition) {
	tb.ptws = append(tb.ptws, &partitionWrapper{
		refCount: 1,
		pt:       pt,
	})
}
// MustClose closes the table.
// It is expected that all the pending searches on the table are finished before calling MustClose.
2019-05-22 23:16:55 +02:00
func (tb *table) MustClose() {
close(tb.stop)
tb.retentionWatcherWG.Wait()
tb.finalDedupWatcherWG.Wait()
2019-05-22 23:16:55 +02:00
tb.ptwsLock.Lock()
ptws := tb.ptws
tb.ptws = nil
tb.ptwsLock.Unlock()
for _, ptw := range ptws {
if n := atomic.LoadUint64(&ptw.refCount); n != 1 {
logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n)
}
ptw.decRef()
2019-05-22 23:16:55 +02:00
}
}
// flushPendingRows flushes all the pending raw rows, so they become visible to search.
2019-05-22 23:16:55 +02:00
//
// This function is for debug purposes only.
func (tb *table) flushPendingRows() {
2019-05-22 23:16:55 +02:00
ptws := tb.GetPartitions(nil)
defer tb.PutPartitions(ptws)
var rows []rawRow
2019-05-22 23:16:55 +02:00
for _, ptw := range ptws {
rows = ptw.pt.flushPendingRows(rows[:0], true)
2019-05-22 23:16:55 +02:00
}
}
lib/{mergeset,storage}: make background merge more responsive and scalable - Maintain a separate worker pool for each part type (in-memory, file, big and small). Previously a shared pool was used for merging all the part types. A single merge worker could merge parts with mixed types at once. For example, it could simultaneously merge an in-memory part plus a big file part. Such a merge could take hours for a big file part. For the duration of this merge the in-memory part was pinned in memory and couldn't be persisted to disk within the configured -inmemoryDataFlushInterval . Another common issue, which could happen when parts with mixed types are merged, is uncontrolled growth of in-memory parts or small parts when all the merge workers were busy with merging big files. Such growth could lead to significant performance degradation for queries, since every query needs to check an ever-growing list of parts. This could also slow down the registration of new time series, since VictoriaMetrics searches for the internal series_id in the indexdb for every new time series. The third issue is graceful shutdown duration, which could be very long when a background merge is running on in-memory parts plus big file parts. This merge couldn't be interrupted, since it merges in-memory parts. A separate pool of merge workers for every part type elegantly resolves these issues: - In-memory parts are merged to file-based parts in a timely manner, since the maximum size of in-memory parts is limited. - Long-running merges for big parts do not block merges for in-memory parts and small parts. - Graceful shutdown duration is now limited by the time needed for flushing in-memory parts to files. Merging for file parts is instantly canceled on graceful shutdown now. - Deprecate -smallMergeConcurrency command-line flag, since the new background merge algorithm should automatically self-tune according to the number of available CPU cores. 
- Deprecate -finalMergeDelay command-line flag, since it wasn't working correctly. It is better to run forced merge when needed - https://docs.victoriametrics.com/#forced-merge - Tune the number of shards for pending rows and items before the data goes to in-memory parts and becomes visible for search. This improves the maximum data ingestion rate and the maximum rate for registration of new time series. This should reduce the duration of data ingestion slowdown in VictoriaMetrics cluster on e.g. re-routing events, when some of vmstorage nodes become temporarily unavailable. - Prevent a possible "sync: WaitGroup misuse" panic on graceful shutdown. This is a follow-up for fa566c68a6ccf7385a05f649aee7e5f5a38afb15 . Thanks to @misutoth for the inspiration at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5190 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3790 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3425 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
2024-01-26 21:39:49 +01:00
// NotifyReadWriteMode calls NotifyReadWriteMode on every partition registered in tb.
//
// The partitions are notified while tb.ptwsLock is held.
func (tb *table) NotifyReadWriteMode() {
	tb.ptwsLock.Lock()
	for i := range tb.ptws {
		tb.ptws[i].pt.NotifyReadWriteMode()
	}
	tb.ptwsLock.Unlock()
}
2019-05-22 23:16:55 +02:00
// TableMetrics contains essential metrics for the table.
type TableMetrics struct {
partitionMetrics
// LastPartition contains metrics for the last partition.
// These metrics are important, since the majority of data ingestion
// and querying goes to the last partition.
LastPartition partitionMetrics
2019-05-22 23:16:55 +02:00
PartitionsRefCount uint64
}
// UpdateMetrics updates m with metrics from tb.
func (tb *table) UpdateMetrics(m *TableMetrics) {
ptws := tb.GetPartitions(nil)
defer tb.PutPartitions(ptws)
for _, ptw := range ptws {
2019-05-22 23:16:55 +02:00
ptw.pt.UpdateMetrics(&m.partitionMetrics)
m.PartitionsRefCount += atomic.LoadUint64(&ptw.refCount)
}
// Collect separate metrics for the last partition.
if len(ptws) > 0 {
ptwLast := ptws[0]
for _, ptw := range ptws[1:] {
if ptw.pt.tr.MinTimestamp > ptwLast.pt.tr.MinTimestamp {
ptwLast = ptw
}
}
ptwLast.pt.UpdateMetrics(&m.LastPartition)
}
2019-05-22 23:16:55 +02:00
}
// ForceMergePartitions force-merges partitions in tb with names starting from the given partitionNamePrefix.
//
// Partitions are merged sequentially in order to reduce load on the system.
// The first failed merge stops the process and its error is returned.
func (tb *table) ForceMergePartitions(partitionNamePrefix string) error {
	ptws := tb.GetPartitions(nil)
	defer tb.PutPartitions(ptws)

	for i := range ptws {
		pt := ptws[i].pt
		if !strings.HasPrefix(pt.name, partitionNamePrefix) {
			// The partition doesn't match the requested prefix.
			continue
		}

		logger.Infof("starting forced merge for partition %q", pt.name)
		startTime := time.Now()
		err := pt.ForceMergeAllParts()
		if err != nil {
			return fmt.Errorf("cannot complete forced merge for partition %q: %w", pt.name, err)
		}
		logger.Infof("forced merge for partition %q has been finished in %.3f seconds", pt.name, time.Since(startTime).Seconds())
	}
	return nil
}
// MustAddRows adds the given rows to the table tb.
func (tb *table) MustAddRows(rows []rawRow) {
2019-05-22 23:16:55 +02:00
if len(rows) == 0 {
return
2019-05-22 23:16:55 +02:00
}
// Verify whether all the rows may be added to a single partition.
ptwsX := getPartitionWrappers()
defer putPartitionWrappers(ptwsX)
ptwsX.a = tb.GetPartitions(ptwsX.a[:0])
ptws := ptwsX.a
for i, ptw := range ptws {
2019-05-22 23:16:55 +02:00
singlePt := true
for j := range rows {
if !ptw.pt.HasTimestamp(rows[j].Timestamp) {
2019-05-22 23:16:55 +02:00
singlePt = false
break
}
}
if !singlePt {
continue
}
if i != 0 {
// Move the partition with the matching rows to the front of tb.ptws,
// so it will be detected faster next time.
tb.ptwsLock.Lock()
for j := range tb.ptws {
if ptw == tb.ptws[j] {
tb.ptws[0], tb.ptws[j] = tb.ptws[j], tb.ptws[0]
break
}
2019-05-22 23:16:55 +02:00
}
tb.ptwsLock.Unlock()
2019-05-22 23:16:55 +02:00
}
// Fast path - add all the rows into the ptw.
ptw.pt.AddRows(rows)
tb.PutPartitions(ptws)
return
2019-05-22 23:16:55 +02:00
}
// Slower path - split rows into per-partition buckets.
ptBuckets := make(map[*partitionWrapper][]rawRow)
var missingRows []rawRow
for i := range rows {
r := &rows[i]
ptFound := false
for _, ptw := range ptws {
if ptw.pt.HasTimestamp(r.Timestamp) {
ptBuckets[ptw] = append(ptBuckets[ptw], *r)
ptFound = true
break
}
}
if !ptFound {
missingRows = append(missingRows, *r)
}
}
for ptw, ptRows := range ptBuckets {
ptw.pt.AddRows(ptRows)
}
tb.PutPartitions(ptws)
if len(missingRows) == 0 {
return
2019-05-22 23:16:55 +02:00
}
// The slowest path - there are rows that don't fit any existing partition.
// Create new partitions for these rows.
// Do this under tb.ptwsLock.
minTimestamp, maxTimestamp := tb.getMinMaxTimestamps()
2019-05-22 23:16:55 +02:00
tb.ptwsLock.Lock()
for i := range missingRows {
r := &missingRows[i]
if r.Timestamp < minTimestamp || r.Timestamp > maxTimestamp {
// Silently skip row outside retention, since it should be deleted anyway.
2019-05-22 23:16:55 +02:00
continue
}
// Make sure the partition for the r hasn't been added by another goroutines.
ptFound := false
for _, ptw := range tb.ptws {
if ptw.pt.HasTimestamp(r.Timestamp) {
ptFound = true
ptw.pt.AddRows(missingRows[i : i+1])
break
}
}
if ptFound {
continue
}
pt := mustCreatePartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.s)
2019-05-22 23:16:55 +02:00
pt.AddRows(missingRows[i : i+1])
tb.addPartitionNolock(pt)
}
tb.ptwsLock.Unlock()
}
// getMinMaxTimestamps returns the minimum and the maximum timestamps
// for rows that may be added to tb at the moment.
//
// The minimum is the current time minus the storage retention, clamped to zero.
// The maximum is the current time plus two days.
func (tb *table) getMinMaxTimestamps() (int64, int64) {
	// Allow max +2 days from now due to timezone differences.
	const maxFutureOffsetMsecs = 2 * 24 * 3600 * 1000

	now := int64(fasttime.UnixTimestamp() * 1000)

	minTimestamp := now - tb.s.retentionMsecs
	if minTimestamp < 0 {
		// Negative timestamps aren't supported by the storage.
		minTimestamp = 0
	}

	maxTimestamp := now + maxFutureOffsetMsecs
	if maxTimestamp < 0 {
		// The addition wrapped around; fall back to the maximum int64 value.
		maxTimestamp = (1 << 63) - 1
	}

	return minTimestamp, maxTimestamp
}
2019-05-22 23:16:55 +02:00
// startRetentionWatcher runs tb.retentionWatcher in a background goroutine.
//
// The goroutine is registered in tb.retentionWatcherWG, so MustClose can wait for its exit.
func (tb *table) startRetentionWatcher() {
	wg := &tb.retentionWatcherWG
	wg.Add(1)
	go func() {
		defer wg.Done()
		tb.retentionWatcher()
	}()
}
func (tb *table) retentionWatcher() {
d := timeutil.AddJitterToDuration(time.Minute)
ticker := time.NewTicker(d)
defer ticker.Stop()
2019-05-22 23:16:55 +02:00
for {
select {
case <-tb.stop:
return
case <-ticker.C:
2019-05-22 23:16:55 +02:00
}
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.s.retentionMsecs
2019-05-22 23:16:55 +02:00
var ptwsDrop []*partitionWrapper
tb.ptwsLock.Lock()
dst := tb.ptws[:0]
for _, ptw := range tb.ptws {
if ptw.pt.tr.MaxTimestamp < minTimestamp {
ptwsDrop = append(ptwsDrop, ptw)
} else {
dst = append(dst, ptw)
}
}
tb.ptws = dst
tb.ptwsLock.Unlock()
if len(ptwsDrop) == 0 {
continue
}
2023-02-13 13:27:13 +01:00
// There are partitions to drop. Drop them.
2019-05-22 23:16:55 +02:00
// Remove table references from partitions, so they will be eventually
// closed and dropped after all the pending searches are done.
for _, ptw := range ptwsDrop {
ptw.scheduleToDrop()
ptw.decRef()
}
}
}
// startFinalDedupWatcher runs tb.finalDedupWatcher in a background goroutine.
//
// The goroutine is registered in tb.finalDedupWatcherWG, so MustClose can wait for its exit.
func (tb *table) startFinalDedupWatcher() {
	wg := &tb.finalDedupWatcherWG
	wg.Add(1)
	go func() {
		defer wg.Done()
		tb.finalDedupWatcher()
	}()
}
// finalDedupWatcher periodically runs final deduplication on partitions that need it.
//
// It returns immediately if deduplication is disabled. Otherwise it works
// on a jittered one-hour ticker until tb.stop is closed.
func (tb *table) finalDedupWatcher() {
	if !isDedupEnabled() {
		// Deduplication is disabled.
		return
	}

	// dedupPartitions performs a single pass over all the partitions.
	dedupPartitions := func() {
		ptws := tb.GetPartitions(nil)
		defer tb.PutPartitions(ptws)

		currentPartitionName := timestampToPartitionName(timestampFromTime(time.Now()))
		for i := range ptws {
			pt := ptws[i].pt
			if pt.name == currentPartitionName {
				// Do not run final dedup for the current partition.
				continue
			}
			if !pt.isFinalDedupNeeded() {
				continue
			}
			if err := pt.runFinalDedup(); err != nil {
				// Log the error and proceed to the next partition.
				logger.Errorf("cannot run final dedup for partition %s: %s", pt.name, err)
			}
		}
	}

	ticker := time.NewTicker(timeutil.AddJitterToDuration(time.Hour))
	defer ticker.Stop()
	for {
		select {
		case <-tb.stop:
			return
		case <-ticker.C:
			dedupPartitions()
		}
	}
}
2019-05-22 23:16:55 +02:00
// GetPartitions appends tb's partitions snapshot to dst and returns the result.
//
// The reference counter is incremented for every returned partition,
// so the returned partitions must be passed to PutPartitions
// when they are no longer needed.
func (tb *table) GetPartitions(dst []*partitionWrapper) []*partitionWrapper {
	tb.ptwsLock.Lock()
	for i := range tb.ptws {
		ptw := tb.ptws[i]
		ptw.incRef()
		dst = append(dst, ptw)
	}
	tb.ptwsLock.Unlock()

	return dst
}
// PutPartitions deregisters ptws obtained via GetPartitions.
//
// It releases a single reference for every partition in ptws.
func (tb *table) PutPartitions(ptws []*partitionWrapper) {
	for i := range ptws {
		ptws[i].decRef()
	}
}
// mustOpenPartitions opens all the partitions found under smallPartitionsPath
// and bigPartitionsPath and returns them.
//
// The order of the returned partitions isn't defined, since partition names
// are collected into a map.
func mustOpenPartitions(smallPartitionsPath, bigPartitionsPath string, s *Storage) []*partition {
	// Certain partition directories in either `big` or `small` dir may be missing
	// after restoring from backup. So populate partition names from both dirs.
	ptNames := make(map[string]bool)
	for _, partitionsPath := range []string{smallPartitionsPath, bigPartitionsPath} {
		mustPopulatePartitionNames(partitionsPath, ptNames)
	}

	var pts []*partition
	for ptName := range ptNames {
		pts = append(pts, mustOpenPartition(
			filepath.Join(smallPartitionsPath, ptName),
			filepath.Join(bigPartitionsPath, ptName),
			s,
		))
	}
	return pts
}
func mustPopulatePartitionNames(partitionsPath string, ptNames map[string]bool) {
des := fs.MustReadDir(partitionsPath)
for _, de := range des {
if !fs.IsDirOrSymlink(de) {
2019-05-22 23:16:55 +02:00
// Skip non-directories
continue
}
ptName := de.Name()
if ptName == snapshotsDirname {
// Skip directory with snapshots
2019-05-22 23:16:55 +02:00
continue
}
ptNames[ptName] = true
2019-05-22 23:16:55 +02:00
}
}
// partitionWrappers is a reusable holder for a slice of partition wrappers.
//
// Instances are obtained via getPartitionWrappers and returned via putPartitionWrappers.
type partitionWrappers struct {
// a is the wrapped slice; putPartitionWrappers truncates it to zero length.
a []*partitionWrapper
}
// getPartitionWrappers returns partitionWrappers from ptwsPool.
//
// A new instance is allocated if the pool is empty.
// Return the result to the pool via putPartitionWrappers when it is no longer needed.
func getPartitionWrappers() *partitionWrappers {
	if v := ptwsPool.Get(); v != nil {
		return v.(*partitionWrappers)
	}
	return &partitionWrappers{}
}
// putPartitionWrappers returns ptwsX to ptwsPool.
//
// ptwsX mustn't be used after the call.
func putPartitionWrappers(ptwsX *partitionWrappers) {
	// Clear the pointers before truncating the slice, so the pooled object
	// doesn't keep stale references to partition wrappers.
	a := ptwsX.a
	for i := range a {
		a[i] = nil
	}
	ptwsX.a = a[:0]
	ptwsPool.Put(ptwsX)
}
// ptwsPool is the pool of *partitionWrappers used by getPartitionWrappers and putPartitionWrappers.
var ptwsPool sync.Pool