From 5810ba57c2d1f7d9088a2a7cf00463cc0ef0c519 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 9 Nov 2019 23:05:14 +0200 Subject: [PATCH] lib/storage: use specialized cache for (date, metricID) entries This improves ingestion performance. --- app/vmstorage/main.go | 12 --- lib/storage/index_db.go | 2 +- lib/storage/storage.go | 162 ++++++++++++++++++++++++++++++++++------ 3 files changed, 139 insertions(+), 37 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index de1fb3def9..2400e95eb0 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -469,9 +469,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheSizeBytes) }) - metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 { - return float64(m().DateMetricIDCacheSizeBytes) - }) metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheSizeBytes) }) @@ -488,9 +485,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_requests_total{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheRequests) }) - metrics.NewGauge(`vm_cache_requests_total{type="storage/date_metricID"}`, func() float64 { - return float64(m().DateMetricIDCacheRequests) - }) metrics.NewGauge(`vm_cache_requests_total{type="storage/bigIndexBlocks"}`, func() float64 { return float64(tm().BigIndexBlocksCacheRequests) }) @@ -522,9 +516,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_misses_total{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheMisses) }) - metrics.NewGauge(`vm_cache_misses_total{type="storage/date_metricID"}`, func() float64 { - return float64(m().DateMetricIDCacheMisses) - }) metrics.NewGauge(`vm_cache_misses_total{type="storage/bigIndexBlocks"}`, func() float64 { return float64(tm().BigIndexBlocksCacheMisses) }) @@ -557,7 +548,4 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_cache_collisions_total{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheCollisions) }) - metrics.NewGauge(`vm_cache_collisions_total{type="storage/date_metricID"}`, func() float64 { - return float64(m().DateMetricIDCacheCollisions) - }) } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 679cb95ccb..711de0e303 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -890,7 +890,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { // Obtain metricIDs to delete. tr := TimeRange{ MinTimestamp: 0, - MaxTimestamp: (1<<63)-1, + MaxTimestamp: (1 << 63) - 1, } is := db.getIndexSearch() metricIDs, err := is.searchMetricIDs(tfss, tr, 1e12) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index e14307a64a..19a7af9b2c 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -59,7 +59,7 @@ type Storage struct { metricNameCache *workingsetcache.Cache // dateMetricIDCache is (Date, MetricID) cache. - dateMetricIDCache *workingsetcache.Cache + dateMetricIDCache *dateMetricIDCache // Fast cache for MetricID values occured during the current hour. currHourMetricIDs atomic.Value @@ -118,7 +118,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { s.tsidCache = s.mustLoadCache("MetricName->TSID", "metricName_tsid", mem/3) s.metricIDCache = s.mustLoadCache("MetricID->TSID", "metricID_tsid", mem/16) s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8) - s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32) + s.dateMetricIDCache = newDateMetricIDCache() hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids") @@ -312,11 +312,7 @@ type Metrics struct { MetricNameCacheMisses uint64 MetricNameCacheCollisions uint64 - DateMetricIDCacheSize uint64 - DateMetricIDCacheSizeBytes uint64 - DateMetricIDCacheRequests uint64 - DateMetricIDCacheMisses uint64 - DateMetricIDCacheCollisions uint64 + DateMetricIDCacheSize uint64 HourMetricIDCacheSize uint64 @@ -368,13 +364,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.MetricNameCacheMisses += cs.Misses m.MetricNameCacheCollisions += cs.Collisions - cs.Reset() - s.dateMetricIDCache.UpdateStats(&cs) - m.DateMetricIDCacheSize += cs.EntriesCount - m.DateMetricIDCacheSizeBytes += cs.BytesSize - m.DateMetricIDCacheRequests += cs.GetCalls - m.DateMetricIDCacheMisses += cs.Misses - m.DateMetricIDCacheCollisions += cs.Collisions + m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount()) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) @@ -488,7 +478,6 @@ func (s *Storage) MustClose() { s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid") s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid") s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName") - s.mustSaveAndStopCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID") hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids") @@ -907,11 +896,6 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error var date uint64 var hour uint64 var prevTimestamp int64 - kb := kbPool.Get() - defer kbPool.Put(kb) - kb.B = bytesutil.Resize(kb.B, 16) - keyBuf := kb.B - a := (*[2]uint64)(unsafe.Pointer(&keyBuf[0])) idb := s.idb() hm := s.currHourMetricIDs.Load().(*hourMetricIDs) for i := range rows { @@ -935,16 +919,14 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error } // Slower path: check global cache for (date, metricID) entry. - a[0] = date - a[1] = metricID - if s.dateMetricIDCache.Has(keyBuf) { + if s.dateMetricIDCache.Has(date, metricID) { continue } // Slow path: store the entry in the (date, metricID) cache and in the indexDB. // It is OK if the (date, metricID) entry is added multiple times to db // by concurrent goroutines. - s.dateMetricIDCache.Set(keyBuf, nil) + s.dateMetricIDCache.Set(date, metricID) if err := idb.storeDateMetricID(date, metricID); err != nil { lastError = err continue @@ -953,6 +935,138 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error return lastError } +// dateMetricIDCache is fast cache for holding (date, metricID) entries. +// +// It should be faster than map[date]*uint64set.Set on multicore systems. +type dateMetricIDCache struct { + // Contains immutable map + byDate atomic.Value + + // Contains mutable map protected by mu + byDateMutable *byDateMetricIDMap + lastSyncTime time.Time + mu sync.RWMutex +} + +func newDateMetricIDCache() *dateMetricIDCache { + var dmc dateMetricIDCache + dmc.Reset() + return &dmc +} + +func (dmc *dateMetricIDCache) Reset() { + dmc.mu.Lock() + dmc.byDate.Store(newByDateMetricIDMap()) + dmc.byDateMutable = newByDateMetricIDMap() + dmc.mu.Unlock() +} + +func (dmc *dateMetricIDCache) EntriesCount() int { + byDate := dmc.byDate.Load().(*byDateMetricIDMap) + n := 0 + for _, e := range byDate.m { + n += e.v.Len() + } + return n +} + +func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { + byDate := dmc.byDate.Load().(*byDateMetricIDMap) + v := byDate.get(date) + if v.Has(metricID) { + // Fast path. + // The majority of calls must go here. + return true + } + + // Slow path. Check mutable map. + currentTime := time.Now() + + dmc.mu.RLock() + v = dmc.byDateMutable.get(date) + ok := v.Has(metricID) + mustSync := false + if currentTime.Sub(dmc.lastSyncTime) > 10*time.Second { + mustSync = true + dmc.lastSyncTime = currentTime + } + dmc.mu.RUnlock() + + if mustSync { + dmc.sync() + } + return ok +} + +func (dmc *dateMetricIDCache) Set(date, metricID uint64) { + dmc.mu.Lock() + v := dmc.byDateMutable.getOrCreate(date) + v.Add(metricID) + dmc.mu.Unlock() +} + +func (dmc *dateMetricIDCache) sync() { + dmc.mu.Lock() + byDate := dmc.byDate.Load().(*byDateMetricIDMap) + for date, e := range dmc.byDateMutable.m { + v := byDate.get(date) + e.v.Union(v) + } + dmc.byDate.Store(dmc.byDateMutable) + byDateMutable := newByDateMetricIDMap() + dmc.byDateMutable = byDateMutable + dmc.mu.Unlock() + + if dmc.EntriesCount() > memory.Allowed()/128 { + dmc.Reset() + } +} + +type byDateMetricIDMap struct { + hotEntry atomic.Value + m map[uint64]*byDateMetricIDEntry +} + +func newByDateMetricIDMap() *byDateMetricIDMap { + dmm := &byDateMetricIDMap{ + m: make(map[uint64]*byDateMetricIDEntry), + } + dmm.hotEntry.Store(&byDateMetricIDEntry{}) + return dmm +} + +func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set { + hotEntry := dmm.hotEntry.Load().(*byDateMetricIDEntry) + if hotEntry.date == date { + // Fast path + return &hotEntry.v + } + // Slow path + e := dmm.m[date] + if e == nil { + return nil + } + dmm.hotEntry.Store(e) + return &e.v +} + +func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set { + v := dmm.get(date) + if v != nil { + return v + } + e := &byDateMetricIDEntry{ + date: date, + } + dmm.m[date] = e + return &e.v +} + +type byDateMetricIDEntry struct { + date uint64 + v uint64set.Set +} + func (s *Storage) updateCurrHourMetricIDs() { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) s.pendingHourEntriesLock.Lock()