2019-05-22 23:16:55 +02:00
package storage
import (
"fmt"
2019-06-14 06:52:32 +02:00
"io/ioutil"
2019-05-22 23:16:55 +02:00
"math"
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2019-05-28 16:17:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2019-09-24 20:10:22 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
2019-08-13 20:35:19 +02:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
2019-05-22 23:16:55 +02:00
"github.com/VictoriaMetrics/fastcache"
)
// maxRetentionMonths is the largest retention accepted by OpenStorage (100 years).
// OpenStorage also uses it when a non-positive retention is passed.
const maxRetentionMonths = 100 * 12
// Storage represents TSDB storage.
type Storage struct {
2019-10-17 17:22:56 +02:00
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
addRowsConcurrencyLimitReached uint64
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
2019-05-22 23:16:55 +02:00
path string
cachePath string
retentionMonths int
2019-05-25 20:51:11 +02:00
// lock file for exclusive access to the storage on the given path.
2019-05-22 23:16:55 +02:00
flockF * os . File
idbCurr atomic . Value
tb * table
// tsidCache is MetricName -> TSID cache.
2019-08-13 20:35:19 +02:00
tsidCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// metricIDCache is MetricID -> TSID cache.
2019-08-13 20:35:19 +02:00
metricIDCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// metricNameCache is MetricID -> MetricName cache.
2019-08-13 20:35:19 +02:00
metricNameCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
// dateMetricIDCache is (Date, MetricID) cache.
2019-08-13 20:35:19 +02:00
dateMetricIDCache * workingsetcache . Cache
2019-05-22 23:16:55 +02:00
2019-06-09 18:06:53 +02:00
// Fast cache for MetricID values occured during the current hour.
currHourMetricIDs atomic . Value
// Fast cache for MetricID values occured during the previous hour.
prevHourMetricIDs atomic . Value
// Pending MetricID values to be added to currHourMetricIDs.
2019-11-08 12:16:40 +01:00
pendingHourEntriesLock sync . Mutex
pendingHourEntries [ ] pendingHourMetricIDEntry
2019-06-02 17:34:08 +02:00
stop chan struct { }
2019-06-09 18:06:53 +02:00
currHourMetricIDsUpdaterWG sync . WaitGroup
retentionWatcherWG sync . WaitGroup
2019-05-22 23:16:55 +02:00
}
2019-10-31 14:50:58 +01:00
// pendingHourMetricIDEntry is a MetricID of the (AccountID, ProjectID) tenant
// waiting to be added to Storage.currHourMetricIDs.
type pendingHourMetricIDEntry struct {
	AccountID uint32
	ProjectID uint32
	MetricID  uint64
}
// accountProjectKey identifies a tenant. It is used as the key of
// hourMetricIDs.byTenant.
type accountProjectKey struct {
	AccountID uint32
	ProjectID uint32
}
2019-05-22 23:16:55 +02:00
// OpenStorage opens storage on the given path with the given number of retention months.
func OpenStorage ( path string , retentionMonths int ) ( * Storage , error ) {
if retentionMonths > maxRetentionMonths {
return nil , fmt . Errorf ( "too big retentionMonths=%d; cannot exceed %d" , retentionMonths , maxRetentionMonths )
}
if retentionMonths <= 0 {
retentionMonths = maxRetentionMonths
}
path , err := filepath . Abs ( path )
if err != nil {
return nil , fmt . Errorf ( "cannot determine absolute path for %q: %s" , path , err )
}
s := & Storage {
path : path ,
cachePath : path + "/cache" ,
retentionMonths : retentionMonths ,
stop : make ( chan struct { } ) ,
}
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
return nil , fmt . Errorf ( "cannot create a directory for the storage at %q: %s" , path , err )
}
snapshotsPath := path + "/snapshots"
if err := fs . MkdirAllIfNotExist ( snapshotsPath ) ; err != nil {
return nil , fmt . Errorf ( "cannot create %q: %s" , snapshotsPath , err )
}
2019-08-13 00:45:22 +02:00
// Protect from concurrent opens.
flockF , err := fs . CreateFlockFile ( path )
2019-05-22 23:16:55 +02:00
if err != nil {
2019-08-13 00:45:22 +02:00
return nil , err
2019-05-22 23:16:55 +02:00
}
s . flockF = flockF
// Load caches.
mem := memory . Allowed ( )
s . tsidCache = s . mustLoadCache ( "MetricName->TSID" , "metricName_tsid" , mem / 3 )
s . metricIDCache = s . mustLoadCache ( "MetricID->TSID" , "metricID_tsid" , mem / 16 )
s . metricNameCache = s . mustLoadCache ( "MetricID->MetricName" , "metricID_metricName" , mem / 8 )
s . dateMetricIDCache = s . mustLoadCache ( "Date->MetricID" , "date_metricID" , mem / 32 )
2019-06-14 06:52:32 +02:00
hour := uint64 ( timestampFromTime ( time . Now ( ) ) ) / msecPerHour
hmCurr := s . mustLoadHourMetricIDs ( hour , "curr_hour_metric_ids" )
hmPrev := s . mustLoadHourMetricIDs ( hour - 1 , "prev_hour_metric_ids" )
s . currHourMetricIDs . Store ( hmCurr )
s . prevHourMetricIDs . Store ( hmPrev )
2019-05-22 23:16:55 +02:00
// Load indexdb
idbPath := path + "/indexdb"
idbSnapshotsPath := idbPath + "/snapshots"
if err := fs . MkdirAllIfNotExist ( idbSnapshotsPath ) ; err != nil {
return nil , fmt . Errorf ( "cannot create %q: %s" , idbSnapshotsPath , err )
}
2019-06-09 18:06:53 +02:00
idbCurr , idbPrev , err := openIndexDBTables ( idbPath , s . metricIDCache , s . metricNameCache , & s . currHourMetricIDs , & s . prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
return nil , fmt . Errorf ( "cannot open indexdb tables at %q: %s" , idbPath , err )
}
idbCurr . SetExtDB ( idbPrev )
s . idbCurr . Store ( idbCurr )
2019-11-08 12:16:40 +01:00
// Initialize iidx. hmCurr and hmPrev shouldn't be used till now,
// so it should be safe initializing it inplace.
hmPrev . iidx = newInmemoryInvertedIndex ( )
hmPrev . iidx . MustUpdate ( s . idb ( ) , hmPrev . byTenant )
hmCurr . iidx = newInmemoryInvertedIndex ( )
hmCurr . iidx . MustUpdate ( s . idb ( ) , hmCurr . byTenant )
2019-05-22 23:16:55 +02:00
// Load data
tablePath := path + "/data"
tb , err := openTable ( tablePath , retentionMonths , s . getDeletedMetricIDs )
if err != nil {
s . idb ( ) . MustClose ( )
return nil , fmt . Errorf ( "cannot open table at %q: %s" , tablePath , err )
}
s . tb = tb
2019-06-09 18:06:53 +02:00
s . startCurrHourMetricIDsUpdater ( )
2019-05-22 23:16:55 +02:00
s . startRetentionWatcher ( )
return s , nil
}
// debugFlush makes recently added storage data visible to search.
// It flushes raw rows of the data table first, then the current indexdb table.
func (s *Storage) debugFlush() {
	table := s.tb
	table.flushRawRows()

	idb := s.idb()
	idb.tb.DebugFlush()
}
2019-09-24 20:10:22 +02:00
func ( s * Storage ) getDeletedMetricIDs ( ) * uint64set . Set {
2019-05-22 23:16:55 +02:00
return s . idb ( ) . getDeletedMetricIDs ( )
}
// CreateSnapshot creates snapshot for s and returns the snapshot name.
func ( s * Storage ) CreateSnapshot ( ) ( string , error ) {
logger . Infof ( "creating Storage snapshot for %q..." , s . path )
startTime := time . Now ( )
snapshotName := fmt . Sprintf ( "%s-%08X" , time . Now ( ) . UTC ( ) . Format ( "20060102150405" ) , nextSnapshotIdx ( ) )
srcDir := s . path
dstDir := fmt . Sprintf ( "%s/snapshots/%s" , srcDir , snapshotName )
if err := fs . MkdirAllFailIfExist ( dstDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create dir %q: %s" , dstDir , err )
}
dstDataDir := dstDir + "/data"
if err := fs . MkdirAllFailIfExist ( dstDataDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create dir %q: %s" , dstDataDir , err )
}
smallDir , bigDir , err := s . tb . CreateSnapshot ( snapshotName )
if err != nil {
return "" , fmt . Errorf ( "cannot create table snapshot: %s" , err )
}
dstSmallDir := dstDataDir + "/small"
if err := fs . SymlinkRelative ( smallDir , dstSmallDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %s" , smallDir , dstSmallDir , err )
}
dstBigDir := dstDataDir + "/big"
if err := fs . SymlinkRelative ( bigDir , dstBigDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %s" , bigDir , dstBigDir , err )
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDataDir )
2019-05-22 23:16:55 +02:00
idbSnapshot := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , s . path , snapshotName )
idb := s . idb ( )
currSnapshot := idbSnapshot + "/" + idb . name
if err := idb . tb . CreateSnapshotAt ( currSnapshot ) ; err != nil {
return "" , fmt . Errorf ( "cannot create curr indexDB snapshot: %s" , err )
}
ok := idb . doExtDB ( func ( extDB * indexDB ) {
prevSnapshot := idbSnapshot + "/" + extDB . name
err = extDB . tb . CreateSnapshotAt ( prevSnapshot )
} )
if ok && err != nil {
return "" , fmt . Errorf ( "cannot create prev indexDB snapshot: %s" , err )
}
dstIdbDir := dstDir + "/indexdb"
if err := fs . SymlinkRelative ( idbSnapshot , dstIdbDir ) ; err != nil {
return "" , fmt . Errorf ( "cannot create symlink from %q to %q: %s" , idbSnapshot , dstIdbDir , err )
}
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( dstDir )
fs . MustSyncPath ( srcDir + "/snapshots" )
2019-05-22 23:16:55 +02:00
logger . Infof ( "created Storage snapshot for %q at %q in %s" , srcDir , dstDir , time . Since ( startTime ) )
return snapshotName , nil
}
// snapshotNameRegexp matches names generated by CreateSnapshot: a 14-digit
// UTC timestamp (YYYYMMDDhhmmss), a dash and a hexadecimal index.
var snapshotNameRegexp = regexp.MustCompile("^[0-9]{14}-[0-9A-Fa-f]+$")
// ListSnapshots returns the sorted names of the existing snapshots for s.
//
// Entries of <path>/snapshots that don't match snapshotNameRegexp are skipped.
func (s *Storage) ListSnapshots() ([]string, error) {
	snapshotsPath := s.path + "/snapshots"
	dir, err := os.Open(snapshotsPath)
	if err != nil {
		return nil, fmt.Errorf("cannot open %q: %s", snapshotsPath, err)
	}
	defer fs.MustClose(dir)

	entries, err := dir.Readdirnames(-1)
	if err != nil {
		return nil, fmt.Errorf("cannot read contents of %q: %s", snapshotsPath, err)
	}
	snapshotNames := make([]string, 0, len(entries))
	for _, entry := range entries {
		if snapshotNameRegexp.MatchString(entry) {
			snapshotNames = append(snapshotNames, entry)
		}
	}
	sort.Strings(snapshotNames)
	return snapshotNames, nil
}
// DeleteSnapshot deletes the given snapshot.
func ( s * Storage ) DeleteSnapshot ( snapshotName string ) error {
if ! snapshotNameRegexp . MatchString ( snapshotName ) {
return fmt . Errorf ( "invalid snapshotName %q" , snapshotName )
}
snapshotPath := s . path + "/snapshots/" + snapshotName
logger . Infof ( "deleting snapshot %q..." , snapshotPath )
startTime := time . Now ( )
s . tb . MustDeleteSnapshot ( snapshotName )
idbPath := fmt . Sprintf ( "%s/indexdb/snapshots/%s" , s . path , snapshotName )
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( idbPath )
fs . MustRemoveAll ( snapshotPath )
2019-05-22 23:16:55 +02:00
logger . Infof ( "deleted snapshot %q in %s" , snapshotPath , time . Since ( startTime ) )
return nil
}
// snapshotIdx is seeded with the wall clock (in nanoseconds) at package
// initialization, presumably so snapshot indexes don't repeat across restarts.
var snapshotIdx = uint64(time.Now().UnixNano())

// nextSnapshotIdx atomically increments snapshotIdx and returns the new value.
func nextSnapshotIdx() uint64 {
	next := atomic.AddUint64(&snapshotIdx, 1)
	return next
}
// idb returns the indexDB currently in use. mustRotateIndexDB replaces it.
func (s *Storage) idb() *indexDB {
	current := s.idbCurr.Load()
	return current.(*indexDB)
}
// Metrics contains essential metrics for the Storage.
type Metrics struct {
2019-07-26 13:10:25 +02:00
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
2019-08-06 13:09:17 +02:00
AddRowsConcurrencyLimitReached uint64
AddRowsConcurrencyLimitTimeout uint64
AddRowsConcurrencyDroppedRows uint64
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
2019-05-22 23:16:55 +02:00
TSIDCacheSize uint64
2019-07-09 23:47:29 +02:00
TSIDCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
TSIDCacheRequests uint64
TSIDCacheMisses uint64
TSIDCacheCollisions uint64
MetricIDCacheSize uint64
2019-07-09 23:47:29 +02:00
MetricIDCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
MetricIDCacheRequests uint64
MetricIDCacheMisses uint64
MetricIDCacheCollisions uint64
MetricNameCacheSize uint64
2019-07-09 23:47:29 +02:00
MetricNameCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
MetricNameCacheRequests uint64
MetricNameCacheMisses uint64
MetricNameCacheCollisions uint64
DateMetricIDCacheSize uint64
2019-07-09 23:47:29 +02:00
DateMetricIDCacheSizeBytes uint64
2019-05-22 23:16:55 +02:00
DateMetricIDCacheRequests uint64
DateMetricIDCacheMisses uint64
DateMetricIDCacheCollisions uint64
2019-06-19 17:36:47 +02:00
HourMetricIDCacheSize uint64
2019-11-08 12:16:40 +01:00
RecentHourInvertedIndexSize uint64
RecentHourInvertedIndexUniqueTagPairsSize uint64
RecentHourInvertedIndexPendingMetricIDsSize uint64
2019-05-22 23:16:55 +02:00
IndexDBMetrics IndexDBMetrics
TableMetrics TableMetrics
}
// Reset resets m to its zero value, so it can be filled again by UpdateMetrics.
func (m *Metrics) Reset() {
	var zero Metrics
	*m = zero
}
// UpdateMetrics updates m with metrics from s.
func ( s * Storage ) UpdateMetrics ( m * Metrics ) {
2019-07-26 13:10:25 +02:00
m . TooSmallTimestampRows += atomic . LoadUint64 ( & s . tooSmallTimestampRows )
m . TooBigTimestampRows += atomic . LoadUint64 ( & s . tooBigTimestampRows )
2019-08-06 13:09:17 +02:00
m . AddRowsConcurrencyLimitReached += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitReached )
m . AddRowsConcurrencyLimitTimeout += atomic . LoadUint64 ( & s . addRowsConcurrencyLimitTimeout )
m . AddRowsConcurrencyDroppedRows += atomic . LoadUint64 ( & s . addRowsConcurrencyDroppedRows )
m . AddRowsConcurrencyCapacity = uint64 ( cap ( addRowsConcurrencyCh ) )
m . AddRowsConcurrencyCurrent = uint64 ( len ( addRowsConcurrencyCh ) )
2019-05-22 23:16:55 +02:00
var cs fastcache . Stats
s . tsidCache . UpdateStats ( & cs )
m . TSIDCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . TSIDCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . TSIDCacheRequests += cs . GetCalls
m . TSIDCacheMisses += cs . Misses
m . TSIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricIDCache . UpdateStats ( & cs )
m . MetricIDCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . MetricIDCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . MetricIDCacheRequests += cs . GetCalls
m . MetricIDCacheMisses += cs . Misses
m . MetricIDCacheCollisions += cs . Collisions
cs . Reset ( )
s . metricNameCache . UpdateStats ( & cs )
m . MetricNameCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . MetricNameCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . MetricNameCacheRequests += cs . GetCalls
m . MetricNameCacheMisses += cs . Misses
m . MetricNameCacheCollisions += cs . Collisions
cs . Reset ( )
s . dateMetricIDCache . UpdateStats ( & cs )
m . DateMetricIDCacheSize += cs . EntriesCount
2019-07-09 23:47:29 +02:00
m . DateMetricIDCacheSizeBytes += cs . BytesSize
2019-05-22 23:16:55 +02:00
m . DateMetricIDCacheRequests += cs . GetCalls
m . DateMetricIDCacheMisses += cs . Misses
m . DateMetricIDCacheCollisions += cs . Collisions
2019-06-19 17:36:47 +02:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-09-24 20:10:22 +02:00
hourMetricIDsLen := hmPrev . m . Len ( )
if hmCurr . m . Len ( ) > hourMetricIDsLen {
hourMetricIDsLen = hmCurr . m . Len ( )
2019-06-19 17:36:47 +02:00
}
m . HourMetricIDCacheSize += uint64 ( hourMetricIDsLen )
2019-11-08 12:16:40 +01:00
m . RecentHourInvertedIndexSize += uint64 ( hmPrev . iidx . GetEntriesCount ( ) )
m . RecentHourInvertedIndexSize += uint64 ( hmCurr . iidx . GetEntriesCount ( ) )
m . RecentHourInvertedIndexUniqueTagPairsSize += uint64 ( hmPrev . iidx . GetUniqueTagPairsLen ( ) )
m . RecentHourInvertedIndexUniqueTagPairsSize += uint64 ( hmCurr . iidx . GetUniqueTagPairsLen ( ) )
m . RecentHourInvertedIndexPendingMetricIDsSize += uint64 ( hmPrev . iidx . GetPendingMetricIDsLen ( ) )
m . RecentHourInvertedIndexPendingMetricIDsSize += uint64 ( hmCurr . iidx . GetPendingMetricIDsLen ( ) )
2019-05-22 23:16:55 +02:00
s . idb ( ) . UpdateMetrics ( & m . IndexDBMetrics )
s . tb . UpdateMetrics ( & m . TableMetrics )
}
// startRetentionWatcher runs retentionWatcher in a background goroutine
// tracked by s.retentionWatcherWG, which MustClose waits for.
func (s *Storage) startRetentionWatcher() {
	wg := &s.retentionWatcherWG
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.retentionWatcher()
	}()
}
// retentionWatcher rotates the indexdb at every retention boundary computed
// by nextRetentionDuration until s.stop is closed.
func (s *Storage) retentionWatcher() {
	for {
		d := nextRetentionDuration(s.retentionMonths)
		// Use an explicit timer instead of time.After, so it can be released on
		// shutdown: d may span months, and before Go 1.23 an unfired time.After
		// timer is not garbage-collected until it fires.
		t := time.NewTimer(d)
		select {
		case <-s.stop:
			t.Stop()
			return
		case <-t.C:
			s.mustRotateIndexDB()
		}
	}
}
2019-06-09 18:06:53 +02:00
func ( s * Storage ) startCurrHourMetricIDsUpdater ( ) {
s . currHourMetricIDsUpdaterWG . Add ( 1 )
2019-06-02 17:34:08 +02:00
go func ( ) {
2019-06-09 18:06:53 +02:00
s . currHourMetricIDsUpdater ( )
s . currHourMetricIDsUpdaterWG . Done ( )
2019-06-02 17:34:08 +02:00
} ( )
}
2019-06-09 18:06:53 +02:00
var currHourMetricIDsUpdateInterval = time . Second * 10
2019-06-02 20:58:14 +02:00
2019-06-09 18:06:53 +02:00
func ( s * Storage ) currHourMetricIDsUpdater ( ) {
t := time . NewTimer ( currHourMetricIDsUpdateInterval )
2019-06-02 17:34:08 +02:00
for {
select {
case <- s . stop :
2019-11-08 12:16:40 +01:00
s . updateCurrHourMetricIDs ( )
2019-06-02 17:34:08 +02:00
return
case <- t . C :
2019-06-09 18:06:53 +02:00
s . updateCurrHourMetricIDs ( )
t . Reset ( currHourMetricIDsUpdateInterval )
2019-06-02 17:34:08 +02:00
}
}
}
2019-05-22 23:16:55 +02:00
func ( s * Storage ) mustRotateIndexDB ( ) {
// Create new indexdb table.
newTableName := nextIndexDBTableName ( )
idbNewPath := s . path + "/indexdb/" + newTableName
2019-06-09 18:06:53 +02:00
idbNew , err := openIndexDB ( idbNewPath , s . metricIDCache , s . metricNameCache , & s . currHourMetricIDs , & s . prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
logger . Panicf ( "FATAL: cannot create new indexDB at %q: %s" , idbNewPath , err )
}
// Drop extDB
idbCurr := s . idb ( )
idbCurr . doExtDB ( func ( extDB * indexDB ) {
extDB . scheduleToDrop ( )
} )
idbCurr . SetExtDB ( nil )
// Start using idbNew
idbNew . SetExtDB ( idbCurr )
s . idbCurr . Store ( idbNew )
// Persist changes on the file system.
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( s . path )
2019-05-22 23:16:55 +02:00
// Flush tsidCache, so idbNew can be populated with fresh data.
s . tsidCache . Reset ( )
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s . dateMetricIDCache . Reset ( )
// Do not flush metricIDCache and metricNameCache, since all the metricIDs
// from prev idb remain valid after the rotation.
}
// MustClose closes the storage.
func ( s * Storage ) MustClose ( ) {
close ( s . stop )
s . retentionWatcherWG . Wait ( )
2019-06-09 18:06:53 +02:00
s . currHourMetricIDsUpdaterWG . Wait ( )
2019-05-22 23:16:55 +02:00
s . tb . MustClose ( )
s . idb ( ) . MustClose ( )
// Save caches.
2019-08-13 20:35:19 +02:00
s . mustSaveAndStopCache ( s . tsidCache , "MetricName->TSID" , "metricName_tsid" )
s . mustSaveAndStopCache ( s . metricIDCache , "MetricID->TSID" , "metricID_tsid" )
s . mustSaveAndStopCache ( s . metricNameCache , "MetricID->MetricName" , "metricID_metricName" )
s . mustSaveAndStopCache ( s . dateMetricIDCache , "Date->MetricID" , "date_metricID" )
2019-05-22 23:16:55 +02:00
2019-06-14 06:52:32 +02:00
hmCurr := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmCurr , "curr_hour_metric_ids" )
hmPrev := s . prevHourMetricIDs . Load ( ) . ( * hourMetricIDs )
s . mustSaveHourMetricIDs ( hmPrev , "prev_hour_metric_ids" )
2019-05-22 23:16:55 +02:00
// Release lock file.
if err := s . flockF . Close ( ) ; err != nil {
logger . Panicf ( "FATAL: cannot close lock file %q: %s" , s . flockF . Name ( ) , err )
}
}
2019-06-14 06:52:32 +02:00
func ( s * Storage ) mustLoadHourMetricIDs ( hour uint64 , name string ) * hourMetricIDs {
path := s . cachePath + "/" + name
logger . Infof ( "loading %s from %q..." , name , path )
startTime := time . Now ( )
if ! fs . IsPathExist ( path ) {
logger . Infof ( "nothing to load from %q" , path )
2019-11-08 12:16:40 +01:00
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
src , err := ioutil . ReadFile ( path )
if err != nil {
logger . Panicf ( "FATAL: cannot read %s: %s" , path , err )
}
srcOrigLen := len ( src )
if len ( src ) < 24 {
logger . Errorf ( "discarding %s, since it has broken header; got %d bytes; want %d bytes" , path , len ( src ) , 24 )
2019-11-08 12:16:40 +01:00
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
2019-10-31 14:50:58 +01:00
// Unmarshal header
2019-06-14 06:52:32 +02:00
isFull := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
hourLoaded := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
if hourLoaded != hour {
2019-11-08 12:16:40 +01:00
logger . Infof ( "discarding %s, since it contains outdated hour; got %d; want %d" , name , hourLoaded , hour )
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
2019-10-31 14:50:58 +01:00
// Unmarshal hm.m
2019-06-14 06:52:32 +02:00
hmLen := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
2019-10-31 14:50:58 +01:00
if uint64 ( len ( src ) ) < 8 * hmLen {
logger . Errorf ( "discarding %s, since it has broken hm.m data; got %d bytes; want %d bytes" , path , len ( src ) , 8 * hmLen )
2019-11-08 12:16:40 +01:00
return & hourMetricIDs {
hour : hour ,
}
2019-06-14 06:52:32 +02:00
}
2019-09-24 20:10:22 +02:00
m := & uint64set . Set { }
2019-06-14 06:52:32 +02:00
for i := uint64 ( 0 ) ; i < hmLen ; i ++ {
metricID := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
2019-09-24 20:10:22 +02:00
m . Add ( metricID )
2019-06-14 06:52:32 +02:00
}
2019-10-31 14:50:58 +01:00
// Unmarshal hm.byTenant
if len ( src ) < 8 {
logger . Errorf ( "discarding %s, since it has broken hm.byTenant header; got %d bytes; want %d bytes" , path , len ( src ) , 8 )
return & hourMetricIDs { }
}
byTenantLen := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
byTenant := make ( map [ accountProjectKey ] * uint64set . Set , byTenantLen )
for i := uint64 ( 0 ) ; i < byTenantLen ; i ++ {
if len ( src ) < 16 {
logger . Errorf ( "discarding %s, since it has broken accountID:projectID prefix; got %d bytes; want %d bytes" , path , len ( src ) , 16 )
return & hourMetricIDs { }
}
accountID := encoding . UnmarshalUint32 ( src )
src = src [ 4 : ]
projectID := encoding . UnmarshalUint32 ( src )
src = src [ 4 : ]
mLen := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
if uint64 ( len ( src ) ) < 8 * mLen {
logger . Errorf ( "discarding %s, since it has borken accountID:projectID entry; got %d bytes; want %d bytes" , path , len ( src ) , 8 * mLen )
return & hourMetricIDs { }
}
m := & uint64set . Set { }
for j := uint64 ( 0 ) ; j < mLen ; j ++ {
metricID := encoding . UnmarshalUint64 ( src )
src = src [ 8 : ]
m . Add ( metricID )
}
k := accountProjectKey {
AccountID : accountID ,
ProjectID : projectID ,
}
byTenant [ k ] = m
}
2019-07-09 23:47:29 +02:00
logger . Infof ( "loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) , hmLen , srcOrigLen )
2019-06-14 06:52:32 +02:00
return & hourMetricIDs {
2019-10-31 14:50:58 +01:00
m : m ,
byTenant : byTenant ,
hour : hourLoaded ,
isFull : isFull != 0 ,
2019-06-14 06:52:32 +02:00
}
}
func ( s * Storage ) mustSaveHourMetricIDs ( hm * hourMetricIDs , name string ) {
path := s . cachePath + "/" + name
logger . Infof ( "saving %s to %q..." , name , path )
startTime := time . Now ( )
2019-09-24 20:10:22 +02:00
dst := make ( [ ] byte , 0 , hm . m . Len ( ) * 8 + 24 )
2019-06-14 06:52:32 +02:00
isFull := uint64 ( 0 )
if hm . isFull {
isFull = 1
}
2019-10-31 14:50:58 +01:00
// Marshal header
2019-06-14 06:52:32 +02:00
dst = encoding . MarshalUint64 ( dst , isFull )
dst = encoding . MarshalUint64 ( dst , hm . hour )
2019-10-31 14:50:58 +01:00
// Marshal hm.m
2019-09-24 20:10:22 +02:00
dst = encoding . MarshalUint64 ( dst , uint64 ( hm . m . Len ( ) ) )
for _ , metricID := range hm . m . AppendTo ( nil ) {
2019-06-14 06:52:32 +02:00
dst = encoding . MarshalUint64 ( dst , metricID )
}
2019-10-31 14:50:58 +01:00
// Marshal hm.byTenant
var metricIDs [ ] uint64
dst = encoding . MarshalUint64 ( dst , uint64 ( len ( hm . byTenant ) ) )
for k , e := range hm . byTenant {
dst = encoding . MarshalUint32 ( dst , k . AccountID )
dst = encoding . MarshalUint32 ( dst , k . ProjectID )
dst = encoding . MarshalUint64 ( dst , uint64 ( e . Len ( ) ) )
metricIDs = e . AppendTo ( metricIDs [ : 0 ] )
for _ , metricID := range metricIDs {
dst = encoding . MarshalUint64 ( dst , metricID )
}
}
2019-06-14 06:52:32 +02:00
if err := ioutil . WriteFile ( path , dst , 0644 ) ; err != nil {
logger . Panicf ( "FATAL: cannot write %d bytes to %q: %s" , len ( dst ) , path , err )
}
2019-09-24 20:10:22 +02:00
logger . Infof ( "saved %s to %q in %s; entriesCount: %d; sizeBytes: %d" , name , path , time . Since ( startTime ) , hm . m . Len ( ) , len ( dst ) )
2019-06-14 06:52:32 +02:00
}
2019-08-13 20:35:19 +02:00
func ( s * Storage ) mustLoadCache ( info , name string , sizeBytes int ) * workingsetcache . Cache {
2019-05-22 23:16:55 +02:00
path := s . cachePath + "/" + name
logger . Infof ( "loading %s cache from %q..." , info , path )
startTime := time . Now ( )
2019-08-13 20:35:19 +02:00
c := workingsetcache . Load ( path , sizeBytes , time . Hour )
2019-05-22 23:16:55 +02:00
var cs fastcache . Stats
c . UpdateStats ( & cs )
2019-07-09 23:47:29 +02:00
logger . Infof ( "loaded %s cache from %q in %s; entriesCount: %d; sizeBytes: %d" ,
2019-05-22 23:16:55 +02:00
info , path , time . Since ( startTime ) , cs . EntriesCount , cs . BytesSize )
return c
}
2019-08-13 20:35:19 +02:00
func ( s * Storage ) mustSaveAndStopCache ( c * workingsetcache . Cache , info , name string ) {
2019-05-22 23:16:55 +02:00
path := s . cachePath + "/" + name
logger . Infof ( "saving %s cache to %q..." , info , path )
startTime := time . Now ( )
2019-08-13 20:35:19 +02:00
if err := c . Save ( path ) ; err != nil {
2019-05-22 23:16:55 +02:00
logger . Panicf ( "FATAL: cannot save %s cache to %q: %s" , info , path , err )
}
var cs fastcache . Stats
c . UpdateStats ( & cs )
2019-08-13 20:35:19 +02:00
c . Stop ( )
2019-07-09 23:47:29 +02:00
logger . Infof ( "saved %s cache to %q in %s; entriesCount: %d; sizeBytes: %d" ,
2019-05-22 23:16:55 +02:00
info , path , time . Since ( startTime ) , cs . EntriesCount , cs . BytesSize )
}
func nextRetentionDuration ( retentionMonths int ) time . Duration {
t := time . Now ( ) . UTC ( )
n := t . Year ( ) * 12 + int ( t . Month ( ) ) - 1 + retentionMonths
n -= n % retentionMonths
y := n / 12
m := time . Month ( ( n % 12 ) + 1 )
deadline := time . Date ( y , m , 1 , 0 , 0 , 0 , 0 , time . UTC )
return deadline . Sub ( t )
}
2019-09-23 21:34:04 +02:00
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
2019-05-22 23:16:55 +02:00
func ( s * Storage ) searchTSIDs ( tfss [ ] * TagFilters , tr TimeRange , maxMetrics int ) ( [ ] TSID , error ) {
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
tsids , err := s . idb ( ) . searchTSIDs ( tfss , tr , maxMetrics )
if err != nil {
return nil , fmt . Errorf ( "error when searching tsids for tfss %q: %s" , tfss , err )
}
return tsids , nil
}
// DeleteMetrics deletes all the metrics matching the given tfss.
//
// Returns the number of metrics deleted. On error the count reported by
// the indexdb is returned together with the error.
func (s *Storage) DeleteMetrics(tfss []*TagFilters) (int, error) {
	idb := s.idb()
	deleted, err := idb.DeleteTSIDs(tfss)
	if err == nil {
		// MetricName -> TSID cache (tsidCache) is not reset, since the obtained
		// entries must be checked against deleted metricIDs.
		// See Storage.add for details.
		//
		// MetricID -> MetricName cache is not reset either, since it must be
		// used only after filtering out deleted metricIDs.
		return deleted, nil
	}
	return deleted, fmt.Errorf("cannot delete tsids: %s", err)
}
// searchMetricName appends metric name for the given metricID to dst
// and returns the result.
2019-05-22 23:23:23 +02:00
func ( s * Storage ) searchMetricName ( dst [ ] byte , metricID uint64 , accountID , projectID uint32 ) ( [ ] byte , error ) {
return s . idb ( ) . searchMetricName ( dst , metricID , accountID , projectID )
2019-05-22 23:16:55 +02:00
}
2019-05-22 23:23:23 +02:00
// SearchTagKeys searches for tag keys for the given (accountID, projectID).
func ( s * Storage ) SearchTagKeys ( accountID , projectID uint32 , maxTagKeys int ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagKeys ( accountID , projectID , maxTagKeys )
2019-05-22 23:16:55 +02:00
}
2019-05-22 23:23:23 +02:00
// SearchTagValues searches for tag values for the given tagKey in (accountID, projectID).
func ( s * Storage ) SearchTagValues ( accountID , projectID uint32 , tagKey [ ] byte , maxTagValues int ) ( [ ] string , error ) {
return s . idb ( ) . SearchTagValues ( accountID , projectID , tagKey , maxTagValues )
2019-05-22 23:16:55 +02:00
}
2019-06-10 17:55:20 +02:00
// SearchTagEntries returns a list of (tagName -> tagValues) for (accountID, projectID).
//
// Entries are ordered by tag name. At most maxTagKeys keys and maxTagValues
// values per key are requested from the indexdb.
func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxTagValues int) ([]TagEntry, error) {
	idb := s.idb()
	keys, err := idb.SearchTagKeys(accountID, projectID, maxTagKeys)
	if err != nil {
		return nil, fmt.Errorf("cannot search tag keys: %s", err)
	}

	// Sort keys for faster seeks below.
	sort.Strings(keys)

	entries := make([]TagEntry, 0, len(keys))
	for _, key := range keys {
		values, err := idb.SearchTagValues(accountID, projectID, []byte(key), maxTagValues)
		if err != nil {
			return nil, fmt.Errorf("cannot search values for tag %q: %s", key, err)
		}
		entries = append(entries, TagEntry{Key: key, Values: values})
	}
	return entries, nil
}
// TagEntry contains (tagName -> tagValues) mapping.
type TagEntry struct {
	// Key is tagName.
	Key string

	// Values contains the values found for Key.
	Values []string
}
2019-05-22 23:23:23 +02:00
// GetSeriesCount returns the approximate number of unique time series for the given (accountID, projectID).
2019-05-22 23:16:55 +02:00
//
// It includes the deleted series too and may count the same series
// up to two times - in db and extDB.
2019-05-22 23:23:23 +02:00
func ( s * Storage ) GetSeriesCount ( accountID , projectID uint32 ) ( uint64 , error ) {
return s . idb ( ) . GetSeriesCount ( accountID , projectID )
2019-05-22 23:16:55 +02:00
}
// MetricRow is a metric to insert into storage.
type MetricRow struct {
	// MetricNameRaw contains raw metric name, which must be decoded
	// with MetricName.unmarshalRaw.
	MetricNameRaw []byte

	// Timestamp and Value form the sample for the metric.
	Timestamp int64
	Value     float64
}
// CopyFrom copies src to mr.
//
// The metric name bytes are copied into mr's own buffer (reusing its
// capacity), instead of making mr point to src.MetricNameRaw.
func (mr *MetricRow) CopyFrom(src *MetricRow) {
	mr.MetricNameRaw = append(mr.MetricNameRaw[:0], src.MetricNameRaw...)
	mr.Timestamp, mr.Value = src.Timestamp, src.Value
}
// String returns string representation of the mr.
func ( mr * MetricRow ) String ( ) string {
metricName := string ( mr . MetricNameRaw )
var mn MetricName
if err := mn . unmarshalRaw ( mr . MetricNameRaw ) ; err == nil {
metricName = mn . String ( )
}
2019-05-22 23:23:23 +02:00
return fmt . Sprintf ( "MetricName=%s, Timestamp=%d, Value=%f\n" , metricName , mr . Timestamp , mr . Value )
2019-05-22 23:16:55 +02:00
}
// Marshal appends marshaled mr to dst and returns the result.
func ( mr * MetricRow ) Marshal ( dst [ ] byte ) [ ] byte {
2019-05-22 23:23:23 +02:00
return MarshalMetricRow ( dst , mr . MetricNameRaw , mr . Timestamp , mr . Value )
}
// MarshalMetricRow marshals MetricRow data to dst and returns the result.
func MarshalMetricRow ( dst [ ] byte , metricNameRaw [ ] byte , timestamp int64 , value float64 ) [ ] byte {
dst = encoding . MarshalBytes ( dst , metricNameRaw )
dst = encoding . MarshalUint64 ( dst , uint64 ( timestamp ) )
dst = encoding . MarshalUint64 ( dst , math . Float64bits ( value ) )
2019-05-22 23:16:55 +02:00
return dst
}
// Unmarshal unmarshals mr from src and returns the remaining tail from src.
//
// src must start with data in the layout produced by MarshalMetricRow.
// mr.MetricNameRaw receives a copy of the name bytes, so it doesn't alias src.
func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) {
	tail, metricNameRaw, err := encoding.UnmarshalBytes(src)
	if err != nil {
		return tail, fmt.Errorf("cannot unmarshal MetricName: %s", err)
	}
	mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRaw...)

	if n := len(tail); n < 8 {
		return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, n)
	}
	mr.Timestamp = int64(encoding.UnmarshalUint64(tail))
	tail = tail[8:]

	if n := len(tail); n < 8 {
		return tail, fmt.Errorf("cannot unmarshal Value: want %d bytes; have %d bytes", 8, n)
	}
	mr.Value = math.Float64frombits(encoding.UnmarshalUint64(tail))
	return tail[8:], nil
}
// AddRows adds the given mrs to s.
func ( s * Storage ) AddRows ( mrs [ ] MetricRow , precisionBits uint8 ) error {
if len ( mrs ) == 0 {
return nil
}
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU trashing when too many
// goroutines call AddRows.
select {
case addRowsConcurrencyCh <- struct { } { } :
defer func ( ) { <- addRowsConcurrencyCh } ( )
2019-08-06 13:09:17 +02:00
default :
// Sleep for a while until giving up
atomic . AddUint64 ( & s . addRowsConcurrencyLimitReached , 1 )
t := timerpool . Get ( addRowsTimeout )
select {
case addRowsConcurrencyCh <- struct { } { } :
timerpool . Put ( t )
defer func ( ) { <- addRowsConcurrencyCh } ( )
case <- t . C :
timerpool . Put ( t )
atomic . AddUint64 ( & s . addRowsConcurrencyLimitTimeout , 1 )
atomic . AddUint64 ( & s . addRowsConcurrencyDroppedRows , uint64 ( len ( mrs ) ) )
return fmt . Errorf ( "Cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers. Add more CPUs or reduce load" ,
len ( mrs ) , addRowsTimeout , cap ( addRowsConcurrencyCh ) )
}
2019-05-22 23:16:55 +02:00
}
// Add rows to the storage.
var err error
rr := getRawRowsWithSize ( len ( mrs ) )
rr . rows , err = s . add ( rr . rows , mrs , precisionBits )
putRawRows ( rr )
return err
}
// addRowsConcurrencyCh limits the number of goroutines that may add rows
// concurrently to twice GOMAXPROCS. Sending to it acquires a slot, receiving releases it.
var addRowsConcurrencyCh = make(chan struct{}, 2*runtime.GOMAXPROCS(-1))

// addRowsTimeout is the maximum time AddRows waits for a free slot in
// addRowsConcurrencyCh before dropping the rows.
var addRowsTimeout = 30 * time.Second
func ( s * Storage ) add ( rows [ ] rawRow , mrs [ ] MetricRow , precisionBits uint8 ) ( [ ] rawRow , error ) {
var is * indexSearch
var mn * MetricName
var kb * bytesutil . ByteBuffer
idb := s . idb ( )
dmis := idb . getDeletedMetricIDs ( )
rowsLen := len ( rows )
if n := rowsLen + len ( mrs ) - cap ( rows ) ; n > 0 {
rows = append ( rows [ : cap ( rows ) ] , make ( [ ] rawRow , n ) ... )
}
rows = rows [ : rowsLen + len ( mrs ) ]
j := 0
2019-07-11 16:04:56 +02:00
minTimestamp , maxTimestamp := s . tb . getMinMaxTimestamps ( )
2019-11-08 12:16:40 +01:00
// Return only the last error, since it has no sense in returning all errors.
var lastWarn error
2019-05-22 23:16:55 +02:00
for i := range mrs {
mr := & mrs [ i ]
if math . IsNaN ( mr . Value ) {
// Just skip NaNs, since the underlying encoding
// doesn't know how to work with them.
continue
}
2019-07-26 13:10:25 +02:00
if mr . Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d" , mr . Timestamp , minTimestamp )
2019-07-26 13:10:25 +02:00
atomic . AddUint64 ( & s . tooSmallTimestampRows , 1 )
continue
}
if mr . Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d" , mr . Timestamp , maxTimestamp )
2019-07-26 13:10:25 +02:00
atomic . AddUint64 ( & s . tooBigTimestampRows , 1 )
2019-07-11 16:04:56 +02:00
continue
}
2019-05-22 23:16:55 +02:00
r := & rows [ rowsLen + j ]
j ++
r . Timestamp = mr . Timestamp
r . Value = mr . Value
r . PrecisionBits = precisionBits
if s . getTSIDFromCache ( & r . TSID , mr . MetricNameRaw ) {
2019-09-24 20:10:22 +02:00
if ! dmis . Has ( r . TSID . MetricID ) {
2019-05-22 23:16:55 +02:00
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
continue
}
}
// Slow path - the TSID is missing in the cache. Search for it in the index.
if is == nil {
is = idb . getIndexSearch ( )
mn = GetMetricName ( )
kb = kbPool . Get ( )
}
if err := mn . unmarshalRaw ( mr . MetricNameRaw ) ; err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot unmarshal MetricNameRaw %q: %s" , mr . MetricNameRaw , err )
2019-05-22 23:16:55 +02:00
j --
continue
}
mn . sortTags ( )
kb . B = mn . Marshal ( kb . B [ : 0 ] )
if err := is . GetOrCreateTSIDByName ( & r . TSID , kb . B ) ; err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
2019-10-20 22:38:51 +02:00
lastWarn = fmt . Errorf ( "cannot obtain TSID for MetricName %q: %s" , kb . B , err )
2019-05-22 23:16:55 +02:00
j --
continue
}
s . putTSIDToCache ( & r . TSID , mr . MetricNameRaw )
}
2019-10-31 13:29:35 +01:00
if lastWarn != nil {
logger . Errorf ( "warn occurred during rows addition: %s" , lastWarn )
}
2019-05-22 23:16:55 +02:00
if is != nil {
kbPool . Put ( kb )
PutMetricName ( mn )
idb . putIndexSearch ( is )
}
rows = rows [ : rowsLen + j ]
2019-10-31 13:29:35 +01:00
var lastError error
2019-05-22 23:16:55 +02:00
if err := s . tb . AddRows ( rows ) ; err != nil {
2019-08-25 14:28:32 +02:00
lastError = fmt . Errorf ( "cannot add rows to table: %s" , err )
2019-05-22 23:16:55 +02:00
}
2019-10-31 13:29:35 +01:00
if err := s . updateDateMetricIDCache ( rows , lastError ) ; err != nil {
lastError = err
2019-05-22 23:16:55 +02:00
}
2019-10-31 13:29:35 +01:00
if lastError != nil {
return rows , fmt . Errorf ( "error occurred during rows addition: %s" , lastError )
2019-10-20 22:38:51 +02:00
}
2019-05-22 23:16:55 +02:00
return rows , nil
}
2019-08-25 14:28:32 +02:00
func ( s * Storage ) updateDateMetricIDCache ( rows [ ] rawRow , lastError error ) error {
2019-05-22 23:16:55 +02:00
var date uint64
2019-06-09 18:06:53 +02:00
var hour uint64
2019-05-22 23:16:55 +02:00
var prevTimestamp int64
kb := kbPool . Get ( )
defer kbPool . Put ( kb )
kb . B = bytesutil . Resize ( kb . B , 16 )
keyBuf := kb . B
a := ( * [ 2 ] uint64 ) ( unsafe . Pointer ( & keyBuf [ 0 ] ) )
idb := s . idb ( )
2019-11-08 12:16:40 +01:00
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-05-22 23:16:55 +02:00
for i := range rows {
r := & rows [ i ]
if r . Timestamp != prevTimestamp {
date = uint64 ( r . Timestamp ) / msecPerDay
2019-06-09 18:06:53 +02:00
hour = uint64 ( r . Timestamp ) / msecPerHour
2019-05-22 23:16:55 +02:00
prevTimestamp = r . Timestamp
}
metricID := r . TSID . MetricID
2019-06-09 18:06:53 +02:00
if hour == hm . hour {
// The r belongs to the current hour. Check for the current hour cache.
2019-09-24 20:10:22 +02:00
if hm . m . Has ( metricID ) {
2019-06-09 18:06:53 +02:00
// Fast path: the metricID is in the current hour cache.
2019-06-02 17:34:08 +02:00
continue
}
2019-11-08 12:16:40 +01:00
s . pendingHourEntriesLock . Lock ( )
2019-10-31 14:50:58 +01:00
e := pendingHourMetricIDEntry {
AccountID : r . TSID . AccountID ,
ProjectID : r . TSID . ProjectID ,
MetricID : metricID ,
}
2019-11-08 12:16:40 +01:00
s . pendingHourEntries = append ( s . pendingHourEntries , e )
s . pendingHourEntriesLock . Unlock ( )
hm . iidx . AddMetricID ( idb , e )
2019-06-02 17:34:08 +02:00
}
// Slower path: check global cache for (date, metricID) entry.
2019-05-22 23:16:55 +02:00
a [ 0 ] = date
a [ 1 ] = metricID
if s . dateMetricIDCache . Has ( keyBuf ) {
continue
}
2019-06-02 17:34:08 +02:00
// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
2019-05-22 23:16:55 +02:00
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
s . dateMetricIDCache . Set ( keyBuf , nil )
2019-05-22 23:23:23 +02:00
if err := idb . storeDateMetricID ( date , metricID , r . TSID . AccountID , r . TSID . ProjectID ) ; err != nil {
2019-08-25 14:28:32 +02:00
lastError = err
2019-05-22 23:16:55 +02:00
continue
}
}
2019-08-25 14:28:32 +02:00
return lastError
2019-05-22 23:16:55 +02:00
}
2019-06-09 18:06:53 +02:00
func ( s * Storage ) updateCurrHourMetricIDs ( ) {
hm := s . currHourMetricIDs . Load ( ) . ( * hourMetricIDs )
2019-11-08 12:16:40 +01:00
s . pendingHourEntriesLock . Lock ( )
newEntries := append ( [ ] pendingHourMetricIDEntry { } , s . pendingHourEntries ... )
s . pendingHourEntries = s . pendingHourEntries [ : 0 ]
s . pendingHourEntriesLock . Unlock ( )
2019-06-09 18:06:53 +02:00
hour := uint64 ( timestampFromTime ( time . Now ( ) ) ) / msecPerHour
2019-11-08 12:16:40 +01:00
if len ( newEntries ) == 0 && hm . hour == hour {
2019-06-09 18:06:53 +02:00
// Fast path: nothing to update.
2019-06-02 17:34:08 +02:00
return
}
2019-11-08 12:16:40 +01:00
// Slow path: hm.m must be updated with non-empty s.pendingHourEntries.
2019-09-24 20:10:22 +02:00
var m * uint64set . Set
2019-11-08 12:16:40 +01:00
var iidx * inmemoryInvertedIndex
2019-10-31 14:50:58 +01:00
var byTenant map [ accountProjectKey ] * uint64set . Set
2019-06-09 18:06:53 +02:00
isFull := hm . isFull
if hm . hour == hour {
2019-09-24 20:10:22 +02:00
m = hm . m . Clone ( )
2019-11-08 12:16:40 +01:00
iidx = hm . iidx . Clone ( )
2019-10-31 14:50:58 +01:00
byTenant = make ( map [ accountProjectKey ] * uint64set . Set , len ( hm . byTenant ) )
for k , e := range hm . byTenant {
byTenant [ k ] = e . Clone ( )
}
2019-06-09 18:06:53 +02:00
} else {
2019-09-24 20:10:22 +02:00
m = & uint64set . Set { }
2019-11-08 12:16:40 +01:00
iidx = newInmemoryInvertedIndex ( )
2019-10-31 14:50:58 +01:00
byTenant = make ( map [ accountProjectKey ] * uint64set . Set )
2019-06-09 18:06:53 +02:00
isFull = true
}
2019-10-31 14:50:58 +01:00
2019-11-08 12:16:40 +01:00
for _ , x := range newEntries {
2019-10-31 14:50:58 +01:00
m . Add ( x . MetricID )
k := accountProjectKey {
AccountID : x . AccountID ,
ProjectID : x . ProjectID ,
}
e := byTenant [ k ]
if e == nil {
e = & uint64set . Set { }
byTenant [ k ] = e
}
e . Add ( x . MetricID )
2019-06-02 17:34:08 +02:00
}
2019-06-09 18:06:53 +02:00
hmNew := & hourMetricIDs {
2019-10-31 14:50:58 +01:00
m : m ,
2019-11-08 12:16:40 +01:00
iidx : iidx ,
2019-10-31 14:50:58 +01:00
byTenant : byTenant ,
hour : hour ,
isFull : isFull ,
2019-06-09 18:06:53 +02:00
}
s . currHourMetricIDs . Store ( hmNew )
if hm . hour != hour {
s . prevHourMetricIDs . Store ( hm )
2019-06-02 17:34:08 +02:00
}
}
2019-06-09 18:06:53 +02:00
type hourMetricIDs struct {
2019-10-31 14:50:58 +01:00
m * uint64set . Set
2019-11-08 12:16:40 +01:00
iidx * inmemoryInvertedIndex
2019-10-31 14:50:58 +01:00
byTenant map [ accountProjectKey ] * uint64set . Set
hour uint64
isFull bool
2019-06-02 17:34:08 +02:00
}
2019-05-22 23:16:55 +02:00
func ( s * Storage ) getTSIDFromCache ( dst * TSID , metricName [ ] byte ) bool {
buf := ( * [ unsafe . Sizeof ( * dst ) ] byte ) ( unsafe . Pointer ( dst ) ) [ : ]
buf = s . tsidCache . Get ( buf [ : 0 ] , metricName )
return uintptr ( len ( buf ) ) == unsafe . Sizeof ( * dst )
}
// putTSIDToCache stores the raw in-memory bytes of tsid in s.tsidCache under metricName.
func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
	const tsidSize = unsafe.Sizeof(TSID{})
	raw := (*[tsidSize]byte)(unsafe.Pointer(tsid))[:]
	s.tsidCache.Set(metricName, raw)
}
2019-08-13 20:35:19 +02:00
func openIndexDBTables ( path string , metricIDCache , metricNameCache * workingsetcache . Cache , currHourMetricIDs , prevHourMetricIDs * atomic . Value ) ( curr , prev * indexDB , err error ) {
2019-05-22 23:16:55 +02:00
if err := fs . MkdirAllIfNotExist ( path ) ; err != nil {
return nil , nil , fmt . Errorf ( "cannot create directory %q: %s" , path , err )
}
d , err := os . Open ( path )
if err != nil {
return nil , nil , fmt . Errorf ( "cannot open directory: %s" , err )
}
defer fs . MustClose ( d )
// Search for the two most recent tables - the last one is active,
// the previous one contains backup data.
fis , err := d . Readdir ( - 1 )
if err != nil {
return nil , nil , fmt . Errorf ( "cannot read directory: %s" , err )
}
var tableNames [ ] string
for _ , fi := range fis {
if ! fs . IsDirOrSymlink ( fi ) {
// Skip non-directories.
continue
}
tableName := fi . Name ( )
if ! indexDBTableNameRegexp . MatchString ( tableName ) {
// Skip invalid directories.
continue
}
tableNames = append ( tableNames , tableName )
}
sort . Slice ( tableNames , func ( i , j int ) bool {
return tableNames [ i ] < tableNames [ j ]
} )
if len ( tableNames ) < 2 {
// Create missing tables
if len ( tableNames ) == 0 {
prevName := nextIndexDBTableName ( )
tableNames = append ( tableNames , prevName )
}
currName := nextIndexDBTableName ( )
tableNames = append ( tableNames , currName )
}
// Invariant: len(tableNames) >= 2
// Remove all the tables except two last tables.
for _ , tn := range tableNames [ : len ( tableNames ) - 2 ] {
pathToRemove := path + "/" + tn
logger . Infof ( "removing obsolete indexdb dir %q..." , pathToRemove )
2019-06-12 00:53:43 +02:00
fs . MustRemoveAll ( pathToRemove )
2019-05-22 23:16:55 +02:00
logger . Infof ( "removed obsolete indexdb dir %q" , pathToRemove )
}
// Persist changes on the file system.
2019-06-11 22:13:04 +02:00
fs . MustSyncPath ( path )
2019-05-22 23:16:55 +02:00
// Open the last two tables.
currPath := path + "/" + tableNames [ len ( tableNames ) - 1 ]
2019-06-09 18:06:53 +02:00
curr , err = openIndexDB ( currPath , metricIDCache , metricNameCache , currHourMetricIDs , prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
return nil , nil , fmt . Errorf ( "cannot open curr indexdb table at %q: %s" , currPath , err )
}
prevPath := path + "/" + tableNames [ len ( tableNames ) - 2 ]
2019-06-09 18:06:53 +02:00
prev , err = openIndexDB ( prevPath , metricIDCache , metricNameCache , currHourMetricIDs , prevHourMetricIDs )
2019-05-22 23:16:55 +02:00
if err != nil {
curr . MustClose ( )
return nil , nil , fmt . Errorf ( "cannot open prev indexdb table at %q: %s" , prevPath , err )
}
return curr , prev , nil
}
// indexDBTableNameRegexp matches valid indexdb table directory names:
// exactly 16 upper-case hexadecimal digits, as produced by nextIndexDBTableName.
var indexDBTableNameRegexp = regexp.MustCompile(`^[0-9A-F]{16}$`)
// indexDBTableIdx is the counter behind nextIndexDBTableName. It is seeded with the
// process start time in nanoseconds, so names generated by later runs normally sort
// after names generated by earlier ones.
var indexDBTableIdx = uint64(time.Now().UnixNano())

// nextIndexDBTableName returns a fresh indexdb table name: the next value of
// indexDBTableIdx formatted as 16 zero-padded upper-case hex digits.
// It is safe for concurrent use.
func nextIndexDBTableName() string {
	return fmt.Sprintf("%016X", atomic.AddUint64(&indexDBTableIdx, 1))
}