lib/storage: use specialized cache for (date, metricID) entries

This improves ingestion performance.
This commit is contained in:
Aliaksandr Valialkin 2019-11-09 23:05:14 +02:00
parent e573ef2126
commit 5810ba57c2
3 changed files with 139 additions and 37 deletions

View File

@ -469,9 +469,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 {
return float64(m().DateMetricIDCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagCacheSizeBytes)
})
@ -488,9 +485,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_requests_total{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/date_metricID"}`, func() float64 {
return float64(m().DateMetricIDCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheRequests)
})
@ -522,9 +516,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_misses_total{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/date_metricID"}`, func() float64 {
return float64(m().DateMetricIDCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheMisses)
})
@ -557,7 +548,4 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_collisions_total{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheCollisions)
})
metrics.NewGauge(`vm_cache_collisions_total{type="storage/date_metricID"}`, func() float64 {
return float64(m().DateMetricIDCacheCollisions)
})
}

View File

@ -890,7 +890,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
// Obtain metricIDs to delete.
tr := TimeRange{
MinTimestamp: 0,
MaxTimestamp: (1<<63)-1,
MaxTimestamp: (1 << 63) - 1,
}
is := db.getIndexSearch()
metricIDs, err := is.searchMetricIDs(tfss, tr, 1e12)

View File

@ -59,7 +59,7 @@ type Storage struct {
metricNameCache *workingsetcache.Cache
// dateMetricIDCache is (Date, MetricID) cache.
dateMetricIDCache *workingsetcache.Cache
dateMetricIDCache *dateMetricIDCache
// Fast cache for MetricID values occured during the current hour.
currHourMetricIDs atomic.Value
@ -118,7 +118,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
s.tsidCache = s.mustLoadCache("MetricName->TSID", "metricName_tsid", mem/3)
s.metricIDCache = s.mustLoadCache("MetricID->TSID", "metricID_tsid", mem/16)
s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8)
s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32)
s.dateMetricIDCache = newDateMetricIDCache()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids")
@ -312,11 +312,7 @@ type Metrics struct {
MetricNameCacheMisses uint64
MetricNameCacheCollisions uint64
DateMetricIDCacheSize uint64
DateMetricIDCacheSizeBytes uint64
DateMetricIDCacheRequests uint64
DateMetricIDCacheMisses uint64
DateMetricIDCacheCollisions uint64
DateMetricIDCacheSize uint64
HourMetricIDCacheSize uint64
@ -368,13 +364,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.MetricNameCacheMisses += cs.Misses
m.MetricNameCacheCollisions += cs.Collisions
cs.Reset()
s.dateMetricIDCache.UpdateStats(&cs)
m.DateMetricIDCacheSize += cs.EntriesCount
m.DateMetricIDCacheSizeBytes += cs.BytesSize
m.DateMetricIDCacheRequests += cs.GetCalls
m.DateMetricIDCacheMisses += cs.Misses
m.DateMetricIDCacheCollisions += cs.Collisions
m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
@ -488,7 +478,6 @@ func (s *Storage) MustClose() {
s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.mustSaveAndStopCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID")
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
@ -907,11 +896,6 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
var date uint64
var hour uint64
var prevTimestamp int64
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = bytesutil.Resize(kb.B, 16)
keyBuf := kb.B
a := (*[2]uint64)(unsafe.Pointer(&keyBuf[0]))
idb := s.idb()
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
for i := range rows {
@ -935,16 +919,14 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
}
// Slower path: check global cache for (date, metricID) entry.
a[0] = date
a[1] = metricID
if s.dateMetricIDCache.Has(keyBuf) {
if s.dateMetricIDCache.Has(date, metricID) {
continue
}
// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
s.dateMetricIDCache.Set(keyBuf, nil)
s.dateMetricIDCache.Set(date, metricID)
if err := idb.storeDateMetricID(date, metricID); err != nil {
lastError = err
continue
@ -953,6 +935,138 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
return lastError
}
// dateMetricIDCache is fast cache for holding (date, metricID) entries.
//
// It should be faster than map[date]*uint64set.Set on multicore systems.
type dateMetricIDCache struct {
// Contains immutable map
byDate atomic.Value
// Contains mutable map protected by mu
byDateMutable *byDateMetricIDMap
lastSyncTime time.Time
mu sync.RWMutex
}
func newDateMetricIDCache() *dateMetricIDCache {
var dmc dateMetricIDCache
dmc.Reset()
return &dmc
}
func (dmc *dateMetricIDCache) Reset() {
dmc.mu.Lock()
dmc.byDate.Store(newByDateMetricIDMap())
dmc.byDateMutable = newByDateMetricIDMap()
dmc.mu.Unlock()
}
func (dmc *dateMetricIDCache) EntriesCount() int {
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
n := 0
for _, e := range byDate.m {
n += e.v.Len()
}
return n
}
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
v := byDate.get(date)
if v.Has(metricID) {
// Fast path.
// The majority of calls must go here.
return true
}
// Slow path. Check mutable map.
currentTime := time.Now()
dmc.mu.RLock()
v = dmc.byDateMutable.get(date)
ok := v.Has(metricID)
mustSync := false
if currentTime.Sub(dmc.lastSyncTime) > 10*time.Second {
mustSync = true
dmc.lastSyncTime = currentTime
}
dmc.mu.RUnlock()
if mustSync {
dmc.sync()
}
return ok
}
func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
dmc.mu.Lock()
v := dmc.byDateMutable.getOrCreate(date)
v.Add(metricID)
dmc.mu.Unlock()
}
func (dmc *dateMetricIDCache) sync() {
dmc.mu.Lock()
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
for date, e := range dmc.byDateMutable.m {
v := byDate.get(date)
e.v.Union(v)
}
dmc.byDate.Store(dmc.byDateMutable)
byDateMutable := newByDateMetricIDMap()
dmc.byDateMutable = byDateMutable
dmc.mu.Unlock()
if dmc.EntriesCount() > memory.Allowed()/128 {
dmc.Reset()
}
}
type byDateMetricIDMap struct {
hotEntry atomic.Value
m map[uint64]*byDateMetricIDEntry
}
func newByDateMetricIDMap() *byDateMetricIDMap {
dmm := &byDateMetricIDMap{
m: make(map[uint64]*byDateMetricIDEntry),
}
dmm.hotEntry.Store(&byDateMetricIDEntry{})
return dmm
}
func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
hotEntry := dmm.hotEntry.Load().(*byDateMetricIDEntry)
if hotEntry.date == date {
// Fast path
return &hotEntry.v
}
// Slow path
e := dmm.m[date]
if e == nil {
return nil
}
dmm.hotEntry.Store(e)
return &e.v
}
func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
v := dmm.get(date)
if v != nil {
return v
}
e := &byDateMetricIDEntry{
date: date,
}
dmm.m[date] = e
return &e.v
}
type byDateMetricIDEntry struct {
date uint64
v uint64set.Set
}
func (s *Storage) updateCurrHourMetricIDs() {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourEntriesLock.Lock()