2019-05-22 23:16:55 +02:00
package storage
import (
"fmt"
2020-01-30 23:54:28 +01:00
"io"
2019-06-14 06:52:32 +02:00
"io/ioutil"
2019-05-22 23:16:55 +02:00
"math"
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2019-05-28 16:17:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2019-09-24 20:10:22 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
2019-08-13 20:35:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/fastcache"
)
// maxRetentionMonths is the biggest retention (in months) accepted by OpenStorage.
// It is also used as the retention when a non-positive value is passed there.
const maxRetentionMonths = 100 * 12
// Storage represents TSDB storage.
type Storage struct {
2019-10-17 17:22:56 +02:00
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
addRowsConcurrencyLimitReached uint64
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
2019-05-22 23:16:55 +02:00
path string
cachePath string
retentionMonths int
2019-05-25 20:51:11 +02:00
// lock file for exclusive access to the storage on the given path.
2019-05-22 23:16:55 +02:00
flockF * os . File
idbCurr atomic . Value
tb * table
// tsidCache is MetricName -> TSID cache.
2019-08-13 20:35:19 +02:00
tsidCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// metricIDCache is MetricID -> TSID cache.
2019-08-13 20:35:19 +02:00
metricIDCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// metricNameCache is MetricID -> MetricName cache.
2019-08-13 20:35:19 +02:00
metricNameCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// dateMetricIDCache is (Date, MetricID) cache.
2019-11-09 22:05:14 +01:00
dateMetricIDCache * dateMetricIDCache
2019-05-22 23:16:55 +02:00
2019-11-11 23:16:42 +01:00
// Fast cache for MetricID values occurred during the current hour.
2019-06-09 18:06:53 +02:00
currHourMetricIDs atomic . Value
2019-11-11 23:16:42 +01:00
// Fast cache for MetricID values occurred during the previous hour.
2019-06-09 18:06:53 +02:00
prevHourMetricIDs atomic . Value
// Pending MetricID values to be added to currHourMetricIDs.
2019-11-08 18:37:16 +01:00
pendingHourEntriesLock sync . Mutex
pendingHourEntries * uint64set . Set
2019-06-02 17:34:08 +02:00
2020-01-30 00:59:43 +01:00
// metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
prefetchedMetricIDs atomic . Value
2019-06-02 17:34:08 +02:00
stop chan struct { }
2020-01-30 00:59:43 +01:00
currHourMetricIDsUpdaterWG sync . WaitGroup
retentionWatcherWG sync . WaitGroup
prefetchedMetricIDsCleanerWG sync . WaitGroup
2019-05-22 23:16:55 +02:00
}
// OpenStorage opens storage on the given path with the given number of retention months.
func OpenStorage ( path string , retentionMonths int ) ( * Storage , error ) {
if retentionMonths > maxRetentionMonths {
return nil , fmt . Errorf ( "too big retentionMonths=%d; cannot exceed %d" , retentionMonths , maxRetentionMonths )
}
if retentionMonths <= 0 {
retentionMonths = maxRetentionMonths
}
path , err := filepath . Abs ( path )
if err != nil {
return nil , fmt . Errorf ( "cannot determine absolute path for %q: %s" , path , err )
}
s := & Storage {
path : path ,
cachePath : path + "/cache" ,
retentionMonths : retentionMonths ,
stop : make ( chan struct { } ) ,
}
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
return nil , fmt . Errorf ( "cannot create a directory for the storage at %q: %s" , path , err )
}
snapshotsPath := path + "/snapshots"
if err := fs . MkdirAllIfNotExist ( snapshotsPath ) ; err != nil {
return nil , fmt . Errorf ( "cannot create %q: %s" , snapshotsPath , err )
}
2019-08-13 00:45:22 +02:00
// Protect from concurrent opens.
flockF , err := fs . CreateFlockFile ( path )
2019-05-22 23:16:55 +02:00
if err != nil {
2019-08-13 00:45:22 +02:00
return nil , err
2019-05-22 23:16:55 +02:00
}
s . flockF = flockF
// Load caches.
mem := memory . Allowed ( )
s . tsidCache = s . mustLoadCache ( "MetricName->TSID" , "metricName_tsid" , mem / 3 )
s . metricIDCache = s . mustLoadCache ( "MetricID->TSID" , "metricID_tsid" , mem / 16 )
s . metricNameCache = s . mustLoadCache ( "MetricID->MetricName" , "metricID_metricName" , mem / 8 )
2019-11-09 22:05:14 +01:00
s . dateMetricIDCache = newDateMetricIDCache ( )
2019-05-22 23:16:55 +02:00
2019-06-14 06:52:32 +02:00
hour := uint64 ( timestampFromTime ( time . Now ( ) ) ) / msecPerHour
hmCurr := s . mustLoadHourMetricIDs ( hour , "curr_hour_metric_ids" )
hmPrev := s . mustLoadHourMetricIDs ( hour - 1 , "prev_hour_metric_ids" )
s . currHourMetricIDs . Store ( hmCurr )
s . prevHourMetricIDs . Store ( hmPrev )
2019-11-08 18:37:16 +01:00
s . pendingHourEntries = & uint64set . Set { }
2019-06-14 06:52:32 +02:00
2020-01-30 00:59:43 +01:00
s . prefetchedMetricIDs . Store ( & uint64set . Set { } )
2019-05-22 23:16:55 +02:00
// Load indexdb
idbPath := path + "/indexdb"
idbSnapshotsPath := idbPath + "/snapshots"
if err := fs . MkdirAllIfNotExist ( idbSnapshotsPath ) ; err != nil {
return nil , fmt . Errorf ( "cannot create %q: %s" , idbSnapshotsPath , err )
}
2019-06-09 18:06:53 +02:00
idbCurr , idbPrev , err := openIndexDBTables ( idbPath , s . metricIDCache , s . metricNameCache , & s . currHourMetricIDs , & s . prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
return nil , fmt . Errorf ( "cannot open indexdb tables at %q: %s" , idbPath , err )
}
idbCurr . SetExtDB ( idbPrev )
s . idbCurr . Store ( idbCurr )
// Load data
tablePath := path + "/data"
tb , err := openTable ( tablePath , retentionMonths , s . getDeletedMetricIDs )
if err != nil {
s . idb ( ) . MustClose ( )
return nil , fmt . Errorf ( "cannot open table at %q: %s" , tablePath , err )
}
s . tb = tb
2019-06-09 18:06:53 +02:00
s . startCurrHourMetricIDsUpdater ( )
2019-05-22 23:16:55 +02:00
s . startRetentionWatcher ( )
2020-01-30 00:59:43 +01:00
s . startPrefetchedMetricIDsCleaner ( )
2019-05-22 23:16:55 +02:00
return s , nil
}
// debugFlush makes recently added storage data visible to search
// by flushing raw rows in the data table and then flushing the indexdb table.
func (s *Storage) debugFlush() {
	s.tb.flushRawRows()

	db := s.idb()
	db.tb.DebugFlush()
}
2019-09-24 20:10:22 +02:00
func ( s * Storage ) getDeletedMetricIDs ( ) * uint64set . Set {
2019-05-22 23:16:55 +02:00
return s . idb ( ) . getDeletedMetricIDs ( )
}
// CreateSnapshot creates snapshot for s and returns the snapshot name.
func ( s * Storage ) CreateSnapshot ( ) ( string , error ) {
logger . Infof ( "creating Storage snapshot for %q..." , s . path )
startTime := time . Now ( )
snapshotName := fmt . Sprintf ( "%s-%08X" , time . Now ( ) . UTC ( ) . Format ( "20060102150405" ) , nextSnapshotIdx ( ) )
srcDir := s . path
dstDir := fmt . Sprintf ( "%s/snapshots/%s" , srcDir , snapshotName )
if err := fs . MkdirAllFailIfExist ( dstDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create dir %q: %s" , dstDir , err )
}
dstDataDir := dstDir + "/data"
if err := fs . MkdirAllFailIfExist ( dstDataDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create dir %q: %s" , dstDataDir , err )
}
smallDir , bigDir , err := s . tb . CreateSnapshot ( snapshotName )
if err != nil {
return "" , fmt . Errorf ( "cannot create table snapshot: %s" , err )
}
dstSmallDir := dstDataDir + "/small"
if err := fs . SymlinkRelative ( smallDir , dstSmallDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %s" , smallDir , dstSmallDir , err )
}
dstBigDir := dstDataDir + "/big"
if err := fs . SymlinkRelative ( bigDir , dstBigDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %s" , bigDir , dstBigDir , err )
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDataDir )
2019-05-22 23:16:55 +02:00
idbSnapshot := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , s . path , snapshotName )
idb := s . idb ( )
currSnapshot := idbSnapshot + "/" + idb . name
if err := idb . tb . CreateSnapshotAt ( currSnapshot ) ; err != nil {
return "" , fmt . Errorf ( "cannot create curr indexDB snapshot: %s" , err )
}
ok := idb . doExtDB ( func ( extDB * indexDB ) {
prevSnapshot := idbSnapshot + "/" + extDB . name
err = extDB . tb . CreateSnapshotAt ( prevSnapshot )
} )
if ok && err != nil {
return "" , fmt . Errorf ( "cannot create prev indexDB snapshot: %s" , err )
}
dstIdbDir := dstDir + "/indexdb"
if err := fs . SymlinkRelative ( idbSnapshot , dstIdbDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %s" , idbSnapshot , dstIdbDir , err )
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDir )
fs . MustSyncPath ( srcDir + "/snapshots" )
2019-05-22 23:16:55 +02:00
2020-01-22 17:27:44 +01:00
logger . Infof ( "created Storage snapshot for %q at %q in %.3f seconds" , srcDir , dstDir , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:16:55 +02:00
return snapshotName , nil
}
// snapshotNameRegexp matches valid snapshot names: 14 decimal digits,
// a dash and at least one hexadecimal digit.
var snapshotNameRegexp = regexp.MustCompile(`^[0-9]{14}-[0-9A-Fa-f]+$`)
// ListSnapshots returns sorted list of existing snapshots for s.
//
// Entries in the snapshots directory with names not matching snapshotNameRegexp are skipped.
func (s *Storage) ListSnapshots() ([]string, error) {
	snapshotsPath := s.path + "/snapshots"
	dir, err := os.Open(snapshotsPath)
	if err != nil {
		return nil, fmt.Errorf("cannot open %q: %s", snapshotsPath, err)
	}
	defer fs.MustClose(dir)

	entries, err := dir.Readdirnames(-1)
	if err != nil {
		return nil, fmt.Errorf("cannot read contents of %q: %s", snapshotsPath, err)
	}
	snapshotNames := make([]string, 0, len(entries))
	for _, entry := range entries {
		if snapshotNameRegexp.MatchString(entry) {
			snapshotNames = append(snapshotNames, entry)
		}
	}
	sort.Strings(snapshotNames)
	return snapshotNames, nil
}
// DeleteSnapshot deletes the given snapshot.
func ( s * Storage ) DeleteSnapshot ( snapshotName string ) error {
if ! snapshotNameRegexp . MatchString ( snapshotName ) {
return fmt . Errorf ( "invalid snapshotName %q" , snapshotName )
}
snapshotPath := s . path + "/snapshots/" + snapshotName
logger . Infof ( "deleting snapshot %q..." , snapshotPath )
startTime := time . Now ( )
s . tb . MustDeleteSnapshot ( snapshotName )
idbPath := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , s . path , snapshotName )
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( idbPath )
fs . MustRemoveAll ( snapshotPath )
2019-05-22 23:16:55 +02:00
2020-01-22 17:27:44 +01:00
logger . Infof ( "deleted snapshot %q in %.3f seconds" , snapshotPath , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 23:16:55 +02:00
return nil
}
// snapshotIdx is the last index returned by nextSnapshotIdx.
// It is seeded with the current time in nanoseconds at package initialization.
var snapshotIdx = uint64(time.Now().UnixNano())

// nextSnapshotIdx atomically increments snapshotIdx and returns the new value.
func nextSnapshotIdx() uint64 {
	idx := atomic.AddUint64(&snapshotIdx, 1)
	return idx
}
// idb returns the current indexDB stored in s.idbCurr.
func (s *Storage) idb() *indexDB {
	v := s.idbCurr.Load()
	return v.(*indexDB)
}
// Metrics contains essential metrics for the Storage.
type Metrics struct {
2019-07-26 13:10:25 +02:00
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
2019-08-06 13:09:17 +02:00
AddRowsConcurrencyLimitReached uint64
AddRowsConcurrencyLimitTimeout uint64
AddRowsConcurrencyDroppedRows uint64
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
2019-05-22 23:16:55 +02:00
TSIDCacheSize uint64
2019-07-09 23:47:29 +02:00
TSIDCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
TSIDCacheRequests uint64
TSIDCacheMisses uint64
TSIDCacheCollisions uint64
MetricIDCacheSize uint64
2019-07-09 23:47:29 +02:00
MetricIDCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
MetricIDCacheRequests uint64
MetricIDCacheMisses uint64
MetricIDCacheCollisions uint64
MetricNameCacheSize uint64
2019-07-09 23:47:29 +02:00
MetricNameCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
MetricNameCacheRequests uint64
MetricNameCacheMisses uint64
MetricNameCacheCollisions uint64
2019-11-11 12:21:05 +01:00
DateMetricIDCacheSize uint64
2019-11-13 16:58:05 +01:00
DateMetricIDCacheSizeBytes uint64
2019-11-11 12:21:05 +01:00
DateMetricIDCacheSyncsCount uint64
DateMetricIDCacheResetsCount uint64
2019-05-22 23:16:55 +02:00
2019-11-13 18:00:02 +01:00
HourMetricIDCacheSize uint64
HourMetricIDCacheSizeBytes uint64
2019-06-19 17:36:47 +02:00
2020-01-30 00:59:43 +01:00
PrefetchedMetricIDsSize uint64
PrefetchedMetricIDsSizeBytes uint64
2019-05-22 23:16:55 +02:00
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
// Reset resets m by overwriting it with the zero Metrics value.
func (m *Metrics) Reset() {
	var zero Metrics
	*m = zero
}
// UpdateMetrics updates m with metrics from s.
func ( s * Storage ) UpdateMetrics ( m * Metrics ) {
2019-07-26 13:10:25 +02:00
m . TooSmallTimestampRows += atomic . LoadUint64 ( & s . tooSmallTimestampRows )
m . TooBigTimestampRows += atomic . LoadUint64 ( & s . tooBigTimestampRows )
2019-08-06 13:09:17 +02:00
m . AddRowsConcurrencyLimitReached += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitReached )
m . AddRowsConcurrencyLimitTimeout += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitTimeout )
m . AddRowsConcurrencyDroppedRows += atomic . LoadUint64 ( & s . addRowsConcurrencyDroppedRows )
m . AddRowsConcurrencyCapacity = uint64 ( cap ( addRowsConcurrencyCh ) )
m . AddRowsConcurrencyCurrent = uint64 ( len ( addRowsConcurrencyCh ) )
2019-05-22 23:16:55 +02:00
var cs fastcache . Stats
s . tsidCache . UpdateStats ( & cs )
m . TSIDCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . TSIDCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . TSIDCacheRequests += cs . GetCalls
m . TSIDCacheMisses += cs . Misses
m . TSIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricIDCache . UpdateStats ( & cs )
m . MetricIDCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . MetricIDCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . MetricIDCacheRequests += cs . GetCalls
m . MetricIDCacheMisses += cs . Misses
m . MetricIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricNameCache . UpdateStats ( & cs )
m . MetricNameCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . MetricNameCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . MetricNameCacheRequests += cs . GetCalls
m . MetricNameCacheMisses += cs . Misses
m . MetricNameCacheCollisions += cs . Collisions
2019-11-09 22:05:14 +01:00
m . DateMetricIDCacheSize += uint64 ( s . dateMetricIDCache . EntriesCount ( ) )
2019-11-13 16:58:05 +01:00
m . DateMetricIDCacheSizeBytes += uint64 ( s . dateMetricIDCache . SizeBytes ( ) )
2019-11-11 12:21:05 +01:00
m . DateMetricIDCacheSyncsCount += atomic . LoadUint64 ( & s . dateMetricIDCache . syncsCount )
m . DateMetricIDCacheResetsCount += atomic . LoadUint64 ( & s . dateMetricIDCache . resetsCount )
2019-05-22 23:16:55 +02:00
2019-06-19 17:36:47 +02:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-09-24 20:10:22 +02:00
hourMetricIDsLen := hmPrev . m . Len ( )
if hmCurr . m . Len ( ) > hourMetricIDsLen {
hourMetricIDsLen = hmCurr . m . Len ( )
2019-06-19 17:36:47 +02:00
}
m . HourMetricIDCacheSize += uint64 ( hourMetricIDsLen )
2019-11-13 18:00:02 +01:00
m . HourMetricIDCacheSizeBytes += hmCurr . m . SizeBytes ( )
m . HourMetricIDCacheSizeBytes += hmPrev . m . SizeBytes ( )
2019-06-19 17:36:47 +02:00
2020-01-30 00:59:43 +01:00
prefetchedMetricIDs := s . prefetchedMetricIDs . Load ( ) . ( * uint64set . Set )
m . PrefetchedMetricIDsSize += uint64 ( prefetchedMetricIDs . Len ( ) )
m . PrefetchedMetricIDsSizeBytes += uint64 ( prefetchedMetricIDs . SizeBytes ( ) )
2019-05-22 23:16:55 +02:00
s . idb ( ) . UpdateMetrics ( & m . IndexDBMetrics )
s . tb . UpdateMetrics ( & m . TableMetrics )
}
2020-01-30 00:59:43 +01:00
// startPrefetchedMetricIDsCleaner runs prefetchedMetricIDsCleaner in a background goroutine
// tracked by s.prefetchedMetricIDsCleanerWG.
func (s *Storage) startPrefetchedMetricIDsCleaner() {
	wg := &s.prefetchedMetricIDsCleanerWG
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.prefetchedMetricIDsCleaner()
	}()
}
// prefetchedMetricIDsCleaner replaces s.prefetchedMetricIDs with an empty set
// every 7 minutes until s.stop is closed.
func (s *Storage) prefetchedMetricIDsCleaner() {
	ticker := time.NewTicker(7 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			s.prefetchedMetricIDs.Store(&uint64set.Set{})
		case <-s.stop:
			return
		}
	}
}
2019-05-22 23:16:55 +02:00
// startRetentionWatcher runs retentionWatcher in a background goroutine
// tracked by s.retentionWatcherWG.
func (s *Storage) startRetentionWatcher() {
	wg := &s.retentionWatcherWG
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.retentionWatcher()
	}()
}
// retentionWatcher rotates indexdb every time the deadline returned
// by nextRetentionDuration is reached. It returns when s.stop is closed.
func (s *Storage) retentionWatcher() {
	for {
		d := nextRetentionDuration(s.retentionMonths)
		// Use an explicit timer instead of time.After, so it can be released on stop.
		// Otherwise the timer would stay pending until the deadline,
		// which may be as far away as the whole retention period.
		t := time.NewTimer(d)
		select {
		case <-s.stop:
			t.Stop()
			return
		case <-t.C:
			s.mustRotateIndexDB()
		}
	}
}
2019-06-09 18:06:53 +02:00
func ( s * Storage ) startCurrHourMetricIDsUpdater ( ) {
s . currHourMetricIDsUpdaterWG . Add ( 1 )
2019-06-02 17:34:08 +02:00
go func ( ) {
2019-06-09 18:06:53 +02:00
s . currHourMetricIDsUpdater ( )
s . currHourMetricIDsUpdaterWG . Done ( )
2019-06-02 17:34:08 +02:00
} ( )
}
2019-06-09 18:06:53 +02:00
// currHourMetricIDsUpdateInterval is the interval between updateCurrHourMetricIDs calls
// performed by currHourMetricIDsUpdater.
var currHourMetricIDsUpdateInterval = 10 * time.Second
2019-06-02 20:58:14 +02:00
2019-06-09 18:06:53 +02:00
func ( s * Storage ) currHourMetricIDsUpdater ( ) {
2020-02-13 11:55:58 +01:00
ticker := time . NewTicker ( currHourMetricIDsUpdateInterval )
defer ticker . Stop ( )
2019-06-02 17:34:08 +02:00
for {
select {
case <- s . stop :
2019-11-08 12:16:40 +01:00
s . updateCurrHourMetricIDs ( )
2019-06-02 17:34:08 +02:00
return
2020-02-13 11:55:58 +01:00
case <- ticker . C :
2019-06-09 18:06:53 +02:00
s . updateCurrHourMetricIDs ( )
2019-06-02 17:34:08 +02:00
}
}
}
2019-05-22 23:16:55 +02:00
func ( s * Storage ) mustRotateIndexDB ( ) {
// Create new indexdb table.
newTableName := nextIndexDBTableName ( )
idbNewPath := s . path + "/indexdb/" + newTableName
2019-06-09 18:06:53 +02:00
idbNew , err := openIndexDB ( idbNewPath , s . metricIDCache , s . metricNameCache , & s . currHourMetricIDs , & s . prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
logger . Panicf ( "FATAL: cannot create new indexDB at %q: %s" , idbNewPath , err )
}
// Drop extDB
idbCurr := s . idb ( )
idbCurr . doExtDB ( func ( extDB * indexDB ) {
extDB . scheduleToDrop ( )
} )
idbCurr . SetExtDB ( nil )
// Start using idbNew
idbNew . SetExtDB ( idbCurr )
s . idbCurr . Store ( idbNew )
// Persist changes on the file system.
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( s . path )
2019-05-22 23:16:55 +02:00
// Flush tsidCache, so idbNew can be populated with fresh data.
s . tsidCache . Reset ( )
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s . dateMetricIDCache . Reset ( )
// Do not flush metricIDCache and metricNameCache, since all the metricIDs
// from prev idb remain valid after the rotation.
}
// MustClose closes the storage.
func ( s * Storage ) MustClose ( ) {
close ( s . stop )
s . retentionWatcherWG . Wait ( )
2019-06-09 18:06:53 +02:00
s . currHourMetricIDsUpdaterWG . Wait ( )
2019-05-22 23:16:55 +02:00
s . tb . MustClose ( )
s . idb ( ) . MustClose ( )
// Save caches.
2019-08-13 20:35:19 +02:00
s . mustSaveAndStopCache ( s . tsidCache , "MetricName->TSID" , "metricName_tsid" )
s . mustSaveAndStopCache ( s . metricIDCache , "MetricID->TSID" , "metricID_tsid" )
s . mustSaveAndStopCache ( s . metricNameCache , "MetricID->MetricName" , "metricID_metricName" )
2019-05-22 23:16:55 +02:00
2019-06-14 06:52:32 +02:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmCurr , "curr_hour_metric_ids" )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmPrev , "prev_hour_metric_ids" )
2019-05-22 23:16:55 +02:00
// Release lock file.
if err := s . flockF . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close lock file %q: %s" , s . flockF . Name ( ) , err )
}
}
2019-06-14 06:52:32 +02:00
func ( s * Storage ) mustLoadHourMetricIDs ( hour uint64 , name string ) * hourMetricIDs {
path := s . cachePath + "/" + name
logger . Infof ( "loading %s from %q..." , name , path )
startTime := time . Now ( )
if ! fs . IsPathExist ( path ) {
logger . Infof ( "nothing to load from %q" , path )
2019-11-08 12:16:40 +01:00
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
src , err := ioutil . ReadFile ( path )
if err != nil {
logger . Panicf ( "FATAL: cannot read %s: %s" , path , err )
}
srcOrigLen := len ( src )
if len ( src ) < 24 {
logger . Errorf ( "discarding %s, since it has broken header; got %d bytes; want %d bytes" , path , len ( src ) , 24 )
2019-11-08 12:16:40 +01:00
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
2019-11-08 18:37:16 +01:00
// Unmarshal header
2019-06-14 06:52:32 +02:00
isFull := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
hourLoaded := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
if hourLoaded != hour {
2019-11-08 12:16:40 +01:00
logger . Infof ( "discarding %s, since it contains outdated hour; got %d; want %d" , name , hourLoaded , hour )
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
2019-11-08 18:37:16 +01:00
// Unmarshal hm.m
2019-06-14 06:52:32 +02:00
hmLen := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
2019-11-13 12:11:02 +01:00
if uint64 ( len ( src ) ) < 8 * hmLen {
logger . Errorf ( "discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes" , path , len ( src ) , 8 * hmLen )
2019-11-08 12:16:40 +01:00
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
2019-09-24 20:10:22 +02:00
m := & uint64set . Set { }
2019-06-14 06:52:32 +02:00
for i := uint64 ( 0 ) ; i < hmLen ; i ++ {
metricID := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
2019-09-24 20:10:22 +02:00
m . Add ( metricID )
2019-06-14 06:52:32 +02:00
}
2019-11-13 12:11:02 +01:00
2020-01-22 17:27:44 +01:00
logger . Infof ( "loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , hmLen , srcOrigLen )
2019-06-14 06:52:32 +02:00
return & hourMetricIDs {
m : m ,
hour : hourLoaded ,
isFull : isFull != 0 ,
}
}
func ( s * Storage ) mustSaveHourMetricIDs ( hm * hourMetricIDs , name string ) {
path := s . cachePath + "/" + name
logger . Infof ( "saving %s to %q..." , name , path )
startTime := time . Now ( )
2019-09-24 20:10:22 +02:00
dst := make ( [ ] byte , 0 , hm . m . Len ( ) * 8 + 24 )
2019-06-14 06:52:32 +02:00
isFull := uint64 ( 0 )
if hm . isFull {
isFull = 1
}
2019-11-08 18:37:16 +01:00
// Marshal header
2019-06-14 06:52:32 +02:00
dst = encoding . MarshalUint64 ( dst , isFull )
dst = encoding . MarshalUint64 ( dst , hm . hour )
2019-11-08 18:37:16 +01:00
// Marshal hm.m
2019-09-24 20:10:22 +02:00
dst = encoding . MarshalUint64 ( dst , uint64 ( hm . m . Len ( ) ) )
2020-01-15 11:12:46 +01:00
hm . m . ForEach ( func ( part [ ] uint64 ) bool {
for _ , metricID := range part {
dst = encoding . MarshalUint64 ( dst , metricID )
}
return true
} )
2019-11-13 12:11:02 +01:00
2019-06-14 06:52:32 +02:00
if err := ioutil . WriteFile ( path , dst , 0644 ) ; err != nil {
logger . Panicf ( "FATAL: cannot write %d bytes to %q: %s" , len ( dst ) , path , err )
}
2020-01-22 17:27:44 +01:00
logger . Infof ( "saved %s to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) . Seconds ( ) , hm . m . Len ( ) , len ( dst ) )
2019-06-14 06:52:32 +02:00
}
2019-08-13 20:35:19 +02:00
func ( s * Storage ) mustLoadCache ( info , name string , sizeBytes int ) * workingsetcache . Cache {
2019-05-22 23:16:55 +02:00
path := s . cachePath + "/" + name
logger . Infof ( "loading %s cache from %q..." , info , path )
startTime := time . Now ( )
2019-08-13 20:35:19 +02:00
c := workingsetcache . Load ( path , sizeBytes , time . Hour )
2019-05-22 23:16:55 +02:00
var cs fastcache . Stats
c . UpdateStats ( & cs )
2020-01-22 17:27:44 +01:00
logger . Infof ( "loaded %s cache from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" ,
info , path , time . Since ( startTime ) . Seconds ( ) , cs . EntriesCount , cs . BytesSize )
2019-05-22 23:16:55 +02:00
return c
}
2019-08-13 20:35:19 +02:00
func ( s * Storage ) mustSaveAndStopCache ( c * workingsetcache . Cache , info , name string ) {
2019-05-22 23:16:55 +02:00
path := s . cachePath + "/" + name
logger . Infof ( "saving %s cache to %q..." , info , path )
startTime := time . Now ( )
2019-08-13 20:35:19 +02:00
if err := c . Save ( path ) ; err != nil {
2019-05-22 23:16:55 +02:00
logger . Panicf ( "FATAL: cannot save %s cache to %q: %s" , info , path , err )
}
var cs fastcache . Stats
c . UpdateStats ( & cs )
2019-08-13 20:35:19 +02:00
c . Stop ( )
2020-01-22 17:27:44 +01:00
logger . Infof ( "saved %s cache to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d" ,
info , path , time . Since ( startTime ) . Seconds ( ) , cs . EntriesCount , cs . BytesSize )
2019-05-22 23:16:55 +02:00
}
// nextRetentionDuration returns the duration from now until the next retention deadline.
//
// The deadline is 04:00 UTC on the first day of the nearest future month
// whose index (year*12 + month - 1) is divisible by retentionMonths.
// retentionMonths must be positive.
func nextRetentionDuration(retentionMonths int) time.Duration {
	now := time.Now().UTC()

	// Advance the current month index by the full retention period
	// and round it down to the multiple of retentionMonths.
	monthIdx := now.Year()*12 + int(now.Month()) - 1 + retentionMonths
	monthIdx -= monthIdx % retentionMonths
	year, month := monthIdx/12, time.Month(monthIdx%12+1)

	// Schedule the deadline to +4 hours from the next retention period start.
	// This should prevent from possible double deletion of indexdb
	// due to time drift - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248 .
	deadline := time.Date(year, month, 1, 4, 0, 0, 0, time.UTC)
	return deadline.Sub(now)
}
2019-09-23 21:34:04 +02:00
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
2019-05-22 23:16:55 +02:00
func ( s * Storage ) searchTSIDs ( tfss [ ] * TagFilters , tr TimeRange , maxMetrics int ) ( [ ] TSID , error ) {
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
tsids , err := s . idb ( ) . searchTSIDs ( tfss , tr , maxMetrics )
if err != nil {
return nil , fmt . Errorf ( "error when searching tsids for tfss %q: %s" , tfss , err )
}
return tsids , nil
}
2020-01-30 00:59:43 +01:00
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
//
// This should speed-up further searchMetricName calls for metricIDs from tsids.
func ( s * Storage ) prefetchMetricNames ( tsids [ ] TSID ) error {
var metricIDs uint64Sorter
prefetchedMetricIDs := s . prefetchedMetricIDs . Load ( ) . ( * uint64set . Set )
for i := range tsids {
metricID := tsids [ i ] . MetricID
if prefetchedMetricIDs . Has ( metricID ) {
continue
}
metricIDs = append ( metricIDs , metricID )
}
if len ( metricIDs ) < 500 {
// It is cheaper to skip pre-fetching and obtain metricNames inline.
return nil
}
// Pre-fetch metricIDs.
sort . Sort ( metricIDs )
var metricName [ ] byte
var err error
2020-01-30 23:54:28 +01:00
idb := s . idb ( )
is := idb . getIndexSearch ( )
defer idb . putIndexSearch ( is )
2020-01-30 00:59:43 +01:00
for _ , metricID := range metricIDs {
2020-01-30 23:54:28 +01:00
metricName , err = is . searchMetricName ( metricName [ : 0 ] , metricID )
if err != nil && err != io . EOF {
2020-01-30 00:59:43 +01:00
return fmt . Errorf ( "error in pre-fetching metricName for metricID=%d: %s" , metricID , err )
}
}
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
prefetchedMetricIDsNew := prefetchedMetricIDs . Clone ( )
for _ , metricID := range metricIDs {
prefetchedMetricIDsNew . Add ( metricID )
}
s . prefetchedMetricIDs . Store ( prefetchedMetricIDsNew )
return nil
}
2019-05-22 23:16:55 +02:00
// DeleteMetrics deletes all the metrics matching the given tfss.
//
// Returns the number of metrics deleted.
// The number is returned alongside the error if the deletion fails.
func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) {
	db := s.idb()
	n, err := db.DeleteTSIDs(tfss)
	if err != nil {
		return n, fmt.Errorf("cannot delete tsids: %s", err)
	}

	// MetricName -> TSID cache (tsidCache) isn't reset, since the obtained
	// entries must be checked against deleted metricIDs.
	// See Storage.add for details.
	//
	// MetricID -> MetricName cache isn't reset, since it must be used only
	// after filtering out deleted metricIDs.

	return n, nil
}
// searchMetricName appends metric name for the given metricID to dst
// and returns the result.
//
// The lookup is delegated to the current indexDB.
func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
	db := s.idb()
	return db.searchMetricName(dst, metricID)
}
// SearchTagKeys searches for tag keys in the current indexDB.
//
// maxTagKeys is passed to the indexDB search as is.
func (s *Storage) SearchTagKeys(maxTagKeys int) ([]string, error) {
	db := s.idb()
	return db.SearchTagKeys(maxTagKeys)
}
// SearchTagValues searches for tag values for the given tagKey in the current indexDB.
//
// maxTagValues is passed to the indexDB search as is.
func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int) ([]string, error) {
	db := s.idb()
	return db.SearchTagValues(tagKey, maxTagValues)
}
2019-11-21 22:18:22 +01:00
// SearchTagEntries returns a list of (tagName -> tagValues)
2019-06-10 17:55:20 +02:00
func ( s * Storage ) SearchTagEntries ( maxTagKeys , maxTagValues int ) ( [ ] TagEntry , error ) {
idb := s . idb ( )
keys , err := idb . SearchTagKeys ( maxTagKeys )
if err != nil {
return nil , fmt . Errorf ( "cannot search tag keys: %s" , err )
}
// Sort keys for faster seeks below
sort . Strings ( keys )
tes := make ( [ ] TagEntry , len ( keys ) )
for i , key := range keys {
values , err := idb . SearchTagValues ( [ ] byte ( key ) , maxTagValues )
if err != nil {
return nil , fmt . Errorf ( "cannot search values for tag %q: %s" , key , err )
}
te := & tes [ i ]
te . Key = key
te . Values = values
}
return tes , nil
}
// TagEntry contains (tagName -> tagValues) mapping
type TagEntry struct {
	// Key is tagName
	Key string

	// Values contains all the values for Key.
	Values []string
}
2019-05-22 23:16:55 +02:00
// GetSeriesCount returns the approximate number of unique time series.
//
// It includes the deleted series too and may count the same series
// up to two times - in db and extDB.
func (s *Storage) GetSeriesCount() (uint64, error) {
	db := s.idb()
	return db.GetSeriesCount()
}
// MetricRow is a metric to insert into storage.
type MetricRow struct {
	// MetricNameRaw contains raw metric name, which must be decoded
	// with MetricName.unmarshalRaw.
	MetricNameRaw []byte

	// Timestamp and Value form the sample for the metric.
	Timestamp int64
	Value     float64
}

// CopyFrom copies src to mr.
//
// MetricNameRaw bytes are copied into the buffer owned by mr (re-using its capacity),
// so mr doesn't share memory with src after the call.
func (mr *MetricRow) CopyFrom(src *MetricRow) {
	name := append(mr.MetricNameRaw[:0], src.MetricNameRaw...)
	mr.MetricNameRaw = name
	mr.Timestamp, mr.Value = src.Timestamp, src.Value
}
// String returns string representation of the mr.
func ( mr * MetricRow ) String ( ) string {
metricName := string ( mr . MetricNameRaw )
var mn MetricName
if err := mn . unmarshalRaw ( mr . MetricNameRaw ) ; err == nil {
metricName = mn . String ( )
}
2019-11-08 18:37:16 +01:00
return fmt . Sprintf ( "MetricName=%s, Timestamp=%d, Value=%f\n" , metricName , mr . Timestamp , mr . Value )
2019-05-22 23:16:55 +02:00
}
// Marshal appends marshaled mr to dst and returns the result.
//
// The marshaled data consists of MetricNameRaw (via encoding.MarshalBytes)
// followed by Timestamp and the bits of Value, both marshaled as uint64.
func (mr *MetricRow) Marshal(dst []byte) []byte {
	timestamp := uint64(mr.Timestamp)
	valueBits := math.Float64bits(mr.Value)

	dst = encoding.MarshalBytes(dst, mr.MetricNameRaw)
	dst = encoding.MarshalUint64(dst, timestamp)
	return encoding.MarshalUint64(dst, valueBits)
}
// Unmarshal unmarshals mr from src and returns the remaining tail from src.
//
// It expects the data layout produced by Marshal. The unprocessed tail is returned
// alongside the error if src is too short or the metric name cannot be unmarshaled.
func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) {
	// Timestamp and Value occupy 8 bytes each.
	const fieldSize = 8

	tail, metricNameRaw, err := encoding.UnmarshalBytes(src)
	if err != nil {
		return tail, fmt.Errorf("cannot unmarshal MetricName: %s", err)
	}
	mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRaw...)

	if n := len(tail); n < fieldSize {
		return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", fieldSize, n)
	}
	mr.Timestamp = int64(encoding.UnmarshalUint64(tail))
	tail = tail[fieldSize:]

	if n := len(tail); n < fieldSize {
		return tail, fmt.Errorf("cannot unmarshal Value: want %d bytes; have %d bytes", fieldSize, n)
	}
	mr.Value = math.Float64frombits(encoding.UnmarshalUint64(tail))
	return tail[fieldSize:], nil
}
// AddRows adds the given mrs to s.
func ( s * Storage ) AddRows ( mrs [ ] MetricRow , precisionBits uint8 ) error {
if len ( mrs ) == 0 {
return nil
}
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU trashing when too many
// goroutines call AddRows.
select {
case addRowsConcurrencyCh <- struct { } { } :
defer func ( ) { <- addRowsConcurrencyCh } ( )
2019-08-06 13:09:17 +02:00
default :
// Sleep for a while until giving up
atomic . AddUint64 ( & s . addRowsConcurrencyLimitReached , 1 )
t := timerpool . Get ( addRowsTimeout )
select {
case addRowsConcurrencyCh <- struct { } { } :
timerpool . Put ( t )
defer func ( ) { <- addRowsConcurrencyCh } ( )
case <- t . C :
timerpool . Put ( t )
atomic . AddUint64 ( & s . addRowsConcurrencyLimitTimeout , 1 )
atomic . AddUint64 ( & s . addRowsConcurrencyDroppedRows , uint64 ( len ( mrs ) ) )
return fmt . Errorf ( "Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load" ,
len ( mrs ) , addRowsTimeout , cap ( addRowsConcurrencyCh ) )
}
2019-05-22 23:16:55 +02:00
}
// Add rows to the storage.
var err error
rr := getRawRowsWithSize ( len ( mrs ) )
rr . rows , err = s . add ( rr . rows , mrs , precisionBits )
putRawRows ( rr )
return err
}
// addRowsConcurrencyCh is a semaphore limiting the number of concurrently
// executing Storage.AddRows calls to twice the GOMAXPROCS value.
var addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)

// addRowsTimeout is the maximum time Storage.AddRows waits for a free slot
// in addRowsConcurrencyCh before dropping the rows.
var addRowsTimeout = 30 * time.Second
func ( s * Storage ) add ( rows [ ] rawRow , mrs [ ] MetricRow , precisionBits uint8 ) ( [ ] rawRow , error ) {
var is * indexSearch
var mn * MetricName
var kb * bytesutil . ByteBuffer
idb := s . idb ( )
dmis := idb . getDeletedMetricIDs ( )
rowsLen := len ( rows )
if n := rowsLen + len ( mrs ) - cap ( rows ) ; n > 0 {
rows = append ( rows [ : cap ( rows ) ] , make ( [ ] rawRow , n ) ... )
}
rows = rows [ : rowsLen + len ( mrs ) ]
j := 0
2019-12-19 14:12:50 +01:00
var (
// These vars are used for speeding up bulk imports of multiple adjancent rows for the same metricName.
prevTSID TSID
prevMetricNameRaw [ ] byte
)
2019-07-11 16:04:56 +02:00
minTimestamp , maxTimestamp := s . tb . getMinMaxTimestamps ( )
2019-11-08 12:16:40 +01:00
// Return only the last error, since it has no sense in returning all errors.
var lastWarn error
2019-05-22 23:16:55 +02:00
for i := range mrs {
mr := & mrs [ i ]
if math . IsNaN ( mr . Value ) {
// Just skip NaNs, since the underlying encoding
// doesn't know how to work with them.
continue
}
2019-07-26 13:10:25 +02:00
if mr . Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d" , mr . Timestamp , minTimestamp )
2019-07-26 13:10:25 +02:00
atomic . AddUint64 ( & s . tooSmallTimestampRows , 1 )
continue
}
if mr . Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d" , mr . Timestamp , maxTimestamp )
2019-07-26 13:10:25 +02:00
atomic . AddUint64 ( & s . tooBigTimestampRows , 1 )
2019-07-11 16:04:56 +02:00
continue
}
2019-05-22 23:16:55 +02:00
r := & rows [ rowsLen + j ]
j ++
r . Timestamp = mr . Timestamp
r . Value = mr . Value
r . PrecisionBits = precisionBits
2019-12-19 14:12:50 +01:00
if string ( mr . MetricNameRaw ) == string ( prevMetricNameRaw ) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r . TSID = prevTSID
continue
}
2019-05-22 23:16:55 +02:00
if s . getTSIDFromCache ( & r . TSID , mr . MetricNameRaw ) {
2019-09-24 20:10:22 +02:00
if ! dmis . Has ( r . TSID . MetricID ) {
2019-05-22 23:16:55 +02:00
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
2019-12-19 14:12:50 +01:00
prevTSID = r . TSID
prevMetricNameRaw = mr . MetricNameRaw
2019-05-22 23:16:55 +02:00
continue
}
}
// Slow path - the TSID is missing in the cache. Search for it in the index.
if is == nil {
is = idb . getIndexSearch ( )
mn = GetMetricName ( )
kb = kbPool . Get ( )
}
if err := mn . unmarshalRaw ( mr . MetricNameRaw ) ; err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot unmarshal MetricNameRaw %q: %s" , mr . MetricNameRaw , err )
2019-05-22 23:16:55 +02:00
j --
continue
}
mn . sortTags ( )
kb . B = mn . Marshal ( kb . B [ : 0 ] )
if err := is . GetOrCreateTSIDByName ( & r . TSID , kb . B ) ; err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot obtain TSID for MetricName %q: %s" , kb . B , err )
2019-05-22 23:16:55 +02:00
j --
continue
}
s . putTSIDToCache ( & r . TSID , mr . MetricNameRaw )
}
2019-10-31 13:29:35 +01:00
if lastWarn != nil {
logger . Errorf ( "warn occurred during rows addition: %s" , lastWarn )
}
2019-05-22 23:16:55 +02:00
if is != nil {
kbPool . Put ( kb )
PutMetricName ( mn )
idb . putIndexSearch ( is )
}
rows = rows [ : rowsLen + j ]
2019-10-31 13:29:35 +01:00
var lastError error
2019-05-22 23:16:55 +02:00
if err := s . tb . AddRows ( rows ) ; err != nil {
2019-08-25 14:28:32 +02:00
lastError = fmt . Errorf ( "cannot add rows to table: %s" , err )
2019-05-22 23:16:55 +02:00
}
2020-02-16 14:51:11 +01:00
if err := s . updatePerDateData ( rows , lastError ) ; err != nil lastError == nil {
lastError = fmt . Errorf ( "cannot update per-date data: %s" , err )
2019-05-22 23:16:55 +02:00
}
2019-10-31 13:29:35 +01:00
if lastError != nil {
return rows , fmt . Errorf ( "error occurred during rows addition: %s" , lastError )
2019-10-20 22:38:51 +02:00
}
2019-05-22 23:16:55 +02:00
return rows , nil
}
2019-11-09 22:17:42 +01:00
func ( s * Storage ) updatePerDateData ( rows [ ] rawRow , lastError error ) error {
2019-05-22 23:16:55 +02:00
var date uint64
2019-06-09 18:06:53 +02:00
var hour uint64
2019-05-22 23:16:55 +02:00
var prevTimestamp int64
2019-12-19 14:12:50 +01:00
var (
// These vars are used for speeding up bulk imports when multiple adjancent rows
// contain the same (metricID, date) pairs.
prevMatchedDate uint64
prevMatchedMetricID uint64
)
2019-05-22 23:16:55 +02:00
idb := s . idb ( )
2019-11-08 12:16:40 +01:00
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-05-22 23:16:55 +02:00
for i := range rows {
r := & rows [ i ]
if r . Timestamp != prevTimestamp {
date = uint64 ( r . Timestamp ) / msecPerDay
2019-06-09 18:06:53 +02:00
hour = uint64 ( r . Timestamp ) / msecPerHour
2019-05-22 23:16:55 +02:00
prevTimestamp = r . Timestamp
}
metricID := r . TSID . MetricID
2019-06-09 18:06:53 +02:00
if hour == hm . hour {
// The r belongs to the current hour. Check for the current hour cache.
2019-09-24 20:10:22 +02:00
if hm . m . Has ( metricID ) {
2019-06-09 18:06:53 +02:00
// Fast path: the metricID is in the current hour cache.
2019-11-09 22:17:42 +01:00
// This means the metricID has been already added to per-day inverted index.
2019-06-02 17:34:08 +02:00
continue
}
2019-11-08 18:37:16 +01:00
s . pendingHourEntriesLock . Lock ( )
s . pendingHourEntries . Add ( metricID )
s . pendingHourEntriesLock . Unlock ( )
2019-06-02 17:34:08 +02:00
}
// Slower path: check global cache for (date, metricID) entry.
2019-12-19 14:12:50 +01:00
if metricID == prevMatchedMetricID && date == prevMatchedDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
}
2019-11-09 22:05:14 +01:00
if s . dateMetricIDCache . Has ( date , metricID ) {
2019-11-09 22:17:42 +01:00
// The metricID has been already added to per-day inverted index.
2019-12-19 14:12:50 +01:00
prevMatchedDate = date
prevMatchedMetricID = metricID
2019-05-22 23:16:55 +02:00
continue
}
2019-11-13 12:11:02 +01:00
// Slow path: store the (date, metricID) entry in the indexDB.
2019-05-22 23:16:55 +02:00
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
if err := idb . storeDateMetricID ( date , metricID ) ; err != nil {
2019-08-25 14:28:32 +02:00
lastError = err
2019-05-22 23:16:55 +02:00
continue
}
2019-11-09 22:17:42 +01:00
// The metric must be added to cache only after it has been successfully added to indexDB.
s . dateMetricIDCache . Set ( date , metricID )
2019-05-22 23:16:55 +02:00
}
2019-08-25 14:28:32 +02:00
return lastError
2019-05-22 23:16:55 +02:00
}
2019-11-09 22:05:14 +01:00
// dateMetricIDCache is fast cache for holding (date, metricID) entries.
//
// It should be faster than map[date]*uint64set.Set on multicore systems.
type dateMetricIDCache struct {
2019-11-11 12:21:05 +01:00
// 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches.
syncsCount uint64
resetsCount uint64
2019-11-09 22:05:14 +01:00
// Contains immutable map
byDate atomic . Value
// Contains mutable map protected by mu
byDateMutable * byDateMetricIDMap
lastSyncTime time . Time
2019-11-10 21:03:46 +01:00
mu sync . Mutex
2019-11-09 22:05:14 +01:00
}
// newDateMetricIDCache returns an empty dateMetricIDCache ready for use.
//
// The cache is initialized via Reset, so its resetsCount starts from 1.
func newDateMetricIDCache() *dateMetricIDCache {
	dmc := &dateMetricIDCache{}
	dmc.Reset()
	return dmc
}
func ( dmc * dateMetricIDCache ) Reset ( ) {
dmc . mu . Lock ( )
2019-11-11 12:21:05 +01:00
// Do not reset syncsCount and resetsCount
2019-11-09 22:05:14 +01:00
dmc . byDate . Store ( newByDateMetricIDMap ( ) )
dmc . byDateMutable = newByDateMetricIDMap ( )
2019-11-11 12:21:05 +01:00
dmc . lastSyncTime = time . Now ( )
2019-11-09 22:05:14 +01:00
dmc . mu . Unlock ( )
2019-11-11 12:21:05 +01:00
atomic . AddUint64 ( & dmc . resetsCount , 1 )
2019-11-09 22:05:14 +01:00
}
// EntriesCount returns the number of metricIDs stored in the immutable part of dmc
// summed across all the dates.
//
// Entries pending in the mutable map aren't counted.
func (dmc *dateMetricIDCache) EntriesCount() int {
	immutable := dmc.byDate.Load().(*byDateMetricIDMap)
	total := 0
	for date := range immutable.m {
		total += immutable.m[date].v.Len()
	}
	return total
}
2019-11-13 16:58:05 +01:00
func ( dmc * dateMetricIDCache ) SizeBytes ( ) uint64 {
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
n := uint64 ( 0 )
for _ , e := range byDate . m {
n += e . v . SizeBytes ( )
}
return n
}
2019-11-09 22:05:14 +01:00
func ( dmc * dateMetricIDCache ) Has ( date , metricID uint64 ) bool {
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
v := byDate . get ( date )
if v . Has ( metricID ) {
// Fast path.
// The majority of calls must go here.
return true
}
// Slow path. Check mutable map.
currentTime := time . Now ( )
2019-11-10 21:03:46 +01:00
dmc . mu . Lock ( )
2019-11-09 22:05:14 +01:00
v = dmc . byDateMutable . get ( date )
ok := v . Has ( metricID )
mustSync := false
if currentTime . Sub ( dmc . lastSyncTime ) > 10 * time . Second {
mustSync = true
dmc . lastSyncTime = currentTime
}
2019-11-10 21:03:46 +01:00
dmc . mu . Unlock ( )
2019-11-09 22:05:14 +01:00
if mustSync {
dmc . sync ( )
}
return ok
}
// Set registers the (date, metricID) entry in the mutable part of dmc.
//
// The entry gets into the immutable part on the next sync.
func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
	dmc.mu.Lock()
	dmc.byDateMutable.getOrCreate(date).Add(metricID)
	dmc.mu.Unlock()
}
func ( dmc * dateMetricIDCache ) sync ( ) {
dmc . mu . Lock ( )
byDate := dmc . byDate . Load ( ) . ( * byDateMetricIDMap )
for date , e := range dmc . byDateMutable . m {
v := byDate . get ( date )
e . v . Union ( v )
}
dmc . byDate . Store ( dmc . byDateMutable )
2019-11-11 12:21:05 +01:00
dmc . byDateMutable = newByDateMetricIDMap ( )
2019-11-09 22:05:14 +01:00
dmc . mu . Unlock ( )
2019-11-11 12:21:05 +01:00
atomic . AddUint64 ( & dmc . syncsCount , 1 )
2019-11-09 22:05:14 +01:00
if dmc . EntriesCount ( ) > memory . Allowed ( ) / 128 {
dmc . Reset ( )
}
}
// byDateMetricIDMap maps a date to the byDateMetricIDEntry holding metricIDs for this date.
type byDateMetricIDMap struct {
// hotEntry holds the *byDateMetricIDEntry returned by the most recent map lookup in get,
// so repeated lookups for the same date skip the map access.
hotEntry atomic . Value
// m maps date to the corresponding entry.
m map [ uint64 ] * byDateMetricIDEntry
}
// newByDateMetricIDMap returns an empty byDateMetricIDMap.
//
// hotEntry is pre-populated with an empty entry, so get can always
// type-assert the loaded value without nil checks.
func newByDateMetricIDMap() *byDateMetricIDMap {
	var dmm byDateMetricIDMap
	dmm.m = make(map[uint64]*byDateMetricIDEntry)
	dmm.hotEntry.Store(&byDateMetricIDEntry{})
	return &dmm
}
// get returns the set of metricIDs for the given date.
//
// It returns nil if there is no entry for the date in dmm.m
// and the hot entry doesn't match the date.
func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
	if hot := dmm.hotEntry.Load().(*byDateMetricIDEntry); hot.date == date {
		// Fast path
		return &hot.v
	}

	// Slow path
	entry := dmm.m[date]
	if entry != nil {
		// Remember the entry, so the next lookup for the same date takes the fast path.
		dmm.hotEntry.Store(entry)
		return &entry.v
	}
	return nil
}
// getOrCreate returns the set of metricIDs for the given date.
//
// A new empty entry is registered in dmm.m if get finds nothing for the date.
func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
	if v := dmm.get(date); v != nil {
		return v
	}
	entry := &byDateMetricIDEntry{date: date}
	dmm.m[date] = entry
	return &entry.v
}
// byDateMetricIDEntry holds the set of metricIDs registered for a single date.
type byDateMetricIDEntry struct {
// date is the key the entry is stored under in byDateMetricIDMap.m.
date uint64
// v is the set of metricIDs for the date.
v uint64set . Set
}
2019-06-09 18:06:53 +02:00
func ( s * Storage ) updateCurrHourMetricIDs ( ) {
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-11-08 18:37:16 +01:00
s . pendingHourEntriesLock . Lock ( )
newMetricIDs := s . pendingHourEntries
s . pendingHourEntries = & uint64set . Set { }
s . pendingHourEntriesLock . Unlock ( )
2019-06-09 18:06:53 +02:00
hour := uint64 ( timestampFromTime ( time . Now ( ) ) ) / msecPerHour
2019-11-08 12:16:40 +01:00
if newMetricIDs . Len ( ) == 0 && hm . hour == hour {
2019-06-09 18:06:53 +02:00
// Fast path: nothing to update.
2019-06-02 17:34:08 +02:00
return
}
2019-11-08 18:37:16 +01:00
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
2019-09-24 20:10:22 +02:00
var m * uint64set . Set
2019-06-09 18:06:53 +02:00
isFull := hm . isFull
if hm . hour == hour {
2019-09-24 20:10:22 +02:00
m = hm . m . Clone ( )
2019-06-09 18:06:53 +02:00
} else {
2019-09-24 20:10:22 +02:00
m = & uint64set . Set { }
2019-06-09 18:06:53 +02:00
isFull = true
}
2019-11-03 23:34:24 +01:00
m . Union ( newMetricIDs )
2019-06-09 18:06:53 +02:00
hmNew := & hourMetricIDs {
m : m ,
hour : hour ,
isFull : isFull ,
}
s . currHourMetricIDs . Store ( hmNew )
if hm . hour != hour {
s . prevHourMetricIDs . Store ( hm )
2019-06-02 17:34:08 +02:00
}
}
2019-06-09 18:06:53 +02:00
type hourMetricIDs struct {
2019-09-24 20:10:22 +02:00
m * uint64set . Set
2019-06-09 18:06:53 +02:00
hour uint64
isFull bool
2019-06-02 17:34:08 +02:00
}
2019-05-22 23:16:55 +02:00
func ( s * Storage ) getTSIDFromCache ( dst * TSID , metricName [ ] byte ) bool {
buf := ( * [ unsafe . Sizeof ( * dst ) ] byte ) ( unsafe . Pointer ( dst ) ) [ : ]
buf = s . tsidCache . Get ( buf [ : 0 ] , metricName )
return uintptr ( len ( buf ) ) == unsafe . Sizeof ( * dst )
}
// putTSIDToCache stores the raw in-memory bytes of tsid in s.tsidCache under the metricName key.
func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
	const tsidSize = unsafe.Sizeof(TSID{})
	// View tsid as a byte array in order to store it without marshaling.
	tsidBytes := (*[tsidSize]byte)(unsafe.Pointer(tsid))
	s.tsidCache.Set(metricName, tsidBytes[:])
}
2019-08-13 20:35:19 +02:00
func openIndexDBTables ( path string , metricIDCache , metricNameCache * workingsetcache . Cache , currHourMetricIDs , prevHourMetricIDs * atomic . Value ) ( curr , prev * indexDB , err error ) {
2019-05-22 23:16:55 +02:00
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
return nil , nil , fmt . Errorf ( "cannot create directory %q: %s" , path , err )
}
d , err := os . Open ( path )
if err != nil {
return nil , nil , fmt . Errorf ( "cannot open directory: %s" , err )
}
defer fs . MustClose ( d )
// Search for the two most recent tables - the last one is active,
// the previous one contains backup data.
fis , err := d . Readdir ( - 1 )
if err != nil {
return nil , nil , fmt . Errorf ( "cannot read directory: %s" , err )
}
var tableNames [ ] string
for _ , fi := range fis {
if ! fs . IsDirOrSymlink ( fi ) {
// Skip non-directories.
continue
}
tableName := fi . Name ( )
if ! indexDBTableNameRegexp . MatchString ( tableName ) {
// Skip invalid directories.
continue
}
tableNames = append ( tableNames , tableName )
}
sort . Slice ( tableNames , func ( i , j int ) bool {
return tableNames [ i ] < tableNames [ j ]
} )
if len ( tableNames ) < 2 {
// Create missing tables
if len ( tableNames ) == 0 {
prevName := nextIndexDBTableName ( )
tableNames = append ( tableNames , prevName )
}
currName := nextIndexDBTableName ( )
tableNames = append ( tableNames , currName )
}
// Invariant: len(tableNames) >= 2
// Remove all the tables except two last tables.
for _ , tn := range tableNames [ : len ( tableNames ) - 2 ] {
pathToRemove := path + "/" + tn
logger . Infof ( "removing obsolete indexdb dir %q..." , pathToRemove )
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( pathToRemove )
2019-05-22 23:16:55 +02:00
logger . Infof ( "removed obsolete indexdb dir %q" , pathToRemove )
}
// Persist changes on the file system.
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( path )
2019-05-22 23:16:55 +02:00
// Open the last two tables.
currPath := path + "/" + tableNames [ len ( tableNames ) - 1 ]
2019-06-09 18:06:53 +02:00
curr , err = openIndexDB ( currPath , metricIDCache , metricNameCache , currHourMetricIDs , prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
return nil , nil , fmt . Errorf ( "cannot open curr indexdb table at %q: %s" , currPath , err )
}
prevPath := path + "/" + tableNames [ len ( tableNames ) - 2 ]
2019-06-09 18:06:53 +02:00
prev , err = openIndexDB ( prevPath , metricIDCache , metricNameCache , currHourMetricIDs , prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
curr . MustClose ( )
return nil , nil , fmt . Errorf ( "cannot open prev indexdb table at %q: %s" , prevPath , err )
}
2019-11-09 22:17:42 +01:00
// Adjust startDateForPerDayInvertedIndex for the previous index.
if prev . startDateForPerDayInvertedIndex > curr . startDateForPerDayInvertedIndex {
prev . startDateForPerDayInvertedIndex = curr . startDateForPerDayInvertedIndex
}
2019-05-22 23:16:55 +02:00
return curr , prev , nil
}
// indexDBTableNameRegexp matches valid indexdb table directory names:
// exactly 16 upper-case hexadecimal digits, i.e. the format produced by nextIndexDBTableName.
var indexDBTableNameRegexp = regexp . MustCompile ( "^[0-9A-F]{16}$" )
// nextIndexDBTableName returns a new indexdb table name.
//
// The name is the atomically incremented indexDBTableIdx formatted
// as 16 upper-case hexadecimal digits, so names generated later sort after earlier ones
// as long as the counter doesn't wrap.
func nextIndexDBTableName() string {
	idx := atomic.AddUint64(&indexDBTableIdx, 1)
	return fmt.Sprintf("%016X", idx)
}

// indexDBTableIdx is the counter behind nextIndexDBTableName.
// It is seeded with the current unix time in nanoseconds at program start.
var indexDBTableIdx = uint64(time.Now().UnixNano())