diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index ab8f1f0bda..0f2b2ba5c6 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -93,7 +93,7 @@ func MultitenancyEnabled() bool { } // Contains the current relabelConfigs. -var allRelabelConfigs atomic.Value +var allRelabelConfigs atomic.Pointer[relabelConfigs] // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. @@ -346,7 +346,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) { } var rctx *relabelCtx - rcs := allRelabelConfigs.Load().(*relabelConfigs) + rcs := allRelabelConfigs.Load() pcsGlobal := rcs.global if pcsGlobal.Len() > 0 || len(labelsGlobal) > 0 { rctx = getRelabelCtx() @@ -612,7 +612,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) { // Apply relabeling var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries - rcs := allRelabelConfigs.Load().(*relabelConfigs) + rcs := allRelabelConfigs.Load() pcs := rcs.perURL[rwctx.idx] if pcs.Len() > 0 { rctx = getRelabelCtx() diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 502883d5f9..44f45b16ec 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -476,10 +476,10 @@ type storageNodesBucket struct { } // storageNodes contains a list of vmstorage node clients. -var storageNodes atomic.Value +var storageNodes atomic.Pointer[storageNodesBucket] func getStorageNodesBucket() *storageNodesBucket { - return storageNodes.Load().(*storageNodesBucket) + return storageNodes.Load() } func setStorageNodesBucket(snb *storageNodesBucket) { diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go index 054b8b67f0..fcb107c24e 100644 --- a/app/vminsert/relabel/relabel.go +++ b/app/vminsert/relabel/relabel.go @@ -69,7 +69,7 @@ var ( configTimestamp = metrics.NewCounter(`vm_relabel_config_last_reload_success_timestamp_seconds`) ) -var pcsGlobal atomic.Value +var pcsGlobal atomic.Pointer[promrelabel.ParsedConfigs] // CheckRelabelConfig checks config pointed by -relabelConfig func CheckRelabelConfig() error { @@ -90,7 +90,7 @@ func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) { // HasRelabeling returns true if there is global relabeling. func HasRelabeling() bool { - pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs) + pcs := pcsGlobal.Load() return pcs.Len() > 0 || *usePromCompatibleNaming } @@ -110,7 +110,7 @@ func (ctx *Ctx) Reset() { // // The returned labels are valid until the next call to ApplyRelabeling. func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label { - pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs) + pcs := pcsGlobal.Load() if pcs.Len() == 0 && !*usePromCompatibleNaming { // There are no relabeling rules. return labels diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 7c3950c49f..7e51d16610 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -2689,10 +2689,10 @@ type storageNodesBucket struct { sns []*storageNode } -var storageNodes atomic.Value +var storageNodes atomic.Pointer[storageNodesBucket] func getStorageNodesBucket() *storageNodesBucket { - return storageNodes.Load().(*storageNodesBucket) + return storageNodes.Load() } func setStorageNodesBucket(snb *storageNodesBucket) { diff --git a/lib/appmetrics/appmetrics.go b/lib/appmetrics/appmetrics.go index ba6885e29a..8532c528d6 100644 --- a/lib/appmetrics/appmetrics.go +++ b/lib/appmetrics/appmetrics.go @@ -32,14 +32,14 @@ func WritePrometheusMetrics(w io.Writer) { } metricsCacheLock.Unlock() - bb := metricsCache.Load().(*bytesutil.ByteBuffer) + bb := metricsCache.Load() _, _ = w.Write(bb.B) } var ( metricsCacheLock sync.Mutex metricsCacheLastUpdateTime time.Time - metricsCache atomic.Value + metricsCache atomic.Pointer[bytesutil.ByteBuffer] ) func writePrometheusMetrics(w io.Writer) { diff --git a/lib/bloomfilter/limiter.go b/lib/bloomfilter/limiter.go index 1d047414b6..70069ddd85 100644 --- a/lib/bloomfilter/limiter.go +++ b/lib/bloomfilter/limiter.go @@ -11,7 +11,7 @@ import ( // It is safe using the Limiter from concurrent goroutines. type Limiter struct { maxItems int - v atomic.Value + v atomic.Pointer[limiter] wg sync.WaitGroup stopCh chan struct{} @@ -55,7 +55,7 @@ func (l *Limiter) MaxItems() int { // CurrentItems return the current number of items registered in l. func (l *Limiter) CurrentItems() int { - lm := l.v.Load().(*limiter) + lm := l.v.Load() n := atomic.LoadUint64(&lm.currentItems) return int(n) } @@ -67,7 +67,7 @@ func (l *Limiter) CurrentItems() int { // True is returned if h is added or already exists in l. // False is returned if h cannot be added to l, since it already has maxItems unique items. func (l *Limiter) Add(h uint64) bool { - lm := l.v.Load().(*limiter) + lm := l.v.Load() return lm.Add(h) } diff --git a/lib/encoding/zstd/zstd_pure.go b/lib/encoding/zstd/zstd_pure.go index 3fc666b9cd..5336337e3a 100644 --- a/lib/encoding/zstd/zstd_pure.go +++ b/lib/encoding/zstd/zstd_pure.go @@ -15,7 +15,7 @@ var ( decoder *zstd.Decoder mu sync.Mutex - av atomic.Value + av atomic.Pointer[registry] ) type registry map[int]*zstd.Encoder @@ -45,7 +45,7 @@ func CompressLevel(dst, src []byte, compressionLevel int) []byte { } func getEncoder(compressionLevel int) *zstd.Encoder { - r := av.Load().(registry) + r := av.Load() e := r[compressionLevel] if e != nil { return e @@ -54,7 +54,7 @@ func getEncoder(compressionLevel int) *zstd.Encoder { mu.Lock() // Create the encoder under lock in order to prevent from wasted work // when concurrent goroutines create encoder for the same compressionLevel. - r1 := av.Load().(registry) + r1 := av.Load() if e = r1[compressionLevel]; e == nil { e = newEncoder(compressionLevel) r2 := make(registry) diff --git a/lib/promscrape/discovery/kuma/api.go b/lib/promscrape/discovery/kuma/api.go index c2c33359a3..ce93572d07 100644 --- a/lib/promscrape/discovery/kuma/api.go +++ b/lib/promscrape/discovery/kuma/api.go @@ -27,7 +27,7 @@ type apiConfig struct { apiPath string // labels contains the latest discovered labels. - labels atomic.Value + labels atomic.Pointer[[]*promutils.Labels] cancel context.CancelFunc wg sync.WaitGroup diff --git a/lib/promscrape/discovery/kuma/kuma.go b/lib/promscrape/discovery/kuma/kuma.go index 7d42a46baa..c5bd824904 100644 --- a/lib/promscrape/discovery/kuma/kuma.go +++ b/lib/promscrape/discovery/kuma/kuma.go @@ -38,8 +38,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) { if err != nil { return nil, fmt.Errorf("cannot get API config for kuma_sd: %w", err) } - v := cfg.labels.Load() - pLabels := v.(*[]*promutils.Labels) + pLabels := cfg.labels.Load() return *pLabels, nil } diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 53bfccda48..f6bf3dd8d8 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -84,18 +84,17 @@ var ( PendingScrapeConfigs int32 // configData contains -promscrape.config data - configData atomic.Value + configData atomic.Pointer[[]byte] ) // WriteConfigData writes -promscrape.config contents to w func WriteConfigData(w io.Writer) { - v := configData.Load() - if v == nil { + p := configData.Load() + if p == nil { // Nothing to write to w return } - b := v.(*[]byte) - _, _ = w.Write(*b) + _, _ = w.Write(*p) } func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 71077c848c..77a0e967aa 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -60,7 +60,7 @@ type Storage struct { // lock file for exclusive access to the storage on the given path. flockF *os.File - idbCurr atomic.Value + idbCurr atomic.Pointer[indexDB] tb *table @@ -81,16 +81,16 @@ type Storage struct { dateMetricIDCache *dateMetricIDCache // Fast cache for MetricID values occurred during the current hour. - currHourMetricIDs atomic.Value + currHourMetricIDs atomic.Pointer[hourMetricIDs] // Fast cache for MetricID values occurred during the previous hour. - prevHourMetricIDs atomic.Value + prevHourMetricIDs atomic.Pointer[hourMetricIDs] // Fast cache for pre-populating per-day inverted index for the next day. // This is needed in order to remove CPU usage spikes at 00:00 UTC // due to creation of per-day inverted index for active time series. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details. - nextDayMetricIDs atomic.Value + nextDayMetricIDs atomic.Pointer[byDateMetricIDEntry] // Pending MetricID values to be added to currHourMetricIDs. pendingHourEntriesLock sync.Mutex @@ -101,7 +101,7 @@ type Storage struct { pendingNextDayMetricIDs *uint64set.Set // prefetchedMetricIDs contains metricIDs for pre-fetched metricNames in the prefetchMetricNames function. - prefetchedMetricIDs atomic.Value + prefetchedMetricIDs atomic.Pointer[uint64set.Set] // prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series. prefetchedMetricIDsDeadline uint64 @@ -129,7 +129,7 @@ type Storage struct { // // It is safe to keep the set in memory even for big number of deleted // metricIDs, since it usually requires 1 bit per deleted metricID. - deletedMetricIDs atomic.Value + deletedMetricIDs atomic.Pointer[uint64set.Set] deletedMetricIDsUpdateLock sync.Mutex isReadOnly uint32 @@ -283,7 +283,7 @@ func getTSIDCacheSize() int { } func (s *Storage) getDeletedMetricIDs() *uint64set.Set { - return s.deletedMetricIDs.Load().(*uint64set.Set) + return s.deletedMetricIDs.Load() } func (s *Storage) setDeletedMetricIDs(dmis *uint64set.Set) { @@ -449,7 +449,7 @@ func (s *Storage) DeleteStaleSnapshots(maxAge time.Duration) error { } func (s *Storage) idb() *indexDB { - return s.idbCurr.Load().(*indexDB) + return s.idbCurr.Load() } // Metrics contains essential metrics for the Storage. @@ -584,8 +584,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount) m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount) - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() + hmPrev := s.prevHourMetricIDs.Load() hourMetricIDsLen := hmPrev.m.Len() if hmCurr.m.Len() > hourMetricIDsLen { hourMetricIDsLen = hmCurr.m.Len() @@ -594,11 +594,11 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes() m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes() - nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v + nextDayMetricIDs := &s.nextDayMetricIDs.Load().v m.NextDayMetricIDCacheSize += uint64(nextDayMetricIDs.Len()) m.NextDayMetricIDCacheSizeBytes += nextDayMetricIDs.SizeBytes() - prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set) + prefetchedMetricIDs := s.prefetchedMetricIDs.Load() m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len()) m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes()) @@ -812,12 +812,12 @@ func (s *Storage) MustClose() { s.mustSaveCache(s.metricNameCache, "metricID_metricName") s.metricNameCache.Stop() - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids") - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmPrev := s.prevHourMetricIDs.Load() s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids") - nextDayMetricIDs := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) + nextDayMetricIDs := s.nextDayMetricIDs.Load() s.mustSaveNextDayMetricIDs(nextDayMetricIDs) // Release lock file. @@ -1160,7 +1160,7 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, accountID, project return nil } var metricIDs uint64Sorter - prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set) + prefetchedMetricIDs := s.prefetchedMetricIDs.Load() for _, metricID := range srcMetricIDs { if prefetchedMetricIDs.Has(metricID) { continue @@ -2014,10 +2014,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { prevDate uint64 prevMetricID uint64 ) - hm := s.currHourMetricIDs.Load().(*hourMetricIDs) - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hm := s.currHourMetricIDs.Load() + hmPrev := s.prevHourMetricIDs.Load() hmPrevDate := hmPrev.hour / 24 - nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v + nextDayMetricIDs := &s.nextDayMetricIDs.Load().v ts := fasttime.UnixTimestamp() // Start pre-populating the next per-day inverted index during the last hour of the current day. // pMin linearly increases from 0 to 1 during the last hour of the day. @@ -2174,7 +2174,7 @@ type dateMetricIDCache struct { resetsCount uint64 // Contains immutable map - byDate atomic.Value + byDate atomic.Pointer[byDateMetricIDMap] // Contains mutable map protected by mu byDateMutable *byDateMetricIDMap @@ -2204,7 +2204,7 @@ func (dmc *dateMetricIDCache) resetLocked() { } func (dmc *dateMetricIDCache) EntriesCount() int { - byDate := dmc.byDate.Load().(*byDateMetricIDMap) + byDate := dmc.byDate.Load() n := 0 for _, e := range byDate.m { n += e.v.Len() @@ -2213,7 +2213,7 @@ func (dmc *dateMetricIDCache) EntriesCount() int { } func (dmc *dateMetricIDCache) SizeBytes() uint64 { - byDate := dmc.byDate.Load().(*byDateMetricIDMap) + byDate := dmc.byDate.Load() n := uint64(0) for _, e := range byDate.m { n += e.v.SizeBytes() @@ -2222,7 +2222,7 @@ func (dmc *dateMetricIDCache) SizeBytes() uint64 { } func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { - byDate := dmc.byDate.Load().(*byDateMetricIDMap) + byDate := dmc.byDate.Load() v := byDate.get(date) if v.Has(metricID) { // Fast path. @@ -2288,7 +2288,7 @@ func (dmc *dateMetricIDCache) syncLocked() { // Nothing to sync. return } - byDate := dmc.byDate.Load().(*byDateMetricIDMap) + byDate := dmc.byDate.Load() byDateMutable := dmc.byDateMutable for date, e := range byDateMutable.m { v := byDate.get(date) @@ -2301,7 +2301,7 @@ func (dmc *dateMetricIDCache) syncLocked() { date: date, v: *v, } - if date == byDateMutable.hotEntry.Load().(*byDateMetricIDEntry).date { + if date == byDateMutable.hotEntry.Load().date { byDateMutable.hotEntry.Store(dme) } byDateMutable.m[date] = dme @@ -2324,7 +2324,7 @@ func (dmc *dateMetricIDCache) syncLocked() { } type byDateMetricIDMap struct { - hotEntry atomic.Value + hotEntry atomic.Pointer[byDateMetricIDEntry] m map[uint64]*byDateMetricIDEntry } @@ -2337,7 +2337,7 @@ func newByDateMetricIDMap() *byDateMetricIDMap { } func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set { - hotEntry := dmm.hotEntry.Load().(*byDateMetricIDEntry) + hotEntry := dmm.hotEntry.Load() if hotEntry.date == date { // Fast path return &hotEntry.v @@ -2369,7 +2369,7 @@ type byDateMetricIDEntry struct { } func (s *Storage) updateNextDayMetricIDs(date uint64) { - e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) + e := s.nextDayMetricIDs.Load() s.pendingNextDayMetricIDsLock.Lock() pendingMetricIDs := s.pendingNextDayMetricIDs s.pendingNextDayMetricIDs = &uint64set.Set{} @@ -2396,7 +2396,7 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) { } func (s *Storage) updateCurrHourMetricIDs(hour uint64) { - hm := s.currHourMetricIDs.Load().(*hourMetricIDs) + hm := s.currHourMetricIDs.Load() var newEntries []pendingHourMetricIDEntry s.pendingHourEntriesLock.Lock() if len(s.pendingHourEntries) < cap(s.pendingHourEntries)/2 { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 35dd4c6866..01a5c00bdc 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -168,7 +168,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs(hour) - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. hour = uint64(timestampFromTime(time.Now())) / msecPerHour @@ -180,7 +180,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0) } - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmPrev := s.prevHourMetricIDs.Load() if !reflect.DeepEqual(hmPrev, hmOrig) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } @@ -200,7 +200,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs(hour) - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. hour = uint64(timestampFromTime(time.Now())) / msecPerHour @@ -214,7 +214,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig) } - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmPrev := s.prevHourMetricIDs.Load() hmEmpty := &hourMetricIDs{} if !reflect.DeepEqual(hmPrev, hmEmpty) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) @@ -260,7 +260,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs(hour) - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. hour = uint64(timestampFromTime(time.Now())) / msecPerHour @@ -275,7 +275,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) } - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmPrev := s.prevHourMetricIDs.Load() if !reflect.DeepEqual(hmPrev, hmOrig) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } @@ -317,7 +317,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs(hour) - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. hour = uint64(timestampFromTime(time.Now())) / msecPerHour @@ -341,7 +341,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) } - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmPrev := s.prevHourMetricIDs.Load() hmEmpty := &hourMetricIDs{} if !reflect.DeepEqual(hmPrev, hmEmpty) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) @@ -385,7 +385,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs(hour) - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. hour = uint64(timestampFromTime(time.Now())) / msecPerHour @@ -409,7 +409,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) } - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmPrev := s.prevHourMetricIDs.Load() hmEmpty := &hourMetricIDs{} if !reflect.DeepEqual(hmPrev, hmEmpty) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) @@ -436,7 +436,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { } s.currHourMetricIDs.Store(hmOrig) s.updateCurrHourMetricIDs(hour) - hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + hmCurr := s.currHourMetricIDs.Load() if hmCurr.hour != hour { t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } @@ -447,7 +447,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) { t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) } - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmPrev := s.prevHourMetricIDs.Load() if !reflect.DeepEqual(hmPrev, hmOrig) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go index be70279760..a48235521f 100644 --- a/lib/workingsetcache/cache.go +++ b/lib/workingsetcache/cache.go @@ -30,8 +30,8 @@ const ( // The cache evicts inactive entries after the given expireDuration. // Recently accessed entries survive expireDuration. type Cache struct { - curr atomic.Value - prev atomic.Value + curr atomic.Pointer[fastcache.Cache] + prev atomic.Pointer[fastcache.Cache] // csHistory holds cache stats history csHistory fastcache.Stats @@ -148,8 +148,8 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) { return } // Reset prev cache and swap it with the curr cache. - prev := c.prev.Load().(*fastcache.Cache) - curr := c.curr.Load().(*fastcache.Cache) + prev := c.prev.Load() + curr := c.curr.Load() c.prev.Store(curr) var cs fastcache.Stats prev.UpdateStats(&cs) @@ -188,8 +188,8 @@ func (c *Cache) prevCacheWatcher() { c.mu.Unlock() return } - prev := c.prev.Load().(*fastcache.Cache) - curr := c.curr.Load().(*fastcache.Cache) + prev := c.prev.Load() + curr := c.curr.Load() var csCurr, csPrev fastcache.Stats curr.UpdateStats(&csCurr) prev.UpdateStats(&csPrev) @@ -232,7 +232,7 @@ func (c *Cache) cacheSizeWatcher() { continue } var cs fastcache.Stats - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() curr.UpdateStats(&cs) if cs.BytesSize >= uint64(0.9*float64(cs.MaxBytesSize)) { maxBytesSize = cs.MaxBytesSize @@ -254,8 +254,8 @@ func (c *Cache) cacheSizeWatcher() { c.mu.Lock() c.setMode(switching) - prev := c.prev.Load().(*fastcache.Cache) - curr := c.curr.Load().(*fastcache.Cache) + prev := c.prev.Load() + curr := c.curr.Load() c.prev.Store(curr) var cs fastcache.Stats prev.UpdateStats(&cs) @@ -273,7 +273,7 @@ func (c *Cache) cacheSizeWatcher() { case <-t.C: } var cs fastcache.Stats - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() curr.UpdateStats(&cs) if cs.BytesSize >= maxBytesSize { break @@ -282,7 +282,7 @@ func (c *Cache) cacheSizeWatcher() { c.mu.Lock() c.setMode(whole) - prev = c.prev.Load().(*fastcache.Cache) + prev = c.prev.Load() c.prev.Store(fastcache.New(1024)) cs.Reset() prev.UpdateStats(&cs) @@ -293,7 +293,7 @@ func (c *Cache) cacheSizeWatcher() { // Save saves the cache to filePath. func (c *Cache) Save(filePath string) error { - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() concurrency := cgroup.AvailableCPUs() return curr.SaveToFileConcurrent(filePath, concurrency) } @@ -311,10 +311,10 @@ func (c *Cache) Stop() { // Reset resets the cache. func (c *Cache) Reset() { var cs fastcache.Stats - prev := c.prev.Load().(*fastcache.Cache) + prev := c.prev.Load() prev.UpdateStats(&cs) prev.Reset() - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() curr.UpdateStats(&cs) updateCacheStatsHistory(&c.csHistory, &cs) curr.Reset() @@ -335,11 +335,11 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) { updateCacheStatsHistory(fcs, &c.csHistory) var cs fastcache.Stats - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() curr.UpdateStats(&cs) updateCacheStats(fcs, &cs) - prev := c.prev.Load().(*fastcache.Cache) + prev := c.prev.Load() cs.Reset() prev.UpdateStats(&cs) updateCacheStats(fcs, &cs) @@ -369,7 +369,7 @@ func updateCacheStatsHistory(dst, src *fastcache.Stats) { // Get appends the found value for the given key to dst and returns the result. func (c *Cache) Get(dst, key []byte) []byte { - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() result := curr.Get(dst, key) if len(result) > len(dst) { // Fast path - the entry is found in the current cache. @@ -381,7 +381,7 @@ func (c *Cache) Get(dst, key []byte) []byte { } // Search for the entry in the previous cache. - prev := c.prev.Load().(*fastcache.Cache) + prev := c.prev.Load() result = prev.Get(dst, key) if len(result) <= len(dst) { // Nothing found. @@ -394,14 +394,14 @@ func (c *Cache) Get(dst, key []byte) []byte { // Has verifies whether the cache contains the given key. func (c *Cache) Has(key []byte) bool { - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() if curr.Has(key) { return true } if c.loadMode() == whole { return false } - prev := c.prev.Load().(*fastcache.Cache) + prev := c.prev.Load() if !prev.Has(key) { return false } @@ -417,13 +417,13 @@ var tmpBufPool bytesutil.ByteBufferPool // Set sets the given value for the given key. func (c *Cache) Set(key, value []byte) { - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() curr.Set(key, value) } // GetBig appends the found value for the given key to dst and returns the result. func (c *Cache) GetBig(dst, key []byte) []byte { - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() result := curr.GetBig(dst, key) if len(result) > len(dst) { // Fast path - the entry is found in the current cache. @@ -435,7 +435,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte { } // Search for the entry in the previous cache. - prev := c.prev.Load().(*fastcache.Cache) + prev := c.prev.Load() result = prev.GetBig(dst, key) if len(result) <= len(dst) { // Nothing found. @@ -448,7 +448,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte { // SetBig sets the given value for the given key. func (c *Cache) SetBig(key, value []byte) { - curr := c.curr.Load().(*fastcache.Cache) + curr := c.curr.Load() curr.SetBig(key, value) }