2019-05-22 23:16:55 +02:00
package storage
import (
"fmt"
2020-01-30 23:54:28 +01:00
"io"
2019-06-14 06:52:32 +02:00
"io/ioutil"
2019-05-22 23:16:55 +02:00
"math"
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
2020-05-14 21:01:51 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2020-07-22 23:58:48 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
2019-05-28 16:17:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2019-09-24 20:10:22 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
2019-08-13 20:35:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/fastcache"
)
// maxRetentionMonths is the biggest retentionMonths value accepted by OpenStorage (100 years).
// OpenStorage also falls back to it when a non-positive retention is requested.
const maxRetentionMonths = 12 * 100
// Storage represents TSDB storage.
type Storage struct {
2019-10-17 17:22:56 +02:00
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
addRowsConcurrencyLimitReached uint64
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
2020-08-05 17:24:51 +02:00
searchTSIDsConcurrencyLimitReached uint64
searchTSIDsConcurrencyLimitTimeout uint64
2020-05-15 12:44:23 +02:00
slowRowInserts uint64
slowPerDayIndexInserts uint64
2020-05-15 13:11:39 +02:00
slowMetricNameLoads uint64
2020-05-15 12:44:23 +02:00
2019-05-22 23:16:55 +02:00
path string
cachePath string
retentionMonths int
2019-05-25 20:51:11 +02:00
// lock file for exclusive access to the storage on the given path.
2019-05-22 23:16:55 +02:00
flockF * os . File
idbCurr atomic . Value
tb * table
// tsidCache is MetricName -> TSID cache.
2019-08-13 20:35:19 +02:00
tsidCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// metricIDCache is MetricID -> TSID cache.
2019-08-13 20:35:19 +02:00
metricIDCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// metricNameCache is MetricID -> MetricName cache.
2019-08-13 20:35:19 +02:00
metricNameCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// dateMetricIDCache is (Date, MetricID) cache.
2019-11-09 22:05:14 +01:00
dateMetricIDCache * dateMetricIDCache
2019-05-22 23:16:55 +02:00
2019-11-11 23:16:42 +01:00
// Fast cache for MetricID values occurred during the current hour.
2019-06-09 18:06:53 +02:00
currHourMetricIDs atomic . Value
2019-11-11 23:16:42 +01:00
// Fast cache for MetricID values occurred during the previous hour.
2019-06-09 18:06:53 +02:00
prevHourMetricIDs atomic . Value
2020-05-12 00:06:17 +02:00
// Fast cache for pre-populating per-day inverted index for the next day.
// This is needed in order to remove CPU usage spikes at 00:00 UTC
// due to creation of per-day inverted index for active time series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
nextDayMetricIDs atomic . Value
2019-06-09 18:06:53 +02:00
// Pending MetricID values to be added to currHourMetricIDs.
2019-11-08 18:37:16 +01:00
pendingHourEntriesLock sync . Mutex
pendingHourEntries * uint64set . Set
2019-06-02 17:34:08 +02:00
2020-05-12 00:06:17 +02:00
// Pending MetricIDs to be added to nextDayMetricIDs.
pendingNextDayMetricIDsLock sync . Mutex
pendingNextDayMetricIDs * uint64set . Set
2020-01-30 00:59:43 +01:00
// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
prefetchedMetricIDs atomic . Value
2019-06-02 17:34:08 +02:00
stop chan struct { }
2020-08-06 15:48:21 +02:00
currHourMetricIDsUpdaterWG sync . WaitGroup
nextDayMetricIDsUpdaterWG sync . WaitGroup
retentionWatcherWG sync . WaitGroup
2020-03-24 21:24:54 +01:00
// The snapshotLock prevents from concurrent creation of snapshots,
// since this may result in snapshots without recently added data,
// which may be in the process of flushing to disk by concurrently running
// snapshot process.
snapshotLock sync . Mutex
2019-05-22 23:16:55 +02:00
}
// OpenStorage opens storage on the given path with the given number of retention months.
func OpenStorage ( path string , retentionMonths int ) ( * Storage , error ) {
if retentionMonths > maxRetentionMonths {
return nil , fmt . Errorf ( "too big retentionMonths=%d; cannot exceed %d" , retentionMonths , maxRetentionMonths )
}
if retentionMonths <= 0 {
retentionMonths = maxRetentionMonths
}
path , err := filepath . Abs ( path )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot determine absolute path for %q: %w" , path , err )
2019-05-22 23:16:55 +02:00
}
s := & Storage {
path : path ,
cachePath : path + "/cache" ,
retentionMonths : retentionMonths ,
stop : make ( chan struct { } ) ,
}
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create a directory for the storage at %q: %w" , path , err )
2019-05-22 23:16:55 +02:00
}
snapshotsPath := path + "/snapshots"
if err := fs . MkdirAllIfNotExist ( snapshotsPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , snapshotsPath , err )
2019-05-22 23:16:55 +02:00
}
2019-08-13 00:45:22 +02:00
// Protect from concurrent opens.
flockF , err := fs . CreateFlockFile ( path )
2019-05-22 23:16:55 +02:00
if err != nil {
2019-08-13 00:45:22 +02:00
return nil , err
2019-05-22 23:16:55 +02:00
}
s . flockF = flockF
// Load caches.
mem := memory . Allowed ( )
s . tsidCache = s . mustLoadCache ( "MetricName->TSID" , "metricName_tsid" , mem / 3 )
s . metricIDCache = s . mustLoadCache ( "MetricID->TSID" , "metricID_tsid" , mem / 16 )
s . metricNameCache = s . mustLoadCache ( "MetricID->MetricName" , "metricID_metricName" , mem / 8 )
2019-11-09 22:05:14 +01:00
s . dateMetricIDCache = newDateMetricIDCache ( )
2019-05-22 23:16:55 +02:00
2020-05-14 21:01:51 +02:00
hour := fasttime . UnixHour ( )
2019-06-14 06:52:32 +02:00
hmCurr := s . mustLoadHourMetricIDs ( hour , "curr_hour_metric_ids" )
hmPrev := s . mustLoadHourMetricIDs ( hour - 1 , "prev_hour_metric_ids" )
s . currHourMetricIDs . Store ( hmCurr )
s . prevHourMetricIDs . Store ( hmPrev )
2019-11-08 18:37:16 +01:00
s . pendingHourEntries = & uint64set . Set { }
2019-06-14 06:52:32 +02:00
2020-05-14 21:01:51 +02:00
date := fasttime . UnixDate ( )
2020-05-12 00:06:17 +02:00
nextDayMetricIDs := s . mustLoadNextDayMetricIDs ( date )
s . nextDayMetricIDs . Store ( nextDayMetricIDs )
s . pendingNextDayMetricIDs = & uint64set . Set { }
2020-01-30 00:59:43 +01:00
s . prefetchedMetricIDs . Store ( & uint64set . Set { } )
2019-05-22 23:16:55 +02:00
// Load indexdb
idbPath := path + "/indexdb"
idbSnapshotsPath := idbPath + "/snapshots"
if err := fs . MkdirAllIfNotExist ( idbSnapshotsPath ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot create %q: %w" , idbSnapshotsPath , err )
2019-05-22 23:16:55 +02:00
}
2020-07-14 13:02:14 +02:00
idbCurr , idbPrev , err := openIndexDBTables ( idbPath , s . metricIDCache , s . metricNameCache , s . tsidCache , & s . currHourMetricIDs , & s . prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open indexdb tables at %q: %w" , idbPath , err )
2019-05-22 23:16:55 +02:00
}
idbCurr . SetExtDB ( idbPrev )
s . idbCurr . Store ( idbCurr )
// Load data
tablePath := path + "/data"
tb , err := openTable ( tablePath , retentionMonths , s . getDeletedMetricIDs )
if err != nil {
s . idb ( ) . MustClose ( )
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open table at %q: %w" , tablePath , err )
2019-05-22 23:16:55 +02:00
}
s . tb = tb
2019-06-09 18:06:53 +02:00
s . startCurrHourMetricIDsUpdater ( )
2020-05-12 00:06:17 +02:00
s . startNextDayMetricIDsUpdater ( )
2019-05-22 23:16:55 +02:00
s . startRetentionWatcher ( )
return s , nil
}
// debugFlush flushes recently added storage data, so it becomes visible to search.
//
// Raw rows of the data table are flushed first, then the table of the current indexDB.
func (s *Storage) debugFlush() {
	s.tb.flushRawRows()

	idb := s.idb()
	idb.tb.DebugFlush()
}
2019-09-24 20:10:22 +02:00
func ( s * Storage ) getDeletedMetricIDs ( ) * uint64set . Set {
2019-05-22 23:16:55 +02:00
return s . idb ( ) . getDeletedMetricIDs ( )
}
// CreateSnapshot creates snapshot for s and returns the snapshot name.
func ( s * Storage ) CreateSnapshot ( ) ( string , error ) {
logger . Infof ( "creating Storage snapshot for %q..." , s . path )
startTime := time . Now ( )
2020-03-24 21:24:54 +01:00
s . snapshotLock . Lock ( )
defer s . snapshotLock . Unlock ( )
2019-05-22 23:16:55 +02:00
snapshotName := fmt . Sprintf ( "%s-%08X" , time . Now ( ) . UTC ( ) . Format ( "20060102150405" ) , nextSnapshotIdx ( ) )
srcDir := s . path
dstDir := fmt . Sprintf ( "%s/snapshots/%s" , srcDir , snapshotName )
if err := fs . MkdirAllFailIfExist ( dstDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create dir %q: %w" , dstDir , err )
2019-05-22 23:16:55 +02:00
}
dstDataDir := dstDir + "/data"
if err := fs . MkdirAllFailIfExist ( dstDataDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create dir %q: %w" , dstDataDir , err )
2019-05-22 23:16:55 +02:00
}
smallDir , bigDir , err := s . tb . CreateSnapshot ( snapshotName )
if err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create table snapshot: %w" , err )
2019-05-22 23:16:55 +02:00
}
dstSmallDir := dstDataDir + "/small"
if err := fs . SymlinkRelative ( smallDir , dstSmallDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %w" , smallDir , dstSmallDir , err )
2019-05-22 23:16:55 +02:00
}
dstBigDir := dstDataDir + "/big"
if err := fs . SymlinkRelative ( bigDir , dstBigDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %w" , bigDir , dstBigDir , err )
2019-05-22 23:16:55 +02:00
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDataDir )
2019-05-22 23:16:55 +02:00
idbSnapshot := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , s . path , snapshotName )
idb := s . idb ( )
currSnapshot := idbSnapshot + "/" + idb . name
if err := idb . tb . CreateSnapshotAt ( currSnapshot ) ; err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create curr indexDB snapshot: %w" , err )
2019-05-22 23:16:55 +02:00
}
ok := idb . doExtDB ( func ( extDB * indexDB ) {
prevSnapshot := idbSnapshot + "/" + extDB . name
err = extDB . tb . CreateSnapshotAt ( prevSnapshot )
} )
if ok && err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create prev indexDB snapshot: %w" , err )
2019-05-22 23:16:55 +02:00
}
dstIdbDir := dstDir + "/indexdb"
if err := fs . SymlinkRelative ( idbSnapshot , dstIdbDir ) ; err != nil {
2020-06-30 21:58:18 +02:00
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %w" , idbSnapshot , dstIdbDir , err )
2019-05-22 23:16:55 +02:00
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDir )
fs . MustSyncPath ( srcDir + "/snapshots" )
2019-05-22 23:16:55 +02:00
2020-01-22 17:27:44 +01:00
logger . Infof ( "created Storage snapshot for %q at %q in %.3f seconds" , srcDir , dstDir , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:16:55 +02:00
return snapshotName , nil
}
// snapshotNameRegexp matches valid snapshot names: 14 decimal digits, a dash and one or more hex digits.
// This matches the names generated in CreateSnapshot.
var snapshotNameRegexp = regexp.MustCompile(`^[0-9]{14}-[0-9A-Fa-f]+$`)
// ListSnapshots returns sorted list of existing snapshots for s.
func ( s * Storage ) ListSnapshots ( ) ( [ ] string , error ) {
snapshotsPath := s . path + "/snapshots"
d , err := os . Open ( snapshotsPath )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot open %q: %w" , snapshotsPath , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
fnames , err := d . Readdirnames ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot read contents of %q: %w" , snapshotsPath , err )
2019-05-22 23:16:55 +02:00
}
snapshotNames := make ( [ ] string , 0 , len ( fnames ) )
for _ , fname := range fnames {
if ! snapshotNameRegexp . MatchString ( fname ) {
continue
}
snapshotNames = append ( snapshotNames , fname )
}
sort . Strings ( snapshotNames )
return snapshotNames , nil
}
// DeleteSnapshot deletes the given snapshot.
func ( s * Storage ) DeleteSnapshot ( snapshotName string ) error {
if ! snapshotNameRegexp . MatchString ( snapshotName ) {
return fmt . Errorf ( "invalid snapshotName %q" , snapshotName )
}
snapshotPath := s . path + "/snapshots/" + snapshotName
logger . Infof ( "deleting snapshot %q..." , snapshotPath )
startTime := time . Now ( )
s . tb . MustDeleteSnapshot ( snapshotName )
idbPath := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , s . path , snapshotName )
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( idbPath )
fs . MustRemoveAll ( snapshotPath )
2019-05-22 23:16:55 +02:00
2020-01-22 17:27:44 +01:00
logger . Infof ( "deleted snapshot %q in %.3f seconds" , snapshotPath , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:16:55 +02:00
return nil
}
// snapshotIdx is the counter used for generating snapshot name suffixes.
// It is seeded with the current time in nanoseconds at package initialization.
var snapshotIdx = uint64(time.Now().UnixNano())

// nextSnapshotIdx atomically increments snapshotIdx and returns the new value.
func nextSnapshotIdx() uint64 {
	idx := atomic.AddUint64(&snapshotIdx, 1)
	return idx
}
// idb returns the current indexDB stored in s.idbCurr.
func (s *Storage) idb() *indexDB {
	curr := s.idbCurr.Load()
	return curr.(*indexDB)
}
// Metrics contains essential metrics for the Storage.
type Metrics struct {
2020-02-27 22:47:05 +01:00
DedupsDuringMerge uint64
2019-07-26 13:10:25 +02:00
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
2019-08-06 13:09:17 +02:00
AddRowsConcurrencyLimitReached uint64
AddRowsConcurrencyLimitTimeout uint64
AddRowsConcurrencyDroppedRows uint64
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
2020-08-05 17:24:51 +02:00
SearchTSIDsConcurrencyLimitReached uint64
SearchTSIDsConcurrencyLimitTimeout uint64
SearchTSIDsConcurrencyCapacity uint64
SearchTSIDsConcurrencyCurrent uint64
2020-07-05 18:37:38 +02:00
SearchDelays uint64
2020-05-15 12:44:23 +02:00
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
2020-05-15 13:11:39 +02:00
SlowMetricNameLoads uint64
2020-05-15 12:44:23 +02:00
2019-05-22 23:16:55 +02:00
TSIDCacheSize uint64
2019-07-09 23:47:29 +02:00
TSIDCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
TSIDCacheRequests uint64
TSIDCacheMisses uint64
TSIDCacheCollisions uint64
MetricIDCacheSize uint64
2019-07-09 23:47:29 +02:00
MetricIDCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
MetricIDCacheRequests uint64
MetricIDCacheMisses uint64
MetricIDCacheCollisions uint64
MetricNameCacheSize uint64
2019-07-09 23:47:29 +02:00
MetricNameCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
MetricNameCacheRequests uint64
MetricNameCacheMisses uint64
MetricNameCacheCollisions uint64
2019-11-11 12:21:05 +01:00
DateMetricIDCacheSize uint64
2019-11-13 16:58:05 +01:00
DateMetricIDCacheSizeBytes uint64
2019-11-11 12:21:05 +01:00
DateMetricIDCacheSyncsCount uint64
DateMetricIDCacheResetsCount uint64
2019-05-22 23:16:55 +02:00
2019-11-13 18:00:02 +01:00
HourMetricIDCacheSize uint64
HourMetricIDCacheSizeBytes uint64
2019-06-19 17:36:47 +02:00
2020-05-12 00:06:17 +02:00
NextDayMetricIDCacheSize uint64
NextDayMetricIDCacheSizeBytes uint64
2020-01-30 00:59:43 +01:00
PrefetchedMetricIDsSize uint64
PrefetchedMetricIDsSizeBytes uint64
2019-05-22 23:16:55 +02:00
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
// Reset resets m.
//
// All the fields are set to their zero values.
func (m *Metrics) Reset() {
	var zero Metrics
	*m = zero
}
// UpdateMetrics updates m with metrics from s.
func ( s * Storage ) UpdateMetrics ( m * Metrics ) {
2020-02-27 22:47:05 +01:00
m . DedupsDuringMerge = atomic . LoadUint64 ( & dedupsDuringMerge )
2019-07-26 13:10:25 +02:00
m . TooSmallTimestampRows += atomic . LoadUint64 ( & s . tooSmallTimestampRows )
m . TooBigTimestampRows += atomic . LoadUint64 ( & s . tooBigTimestampRows )
2019-08-06 13:09:17 +02:00
m . AddRowsConcurrencyLimitReached += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitReached )
m . AddRowsConcurrencyLimitTimeout += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitTimeout )
m . AddRowsConcurrencyDroppedRows += atomic . LoadUint64 ( & s . addRowsConcurrencyDroppedRows )
m . AddRowsConcurrencyCapacity = uint64 ( cap ( addRowsConcurrencyCh ) )
m . AddRowsConcurrencyCurrent = uint64 ( len ( addRowsConcurrencyCh ) )
2020-08-05 17:24:51 +02:00
m . SearchTSIDsConcurrencyLimitReached += atomic . LoadUint64 ( & s . searchTSIDsConcurrencyLimitReached )
m . SearchTSIDsConcurrencyLimitTimeout += atomic . LoadUint64 ( & s . searchTSIDsConcurrencyLimitTimeout )
m . SearchTSIDsConcurrencyCapacity = uint64 ( cap ( searchTSIDsConcurrencyCh ) )
m . SearchTSIDsConcurrencyCurrent = uint64 ( len ( searchTSIDsConcurrencyCh ) )
2020-07-22 23:58:48 +02:00
m . SearchDelays = storagepacelimiter . Search . DelaysTotal ( )
2020-07-05 18:37:38 +02:00
2020-05-15 12:44:23 +02:00
m . SlowRowInserts += atomic . LoadUint64 ( & s . slowRowInserts )
m . SlowPerDayIndexInserts += atomic . LoadUint64 ( & s . slowPerDayIndexInserts )
2020-05-15 13:11:39 +02:00
m . SlowMetricNameLoads += atomic . LoadUint64 ( & s . slowMetricNameLoads )
2020-05-15 12:44:23 +02:00
2019-05-22 23:16:55 +02:00
var cs fastcache . Stats
s . tsidCache . UpdateStats ( & cs )
m . TSIDCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . TSIDCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . TSIDCacheRequests += cs . GetCalls
m . TSIDCacheMisses += cs . Misses
m . TSIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricIDCache . UpdateStats ( & cs )
m . MetricIDCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . MetricIDCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . MetricIDCacheRequests += cs . GetCalls
m . MetricIDCacheMisses += cs . Misses
m . MetricIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricNameCache . UpdateStats ( & cs )
m . MetricNameCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . MetricNameCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . MetricNameCacheRequests += cs . GetCalls
m . MetricNameCacheMisses += cs . Misses
m . MetricNameCacheCollisions += cs . Collisions
2019-11-09 22:05:14 +01:00
m . DateMetricIDCacheSize += uint64 ( s . dateMetricIDCache . EntriesCount ( ) )
2019-11-13 16:58:05 +01:00
m . DateMetricIDCacheSizeBytes += uint64 ( s . dateMetricIDCache . SizeBytes ( ) )
2019-11-11 12:21:05 +01:00
m . DateMetricIDCacheSyncsCount += atomic . LoadUint64 ( & s . dateMetricIDCache . syncsCount )
m . DateMetricIDCacheResetsCount += atomic . LoadUint64 ( & s . dateMetricIDCache . resetsCount )
2019-05-22 23:16:55 +02:00
2019-06-19 17:36:47 +02:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-09-24 20:10:22 +02:00
hourMetricIDsLen := hmPrev . m . Len ( )
if hmCurr . m . Len ( ) > hourMetricIDsLen {
hourMetricIDsLen = hmCurr . m . Len ( )
2019-06-19 17:36:47 +02:00
}
m . HourMetricIDCacheSize += uint64 ( hourMetricIDsLen )
2019-11-13 18:00:02 +01:00
m . HourMetricIDCacheSizeBytes += hmCurr . m . SizeBytes ( )
m . HourMetricIDCacheSizeBytes += hmPrev . m . SizeBytes ( )
2019-06-19 17:36:47 +02:00
2020-05-12 00:06:17 +02:00
nextDayMetricIDs := & s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry ) . v
m . NextDayMetricIDCacheSize += uint64 ( nextDayMetricIDs . Len ( ) )
m . NextDayMetricIDCacheSizeBytes += nextDayMetricIDs . SizeBytes ( )
2020-01-30 00:59:43 +01:00
prefetchedMetricIDs := s . prefetchedMetricIDs . Load ( ) . ( * uint64set . Set )
m . PrefetchedMetricIDsSize += uint64 ( prefetchedMetricIDs . Len ( ) )
m . PrefetchedMetricIDsSizeBytes += uint64 ( prefetchedMetricIDs . SizeBytes ( ) )
2019-05-22 23:16:55 +02:00
s . idb ( ) . UpdateMetrics ( & m . IndexDBMetrics )
s . tb . UpdateMetrics ( & m . TableMetrics )
}
// startRetentionWatcher starts retentionWatcher in a background goroutine
// tracked by s.retentionWatcherWG.
func (s *Storage) startRetentionWatcher() {
	wg := &s.retentionWatcherWG
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.retentionWatcher()
	}()
}
// retentionWatcher rotates indexDB every time the deadline returned
// by nextRetentionDuration is reached.
//
// It returns after s.stop is closed.
func (s *Storage) retentionWatcher() {
	for {
		d := nextRetentionDuration(s.retentionMonths)
		// Use an explicit timer instead of time.After, so it can be released
		// on shutdown instead of staying armed until the deadline.
		t := time.NewTimer(d)
		select {
		case <-s.stop:
			t.Stop()
			return
		case <-t.C:
			s.mustRotateIndexDB()
		}
	}
}
2019-06-09 18:06:53 +02:00
func ( s * Storage ) startCurrHourMetricIDsUpdater ( ) {
s . currHourMetricIDsUpdaterWG . Add ( 1 )
2019-06-02 17:34:08 +02:00
go func ( ) {
2019-06-09 18:06:53 +02:00
s . currHourMetricIDsUpdater ( )
s . currHourMetricIDsUpdaterWG . Done ( )
2019-06-02 17:34:08 +02:00
} ( )
}
2020-05-12 00:06:17 +02:00
// startNextDayMetricIDsUpdater starts nextDayMetricIDsUpdater in a background goroutine
// tracked by s.nextDayMetricIDsUpdaterWG.
func (s *Storage) startNextDayMetricIDsUpdater() {
	wg := &s.nextDayMetricIDsUpdaterWG
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.nextDayMetricIDsUpdater()
	}()
}
2019-06-09 18:06:53 +02:00
var currHourMetricIDsUpdateInterval = time . Second * 10
2019-06-02 20:58:14 +02:00
2019-06-09 18:06:53 +02:00
func ( s * Storage ) currHourMetricIDsUpdater ( ) {
2020-02-13 11:55:58 +01:00
ticker := time . NewTicker ( currHourMetricIDsUpdateInterval )
defer ticker . Stop ( )
2019-06-02 17:34:08 +02:00
for {
select {
case <- s . stop :
2019-11-08 12:16:40 +01:00
s . updateCurrHourMetricIDs ( )
2019-06-02 17:34:08 +02:00
return
2020-02-13 11:55:58 +01:00
case <- ticker . C :
2019-06-09 18:06:53 +02:00
s . updateCurrHourMetricIDs ( )
2019-06-02 17:34:08 +02:00
}
}
}
2020-05-12 00:06:17 +02:00
// nextDayMetricIDsUpdateInterval is the interval between updateNextDayMetricIDs calls
// in nextDayMetricIDsUpdater.
var nextDayMetricIDsUpdateInterval = 11 * time.Second

// nextDayMetricIDsUpdater calls updateNextDayMetricIDs on every tick
// and one last time after s.stop is closed, then returns.
func (s *Storage) nextDayMetricIDsUpdater() {
	ticker := time.NewTicker(nextDayMetricIDsUpdateInterval)
	defer ticker.Stop()

	for stopped := false; !stopped; {
		select {
		case <-s.stop:
			// Perform the final update below before exiting the loop.
			stopped = true
		case <-ticker.C:
		}
		s.updateNextDayMetricIDs()
	}
}
2019-05-22 23:16:55 +02:00
func ( s * Storage ) mustRotateIndexDB ( ) {
// Create new indexdb table.
newTableName := nextIndexDBTableName ( )
idbNewPath := s . path + "/indexdb/" + newTableName
2020-07-14 13:02:14 +02:00
idbNew , err := openIndexDB ( idbNewPath , s . metricIDCache , s . metricNameCache , s . tsidCache , & s . currHourMetricIDs , & s . prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
logger . Panicf ( "FATAL: cannot create new indexDB at %q: %s" , idbNewPath , err )
}
// Drop extDB
idbCurr := s . idb ( )
idbCurr . doExtDB ( func ( extDB * indexDB ) {
extDB . scheduleToDrop ( )
} )
idbCurr . SetExtDB ( nil )
// Start using idbNew
idbNew . SetExtDB ( idbCurr )
s . idbCurr . Store ( idbNew )
// Persist changes on the file system.
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( s . path )
2019-05-22 23:16:55 +02:00
// Flush tsidCache, so idbNew can be populated with fresh data.
s . tsidCache . Reset ( )
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s . dateMetricIDCache . Reset ( )
// Do not flush metricIDCache and metricNameCache, since all the metricIDs
// from prev idb remain valid after the rotation.
2020-05-14 22:45:04 +02:00
// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
2019-05-22 23:16:55 +02:00
}
// MustClose closes the storage.
func ( s * Storage ) MustClose ( ) {
close ( s . stop )
s . retentionWatcherWG . Wait ( )
2019-06-09 18:06:53 +02:00
s . currHourMetricIDsUpdaterWG . Wait ( )
2020-05-12 00:06:17 +02:00
s . nextDayMetricIDsUpdaterWG . Wait ( )
2019-05-22 23:16:55 +02:00
s . tb . MustClose ( )
s . idb ( ) . MustClose ( )
// Save caches.
2019-08-13 20:35:19 +02:00
s . mustSaveAndStopCache ( s . tsidCache , "MetricName->TSID" , "metricName_tsid" )
s . mustSaveAndStopCache ( s . metricIDCache , "MetricID->TSID" , "metricID_tsid" )
s . mustSaveAndStopCache ( s . metricNameCache , "MetricID->MetricName" , "metricID_metricName" )
2019-05-22 23:16:55 +02:00
2019-06-14 06:52:32 +02:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmCurr , "curr_hour_metric_ids" )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmPrev , "prev_hour_metric_ids" )
2020-05-12 00:06:17 +02:00
nextDayMetricIDs := s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry )
s . mustSaveNextDayMetricIDs ( nextDayMetricIDs )
2019-05-22 23:16:55 +02:00
// Release lock file.
if err := s . flockF . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close lock file %q: %s" , s . flockF . Name ( ) , err )
}
}
2020-05-12 00:06:17 +02:00
// mustLoadNextDayMetricIDs loads the next-day metricIDs for the given date
// from the "next_day_metric_ids" file under s.cachePath.
//
// An entry with empty set is returned if the file is missing, is broken
// or contains data for another date. The function panics if the existing file cannot be read.
func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
	const name = "next_day_metric_ids"

	entry := &byDateMetricIDEntry{
		date: date,
	}
	filePath := s.cachePath + "/" + name
	logger.Infof("loading %s from %q...", name, filePath)
	startTime := time.Now()
	if !fs.IsPathExist(filePath) {
		logger.Infof("nothing to load from %q", filePath)
		return entry
	}
	data, err := ioutil.ReadFile(filePath)
	if err != nil {
		logger.Panicf("FATAL: cannot read %s: %s", filePath, err)
	}
	fileSize := len(data)
	// The file must contain at least the 8-byte date and the 8-byte set length.
	if fileSize < 16 {
		logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", filePath, fileSize, 16)
		return entry
	}

	// Unmarshal header
	storedDate := encoding.UnmarshalUint64(data)
	data = data[8:]
	if storedDate != date {
		logger.Infof("discarding %s, since it contains data for stale date; got %d; want %d", filePath, storedDate, date)
		return entry
	}

	// Unmarshal uint64set
	set, tail, err := unmarshalUint64Set(data)
	if err != nil {
		logger.Infof("discarding %s because cannot load uint64set: %s", filePath, err)
		return entry
	}
	if len(tail) > 0 {
		logger.Infof("discarding %s because non-empty tail left; len(tail)=%d", filePath, len(tail))
		return entry
	}
	entry.v = *set
	logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, filePath, time.Since(startTime).Seconds(), set.Len(), fileSize)
	return entry
}
2019-06-14 06:52:32 +02:00
func ( s * Storage ) mustLoadHourMetricIDs ( hour uint64 , name string ) * hourMetricIDs {
2020-05-12 00:06:17 +02:00
hm := & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
path := s . cachePath + "/" + name
logger . Infof ( "loading %s from %q..." , name , path )
startTime := time . Now ( )
if ! fs . IsPathExist ( path ) {
logger . Infof ( "nothing to load from %q" , path )
2020-05-12 00:06:17 +02:00
return hm
2019-06-14 06:52:32 +02:00
}
src , err := ioutil . ReadFile ( path )
if err != nil {
logger . Panicf ( "FATAL: cannot read %s: %s" , path , err )
}
srcOrigLen := len ( src )
if len ( src ) < 24 {
logger . Errorf ( "discarding %s, since it has broken header; got %d bytes; want %d bytes" , path , len ( src ) , 24 )
2020-05-12 00:06:17 +02:00
return hm
2019-06-14 06:52:32 +02:00
}
2019-11-08 18:37:16 +01:00
// Unmarshal header
2019-06-14 06:52:32 +02:00
isFull := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
hourLoaded := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
if hourLoaded != hour {
2020-05-12 00:06:17 +02:00
logger . Infof ( "discarding %s, since it contains outdated hour; got %d; want %d" , path , hourLoaded , hour )
return hm
2019-06-14 06:52:32 +02:00
}
2019-11-08 18:37:16 +01:00
2020-05-12 00:06:17 +02:00
// Unmarshal uint64set
m , tail , err := unmarshalUint64Set ( src )
if err != nil {
logger . Infof ( "discarding %s because cannot load uint64set: %s" , path , err )
return hm
2019-06-14 06:52:32 +02:00
}
2020-05-12 00:06:17 +02:00
if len ( tail ) > 0 {
logger . Infof ( "discarding %s because non-empty tail left; len(tail)=%d" , path , len ( tail ) )
return hm
2019-06-14 06:52:32 +02:00
}
2020-05-12 00:06:17 +02:00
hm . m = m
hm . isFull = isFull != 0
logger . Infof ( "loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , m . Len ( ) , srcOrigLen )
return hm
}
2019-11-13 12:11:02 +01:00
2020-05-12 00:06:17 +02:00
func ( s * Storage ) mustSaveNextDayMetricIDs ( e * byDateMetricIDEntry ) {
name := "next_day_metric_ids"
path := s . cachePath + "/" + name
logger . Infof ( "saving %s to %q..." , name , path )
startTime := time . Now ( )
dst := make ( [ ] byte , 0 , e . v . Len ( ) * 8 + 16 )
// Marshal header
dst = encoding . MarshalUint64 ( dst , e . date )
// Marshal e.v
dst = marshalUint64Set ( dst , & e . v )
if err := ioutil . WriteFile ( path , dst , 0644 ) ; err != nil {
logger . Panicf ( "FATAL: cannot write %d bytes to %q: %s" , len ( dst ) , path , err )
2019-06-14 06:52:32 +02:00
}
2020-05-12 00:06:17 +02:00
logger . Infof ( "saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , e . v . Len ( ) , len ( dst ) )
2019-06-14 06:52:32 +02:00
}
func ( s * Storage ) mustSaveHourMetricIDs ( hm * hourMetricIDs , name string ) {
path := s . cachePath + "/" + name
logger . Infof ( "saving %s to %q..." , name , path )
startTime := time . Now ( )
2019-09-24 20:10:22 +02:00
dst := make ( [ ] byte , 0 , hm . m . Len ( ) * 8 + 24 )
2019-06-14 06:52:32 +02:00
isFull := uint64 ( 0 )
if hm . isFull {
isFull = 1
}
2019-11-08 18:37:16 +01:00
// Marshal header
2019-06-14 06:52:32 +02:00
dst = encoding . MarshalUint64 ( dst , isFull )
dst = encoding . MarshalUint64 ( dst , hm . hour )
2019-11-08 18:37:16 +01:00
// Marshal hm.m
2020-05-12 00:06:17 +02:00
dst = marshalUint64Set ( dst , hm . m )
2019-11-13 12:11:02 +01:00
2019-06-14 06:52:32 +02:00
if err := ioutil . WriteFile ( path , dst , 0644 ) ; err != nil {
logger . Panicf ( "FATAL: cannot write %d bytes to %q: %s" , len ( dst ) , path , err )
}
2020-01-22 17:27:44 +01:00
logger . Infof ( "saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , hm . m . Len ( ) , len ( dst ) )
2019-06-14 06:52:32 +02:00
}
2020-05-12 00:06:17 +02:00
// unmarshalUint64Set unmarshals uint64set from src, which must be in the format
// written by marshalUint64Set: the 8-byte number of entries followed by 8 bytes per entry.
//
// It returns the unmarshaled set and the tail left in src after the set.
// An error is returned if src is too short for the encoded number of entries.
func unmarshalUint64Set(src []byte) (*uint64set.Set, []byte, error) {
	if len(src) < 8 {
		return nil, nil, fmt.Errorf("cannot unmarshal uint64set length; got %d bytes; want at least 8 bytes", len(src))
	}
	mLen := encoding.UnmarshalUint64(src)
	src = src[8:]
	// Compare against len(src)/8 instead of multiplying mLen by 8,
	// since the multiplication may overflow uint64 for a corrupted mLen
	// and let the check pass with not enough data in src.
	if mLen > uint64(len(src))/8 {
		return nil, nil, fmt.Errorf("cannot unmarshal uint64set; got %d bytes; want at least 8*%d bytes", len(src), mLen)
	}
	m := &uint64set.Set{}
	for i := uint64(0); i < mLen; i++ {
		metricID := encoding.UnmarshalUint64(src)
		src = src[8:]
		m.Add(metricID)
	}
	return m, src, nil
}
// marshalUint64Set appends marshaled m to dst and returns the result.
//
// The 8-byte number of entries in m is written first, followed by every entry
// passed by m.ForEach - 8 bytes per entry.
func marshalUint64Set(dst []byte, m *uint64set.Set) []byte {
	entriesCount := uint64(m.Len())
	dst = encoding.MarshalUint64(dst, entriesCount)
	m.ForEach(func(part []uint64) bool {
		for i := range part {
			dst = encoding.MarshalUint64(dst, part[i])
		}
		// Always return true, as the original callback does for every part.
		return true
	})
	return dst
}
2019-08-13 20:35:19 +02:00
func ( s * Storage ) mustLoadCache ( info , name string , sizeBytes int ) * workingsetcache . Cache {
2019-05-22 23:16:55 +02:00
path := s . cachePath + "/" + name
logger . Infof ( "loading %s cache from %q..." , info , path )
startTime := time . Now ( )
2019-08-13 20:35:19 +02:00
c := workingsetcache . Load ( path , sizeBytes , time . Hour )
2019-05-22 23:16:55 +02:00
var cs fastcache . Stats
c . UpdateStats ( & cs )
2020-01-22 17:27:44 +01:00
logger . Infof ( "loaded %s cache from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" ,
info , path , time . Since ( startTime ) . Seconds ( ) , cs . EntriesCount , cs . BytesSize )
2019-05-22 23:16:55 +02:00
return c
}
2019-08-13 20:35:19 +02:00
func ( s * Storage ) mustSaveAndStopCache ( c * workingsetcache . Cache , info , name string ) {
2019-05-22 23:16:55 +02:00
path := s . cachePath + "/" + name
logger . Infof ( "saving %s cache to %q..." , info , path )
startTime := time . Now ( )
2019-08-13 20:35:19 +02:00
if err := c . Save ( path ) ; err != nil {
2019-05-22 23:16:55 +02:00
logger . Panicf ( "FATAL: cannot save %s cache to %q: %s" , info , path , err )
}
var cs fastcache . Stats
c . UpdateStats ( & cs )
2019-08-13 20:35:19 +02:00
c . Stop ( )
2020-01-22 17:27:44 +01:00
logger . Infof ( "saved %s cache to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" ,
info , path , time . Since ( startTime ) . Seconds ( ) , cs . EntriesCount , cs . BytesSize )
2019-05-22 23:16:55 +02:00
}
// nextRetentionDuration returns the duration left until the next retention deadline.
//
// Retention periods start at months whose index (year*12 + month-1) is a multiple
// of retentionMonths. The deadline is the first day of the next such month
// at 04:00 UTC.
func nextRetentionDuration(retentionMonths int) time.Duration {
	now := time.Now().UTC()
	currMonthIdx := now.Year()*12 + int(now.Month()) - 1

	// Round up to the next month index divisible by retentionMonths.
	nextIdx := currMonthIdx + retentionMonths
	nextIdx -= nextIdx % retentionMonths
	year, month := nextIdx/12, time.Month(nextIdx%12+1)

	// Schedule the deadline to +4 hours from the next retention period start.
	// This should prevent from possible double deletion of indexdb
	// due to time drift - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248 .
	deadline := time.Date(year, month, 1, 4, 0, 0, 0, time.UTC)
	return deadline.Sub(now)
}
2019-09-23 21:34:04 +02:00
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
2020-07-23 19:42:57 +02:00
func ( s * Storage ) searchTSIDs ( tfss [ ] * TagFilters , tr TimeRange , maxMetrics int , deadline uint64 ) ( [ ] TSID , error ) {
2019-05-22 23:16:55 +02:00
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
2020-08-05 17:24:51 +02:00
// Limit the number of concurrent goroutines that may search TSIDS in the storage.
// This should prevent from out of memory errors and CPU trashing when too many
// goroutines call searchTSIDs.
select {
case searchTSIDsConcurrencyCh <- struct { } { } :
default :
// Sleep for a while until giving up
atomic . AddUint64 ( & s . searchTSIDsConcurrencyLimitReached , 1 )
currentTime := fasttime . UnixTimestamp ( )
timeoutSecs := uint64 ( 0 )
if currentTime < deadline {
timeoutSecs = deadline - currentTime
}
timeout := time . Second * time . Duration ( timeoutSecs )
t := timerpool . Get ( timeout )
select {
case searchTSIDsConcurrencyCh <- struct { } { } :
timerpool . Put ( t )
case <- t . C :
timerpool . Put ( t )
atomic . AddUint64 ( & s . searchTSIDsConcurrencyLimitTimeout , 1 )
return nil , fmt . Errorf ( "cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load" ,
cap ( searchTSIDsConcurrencyCh ) , timeout . Seconds ( ) )
}
}
2020-07-23 19:42:57 +02:00
tsids , err := s . idb ( ) . searchTSIDs ( tfss , tr , maxMetrics , deadline )
2020-08-05 17:24:51 +02:00
<- searchTSIDsConcurrencyCh
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "error when searching tsids for tfss %q: %w" , tfss , err )
2019-05-22 23:16:55 +02:00
}
return tsids , nil
}
2020-08-05 17:24:51 +02:00
// searchTSIDsConcurrencyCh limits the number of concurrent TSID searches.
//
// Its capacity is GOMAXPROCS*2, since TSID search is CPU bound and sometimes
// disk IO bound, so there is no sense in running more than GOMAXPROCS*2
// concurrent goroutines for it.
var searchTSIDsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)
2020-01-30 00:59:43 +01:00
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
//
// This should speed-up further searchMetricName calls for metricIDs from tsids.
2020-07-23 19:42:57 +02:00
func ( s * Storage ) prefetchMetricNames ( tsids [ ] TSID , deadline uint64 ) error {
2020-07-23 18:21:49 +02:00
if len ( tsids ) == 0 {
return nil
}
2020-01-30 00:59:43 +01:00
var metricIDs uint64Sorter
prefetchedMetricIDs := s . prefetchedMetricIDs . Load ( ) . ( * uint64set . Set )
for i := range tsids {
2020-07-23 18:21:49 +02:00
tsid := & tsids [ i ]
metricID := tsid . MetricID
2020-01-30 00:59:43 +01:00
if prefetchedMetricIDs . Has ( metricID ) {
continue
}
metricIDs = append ( metricIDs , metricID )
}
if len ( metricIDs ) < 500 {
// It is cheaper to skip pre-fetching and obtain metricNames inline.
return nil
}
2020-05-16 09:21:17 +02:00
atomic . AddUint64 ( & s . slowMetricNameLoads , uint64 ( len ( metricIDs ) ) )
2020-01-30 00:59:43 +01:00
// Pre-fetch metricIDs.
sort . Sort ( metricIDs )
var metricName [ ] byte
var err error
2020-01-30 23:54:28 +01:00
idb := s . idb ( )
2020-07-23 19:42:57 +02:00
is := idb . getIndexSearch ( deadline )
2020-01-30 23:54:28 +01:00
defer idb . putIndexSearch ( is )
2020-07-23 18:21:49 +02:00
for loops , metricID := range metricIDs {
2020-08-07 07:37:33 +02:00
if loops & paceLimiterSlowIterationsMask == 0 {
2020-07-23 19:42:57 +02:00
if err := checkSearchDeadlineAndPace ( is . deadline ) ; err != nil {
return err
}
2020-07-23 18:21:49 +02:00
}
2020-01-30 23:54:28 +01:00
metricName , err = is . searchMetricName ( metricName [ : 0 ] , metricID )
if err != nil && err != io . EOF {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "error in pre-fetching metricName for metricID=%d: %w" , metricID , err )
2020-01-30 00:59:43 +01:00
}
}
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
2020-08-06 15:48:21 +02:00
2020-01-30 00:59:43 +01:00
prefetchedMetricIDsNew := prefetchedMetricIDs . Clone ( )
2020-07-21 19:56:49 +02:00
prefetchedMetricIDsNew . AddMulti ( metricIDs )
2020-08-06 15:48:21 +02:00
if prefetchedMetricIDsNew . SizeBytes ( ) > uint64 ( memory . Allowed ( ) ) / 32 {
// Reset prefetchedMetricIDsNew if it occupies too much space.
prefetchedMetricIDsNew = & uint64set . Set { }
}
2020-01-30 00:59:43 +01:00
s . prefetchedMetricIDs . Store ( prefetchedMetricIDsNew )
return nil
}
2020-08-10 12:17:12 +02:00
// ErrDeadlineExceeded is returned when the request times out.
//
// It is a package-level sentinel error value with the "deadline exceeded" message.
var ErrDeadlineExceeded = fmt . Errorf ( "deadline exceeded" )
2020-07-23 19:42:57 +02:00
2019-05-22 23:16:55 +02:00
// DeleteMetrics deletes all the metrics matching the given tfss.
//
// Returns the number of metrics deleted.
func ( s * Storage ) DeleteMetrics ( tfss [ ] * TagFilters ) ( int , error ) {
deletedCount , err := s . idb ( ) . DeleteTSIDs ( tfss )
if err != nil {
2020-06-30 21:58:18 +02:00
return deletedCount , fmt . Errorf ( "cannot delete tsids: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-07-14 13:02:14 +02:00
// Do not reset MetricName->TSID cache in order to prevent from adding new data points
// to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs.
2020-07-06 20:56:14 +02:00
2020-07-14 13:02:14 +02:00
// Do not reset MetricID->MetricName cache, since it must be used only
2019-05-22 23:16:55 +02:00
// after filtering out deleted metricIDs.
2020-07-06 20:56:14 +02:00
2019-05-22 23:16:55 +02:00
return deletedCount , nil
}
// searchMetricName appends metric name for the given metricID to dst
// and returns the result.
//
// The lookup is delegated to the current indexdb.
func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
	idb := s.idb()
	return idb.searchMetricName(dst, metricID)
}
// SearchTagKeys searches for tag keys
2020-07-23 19:42:57 +02:00
func ( s * Storage ) SearchTagKeys ( maxTagKeys int , deadline uint64 ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagKeys ( maxTagKeys , deadline )
2019-05-22 23:16:55 +02:00
}
// SearchTagValues searches for tag values for the given tagKey
2020-07-23 19:42:57 +02:00
func ( s * Storage ) SearchTagValues ( tagKey [ ] byte , maxTagValues int , deadline uint64 ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagValues ( tagKey , maxTagValues , deadline )
2019-05-22 23:16:55 +02:00
}
2019-11-21 22:18:22 +01:00
// SearchTagEntries returns a list of (tagName -> tagValues)
2020-07-23 19:42:57 +02:00
func ( s * Storage ) SearchTagEntries ( maxTagKeys , maxTagValues int , deadline uint64 ) ( [ ] TagEntry , error ) {
2019-06-10 17:55:20 +02:00
idb := s . idb ( )
2020-07-23 19:42:57 +02:00
keys , err := idb . SearchTagKeys ( maxTagKeys , deadline )
2019-06-10 17:55:20 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot search tag keys: %w" , err )
2019-06-10 17:55:20 +02:00
}
// Sort keys for faster seeks below
sort . Strings ( keys )
tes := make ( [ ] TagEntry , len ( keys ) )
for i , key := range keys {
2020-07-23 19:42:57 +02:00
values , err := idb . SearchTagValues ( [ ] byte ( key ) , maxTagValues , deadline )
2019-06-10 17:55:20 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , fmt . Errorf ( "cannot search values for tag %q: %w" , key , err )
2019-06-10 17:55:20 +02:00
}
te := & tes [ i ]
te . Key = key
te . Values = values
}
return tes , nil
}
// TagEntry contains (tagName -> tagValues) mapping.
//
// Storage.SearchTagEntries returns one TagEntry per found tag key.
type TagEntry struct {
// Key is the tag name.
Key string
// Values contains all the values found for Key.
Values [ ] string
}
2019-05-22 23:16:55 +02:00
// GetSeriesCount returns the approximate number of unique time series.
//
// It includes the deleted series too and may count the same series
// up to two times - in db and extDB.
2020-07-23 19:42:57 +02:00
func ( s * Storage ) GetSeriesCount ( deadline uint64 ) ( uint64 , error ) {
return s . idb ( ) . GetSeriesCount ( deadline )
2019-05-22 23:16:55 +02:00
}
2020-04-22 18:57:36 +02:00
// GetTSDBStatusForDate returns TSDB status data for /api/v1/status/tsdb.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
2020-07-23 19:42:57 +02:00
func ( s * Storage ) GetTSDBStatusForDate ( date uint64 , topN int , deadline uint64 ) ( * TSDBStatus , error ) {
return s . idb ( ) . GetTSDBStatusForDate ( date , topN , deadline )
2020-04-22 18:57:36 +02:00
}
2019-05-22 23:16:55 +02:00
// MetricRow is a metric to insert into storage.
type MetricRow struct {
// MetricNameRaw contains raw metric name, which must be decoded
// with MetricName.unmarshalRaw.
MetricNameRaw [ ] byte
// Timestamp is the sample timestamp. Storage.updatePerDateData divides it
// by msecPerDay and msecPerHour, so it is expected to be in milliseconds.
Timestamp int64
// Value is the sample value. Rows with NaN values are skipped by Storage.add.
Value float64
}
// CopyFrom copies src to mr.
//
// The backing array of mr.MetricNameRaw is reused when it has enough capacity,
// so mr doesn't share memory with src after the call.
func (mr *MetricRow) CopyFrom(src *MetricRow) {
	nameBuf := mr.MetricNameRaw[:0]
	mr.MetricNameRaw = append(nameBuf, src.MetricNameRaw...)
	mr.Timestamp, mr.Value = src.Timestamp, src.Value
}
// String returns string representation of the mr.
func ( mr * MetricRow ) String ( ) string {
metricName := string ( mr . MetricNameRaw )
var mn MetricName
if err := mn . unmarshalRaw ( mr . MetricNameRaw ) ; err == nil {
metricName = mn . String ( )
}
2019-11-08 18:37:16 +01:00
return fmt . Sprintf ( "MetricName=%s, Timestamp=%d, Value=%f\n" , metricName , mr . Timestamp , mr . Value )
2019-05-22 23:16:55 +02:00
}
// Marshal appends marshaled mr to dst and returns the result.
//
// MetricNameRaw is written via encoding.MarshalBytes; it is followed by Timestamp
// and by the IEEE 754 bits of Value, both written via encoding.MarshalUint64.
func (mr *MetricRow) Marshal(dst []byte) []byte {
	timestampBits := uint64(mr.Timestamp)
	valueBits := math.Float64bits(mr.Value)

	dst = encoding.MarshalBytes(dst, mr.MetricNameRaw)
	dst = encoding.MarshalUint64(dst, timestampBits)
	return encoding.MarshalUint64(dst, valueBits)
}
// Unmarshal unmarshals mr from src and returns the remaining tail from src.
func ( mr * MetricRow ) Unmarshal ( src [ ] byte ) ( [ ] byte , error ) {
tail , metricNameRaw , err := encoding . UnmarshalBytes ( src )
if err != nil {
2020-06-30 21:58:18 +02:00
return tail , fmt . Errorf ( "cannot unmarshal MetricName: %w" , err )
2019-05-22 23:16:55 +02:00
}
mr . MetricNameRaw = append ( mr . MetricNameRaw [ : 0 ] , metricNameRaw ... )
if len ( tail ) < 8 {
return tail , fmt . Errorf ( "cannot unmarshal Timestamp: want %d bytes; have %d bytes" , 8 , len ( tail ) )
}
timestamp := encoding . UnmarshalUint64 ( tail )
mr . Timestamp = int64 ( timestamp )
tail = tail [ 8 : ]
if len ( tail ) < 8 {
return tail , fmt . Errorf ( "cannot unmarshal Value: want %d bytes; have %d bytes" , 8 , len ( tail ) )
}
value := encoding . UnmarshalUint64 ( tail )
mr . Value = math . Float64frombits ( value )
tail = tail [ 8 : ]
return tail , nil
}
// AddRows adds the given mrs to s.
func ( s * Storage ) AddRows ( mrs [ ] MetricRow , precisionBits uint8 ) error {
if len ( mrs ) == 0 {
return nil
}
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU trashing when too many
// goroutines call AddRows.
select {
case addRowsConcurrencyCh <- struct { } { } :
2019-08-06 13:09:17 +02:00
default :
// Sleep for a while until giving up
atomic . AddUint64 ( & s . addRowsConcurrencyLimitReached , 1 )
t := timerpool . Get ( addRowsTimeout )
2020-08-07 07:47:32 +02:00
// Prioritize data ingestion over concurrent searches.
storagepacelimiter . Search . Inc ( )
2019-08-06 13:09:17 +02:00
select {
case addRowsConcurrencyCh <- struct { } { } :
timerpool . Put ( t )
2020-08-07 07:47:32 +02:00
storagepacelimiter . Search . Dec ( )
2019-08-06 13:09:17 +02:00
case <- t . C :
timerpool . Put ( t )
2020-08-07 07:47:32 +02:00
storagepacelimiter . Search . Dec ( )
2019-08-06 13:09:17 +02:00
atomic . AddUint64 ( & s . addRowsConcurrencyLimitTimeout , 1 )
atomic . AddUint64 ( & s . addRowsConcurrencyDroppedRows , uint64 ( len ( mrs ) ) )
2020-08-05 17:24:51 +02:00
return fmt . Errorf ( "cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load" ,
2019-08-06 13:09:17 +02:00
len ( mrs ) , addRowsTimeout , cap ( addRowsConcurrencyCh ) )
}
2019-05-22 23:16:55 +02:00
}
// Add rows to the storage.
var err error
rr := getRawRowsWithSize ( len ( mrs ) )
rr . rows , err = s . add ( rr . rows , mrs , precisionBits )
putRawRows ( rr )
2020-07-05 18:37:38 +02:00
<- addRowsConcurrencyCh
2019-05-22 23:16:55 +02:00
return err
}
var (
	// addRowsTimeout is the maximum duration AddRows waits for a free slot
	// in addRowsConcurrencyCh before dropping the rows.
	addRowsTimeout = 30 * time.Second

	// addRowsConcurrencyCh limits the concurrency for data ingestion to GOMAXPROCS,
	// since this operation is CPU bound, so there is no sense in running more
	// than GOMAXPROCS concurrent goroutines on data ingestion path.
	addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1))
)
func ( s * Storage ) add ( rows [ ] rawRow , mrs [ ] MetricRow , precisionBits uint8 ) ( [ ] rawRow , error ) {
idb := s . idb ( )
rowsLen := len ( rows )
if n := rowsLen + len ( mrs ) - cap ( rows ) ; n > 0 {
rows = append ( rows [ : cap ( rows ) ] , make ( [ ] rawRow , n ) ... )
}
rows = rows [ : rowsLen + len ( mrs ) ]
j := 0
2019-12-19 14:12:50 +01:00
var (
// These vars are used for speeding up bulk imports of multiple adjancent rows for the same metricName.
prevTSID TSID
prevMetricNameRaw [ ] byte
)
2020-05-14 22:45:04 +02:00
var pmrs * pendingMetricRows
2019-07-11 16:04:56 +02:00
minTimestamp , maxTimestamp := s . tb . getMinMaxTimestamps ( )
2020-05-14 22:17:22 +02:00
// Return only the first error, since it has no sense in returning all errors.
var firstWarn error
2019-05-22 23:16:55 +02:00
for i := range mrs {
mr := & mrs [ i ]
if math . IsNaN ( mr . Value ) {
// Just skip NaNs, since the underlying encoding
// doesn't know how to work with them.
continue
}
2019-07-26 13:10:25 +02:00
if mr . Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention.
2020-05-14 22:17:22 +02:00
if firstWarn == nil {
2020-07-08 12:53:29 +02:00
firstWarn = fmt . Errorf ( "cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d; " +
"probably you need updating -retentionPeriod command-line flag" ,
2020-05-14 22:17:22 +02:00
mr . Timestamp , minTimestamp )
}
2019-07-26 13:10:25 +02:00
atomic . AddUint64 ( & s . tooSmallTimestampRows , 1 )
continue
}
if mr . Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time.
2020-05-14 22:17:22 +02:00
if firstWarn == nil {
2020-07-08 12:53:29 +02:00
firstWarn = fmt . Errorf ( "cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d; " +
"propbably you need updating -retentionPeriod command-line flag" ,
2020-05-14 22:17:22 +02:00
mr . Timestamp , maxTimestamp )
}
2019-07-26 13:10:25 +02:00
atomic . AddUint64 ( & s . tooBigTimestampRows , 1 )
2019-07-11 16:04:56 +02:00
continue
}
2019-05-22 23:16:55 +02:00
r := & rows [ rowsLen + j ]
j ++
r . Timestamp = mr . Timestamp
r . Value = mr . Value
r . PrecisionBits = precisionBits
2019-12-19 14:12:50 +01:00
if string ( mr . MetricNameRaw ) == string ( prevMetricNameRaw ) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r . TSID = prevTSID
continue
}
2020-07-06 20:56:14 +02:00
if s . getTSIDFromCache ( & r . TSID , mr . MetricNameRaw ) {
2020-05-14 22:23:39 +02:00
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
2020-07-08 16:29:57 +02:00
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteMetrics code for details.
2020-05-14 22:23:39 +02:00
prevTSID = r . TSID
prevMetricNameRaw = mr . MetricNameRaw
continue
2019-05-22 23:16:55 +02:00
}
2020-05-14 22:45:04 +02:00
// Slow path - the TSID is missing in the cache.
// Postpone its search in the loop below.
j --
if pmrs == nil {
pmrs = getPendingMetricRows ( )
2019-05-22 23:16:55 +02:00
}
2020-05-14 22:45:04 +02:00
if err := pmrs . addRow ( mr ) ; err != nil {
2019-05-22 23:16:55 +02:00
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
2020-05-14 22:17:22 +02:00
if firstWarn == nil {
2020-05-14 22:45:04 +02:00
firstWarn = err
2020-05-14 22:17:22 +02:00
}
2019-05-22 23:16:55 +02:00
continue
}
2020-05-14 22:45:04 +02:00
}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs . pmrs
sort . Slice ( pendingMetricRows , func ( i , j int ) bool {
return string ( pendingMetricRows [ i ] . MetricName ) < string ( pendingMetricRows [ j ] . MetricName )
} )
2020-07-23 19:42:57 +02:00
is := idb . getIndexSearch ( noDeadline )
2020-05-14 22:45:04 +02:00
prevMetricNameRaw = nil
2020-07-30 15:14:51 +02:00
var slowInsertsCount uint64
2020-05-14 22:45:04 +02:00
for i := range pendingMetricRows {
pmr := & pendingMetricRows [ i ]
mr := & pmr . mr
r := & rows [ rowsLen + j ]
j ++
r . Timestamp = mr . Timestamp
r . Value = mr . Value
r . PrecisionBits = precisionBits
if string ( mr . MetricNameRaw ) == string ( prevMetricNameRaw ) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r . TSID = prevTSID
continue
2020-05-14 22:17:22 +02:00
}
2020-07-06 20:56:14 +02:00
if s . getTSIDFromCache ( & r . TSID , mr . MetricNameRaw ) {
2020-05-14 22:45:04 +02:00
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
2020-07-08 16:29:57 +02:00
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteMetrics code for details.
2020-05-14 22:45:04 +02:00
prevTSID = r . TSID
prevMetricNameRaw = mr . MetricNameRaw
continue
}
2020-07-30 15:14:51 +02:00
slowInsertsCount ++
2020-05-14 22:45:04 +02:00
if err := is . GetOrCreateTSIDByName ( & r . TSID , pmr . MetricName ) ; err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
2020-06-30 21:58:18 +02:00
firstWarn = fmt . Errorf ( "cannot obtain or create TSID for MetricName %q: %w" , pmr . MetricName , err )
2020-05-14 22:45:04 +02:00
}
j --
continue
}
s . putTSIDToCache ( & r . TSID , mr . MetricNameRaw )
2019-05-22 23:16:55 +02:00
}
2020-05-14 22:45:04 +02:00
idb . putIndexSearch ( is )
putPendingMetricRows ( pmrs )
2020-07-30 15:14:51 +02:00
atomic . AddUint64 ( & s . slowRowInserts , slowInsertsCount )
2019-05-22 23:16:55 +02:00
}
2020-05-14 22:17:22 +02:00
if firstWarn != nil {
logger . Errorf ( "warn occurred during rows addition: %s" , firstWarn )
2019-10-31 13:29:35 +01:00
}
2019-05-22 23:16:55 +02:00
rows = rows [ : rowsLen + j ]
2020-05-14 22:17:22 +02:00
var firstError error
2019-05-22 23:16:55 +02:00
if err := s . tb . AddRows ( rows ) ; err != nil {
2020-06-30 21:58:18 +02:00
firstError = fmt . Errorf ( "cannot add rows to table: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-05-14 22:17:22 +02:00
if err := s . updatePerDateData ( rows ) ; err != nil && firstError == nil {
2020-06-30 21:58:18 +02:00
firstError = fmt . Errorf ( "cannot update per-date data: %w" , err )
2019-05-22 23:16:55 +02:00
}
2020-05-14 22:17:22 +02:00
if firstError != nil {
2020-06-30 21:58:18 +02:00
return rows , fmt . Errorf ( "error occurred during rows addition: %w" , firstError )
2019-10-20 22:38:51 +02:00
}
2019-05-22 23:16:55 +02:00
return rows , nil
}
2020-05-14 22:45:04 +02:00
// pendingMetricRow is a metric row, which waits for TSID resolution via indexdb.
type pendingMetricRow struct {
// MetricName is the marshaled metric name with sorted tags.
// It points into pendingMetricRows.metricNamesBuf - see pendingMetricRows.addRow.
MetricName [ ] byte
// mr is a copy of the original row.
mr MetricRow
}
// pendingMetricRows accumulates rows added via addRow.
//
// Instances are reused via getPendingMetricRows and putPendingMetricRows.
type pendingMetricRows struct {
// pmrs contains the rows added so far.
pmrs [ ] pendingMetricRow
// metricNamesBuf holds marshaled metric names referred by pmrs[...].MetricName.
metricNamesBuf [ ] byte
// lastMetricNameRaw and lastMetricName hold the most recently processed raw metric name
// and its marshaled form. They allow skipping the conversion for adjacent rows
// with identical MetricNameRaw.
lastMetricNameRaw [ ] byte
lastMetricName [ ] byte
// mn is a scratch MetricName used for unmarshaling MetricNameRaw in addRow.
mn MetricName
}
// reset prepares pmrs for re-use.
//
// References held by the accumulated rows are cleared, so the pooled object
// doesn't keep the memory they point to alive. The capacity of pmrs.pmrs
// and pmrs.metricNamesBuf is retained.
func (pmrs *pendingMetricRows) reset() {
	// Clear the rows by index. Ranging by value would modify only
	// a copy of every row, leaving the references in place.
	for i := range pmrs.pmrs {
		pmr := &pmrs.pmrs[i]
		pmr.MetricName = nil
		pmr.mr.MetricNameRaw = nil
	}
	pmrs.pmrs = pmrs.pmrs[:0]
	pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0]
	pmrs.lastMetricNameRaw = nil
	pmrs.lastMetricName = nil
	pmrs.mn.Reset()
}
func ( pmrs * pendingMetricRows ) addRow ( mr * MetricRow ) error {
// Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric.
if string ( mr . MetricNameRaw ) != string ( pmrs . lastMetricNameRaw ) {
if err := pmrs . mn . unmarshalRaw ( mr . MetricNameRaw ) ; err != nil {
2020-06-30 21:58:18 +02:00
return fmt . Errorf ( "cannot unmarshal MetricNameRaw %q: %w" , mr . MetricNameRaw , err )
2020-05-14 22:45:04 +02:00
}
pmrs . mn . sortTags ( )
metricNamesBufLen := len ( pmrs . metricNamesBuf )
pmrs . metricNamesBuf = pmrs . mn . Marshal ( pmrs . metricNamesBuf )
pmrs . lastMetricName = pmrs . metricNamesBuf [ metricNamesBufLen : ]
pmrs . lastMetricNameRaw = mr . MetricNameRaw
}
pmrs . pmrs = append ( pmrs . pmrs , pendingMetricRow {
MetricName : pmrs . lastMetricName ,
mr : * mr ,
} )
return nil
}
// getPendingMetricRows returns pendingMetricRows from pendingMetricRowsPool.
//
// A new instance is allocated if the pool has nothing to offer.
// Return the result to the pool via putPendingMetricRows when it is no longer needed.
func getPendingMetricRows() *pendingMetricRows {
	if v := pendingMetricRowsPool.Get(); v != nil {
		return v.(*pendingMetricRows)
	}
	return &pendingMetricRows{}
}
// putPendingMetricRows resets pmrs and returns it to pendingMetricRowsPool.
//
// pmrs mustn't be used after the call.
func putPendingMetricRows ( pmrs * pendingMetricRows ) {
pmrs . reset ( )
pendingMetricRowsPool . Put ( pmrs )
}
// pendingMetricRowsPool holds *pendingMetricRows for re-use.
// See getPendingMetricRows and putPendingMetricRows.
var pendingMetricRowsPool sync . Pool
2020-05-12 00:06:17 +02:00
func ( s * Storage ) updatePerDateData ( rows [ ] rawRow ) error {
2019-05-22 23:16:55 +02:00
var date uint64
2019-06-09 18:06:53 +02:00
var hour uint64
2019-05-22 23:16:55 +02:00
var prevTimestamp int64
2019-12-19 14:12:50 +01:00
var (
// These vars are used for speeding up bulk imports when multiple adjancent rows
// contain the same (metricID, date) pairs.
2020-05-14 22:45:04 +02:00
prevDate uint64
prevMetricID uint64
2019-12-19 14:12:50 +01:00
)
2019-11-08 12:16:40 +01:00
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2020-05-12 00:06:17 +02:00
nextDayMetricIDs := & s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry ) . v
2020-05-14 21:01:51 +02:00
todayShare16bit := uint64 ( ( float64 ( fasttime . UnixTimestamp ( ) % ( 3600 * 24 ) ) / ( 3600 * 24 ) ) * ( 1 << 16 ) )
2020-05-14 22:45:04 +02:00
type pendingDateMetricID struct {
date uint64
metricID uint64
}
var pendingDateMetricIDs [ ] pendingDateMetricID
2019-05-22 23:16:55 +02:00
for i := range rows {
r := & rows [ i ]
if r . Timestamp != prevTimestamp {
date = uint64 ( r . Timestamp ) / msecPerDay
2019-06-09 18:06:53 +02:00
hour = uint64 ( r . Timestamp ) / msecPerHour
2019-05-22 23:16:55 +02:00
prevTimestamp = r . Timestamp
}
metricID := r . TSID . MetricID
2019-06-09 18:06:53 +02:00
if hour == hm . hour {
// The r belongs to the current hour. Check for the current hour cache.
2019-09-24 20:10:22 +02:00
if hm . m . Has ( metricID ) {
2019-06-09 18:06:53 +02:00
// Fast path: the metricID is in the current hour cache.
2019-11-09 22:17:42 +01:00
// This means the metricID has been already added to per-day inverted index.
2020-05-12 00:06:17 +02:00
// Gradually pre-populate per-day inverted index for the next day
// during the current day.
// This should reduce CPU usage spike and slowdown at the beginning of the next day
// when entries for all the active time series must be added to the index.
// This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 .
if todayShare16bit > ( metricID & ( 1 << 16 - 1 ) ) && ! nextDayMetricIDs . Has ( metricID ) {
2020-05-14 22:45:04 +02:00
pendingDateMetricIDs = append ( pendingDateMetricIDs , pendingDateMetricID {
date : date + 1 ,
metricID : metricID ,
} )
2020-05-12 00:06:17 +02:00
s . pendingNextDayMetricIDsLock . Lock ( )
s . pendingNextDayMetricIDs . Add ( metricID )
s . pendingNextDayMetricIDsLock . Unlock ( )
}
2019-06-02 17:34:08 +02:00
continue
}
2019-11-08 18:37:16 +01:00
s . pendingHourEntriesLock . Lock ( )
s . pendingHourEntries . Add ( metricID )
s . pendingHourEntriesLock . Unlock ( )
2019-06-02 17:34:08 +02:00
}
// Slower path: check global cache for (date, metricID) entry.
2020-05-14 22:45:04 +02:00
if metricID == prevMetricID && date == prevDate {
2019-12-19 14:12:50 +01:00
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
2020-05-14 22:45:04 +02:00
prevDate = date
prevMetricID = metricID
if ! s . dateMetricIDCache . Has ( date , metricID ) {
// Slow path: store the (date, metricID) entry in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
pendingDateMetricIDs = append ( pendingDateMetricIDs , pendingDateMetricID {
date : date ,
metricID : metricID ,
} )
}
}
if len ( pendingDateMetricIDs ) == 0 {
// Fast path - there are no new (date, metricID) entires in rows.
return nil
}
// Slow path - add new (date, metricID) entries to indexDB.
2020-05-15 12:44:23 +02:00
atomic . AddUint64 ( & s . slowPerDayIndexInserts , uint64 ( len ( pendingDateMetricIDs ) ) )
2020-05-14 22:45:04 +02:00
// Sort pendingDateMetricIDs by (date, metricID) in order to speed up `is` search in the loop below.
sort . Slice ( pendingDateMetricIDs , func ( i , j int ) bool {
a := pendingDateMetricIDs [ i ]
b := pendingDateMetricIDs [ j ]
if a . date != b . date {
return a . date < b . date
}
return a . metricID < b . metricID
} )
idb := s . idb ( )
2020-07-23 19:42:57 +02:00
is := idb . getIndexSearch ( noDeadline )
2020-05-14 22:45:04 +02:00
defer idb . putIndexSearch ( is )
var firstError error
prevMetricID = 0
prevDate = 0
for _ , dateMetricID := range pendingDateMetricIDs {
date := dateMetricID . date
metricID := dateMetricID . metricID
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
prevDate = date
prevMetricID = metricID
2019-11-09 22:05:14 +01:00
if s . dateMetricIDCache . Has ( date , metricID ) {
2019-11-09 22:17:42 +01:00
// The metricID has been already added to per-day inverted index.
2019-05-22 23:16:55 +02:00
continue
}
2020-05-14 22:45:04 +02:00
ok , err := is . hasDateMetricID ( date , metricID )
if err != nil {
2020-05-14 22:17:22 +02:00
if firstError == nil {
2020-06-30 21:58:18 +02:00
firstError = fmt . Errorf ( "error when locating (date=%d, metricID=%d) in database: %w" , date , metricID , err )
2020-05-14 22:17:22 +02:00
}
2019-05-22 23:16:55 +02:00
continue
}
2020-05-14 22:45:04 +02:00
if ! ok {
// The (date, metricID) entry is missing in the indexDB. Add it there.
if err := is . storeDateMetricID ( date , metricID ) ; err != nil {
if firstError == nil {
2020-06-30 21:58:18 +02:00
firstError = fmt . Errorf ( "error when storing (date=%d, metricID=%d) in database: %w" , date , metricID , err )
2020-05-14 22:45:04 +02:00
}
continue
}
}
2019-11-09 22:17:42 +01:00
// The metric must be added to cache only after it has been successfully added to indexDB.
s . dateMetricIDCache . Set ( date , metricID )
2019-05-22 23:16:55 +02:00
}
2020-05-14 22:17:22 +02:00
return firstError
2019-05-22 23:16:55 +02:00
}
2019-11-09 22:05:14 +01:00
// dateMetricIDCache is fast cache for holding (date, metricID) entries.
//
// It should be faster than map[date]*uint64set.Set on multicore systems.
type dateMetricIDCache struct {
2019-11-11 12:21:05 +01:00
// 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches.
syncsCount uint64
resetsCount uint64
2019-11-09 22:05:14 +01:00
// Contains immutable map
byDate atomic . Value
// Contains mutable map protected by mu
byDateMutable * byDateMetricIDMap
2020-05-14 21:01:51 +02:00
lastSyncTime uint64
2019-11-10 21:03:46 +01:00
mu sync . Mutex
2019-11-09 22:05:14 +01:00
}
// newDateMetricIDCache returns new empty dateMetricIDCache.
//
// The cache is initialized via Reset, so its resetsCount starts from 1.
func newDateMetricIDCache() *dateMetricIDCache {
	dmc := &dateMetricIDCache{}
	dmc.Reset()
	return dmc
}
func ( dmc * dateMetricIDCache ) Reset ( ) {
dmc . mu . Lock ( )
2019-11-11 12:21:05 +01:00
// Do not reset syncsCount and resetsCount
2019-11-09 22:05:14 +01:00
dmc . byDate . Store ( newByDateMetricIDMap ( ) )
dmc . byDateMutable = newByDateMetricIDMap ( )
2020-05-14 21:01:51 +02:00
dmc . lastSyncTime = fasttime . UnixTimestamp ( )
2019-11-09 22:05:14 +01:00
dmc . mu . Unlock ( )
2019-11-11 12:21:05 +01:00
atomic . AddUint64 ( & dmc . resetsCount , 1 )
2019-11-09 22:05:14 +01:00
}
// EntriesCount returns the number of entries in the immutable part of dmc.
//
// Entries, which haven't been synced from the mutable part yet, aren't counted.
func (dmc *dateMetricIDCache) EntriesCount() int {
	total := 0
	immutable := dmc.byDate.Load().(*byDateMetricIDMap)
	for date := range immutable.m {
		total += immutable.m[date].v.Len()
	}
	return total
}
2019-11-13 16:58:05 +01:00
// SizeBytes returns the summary size reported by the sets in the immutable part of dmc.
//
// Entries, which haven't been synced from the mutable part yet, aren't counted.
func (dmc *dateMetricIDCache) SizeBytes() uint64 {
	var total uint64
	immutable := dmc.byDate.Load().(*byDateMetricIDMap)
	for date := range immutable.m {
		total += immutable.m[date].v.SizeBytes()
	}
	return total
}
2019-11-09 22:05:14 +01:00
func ( dmc * dateMetricIDCache ) Has ( date , metricID uint64 ) bool {
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
v := byDate . get ( date )
if v . Has ( metricID ) {
// Fast path.
// The majority of calls must go here.
return true
}
// Slow path. Check mutable map.
2020-05-14 21:01:51 +02:00
currentTime := fasttime . UnixTimestamp ( )
2019-11-10 21:03:46 +01:00
dmc . mu . Lock ( )
2019-11-09 22:05:14 +01:00
v = dmc . byDateMutable . get ( date )
ok := v . Has ( metricID )
mustSync := false
2020-05-14 21:01:51 +02:00
if currentTime - dmc . lastSyncTime > 10 {
2019-11-09 22:05:14 +01:00
mustSync = true
dmc . lastSyncTime = currentTime
}
2019-11-10 21:03:46 +01:00
dmc . mu . Unlock ( )
2019-11-09 22:05:14 +01:00
if mustSync {
dmc . sync ( )
}
return ok
}
// Set adds the (date, metricID) entry to the mutable part of dmc.
//
// The entry is moved to the immutable part on the next sync.
func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
	dmc.mu.Lock()
	dmc.byDateMutable.getOrCreate(date).Add(metricID)
	dmc.mu.Unlock()
}
func ( dmc * dateMetricIDCache ) sync ( ) {
dmc . mu . Lock ( )
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
for date , e := range dmc . byDateMutable . m {
v := byDate . get ( date )
e . v . Union ( v )
}
dmc . byDate . Store ( dmc . byDateMutable )
2019-11-11 12:21:05 +01:00
dmc . byDateMutable = newByDateMetricIDMap ( )
2019-11-09 22:05:14 +01:00
dmc . mu . Unlock ( )
2019-11-11 12:21:05 +01:00
atomic . AddUint64 ( & dmc . syncsCount , 1 )
2019-11-09 22:05:14 +01:00
if dmc . EntriesCount ( ) > memory . Allowed ( ) / 128 {
dmc . Reset ( )
}
}
// byDateMetricIDMap maps date to the set of metricIDs registered for this date.
type byDateMetricIDMap struct {
// hotEntry holds *byDateMetricIDEntry for the most recently found date.
// It allows get to skip the lookup in m for repeated requests for the same date.
hotEntry atomic . Value
// m maps date to the corresponding entry.
m map [ uint64 ] * byDateMetricIDEntry
}
// newByDateMetricIDMap returns new empty byDateMetricIDMap.
//
// hotEntry is initialized with an empty entry, so get can always load it
// and convert it to *byDateMetricIDEntry.
func newByDateMetricIDMap() *byDateMetricIDMap {
	dmm := new(byDateMetricIDMap)
	dmm.m = make(map[uint64]*byDateMetricIDEntry)
	dmm.hotEntry.Store(&byDateMetricIDEntry{})
	return dmm
}
// get returns the set of metricIDs for the given date.
//
// It returns nil if dmm.m has no entry for the date and the hot entry doesn't match it.
// The found entry becomes the hot entry for subsequent calls.
func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
	if hot := dmm.hotEntry.Load().(*byDateMetricIDEntry); hot.date == date {
		// Fast path
		return &hot.v
	}

	// Slow path
	if e := dmm.m[date]; e != nil {
		dmm.hotEntry.Store(e)
		return &e.v
	}
	return nil
}
// getOrCreate returns the set of metricIDs for the given date.
//
// An empty entry is registered in dmm.m for the date if get finds nothing.
func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
	if v := dmm.get(date); v != nil {
		return v
	}
	e := &byDateMetricIDEntry{date: date}
	dmm.m[date] = e
	return &e.v
}
// byDateMetricIDEntry holds the set of metricIDs associated with a single date.
type byDateMetricIDEntry struct {
// date is the date the entry belongs to.
date uint64
// v contains metricIDs for the date.
v uint64set . Set
}
2020-05-12 00:06:17 +02:00
func ( s * Storage ) updateNextDayMetricIDs ( ) {
2020-05-14 21:01:51 +02:00
date := fasttime . UnixDate ( )
2020-05-12 00:06:17 +02:00
e := s . nextDayMetricIDs . Load ( ) . ( * byDateMetricIDEntry )
s . pendingNextDayMetricIDsLock . Lock ( )
pendingMetricIDs := s . pendingNextDayMetricIDs
s . pendingNextDayMetricIDs = & uint64set . Set { }
s . pendingNextDayMetricIDsLock . Unlock ( )
if pendingMetricIDs . Len ( ) == 0 && e . date == date {
// Fast path: nothing to update.
return
}
// Slow path: union pendingMetricIDs with e.v
if e . date == date {
pendingMetricIDs . Union ( & e . v )
}
eNew := & byDateMetricIDEntry {
date : date ,
v : * pendingMetricIDs ,
}
s . nextDayMetricIDs . Store ( eNew )
}
2019-06-09 18:06:53 +02:00
func ( s * Storage ) updateCurrHourMetricIDs ( ) {
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-11-08 18:37:16 +01:00
s . pendingHourEntriesLock . Lock ( )
newMetricIDs := s . pendingHourEntries
s . pendingHourEntries = & uint64set . Set { }
s . pendingHourEntriesLock . Unlock ( )
2020-05-14 21:01:51 +02:00
hour := fasttime . UnixHour ( )
2019-11-08 12:16:40 +01:00
if newMetricIDs . Len ( ) == 0 && hm . hour == hour {
2019-06-09 18:06:53 +02:00
// Fast path: nothing to update.
2019-06-02 17:34:08 +02:00
return
}
2019-11-08 18:37:16 +01:00
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
2019-09-24 20:10:22 +02:00
var m * uint64set . Set
2019-06-09 18:06:53 +02:00
isFull := hm . isFull
if hm . hour == hour {
2019-09-24 20:10:22 +02:00
m = hm . m . Clone ( )
2019-06-09 18:06:53 +02:00
} else {
2019-09-24 20:10:22 +02:00
m = & uint64set . Set { }
2019-06-09 18:06:53 +02:00
isFull = true
}
2019-11-03 23:34:24 +01:00
m . Union ( newMetricIDs )
2019-06-09 18:06:53 +02:00
hmNew := & hourMetricIDs {
m : m ,
hour : hour ,
isFull : isFull ,
}
s . currHourMetricIDs . Store ( hmNew )
if hm . hour != hour {
s . prevHourMetricIDs . Store ( hm )
2019-06-02 17:34:08 +02:00
}
}
2019-06-09 18:06:53 +02:00
// hourMetricIDs holds the set of metricIDs registered for a single hour.
type hourMetricIDs struct {
2019-09-24 20:10:22 +02:00
// m is the set of metricIDs for the hour.
m * uint64set . Set
2019-06-09 18:06:53 +02:00
// hour is the hour the entry belongs to, as returned by fasttime.UnixHour().
hour uint64
// isFull is set to true for entries created on hour rollover and is inherited
// otherwise. NOTE(review): presumably it marks that m covers the whole hour;
// confirm against the code that reads it.
isFull bool
2019-06-02 17:34:08 +02:00
}
2019-05-22 23:16:55 +02:00
// getTSIDFromCache looks up metricName in s.tsidCache, using the memory
// occupied by dst as the destination buffer for the lookup.
//
// It returns true if the value returned by the cache has exactly the size of TSID.
// NOTE(review): this relies on s.tsidCache.Get appending the value to the passed
// zero-length buffer, so that dst gets filled in place - confirm against the cache API.
func (s *Storage) getTSIDFromCache(dst *TSID, metricName []byte) bool {
	const tsidSize = unsafe.Sizeof(*dst)

	raw := (*[tsidSize]byte)(unsafe.Pointer(dst))
	got := s.tsidCache.Get(raw[:0], metricName)
	return uintptr(len(got)) == tsidSize
}
// putTSIDToCache stores the raw in-memory bytes of tsid in s.tsidCache under the metricName key.
func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
	const tsidSize = unsafe.Sizeof(*tsid)

	raw := (*[tsidSize]byte)(unsafe.Pointer(tsid))
	s.tsidCache.Set(metricName, raw[:])
}
2020-07-14 13:02:14 +02:00
func openIndexDBTables ( path string , metricIDCache , metricNameCache , tsidCache * workingsetcache . Cache ,
currHourMetricIDs , prevHourMetricIDs * atomic . Value ) ( curr , prev * indexDB , err error ) {
2019-05-22 23:16:55 +02:00
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
2020-06-30 21:58:18 +02:00
return nil , nil , fmt . Errorf ( "cannot create directory %q: %w" , path , err )
2019-05-22 23:16:55 +02:00
}
d , err := os . Open ( path )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , nil , fmt . Errorf ( "cannot open directory: %w" , err )
2019-05-22 23:16:55 +02:00
}
defer fs . MustClose ( d )
// Search for the two most recent tables - the last one is active,
// the previous one contains backup data.
fis , err := d . Readdir ( - 1 )
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , nil , fmt . Errorf ( "cannot read directory: %w" , err )
2019-05-22 23:16:55 +02:00
}
var tableNames [ ] string
for _ , fi := range fis {
if ! fs . IsDirOrSymlink ( fi ) {
// Skip non-directories.
continue
}
tableName := fi . Name ( )
if ! indexDBTableNameRegexp . MatchString ( tableName ) {
// Skip invalid directories.
continue
}
tableNames = append ( tableNames , tableName )
}
sort . Slice ( tableNames , func ( i , j int ) bool {
return tableNames [ i ] < tableNames [ j ]
} )
if len ( tableNames ) < 2 {
// Create missing tables
if len ( tableNames ) == 0 {
prevName := nextIndexDBTableName ( )
tableNames = append ( tableNames , prevName )
}
currName := nextIndexDBTableName ( )
tableNames = append ( tableNames , currName )
}
// Invariant: len(tableNames) >= 2
// Remove all the tables except two last tables.
for _ , tn := range tableNames [ : len ( tableNames ) - 2 ] {
pathToRemove := path + "/" + tn
logger . Infof ( "removing obsolete indexdb dir %q..." , pathToRemove )
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( pathToRemove )
2019-05-22 23:16:55 +02:00
logger . Infof ( "removed obsolete indexdb dir %q" , pathToRemove )
}
// Persist changes on the file system.
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( path )
2019-05-22 23:16:55 +02:00
// Open the last two tables.
currPath := path + "/" + tableNames [ len ( tableNames ) - 1 ]
2020-07-14 13:02:14 +02:00
curr , err = openIndexDB ( currPath , metricIDCache , metricNameCache , tsidCache , currHourMetricIDs , prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
2020-06-30 21:58:18 +02:00
return nil , nil , fmt . Errorf ( "cannot open curr indexdb table at %q: %w" , currPath , err )
2019-05-22 23:16:55 +02:00
}
prevPath := path + "/" + tableNames [ len ( tableNames ) - 2 ]
2020-07-14 13:02:14 +02:00
prev , err = openIndexDB ( prevPath , metricIDCache , metricNameCache , tsidCache , currHourMetricIDs , prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
curr . MustClose ( )
2020-06-30 21:58:18 +02:00
return nil , nil , fmt . Errorf ( "cannot open prev indexdb table at %q: %w" , prevPath , err )
2019-05-22 23:16:55 +02:00
}
2019-11-09 22:17:42 +01:00
// Adjust startDateForPerDayInvertedIndex for the previous index.
if prev . startDateForPerDayInvertedIndex > curr . startDateForPerDayInvertedIndex {
prev . startDateForPerDayInvertedIndex = curr . startDateForPerDayInvertedIndex
}
2019-05-22 23:16:55 +02:00
return curr , prev , nil
}
// indexDBTableNameRegexp matches valid indexdb table names:
// exactly 16 upper-case hexadecimal digits.
var indexDBTableNameRegexp = regexp.MustCompile(`^[0-9A-F]{16}$`)
// nextIndexDBTableName returns a new indexdb table name.
//
// The name is the atomically incremented indexDBTableIdx formatted
// as 16 zero-padded upper-case hexadecimal digits.
func nextIndexDBTableName() string {
	return fmt.Sprintf("%016X", atomic.AddUint64(&indexDBTableIdx, 1))
}

// indexDBTableIdx is the counter used for generating indexdb table names.
// It is seeded with the current time in nanoseconds at package initialization.
var indexDBTableIdx = uint64(time.Now().UnixNano())