mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-13 13:11:37 +01:00
all: replace atomic.Value with atomic.Pointer[T]
This eliminates the need in .(*T) casting for results obtained from Load() Leave atomic.Value for map, since atomic.Pointer[map[...]...] makes double pointer to map, because map is already a pointer type.
This commit is contained in:
parent
92cd98e366
commit
992c300ce9
@ -93,7 +93,7 @@ func MultitenancyEnabled() bool {
|
||||
}
|
||||
|
||||
// Contains the current relabelConfigs.
|
||||
var allRelabelConfigs atomic.Value
|
||||
var allRelabelConfigs atomic.Pointer[relabelConfigs]
|
||||
|
||||
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
|
||||
// since it may lead to high memory usage due to big number of buffers.
|
||||
@ -346,7 +346,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
}
|
||||
|
||||
var rctx *relabelCtx
|
||||
rcs := allRelabelConfigs.Load().(*relabelConfigs)
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcsGlobal := rcs.global
|
||||
if pcsGlobal.Len() > 0 || len(labelsGlobal) > 0 {
|
||||
rctx = getRelabelCtx()
|
||||
@ -612,7 +612,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
|
||||
// Apply relabeling
|
||||
var rctx *relabelCtx
|
||||
var v *[]prompbmarshal.TimeSeries
|
||||
rcs := allRelabelConfigs.Load().(*relabelConfigs)
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
if pcs.Len() > 0 {
|
||||
rctx = getRelabelCtx()
|
||||
|
@ -476,10 +476,10 @@ type storageNodesBucket struct {
|
||||
}
|
||||
|
||||
// storageNodes contains a list of vmstorage node clients.
|
||||
var storageNodes atomic.Value
|
||||
var storageNodes atomic.Pointer[storageNodesBucket]
|
||||
|
||||
func getStorageNodesBucket() *storageNodesBucket {
|
||||
return storageNodes.Load().(*storageNodesBucket)
|
||||
return storageNodes.Load()
|
||||
}
|
||||
|
||||
func setStorageNodesBucket(snb *storageNodesBucket) {
|
||||
|
@ -69,7 +69,7 @@ var (
|
||||
configTimestamp = metrics.NewCounter(`vm_relabel_config_last_reload_success_timestamp_seconds`)
|
||||
)
|
||||
|
||||
var pcsGlobal atomic.Value
|
||||
var pcsGlobal atomic.Pointer[promrelabel.ParsedConfigs]
|
||||
|
||||
// CheckRelabelConfig checks config pointed by -relabelConfig
|
||||
func CheckRelabelConfig() error {
|
||||
@ -90,7 +90,7 @@ func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {
|
||||
|
||||
// HasRelabeling returns true if there is global relabeling.
|
||||
func HasRelabeling() bool {
|
||||
pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
|
||||
pcs := pcsGlobal.Load()
|
||||
return pcs.Len() > 0 || *usePromCompatibleNaming
|
||||
}
|
||||
|
||||
@ -110,7 +110,7 @@ func (ctx *Ctx) Reset() {
|
||||
//
|
||||
// The returned labels are valid until the next call to ApplyRelabeling.
|
||||
func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
|
||||
pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
|
||||
pcs := pcsGlobal.Load()
|
||||
if pcs.Len() == 0 && !*usePromCompatibleNaming {
|
||||
// There are no relabeling rules.
|
||||
return labels
|
||||
|
@ -2689,10 +2689,10 @@ type storageNodesBucket struct {
|
||||
sns []*storageNode
|
||||
}
|
||||
|
||||
var storageNodes atomic.Value
|
||||
var storageNodes atomic.Pointer[storageNodesBucket]
|
||||
|
||||
func getStorageNodesBucket() *storageNodesBucket {
|
||||
return storageNodes.Load().(*storageNodesBucket)
|
||||
return storageNodes.Load()
|
||||
}
|
||||
|
||||
func setStorageNodesBucket(snb *storageNodesBucket) {
|
||||
|
@ -32,14 +32,14 @@ func WritePrometheusMetrics(w io.Writer) {
|
||||
}
|
||||
metricsCacheLock.Unlock()
|
||||
|
||||
bb := metricsCache.Load().(*bytesutil.ByteBuffer)
|
||||
bb := metricsCache.Load()
|
||||
_, _ = w.Write(bb.B)
|
||||
}
|
||||
|
||||
var (
|
||||
metricsCacheLock sync.Mutex
|
||||
metricsCacheLastUpdateTime time.Time
|
||||
metricsCache atomic.Value
|
||||
metricsCache atomic.Pointer[bytesutil.ByteBuffer]
|
||||
)
|
||||
|
||||
func writePrometheusMetrics(w io.Writer) {
|
||||
|
@ -11,7 +11,7 @@ import (
|
||||
// It is safe using the Limiter from concurrent goroutines.
|
||||
type Limiter struct {
|
||||
maxItems int
|
||||
v atomic.Value
|
||||
v atomic.Pointer[limiter]
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
@ -55,7 +55,7 @@ func (l *Limiter) MaxItems() int {
|
||||
|
||||
// CurrentItems return the current number of items registered in l.
|
||||
func (l *Limiter) CurrentItems() int {
|
||||
lm := l.v.Load().(*limiter)
|
||||
lm := l.v.Load()
|
||||
n := atomic.LoadUint64(&lm.currentItems)
|
||||
return int(n)
|
||||
}
|
||||
@ -67,7 +67,7 @@ func (l *Limiter) CurrentItems() int {
|
||||
// True is returned if h is added or already exists in l.
|
||||
// False is returned if h cannot be added to l, since it already has maxItems unique items.
|
||||
func (l *Limiter) Add(h uint64) bool {
|
||||
lm := l.v.Load().(*limiter)
|
||||
lm := l.v.Load()
|
||||
return lm.Add(h)
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ var (
|
||||
decoder *zstd.Decoder
|
||||
|
||||
mu sync.Mutex
|
||||
av atomic.Value
|
||||
av atomic.Pointer[registry]
|
||||
)
|
||||
|
||||
type registry map[int]*zstd.Encoder
|
||||
@ -45,7 +45,7 @@ func CompressLevel(dst, src []byte, compressionLevel int) []byte {
|
||||
}
|
||||
|
||||
func getEncoder(compressionLevel int) *zstd.Encoder {
|
||||
r := av.Load().(registry)
|
||||
r := av.Load()
|
||||
e := r[compressionLevel]
|
||||
if e != nil {
|
||||
return e
|
||||
@ -54,7 +54,7 @@ func getEncoder(compressionLevel int) *zstd.Encoder {
|
||||
mu.Lock()
|
||||
// Create the encoder under lock in order to prevent from wasted work
|
||||
// when concurrent goroutines create encoder for the same compressionLevel.
|
||||
r1 := av.Load().(registry)
|
||||
r1 := av.Load()
|
||||
if e = r1[compressionLevel]; e == nil {
|
||||
e = newEncoder(compressionLevel)
|
||||
r2 := make(registry)
|
||||
|
@ -27,7 +27,7 @@ type apiConfig struct {
|
||||
apiPath string
|
||||
|
||||
// labels contains the latest discovered labels.
|
||||
labels atomic.Value
|
||||
labels atomic.Pointer[[]*promutils.Labels]
|
||||
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
@ -38,8 +38,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get API config for kuma_sd: %w", err)
|
||||
}
|
||||
v := cfg.labels.Load()
|
||||
pLabels := v.(*[]*promutils.Labels)
|
||||
pLabels := cfg.labels.Load()
|
||||
return *pLabels, nil
|
||||
}
|
||||
|
||||
|
@ -84,18 +84,17 @@ var (
|
||||
PendingScrapeConfigs int32
|
||||
|
||||
// configData contains -promscrape.config data
|
||||
configData atomic.Value
|
||||
configData atomic.Pointer[[]byte]
|
||||
)
|
||||
|
||||
// WriteConfigData writes -promscrape.config contents to w
|
||||
func WriteConfigData(w io.Writer) {
|
||||
v := configData.Load()
|
||||
if v == nil {
|
||||
p := configData.Load()
|
||||
if p == nil {
|
||||
// Nothing to write to w
|
||||
return
|
||||
}
|
||||
b := v.(*[]byte)
|
||||
_, _ = w.Write(*b)
|
||||
_, _ = w.Write(*p)
|
||||
}
|
||||
|
||||
func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
|
||||
|
@ -60,7 +60,7 @@ type Storage struct {
|
||||
// lock file for exclusive access to the storage on the given path.
|
||||
flockF *os.File
|
||||
|
||||
idbCurr atomic.Value
|
||||
idbCurr atomic.Pointer[indexDB]
|
||||
|
||||
tb *table
|
||||
|
||||
@ -81,16 +81,16 @@ type Storage struct {
|
||||
dateMetricIDCache *dateMetricIDCache
|
||||
|
||||
// Fast cache for MetricID values occurred during the current hour.
|
||||
currHourMetricIDs atomic.Value
|
||||
currHourMetricIDs atomic.Pointer[hourMetricIDs]
|
||||
|
||||
// Fast cache for MetricID values occurred during the previous hour.
|
||||
prevHourMetricIDs atomic.Value
|
||||
prevHourMetricIDs atomic.Pointer[hourMetricIDs]
|
||||
|
||||
// Fast cache for pre-populating per-day inverted index for the next day.
|
||||
// This is needed in order to remove CPU usage spikes at 00:00 UTC
|
||||
// due to creation of per-day inverted index for active time series.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
|
||||
nextDayMetricIDs atomic.Value
|
||||
nextDayMetricIDs atomic.Pointer[byDateMetricIDEntry]
|
||||
|
||||
// Pending MetricID values to be added to currHourMetricIDs.
|
||||
pendingHourEntriesLock sync.Mutex
|
||||
@ -101,7 +101,7 @@ type Storage struct {
|
||||
pendingNextDayMetricIDs *uint64set.Set
|
||||
|
||||
// prefetchedMetricIDs contains metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
|
||||
prefetchedMetricIDs atomic.Value
|
||||
prefetchedMetricIDs atomic.Pointer[uint64set.Set]
|
||||
|
||||
// prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series.
|
||||
prefetchedMetricIDsDeadline uint64
|
||||
@ -129,7 +129,7 @@ type Storage struct {
|
||||
//
|
||||
// It is safe to keep the set in memory even for big number of deleted
|
||||
// metricIDs, since it usually requires 1 bit per deleted metricID.
|
||||
deletedMetricIDs atomic.Value
|
||||
deletedMetricIDs atomic.Pointer[uint64set.Set]
|
||||
deletedMetricIDsUpdateLock sync.Mutex
|
||||
|
||||
isReadOnly uint32
|
||||
@ -283,7 +283,7 @@ func getTSIDCacheSize() int {
|
||||
}
|
||||
|
||||
func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
|
||||
return s.deletedMetricIDs.Load().(*uint64set.Set)
|
||||
return s.deletedMetricIDs.Load()
|
||||
}
|
||||
|
||||
func (s *Storage) setDeletedMetricIDs(dmis *uint64set.Set) {
|
||||
@ -449,7 +449,7 @@ func (s *Storage) DeleteStaleSnapshots(maxAge time.Duration) error {
|
||||
}
|
||||
|
||||
func (s *Storage) idb() *indexDB {
|
||||
return s.idbCurr.Load().(*indexDB)
|
||||
return s.idbCurr.Load()
|
||||
}
|
||||
|
||||
// Metrics contains essential metrics for the Storage.
|
||||
@ -584,8 +584,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
||||
m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount)
|
||||
m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount)
|
||||
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
hourMetricIDsLen := hmPrev.m.Len()
|
||||
if hmCurr.m.Len() > hourMetricIDsLen {
|
||||
hourMetricIDsLen = hmCurr.m.Len()
|
||||
@ -594,11 +594,11 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
||||
m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
|
||||
m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()
|
||||
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
|
||||
m.NextDayMetricIDCacheSize += uint64(nextDayMetricIDs.Len())
|
||||
m.NextDayMetricIDCacheSizeBytes += nextDayMetricIDs.SizeBytes()
|
||||
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs.Load()
|
||||
m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
|
||||
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
|
||||
|
||||
@ -812,12 +812,12 @@ func (s *Storage) MustClose() {
|
||||
s.mustSaveCache(s.metricNameCache, "metricID_metricName")
|
||||
s.metricNameCache.Stop()
|
||||
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids")
|
||||
|
||||
nextDayMetricIDs := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
|
||||
nextDayMetricIDs := s.nextDayMetricIDs.Load()
|
||||
s.mustSaveNextDayMetricIDs(nextDayMetricIDs)
|
||||
|
||||
// Release lock file.
|
||||
@ -1160,7 +1160,7 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, accountID, project
|
||||
return nil
|
||||
}
|
||||
var metricIDs uint64Sorter
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
|
||||
prefetchedMetricIDs := s.prefetchedMetricIDs.Load()
|
||||
for _, metricID := range srcMetricIDs {
|
||||
if prefetchedMetricIDs.Has(metricID) {
|
||||
continue
|
||||
@ -2014,10 +2014,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
||||
prevDate uint64
|
||||
prevMetricID uint64
|
||||
)
|
||||
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hm := s.currHourMetricIDs.Load()
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
hmPrevDate := hmPrev.hour / 24
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
|
||||
ts := fasttime.UnixTimestamp()
|
||||
// Start pre-populating the next per-day inverted index during the last hour of the current day.
|
||||
// pMin linearly increases from 0 to 1 during the last hour of the day.
|
||||
@ -2174,7 +2174,7 @@ type dateMetricIDCache struct {
|
||||
resetsCount uint64
|
||||
|
||||
// Contains immutable map
|
||||
byDate atomic.Value
|
||||
byDate atomic.Pointer[byDateMetricIDMap]
|
||||
|
||||
// Contains mutable map protected by mu
|
||||
byDateMutable *byDateMetricIDMap
|
||||
@ -2204,7 +2204,7 @@ func (dmc *dateMetricIDCache) resetLocked() {
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) EntriesCount() int {
|
||||
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
|
||||
byDate := dmc.byDate.Load()
|
||||
n := 0
|
||||
for _, e := range byDate.m {
|
||||
n += e.v.Len()
|
||||
@ -2213,7 +2213,7 @@ func (dmc *dateMetricIDCache) EntriesCount() int {
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) SizeBytes() uint64 {
|
||||
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
|
||||
byDate := dmc.byDate.Load()
|
||||
n := uint64(0)
|
||||
for _, e := range byDate.m {
|
||||
n += e.v.SizeBytes()
|
||||
@ -2222,7 +2222,7 @@ func (dmc *dateMetricIDCache) SizeBytes() uint64 {
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
|
||||
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
|
||||
byDate := dmc.byDate.Load()
|
||||
v := byDate.get(date)
|
||||
if v.Has(metricID) {
|
||||
// Fast path.
|
||||
@ -2288,7 +2288,7 @@ func (dmc *dateMetricIDCache) syncLocked() {
|
||||
// Nothing to sync.
|
||||
return
|
||||
}
|
||||
byDate := dmc.byDate.Load().(*byDateMetricIDMap)
|
||||
byDate := dmc.byDate.Load()
|
||||
byDateMutable := dmc.byDateMutable
|
||||
for date, e := range byDateMutable.m {
|
||||
v := byDate.get(date)
|
||||
@ -2301,7 +2301,7 @@ func (dmc *dateMetricIDCache) syncLocked() {
|
||||
date: date,
|
||||
v: *v,
|
||||
}
|
||||
if date == byDateMutable.hotEntry.Load().(*byDateMetricIDEntry).date {
|
||||
if date == byDateMutable.hotEntry.Load().date {
|
||||
byDateMutable.hotEntry.Store(dme)
|
||||
}
|
||||
byDateMutable.m[date] = dme
|
||||
@ -2324,7 +2324,7 @@ func (dmc *dateMetricIDCache) syncLocked() {
|
||||
}
|
||||
|
||||
type byDateMetricIDMap struct {
|
||||
hotEntry atomic.Value
|
||||
hotEntry atomic.Pointer[byDateMetricIDEntry]
|
||||
m map[uint64]*byDateMetricIDEntry
|
||||
}
|
||||
|
||||
@ -2337,7 +2337,7 @@ func newByDateMetricIDMap() *byDateMetricIDMap {
|
||||
}
|
||||
|
||||
func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
|
||||
hotEntry := dmm.hotEntry.Load().(*byDateMetricIDEntry)
|
||||
hotEntry := dmm.hotEntry.Load()
|
||||
if hotEntry.date == date {
|
||||
// Fast path
|
||||
return &hotEntry.v
|
||||
@ -2369,7 +2369,7 @@ type byDateMetricIDEntry struct {
|
||||
}
|
||||
|
||||
func (s *Storage) updateNextDayMetricIDs(date uint64) {
|
||||
e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
|
||||
e := s.nextDayMetricIDs.Load()
|
||||
s.pendingNextDayMetricIDsLock.Lock()
|
||||
pendingMetricIDs := s.pendingNextDayMetricIDs
|
||||
s.pendingNextDayMetricIDs = &uint64set.Set{}
|
||||
@ -2396,7 +2396,7 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
|
||||
}
|
||||
|
||||
func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
|
||||
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hm := s.currHourMetricIDs.Load()
|
||||
var newEntries []pendingHourMetricIDEntry
|
||||
s.pendingHourEntriesLock.Lock()
|
||||
if len(s.pendingHourEntries) < cap(s.pendingHourEntries)/2 {
|
||||
|
@ -168,7 +168,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
@ -180,7 +180,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
|
||||
}
|
||||
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
||||
}
|
||||
@ -200,7 +200,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
@ -214,7 +214,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
|
||||
}
|
||||
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
hmEmpty := &hourMetricIDs{}
|
||||
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
||||
@ -260,7 +260,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
@ -275,7 +275,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected)
|
||||
}
|
||||
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
||||
}
|
||||
@ -317,7 +317,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
@ -341,7 +341,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected)
|
||||
}
|
||||
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
hmEmpty := &hourMetricIDs{}
|
||||
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
||||
@ -385,7 +385,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
hour = uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
@ -409,7 +409,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected)
|
||||
}
|
||||
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
hmEmpty := &hourMetricIDs{}
|
||||
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
||||
@ -436,7 +436,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
}
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmCurr := s.currHourMetricIDs.Load()
|
||||
if hmCurr.hour != hour {
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
@ -447,7 +447,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) {
|
||||
t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected)
|
||||
}
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
||||
}
|
||||
|
@ -30,8 +30,8 @@ const (
|
||||
// The cache evicts inactive entries after the given expireDuration.
|
||||
// Recently accessed entries survive expireDuration.
|
||||
type Cache struct {
|
||||
curr atomic.Value
|
||||
prev atomic.Value
|
||||
curr atomic.Pointer[fastcache.Cache]
|
||||
prev atomic.Pointer[fastcache.Cache]
|
||||
|
||||
// csHistory holds cache stats history
|
||||
csHistory fastcache.Stats
|
||||
@ -148,8 +148,8 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
|
||||
return
|
||||
}
|
||||
// Reset prev cache and swap it with the curr cache.
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
curr := c.curr.Load()
|
||||
c.prev.Store(curr)
|
||||
var cs fastcache.Stats
|
||||
prev.UpdateStats(&cs)
|
||||
@ -188,8 +188,8 @@ func (c *Cache) prevCacheWatcher() {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
curr := c.curr.Load()
|
||||
var csCurr, csPrev fastcache.Stats
|
||||
curr.UpdateStats(&csCurr)
|
||||
prev.UpdateStats(&csPrev)
|
||||
@ -232,7 +232,7 @@ func (c *Cache) cacheSizeWatcher() {
|
||||
continue
|
||||
}
|
||||
var cs fastcache.Stats
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
curr.UpdateStats(&cs)
|
||||
if cs.BytesSize >= uint64(0.9*float64(cs.MaxBytesSize)) {
|
||||
maxBytesSize = cs.MaxBytesSize
|
||||
@ -254,8 +254,8 @@ func (c *Cache) cacheSizeWatcher() {
|
||||
|
||||
c.mu.Lock()
|
||||
c.setMode(switching)
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
curr := c.curr.Load()
|
||||
c.prev.Store(curr)
|
||||
var cs fastcache.Stats
|
||||
prev.UpdateStats(&cs)
|
||||
@ -273,7 +273,7 @@ func (c *Cache) cacheSizeWatcher() {
|
||||
case <-t.C:
|
||||
}
|
||||
var cs fastcache.Stats
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
curr.UpdateStats(&cs)
|
||||
if cs.BytesSize >= maxBytesSize {
|
||||
break
|
||||
@ -282,7 +282,7 @@ func (c *Cache) cacheSizeWatcher() {
|
||||
|
||||
c.mu.Lock()
|
||||
c.setMode(whole)
|
||||
prev = c.prev.Load().(*fastcache.Cache)
|
||||
prev = c.prev.Load()
|
||||
c.prev.Store(fastcache.New(1024))
|
||||
cs.Reset()
|
||||
prev.UpdateStats(&cs)
|
||||
@ -293,7 +293,7 @@ func (c *Cache) cacheSizeWatcher() {
|
||||
|
||||
// Save saves the cache to filePath.
|
||||
func (c *Cache) Save(filePath string) error {
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
concurrency := cgroup.AvailableCPUs()
|
||||
return curr.SaveToFileConcurrent(filePath, concurrency)
|
||||
}
|
||||
@ -311,10 +311,10 @@ func (c *Cache) Stop() {
|
||||
// Reset resets the cache.
|
||||
func (c *Cache) Reset() {
|
||||
var cs fastcache.Stats
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
prev.UpdateStats(&cs)
|
||||
prev.Reset()
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
curr.UpdateStats(&cs)
|
||||
updateCacheStatsHistory(&c.csHistory, &cs)
|
||||
curr.Reset()
|
||||
@ -335,11 +335,11 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
|
||||
updateCacheStatsHistory(fcs, &c.csHistory)
|
||||
|
||||
var cs fastcache.Stats
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
curr.UpdateStats(&cs)
|
||||
updateCacheStats(fcs, &cs)
|
||||
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
cs.Reset()
|
||||
prev.UpdateStats(&cs)
|
||||
updateCacheStats(fcs, &cs)
|
||||
@ -369,7 +369,7 @@ func updateCacheStatsHistory(dst, src *fastcache.Stats) {
|
||||
|
||||
// Get appends the found value for the given key to dst and returns the result.
|
||||
func (c *Cache) Get(dst, key []byte) []byte {
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
result := curr.Get(dst, key)
|
||||
if len(result) > len(dst) {
|
||||
// Fast path - the entry is found in the current cache.
|
||||
@ -381,7 +381,7 @@ func (c *Cache) Get(dst, key []byte) []byte {
|
||||
}
|
||||
|
||||
// Search for the entry in the previous cache.
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
result = prev.Get(dst, key)
|
||||
if len(result) <= len(dst) {
|
||||
// Nothing found.
|
||||
@ -394,14 +394,14 @@ func (c *Cache) Get(dst, key []byte) []byte {
|
||||
|
||||
// Has verifies whether the cache contains the given key.
|
||||
func (c *Cache) Has(key []byte) bool {
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
if curr.Has(key) {
|
||||
return true
|
||||
}
|
||||
if c.loadMode() == whole {
|
||||
return false
|
||||
}
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
if !prev.Has(key) {
|
||||
return false
|
||||
}
|
||||
@ -417,13 +417,13 @@ var tmpBufPool bytesutil.ByteBufferPool
|
||||
|
||||
// Set sets the given value for the given key.
|
||||
func (c *Cache) Set(key, value []byte) {
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
curr.Set(key, value)
|
||||
}
|
||||
|
||||
// GetBig appends the found value for the given key to dst and returns the result.
|
||||
func (c *Cache) GetBig(dst, key []byte) []byte {
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
result := curr.GetBig(dst, key)
|
||||
if len(result) > len(dst) {
|
||||
// Fast path - the entry is found in the current cache.
|
||||
@ -435,7 +435,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
|
||||
}
|
||||
|
||||
// Search for the entry in the previous cache.
|
||||
prev := c.prev.Load().(*fastcache.Cache)
|
||||
prev := c.prev.Load()
|
||||
result = prev.GetBig(dst, key)
|
||||
if len(result) <= len(dst) {
|
||||
// Nothing found.
|
||||
@ -448,7 +448,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
|
||||
|
||||
// SetBig sets the given value for the given key.
|
||||
func (c *Cache) SetBig(key, value []byte) {
|
||||
curr := c.curr.Load().(*fastcache.Cache)
|
||||
curr := c.curr.Load()
|
||||
curr.SetBig(key, value)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user